Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima

* Added Gremlator, a single page web application, that translates Gremlin into various programming languages like Javascript and Python.
* Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in `globalThis.crypto.randomUUID()`.
* Added streaming HTTP response support to `gremlin-driver` for incremental result deserialization over GraphBinary.
* Connected HTTP streaming response deserialization to the traversal API in `gremlin-javascript`, enabling `next()` to return the first result without waiting for the full response.
* Changed `Client.stream()` in `gremlin-javascript` to return an `AsyncGenerator` for direct incremental consumption.
* Removed `readable-stream` dependency from `gremlin-javascript`.
Expand Down
9 changes: 9 additions & 0 deletions docs/src/upgrade/release-4.x.x.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ try {
The traversal API is not affected — `Next()`, `ToList()`, etc. still throw `ResponseException` directly since they
block on the async stream internally.

==== Streaming Response Deserialization in gremlin-driver

The Java driver now deserializes HTTP responses incrementally, delivering results to the `ResultSet` as they arrive
rather than buffering the entire response. This reduces time-to-first-result for large result sets.

This change is automatic and requires no code changes. It applies only when using the default GraphBinary serializer.
Custom `MessageSerializer` implementations fall back to the non-streaming pipeline that buffers the full response
before deserialization. The `ResultSet` API is unchanged.

==== More Secure Gremlin Server

Previous versions of Gremlin Server relied on a Gremlin-flavored Groovy `ScriptEngine` for basic server initialization,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@
import org.apache.tinkerpop.gremlin.driver.handler.HttpContentDecompressionHandler;
import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
import org.apache.tinkerpop.gremlin.driver.handler.HttpStreamingResponseHandler;
import org.apache.tinkerpop.gremlin.driver.handler.IdleConnectionHandler;
import org.apache.tinkerpop.gremlin.driver.handler.InactiveChannelHandler;
import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;

import java.util.Collections;
import java.util.Optional;
Expand Down Expand Up @@ -92,7 +96,7 @@ abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> imp
protected Connection connection;
protected Cluster cluster;
protected SslHandler sslHandler;
private AtomicReference<ResultSet> pending;
protected AtomicReference<ResultSet> pending;

protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler";
protected static final String PIPELINE_SSL_HANDLER = "gremlin-ssl-handler";
Expand Down Expand Up @@ -158,7 +162,6 @@ protected void initChannel(final SocketChannel socketChannel) {
}

configure(pipeline);
pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending));
}

@Override
Expand Down Expand Up @@ -187,7 +190,9 @@ final class HttpChannelizer extends AbstractChannelizer {
ResponseMessage.build().code(HttpResponseStatus.NO_CONTENT).result(Collections.emptyList()).create();

private HttpGremlinRequestEncoder gremlinRequestEncoder;
private HttpStreamingResponseHandler streamingResponseHandler;
private HttpGremlinResponseDecoder gremlinResponseDecoder;
private boolean useStreaming;

private HttpContentDecompressionHandler httpCompressionDecoder;
private IdleStateHandler idleStateHandler;
Expand All @@ -200,7 +205,19 @@ public void init(final Connection connection) {
httpCompressionDecoder = new HttpContentDecompressionHandler();
gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptors(),
cluster.isUserAgentOnConnectEnabled(), cluster.isBulkResultsEnabled(), connection.getUri());
gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer());

final MessageSerializer<?> serializer = cluster.getSerializer();
if (serializer instanceof GraphBinaryMessageSerializerV4) {
useStreaming = true;
final GraphBinaryReader graphBinaryReader =
((GraphBinaryMessageSerializerV4) serializer).getMapper().getReader();
streamingResponseHandler = new HttpStreamingResponseHandler(
graphBinaryReader, pending, cluster.streamingReaderPool(), cluster.getMaxResponseContentLength());
} else {
useStreaming = false;
gremlinResponseDecoder = new HttpGremlinResponseDecoder(serializer);
}

if (cluster.getIdleConnectionTimeout() > 0) {
final int idleConnectionTimeout = (int) (cluster.getIdleConnectionTimeout() / 1000);
idleStateHandler = new IdleStateHandler(idleConnectionTimeout, idleConnectionTimeout, 0);
Expand Down Expand Up @@ -240,11 +257,22 @@ public void configure(final ChannelPipeline pipeline) {
DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS, false);

pipeline.addLast(PIPELINE_HTTP_CODEC, handler);
pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE));
pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder);
if (useStreaming) {
pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler);
pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
} else {
pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE));
pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder);
}

pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending, () -> {
connection.returnToPool();
connection.tryShutdown();
}, useStreaming));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -383,6 +386,10 @@ ScheduledExecutorService connectionScheduler() {
return manager.connectionScheduler;
}

ExecutorService streamingReaderPool() {
return manager.streamingReaderPool;
}

