diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index e3f3471a6f8..f76f6ce7909 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -27,6 +27,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Added Gremlator, a single page web application, that translates Gremlin into various programming languages like Javascript and Python. * Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in `globalThis.crypto.randomUUID()`. +* Added streaming HTTP response support to `gremlin-driver` for incremental result deserialization over GraphBinary. * Connected HTTP streaming response deserialization to the traversal API in `gremlin-javascript`, enabling `next()` to return the first result without waiting for the full response. * Changed `Client.stream()` in `gremlin-javascript` to return an `AsyncGenerator` for direct incremental consumption. * Removed `readable-stream` dependency from `gremlin-javascript`. diff --git a/docs/src/upgrade/release-4.x.x.asciidoc b/docs/src/upgrade/release-4.x.x.asciidoc index f0216e0b8d1..2ebd34fff25 100644 --- a/docs/src/upgrade/release-4.x.x.asciidoc +++ b/docs/src/upgrade/release-4.x.x.asciidoc @@ -270,6 +270,15 @@ try { The traversal API is not affected — `Next()`, `ToList()`, etc. still throw `ResponseException` directly since they block on the async stream internally. +==== Streaming Response Deserialization in gremlin-driver + +The Java driver now deserializes HTTP responses incrementally, delivering results to the `ResultSet` as they arrive +rather than buffering the entire response. This reduces time-to-first-result for large result sets. + +This change is automatic and requires no code changes. It applies only when using the default GraphBinary serializer. +Custom `MessageSerializer` implementations fall back to the non-streaming pipeline that buffers the full response +before deserialization. The `ResultSet` API is unchanged. + ==== More Secure Gremlin Server Previous versions of Gremlin Server relied on a Gremlin-flavored Groovy `ScriptEngine` for basic server initialization, diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 427f76c0563..59b9d02365c 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -34,9 +34,13 @@ import org.apache.tinkerpop.gremlin.driver.handler.HttpContentDecompressionHandler; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder; +import org.apache.tinkerpop.gremlin.driver.handler.HttpStreamingResponseHandler; import org.apache.tinkerpop.gremlin.driver.handler.IdleConnectionHandler; import org.apache.tinkerpop.gremlin.driver.handler.InactiveChannelHandler; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.MessageSerializer; import java.util.Collections; import java.util.Optional; @@ -92,7 +96,7 @@ abstract class AbstractChannelizer extends ChannelInitializer imp protected Connection connection; protected Cluster cluster; protected SslHandler sslHandler; - private AtomicReference pending; + protected AtomicReference pending; protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler"; protected static final String PIPELINE_SSL_HANDLER = "gremlin-ssl-handler"; @@ -158,7 +162,6 @@ protected void initChannel(final SocketChannel socketChannel) { } configure(pipeline); - pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending)); } @Override @@ -187,7 +190,9 @@ final class HttpChannelizer extends AbstractChannelizer { ResponseMessage.build().code(HttpResponseStatus.NO_CONTENT).result(Collections.emptyList()).create(); private HttpGremlinRequestEncoder gremlinRequestEncoder; + private HttpStreamingResponseHandler streamingResponseHandler; private HttpGremlinResponseDecoder gremlinResponseDecoder; + private boolean useStreaming; private HttpContentDecompressionHandler httpCompressionDecoder; private IdleStateHandler idleStateHandler; @@ -200,7 +205,19 @@ public void init(final Connection connection) { httpCompressionDecoder = new HttpContentDecompressionHandler(); gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptors(), cluster.isUserAgentOnConnectEnabled(), cluster.isBulkResultsEnabled(), connection.getUri()); - gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer()); + + final MessageSerializer serializer = cluster.getSerializer(); + if (serializer instanceof GraphBinaryMessageSerializerV4) { + useStreaming = true; + final GraphBinaryReader graphBinaryReader = + ((GraphBinaryMessageSerializerV4) serializer).getMapper().getReader(); + streamingResponseHandler = new HttpStreamingResponseHandler( + graphBinaryReader, pending, cluster.streamingReaderPool(), cluster.getMaxResponseContentLength()); + } else { + useStreaming = false; + gremlinResponseDecoder = new HttpGremlinResponseDecoder(serializer); + } + if (cluster.getIdleConnectionTimeout() > 0) { final int idleConnectionTimeout = (int) (cluster.getIdleConnectionTimeout() / 1000); idleStateHandler = new IdleStateHandler(idleConnectionTimeout, idleConnectionTimeout, 0); @@ -240,11 +257,22 @@ public void configure(final ChannelPipeline pipeline) { DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS, false); pipeline.addLast(PIPELINE_HTTP_CODEC, handler); - pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0 - ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE)); - pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); - pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); - pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder); + if (useStreaming) { + pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); + pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler); + pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); + } else { + pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0 + ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE)); + pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); + pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); + pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder); + } + + pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending, () -> { + connection.returnToPool(); + connection.tryShutdown(); + }, useStreaming)); } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index cac193814c0..d4a3967f029 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -70,8 +70,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -383,6 +386,10 @@ ScheduledExecutorService connectionScheduler() { return manager.connectionScheduler; } + ExecutorService streamingReaderPool() { + return manager.streamingReaderPool; + } + Settings.ConnectionPoolSettings connectionPoolSettings() { return manager.connectionPoolSettings; } @@ -956,6 +963,12 @@ class Manager { */ private final ScheduledThreadPoolExecutor connectionScheduler; + /** + * Cached thread pool for streaming response reader threads. One thread per active streaming response, + * bounded implicitly by the connection pool size. + */ + private final ExecutorService streamingReaderPool; + private final int nioPoolSize; private final int workerPoolSize; private final int port; @@ -1023,6 +1036,10 @@ private Manager(final Builder builder) { this.connectionScheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build()); + this.streamingReaderPool = new ThreadPoolExecutor(0, builder.maxConnectionPoolSize * Math.max(contactPoints.size(), 1) * 4, + 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), + new BasicThreadFactory.Builder().namingPattern("gremlin-driver-stream-reader-%d").build()); + validationRequest = () -> RequestMessage.build(builder.validationRequest); } @@ -1133,6 +1150,7 @@ synchronized CompletableFuture close() { executor.shutdown(); hostScheduler.shutdown(); connectionScheduler.shutdown(); + streamingReaderPool.shutdownNow(); closeIt.complete(null); }); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index cb94b39b875..12a1de646c8 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -66,6 +66,14 @@ final class Connection { * Is a {@code Connection} borrowed from the pool. */ private final AtomicBoolean isBorrowed = new AtomicBoolean(false); + /** + * Prevents returnToPool() from being called more than once per borrow cycle. + */ + private final AtomicBoolean returned = new AtomicBoolean(false); + + void resetReturned() { + returned.set(false); + } /** * This boolean guards the replace of the connection and ensures that it only occurs once. */ @@ -201,7 +209,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab } else { final ResultSet resultSet = new ResultSet(cluster.executor(), requestMessage, pool.host); - resultSet.getReadCompleted().whenCompleteAsync((v, t) -> { + resultSet.getReadCompleted().whenComplete((v, t) -> { if (t != null) { // the callback for when the read failed. a failed read means the request went to the server // and came back with a server-side error of some sort. it means the server is responsive @@ -209,17 +217,13 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab // write operation. logger.debug("Error while processing request on the server {}.", this, t); handleConnectionCleanupOnError(thisConnection); - } else { - // the callback for when the read was successful, meaning that ResultSet.markComplete() - // was called - thisConnection.returnToPool(); } // While this request was in process, close might have been signaled in closeAsync(). // However, close would be blocked until all pending requests are completed. Attempt // the shutdown if the returned result cleared up the last pending message and unblocked // the close. tryShutdown(); - }, cluster.executor()); + }); pending.set(resultSet); @@ -234,7 +238,8 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab return requestPromise; } - private void returnToPool() { + void returnToPool() { + if (!returned.compareAndSet(false, true)) return; try { if (pool != null) pool.returnConnection(this); } catch (ConnectionException ce) { @@ -244,7 +249,7 @@ private void returnToPool() { } private void handleConnectionCleanupOnError(final Connection thisConnection) { - if (thisConnection.isDead()) { + if (thisConnection.isDead() || (thisConnection.channel != null && !thisConnection.channel.isOpen())) { if (pool != null) pool.replaceConnection(thisConnection); } else { thisConnection.returnToPool(); @@ -259,7 +264,7 @@ private boolean isOkToClose() { * Close was signaled in closeAsync() but there were pending messages at that time. This method attempts the * shutdown if the returned result cleared up the last pending message. */ - private void tryShutdown() { + void tryShutdown() { if (isClosing() && isOkToClose()) shutdown(closeFuture.get()); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java index ee4c34c5aa2..4f40b6733a9 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java @@ -525,6 +525,7 @@ private Connection getAvailableConnection() { while (head != null) { // try to borrow connection if (!head.isDead() && !head.isBorrowed().get() && head.isBorrowed().compareAndSet(false, true)) { + head.resetReturned(); available = head; break; } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index d4cfb167d22..94898783eb4 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -49,9 +49,13 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException"); private final AtomicReference pendingResultSet; + private final Runnable onResponseComplete; + private final boolean streaming; - public GremlinResponseHandler(final AtomicReference pending) { + public GremlinResponseHandler(final AtomicReference pending, final Runnable onResponseComplete, final boolean streaming) { this.pendingResultSet = pending; + this.onResponseComplete = onResponseComplete; + this.streaming = streaming; } @Override @@ -100,14 +104,17 @@ protected void channelRead0(final ChannelHandlerContext channelHandlerContext, f // Stream is done when the last content signaling response message is read. if (LAST_CONTENT_READ_RESPONSE == response) { - final ResultSet rs = pendingResultSet.getAndSet(null); - if (rs != null) { - if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { - rs.markComplete(); - } else { - rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); + if (!streaming) { + final ResultSet rs = pendingResultSet.getAndSet(null); + if (rs != null) { + if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { + rs.markComplete(); + } else { + rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); + } } } + onResponseComplete.run(); } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java new file mode 100644 index 00000000000..607fd6bdf94 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.CharsetUtil; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader; +import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.util.ser.SerTokens; +import org.apache.tinkerpop.shaded.jackson.databind.JsonNode; +import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; + +/** + * Decodes chunked HTTP responses into streaming results without buffering the full response body. + *

+ * For GraphBinary responses, content chunks are passed to a {@link ByteBufQueueInputStream} consumed by a + * {@link GraphBinaryStreamResponseReader} on a separate thread. That reader deserializes results incrementally, + * delivers them to the {@code ResultSet}, and handles completion and cleanup. For non-GraphBinary error responses + * (e.g., JSON 401/500), the error body is accumulated and parsed when the response ends, then + * {@code LAST_CONTENT_READ_RESPONSE} is fired for {@link GremlinResponseHandler} to process. + */ +public class HttpStreamingResponseHandler extends MessageToMessageDecoder { + + private static final Logger logger = LoggerFactory.getLogger(HttpStreamingResponseHandler.class); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final GraphBinaryReader graphBinaryReader; + private final AtomicReference pendingResultSet; + private final ExecutorService readerPool; + private final long maxResponseContentLength; + + // Mutable state below is accessed exclusively from the channel's event loop thread. + private HttpResponseStatus responseStatus; + private String contentType; + private long bytesRead; + private ByteBufQueueInputStream queueInputStream; + private CompositeByteBuf errorBody; + + public HttpStreamingResponseHandler(final GraphBinaryReader graphBinaryReader, + final AtomicReference pendingResultSet, + final ExecutorService readerPool, + final long maxResponseContentLength) { + this.graphBinaryReader = graphBinaryReader; + this.pendingResultSet = pendingResultSet; + this.readerPool = readerPool; + this.maxResponseContentLength = maxResponseContentLength; + } + + @Override + protected void decode(final ChannelHandlerContext ctx, final HttpObject msg, + final List out) throws Exception { + if (msg instanceof HttpResponse) { + final HttpResponse resp = (HttpResponse) msg; + + // Reset mutable state for the new response cycle to prevent stale state from a previous + // response bleeding into this one when the handler is reused on the same connection. + resetState(); + + responseStatus = resp.status(); + contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE); + queueInputStream = new ByteBufQueueInputStream(); + + // Spawn reader thread for GraphBinary responses + if (isGraphBinaryResponse()) { + final ResultSet rs = pendingResultSet.get(); + if (rs != null) { + final InputStreamBuffer buffer = new InputStreamBuffer(queueInputStream); + final GraphBinaryStreamResponseReader streamReader = + new GraphBinaryStreamResponseReader(buffer, graphBinaryReader, rs, pendingResultSet); + try { + readerPool.submit(streamReader::run); + } catch (RejectedExecutionException e) { + queueInputStream.signalEndOfStream(); + rs.markError(e); + pendingResultSet.compareAndSet(rs, null); + out.add(LAST_CONTENT_READ_RESPONSE); + } + } else { + // No pending ResultSet — close the stream and fire sentinel immediately + queueInputStream.signalEndOfStream(); + queueInputStream = null; + out.add(LAST_CONTENT_READ_RESPONSE); + } + } + } + + if (msg instanceof HttpContent) { + final ByteBuf content = ((HttpContent) msg).content(); + bytesRead += content.readableBytes(); + + if (bytesRead > 0 && ctx.channel().attr(InactiveChannelHandler.BYTES_READ).get() == null) { + ctx.channel().attr(InactiveChannelHandler.BYTES_READ).set(0); + } + + if (maxResponseContentLength > 0 && bytesRead > maxResponseContentLength) { + throw new TooLongFrameException("Response entity too large"); + } + + if (!isGraphBinaryResponse()) { + // Accumulate non-GraphBinary error body across chunks + if (content.readableBytes() > 0) { + if (errorBody == null) { + errorBody = ctx.alloc().compositeBuffer(); + } + // retain() because Netty releases the content ByteBuf after decode() returns + errorBody.addComponent(true, content.retain()); + } + } else if (content.readableBytes() > 0 && queueInputStream != null) { + // Feed bytes to the reader thread + // retain() because Netty releases the content ByteBuf after decode() returns + queueInputStream.offer(content.retain()); + } + + if (msg instanceof LastHttpContent) { + if (isGraphBinaryResponse()) { + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + // Null out so any spurious content arriving between responses is dropped + // rather than offered to the already-closed stream. + queueInputStream = null; + } + out.add(LAST_CONTENT_READ_RESPONSE); + } else { + // Non-GraphBinary error — parse accumulated body and fire sentinel + handleNonGraphBinaryError(); + out.add(LAST_CONTENT_READ_RESPONSE); + } + } + } + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + } + releaseErrorBody(); + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { + // Mark error before signaling end-of-stream so the reader thread can't race + // with an EOFException from the closed stream. + final ResultSet rs = pendingResultSet.getAndSet(null); + if (rs != null) { + rs.markError(cause); + } + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + } + releaseErrorBody(); + super.exceptionCaught(ctx, cause); + } + + private void handleNonGraphBinaryError() { + final ResultSet rs = pendingResultSet.get(); + if (rs == null) return; + + try { + if (errorBody != null && errorBody.readableBytes() > 0) { + final JsonNode node = mapper.readTree(errorBody.toString(CharsetUtil.UTF_8)); + final String message = node.has("message") ? node.get("message").asText() : responseStatus.reasonPhrase(); + rs.markError(new ResponseException(responseStatus, message)); + } else { + rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase())); + } + } catch (Exception e) { + logger.debug("Failed to parse error response body as JSON", e); + rs.markError(new ResponseException(responseStatus, responseStatus.reasonPhrase())); + } finally { + pendingResultSet.compareAndSet(rs, null); + releaseErrorBody(); + } + } + + private void resetState() { + // Clean up any leftover resources from a previous response on this connection + if (queueInputStream != null) { + queueInputStream.signalEndOfStream(); + queueInputStream = null; + } + releaseErrorBody(); + bytesRead = 0; + responseStatus = null; + contentType = null; + } + + private void releaseErrorBody() { + if (errorBody != null) { + errorBody.release(); + errorBody = null; + } + } + + private boolean isGraphBinaryResponse() { + return !isError(responseStatus) || SerTokens.MIME_GRAPHBINARY_V4.equals(contentType); + } + + private static boolean isError(final HttpResponseStatus status) { + return status != HttpResponseStatus.OK; + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java new file mode 100644 index 00000000000..757b2821cb0 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.stream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * An {@link InputStream} backed by a {@link BlockingQueue} of {@link ByteBuf} objects. The Netty event loop + * offers ByteBufs to the queue as HTTP content chunks arrive, and a reader thread consumes them via + * standard InputStream reads. + */ +public class ByteBufQueueInputStream extends InputStream { + + private static final ByteBuf END_OF_STREAM = Unpooled.buffer(0); + + private final BlockingQueue queue; + private ByteBuf current; + private volatile boolean eof; + + public ByteBufQueueInputStream() { + this.queue = new LinkedBlockingQueue<>(); + } + + /** + * Offer a ByteBuf to the queue. The caller must have already retained the ByteBuf if needed. + * The ByteBuf will be released after it is fully read. If the stream is already closed, + * the buffer is released immediately. + */ + public void offer(final ByteBuf buf) { + if (eof) { + if (buf != END_OF_STREAM && buf.refCnt() > 0) { + buf.release(); + } + return; + } + queue.add(buf); + } + + /** + * Signal that no more ByteBufs will be offered. + */ + public void signalEndOfStream() { + queue.offer(END_OF_STREAM); + } + + @Override + public int read() throws IOException { + if (eof) return -1; + + while (current == null || !current.isReadable()) { + releaseCurrent(); + try { + current = queue.poll(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for data", e); + } + if (current == null) throw new IOException("Timed out waiting for streaming response data"); + if (current == END_OF_STREAM) { + eof = true; + current = null; + return -1; + } + } + return current.readByte() & 0xFF; + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + if (eof) return -1; + if (len == 0) return 0; + + // Block until at least one byte is available, then return what we have (short read). + while (current == null || !current.isReadable()) { + releaseCurrent(); + try { + current = queue.poll(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for data", e); + } + if (current == null) throw new IOException("Timed out waiting for streaming response data"); + if (current == END_OF_STREAM) { + eof = true; + current = null; + return -1; + } + } + final int readable = Math.min(current.readableBytes(), len); + current.readBytes(b, off, readable); + return readable; + } + + @Override + public void close() throws IOException { + eof = true; + releaseCurrent(); + // drain and release any remaining buffers + ByteBuf buf; + while ((buf = queue.poll()) != null) { + if (buf != END_OF_STREAM && buf.refCnt() > 0) { + buf.release(); + } + } + } + + private void releaseCurrent() { + if (current != null && current != END_OF_STREAM && current.refCnt() > 0) { + current.release(); + } + current = null; + } + +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java new file mode 100644 index 00000000000..11491903e8b --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.stream; + +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; +import org.apache.tinkerpop.gremlin.structure.io.Buffer; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.structure.io.binary.Marker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Performs pull-based streaming deserialization of a GraphBinary v4 response from an {@link InputStreamBuffer}. + * Reads one item at a time using the {@link GraphBinaryReader} and {@code TypeSerializer} infrastructure, + * pushing each result to the {@link ResultSet} as it is deserialized. + *

+ * Wire format: {@code [version_byte][bulked_flag_byte][items...][EndOfStream marker][status_code][message][exception]} + */ +public class GraphBinaryStreamResponseReader implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(GraphBinaryStreamResponseReader.class); + + private final Buffer buffer; + private final GraphBinaryReader reader; + private final ResultSet resultSet; + private final AtomicReference pendingResultSet; + + public GraphBinaryStreamResponseReader(final Buffer buffer, + final GraphBinaryReader reader, + final ResultSet resultSet, + final AtomicReference pendingResultSet) { + this.buffer = buffer; + this.reader = reader; + this.resultSet = resultSet; + this.pendingResultSet = pendingResultSet; + } + + @Override + public void run() { + try { + // Read header: version byte (MSB must be 1) and bulking flag + final byte version = buffer.readByte(); + if ((version & 0x80) == 0) { + throw new RuntimeException("Invalid GraphBinary response version: " + version); + } + final boolean bulked = (buffer.readByte() & 1) == 1; + + // Read items until EndOfStream marker + while (true) { + final Object obj = reader.read(buffer); + if (obj instanceof Marker) { + break; + } + + if (bulked) { + final long bulk = reader.read(buffer); + resultSet.add(new Result(new DefaultRemoteTraverser<>(obj, bulk))); + } else { + resultSet.add(new Result(obj)); + } + } + + // Read footer: status code, nullable message, nullable exception + final int statusCode = reader.readValue(buffer, Integer.class, false); + final String message = reader.readValue(buffer, String.class, true); + final String exception = reader.readValue(buffer, String.class, true); + + // Status code 0 means success in GraphBinary v4 — the server omits the HTTP status code + // in the binary footer when the response is successful. + if (statusCode == 0 || statusCode == HttpResponseStatus.OK.code()) { + resultSet.markComplete(); + } else { + resultSet.markError(new ResponseException(HttpResponseStatus.valueOf(statusCode), message, exception)); + } + } catch (Throwable t) { + logger.warn("Error reading streaming response", t); + resultSet.markError(t); + } finally { + pendingResultSet.compareAndSet(resultSet, null); + buffer.release(); + } + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java new file mode 100644 index 00000000000..4490975f23f --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/InputStreamBuffer.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.stream; + +import org.apache.tinkerpop.gremlin.structure.io.Buffer; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * A read-only {@link Buffer} implementation backed by a blocking {@link InputStream} via {@link DataInputStream}. + * Supports only sequential read operations — all write, random-access, and NIO methods throw + * {@link UnsupportedOperationException}. + *

+ * This allows the existing {@code TypeSerializer} implementations (which only use sequential reads) to work + * unchanged over a streaming HTTP response body. + */ +public class InputStreamBuffer implements Buffer { + + private final DataInputStream in; + private int bytesRead; + + public InputStreamBuffer(final InputStream inputStream) { + this.in = new DataInputStream(inputStream); + } + + @Override + public boolean readBoolean() { + try { + final boolean v = in.readBoolean(); + bytesRead += 1; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte readByte() { + try { + final byte v = in.readByte(); + bytesRead += 1; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public short readShort() { + try { + final short v = in.readShort(); + bytesRead += 2; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public int readInt() { + try { + final int v = in.readInt(); + bytesRead += 4; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public long readLong() { + try { + final long v = in.readLong(); + bytesRead += 8; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public float readFloat() { + try { + final float v = in.readFloat(); + bytesRead += 4; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public double readDouble() { + try { + final double v = in.readDouble(); + bytesRead += 8; + return v; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final byte[] destination) { + try { + in.readFully(destination); + bytesRead += destination.length; + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final byte[] destination, final int dstIndex, final int length) { + try { + in.readFully(destination, dstIndex, length); + bytesRead += length; + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final ByteBuffer dst) { + try { + final byte[] tmp = new byte[dst.remaining()]; + in.readFully(tmp); + dst.put(tmp); + bytesRead += tmp.length; + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Buffer readBytes(final OutputStream out, final int length) throws IOException { + final byte[] tmp = new byte[length]; + in.readFully(tmp); + out.write(tmp); + bytesRead += length; + return this; + } + + @Override + public int readerIndex() { + return bytesRead; + } + + @Override + public int readableBytes() { + throw new UnsupportedOperationException("readableBytes() is not supported on a streaming Buffer"); + } + + @Override + public Buffer readerIndex(final int readerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public int writerIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writerIndex(final int writerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer markWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer resetWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public int capacity() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDirect() { + return false; + } + + @Override + public Buffer writeBoolean(final boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeByte(final int value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeShort(final int value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeInt(final int value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeLong(final long value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeFloat(final float value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeDouble(final double value) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeBytes(final byte[] src) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeBytes(final ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer writeBytes(final byte[] src, final int srcIndex, final int length) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release() { + try { + in.close(); + } catch (IOException e) { + // best-effort close + } + return true; + } + + @Override + public Buffer retain() { + throw new UnsupportedOperationException(); + } + + @Override + public int referenceCount() { + throw new UnsupportedOperationException(); + } + + @Override + public int nioBufferCount() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers(final int index, final int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer(final int index, final int length) { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer getBytes(final int index, final byte[] dst) { + throw new UnsupportedOperationException(); + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java new file mode 100644 index 00000000000..91ba9cf28a4 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class ByteBufQueueInputStreamTest { + + @Test + public void shouldReadSingleByteBuf() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(new byte[]{1, 2, 3, 4}); + stream.offer(buf); + stream.signalEndOfStream(); + + assertEquals(1, stream.read()); + assertEquals(2, stream.read()); + assertEquals(3, stream.read()); + assertEquals(4, stream.read()); + assertEquals(-1, stream.read()); + } + + @Test + public void shouldReadAcrossMultipleByteBufs() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2})); + stream.offer(Unpooled.wrappedBuffer(new byte[]{3, 4})); + stream.signalEndOfStream(); + + final byte[] result = new byte[8]; + int totalRead = 0; + int read; + while ((read = stream.read(result, totalRead, result.length - totalRead)) != -1) { + totalRead += read; + } + assertEquals(4, totalRead); + assertArrayEquals(new byte[]{1, 2, 3, 4}, java.util.Arrays.copyOf(result, totalRead)); + } + + @Test + public void shouldReleaseByteBufsAfterReading() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4); + buf.writeBytes(new byte[]{1, 2, 3, 4}); + assertEquals(1, buf.refCnt()); + + stream.offer(buf); + stream.signalEndOfStream(); + + final byte[] result = new byte[4]; + stream.read(result, 0, 4); + stream.read(); // triggers release of buf and reads EOS + + assertEquals(0, buf.refCnt()); + } + + @Test + public void shouldCleanUpOnClose() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(2); + buf1.writeBytes(new byte[]{1, 2}); + final ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(2); + buf2.writeBytes(new byte[]{3, 4}); + + stream.offer(buf1); + stream.offer(buf2); + stream.close(); + + assertEquals(0, buf1.refCnt()); + assertEquals(0, buf2.refCnt()); + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java new file mode 100644 index 00000000000..484ec7d5246 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/GraphBinaryStreamResponseReaderTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.apache.tinkerpop.gremlin.driver.stream.GraphBinaryStreamResponseReader; +import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer; +import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class GraphBinaryStreamResponseReaderTest { + + private ExecutorService executor; + private GraphBinaryMessageSerializerV4 serializer; + private GraphBinaryReader reader; + + @Before + public void setup() { + executor = Executors.newCachedThreadPool(); + serializer = new GraphBinaryMessageSerializerV4(); + reader = serializer.getMapper().getReader(); + } + + @After + public void teardown() { + executor.shutdownNow(); + } + + @Test + public void shouldReadSingleItemResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList("hello")).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List results = rs.all().get(); + assertEquals(1, results.size()); + assertEquals("hello", results.get(0).getString()); + assertNull(pending.get()); + } + + @Test + public void shouldReadMultipleItemResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Arrays.asList(1, 2, 3)).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List results = rs.all().get(); + assertEquals(3, results.size()); + assertEquals(1, results.get(0).getInt()); + assertEquals(2, results.get(1).getInt()); + assertEquals(3, results.get(2).getInt()); + } + + @Test + public void shouldReadBulkedResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Arrays.asList("a", 3L)).bulked(true).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List results = rs.all().get(); + assertEquals(1, results.size()); + assertTrue(results.get(0).getObject() instanceof DefaultRemoteTraverser); + final DefaultRemoteTraverser traverser = (DefaultRemoteTraverser) results.get(0).getObject(); + assertEquals("a", traverser.get()); + assertEquals(3L, traverser.bulk()); + } + + @Test + public void shouldHandleErrorFooter() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.INTERNAL_SERVER_ERROR) + .statusMessage("Something went wrong") + .exception("java.lang.RuntimeException") + .result(Collections.emptyList()).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + assertTrue(rs.allItemsAvailable()); + try { + rs.all().get(); + fail("Expected exception"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof ResponseException); + final ResponseException re = (ResponseException) e.getCause(); + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode()); + assertEquals("Something went wrong", re.getMessage()); + } + assertNull(pending.get()); + } + + @Test + public void shouldReadDataSplitAcrossChunks() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + // Use a large string (~1000 chars) so the split point must land inside it regardless of + // header/footer overhead. The header+footer are ~30 bytes; the string item is ~1000 bytes. + final String largeValue = String.join("", Collections.nCopies(100, "abcdefghij")); + final ByteBuf fullPayload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList(largeValue)).create(), + ByteBufAllocator.DEFAULT); + + final int splitPoint = fullPayload.readableBytes() / 2; + final ByteBuf chunk1 = fullPayload.readSlice(splitPoint).retain(); + final ByteBuf chunk2 = fullPayload.retain(); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(chunk1); + stream.offer(chunk2); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List results = rs.all().get(); + assertEquals(1, results.size()); + assertEquals(largeValue, results.get(0).getString()); + + fullPayload.release(); + } + + @Test + public void shouldReadEmptyResponse() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + + final ByteBuf payload = serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.emptyList()).create(), + ByteBufAllocator.DEFAULT); + + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(payload); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + new GraphBinaryStreamResponseReader(buffer, reader, rs, pending).run(); + + final List results = rs.all().get(); + assertTrue(results.isEmpty()); + assertNull(pending.get()); + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java new file mode 100644 index 00000000000..fdd37818a44 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.ser.SerTokens; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; +import static org.junit.Assert.*; + +public class HttpStreamingResponseHandlerTest { + + private ExecutorService executor; + private GraphBinaryMessageSerializerV4 serializer; + private GraphBinaryReader reader; + + @Before + public void setup() { + executor = Executors.newSingleThreadExecutor(); + serializer = new GraphBinaryMessageSerializerV4(); + reader = serializer.getMapper().getReader(); + } + + @After + public void teardown() { + executor.shutdownNow(); + } + + private EmbeddedChannel createChannel(final AtomicReference pendingResultSet, final long maxResponseContentLength) { + final HttpStreamingResponseHandler handler = new HttpStreamingResponseHandler( + reader, pendingResultSet, executor, maxResponseContentLength); + return new EmbeddedChannel(handler); + } + + @Test + public void shouldEmitLastContentReadResponseOnHappyPath() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + // Serialize a valid GraphBinary response + final byte[] payload = toBytes(serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList(1)).create(), + channel.alloc())); + + // Send HttpResponse with GraphBinary content type + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + + // Send content + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload))); + + // Send LastHttpContent + channel.writeInbound(new DefaultLastHttpContent()); + + // Verify LAST_CONTENT_READ_RESPONSE is emitted + final Object out = channel.readInbound(); + assertSame(LAST_CONTENT_READ_RESPONSE, out); + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldHandleDoubleLastHttpContentWithoutError() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + final byte[] payload = toBytes(serializer.serializeResponseAsBinary( + ResponseMessage.build().code(HttpResponseStatus.OK) + .result(Collections.singletonList(1)).create(), + channel.alloc())); + + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(payload))); + channel.writeInbound(new DefaultLastHttpContent()); + + // Send a second LastHttpContent — should not throw NPE + channel.writeInbound(new DefaultLastHttpContent()); + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldThrowTooLongFrameExceptionWhenMaxLengthExceeded() { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 10); + + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + + try { + // Send content exceeding the 10-byte limit + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[20]))); + fail("Expected TooLongFrameException"); + } catch (Exception e) { + assertTrue(e instanceof TooLongFrameException); + } + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldSignalQueueInputStreamOnChannelInactive() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + + // Send some content but no LastHttpContent + channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[]{1, 2, 3}))); + + // Fire channelInactive — should not throw and should signal the stream + channel.pipeline().fireChannelInactive(); + + channel.finishAndReleaseAll(); + } + + @Test + public void shouldMarkErrorOnResultSetForNonGraphBinaryError() throws Exception { + final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null); + final AtomicReference pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending, 0); + + // Send a 500 response with JSON content type + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); + channel.writeInbound(response); + + // Send JSON error body + final String errorJson = "{\"message\":\"test error\"}"; + channel.writeInbound(new DefaultLastHttpContent(Unpooled.copiedBuffer(errorJson, CharsetUtil.UTF_8))); + + // Verify error is marked on the ResultSet + try { + rs.all().get(); + fail("Expected exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof ResponseException); + final ResponseException re = (ResponseException) e.getCause(); + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode()); + assertEquals("test error", re.getMessage()); + } + + // Verify pendingResultSet was cleared + assertNull(pending.get()); + + channel.finishAndReleaseAll(); + } + + private byte[] toBytes(final io.netty.buffer.ByteBuf buf) { + final byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + buf.release(); + return bytes; + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java new file mode 100644 index 00000000000..02d029d66e1 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/InputStreamBufferTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.Unpooled; +import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; +import org.apache.tinkerpop.gremlin.driver.stream.InputStreamBuffer; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class InputStreamBufferTest { + + @Test + public void shouldReadPrimitivesThroughInputStreamBuffer() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + final io.netty.buffer.ByteBuf buf = Unpooled.buffer(); + buf.writeByte(42); + buf.writeInt(12345); + buf.writeLong(9876543210L); + buf.writeFloat(3.14f); + buf.writeDouble(2.718281828); + buf.writeShort(256); + buf.writeBoolean(true); + stream.offer(buf); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + assertEquals(42, buffer.readByte()); + assertEquals(12345, buffer.readInt()); + assertEquals(9876543210L, buffer.readLong()); + assertEquals(3.14f, buffer.readFloat(), 0.001f); + assertEquals(2.718281828, buffer.readDouble(), 0.000001); + assertEquals(256, buffer.readShort()); + assertTrue(buffer.readBoolean()); + } + + @Test + public void shouldReadBytesArray() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(Unpooled.wrappedBuffer(new byte[]{10, 20, 30})); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + final byte[] dest = new byte[3]; + buffer.readBytes(dest); + assertArrayEquals(new byte[]{10, 20, 30}, dest); + } + + @Test + public void shouldTrackReaderIndex() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); + stream.offer(Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9})); + stream.signalEndOfStream(); + + final InputStreamBuffer buffer = new InputStreamBuffer(stream); + assertEquals(0, buffer.readerIndex()); + buffer.readByte(); + assertEquals(1, buffer.readerIndex()); + buffer.readInt(); + assertEquals(5, buffer.readerIndex()); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowOnReadableBytes() { + new InputStreamBuffer(new ByteBufQueueInputStream()).readableBytes(); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowOnWriteInt() { + new InputStreamBuffer(new ByteBufQueueInputStream()).writeInt(1); + } + + @Test(expected = UnsupportedOperationException.class) + public void shouldThrowOnNioBuffer() { + new InputStreamBuffer(new ByteBufQueueInputStream()).nioBuffer(); + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index 7c474d6ba0c..f0cb2b9ea2c 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -275,7 +275,11 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r // failures that follow this will show up in the response body instead. sendHttpResponse(ctx, OK, createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new)); sendHttpContents(ctx, requestCtx); - sendLastHttpContent(ctx, HttpResponseStatus.OK, ""); + // Skip if writeError() already terminated the response (e.g., serialization error in makeChunk). + // Sending a second LastHttpContent would corrupt the HTTP framing on keep-alive connections. + if (requestCtx.getRequestState() != RequestState.ERROR) { + sendLastHttpContent(ctx, HttpResponseStatus.OK, ""); + } } catch (Throwable t) { writeError(requestCtx, formErrorResponseMessage(t, requestMessage), serializer.getValue1()); } finally { @@ -753,19 +757,21 @@ private static ByteBuf makeChunk(final Context ctx, final MessageSerializer s ctx.setRequestState(STREAMING); return serializer.writeHeader(responseMessage, nettyContext.alloc()); } - ctx.setRequestState(FINISHED); - return serializer.serializeResponseAsBinary(ResponseMessage.build() + final ByteBuf fullResponse = serializer.serializeResponseAsBinary(ResponseMessage.build() .result(aggregate) .bulked(bulking) .code(HttpResponseStatus.OK) .create(), nettyContext.alloc()); + ctx.setRequestState(FINISHED); + return fullResponse; case STREAMING: return serializer.writeChunk(aggregate, nettyContext.alloc()); case FINISHING: + final ByteBuf footer = serializer.writeFooter(responseMessage, nettyContext.alloc()); ctx.setRequestState(FINISHED); - return serializer.writeFooter(responseMessage, nettyContext.alloc()); + return footer; } return serializer.serializeResponseAsBinary(responseMessage, nettyContext.alloc()); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java index f0f6b652c09..4fa5ba84ec5 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java @@ -97,6 +97,13 @@ public static void sendError(final ChannelHandlerContext ctx, final HttpResponse * @param serializer The serializer to use to serialize the error response. */ static void writeError(final Context context, final ResponseMessage responseMessage, final MessageSerializer serializer) { + // Prevent writing after the response is already terminated. A second write would corrupt + // HTTP framing on keep-alive connections, poisoning them for subsequent requests. + if (context.getRequestState() == HttpGremlinEndpointHandler.RequestState.ERROR || + context.getRequestState() == HttpGremlinEndpointHandler.RequestState.FINISHED) { + return; + } + try { final ChannelHandlerContext ctx = context.getChannelHandlerContext(); final ByteBuf ByteBuf = context.getRequestState() == HttpGremlinEndpointHandler.RequestState.STREAMING diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java new file mode 100644 index 00000000000..708a63e13fc --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/StreamingResponseIntegrateTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; +import static org.junit.Assert.*; + +/** + * Integration tests for streaming HTTP response support in the Java driver. + * Verifies that the streaming pipeline (HttpStreamingResponseHandler + GraphBinaryStreamResponseReader) + * works correctly end-to-end against a real Gremlin Server. + */ +public class StreamingResponseIntegrateTest extends AbstractGremlinServerIntegrationTest { + + @Override + public Settings overrideSettings(final Settings settings) { + settings.channelizer = HttpChannelizer.class.getName(); + return settings; + } + + @Test + public void shouldStreamBasicResults() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final List results = client.submit("g.inject(1,2,3)").all().get(); + assertEquals(3, results.size()); + assertEquals(1, results.get(0).getInt()); + assertEquals(2, results.get(1).getInt()); + assertEquals(3, results.get(2).getInt()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamIncrementallyWithIterator() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final ResultSet rs = client.submit("g.inject(1,2,3,4,5)"); + + // Consume results one at a time via iterator + final Iterator iter = rs.iterator(); + final List collected = new ArrayList<>(); + while (iter.hasNext()) { + collected.add(iter.next().getInt()); + } + assertEquals(5, collected.size()); + for (int i = 0; i < 5; i++) { + assertEquals(i + 1, (int) collected.get(i)); + } + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamIncrementallyWithOne() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final ResultSet rs = client.submit("g.inject(10,20,30)"); + + assertEquals(10, rs.one().getInt()); + assertEquals(20, rs.one().getInt()); + assertEquals(30, rs.one().getInt()); + assertNull(rs.one()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamLargeResultSet() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + // Generate a large result set that would span multiple HTTP chunks + final List results = client.submit( + "g.inject(1).repeat(__.identity()).times(1000).emit()").all().get(); + assertEquals(1000, results.size()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamEmptyResponse() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + final List results = client.submit("g.V().hasLabel('nonexistent')").all().get(); + assertTrue(results.isEmpty()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldHandleServerErrorDuringStreaming() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + client.submit("invalid_script_that_should_fail").all().get(); + fail("Expected exception"); + } catch (ExecutionException e) { + // Error should propagate correctly through the streaming pipeline + assertTrue(e.getCause() instanceof ResponseException); + final ResponseException re = (ResponseException) e.getCause(); + assertEquals(HttpResponseStatus.BAD_REQUEST, re.getResponseStatusCode()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldReuseConnectionAfterStreamingComplete() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + // First request + final List results1 = client.submit("g.inject(1)").all().get(); + assertEquals(1, results1.size()); + + // Second request on same client (should reuse connection) + final List results2 = client.submit("g.inject(2)").all().get(); + assertEquals(1, results2.size()); + assertEquals(2, results2.get(0).getInt()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldHandleConcurrentStreamingRequests() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + final CompletableFuture> f1 = client.submit("g.inject(1,2,3)").all(); + final CompletableFuture> f2 = client.submit("g.inject(4,5,6)").all(); + final CompletableFuture> f3 = client.submit("g.inject(7,8,9)").all(); + + assertEquals(3, f1.get().size()); + assertEquals(3, f2.get().size()); + assertEquals(3, f3.get().size()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamWithTraversalApi() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final GraphTraversalSource g = traversal().with( + DriverRemoteConnection.using(cluster)); + + final List results = g.inject(1, 2, 3).toList(); + assertEquals(3, results.size()); + assertTrue(results.contains(1)); + assertTrue(results.contains(2)); + assertTrue(results.contains(3)); + } finally { + cluster.close(); + } + } + + @Test + public void shouldStreamVerticesFromGraph() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + // Use inject to create data rather than relying on pre-loaded graph + final List results = client.submit("g.inject(1,2,3,4,5,6)").all().get(); + assertEquals(6, results.size()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldReuseConnectionAfterServerError() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + try { + final Client client = cluster.connect(); + + // Submit a request that causes a server error + try { + client.submit("throw new RuntimeException('test error')").all().get(); + fail("Should have thrown"); + } catch (ExecutionException e) { + // expected + } + + // Connection should still be usable + final List results = client.submit("g.inject(1)").all().get(); + assertEquals(1, results.get(0).getInt()); + } finally { + cluster.close(); + } + } +}