We want to measure these latencies under a constant load:
- from the moment the Kafka producer starts writing to the moment the data lands in Cloud Bigtable,
- from the moment the message is in Kafka to the moment the data lands in Cloud Bigtable,
- from the moment our sink receives the data (via a `put()` method call) to the moment the data lands in Cloud Bigtable.
Of course, results are not valid if any error occurs.
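In terms of the spans recorded during a run (the producer's publish span and the sink's sinkProcessing span), the three latencies can be sketched with a toy calculation. The timestamps below are made up; the formulas mirror the trace analysis at the end of this document:

```shell
# Toy span timestamps in microseconds (made-up values).
publish_start=0;    publish_duration=2000   # producer span: write call -> ack
process_start=5000; process_duration=3000   # sink span: put() -> Bigtable write done
# Producer starts writing -> data in Bigtable:
total=$((process_start - publish_start + process_duration))
# Message in Kafka -> data in Bigtable (subtract the producer's own span):
kafka_to_bigtable=$((total - publish_duration))
# Sink put() -> data in Bigtable:
sink=$process_duration
echo "total=${total}us kafka_to_bigtable=${kafka_to_bigtable}us sink=${sink}us"
```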
The performance test infrastructure is all set up within GCP.
The high level overview of it is presented in the following diagram:
```mermaid
flowchart
  subgraph Kubernetes
    KafkaConnect
    OpentelemetryCollector
    ManagedPrometheus
    LoadGenerator
  end
  ManagedKafka
  CloudBigtable
  Stackdriver
  LoadGenerator -->|Traces+Metrics| OpentelemetryCollector
  KafkaConnect -->|Traces| OpentelemetryCollector --> Stackdriver
  KafkaConnect -->|Metrics| ManagedPrometheus --> Stackdriver
  LoadGenerator -->|Data| ManagedKafka -->|Data| KafkaConnect -->|Data| CloudBigtable
```
- Strimzi Kafka operator is used to configure Kafka Connect.
- `terraform` code describing the infrastructure used is available in the `terraform` directory.
- Some Kubernetes manifests used are available in the `kubernetes` directory.
The tests were conducted in the europe-central2-a (Warsaw, Poland) zone.
- Kubernetes: 1.31.5
- terraform: 1.10.5
- hashicorp/google provider: 6.19.0
- hashicorp/kubernetes provider: 2.35.1
- Strimzi Kafka Operator: 0.45.0
- Kafka Connect: 3.8.1
Single-node cluster with SSD storage.
See bigtable.tf for details.
A cluster with 3 vCPU and 3 GB of RAM.
See kafka.tf for details.
A cluster consisting of 4 n1-standard-4 machines (4 vCPU, 15 GB of RAM each).
A three-node cluster with each machine reserving 2.66 vCPU and 6 GB of RAM.
Its metrics are exported using JMX Prometheus exporter.
See kubernetes.tf for details.
A single Kubernetes Pod reserving 2 vCPU and 8 GB of RAM and running a simple script consisting of a three-part shell pipeline:
- a simple script generating data of the desired shape,
- `pv`, used to rate limit the generator script,
- `kafka-console-producer.sh`, the upstream-provided wrapper for the Kafka producer.
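The pipeline above can be sketched as follows. This is a minimal reconstruction, not the actual script: the `gen` function is a stand-in for the real generator (main.rs), and the producer invocation shown in the comment uses illustrative flags only:

```shell
#!/usr/bin/env bash
# Sketch of the load generator pipeline (the real logic lives in
# generate_load.sh and main.rs; names and flags here are assumptions).
FIELD_VALUE_SIZE=${FIELD_VALUE_SIZE:-100}
THROUGHPUT=${THROUGHPUT:-3000}

gen() {
  # Emit n "key|value" lines; each value is FIELD_VALUE_SIZE bytes.
  # The real generator runs until TIMEOUT instead of taking a count.
  local n=${1:-3} i=0
  while [ "$i" -lt "$n" ]; do
    printf 'key-%d|%s\n' "$i" "$(head -c "$FIELD_VALUE_SIZE" /dev/zero | tr '\0' x)"
    i=$((i + 1))
  done
}

# Full pipeline (sketch): rate-limit lines/s with pv, feed the producer wrapper:
#   gen | pv --quiet --line-mode --rate-limit "$THROUGHPUT" \
#       | kafka-console-producer.sh --bootstrap-server "$BOOTSTRAP" --topic "$TOPIC" \
#           --property parse.key=true --property key.separator='|'
gen 3   # demo: first three generated records
```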
An autoscaled service consisting of 6 to 10 Pods, each limited to 0.2 vCPU and 0.8 GB of RAM.
An autoscaled service using default configuration.
Kafka clients used by the load generator and Kafka Connect are configured to optimize latency as suggested in Confluent's "Optimizing Your Apache Kafka® Deployment" whitepaper.
Producer settings:
- `linger.ms`: 0
- `compression.type`: none
- `acks`: 1

Consumer settings:
- `fetch.min.bytes`: 1
Replication factor of Kafka topics used for internal state storage: the same as in Kafka.
Key converter: org.apache.kafka.connect.json.JsonConverter with schemas disabled.
Value converter: org.apache.kafka.connect.json.JsonConverter with schemas enabled.
It is also configured to use TLS when accessing Kafka API.
The topic is configured to use 60 partitions with replication to all 3 Kafka nodes.
The full configuration of the sink connector is available in kubernetes.tf. The interesting parts of these settings are:

```json
{
  "auto.create.column.families": "false",
  "auto.create.tables": "false",
  "error.mode": "FAIL",
  "insert.mode": "upsert",
  "max.batch.size": "1000",
  "retry.timeout.ms": "90000",
  "tasks.max": "60",
  "value.null.mode": "write"
}
```

Note that `tasks.max` matches the topic's 60 partitions, presumably so that each partition gets a dedicated task.

generate_load.sh and main.rs are configured using environment variables:
- `KAFKA_CONNECT_BOOTSTRAP_SERVERS`, `KAFKA_CONNECT_TLS`, `KAFKA_CONNECT_SASL_USERNAME`, `KAFKA_CONNECT_SASL_PASSWORD_FILE`, `KAFKA_CONNECT_SASL_MECHANISM` - the same meaning as in Strimzi's Kafka Connect,
- `THROUGHPUT` - number of messages to be sent per second,
- `TIMEOUT` - duration of load generation (default unit: seconds),
- `TOPIC` - topic to write to.

The script generates data in two shapes:
- a String key and a String value - when `COLUMN_FAMILIES` is equal to 0,
- a String key and a Struct mapping to `COLUMN_FAMILIES` column families, each containing `COLUMNS_PER_FAMILY` columns; the value in each cell has `FIELD_VALUE_SIZE` bytes.

Note that we use JSON with an in-band schema, so messages contain schema bytes. This overhead is not insignificant, for example:
```shell
# Just a String value
$ FIELD_VALUE_SIZE=100 COLUMN_FAMILIES=0 COLUMNS_PER_FAMILY=1 ./gen 2>/dev/null | cut -d"|" -f2 | head -1 | wc --bytes
158
```

The default values are:
- `THROUGHPUT` - 3000
- `TIMEOUT` - 600
- `COLUMN_FAMILIES` - 0
- `COLUMNS_PER_FAMILY` - 1
- `FIELD_VALUE_SIZE` - 100
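The 158 bytes in the example above are the JsonConverter envelope around the 100-byte value. A sketch that reproduces the count (the schema JSON here is a reconstruction; the exact field order may differ):

```shell
# JsonConverter with schemas enabled wraps every value in {"schema":...,"payload":...}:
# 56 bytes of envelope prefix + 100 bytes of value + 2 closing bytes = 158.
value='{"schema":{"type":"string","optional":false},"payload":"'"$(printf 'x%.0s' $(seq 1 100))"'"}'
printf '%s' "$value" | wc --bytes   # 158
```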
All the code (the load generator, Kafka Connect, and our sink) is instrumented using Opentelemetry.
In the case of the load generator and Kafka Connect, the instrumentation is automatic, because Kafka clients (both consumer and producer) are supported by the Opentelemetry agent, which we run alongside these two components.
The sink is instrumented by calling appropriate functions in the code.
Interestingly, it also uses the Opentelemetry agent indirectly: our code uses GlobalOpenTelemetry, which is injected by the agent.
All the traces pass through the Opentelemetry collector, which is responsible for sending them to Google Stackdriver.
Tracing code is not present in the release code.
The code of tracing implementation is available in MessageTracer.java.
To enable tracing, put it into sink/src/main/java/com/google/cloud/kafka/connect/bigtable/tracing and apply the enable_tracing.patch git patch.
A single test run consists of just running a load generator for a stretch of time and collecting metrics and traces from all the components.
- `terraform` (with GCP credentials configured)
- `kubectl`
- `jq`
- `bash` + coreutils
- `envsubst`
- `docker`
- `gcloud` (logged into the desired project, with Application Default Credentials configured)
- `java` 11, 17, or 21
- `maven`
- Adjust variables such as project name and region in main.tf.
- Adjust the test config by modifying variables in kubernetes.tf.
perf_test.sh is the script that executes all the needed steps:
Always run it from the directory it resides in.
Only needs to be executed once, at the beginning.
It prepares the whole environment: sets up the infrastructure, builds the Docker container containing our connector and load generator, and creates helper Kubernetes resources (Strimzi Kafka Operator, Opentelemetry collector, Managed Prometheus).
It is idempotent.
Creates the Cloud Bigtable table and the Kafka topic, and then executes a performance test using the current configuration from the .tf files.
Cleans up the environment for the next perf_test.sh run: removes the existing Kafka Connect deployment and the connector, and deletes the Kafka topic and the Cloud Bigtable table.
A shortcut for perf_test.sh cleanup && perf_test.sh run.
Removes all the infrastructure set up for the testing.
Note that it requires manual confirmation.
Import the following JSON into https://console.cloud.google.com/monitoring/dashboards: dashboard.json.
You might need to replace some strings, such as the project name, analogously to the main.tf modifications.
You can browse traces at https://console.cloud.google.com/traces/explorer.
Despite our configuration, which attempts to record a trace of every record, the Opentelemetry collector cannot keep up with too high a rate of traces. To verify whether this is the case, compare the rate of traces in the Trace Explorer with the metrics (or look directly at the Opentelemetry collector's metrics).
For basic analysis use https://console.cloud.google.com/traces/tasks.
Sample queries:
- `+root:"bigtablesink-kafka-topic publish" +span:sinkProcessing` - collect traces whose first span is named bigtablesink-kafka-topic publish and which contain a span named sinkProcessing,
- `+span:sinkProcessing` - collect all traces containing a span named sinkProcessing.
Query language reference: https://cloud.google.com/trace/docs/trace-filters.
Note that to compute the total trace time, this analysis sums the time of all member spans rather than computing the delta between the end of the last span and the start of the first one. Still, sample traces are very useful to look at.
To aggregate wall time instead of CPU time, you can use https://console.cloud.google.com/logs/analytics.
The following query calculates percentiles for the three kinds of latencies described in Measured Values. Remember to adjust the project, topic name, and timestamps. Note that the latency from the moment the message is in Kafka to the moment the data lands in Cloud Bigtable is calculated using the producer's trace, since the Kafka Connect consumer's span could start after the data arrives. Unfortunately, it might be inaccurate if the producer process lacks CPU and closes its span with a delay. It's a good idea to exclude the first 60-90 s and the last 30-60 s of the test period so that outliers caused by scaling the message rate up or down don't influence the results.
```sql
SELECT
APPROX_QUANTILES(Process.duration_nano / 1000, 100)[OFFSET(50)] as sink_p50,
APPROX_QUANTILES(Process.duration_nano / 1000, 100)[OFFSET(75)] as sink_p75,
APPROX_QUANTILES(Process.duration_nano / 1000, 100)[OFFSET(90)] as sink_p90,
APPROX_QUANTILES(Process.duration_nano / 1000, 100)[OFFSET(95)] as sink_p95,
APPROX_QUANTILES(Process.duration_nano / 1000, 100)[OFFSET(99)] as sink_p99,
APPROX_QUANTILES(Process.duration_nano / 1000, 1000)[OFFSET(995)] as sink_p995,
APPROX_QUANTILES(Process.duration_nano / 1000, 1000)[OFFSET(999)] as sink_p999,
MIN(Process.duration_nano / 1000) as sink_min,
AVG(Process.duration_nano / 1000) as sink_avg,
MAX(Process.duration_nano / 1000) as sink_max,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000) - (Publish.duration_nano / 1000), 100)[OFFSET(50)] as kafka_to_bigtable_p50,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000) - (Publish.duration_nano / 1000), 100)[OFFSET(75)] as kafka_to_bigtable_p75,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000) - (Publish.duration_nano / 1000), 100)[OFFSET(90)] as kafka_to_bigtable_p90,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000) - (Publish.duration_nano / 1000), 100)[OFFSET(95)] as kafka_to_bigtable_p95,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000) - (Publish.duration_nano / 1000), 100)[OFFSET(99)] as kafka_to_bigtable_p99,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000) - (Publish.duration_nano / 1000), 1000)[OFFSET(995)] as kafka_to_bigtable_p995,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000) - (Publish.duration_nano / 1000), 1000)[OFFSET(999)] as kafka_to_bigtable_p999,
MIN(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000) - (Publish.duration_nano / 1000)) as kafka_to_bigtable_min,
AVG(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000) - (Publish.duration_nano / 1000)) as kafka_to_bigtable_avg,
MAX(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000) - (Publish.duration_nano / 1000)) as kafka_to_bigtable_max,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000), 100)[OFFSET(50)] as total_p50,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000), 100)[OFFSET(75)] as total_p75,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000), 100)[OFFSET(90)] as total_p90,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000), 100)[OFFSET(95)] as total_p95,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000), 100)[OFFSET(99)] as total_p99,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000), 1000)[OFFSET(995)] as total_p995,
APPROX_QUANTILES(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000), 1000)[OFFSET(999)] as total_p999,
MIN(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000)) as total_min,
AVG(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000)) as total_avg,
MAX(TIMESTAMP_DIFF(Process.start_time, Publish.start_time, MICROSECOND) + (Process.duration_nano / 1000)) as total_max
FROM `unoperate-test.global._Trace._AllSpans` Publish, `unoperate-test.global._Trace._AllSpans` Process
WHERE
Publish.name = "bigtablesink-kafka-topic publish"
AND Process.name = "sinkProcessing"
AND Publish.start_time > TIMESTAMP('2025-02-21T12:52:00 UTC')
AND Publish.start_time < TIMESTAMP('2025-02-21T13:03:00 UTC')
AND Publish.trace_id = Process.trace_id
```