Settings.ConnectionPoolSettings connectionPoolSettings() {
return manager.connectionPoolSettings;
}
Expand Down Expand Up @@ -956,6 +963,12 @@ class Manager {
*/
private final ScheduledThreadPoolExecutor connectionScheduler;

/**
* Cached thread pool for streaming response reader threads. One thread per active streaming response,
* bounded implicitly by the connection pool size.
*/
private final ExecutorService streamingReaderPool;

private final int nioPoolSize;
private final int workerPoolSize;
private final int port;
Expand Down Expand Up @@ -1023,6 +1036,10 @@ private Manager(final Builder builder) {
this.connectionScheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build());

this.streamingReaderPool = new ThreadPoolExecutor(0, builder.maxConnectionPoolSize * Math.max(contactPoints.size(), 1) * 4,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new BasicThreadFactory.Builder().namingPattern("gremlin-driver-stream-reader-%d").build());

validationRequest = () -> RequestMessage.build(builder.validationRequest);
}

Expand Down Expand Up @@ -1133,6 +1150,7 @@ synchronized CompletableFuture<Void> close() {
executor.shutdown();
hostScheduler.shutdown();
connectionScheduler.shutdown();
streamingReaderPool.shutdownNow();
closeIt.complete(null);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ final class Connection {
* Is a {@code Connection} borrowed from the pool.
*/
private final AtomicBoolean isBorrowed = new AtomicBoolean(false);
/**
* Prevents returnToPool() from being called more than once per borrow cycle.
*/
private final AtomicBoolean returned = new AtomicBoolean(false);

void resetReturned() {
returned.set(false);
}
/**
* This boolean guards the replace of the connection and ensures that it only occurs once.
*/
Expand Down Expand Up @@ -201,25 +209,21 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab
} else {
final ResultSet resultSet = new ResultSet(cluster.executor(), requestMessage, pool.host);

resultSet.getReadCompleted().whenCompleteAsync((v, t) -> {
resultSet.getReadCompleted().whenComplete((v, t) -> {
if (t != null) {
// the callback for when the read failed. a failed read means the request went to the server
// and came back with a server-side error of some sort. it means the server is responsive
// so this isn't going to be like a potentially dead host situation which is handled above on a failed
// write operation.
logger.debug("Error while processing request on the server {}.", this, t);
handleConnectionCleanupOnError(thisConnection);
} else {
// the callback for when the read was successful, meaning that ResultSet.markComplete()
// was called
thisConnection.returnToPool();
}
// While this request was in process, close might have been signaled in closeAsync().
// However, close would be blocked until all pending requests are completed. Attempt
// the shutdown if the returned result cleared up the last pending message and unblocked
// the close.
tryShutdown();
}, cluster.executor());
});

pending.set(resultSet);

Expand All @@ -234,7 +238,8 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab
return requestPromise;
}

private void returnToPool() {
void returnToPool() {
if (!returned.compareAndSet(false, true)) return;
try {
if (pool != null) pool.returnConnection(this);
} catch (ConnectionException ce) {
Expand All @@ -244,7 +249,7 @@ private void returnToPool() {
}

private void handleConnectionCleanupOnError(final Connection thisConnection) {
if (thisConnection.isDead()) {
if (thisConnection.isDead() || (thisConnection.channel != null && !thisConnection.channel.isOpen())) {
if (pool != null) pool.replaceConnection(thisConnection);
} else {
thisConnection.returnToPool();
Expand All @@ -259,7 +264,7 @@ private boolean isOkToClose() {
* Close was signaled in closeAsync() but there were pending messages at that time. This method attempts the
* shutdown if the returned result cleared up the last pending message.
*/
private void tryShutdown() {
void tryShutdown() {
if (isClosing() && isOkToClose())
shutdown(closeFuture.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ private Connection getAvailableConnection() {
while (head != null) {
// try to borrow connection
if (!head.isDead() && !head.isBorrowed().get() && head.isBorrowed().compareAndSet(false, true)) {
head.resetReturned();
available = head;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response
private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class);
private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException");
private final AtomicReference<ResultSet> pendingResultSet;
private final Runnable onResponseComplete;
private final boolean streaming;

public GremlinResponseHandler(final AtomicReference<ResultSet> pending) {
public GremlinResponseHandler(final AtomicReference<ResultSet> pending, final Runnable onResponseComplete, final boolean streaming) {
this.pendingResultSet = pending;
this.onResponseComplete = onResponseComplete;
this.streaming = streaming;
}

@Override
Expand Down Expand Up @@ -100,14 +104,17 @@ protected void channelRead0(final ChannelHandlerContext channelHandlerContext, f

// Stream is done when the last content signaling response message is read.
if (LAST_CONTENT_READ_RESPONSE == response) {
final ResultSet rs = pendingResultSet.getAndSet(null);
if (rs != null) {
if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
rs.markComplete();
} else {
rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
if (!streaming) {
final ResultSet rs = pendingResultSet.getAndSet(null);
if (rs != null) {
if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
rs.markComplete();
} else {
rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
}
}
}
onResponseComplete.run();
}
}

Expand Down
Loading
Loading