-
Notifications
You must be signed in to change notification settings - Fork 106
Expand file tree
/
Copy pathenable_tracing.patch
More file actions
69 lines (66 loc) · 3.42 KB
/
enable_tracing.patch
File metadata and controls
69 lines (66 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkTask.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkTask.java
index 80bea508..3dcf2b27 100644
--- a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkTask.java
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkTask.java
@@ -37,10 +37,13 @@ import com.google.cloud.kafka.connect.bigtable.mapping.KeyMapper;
import com.google.cloud.kafka.connect.bigtable.mapping.MutationData;
import com.google.cloud.kafka.connect.bigtable.mapping.MutationDataBuilder;
import com.google.cloud.kafka.connect.bigtable.mapping.ValueMapper;
+import com.google.cloud.kafka.connect.bigtable.tracing.MessageTracer;
import com.google.cloud.kafka.connect.bigtable.version.PackageMetadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -75,6 +78,7 @@ public class BigtableSinkTask extends SinkTask {
private BigtableSchemaManager schemaManager;
@VisibleForTesting protected final Map<String, Batcher<RowMutationEntry, Void>> batchers;
@VisibleForTesting protected Logger logger = LoggerFactory.getLogger(BigtableSinkTask.class);
+ private Map<SinkRecord, Span> spans;
/**
* A default empty constructor. Initialization methods such as {@link BigtableSinkTask#start(Map)}
@@ -103,6 +107,8 @@ public class BigtableSinkTask extends SinkTask {
this.schemaManager = schemaManager;
this.context = context;
this.batchers = new HashMap<>();
+ // TODO: use a Map with limited capacity when tracing prod code.
+ this.spans = new HashMap<>();
}
@Override
@@ -166,6 +172,15 @@ public class BigtableSinkTask extends SinkTask {
if (records.isEmpty()) {
return;
}
+ records.forEach(
+ r -> {
+ Span span = MessageTracer.getRecordSpan(r, "sinkProcessing");
+ Span oldSpan = spans.put(r, span);
+ if (oldSpan != null) {
+ span.setStatus(StatusCode.ERROR, "Span not ended. Probably a previous put() failed.");
+ span.end();
+ }
+ });
Map<SinkRecord, MutationData> mutations = prepareRecords(records);
if (config.getBoolean(BigtableSinkTaskConfig.AUTO_CREATE_TABLES_CONFIG)) {
@@ -494,11 +509,16 @@ public class BigtableSinkTask extends SinkTask {
void handleResults(Map<SinkRecord, Future<Void>> perRecordResults) {
logger.trace("handleResults(#records={})", perRecordResults.size());
for (Map.Entry<SinkRecord, Future<Void>> recordResult : perRecordResults.entrySet()) {
+ SinkRecord record = recordResult.getKey();
+ Optional<Span> span = Optional.ofNullable(spans.remove(record));
try {
recordResult.getValue().get();
+ span.map(s -> s.setStatus(StatusCode.OK));
} catch (ExecutionException | InterruptedException e) {
- SinkRecord record = recordResult.getKey();
+ span.map(s -> s.setStatus(StatusCode.ERROR, e.getClass().getName()));
reportError(record, e);
+ } finally {
+ span.ifPresent(Span::end);
}
}
}