-
Notifications
You must be signed in to change notification settings - Fork 39
[PECOBLR-2321] Result Set Heartbeat / Keep-Alive for Ongoing Query Executions #1415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 11 commits
00b8d4b
6130304
d3f0c47
cedf144
5d9aadc
fa92347
291be6c
fe84cc4
53db645
994dbc2
a51bccc
723ce06
7534523
d24d62a
7631ee3
469a459
a037ae1
b881d4c
1910332
a859117
458aa5e
a40466a
832a1c8
fc46126
9798aa5
0a46f52
fd4e90a
75be9b7
c4a190d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ public class DatabricksConnection implements IDatabricksConnection, IDatabricksC | |
| private final Set<IDatabricksStatementInternal> statementSet = ConcurrentHashMap.newKeySet(); | ||
| private SQLWarning warnings = null; | ||
| private final IDatabricksConnectionContext connectionContext; | ||
| private final ResultHeartbeatManager heartbeatManager; | ||
|
|
||
| /** | ||
| * Creates an instance of Databricks connection for given connection context. | ||
|
|
@@ -49,6 +50,7 @@ public DatabricksConnection(IDatabricksConnectionContext connectionContext) | |
| this.connectionContext = connectionContext; | ||
| DatabricksThreadContextHolder.setConnectionContext(connectionContext); | ||
| this.session = new DatabricksSession(connectionContext); | ||
| this.heartbeatManager = createHeartbeatManager(connectionContext); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
|
|
@@ -58,10 +60,27 @@ public DatabricksConnection( | |
| this.connectionContext = connectionContext; | ||
| DatabricksThreadContextHolder.setConnectionContext(connectionContext); | ||
| this.session = new DatabricksSession(connectionContext, testDatabricksClient); | ||
| this.heartbeatManager = createHeartbeatManager(connectionContext); | ||
| UserAgentManager.setUserAgent(connectionContext); | ||
| TelemetryHelper.updateTelemetryAppName(connectionContext, null); | ||
| } | ||
|
|
||
| private static ResultHeartbeatManager createHeartbeatManager( | ||
| IDatabricksConnectionContext connectionContext) { | ||
| if (connectionContext instanceof DatabricksConnectionContext) { | ||
| DatabricksConnectionContext ctx = (DatabricksConnectionContext) connectionContext; | ||
| if (ctx.isHeartbeatEnabled()) { | ||
| return new ResultHeartbeatManager(ctx.getHeartbeatIntervalSeconds()); | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| /** Returns the heartbeat manager, or null if heartbeat is disabled. */ | ||
| ResultHeartbeatManager getHeartbeatManager() { | ||
| return heartbeatManager; | ||
| } | ||
|
|
||
| @Override | ||
| public void open() throws SQLException { | ||
| this.session.open(); | ||
|
|
@@ -420,6 +439,9 @@ public void close() throws SQLException { | |
| statement.close(false); | ||
| statementSet.remove(statement); | ||
| } | ||
| if (heartbeatManager != null) { | ||
| heartbeatManager.shutdown(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [HIGH] `heartbeatManager.shutdown()` is never reached if closing a statement throws:
for (IDatabricksStatementInternal statement : statementSet) {
statement.close(false); // makes RPCs — can throw
statementSet.remove(statement);
}
if (heartbeatManager != null) {
heartbeatManager.shutdown(); // never reached on throw above
}
Fix: Wrap in try/finally so the heartbeat manager is shut down even when closing a statement fails:
try {
for (IDatabricksStatementInternal statement : statementSet) {
try { statement.close(false); } catch (Exception e) {
LOGGER.warn("Error closing statement: {}", e.getMessage());
}
statementSet.remove(statement);
}
} finally {
if (heartbeatManager != null) {
heartbeatManager.shutdown();
}
} |
||
| } | ||
| this.session.close(); | ||
| TelemetryClientFactory.getInstance().closeTelemetryClient(connectionContext); | ||
| DatabricksClientConfiguratorManager.getInstance().removeInstance(connectionContext); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import com.databricks.jdbc.common.Nullable; | ||
| import com.databricks.jdbc.common.StatementType; | ||
| import com.databricks.jdbc.common.util.WarningUtil; | ||
| import com.databricks.jdbc.dbclient.IDatabricksClient; | ||
| import com.databricks.jdbc.dbclient.impl.common.StatementId; | ||
| import com.databricks.jdbc.exception.DatabricksParsingException; | ||
| import com.databricks.jdbc.exception.DatabricksSQLException; | ||
|
|
@@ -123,6 +124,7 @@ public DatabricksResultSet( | |
| this.cachedTelemetryCollector = resolveTelemetryCollector(parentStatement); | ||
| this.isClosed = false; | ||
| this.wasNull = false; | ||
| startHeartbeatIfEnabled(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [CRITICAL] Heartbeat never starts on Thrift result sets — feature is dead-on-arrival on the Thrift path. The Thrift constructor (this method, lines 153-196) does not call `startHeartbeatIfEnabled()`. All Thrift result sets are constructed via […]. Per the design doc's eligibility table, Thrift inline (data only on cluster, server-evictable) is one of the most critical scenarios this feature is meant to cover. It's silently broken. The eligibility tests in […]. Fix: Add a `startHeartbeatIfEnabled()` call to the Thrift constructor […] |
||
| } | ||
|
|
||
| @VisibleForTesting | ||
|
|
@@ -283,18 +285,167 @@ public boolean next() throws SQLException { | |
| cachedTelemetryCollector.recordResultSetIteration( | ||
| statementId.toSQLExecStatementId(), resultSetMetaData.getChunkCount(), hasNext); | ||
| } | ||
| if (!hasNext) { | ||
| stopHeartbeat(); | ||
| } | ||
| return hasNext; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws DatabricksSQLException { | ||
| stopHeartbeat(); | ||
| isClosed = true; | ||
| this.executionResult.close(); | ||
| if (parentStatement != null) { | ||
| parentStatement.handleResultSetClose(this); | ||
| } | ||
| } | ||
|
|
||
| /** Starts heartbeat polling if enabled on the connection and this result set is eligible. */ | ||
| private void startHeartbeatIfEnabled() { | ||
| if (parentStatement == null || statementId == null) { | ||
| return; | ||
| } | ||
| if (!isHeartbeatEligible()) { | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| DatabricksConnection conn = | ||
| (DatabricksConnection) parentStatement.getStatement().getConnection(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [CRITICAL] Pooled connections (HikariCP, DBCP, This direct cast
The exception is swallowed by the outer Fix (one of):
Option 2 is cleaner and matches how the rest of the driver handles pooled access. |
||
| ResultHeartbeatManager mgr = conn.getHeartbeatManager(); | ||
| if (mgr == null) { | ||
| return; // heartbeat not enabled | ||
| } | ||
|
|
||
| IDatabricksClient client = conn.getSession().getDatabricksClient(); | ||
| final int maxConsecutiveFailures = 10; | ||
| final java.util.concurrent.atomic.AtomicInteger consecutiveFailures = | ||
| new java.util.concurrent.atomic.AtomicInteger(0); | ||
| // Get the stopped flag from the manager — shared between the heartbeat task and | ||
| // stopHeartbeat(). Prevents RPC on a just-closed client/session: stopHeartbeat sets | ||
| // the flag before cancel(false), so an in-flight tick sees it and skips the RPC. | ||
| final java.util.concurrent.atomic.AtomicBoolean stopped = mgr.getStoppedFlag(statementId); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [CRITICAL] Orphan The // ResultHeartbeatManager.java
void startHeartbeat(StatementId statementId, Runnable heartbeatTask) {
...
stopHeartbeat(statementId); // line 63 — REMOVES this flag from map AND sets it to true
getStoppedFlag(statementId).set(false); // line 66 — computeIfAbsent creates a NEW AtomicBoolean
...
}
So the AtomicBoolean captured by the closure here is the removed/orphaned one — permanently set to `true`. Net effect: every tick, the task sees `stopped.get()` return `true` and returns before making the RPC, so no heartbeat is ever sent. The integration test only passes because warehouses don't actually expire results in 15s — so the absence of heartbeats isn't observed. Fix options (any one):
Add a unit test that asserts |
||
|
|
||
| Runnable heartbeatTask = | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [CRITICAL] Lambda strong-captures This lambda invokes The future is held in
The C# ADBC reference avoids this: its poller is per-statement with linked cancellation, so even GC of the statement helps. The Java implementation here is connection-scoped, so GC of the ResultSet alone won't help — the future keeps a hard reference back to the ResultSet. Fix: Don't capture |
||
| () -> { | ||
| if (stopped.get()) { | ||
| return; // client/session may be closed, skip RPC | ||
| } | ||
| try { | ||
| boolean alive = client.checkStatementAlive(statementId); | ||
| consecutiveFailures.set(0); // reset on success | ||
| if (!alive) { | ||
| LOGGER.info( | ||
| "Heartbeat detected terminal state for statement {}, stopping", statementId); | ||
| stopped.set(true); | ||
| stopHeartbeat(); | ||
| } | ||
| } catch (Exception e) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Medium] The heartbeat lambda at line 374 catches
Empirically demonstrated: a JUnit test wires a task that throws
The consequence is worse than swallowing exceptions: there's no Fix: } catch (Throwable t) {
if (capturedMgr.getStoppedFlag(capturedStatementId).get()) return;
// ... same failure-counter logic ...
if (t instanceof Error && !(t instanceof VirtualMachineError)) {
// log + stop cleanly; VirtualMachineError should still propagate
capturedMgr.stopHeartbeat(capturedStatementId);
}
if (t instanceof VirtualMachineError) throw (VirtualMachineError) t;
}(Or at minimum, change the catch to |
||
| // If stopped was set during the RPC (connection closing), don't count as failure | ||
| if (stopped.get()) { | ||
| return; | ||
| } | ||
| int failures = consecutiveFailures.incrementAndGet(); | ||
| if (failures == 1) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Medium] Default The default Empirically verified with a JUnit test that builds a no-override
User-visible consequence: if anyone wires a custom The WARN says "results may expire" — but the actual cause is a missing client-side override. Fix options:
|
||
| // First failure — log at INFO so users see the initial problem | ||
| LOGGER.info( | ||
| "Heartbeat failed for statement {} (first failure): {}", | ||
| statementId, | ||
| e.getMessage()); | ||
| } else { | ||
| LOGGER.debug( | ||
| "Heartbeat failed for statement {} (failure {}/{}): {}", | ||
| statementId, | ||
| failures, | ||
| maxConsecutiveFailures, | ||
| e.getMessage()); | ||
| } | ||
| if (failures >= maxConsecutiveFailures) { | ||
| // Terminal failure — log at WARN so it's visible in default log config | ||
| LOGGER.warn( | ||
| "Heartbeat stopped for statement {} after {} consecutive failures. " | ||
| + "Server-side results may expire. Last error: {}", | ||
| statementId, | ||
| failures, | ||
| e.getMessage()); | ||
| stopped.set(true); | ||
| stopHeartbeat(); | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| mgr.startHeartbeat(statementId, heartbeatTask); | ||
| LOGGER.debug( | ||
| "Heartbeat started for statement {} (resultType={}, interval={}s)", | ||
| statementId, | ||
| resultSetType, | ||
| mgr.getIntervalSeconds()); | ||
| } catch (Exception e) { | ||
| LOGGER.debug("Failed to start heartbeat: {}", e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| /** Stops the heartbeat for this result set's statement. Idempotent. */ | ||
| private void stopHeartbeat() { | ||
| if (parentStatement == null || statementId == null) { | ||
| return; | ||
| } | ||
| try { | ||
| DatabricksConnection conn = | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [High] Asymmetric with the C3 fix on the start path. DatabricksConnection conn =
(DatabricksConnection) parentStatement.getStatement().getConnection();On any pooled connection this throws Empirical verification:
Fix: Extract a |
||
| (DatabricksConnection) parentStatement.getStatement().getConnection(); | ||
| ResultHeartbeatManager mgr = conn.getHeartbeatManager(); | ||
| if (mgr != null) { | ||
| mgr.stopHeartbeat(statementId); | ||
| } | ||
| } catch (Exception e) { | ||
| LOGGER.debug("Failed to stop heartbeat: {}", e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Determines whether this result set is eligible for heartbeat polling. Package-visible for | ||
| * testing. | ||
| * | ||
| * <p>Heartbeat is NOT needed when: | ||
| * | ||
| * <ul> | ||
| * <li>No execution result (nothing to fetch, also covers async PENDING/RUNNING with no data) | ||
| * <li>SEA inline (InlineJsonResult): all rows loaded in memory at construction | ||
| * <li>Update count (DML): no result rows to keep alive | ||
| * <li>Direct results (CLOSED state): server already closed, data fully delivered | ||
| * <li>Async execution (PENDING/RUNNING): user controls polling via getExecutionResult() | ||
| * </ul> | ||
| */ | ||
| boolean isHeartbeatEligible() { | ||
| // No execution result — nothing to fetch | ||
| if (executionResult == null) { | ||
| return false; | ||
| } | ||
| // SEA inline — all data loaded in memory at construction | ||
| if (resultSetType == ResultSetType.SEA_INLINE) { | ||
| return false; | ||
| } | ||
| // Update count — no result rows | ||
| if (statementType == StatementType.UPDATE) { | ||
| return false; | ||
| } | ||
| // Check execution state | ||
| if (executionStatus != null) { | ||
| com.databricks.jdbc.api.ExecutionState state = executionStatus.getExecutionState(); | ||
| // Direct results — server already closed | ||
| if (state == com.databricks.jdbc.api.ExecutionState.CLOSED) { | ||
| return false; | ||
| } | ||
| // Async execution — user controls polling | ||
| if (state == com.databricks.jdbc.api.ExecutionState.PENDING | ||
| || state == com.databricks.jdbc.api.ExecutionState.RUNNING) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| private static TelemetryCollector resolveTelemetryCollector( | ||
| IDatabricksStatementInternal parentStatement) { | ||
| try { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -172,6 +172,13 @@ public void close(boolean removeFromSession) throws DatabricksSQLException { | |
| this.connection.closeStatement(this); | ||
| } | ||
| DatabricksThreadContextHolder.clearStatementInfo(); | ||
| // Safety net: stop any heartbeat for this statement | ||
| if (statementId != null) { | ||
| ResultHeartbeatManager mgr = connection.getHeartbeatManager(); | ||
| if (mgr != null) { | ||
| mgr.stopHeartbeat(statementId); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [HIGH] This After Fix: Add a heartbeat stop to public void cancel() throws SQLException {
...
if (statementId != null) {
ResultHeartbeatManager mgr = connection.getHeartbeatManager();
if (mgr != null) {
mgr.stopHeartbeat(statementId);
}
}
this.connection.getSession().getDatabricksClient().cancelStatement(statementId);
...
} |
||
| } | ||
| } | ||
| shutDownExecutor(); | ||
| this.updateCount = -1; | ||
| this.isClosed = true; | ||
|
|
@@ -672,6 +679,8 @@ public ResultSet executeAsync(String sql) throws SQLException { | |
| LOGGER.debug("ResultSet executeAsync() for statement {%s}", sql); | ||
| checkIfClosed(); | ||
|
|
||
| // No heartbeat during async wait — the user controls polling via getExecutionResult(). | ||
| // Heartbeat starts later when the ResultSet is constructed (after getExecutionResult()). | ||
| resetForNewExecution(); | ||
|
|
||
| IDatabricksClient client = connection.getSession().getDatabricksClient(); | ||
|
|
@@ -969,6 +978,16 @@ private void resetForNewExecution() { | |
| // when the server returns unexpected responses (e.g., WireMock 404 in tests). | ||
| // For direct results, the server already closed the handle. | ||
|
|
||
| // Stop heartbeat for the previous execution before clearing state. | ||
| // Without this, the old heartbeat (keyed by old statementId) would fail and self-terminate | ||
| // after 10 consecutive failures — wasteful and noisy in logs. | ||
| if (statementId != null) { | ||
| ResultHeartbeatManager mgr = connection.getHeartbeatManager(); | ||
| if (mgr != null) { | ||
| mgr.stopHeartbeat(statementId); | ||
| } | ||
| } | ||
|
|
||
| directResultsReceived = false; | ||
|
|
||
| // Per JDBC spec, re-executing a Statement implicitly closes the current ResultSet. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[HIGH]
`instanceof DatabricksConnectionContext` silently disables heartbeat for any other context impl.
`isHeartbeatEnabled()` and `getHeartbeatIntervalSeconds()` live on the concrete class `DatabricksConnectionContext`, not on the `IDatabricksConnectionContext` interface. Any test mock, test double, or alternate implementation of `IDatabricksConnectionContext` falls through to `return null` — heartbeat silently disabled.
This pattern also makes the feature impossible to enable from any future context implementation (e.g., a wrapped/decorated context for telemetry or testing) without modifying this exact `instanceof` check.
Fix: Add the two methods to `IDatabricksConnectionContext` with default impls and drop the `instanceof`:
Then this method becomes: