Skip to content

Commit c3492ba

Browse files
lofifnc and Michel Davit authored
Add Batched lookups for streaming GRPC endpoints and BigTable (#5521)
Co-authored-by: Michel Davit <micheld@spotify.com>
1 parent d4b1ced commit c3492ba

File tree

10 files changed

+511
-82
lines changed

10 files changed

+511
-82
lines changed

build.sbt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
432432
// tf-metadata upgrade
433433
ProblemFilters.exclude[Problem](
434434
"org.tensorflow.metadata.v0.*"
435+
),
436+
// relax type hierarchy for batch stream
437+
ProblemFilters.exclude[IncompatibleMethTypeProblem](
438+
"com.spotify.scio.grpc.GrpcBatchDoFn.asyncLookup"
435439
)
436440
)
437441

scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncBatchLookupDoFn.java

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.spotify.scio.transforms;
1818

1919
import static java.util.Objects.requireNonNull;
20+
import static java.util.function.Function.identity;
2021

2122
import com.google.common.cache.Cache;
2223
import com.spotify.scio.transforms.BaseAsyncLookupDoFn.CacheSupplier;
@@ -26,6 +27,7 @@
2627
import java.util.Collections;
2728
import java.util.LinkedList;
2829
import java.util.List;
30+
import java.util.Map;
2931
import java.util.Queue;
3032
import java.util.UUID;
3133
import java.util.concurrent.ConcurrentHashMap;
@@ -275,48 +277,53 @@ private void createRequest() throws InterruptedException {
275277
}
276278

