From 4ed49a8956b8496bea3641c5528e3c5720111d3e Mon Sep 17 00:00:00 2001 From: Cole-Greer Date: Fri, 24 Apr 2026 11:43:59 -0700 Subject: [PATCH 01/11] Add streaming HTTP response support to gremlin-driver Adds incremental GraphBinary response deserialization to the Java driver. HTTP response chunks are fed to a BlockingQueue-backed InputStream, and a dedicated reader thread deserializes items one at a time using the existing TypeSerializer infrastructure via an InputStream-backed Buffer adapter. Results are pushed to the ResultSet as they are deserialized, enabling consumers to process results before the full response is received. Non-GraphBinary serializers (GraphSON, custom MessageSerializer) fall back to the previous aggregating pipeline (HttpObjectAggregator + HttpGremlinResponseDecoder). --- CHANGELOG.asciidoc | 1 + docs/src/upgrade/release-4.x.x.asciidoc | 7 + .../tinkerpop/gremlin/driver/Channelizer.java | 38 ++- .../tinkerpop/gremlin/driver/Cluster.java | 18 + .../handler/ByteBufQueueInputStream.java | 134 ++++++++ .../GraphBinaryStreamResponseReader.java | 105 ++++++ .../handler/HttpStreamingResponseHandler.java | 228 +++++++++++++ .../driver/handler/InputStreamBuffer.java | 311 ++++++++++++++++++ .../handler/ByteBufQueueInputStreamTest.java | 94 ++++++ .../GraphBinaryStreamResponseReaderTest.java | 223 +++++++++++++ .../driver/handler/InputStreamBufferTest.java | 92 ++++++ .../StreamingResponseIntegrateTest.java | 217 ++++++++++++ 12 files changed, 1461 insertions(+), 7 deletions(-) create mode 100644 gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java create mode 100644 gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java create mode 100644 gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java create mode 100644 gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBuffer.java create mode 100644 gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java create mode 100644 gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java create mode 100644 gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java create mode 100644 gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index e3f3471a6f8..f76f6ce7909 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -27,6 +27,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Added Gremlator, a single page web application, that translates Gremlin into various programming languages like Javascript and Python. * Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in `globalThis.crypto.randomUUID()`. +* Added streaming HTTP response support to `gremlin-driver` for incremental result deserialization over GraphBinary. * Connected HTTP streaming response deserialization to the traversal API in `gremlin-javascript`, enabling `next()` to return the first result without waiting for the full response. * Changed `Client.stream()` in `gremlin-javascript` to return an `AsyncGenerator` for direct incremental consumption. * Removed `readable-stream` dependency from `gremlin-javascript`. diff --git a/docs/src/upgrade/release-4.x.x.asciidoc b/docs/src/upgrade/release-4.x.x.asciidoc index f0216e0b8d1..c3ddadf3344 100644 --- a/docs/src/upgrade/release-4.x.x.asciidoc +++ b/docs/src/upgrade/release-4.x.x.asciidoc @@ -270,6 +270,13 @@ try { The traversal API is not affected — `Next()`, `ToList()`, etc. still throw `ResponseException` directly since they block on the async stream internally. +==== Streaming Response Deserialization in gremlin-driver + +The Java driver now deserializes HTTP responses incrementally, delivering results to the `ResultSet` as they arrive +rather than buffering the entire response. This is only supported with the default GraphBinary serializer; custom +`MessageSerializer` implementations will use a non-streaming pipeline that buffers the full response before +deserialization. + ==== More Secure Gremlin Server Previous versions of Gremlin Server relied on a Gremlin-flavored Groovy `ScriptEngine` for basic server initialization, diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 427f76c0563..940bd460973 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -34,9 +34,13 @@ import org.apache.tinkerpop.gremlin.driver.handler.HttpContentDecompressionHandler; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder; +import org.apache.tinkerpop.gremlin.driver.handler.HttpStreamingResponseHandler; import org.apache.tinkerpop.gremlin.driver.handler.IdleConnectionHandler; import org.apache.tinkerpop.gremlin.driver.handler.InactiveChannelHandler; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.MessageSerializer; import java.util.Collections; import java.util.Optional; @@ -92,7 +96,7 @@ abstract class AbstractChannelizer extends ChannelInitializer imp protected Connection connection; protected Cluster cluster; protected SslHandler sslHandler; - private AtomicReference pending; + protected AtomicReference pending; protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler"; protected static final String PIPELINE_SSL_HANDLER = "gremlin-ssl-handler"; @@ -187,7 +191,9 @@ final class HttpChannelizer extends AbstractChannelizer { ResponseMessage.build().code(HttpResponseStatus.NO_CONTENT).result(Collections.emptyList()).create(); private HttpGremlinRequestEncoder gremlinRequestEncoder; + private HttpStreamingResponseHandler streamingResponseHandler; private HttpGremlinResponseDecoder gremlinResponseDecoder; + private boolean useStreaming; private HttpContentDecompressionHandler httpCompressionDecoder; private IdleStateHandler idleStateHandler; @@ -200,7 +206,19 @@ public void init(final Connection connection) { httpCompressionDecoder = new HttpContentDecompressionHandler(); gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptors(), cluster.isUserAgentOnConnectEnabled(), cluster.isBulkResultsEnabled(), connection.getUri()); - gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer()); + + final MessageSerializer serializer = cluster.getSerializer(); + if (serializer instanceof GraphBinaryMessageSerializerV4) { + useStreaming = true; + final GraphBinaryReader graphBinaryReader = + ((GraphBinaryMessageSerializerV4) serializer).getMapper().getReader(); + streamingResponseHandler = new HttpStreamingResponseHandler( + graphBinaryReader, pending, cluster.streamingReaderPool(), cluster.getMaxResponseContentLength()); + } else { + useStreaming = false; + gremlinResponseDecoder = new HttpGremlinResponseDecoder(serializer); + } + if (cluster.getIdleConnectionTimeout() > 0) { final int idleConnectionTimeout = (int) (cluster.getIdleConnectionTimeout() / 1000); idleStateHandler = new IdleStateHandler(idleConnectionTimeout, idleConnectionTimeout, 0); @@ -240,11 +258,17 @@ public void configure(final ChannelPipeline pipeline) { DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS, false); pipeline.addLast(PIPELINE_HTTP_CODEC, handler); - pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0 - ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE)); - pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); - pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); - pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder); + if (useStreaming) { + pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); + pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler); + pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); + } else { + pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0 + ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE)); + pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); + pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); + pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder); + } } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index cac193814c0..cd9a225a2d0 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -70,8 +70,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -383,6 +386,10 @@ ScheduledExecutorService connectionScheduler() { return manager.connectionScheduler; } + ExecutorService streamingReaderPool() { + return manager.streamingReaderPool; + } + Settings.ConnectionPoolSettings connectionPoolSettings() { return manager.connectionPoolSettings; } @@ -956,6 +963,12 @@ class Manager { */ private final ScheduledThreadPoolExecutor connectionScheduler; + /** + * Cached thread pool for streaming response reader threads. One thread per active streaming response, + * bounded implicitly by the connection pool size. + */ + private final ExecutorService streamingReaderPool; + private final int nioPoolSize; private final int workerPoolSize; private final int port; @@ -1023,6 +1036,10 @@ private Manager(final Builder builder) { this.connectionScheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build()); + this.streamingReaderPool = new ThreadPoolExecutor(0, builder.maxConnectionPoolSize, + 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), + new BasicThreadFactory.Builder().namingPattern("gremlin-driver-stream-reader-%d").build()); + validationRequest = () -> RequestMessage.build(builder.validationRequest); } @@ -1133,6 +1150,7 @@ synchronized CompletableFuture close() { executor.shutdown(); hostScheduler.shutdown(); connectionScheduler.shutdown(); + streamingReaderPool.shutdownNow(); closeIt.complete(null); }); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java new file mode 100644 index 00000000000..9ae0dad2a6f --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * An {@link InputStream} backed by a {@link BlockingQueue} of {@link ByteBuf} objects. The Netty event loop + * offers ByteBufs to the queue as HTTP content chunks arrive, and a reader thread consumes them via + * standard InputStream reads. + */ +public class ByteBufQueueInputStream extends InputStream { + + private static final ByteBuf END_OF_STREAM = Unpooled.buffer(0); + + private final BlockingQueue queue; + private ByteBuf current; + private volatile boolean eof; + + public ByteBufQueueInputStream() { + this.queue = new LinkedBlockingQueue<>(); + } + + /** + * Offer a ByteBuf to the queue. The caller must have already retained the ByteBuf if needed. + * The ByteBuf will be released after it is fully read. If the stream is already closed, + * the buffer is released immediately. + */ + public void offer(final ByteBuf buf) { + if (eof) { + if (buf != END_OF_STREAM && buf.refCnt() > 0) { + buf.release(); + } + return; + } + queue.add(buf); + } + + /** + * Signal that no more ByteBufs will be offered. + */ + public void signalEndOfStream() { + queue.offer(END_OF_STREAM); + } + + @Override + public int read() throws IOException { + if (eof) return -1; + + while (current == null || !current.isReadable()) { + releaseCurrent(); + try { + current = queue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for data", e); + } + if (current == END_OF_STREAM) { + eof = true; + current = null; + return -1; + } + } + return current.readByte() & 0xFF; + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + if (eof) return -1; + if (len == 0) return 0; + + // Block until at least one byte is available, then return what we have (short read). + while (current == null || !current.isReadable()) { + releaseCurrent(); + try { + current = queue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for data", e); + } + if (current == END_OF_STREAM) { + eof = true; + current = null; + return -1; + } + } + final int readable = Math.min(current.readableBytes(), len); + current.readBytes(b, off, readable); + return readable; + } + + @Override + public void close() throws IOException { + eof = true; + releaseCurrent(); + // drain and release any remaining buffers + ByteBuf buf; + while ((buf = queue.poll()) != null) { + if (buf != END_OF_STREAM && buf.refCnt() > 0) { + buf.release(); + } + } + } + + private void releaseCurrent() { + if (current != null && current != END_OF_STREAM && current.refCnt() > 0) { + current.release(); + } + current = null; + } + +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java new file mode 100644 index 00000000000..2dd80d2779c --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; +import org.apache.tinkerpop.gremlin.structure.io.Buffer; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.structure.io.binary.Marker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Performs pull-based streaming deserialization of a GraphBinary v4 response from an {@link InputStreamBuffer}. + * Reads one item at a time using the {@link GraphBinaryReader} and {@code TypeSerializer} infrastructure, + * pushing each result to the {@link ResultSet} as it is deserialized. + *

