diff --git a/CHANGELOG.md b/CHANGELOG.md index e99c5289f..38ca02f3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ for a worked example. - Make orjson optional in JSON serdes with stdlib json fallback (#2281) - Handle non-http errors during retries (#2292) - Use TLS certification verification with Hashicorp Vault (#2294) +- Return early from wakeable `Producer.poll()` once producer events are + served, instead of waiting for the full timeout. confluent-kafka-python v2.15.0rc1 is based on librdkafka v2.15.0-RC1, see the diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index 0bfc810b8..63620b2cf 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -395,6 +395,9 @@ static int Producer_poll0(Handle *self, int tmout) { break; } r += chunk_result; /* Accumulate events processed */ + if (chunk_result > 0) { + break; + } chunk_count++; diff --git a/tests/integration/producer/test_producer_wakeable_poll_flush.py b/tests/integration/producer/test_producer_wakeable_poll_flush.py index 49d1d47d0..250ced042 100644 --- a/tests/integration/producer/test_producer_wakeable_poll_flush.py +++ b/tests/integration/producer/test_producer_wakeable_poll_flush.py @@ -80,17 +80,21 @@ def delivery_callback(err, msg): # Produce a test message with delivery callback producer.produce(topic, value=b'test-message', on_delivery=delivery_callback) - # Poll with wakeable pattern - should trigger delivery callback + # Poll with wakeable pattern - should trigger delivery callback and return + # when that event is served, rather than waiting for the whole timeout. + poll_timeout = 5.0 start = time.time() - events_handled = producer.poll(timeout=2.0) + events_handled = producer.poll(timeout=poll_timeout) elapsed = time.time() - start # Verify delivery callback was called assert len(delivery_called) > 0, "Expected delivery callback to be called" assert len(delivery_errors) == 0, f"Unexpected delivery errors: {delivery_errors}" assert events_handled >= 0, "poll() should return non-negative int" - # Allow time for delivery callback, but should complete reasonably quickly - assert elapsed < 2.5, f"Poll took {elapsed:.2f}s, expected < 2.5s" + assert elapsed < poll_timeout - 1.0, ( + f"Poll took {elapsed:.2f}s after delivery, expected it to return " + f"well before the {poll_timeout:.1f}s timeout" + ) # Flush to ensure message is committed to Kafka producer.flush(timeout=1.0)