KIP-932: Add Share Consumer soak tests + high-throughput perf variant#2230
KIP-932: Add Share Consumer soak tests + high-throughput perf variant#2230Ankith L (Ankith-Confluent) wants to merge 24 commits into
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
|
Ankith L (@Ankith-Confluent) |
a980019 to
b5cb017
Compare
78301f5 to
8e9dcd0
Compare
|
2978328 to
2153fe2
Compare
d600e92 to
5e41108
Compare
770ec69 to
449ec68
Compare
|
2ad627d to
4f8327e
Compare
65ff773 to
067298d
Compare
|
… sync and async commits
…ure compatibility with librdkafka v2.15.0-RC1
c2aad2f to
b219457
Compare
…ging in commit_sync
…consistency in logging
Kaushik Raina (k-raina)
left a comment
There was a problem hiding this comment.
Is there scope to common out code between soakclient.py and soakclient_perf.py?
|
|
||
| if enable_share: | ||
| if not HAS_SHARE_CONSUMER: | ||
| raise RuntimeError("ShareConsumer requested but not available in this " "confluent_kafka build.") |
There was a problem hiding this comment.
In case share consumer fails, should producer also shut down?
| self.share_err_cnt += 1 | ||
| self.incr_counter("consumer.error", 1) | ||
| else: | ||
| self.share_consumer.commit_async() |
There was a problem hiding this comment.
Should we plan to add ack callback to catch errors?
There was a problem hiding this comment.
Yes we can do that, when I implemented this before, that mechanism was not implemented yet in Librdkafka side.
Will add.
| if hw > 0: | ||
| if msg.offset() <= hw: | ||
| self.logger.warning( | ||
| "share: Old or duplicate message {} " |
There was a problem hiding this comment.
Does this logic prints "Old or duplicate message" for delivery retries?
There was a problem hiding this comment.
yes, I will change the the log text.
| self.msg_err_cnt += 1 | ||
| self.incr_counter("consumer.msgerr", 1) | ||
|
|
||
| self.msg_cnt += 1 |
There was a problem hiding this comment.
Do we want to count corrupt messages also?
There was a problem hiding this comment.
I think we should not count them because we already have another metric counting the erros.
So it would pollute the readings.
| self.share_err_cnt += 1 | ||
| self.incr_counter("consumer.error", 1) | ||
|
|
||
| self.share_consumer.close() |
There was a problem hiding this comment.
nit: should close be moved into finally?
| self.logger.info("share: aborted by user") | ||
| self.run = False | ||
| except Exception as ex: | ||
| self.logger.fatal("share: fatal exception: {}\n{}".format(ex, traceback.print_exc())) |
There was a problem hiding this comment.
| self.logger.fatal("share: fatal exception: {}\n{}".format(ex, traceback.print_exc())) | |
| self.logger.fatal("share: fatal exception: {}\n{}".format(ex, traceback.format_exc())) |
Check if format_exec is more relavant? Same for rest of files
|
|
||
| # Create topic (might already exist) | ||
| aconf = filter_config(conf, ["consumer.", "producer."], "admin.") | ||
| aconf = filter_config(conf, ["consumer.", "producer.", "share."], "admin.") |
There was a problem hiding this comment.
Adding ".share" in filter config changes share.acknowledgement.mode -> acknowledgement.mode, which is an invalid config. i think this will cause crash for explicit mode. Is my understanding correct?
There was a problem hiding this comment.
The second arg to filter_config is the drop list, not a rename. So share.acknowledgement.mode is dropped from aconf, not renamed to acknowledgement.mode.
Explicit mode still works because sconf['share.acknowledgement.mode'] = 'explicit' is set directly on the share conf after filter_config
I verified it end-to-end with --share --explicit.
|
Thanks! Kaushik Raina (@k-raina) |
|
In the current codebase, the two files (soakclient.py and soakclient_perf.py) overlap heavily:
Because the two files are largely copies, any fix applied to one must be manually ported to the other. Over time this leads to silent, unintended drift (for example, divergence in logging detail, error handing etc.). One way to address this is to make soakclient.py the base class and soakclient_perf.py a subclass. |
|
Kaushik Raina (k-raina)
left a comment
There was a problem hiding this comment.
Overall LGTM!
Since delta btw files is tiny, runtime branching works fine for this PR. In future if there are many hooks or variants, we might need to revisit and adopt Inheritance.
| self.msg_err_cnt = 0 | ||
| self.consumer_err_cnt = 0 | ||
| self.consumer_error_cb_cnt = 0 | ||
| self.last_commited = None |
There was a problem hiding this comment.
Does this need to be?
| self.last_committed = None |
| self.incr_counter("consumer.msg", 1) | ||
|
|
||
| # end-to-end latency | ||
| headers = dict(msg.headers()) |
There was a problem hiding this comment.
Nit: Does this crash for msg with no header?
| # duration the OS can actually honor. -r remains the target rate | ||
| # (an upper bound: if a batch takes longer than its time budget, | ||
| # we run as fast as we can, below -r). | ||
| batch = max(1, int(self.rate / 100)) # ~100 batches/sec |
There was a problem hiding this comment.
Note : By default , rate is hardcoded to 80 in run.sh:
time opentelemetry-instrument $testdir/soakclient.py -i $TESTID -t $topic -r 80 -f $1 $share_flag $explicit_flag $perf_flag
So each batch is one message and batching only kicks in at -r ≥ 200.







What
--shareflag to soakclient.py that runs Producer + ShareConsumerhigher-throughput
soakclient_perf.pyvariant for the HT soakChecklist
References
JIRA:
Test & Review
Open questions / Follow-ups