+ * Wire format: {@code [version_byte][bulked_flag_byte][items...][EndOfStream marker][status_code][message][exception]} + */ +public class GraphBinaryStreamResponseReader implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(GraphBinaryStreamResponseReader.class); + + private final Buffer buffer; + private final GraphBinaryReader reader; + private final ResultSet resultSet; + private final AtomicReference pendingResultSet; + + public GraphBinaryStreamResponseReader(final Buffer buffer, + final GraphBinaryReader reader, + final ResultSet resultSet, + final AtomicReference pendingResultSet) { + this.buffer = buffer; + this.reader = reader; + this.resultSet = resultSet; + this.pendingResultSet = pendingResultSet; + } + + @Override + public void run() { + try { + // Read header: version byte (MSB must be 1) and bulking flag + final byte version = buffer.readByte(); + if ((version & 0x80) == 0) { + throw new RuntimeException("Invalid GraphBinary response version: " + version); + } + final boolean bulked = (buffer.readByte() & 1) == 1; + + // Read items until EndOfStream marker + while (true) { + final Object obj = reader.read(buffer); + if (obj instanceof Marker) { + break; + } + + if (bulked) { + final long bulk = reader.read(buffer); + resultSet.add(new Result(new DefaultRemoteTraverser<>(obj, bulk))); + } else { + resultSet.add(new Result(obj)); + } + } + + // Read footer: status code, nullable message, nullable exception + final int statusCode = reader.readValue(buffer, Integer.class, false); + final String message = reader.readValue(buffer, String.class, true); + final String exception = reader.readValue(buffer, String.class, true); + + // Status code 0 means success in GraphBinary v4 — the server omits the HTTP status code + // in the binary footer when the response is successful. + if (statusCode == 0 || statusCode == HttpResponseStatus.OK.code()) { + resultSet.markComplete(); + } else { + resultSet.markError(new ResponseException(HttpResponseStatus.valueOf(statusCode), message, exception)); + } + } catch (Exception e) { + logger.warn("Error reading streaming response", e); + resultSet.markError(e); + } finally { + pendingResultSet.compareAndSet(resultSet, null); + buffer.release(); + } + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java new file mode 100644 index 00000000000..8d554684eca --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.DefaultHttpObject; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.CharsetUtil; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.util.ser.SerTokens; +import org.apache.tinkerpop.shaded.jackson.databind.JsonNode; +import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; + +/** + * A Netty handler that receives raw HTTP response chunks and feeds them to a streaming GraphBinary reader + * running on a dedicated thread. Replaces {@code HttpObjectAggregator} + {@code HttpGremlinResponseDecoder} + * in the production pipeline. + *

+ * For GraphBinary responses, HTTP content chunks are fed to a {@link ByteBufQueueInputStream} which is consumed + * by a {@link GraphBinaryStreamResponseReader} on a reader thread. The reader thread handles result delivery, + * {@code ResultSet} completion, and {@code pendingResultSet} cleanup directly. For non-GraphBinary error responses + * (e.g., JSON 401/500), the error body is accumulated across chunks and parsed on {@link LastHttpContent}, then + * {@code LAST_CONTENT_READ_RESPONSE} is fired through the pipeline for {@link GremlinResponseHandler}. + */ +public class HttpStreamingResponseHandler extends MessageToMessageDecoder { + + private static final Logger logger = LoggerFactory.getLogger(HttpStreamingResponseHandler.class); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final GraphBinaryReader graphBinaryReader; + private final AtomicReference pendingResultSet; + private final ExecutorService readerPool; + private final long maxResponseContentLength; + + // Mutable state below is accessed exclusively from the channel's event loop thread. + private HttpResponseStatus responseStatus; + private String contentType; + private long bytesRead; + private ByteBufQueueInputStream queueInputStream; + private CompositeByteBuf errorBody; + + public HttpStreamingResponseHandler(final GraphBinaryReader graphBinaryReader, + final AtomicReference pendingResultSet, + final ExecutorService readerPool, + final long maxResponseContentLength) { + this.graphBinaryReader = graphBinaryReader; + this.pendingResultSet = pendingResultSet; + this.readerPool = readerPool; + this.maxResponseContentLength = maxResponseContentLength; + } + + @Override + protected void decode(final ChannelHandlerContext ctx, final DefaultHttpObject msg, + final List out) throws Exception { + if (msg instanceof HttpResponse) { + final HttpResponse resp = (HttpResponse) msg; + + // Reset mutable state for the new response cycle to prevent stale state from a previous + // response bleeding into this one when the handler is reused on the same connection. + resetState(); + + responseStatus = resp.status(); + contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE); + queueInputStream = new ByteBufQueueInputStream(); + + // Spawn reader thread for GraphBinary responses + if (isGraphBinaryResponse()) { + final ResultSet rs = pendingResultSet.get(); + if (rs != null) { + final InputStreamBuffer buffer = new InputStreamBuffer(queueInputStream); + final GraphBinaryStreamResponseReader streamReader = + new GraphBinaryStreamResponseReader(buffer, graphBinaryReader, rs, pendingResultSet); + try { + readerPool.submit(streamReader::run); + } catch (RejectedExecutionException e) { + queueInputStream.signalEndOfStream(); + rs.markError(e); + pendingResultSet.compareAndSet(rs, null); + out.add(LAST_CONTENT_READ_RESPONSE); + } + } else { + // No pending ResultSet — close the stream and fire sentinel immediately + queueInputStream.signalEndOfStream(); + queueInputStream = null; + out.add(LAST_CONTENT_READ_RESPONSE); + } + } + } + + if (msg instanceof HttpContent) { + final ByteBuf content = ((HttpContent) msg).content(); + bytesRead += content.readableBytes(); + + if (maxResponseContentLength > 0 && bytesRead > maxResponseContentLength) { + // Don't signal here — exceptionCaught will handle cleanup + throw new TooLongFrameException("Response exceeded " + maxResponseContentLength + " bytes."); + } + + if (!isGraphBinaryResponse()) { + // Accumulate non-GraphBinary error body across chunks + if (content.readableBytes() > 0) { + if (errorBody == null) { + errorBody = ctx.alloc().compositeBuffer(); + } + // retain() because Netty releases the content ByteBuf after decode() returns + errorBody.addComponent(true, content.retain()); + } + } else if (content.readableBytes() > 0 && queueInputStream != null) { + // Feed bytes to the reader thread + // retain() because Netty releases the content ByteBuf after decode() returns + queueInputStream.offer(content.retain()); + } + + if (msg instanceof LastHttpContent) { + if (isGraphBinaryResponse()) { + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + } + // Reader thread handles completion directly via markComplete/markError + } else { + // Non-GraphBinary error — parse accumulated body and fire sentinel + handleNonGraphBinaryError(); + out.add(LAST_CONTENT_READ_RESPONSE); + } + } + } + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + } + releaseErrorBody(); + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + } + releaseErrorBody(); + super.exceptionCaught(ctx, cause); + } + + private void handleNonGraphBinaryError() { + final ResultSet rs = pendingResultSet.get(); + if (rs == null) return; + + try { + if (errorBody != null && errorBody.readableBytes() > 0) { + final JsonNode node = mapper.readTree(errorBody.toString(CharsetUtil.UTF_8)); + final String message = node.has("message") ? node.get("message").asText() : responseStatus.reasonPhrase(); + rs.markError(new ResponseException(responseStatus, message)); + } else { + rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase())); + } + } catch (Exception e) { + logger.debug("Failed to parse error response body as JSON", e); + rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase())); + } finally { + releaseErrorBody(); + } + } + + private void resetState() { + // Clean up any leftover resources from a previous response on this connection + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + queueInputStream = null; + } + releaseErrorBody(); + bytesRead = 0; + responseStatus = null; + contentType = null; + } + + private void releaseErrorBody() { + if (errorBody != null) { + errorBody.release(); + errorBody = null; + } + } + + private boolean isGraphBinaryResponse() { + return !isError(responseStatus) || SerTokens.MIME_GRAPHBINARY_V4.equals(contentType); + } + + private static boolean isError(final HttpResponseStatus status) { + return status != HttpResponseStatus.OK; + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBuffer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBuffer.java new file mode 100644 index 00000000000..ebb58792f77 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBuffer.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import org.apache.tinkerpop.gremlin.structure.io.Buffer; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * A read-only {@link Buffer} implementation backed by a blocking {@link InputStream} via {@link DataInputStream}. + * Supports only sequential read operations — all write, random-access, and NIO methods throw + * {@link UnsupportedOperationException}. + *

+ * This allows the existing {@code TypeSerializer} implementations (which only use sequential reads) to work + * unchanged over a streaming HTTP response body. + */ +public class InputStreamBuffer implements Buffer { + + private final DataInputStream in; + private int bytesRead; + + public InputStreamBuffer(final InputStream inputStream) { + this.in = new DataInputStream(inputStream); + } + + @Override + public boolean readBoolean() { + try { + final boolean v = in.readBoolean(); + bytesRead += 1; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte readByte() { + try { + final byte v = in.readByte(); + bytesRead += 1; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public short readShort() { + try { + final short v = in.readShort(); + bytesRead += 2; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public int readInt() { + try { + final int v = in.readInt(); + bytesRead += 4; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public long readLong() { + try { + final long v = in.readLong(); + bytesRead += 8; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public float readFloat() { + try { + final float v = in.readFloat(); + bytesRead += 4; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public double readDouble() { + try { + final double v = in.readDouble(); + bytesRead += 8; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final byte[] destination) { + try { + in.readFully(destination); + bytesRead += destination.length; + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final byte[] destination, final int dstIndex, final int length) { + try { + in.readFully(destination, dstIndex, length); + bytesRead += length; + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final ByteBuffer dst) { + try { + final byte[] tmp = new byte[dst.remaining()]; + in.readFully(tmp); + dst.put(tmp); + bytesRead += tmp.length; + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final OutputStream out, final int length) throws IOException { + final byte[] tmp = new byte[length]; + in.readFully(tmp); + out.write(tmp); + bytesRead += length; + return this; + } + + @Override + public int readerIndex() { + return bytesRead; + } + + @Override + public int readableBytes() { + throw new UnsupportedOperationException("readableBytes() is not supported on a streaming Buffer"); + } + + @Override + public Buffer readerIndex(final int readerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public int writerIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writerIndex(final int writerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer markWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer resetWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public int capacity() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDirect() { + return false; + } + + @Override + public Buffer writeBoolean(final boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeByte(final int value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeShort(final int value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeInt(final int value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeLong(final long value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeFloat(final float value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeDouble(final double value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeBytes(final byte[] src) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeBytes(final ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeBytes(final byte[] src, final int srcIndex, final int length) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release() { + try { + in.close(); + } catch (IOException e) { + // best-effort close + } + return true; + } + + @Override + public Buffer retain() { + throw new UnsupportedOperationException(); + } + + @Override + public int referenceCount() { + throw new UnsupportedOperationException(); + } + + @Override + public int nioBufferCount() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers(final int index, final int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer(final int index, final int length) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer getBytes(final int index, final byte[] dst) { + throw new UnsupportedOperationException(); + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java new file mode 100644 index 00000000000..4ca161fdfc6 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class ByteBufQueueInputStreamTest { + + @Test + public void shouldReadSingleByteBuf() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(new byte[]{1, 2, 3, 4}); + stream.offer(buf); + stream.signalEndOfStream(); + + assertEquals(1, stream.read()); + assertEquals(2, stream.read()); + assertEquals(3, stream.read()); + assertEquals(4, stream.read()); + assertEquals(-1, stream.read()); + } + + @Test + public void shouldReadAcrossMultipleByteBufs() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2})); + stream.offer(Unpooled.wrappedBuffer(new byte[]{3, 4})); + stream.signalEndOfStream(); + + final byte[] result = new byte[8]; + int totalRead = 0; + int read; + while ((read = stream.read(result, totalRead, result.length - totalRead)) != -1) { + totalRead += read; + } + assertEquals(4, totalRead); + assertArrayEquals(new byte[]{1, 2, 3, 4}, java.util.Arrays.copyOf(result, totalRead)); + } + + @Test + public void shouldReleaseByteBufsAfterReading() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4); + buf.writeBytes(new byte[]{1, 2, 3, 4}); + assertEquals(1, buf.refCnt()); + + stream.offer(buf); + stream.signalEndOfStream(); + + final byte[] result = new byte[4]; + stream.read(result, 0, 4); + stream.read(); // triggers release of buf and reads EOS + + assertEquals(0, buf.refCnt()); + } + + @Test + public void shouldCleanUpOnClose() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(2); + buf1.writeBytes(new byte[]{1, 2}); + final ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(2); + buf2.writeBytes(new byte[]{3, 4}); + + stream.offer(buf1); + stream.offer(buf2); + stream.close(); + + assertEquals(0, buf1.refCnt()); + assertEquals(0, buf2.refCnt()); + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java new file mode 100644 index 00000000000..38c15e3fc0d --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class GraphBinaryStreamResponseReaderTest { + + private ExecutorService executor; + private GraphBinaryMessageSerializerV4 serializer; + private GraphBinaryReader reader; + + @Before + public void setup() { + executor = Executors.newCachedThreadPool(); + serializer = new GraphBinaryMessageSerializerV4(); + reader = serializer.getMapper().getReader(); + } + + @After + public void teardown() { + executor.shutdownNow(); + } + + @Test + public void shouldReadSingleItemResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList("hello")).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List results = rs.all().get(); + assertEquals(1, results.size()); + assertEquals("hello", results.get(0).getString()); + assertNull(pending.get()); + } + + @Test + public void shouldReadMultipleItemResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Arrays.asList(1, 2, 3)).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List results = rs.all().get(); + assertEquals(3, results.size()); + assertEquals(1, results.get(0).getInt()); + assertEquals(2, results.get(1).getInt()); + assertEquals(3, results.get(2).getInt()); + } + + @Test + public void shouldReadBulkedResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Arrays.asList("a", 3L)).bulked(true).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List results = rs.all().get(); + assertEquals(1, results.size()); + assertTrue(results.get(0).getObject() instanceof DefaultRemoteTraverser); + final DefaultRemoteTraverser traverser = (DefaultRemoteTraverser) results.get(0).getObject(); + assertEquals("a", traverser.get()); + assertEquals(3L, traverser.bulk()); + } + + @Test + public void shouldHandleErrorFooter() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.INTERNAL_SERVER_ERROR) + .statusMessage("Something went wrong") + .exception("java.lang.RuntimeException") + .result(Collections.emptyList()).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + assertTrue(rs.allItemsAvailable()); + try { + rs.all().get(); + fail("Expected exception"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof ResponseException); + final ResponseException re = (ResponseException) e.getCause(); + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode()); + assertEquals("Something went wrong", re.getMessage()); + } + assertNull(pending.get()); + } + + @Test + public void shouldReadDataSplitAcrossChunks() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + // Use a large string (~1000 chars) so the split point must land inside it regardless of + // header/footer overhead. The header+footer are ~30 bytes; the string item is ~1000 bytes. + final String largeValue = String.join("", Collections.nCopies(100, "abcdefghij")); + final ByteBuf fullPayload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList(largeValue)).create(), + ByteBufAllocator.DEFAULT); + + final int splitPoint = fullPayload.readableBytes() / 2; + final ByteBuf chunk1 = fullPayload.readSlice(splitPoint).retain(); + final ByteBuf chunk2 = fullPayload.retain(); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(chunk1); + stream.offer(chunk2); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List results = rs.all().get(); + assertEquals(1, results.size()); + assertEquals(largeValue, results.get(0).getString()); + + fullPayload.release(); + } + + @Test + public void shouldReadEmptyResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.emptyList()).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List results = rs.all().get(); + assertTrue(results.isEmpty()); + assertNull(pending.get()); + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java new file mode 100644 index 00000000000..e94af313edd --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class InputStreamBufferTest { + + @Test + public void shouldReadPrimitivesThroughInputStreamBuffer() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final io.netty.buffer.ByteBuf buf = Unpooled.buffer(); + buf.writeByte(42); + buf.writeInt(12345); + buf.writeLong(9876543210L); + buf.writeFloat(3.14f); + buf.writeDouble(2.718281828); + buf.writeShort(256); + buf.writeBoolean(true); + stream.offer(buf); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + assertEquals(42, buffer.readByte()); + assertEquals(12345, buffer.readInt()); + assertEquals(9876543210L, buffer.readLong()); + assertEquals(3.14f, buffer.readFloat(), 0.001f); + assertEquals(2.718281828, buffer.readDouble(), 0.000001); + assertEquals(256, buffer.readShort()); + assertTrue(buffer.readBoolean()); + } + + @Test + public void shouldReadBytesArray() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(Unpooled.wrappedBuffer(new byte[]{10, 20, 30})); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + final byte[] dest = new byte[3]; + buffer.readBytes(dest); + assertArrayEquals(new byte[]{10, 20, 30}, dest); + } + + @Test + public void shouldTrackReaderIndex() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9})); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + assertEquals(0, buffer.readerIndex()); + buffer.readByte(); + assertEquals(1, buffer.readerIndex()); + buffer.readInt(); + assertEquals(5, buffer.readerIndex()); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowOnReadableBytes() { + new InputStreamBuffer(new ByteBufQueueInputStream()).readableBytes(); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowOnWriteInt() { + new InputStreamBuffer(new ByteBufQueueInputStream()).writeInt(1); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowOnNioBuffer() { + new InputStreamBuffer(new ByteBufQueueInputStream()).nioBuffer(); + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java new file mode 100644 index 00000000000..726331f4cbb --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; +import static org.junit.Assert.*; + +/** + * Integration tests for streaming HTTP response support in the Java driver. + * Verifies that the streaming pipeline (HttpStreamingResponseHandler + GraphBinaryStreamResponseReader) + * works correctly end-to-end against a real Gremlin Server. + */ +public class StreamingResponseIntegrateTest extends AbstractGremlinServerIntegrationTest { + + @Override + public Settings overrideSettings(final Settings settings) { + settings.channelizer = HttpChannelizer.class.getName(); + return settings; + } + + @Test + public void shouldStreamBasicResults() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final List results = client.submit("g.inject(1,2,3)").all().get(); + assertEquals(3, results.size()); + assertEquals(1, results.get(0).getInt()); + assertEquals(2, results.get(1).getInt()); + assertEquals(3, results.get(2).getInt()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamIncrementallyWithIterator() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final ResultSet rs = client.submit("g.inject(1,2,3,4,5)"); + + // Consume results one at a time via iterator + final Iterator iter = rs.iterator(); + final List collected = new ArrayList<>(); + while (iter.hasNext()) { + collected.add(iter.next().getInt()); + } + assertEquals(5, collected.size()); + for (int i = 0; i < 5; i++) { + assertEquals(i + 1, (int) collected.get(i)); + } + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamIncrementallyWithOne() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final ResultSet rs = client.submit("g.inject(10,20,30)"); + + assertEquals(10, rs.one().getInt()); + assertEquals(20, rs.one().getInt()); + assertEquals(30, rs.one().getInt()); + assertNull(rs.one()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamLargeResultSet() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + // Generate a large result set that would span multiple HTTP chunks + final List results = client.submit( + "g.inject(1).repeat(__.identity()).times(1000).emit()").all().get(); + assertEquals(1000, results.size()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamEmptyResponse() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final List results = client.submit("g.V().hasLabel('nonexistent')").all().get(); + assertTrue(results.isEmpty()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldHandleServerErrorDuringStreaming() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + client.submit("invalid_script_that_should_fail").all().get(); + fail("Expected exception"); + } catch (ExecutionException e) { + // Error should propagate correctly through the streaming pipeline + assertTrue(e.getCause() instanceof ResponseException); + final ResponseException re = (ResponseException) e.getCause(); + assertEquals(HttpResponseStatus.BAD_REQUEST, re.getResponseStatusCode()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldReuseConnectionAfterStreamingComplete() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + // First request + final List results1 = client.submit("g.inject(1)").all().get(); + assertEquals(1, results1.size()); + + // Second request on same client (should reuse connection) + final List results2 = client.submit("g.inject(2)").all().get(); + assertEquals(1, results2.size()); + assertEquals(2, results2.get(0).getInt()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldHandleConcurrentStreamingRequests() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + final CompletableFuture> f1 = client.submit("g.inject(1,2,3)").all(); + final CompletableFuture> f2 = client.submit("g.inject(4,5,6)").all(); + final CompletableFuture> f3 = client.submit("g.inject(7,8,9)").all(); + + assertEquals(3, f1.get().size()); + assertEquals(3, f2.get().size()); + assertEquals(3, f3.get().size()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamWithTraversalApi() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final GraphTraversalSource g = traversal().with( + DriverRemoteConnection.using(cluster)); + + final List results = g.inject(1, 2, 3).toList(); + assertEquals(3, results.size()); + assertTrue(results.contains(1)); + assertTrue(results.contains(2)); + assertTrue(results.contains(3)); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamVerticesFromGraph() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + // Use inject to create data rather than relying on pre-loaded graph + final List results = client.submit("g.inject(1,2,3,4,5,6)").all().get(); + assertEquals(6, results.size()); + } finally { + cluster.close(); + } + } +} From 636c020af9599441d103fe7fa716dedd73115b9e Mon Sep 17 00:00:00 2001 From: Cole-Greer Date: Tue, 5 May 2026 17:24:52 -0700 Subject: [PATCH 02/11] fix tests --- .../gremlin/driver/handler/HttpStreamingResponseHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index 8d554684eca..6177887fef7 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -128,7 +128,7 @@ protected void decode(final ChannelHandlerContext ctx, final DefaultHttpObject m if (maxResponseContentLength > 0 && bytesRead > maxResponseContentLength) { // Don't signal here — exceptionCaught will handle cleanup - throw new TooLongFrameException("Response exceeded " + maxResponseContentLength + " bytes."); + throw new TooLongFrameException("Response entity too large"); } if (!isGraphBinaryResponse()) { From ab3ab5d6b1382fae34c92b49d32c316bb96d31ce Mon Sep 17 00:00:00 2001 From: Cole-Greer Date: Wed, 6 May 2026 14:49:32 -0700 Subject: [PATCH 03/11] Fix connection corruption from double-LastHttpContent in streaming pipeline Server: Guard sendLastHttpContent with state check so it is skipped when writeError() in makeChunk() already terminated the response. Without this, a serialization error mid-stream sends two LastHttpContent messages, corrupting HTTP framing on keep-alive connections. Client: Null out queueInputStream after signalEndOfStream on LastHttpContent so spurious content between responses is dropped rather than offered to the closed stream. --- .../driver/handler/HttpStreamingResponseHandler.java | 3 +++ .../gremlin/server/handler/HttpGremlinEndpointHandler.java | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index 6177887fef7..3db50fd74f1 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -150,6 +150,9 @@ protected void decode(final ChannelHandlerContext ctx, final DefaultHttpObject m if (isGraphBinaryResponse()) { if (queueInputStream != null) { queueInputStream.signalEndOfStream(); + // Null out so any spurious content arriving between responses is dropped + // rather than offered to the already-closed stream. + queueInputStream = null; } // Reader thread handles completion directly via markComplete/markError } else { diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index 7c474d6ba0c..bfdf41d66b4 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -275,7 +275,11 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r // failures that follow this will show up in the response body instead. sendHttpResponse(ctx, OK, createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new)); sendHttpContents(ctx, requestCtx); - sendLastHttpContent(ctx, HttpResponseStatus.OK, ""); + // Skip if writeError() already terminated the response (e.g., serialization error in makeChunk). + // Sending a second LastHttpContent would corrupt the HTTP framing on keep-alive connections. + if (requestCtx.getRequestState() != RequestState.ERROR) { + sendLastHttpContent(ctx, HttpResponseStatus.OK, ""); + } } catch (Throwable t) { writeError(requestCtx, formErrorResponseMessage(t, requestMessage), serializer.getValue1()); } finally { From 77128c549824ae28732a668e778c8a42bf7441fb Mon Sep 17 00:00:00 2001 From: Cole-Greer Date: Thu, 7 May 2026 09:24:06 -0700 Subject: [PATCH 04/11] Guard writeError() against double-write after response termination Prevent writeError() from sending data after the response is already in ERROR or FINISHED state. Without this guard, any code path that calls writeError() after the response is terminated sends a second LastHttpContent, corrupting HTTP framing on keep-alive connections and causing subsequent requests on the same connection to hang. Assisted-by: Kiro:claude-sonnet-4-20250514 --- .../tinkerpop/gremlin/server/handler/HttpHandlerUtil.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java index f0f6b652c09..4fa5ba84ec5 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java @@ -97,6 +97,13 @@ public static void sendError(final ChannelHandlerContext ctx, final HttpResponse * @param serializer The serializer to use to serialize the error response. */ static void writeError(final Context context, final ResponseMessage responseMessage, final MessageSerializer serializer) { + // Prevent writing after the response is already terminated. A second write would corrupt + // HTTP framing on keep-alive connections, poisoning them for subsequent requests. + if (context.getRequestState() == HttpGremlinEndpointHandler.RequestState.ERROR || + context.getRequestState() == HttpGremlinEndpointHandler.RequestState.FINISHED) { + return; + } + try { final ChannelHandlerContext ctx = context.getChannelHandlerContext(); final ByteBuf ByteBuf = context.getRequestState() == HttpGremlinEndpointHandler.RequestState.STREAMING From 5cbd3293f10598ec8c557517f0c0394ab134d7b4 Mon Sep 17 00:00:00 2001 From: Cole Greer Date: Thu, 7 May 2026 19:16:04 -0700 Subject: [PATCH 05/11] Fix serialization error handling and connection return race in streaming pipeline 1. HttpGremlinEndpointHandler.makeChunk(): Defer setRequestState(FINISHED) until after serialization succeeds. Previously, serialization failure left the state as FINISHED causing writeError() to bail out, resulting in empty responses and client-side EOFExceptions. 2. Connection.write(): Change whenCompleteAsync to whenComplete so returnToPool() runs synchronously when readCompleted fires. This prevents a race where the caller submits the next request before the connection is returned to the pool, causing connection starvation and idle timeout hangs. Assisted-by: Kiro:claude-sonnet-4-20250514 --- .../org/apache/tinkerpop/gremlin/driver/Connection.java | 4 ++-- .../server/handler/HttpGremlinEndpointHandler.java | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index cb94b39b875..9d16fd75680 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -201,7 +201,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab } else { final ResultSet resultSet = new ResultSet(cluster.executor(), requestMessage, pool.host); - resultSet.getReadCompleted().whenCompleteAsync((v, t) -> { + resultSet.getReadCompleted().whenComplete((v, t) -> { if (t != null) { // the callback for when the read failed. a failed read means the request went to the server // and came back with a server-side error of some sort. it means the server is responsive @@ -219,7 +219,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab // the shutdown if the returned result cleared up the last pending message and unblocked // the close. tryShutdown(); - }, cluster.executor()); + }); pending.set(resultSet); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index bfdf41d66b4..f0cb2b9ea2c 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -757,19 +757,21 @@ private static ByteBuf makeChunk(final Context ctx, final MessageSerializer s ctx.setRequestState(STREAMING); return serializer.writeHeader(responseMessage, nettyContext.alloc()); } - ctx.setRequestState(FINISHED); - return serializer.serializeResponseAsBinary(ResponseMessage.build() + final ByteBuf fullResponse = serializer.serializeResponseAsBinary(ResponseMessage.build() .result(aggregate) .bulked(bulking) .code(HttpResponseStatus.OK) .create(), nettyContext.alloc()); + ctx.setRequestState(FINISHED); + return fullResponse; case STREAMING: return serializer.writeChunk(aggregate, nettyContext.alloc()); case FINISHING: + final ByteBuf footer = serializer.writeFooter(responseMessage, nettyContext.alloc()); ctx.setRequestState(FINISHED); - return serializer.writeFooter(responseMessage, nettyContext.alloc()); + return footer; } return serializer.serializeResponseAsBinary(responseMessage, nettyContext.alloc()); From a9bc059ad4ee2b0e5c659064a32f9d0f9dff0c72 Mon Sep 17 00:00:00 2001 From: Cole Greer Date: Fri, 8 May 2026 12:33:33 -0700 Subject: [PATCH 06/11] Fix channel-error recovery by checking isOpen() in addition to isDead() When a channel error occurs (e.g. TooLongFrameException), GremlinResponseHandler calls ctx.close() which initiates an async channel close. The whenComplete callback fires synchronously and checks isDead() (channel.isActive()), but isActive() may still return true because the close hasn't completed deregistration yet. Adding a channel.isOpen() check resolves this TOCTOU race because isOpen() returns false immediately when close() is called, before deregistration completes. Assisted-by: Kiro:claude-sonnet-4-20250514 --- .../java/org/apache/tinkerpop/gremlin/driver/Connection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 9d16fd75680..693f56666bd 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -244,7 +244,7 @@ private void returnToPool() { } private void handleConnectionCleanupOnError(final Connection thisConnection) { - if (thisConnection.isDead()) { + if (thisConnection.isDead() || (thisConnection.channel != null && !thisConnection.channel.isOpen())) { if (pool != null) pool.replaceConnection(thisConnection); } else { thisConnection.returnToPool(); From 918307bb4b091d358b40b3c88a4e3b69a920d195 Mon Sep 17 00:00:00 2001 From: Cole Greer Date: Sat, 9 May 2026 15:17:36 -0700 Subject: [PATCH 07/11] Decouple connection pool return from ResultSet completion in streaming pipeline The streaming reader thread calls markComplete() when all results are deserialized, but this could fire before the HTTP response's LastHttpContent is processed by the codec. Reusing the connection before HTTP framing is complete caused the codec to silently drop subsequent responses. Changes: - Move connection return (returnToPool) from the whenComplete callback to GremlinResponseHandler, triggered by LAST_CONTENT_READ_RESPONSE. This ensures the connection is only reused after HTTP framing is fully complete. - Change HttpStreamingResponseHandler type parameter from DefaultHttpObject to HttpObject. Netty's LastHttpContent.EMPTY_LAST_CONTENT does not extend DefaultHttpObject, causing it to bypass decode() and preventing LAST_CONTENT_READ_RESPONSE from being emitted. - Add streaming flag to GremlinResponseHandler to skip markComplete() for streaming responses (the reader thread owns ResultSet completion). - Remove returnToPool() from Connection.write() whenComplete success path; it is now handled by the pipeline on LastHttpContent for both streaming and non-streaming paths. Assisted-by: Kiro:claude-sonnet-4-20250514 --- .../tinkerpop/gremlin/driver/Channelizer.java | 6 +++++- .../tinkerpop/gremlin/driver/Connection.java | 8 ++----- .../handler/GremlinResponseHandler.java | 21 ++++++++++++------- .../handler/HttpStreamingResponseHandler.java | 8 +++---- 4 files changed, 25 insertions(+), 18 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 940bd460973..59b9d02365c 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -162,7 +162,6 @@ protected void initChannel(final SocketChannel socketChannel) { } configure(pipeline); - pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending)); } @Override @@ -269,6 +268,11 @@ public void configure(final ChannelPipeline pipeline) { pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder); } + + pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending, () -> { + connection.returnToPool(); + connection.tryShutdown(); + }, useStreaming)); } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 693f56666bd..25975a40141 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -209,10 +209,6 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab // write operation. logger.debug("Error while processing request on the server {}.", this, t); handleConnectionCleanupOnError(thisConnection); - } else { - // the callback for when the read was successful, meaning that ResultSet.markComplete() - // was called - thisConnection.returnToPool(); } // While this request was in process, close might have been signaled in closeAsync(). // However, close would be blocked until all pending requests are completed. Attempt @@ -234,7 +230,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab return requestPromise; } - private void returnToPool() { + void returnToPool() { try { if (pool != null) pool.returnConnection(this); } catch (ConnectionException ce) { @@ -259,7 +255,7 @@ private boolean isOkToClose() { * Close was signaled in closeAsync() but there were pending messages at that time. This method attempts the * shutdown if the returned result cleared up the last pending message. */ - private void tryShutdown() { + void tryShutdown() { if (isClosing() && isOkToClose()) shutdown(closeFuture.get()); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index d4cfb167d22..94898783eb4 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -49,9 +49,13 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException"); private final AtomicReference pendingResultSet; + private final Runnable onResponseComplete; + private final boolean streaming; - public GremlinResponseHandler(final AtomicReference pending) { + public GremlinResponseHandler(final AtomicReference pending, final Runnable onResponseComplete, final boolean streaming) { this.pendingResultSet = pending; + this.onResponseComplete = onResponseComplete; + this.streaming = streaming; } @Override @@ -100,14 +104,17 @@ protected void channelRead0(final ChannelHandlerContext channelHandlerContext, f // Stream is done when the last content signaling response message is read. if (LAST_CONTENT_READ_RESPONSE == response) { - final ResultSet rs = pendingResultSet.getAndSet(null); - if (rs != null) { - if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { - rs.markComplete(); - } else { - rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); + if (!streaming) { + final ResultSet rs = pendingResultSet.getAndSet(null); + if (rs != null) { + if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { + rs.markComplete(); + } else { + rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); + } } } + onResponseComplete.run(); } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index 3db50fd74f1..635b9ac2880 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -23,7 +23,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.http.DefaultHttpObject; +import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpResponse; @@ -57,7 +57,7 @@ * (e.g., JSON 401/500), the error body is accumulated across chunks and parsed on {@link LastHttpContent}, then * {@code LAST_CONTENT_READ_RESPONSE} is fired through the pipeline for {@link GremlinResponseHandler}. */ -public class HttpStreamingResponseHandler extends MessageToMessageDecoder { +public class HttpStreamingResponseHandler extends MessageToMessageDecoder { private static final Logger logger = LoggerFactory.getLogger(HttpStreamingResponseHandler.class); private static final ObjectMapper mapper = new ObjectMapper(); @@ -85,7 +85,7 @@ public HttpStreamingResponseHandler(final GraphBinaryReader graphBinaryReader, } @Override - protected void decode(final ChannelHandlerContext ctx, final DefaultHttpObject msg, + protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, final List out) throws Exception { if (msg instanceof HttpResponse) { final HttpResponse resp = (HttpResponse) msg; @@ -154,7 +154,7 @@ protected void decode(final ChannelHandlerContext ctx, final DefaultHttpObject m // rather than offered to the already-closed stream. queueInputStream = null; } - // Reader thread handles completion directly via markComplete/markError + out.add(LAST_CONTENT_READ_RESPONSE); } else { // Non-GraphBinary error — parse accumulated body and fire sentinel handleNonGraphBinaryError(); From ad33d4207b57637edf2594f03ef5aa424eea5fab Mon Sep 17 00:00:00 2001 From: Cole Greer Date: Mon, 11 May 2026 15:15:12 -0700 Subject: [PATCH 08/11] Harden streaming pipeline: fix races, improve robustness, add tests - Make Connection.returnToPool() idempotent via AtomicBoolean guard to prevent double-return corrupting the connection pool in error paths - Change streamingReaderPool max to scale with cluster topology (maxConnectionPoolSize * contactPoints * 4) instead of per-host limit - Catch Throwable (not just Exception) in GraphBinaryStreamResponseReader to prevent consumer hangs on Error subclasses (OOM, StackOverflow) - Add 30s poll timeout to ByteBufQueueInputStream to prevent reader threads from blocking indefinitely if end-of-stream is never signaled - Set BYTES_READ attribute on first content (not headers) to preserve idle timeout error messaging when server sends headers before execution - Clear pendingResultSet in non-GraphBinary error path to prevent stale state blocking graceful shutdown - Rename isGraphBinaryResponse() to shouldStreamResponse() for clarity - Expand upgrade documentation with user-visible behavior details - Add HttpStreamingResponseHandlerTest covering happy path, double LastHttpContent, max content length, channelInactive, and error cases - Add error-then-reuse integration test for connection recovery --- docs/src/upgrade/release-4.x.x.asciidoc | 8 +- .../tinkerpop/gremlin/driver/Cluster.java | 2 +- .../tinkerpop/gremlin/driver/Connection.java | 9 + .../gremlin/driver/ConnectionPool.java | 1 + .../handler/ByteBufQueueInputStream.java | 7 +- .../GraphBinaryStreamResponseReader.java | 6 +- .../handler/HttpStreamingResponseHandler.java | 13 +- .../HttpStreamingResponseHandlerTest.java | 207 ++++++++++++++++++ .../StreamingResponseIntegrateTest.java | 22 ++ 9 files changed, 262 insertions(+), 13 deletions(-) create mode 100644 gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java diff --git a/docs/src/upgrade/release-4.x.x.asciidoc b/docs/src/upgrade/release-4.x.x.asciidoc index c3ddadf3344..2ebd34fff25 100644 --- a/docs/src/upgrade/release-4.x.x.asciidoc +++ b/docs/src/upgrade/release-4.x.x.asciidoc @@ -273,9 +273,11 @@ block on the async stream internally. ==== Streaming Response Deserialization in gremlin-driver The Java driver now deserializes HTTP responses incrementally, delivering results to the `ResultSet` as they arrive -rather than buffering the entire response. This is only supported with the default GraphBinary serializer; custom -`MessageSerializer` implementations will use a non-streaming pipeline that buffers the full response before -deserialization. +rather than buffering the entire response. This reduces time-to-first-result for large result sets. + +This change is automatic and requires no code changes. It applies only when using the default GraphBinary serializer. +Custom `MessageSerializer` implementations fall back to the non-streaming pipeline that buffers the full response +before deserialization. The `ResultSet` API is unchanged. ==== More Secure Gremlin Server diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index cd9a225a2d0..d4a3967f029 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -1036,7 +1036,7 @@ private Manager(final Builder builder) { this.connectionScheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build()); - this.streamingReaderPool = new ThreadPoolExecutor(0, builder.maxConnectionPoolSize, + this.streamingReaderPool = new ThreadPoolExecutor(0, builder.maxConnectionPoolSize * Math.max(contactPoints.size(), 1) * 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new BasicThreadFactory.Builder().namingPattern("gremlin-driver-stream-reader-%d").build()); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 25975a40141..e941dffeaf2 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -66,6 +66,14 @@ final class Connection { * Is a {@code Connection} borrowed from the pool. */ private final AtomicBoolean isBorrowed = new AtomicBoolean(false); + /** + * Guards returnToPool() to ensure it is idempotent. + */ + private final AtomicBoolean returned = new AtomicBoolean(false); + + void resetReturned() { + returned.set(false); + } /** * This boolean guards the replace of the connection and ensures that it only occurs once. */ @@ -231,6 +239,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab } void returnToPool() { + if (!returned.compareAndSet(false, true)) return; try { if (pool != null) pool.returnConnection(this); } catch (ConnectionException ce) { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java index ee4c34c5aa2..4f40b6733a9 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java @@ -525,6 +525,7 @@ private Connection getAvailableConnection() { while (head != null) { // try to borrow connection if (!head.isDead() && !head.isBorrowed().get() && head.isBorrowed().compareAndSet(false, true)) { + head.resetReturned(); available = head; break; } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java index 9ae0dad2a6f..a7e986a3e8f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java @@ -25,6 +25,7 @@ import java.io.InputStream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * An {@link InputStream} backed by a {@link BlockingQueue} of {@link ByteBuf} objects. The Netty event loop @@ -72,11 +73,12 @@ public int read() throws IOException { while (current == null || !current.isReadable()) { releaseCurrent(); try { - current = queue.take(); + current = queue.poll(30, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Interrupted while waiting for data", e); } + if (current == null) throw new IOException("Timed out waiting for streaming response data"); if (current == END_OF_STREAM) { eof = true; current = null; @@ -95,11 +97,12 @@ public int read(final byte[] b, final int off, final int len) throws IOException while (current == null || !current.isReadable()) { releaseCurrent(); try { - current = queue.take(); + current = queue.poll(30, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Interrupted while waiting for data", e); } + if (current == null) throw new IOException("Timed out waiting for streaming response data"); if (current == END_OF_STREAM) { eof = true; current = null; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java index 2dd80d2779c..0f28c989ff8 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java @@ -94,9 +94,9 @@ public void run() { } else { resultSet.markError(new ResponseException(HttpResponseStatus.valueOf(statusCode), message, exception)); } - } catch (Exception e) { - logger.warn("Error reading streaming response", e); - resultSet.markError(e); + } catch (Throwable t) { + logger.warn("Error reading streaming response", t); + resultSet.markError(t); } finally { pendingResultSet.compareAndSet(resultSet, null); buffer.release(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index 635b9ac2880..c4faefc93b8 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -99,7 +99,7 @@ protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, queueInputStream = new ByteBufQueueInputStream(); // Spawn reader thread for GraphBinary responses - if (isGraphBinaryResponse()) { + if (shouldStreamResponse()) { final ResultSet rs = pendingResultSet.get(); if (rs != null) { final InputStreamBuffer buffer = new InputStreamBuffer(queueInputStream); @@ -126,12 +126,16 @@ protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, final ByteBuf content = ((HttpContent) msg).content(); bytesRead += content.readableBytes(); + if (bytesRead > 0 && ctx.channel().attr(InactiveChannelHandler.BYTES_READ).get() == null) { + ctx.channel().attr(InactiveChannelHandler.BYTES_READ).set(0); + } + if (maxResponseContentLength > 0 && bytesRead > maxResponseContentLength) { // Don't signal here — exceptionCaught will handle cleanup throw new TooLongFrameException("Response entity too large"); } - if (!isGraphBinaryResponse()) { + if (!shouldStreamResponse()) { // Accumulate non-GraphBinary error body across chunks if (content.readableBytes() > 0) { if (errorBody == null) { @@ -147,7 +151,7 @@ protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, } if (msg instanceof LastHttpContent) { - if (isGraphBinaryResponse()) { + if (shouldStreamResponse()) { if (queueInputStream != null) { queueInputStream.signalEndOfStream(); // Null out so any spurious content arriving between responses is dropped @@ -198,6 +202,7 @@ private void handleNonGraphBinaryError() { logger.debug("Failed to parse error response body as JSON", e); rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase())); } finally { + pendingResultSet.compareAndSet(rs, null); releaseErrorBody(); } } @@ -221,7 +226,7 @@ private void releaseErrorBody() { } } - private boolean isGraphBinaryResponse() { + private boolean shouldStreamResponse() { return !isError(responseStatus) || SerTokens.MIME_GRAPHBINARY_V4.equals(contentType); } diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java new file mode 100644 index 00000000000..fdd37818a44 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.ser.SerTokens; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; +import static org.junit.Assert.*; + +public class HttpStreamingResponseHandlerTest { + + private ExecutorService executor; + private GraphBinaryMessageSerializerV4 serializer; + private GraphBinaryReader reader; + + @Before + public void setup() { + executor = Executors.newSingleThreadExecutor(); + serializer = new GraphBinaryMessageSerializerV4(); + reader = serializer.getMapper().getReader(); + } + + @After + public void teardown() { + executor.shutdownNow(); + } + + private EmbeddedChannel createChannel(final AtomicReference pendingResultSet, final long maxResponseContentLength) { + final HttpStreamingResponseHandler handler = new HttpStreamingResponseHandler( + reader, pendingResultSet, executor, maxResponseContentLength); + return new EmbeddedChannel(handler); + } + + @Test + public void shouldEmitLastContentReadResponseOnHappyPath() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + // Serialize a valid GraphBinary response + final byte[] payload = toBytes(serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList(1)).create(), + channel.alloc())); + + // Send HttpResponse with GraphBinary content type + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + + // Send content + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload))); + + // Send LastHttpContent + channel.writeInbound(new DefaultLastHttpContent()); + + // Verify LAST_CONTENT_READ_RESPONSE is emitted + final Object out = channel.readInbound(); + assertSame(LAST_CONTENT_READ_RESPONSE, out); + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldHandleDoubleLastHttpContentWithoutError() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + final byte[] payload = toBytes(serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList(1)).create(), + channel.alloc())); + + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload))); + channel.writeInbound(new DefaultLastHttpContent()); + + // Send a second LastHttpContent — should not throw NPE + channel.writeInbound(new DefaultLastHttpContent()); + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldThrowTooLongFrameExceptionWhenMaxLengthExceeded() { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 10); + + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + + try { + // Send content exceeding the 10-byte limit + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[20]))); + fail("Expected TooLongFrameException"); + } catch (Exception e) { + assertTrue(e instanceof TooLongFrameException); + } + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldSignalQueueInputStreamOnChannelInactive() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + + // Send some content but no LastHttpContent + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[]{1, 2, 3}))); + + // Fire channelInactive — should not throw and should signal the stream + channel.pipeline().fireChannelInactive(); + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldMarkErrorOnResultSetForNonGraphBinaryError() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + // Send a 500 response with JSON content type + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); + channel.writeInbound(response); + + // Send JSON error body + final String errorJson = "{\"message\":\"test error\"}"; + channel.writeInbound(new DefaultLastHttpContent(Unpooled.copiedBuffer(errorJson, CharsetUtil.UTF_8))); + + // Verify error is marked on the ResultSet + try { + rs.all().get(); + fail("Expected exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof ResponseException); + final ResponseException re = (ResponseException) e.getCause(); + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode()); + assertEquals("test error", re.getMessage()); + } + + // Verify pendingResultSet was cleared + assertNull(pending.get()); + + channel.finishAndReleaseAll(); + } + + private byte[] toBytes(final io.netty.buffer.ByteBuf buf) { + final byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + buf.release(); + return bytes; + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java index 726331f4cbb..708a63e13fc 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java @@ -214,4 +214,26 @@ public void shouldStreamVerticesFromGraph() throws Exception { cluster.close(); } } + + @Test + public void shouldReuseConnectionAfterServerError() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + // Submit a request that causes a server error + try { + client.submit("throw new RuntimeException('test error')").all().get(); + fail("Should have thrown"); + } catch (ExecutionException e) { + // expected + } + + // Connection should still be usable + final List results = client.submit("g.inject(1)").all().get(); + assertEquals(1, results.get(0).getInt()); + } finally { + cluster.close(); + } + } } From 1296ed498546cc15ce6b6c9b60b9ec9e99dd66ee Mon Sep 17 00:00:00 2001 From: Cole Greer Date: Mon, 11 May 2026 15:57:39 -0700 Subject: [PATCH 09/11] cleanup --- .../tinkerpop/gremlin/driver/Connection.java | 2 +- .../handler/HttpStreamingResponseHandler.java | 27 ++++++++++--------- .../ByteBufQueueInputStream.java | 2 +- .../GraphBinaryStreamResponseReader.java | 2 +- .../InputStreamBuffer.java | 2 +- .../handler/ByteBufQueueInputStreamTest.java | 1 + .../GraphBinaryStreamResponseReaderTest.java | 3 +++ .../driver/handler/InputStreamBufferTest.java | 2 ++ 8 files changed, 24 insertions(+), 17 deletions(-) rename gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/{handler => stream}/ByteBufQueueInputStream.java (98%) rename gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/{handler => stream}/GraphBinaryStreamResponseReader.java (98%) rename gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/{handler => stream}/InputStreamBuffer.java (99%) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index e941dffeaf2..12a1de646c8 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -67,7 +67,7 @@ final class Connection { */ private final AtomicBoolean isBorrowed = new AtomicBoolean(false); /** - * Guards returnToPool() to ensure it is idempotent. + * Prevents returnToPool() from being called more than once per borrow cycle. */ private final AtomicBoolean returned = new AtomicBoolean(false); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index c4faefc93b8..1c768c7da4b 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -32,6 +32,9 @@ import io.netty.util.CharsetUtil; import org.apache.tinkerpop.gremlin.driver.ResultSet; import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader; +import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer; import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; import org.apache.tinkerpop.gremlin.util.ser.SerTokens; import org.apache.tinkerpop.shaded.jackson.databind.JsonNode; @@ -47,15 +50,13 @@ import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; /** - * A Netty handler that receives raw HTTP response chunks and feeds them to a streaming GraphBinary reader - * running on a dedicated thread. Replaces {@code HttpObjectAggregator} + {@code HttpGremlinResponseDecoder} - * in the production pipeline. + * Decodes chunked HTTP responses into streaming results without buffering the full response body. *

- * For GraphBinary responses, HTTP content chunks are fed to a {@link ByteBufQueueInputStream} which is consumed - * by a {@link GraphBinaryStreamResponseReader} on a reader thread. The reader thread handles result delivery, - * {@code ResultSet} completion, and {@code pendingResultSet} cleanup directly. For non-GraphBinary error responses - * (e.g., JSON 401/500), the error body is accumulated across chunks and parsed on {@link LastHttpContent}, then - * {@code LAST_CONTENT_READ_RESPONSE} is fired through the pipeline for {@link GremlinResponseHandler}. + * For GraphBinary responses, content chunks are passed to a {@link ByteBufQueueInputStream} consumed by a + * {@link GraphBinaryStreamResponseReader} on a separate thread. That reader deserializes results incrementally, + * delivers them to the {@code ResultSet}, and handles completion and cleanup. For non-GraphBinary error responses + * (e.g., JSON 401/500), the error body is accumulated and parsed when the response ends, then + * {@code LAST_CONTENT_READ_RESPONSE} is fired for {@link GremlinResponseHandler} to process. */ public class HttpStreamingResponseHandler extends MessageToMessageDecoder { @@ -99,7 +100,7 @@ protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, queueInputStream = new ByteBufQueueInputStream(); // Spawn reader thread for GraphBinary responses - if (shouldStreamResponse()) { + if (isGraphBinaryResponse()) { final ResultSet rs = pendingResultSet.get(); if (rs != null) { final InputStreamBuffer buffer = new InputStreamBuffer(queueInputStream); @@ -132,10 +133,10 @@ protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, if (maxResponseContentLength > 0 && bytesRead > maxResponseContentLength) { // Don't signal here — exceptionCaught will handle cleanup - throw new TooLongFrameException("Response entity too large"); + throw new TooLongFrameException("Response exceeded " + maxResponseContentLength + " bytes."); } - if (!shouldStreamResponse()) { + if (!isGraphBinaryResponse()) { // Accumulate non-GraphBinary error body across chunks if (content.readableBytes() > 0) { if (errorBody == null) { @@ -151,7 +152,7 @@ protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, } if (msg instanceof LastHttpContent) { - if (shouldStreamResponse()) { + if (isGraphBinaryResponse()) { if (queueInputStream != null) { queueInputStream.signalEndOfStream(); // Null out so any spurious content arriving between responses is dropped @@ -226,7 +227,7 @@ private void releaseErrorBody() { } } - private boolean shouldStreamResponse() { + private boolean isGraphBinaryResponse() { return !isError(responseStatus) || SerTokens.MIME_GRAPHBINARY_V4.equals(contentType); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java similarity index 98% rename from gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java rename to gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java index a7e986a3e8f..757b2821cb0 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStream.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.gremlin.driver.handler; +package org.apache.tinkerpop.gremlin.driver.stream; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java similarity index 98% rename from gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java rename to gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java index 0f28c989ff8..11491903e8b 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReader.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.gremlin.driver.handler; +package org.apache.tinkerpop.gremlin.driver.stream; import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.tinkerpop.gremlin.driver.Result; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBuffer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java similarity index 99% rename from gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBuffer.java rename to gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java index ebb58792f77..4490975f23f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBuffer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.gremlin.driver.handler; +package org.apache.tinkerpop.gremlin.driver.stream; import org.apache.tinkerpop.gremlin.structure.io.Buffer; diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java index 4ca161fdfc6..91ba9cf28a4 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; import org.junit.Test; import static org.junit.Assert.*; diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java index 38c15e3fc0d..484ec7d5246 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java @@ -24,6 +24,9 @@ import org.apache.tinkerpop.gremlin.driver.Result; import org.apache.tinkerpop.gremlin.driver.ResultSet; import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader; +import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer; import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java index e94af313edd..02d029d66e1 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java @@ -19,6 +19,8 @@ package org.apache.tinkerpop.gremlin.driver.handler; import io.netty.buffer.Unpooled; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer; import org.junit.Test; import static org.junit.Assert.*; From ba1c20cde5df019594cb9127c3f5f8310146ffac Mon Sep 17 00:00:00 2001 From: Cole Greer Date: Mon, 11 May 2026 17:17:49 -0700 Subject: [PATCH 10/11] fix tests --- .../gremlin/driver/handler/HttpStreamingResponseHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index 1c768c7da4b..e9d81b46b2c 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -133,7 +133,7 @@ protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, if (maxResponseContentLength > 0 && bytesRead > maxResponseContentLength) { // Don't signal here — exceptionCaught will handle cleanup - throw new TooLongFrameException("Response exceeded " + maxResponseContentLength + " bytes."); + throw new TooLongFrameException("Response entity too large"); } if (!isGraphBinaryResponse()) { From db7d83787bc771f80354aaae81ceb5f87077cfb1 Mon Sep 17 00:00:00 2001 From: Cole Greer Date: Tue, 12 May 2026 09:46:14 -0700 Subject: [PATCH 11/11] fix tests --- .../driver/handler/HttpStreamingResponseHandler.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index e9d81b46b2c..607fd6bdf94 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -132,7 +132,6 @@ protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, } if (maxResponseContentLength > 0 && bytesRead > maxResponseContentLength) { - // Don't signal here — exceptionCaught will handle cleanup throw new TooLongFrameException("Response entity too large"); } @@ -180,6 +179,12 @@ public void channelInactive(final ChannelHandlerContext ctx) throws Exception { @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { + // Mark error before signaling end-of-stream so the reader thread can't race + // with an EOFException from the closed stream. + final ResultSet rs = pendingResultSet.getAndSet(null); + if (rs != null) { + rs.markError(cause); + } if (queueInputStream != null) { queueInputStream.signalEndOfStream(); }