277279
private FutureType handleOutput(FutureType future, List<Input> batchInput, UUID key) {
280+
final Map<String, Input> keyedInputs =
281+
batchInput.stream().collect(Collectors.toMap(idExtractorFn::apply, identity()));
278282
return addCallback(
279283
future,
280284
response -> {
281-
batchResponseFn
282-
.apply(response)
283-
.forEach(
284-
pair -> {
285-
final String id = pair.getLeft();
286-
final Output output = pair.getRight();
287-
final List<ValueInSingleWindow<Input>> processInputs = inputs.remove(id);
288-
if (processInputs == null) {
289-
// no need to fail future here as we're only interested in its completion
290-
// finishBundle will fail the checkState as we do not produce any result
291-
LOG.error(
292-
"The ID '{}' received in the gRPC batch response does not "
293-
+ "match any IDs extracted via the idExtractorFn for the requested "
294-
+ "batch sent to the gRPC endpoint. Please ensure that the IDs returned "
295-
+ "from the gRPC endpoints match the IDs extracted using the provided"
296-
+ "idExtractorFn for the same input.",
297-
id);
298-
} else {
299-
final List<ValueInSingleWindow<KV<Input, TryWrapper>>> batchResult =
300-
processInputs.stream()
301-
.map(
302-
processInput -> {
303-
final Input i = processInput.getValue();
304-
final TryWrapper o = success(output);
305-
final Instant ts = processInput.getTimestamp();
306-
final BoundedWindow w = processInput.getWindow();
307-
final PaneInfo p = processInput.getPane();
308-
return ValueInSingleWindow.of(KV.of(i, o), ts, w, p);
309-
})
310-
.collect(Collectors.toList());
311-
results.add(Pair.of(key, batchResult));
312-
}
313-
});
285+
final Map<String, Output> keyedOutput =
286+
batchResponseFn.apply(response).stream()
287+
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
288+
289+
keyedInputs.forEach(
290+
(id, input) -> {
291+
final List<ValueInSingleWindow<Input>> processInputs = inputs.remove(id);
292+
if (processInputs == null) {
293+
// no need to fail future here as we're only interested in its completion
294+
// finishBundle will fail the checkState as we do not produce any result
295+
LOG.error(
296+
"The ID '{}' received in the gRPC batch response does not "
297+
+ "match any IDs extracted via the idExtractorFn for the requested "
298+
+ "batch sent to the gRPC endpoint. Please ensure that the IDs returned "
299+
+ "from the gRPC endpoints match the IDs extracted using the provided"
300+
+ "idExtractorFn for the same input.",
301+
id);
302+
} else {
303+
List<ValueInSingleWindow<KV<Input, TryWrapper>>> batchResult =
304+
processInputs.stream()
305+
.map(
306+
processInput -> {
307+
final Input i = processInput.getValue();
308+
final Output output = keyedOutput.get(id);
309+
final TryWrapper o =
310+
output == null
311+
? failure(new UnmatchedRequestException(id))
312+
: success(output);
313+
final Instant ts = processInput.getTimestamp();
314+
final BoundedWindow w = processInput.getWindow();
315+
final PaneInfo p = processInput.getPane();
316+
return ValueInSingleWindow.of(KV.of(i, o), ts, w, p);
317+
})
318+
.collect(Collectors.toList());
319+
results.add(Pair.of(key, batchResult));
320+
}
321+
});
314322
return null;
315323
},
316324
throwable -> {
317-
batchInput.forEach(
318-
element -> {
319-
final String id = idExtractorFn.apply(element);
325+
keyedInputs.forEach(
326+
(id, element) -> {
320327
final List<ValueInSingleWindow<KV<Input, TryWrapper>>> batchResult =
321328
inputs.remove(id).stream()
322329
.map(
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2024 Spotify AB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.spotify.scio.transforms;
18+
19+
import java.util.Objects;
20+
21+
public class UnmatchedRequestException extends RuntimeException {
22+
23+
private final String id;
24+
25+
public UnmatchedRequestException(String id) {
26+
super("Unmatched batch request for ID: " + id);
27+
this.id = id;
28+
}
29+
30+
public String getId() {
31+
return id;
32+
}
33+
34+
@Override
35+
public boolean equals(Object o) {
36+
if (o == null || getClass() != o.getClass()) return false;
37+
UnmatchedRequestException that = (UnmatchedRequestException) o;
38+
return Objects.equals(id, that.id);
39+
}
40+
41+
@Override
42+
public int hashCode() {
43+
return Objects.hashCode(id);
44+
}
45+
}

scio-core/src/test/scala/com/spotify/scio/transforms/AsyncBatchLookupDoFnTest.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,12 @@ class AsyncBatchLookupDoFnTest extends PipelineSpec {
6161
doFn: BaseAsyncBatchLookupDoFn[Int, List[Int], List[String], String, AsyncBatchClient, F, T]
6262
)(tryFn: T => Try[String]): Unit = {
6363
// batches of size 4 and size 3
64-
val output = runWithData(Seq[Seq[Int]](1 to 4, 8 to 10))(_.flatten.parDo(doFn)).map { kv =>
64+
val output = runWithData(
65+
Seq[Seq[Int]](
66+
1 to 4, // 1 and 3 are unmatched
67+
8 to 10 // failure
68+
)
69+
)(_.flatten.parDo(doFn)).map { kv =>
6570
val r = tryFn(kv.getValue) match {
6671
case Success(v) => v
6772
case Failure(e: CompletionException) => e.getCause.getMessage
@@ -70,8 +75,9 @@ class AsyncBatchLookupDoFnTest extends PipelineSpec {
7075
(kv.getKey, r)
7176
}
7277
output should contain theSameElementsAs (
73-
(1 to 4).map(x => x -> x.toString) ++
74-
(8 to 10).map(x => x -> "failure for 8,9,10")
78+
Seq(1, 3).map(x => x -> s"Unmatched batch request for ID: $x") ++
79+
Seq(2, 4).map(x => x -> x.toString) ++
80+
Seq(8, 9, 10).map(x => x -> "failure for 8,9,10")
7581
)
7682
}
7783

@@ -229,7 +235,7 @@ class FailingGuavaBatchLookupDoFn extends AbstractGuavaAsyncBatchLookupDoFn() {
229235
input: List[Int]
230236
): ListenableFuture[List[String]] =
231237
if (input.size % 2 == 0) {
232-
Futures.immediateFuture(input.map(_.toString))
238+
Futures.immediateFuture(input.filter(_ % 2 == 0).map(_.toString))
233239
} else {
234240
Futures.immediateFailedFuture(new RuntimeException("failure for " + input.mkString(",")))
235241
}
@@ -299,7 +305,7 @@ class FailingJavaBatchLookupDoFn extends AbstractJavaAsyncBatchLookupDoFn() {
299305
input: List[Int]
300306
): CompletableFuture[List[String]] =
301307
if (input.size % 2 == 0) {
302-
CompletableFuture.supplyAsync(() => input.map(_.toString))
308+
CompletableFuture.supplyAsync(() => input.filter(_ % 2 == 0).map(_.toString))
303309
} else {
304310
val f = new CompletableFuture[List[String]]()
305311
f.completeExceptionally(new RuntimeException("failure for " + input.mkString(",")))
@@ -347,7 +353,7 @@ class FailingScalaBatchLookupDoFn extends AbstractScalaAsyncBatchLookupDoFn() {
347353
override protected def newClient(): AsyncBatchClient = null
348354
override def asyncLookup(session: AsyncBatchClient, input: List[Int]): Future[List[String]] =
349355
if (input.size % 2 == 0) {
350-
Future.successful(input.map(_.toString))
356+
Future.successful(input.filter(_ % 2 == 0).map(_.toString))
351357
} else {
352358
Future.failed(new RuntimeException("failure for " + input.mkString(",")))
353359
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2024 Spotify AB.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package com.spotify.scio.bigtable;
19+
20+
import com.google.cloud.bigtable.config.BigtableOptions;
21+
import com.google.cloud.bigtable.grpc.BigtableSession;
22+
import com.google.common.util.concurrent.ListenableFuture;
23+
import com.spotify.scio.transforms.BaseAsyncLookupDoFn;
24+
import com.spotify.scio.transforms.GuavaAsyncBatchLookupDoFn;
25+
import java.io.IOException;
26+
import java.util.List;
27+
import org.apache.beam.sdk.transforms.DoFn;
28+
import org.apache.beam.sdk.transforms.SerializableFunction;
29+
import org.apache.commons.lang3.tuple.Pair;
30+
31+
/**
32+
* A {@link DoFn} which batches elements and performs asynchronous lookup for them using Google
33+
* Cloud Bigtable.
34+
*
35+
* @param <Input> input element type.
36+
* @param <BatchRequest> batched input element type
37+
* @param <BatchResponse> batched response from BigTable type
38+
* @param <Result> Bigtable lookup value type.
39+
*/
40+
public abstract class BigtableBatchDoFn<Input, BatchRequest, BatchResponse, Result>
41+
extends GuavaAsyncBatchLookupDoFn<Input, BatchRequest, BatchResponse, Result, BigtableSession> {
42+
43+
private final BigtableOptions options;
44+
45+
/** Perform asynchronous Bigtable lookup. */
46+
public abstract ListenableFuture<BatchResponse> asyncLookup(
47+
BigtableSession session, BatchRequest batchRequest);
48+
49+
/**
50+
* Create a {@link BigtableBatchDoFn} instance.
51+
*
52+
* @param options Bigtable options.
53+
*/
54+
public BigtableBatchDoFn(
55+
BigtableOptions options,
56+
int batchSize,
57+
SerializableFunction<List<Input>, BatchRequest> batchRequestFn,
58+
SerializableFunction<BatchResponse, List<Pair<String, Result>>> batchResponseFn,
59+
SerializableFunction<Input, String> idExtractorFn) {
60+
this(options, batchSize, batchRequestFn, batchResponseFn, idExtractorFn, 1000);
61+
}
62+
63+
/**
64+
* Create a {@link BigtableBatchDoFn} instance.
65+
*
66+
* @param options Bigtable options.
67+
* @param maxPendingRequests maximum number of pending requests on every cloned DoFn. This
68+
* prevents runner from timing out and retrying bundles.
69+
*/
70+
public BigtableBatchDoFn(
71+
BigtableOptions options,
72+
int batchSize,
73+
SerializableFunction<List<Input>, BatchRequest> batchRequestFn,
74+
SerializableFunction<BatchResponse, List<Pair<String, Result>>> batchResponseFn,
75+
SerializableFunction<Input, String> idExtractorFn,
76+
int maxPendingRequests) {
77+
this(
78+
options,
79+
batchSize,
80+
batchRequestFn,
81+
batchResponseFn,
82+
idExtractorFn,
83+
maxPendingRequests,
84+
new BaseAsyncLookupDoFn.NoOpCacheSupplier<>());
85+
}
86+
87+
/**
88+
* Create a {@link BigtableBatchDoFn} instance.
89+
*
90+
* @param options Bigtable options.
91+
* @param maxPendingRequests maximum number of pending requests on every cloned DoFn. This
92+
* prevents runner from timing out and retrying bundles.
93+
* @param cacheSupplier supplier for lookup cache.
94+
*/
95+
public BigtableBatchDoFn(
96+
BigtableOptions options,
97+
int batchSize,
98+
SerializableFunction<List<Input>, BatchRequest> batchRequestFn,
99+
SerializableFunction<BatchResponse, List<Pair<String, Result>>> batchResponseFn,
100+
SerializableFunction<Input, String> idExtractorFn,
101+
int maxPendingRequests,
102+
BaseAsyncLookupDoFn.CacheSupplier<String, Result> cacheSupplier) {
103+
super(
104+
batchSize,
105+
batchRequestFn,
106+
batchResponseFn,
107+
idExtractorFn,
108+
maxPendingRequests,
109+
cacheSupplier);
110+
this.options = options;
111+
}
112+
113+
@Override
114+
public ResourceType getResourceType() {
115+
// BigtableSession is backed by a gRPC thread safe client
116+
return ResourceType.PER_INSTANCE;
117+
}
118+
119+
protected BigtableSession newClient() {
120+
try {
121+
return new BigtableSession(options);
122+
} catch (IOException e) {
123+
throw new RuntimeException(e);
124+
}
125+
}
126+
}

0 commit comments

Comments (0)