Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ for a worked example.
- Make orjson optional in JSON serdes with stdlib json fallback (#2281)
- Handle non-http errors during retries (#2292)
- Use TLS certification verification with Hashicorp Vault (#2294)
- Return early from wakeable `Producer.poll()` once producer events are
served, instead of waiting for the full timeout.


confluent-kafka-python v2.15.0rc1 is based on librdkafka v2.15.0-RC1, see the
Expand Down
3 changes: 3 additions & 0 deletions src/confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@ static int Producer_poll0(Handle *self, int tmout) {
break;
}
r += chunk_result; /* Accumulate events processed */
if (chunk_result > 0) {
break;
}

chunk_count++;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,21 @@ def delivery_callback(err, msg):
# Produce a test message with delivery callback
producer.produce(topic, value=b'test-message', on_delivery=delivery_callback)

# Poll with wakeable pattern - should trigger delivery callback
# Poll with wakeable pattern - should trigger delivery callback and return
# when that event is served, rather than waiting for the whole timeout.
poll_timeout = 5.0
start = time.time()
events_handled = producer.poll(timeout=2.0)
events_handled = producer.poll(timeout=poll_timeout)
elapsed = time.time() - start

# Verify delivery callback was called
assert len(delivery_called) > 0, "Expected delivery callback to be called"
assert len(delivery_errors) == 0, f"Unexpected delivery errors: {delivery_errors}"
assert events_handled >= 0, "poll() should return non-negative int"
# Allow time for delivery callback, but should complete reasonably quickly
assert elapsed < 2.5, f"Poll took {elapsed:.2f}s, expected < 2.5s"
assert elapsed < poll_timeout - 1.0, (
f"Poll took {elapsed:.2f}s after delivery, expected it to return "
f"well before the {poll_timeout:.1f}s timeout"
)

# Flush to ensure message is committed to Kafka
producer.flush(timeout=1.0)
Expand Down