Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
6982df6
[KYUUBI #7379][PR 4/4] Bundle SQLite and PostgreSQL JDBC drivers in D…
wangzhigang1999 May 5, 2026
30c035f
[KYUUBI #7379][PR 4/4] Rework AgentFinish/AgentRunContext token fields
wangzhigang1999 May 5, 2026
2e5478b
[KYUUBI #7379][PR 4/4] Stream reasoning content via ReasoningDelta event
wangzhigang1999 May 5, 2026
5af5a11
[KYUUBI #7379][PR 4/4] Add ChatRequest and ApprovalRequest DTOs for D…
wangzhigang1999 May 5, 2026
f8a45f7
[KYUUBI #7379][PR 4/4] Add DataAgent REST API for chat and approve
wangzhigang1999 May 5, 2026
88cf37c
[KYUUBI #7379][PR 4/4] Add Data Agent chat interface in Web UI
wangzhigang1999 May 5, 2026
36a7ee1
[KYUUBI #7379][PR 4/4] Fix JDBC sanitizer and persist credentials safely
wangzhigang1999 May 6, 2026
d197703
[KYUUBI #7379][PR 4/4] Harden Data Agent REST chat path
wangzhigang1999 May 6, 2026
39ad3a9
[KYUUBI #7379][PR 4/4] Slim chat messages on persist to fit sessionSt…
wangzhigang1999 May 6, 2026
cd7b9d6
[KYUUBI #7379][PR 4/4] Round 2 review fixes for Data Agent REST resource
wangzhigang1999 May 6, 2026
3e68495
[KYUUBI #7379][PR 4/4] Round 2 review fixes for Data Agent web UI
wangzhigang1999 May 6, 2026
d2f39e0
[KYUUBI #7379][PR 4/4] Harden Data Agent JDBC datasource setup
wangzhigang1999 May 7, 2026
dbf3bcb
[KYUUBI #7379][PR 4/4] Polish Data Agent web UI error and confirm flows
wangzhigang1999 May 7, 2026
dafad98
[KYUUBI #7379][PR 4/4] Make Data Agent default JDBC URL work outside …
wangzhigang1999 May 9, 2026
1f0def8
[KYUUBI #7379][PR 4/4] Add Data Agent quick start documentation
wangzhigang1999 May 9, 2026
e5fb963
[KYUUBI #7379][PR 4/4] Fix Data Agent web UI copy and persist limits
wangzhigang1999 May 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added docs/imgs/data-agent/01-welcome.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/imgs/data-agent/02-list-tables.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/imgs/data-agent/03-approval-destructive.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/imgs/data-agent/04-approval-strict.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/imgs/data-agent/05-max-iterations.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/imgs/data-agent/06-error-banner.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/imgs/data-agent/07-trino.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/imgs/data-agent/08-mysql.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions docs/quick_start/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ Quick Start
quick_start
quick_start_with_helm
quick_start_with_jdbc
quick_start_with_data_agent
211 changes: 211 additions & 0 deletions docs/quick_start/quick_start_with_data_agent.md

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions externals/kyuubi-data-agent-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,19 @@
<version>${victools.jsonschema.version}</version>
</dependency>

<!-- JDBC drivers are not bundled; users provide them at runtime via extra classpath.
SQLite and MySQL drivers are kept here in test scope only. -->
<!-- JDBC drivers. SQLite and PostgreSQL are bundled (Apache-compatible licenses, common
in AI demos). MySQL is GPL — kept in test scope only; production users must supply
the driver via kyuubi.engine.data.agent.extra.classpath. Other drivers (ClickHouse,
Snowflake, Doris, etc.) follow the same extra-classpath path. -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>${sqlite.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,29 @@ private static DataSource attachJdbcDataSource(
if (jdbcUrl == null) {
return null;
}
LOG.info("Data Agent JDBC URL configured ({})", jdbcUrl.replaceAll("//.*@", "//<redacted>@"));

String sessionUser =
ConfUtils.optionalString(conf, KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY());

DataSource ds = DataSourceFactory.create(jdbcUrl, sessionUser);
JdbcDialect dialect = JdbcDialect.fromUrl(jdbcUrl);
if (dialect == null) {
throw new IllegalArgumentException(
"Invalid " + KyuubiConf.ENGINE_DATA_AGENT_JDBC_URL().key());
}
LOG.info("Data Agent JDBC datasource configured ({})", dialect.datasourceName());

// The kyuubi session user is only meaningful when we connect back to Kyuubi/HiveServer2,
// where it drives proxy-user impersonation for the downstream engine. For external JDBC
// backends (mysql, postgresql, trino, ...) it would shadow URL-supplied credentials and
// cause auth failures (e.g. MySQL Connector/J overriding ?user=root with anonymous), so
// leave the username unset and let the driver read it from the URL.
String subprotocol = JdbcDialect.extractSubprotocol(jdbcUrl);
boolean isKyuubiBackend = "hive2".equals(subprotocol) || "kyuubi".equals(subprotocol);
String hikariUser =
isKyuubiBackend
? ConfUtils.optionalString(conf, KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY())
: null;

DataSource ds = DataSourceFactory.create(jdbcUrl, hikariUser);
registry.register(new RunSelectQueryTool(ds, queryTimeoutSeconds));
registry.register(new RunMutationQueryTool(ds, queryTimeoutSeconds));
promptBuilder.datasource(JdbcDialect.fromUrl(jdbcUrl).datasourceName());
promptBuilder.datasource(dialect.datasourceName());
return ds;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void run(String sessionId, ProviderRunRequest request, Consumer<AgentEven

onEvent.accept(new ContentComplete(reply));
onEvent.accept(new StepEnd(1));
onEvent.accept(new AgentFinish(1, 0, 0, 0));
onEvent.accept(new AgentFinish(1, 0, 0, 0, 0));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ public class AgentRunContext {
private final String sessionId;
private Consumer<AgentEvent> eventEmitter;
private int iteration;
private long promptTokens;
private long completionTokens;
private long totalTokens;
// accumulatedXxx: summed across every LLM call in this run (billing).
// lastXxx: most recent LLM call only; UIs use lastPromptTokens as "current context size".
// total is forwarded from the provider verbatim (not prompt+completion) because reasoning /
// cached tokens don't show up in the split, and CompactionMiddleware needs the real total.
private long accumulatedPromptTokens;
private long accumulatedCompletionTokens;
private long lastPromptTokens;
private long lastCompletionTokens;
private ApprovalMode approvalMode;

public AgentRunContext(ConversationMemory memory, ApprovalMode approvalMode) {
Expand Down Expand Up @@ -67,27 +72,34 @@ public void setIteration(int iteration) {
this.iteration = iteration;
}

public long getPromptTokens() {
return promptTokens;
public long getAccumulatedPromptTokens() {
return accumulatedPromptTokens;
}

public long getCompletionTokens() {
return completionTokens;
public long getAccumulatedCompletionTokens() {
return accumulatedCompletionTokens;
}

public long getTotalTokens() {
return totalTokens;
public long getLastPromptTokens() {
return lastPromptTokens;
}

public long getLastCompletionTokens() {
return lastCompletionTokens;
}

/**
* Record one LLM call's usage. Updates both the per-run counters on this context and the
* session-level cumulative on the underlying {@link ConversationMemory}, so middlewares that need
* a session-wide picture can read it directly from memory without keeping their own bookkeeping.
* The provider's {@code total} is forwarded as-is and may exceed {@code prompt + completion} when
* the provider counts cached or reasoning tokens separately.
*/
public void addTokenUsage(long prompt, long completion, long total) {
this.promptTokens += prompt;
this.completionTokens += completion;
this.totalTokens += total;
this.accumulatedPromptTokens += prompt;
this.accumulatedCompletionTokens += completion;
this.lastPromptTokens = prompt;
this.lastCompletionTokens = completion;
memory.addCumulativeTokens(prompt, completion, total);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.engine.dataagent.runtime;

import com.openai.client.OpenAIClient;
import com.openai.core.JsonValue;
import com.openai.core.http.StreamResponse;
import com.openai.models.chat.completions.ChatCompletionAssistantMessageParam;
import com.openai.models.chat.completions.ChatCompletionChunk;
Expand All @@ -31,6 +32,7 @@
import java.util.List;
import java.util.Map;
import org.apache.kyuubi.engine.dataagent.runtime.event.ContentDelta;
import org.apache.kyuubi.engine.dataagent.runtime.event.ReasoningDelta;
import org.apache.kyuubi.engine.dataagent.tool.ToolRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -90,10 +92,28 @@ private void consumeChunk(AgentRunContext ctx, ChatCompletionChunk chunk, Stream
acc.content.append(text);
ctx.emit(new ContentDelta(text));
});
// qwen3 / DeepSeek-R1 stream chain-of-thought as a non-standard `reasoning_content`
// field on the delta. openai-java exposes it via _additionalProperties().
String reasoning = extractReasoningDelta(c.delta()._additionalProperties());
if (reasoning != null && !reasoning.isEmpty()) {
ctx.emit(new ReasoningDelta(reasoning));
}
c.delta().toolCalls().ifPresent(acc::mergeToolCallDeltas);
}
}

private static String extractReasoningDelta(Map<String, JsonValue> extras) {
if (extras == null || extras.isEmpty()) return null;
JsonValue v = extras.get("reasoning_content");
if (v == null) v = extras.get("reasoning");
if (v == null) return null;
try {
return v.convert(String.class);
} catch (RuntimeException e) {
return null;
}
}

/**
* Mutable accumulator for a single streaming LLM turn. Tool call fields are keyed by the chunk's
* {@code index} because provider SDKs may deliver a single logical call across multiple chunks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,10 @@ private static void emitFinish(AgentRunContext ctx) {
ctx.emit(
new AgentFinish(
ctx.getIteration(),
ctx.getPromptTokens(),
ctx.getCompletionTokens(),
ctx.getTotalTokens()));
ctx.getAccumulatedPromptTokens(),
ctx.getAccumulatedCompletionTokens(),
ctx.getLastPromptTokens(),
ctx.getLastCompletionTokens()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,64 @@

package org.apache.kyuubi.engine.dataagent.runtime.event;

/** The agent has finished its analysis. */
/**
* The agent has finished its analysis. Carries two distinct usage views: {@code accumulated*} sums
* every LLM call in this run (billing / total cost), {@code last*} reflects only the final call
* (current context size, last response size).
*/
public final class AgentFinish extends AgentEvent {
private final int totalSteps;
private final long promptTokens;
private final long completionTokens;
private final long totalTokens;
private final long accumulatedPromptTokens;
private final long accumulatedCompletionTokens;
private final long lastPromptTokens;
private final long lastCompletionTokens;

public AgentFinish(int totalSteps, long promptTokens, long completionTokens, long totalTokens) {
public AgentFinish(
int totalSteps,
long accumulatedPromptTokens,
long accumulatedCompletionTokens,
long lastPromptTokens,
long lastCompletionTokens) {
super(EventType.AGENT_FINISH);
this.totalSteps = totalSteps;
this.promptTokens = promptTokens;
this.completionTokens = completionTokens;
this.totalTokens = totalTokens;
this.accumulatedPromptTokens = accumulatedPromptTokens;
this.accumulatedCompletionTokens = accumulatedCompletionTokens;
this.lastPromptTokens = lastPromptTokens;
this.lastCompletionTokens = lastCompletionTokens;
}

public int totalSteps() {
return totalSteps;
}

public long promptTokens() {
return promptTokens;
public long accumulatedPromptTokens() {
return accumulatedPromptTokens;
}

public long accumulatedCompletionTokens() {
return accumulatedCompletionTokens;
}

public long completionTokens() {
return completionTokens;
public long lastPromptTokens() {
return lastPromptTokens;
}

public long totalTokens() {
return totalTokens;
public long lastCompletionTokens() {
return lastCompletionTokens;
}

@Override
public String toString() {
return "AgentFinish{totalSteps="
+ totalSteps
+ ", promptTokens="
+ promptTokens
+ ", completionTokens="
+ completionTokens
+ ", totalTokens="
+ totalTokens
+ ", accumulatedPromptTokens="
+ accumulatedPromptTokens
+ ", accumulatedCompletionTokens="
+ accumulatedCompletionTokens
+ ", lastPromptTokens="
+ lastPromptTokens
+ ", lastCompletionTokens="
+ lastCompletionTokens
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public enum EventType {
/** A single token or chunk from the LLM streaming response. */
CONTENT_DELTA("content_delta"),

/** A chunk of the LLM's reasoning / chain-of-thought stream (provider-specific). */
REASONING_DELTA("reasoning_delta"),

/** The complete LLM output for one reasoning step. */
CONTENT_COMPLETE("content_complete"),

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.dataagent.runtime.event;

/**
* A chunk of the LLM's hidden reasoning / chain-of-thought stream. Emitted alongside {@link
* ContentDelta} when the provider exposes a {@code reasoning_content} delta field (e.g. qwen3,
* DeepSeek-R1). UI clients typically render these in a collapsible "thinking" panel.
*/
public final class ReasoningDelta extends AgentEvent {
private final String text;

public ReasoningDelta(String text) {
super(EventType.REASONING_DELTA);
this.text = text;
}

public String text() {
return text;
}

@Override
public String toString() {
String preview = text != null && text.length() > 200 ? text.substring(0, 200) + "..." : text;
return "ReasoningDelta{text='" + preview + "'}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,19 @@ public void onAgentStart(AgentRunContext ctx) {

@Override
public void onAgentFinish(AgentRunContext ctx) {
long accPrompt = ctx.getAccumulatedPromptTokens();
long accCompletion = ctx.getAccumulatedCompletionTokens();
LOG.info(
"{}FINISH steps={}, prompt_tokens={}, completion_tokens={}, total_tokens={}",
"{}FINISH steps={}, "
+ "accumulated(prompt={}, completion={}, total={}), "
+ "last(prompt={}, completion={})",
prefix(),
ctx.getIteration(),
ctx.getPromptTokens(),
ctx.getCompletionTokens(),
ctx.getTotalTokens());
accPrompt,
accCompletion,
accPrompt + accCompletion,
ctx.getLastPromptTokens(),
ctx.getLastCompletionTokens());
}

@Override
Expand All @@ -105,16 +111,19 @@ public Decision<ChatCompletionAssistantMessageParam> afterLlmCall(
AgentRunContext ctx, ChatCompletionAssistantMessageParam response) {
String content = response.content().map(Object::toString).orElse("");
int toolCallCount = response.toolCalls().map(List::size).orElse(0);
long accPrompt = ctx.getAccumulatedPromptTokens();
long accCompletion = ctx.getAccumulatedCompletionTokens();
LOG.info(
"{}LLM response: step={}, content=\"{}\", tool_calls={}, "
+ "usage(cumulative): prompt={}, completion={}, total={}",
+ "accumulated(prompt={}, completion={}, total={}), last_prompt={}",
prefix(),
ctx.getIteration(),
truncate(content),
toolCallCount,
ctx.getPromptTokens(),
ctx.getCompletionTokens(),
ctx.getTotalTokens());
accPrompt,
accCompletion,
accPrompt + accCompletion,
ctx.getLastPromptTokens());
return Decision.proceed();
}

Expand Down
Loading
Loading