diff --git a/docs/imgs/data-agent/01-welcome.png b/docs/imgs/data-agent/01-welcome.png new file mode 100644 index 00000000000..62d44be8821 Binary files /dev/null and b/docs/imgs/data-agent/01-welcome.png differ diff --git a/docs/imgs/data-agent/02-list-tables.png b/docs/imgs/data-agent/02-list-tables.png new file mode 100644 index 00000000000..09e57fabcd6 Binary files /dev/null and b/docs/imgs/data-agent/02-list-tables.png differ diff --git a/docs/imgs/data-agent/03-approval-destructive.png b/docs/imgs/data-agent/03-approval-destructive.png new file mode 100644 index 00000000000..1701364a5d8 Binary files /dev/null and b/docs/imgs/data-agent/03-approval-destructive.png differ diff --git a/docs/imgs/data-agent/04-approval-strict.png b/docs/imgs/data-agent/04-approval-strict.png new file mode 100644 index 00000000000..0e24fa6abd1 Binary files /dev/null and b/docs/imgs/data-agent/04-approval-strict.png differ diff --git a/docs/imgs/data-agent/05-max-iterations.png b/docs/imgs/data-agent/05-max-iterations.png new file mode 100644 index 00000000000..3a5c2794457 Binary files /dev/null and b/docs/imgs/data-agent/05-max-iterations.png differ diff --git a/docs/imgs/data-agent/06-error-banner.png b/docs/imgs/data-agent/06-error-banner.png new file mode 100644 index 00000000000..8d112f0f2b7 Binary files /dev/null and b/docs/imgs/data-agent/06-error-banner.png differ diff --git a/docs/imgs/data-agent/07-trino.png b/docs/imgs/data-agent/07-trino.png new file mode 100644 index 00000000000..e89d1fc89da Binary files /dev/null and b/docs/imgs/data-agent/07-trino.png differ diff --git a/docs/imgs/data-agent/08-mysql.png b/docs/imgs/data-agent/08-mysql.png new file mode 100644 index 00000000000..20679731839 Binary files /dev/null and b/docs/imgs/data-agent/08-mysql.png differ diff --git a/docs/quick_start/index.rst b/docs/quick_start/index.rst index 60bad27d4c1..bc14e102f06 100644 --- a/docs/quick_start/index.rst +++ b/docs/quick_start/index.rst @@ -26,3 +26,4 @@ Quick Start 
quick_start quick_start_with_helm quick_start_with_jdbc + quick_start_with_data_agent diff --git a/docs/quick_start/quick_start_with_data_agent.md b/docs/quick_start/quick_start_with_data_agent.md new file mode 100644 index 00000000000..fed6573de1b --- /dev/null +++ b/docs/quick_start/quick_start_with_data_agent.md @@ -0,0 +1,211 @@ + + +# Data Agent Engine + +The Data Agent engine lets users explore and query data with natural language. It is a Kyuubi engine type (`DATA_AGENT`) that runs an LLM-driven [ReAct](https://arxiv.org/abs/2210.03629) agent. The agent reasons about the user's request, calls a small set of SQL tools against a JDBC-accessible backend (Spark, Trino, MySQL, SQLite, ...), and streams its responses to the bundled web UI. + +This page covers how to configure the engine, how the web UI behaves at runtime, the approval / risk model that gates SQL tool execution, and a few deployment notes worth knowing. + +## At a glance + +- **Engine type**: `DATA_AGENT`, share level follows `kyuubi.engine.share.level` like every other engine +- **LLM provider**: any OpenAI-compatible chat-completion endpoint (OpenAI, Azure OpenAI, vLLM, llama.cpp, or any other provider that exposes the OpenAI chat-completion API) +- **Datasource**: any JDBC URL the engine can reach. When unset, defaults to ZooKeeper service discovery for ZK-HA deployments, or `jdbc:hive2://localhost:<port>/default` otherwise +- **Tools** (built in): `run_select_query` (low-risk, read-only) and `run_mutation_query` (high-risk, mutating). Per-dialect prompt templates ship for Spark / Trino / MySQL / SQLite / Generic +- **Interface**: the bundled Kyuubi web UI. Pick **Data Agent** from the left navigation after logging in + +## Quick start + +### 1. 
Configure the LLM provider + +Add the following to `$KYUUBI_HOME/conf/kyuubi-defaults.conf`: + +``` +kyuubi.engine.data.agent.provider OPENAI_COMPATIBLE + +# OpenAI-compatible endpoint and credentials +kyuubi.engine.data.agent.openai.endpoint https://api.openai.com/v1 +kyuubi.engine.data.agent.openai.api.key sk-xxxxxxxxxxxxxxxxxxxxxxxx +kyuubi.engine.data.agent.model gpt-4o-mini +``` + +Substitute the endpoint, API key, and model ID for whichever OpenAI-compatible provider you use. + +### 2. Restart Kyuubi and open the UI + +``` +$KYUUBI_HOME/bin/kyuubi restart +``` + +Open `http://<kyuubi-host>:10099/ui` in a browser (the port comes from `kyuubi.frontend.rest.bind.port`, default `10099`). Pick **Data Agent** in the left navigation. + +### 3. Ask a question + +![Welcome page](../imgs/data-agent/01-welcome.png) + +Leave the **JDBC URL** field empty to query the same Kyuubi cluster the UI is running on. Type a question into the input box and press Enter. + +The agent reasons through the request, runs SQL on your behalf, and renders the result inline: + +![Query result with markdown table](../imgs/data-agent/02-list-tables.png) + +## Configuration reference + +All keys live under `kyuubi.engine.data.agent.*`. They can be set in `kyuubi-defaults.conf` (cluster default) or, for the few that have a UI control, overridden per request from the chat header. The server-side defaults listed below are taken from the source of truth, `org.apache.kyuubi.config.KyuubiConf`: + +| Key | Default | Purpose | +|------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `kyuubi.engine.data.agent.provider` | `ECHO` | Set to `OPENAI_COMPATIBLE` to use any OpenAI-style chat-completion endpoint. | +| `kyuubi.engine.data.agent.model` | _unset_ | Model ID. 
The UI **Model** input on the chat header overrides this for a single turn; leave it empty in the UI to fall back to this value. | +| `kyuubi.engine.data.agent.openai.endpoint` | _unset_ | Base URL of the OpenAI-compatible chat-completion endpoint. | +| `kyuubi.engine.data.agent.openai.api.key` | _unset_ | API key for that endpoint. Treat as a secret; see [Security notes](#security-notes). | +| `kyuubi.engine.data.agent.jdbc.url` | _unset_ | JDBC URL the agent should query. When unset, the engine falls back to the same Kyuubi cluster the UI is running on. The UI **JDBC URL** field on the welcome screen overrides this per session. | +| `kyuubi.engine.data.agent.approval.mode` | `NORMAL` | Default approval policy: `AUTO_APPROVE` runs every tool without prompting, `NORMAL` only prompts before `DESTRUCTIVE` tools, `STRICT` prompts before every tool call. The UI dropdown on the chat header overrides this per turn. | +| `kyuubi.engine.data.agent.max.iterations` | `100` | Cap on ReAct loop steps for a single user turn. Hitting the cap surfaces a `Reached maximum iterations (N)` banner in the UI. | +| `kyuubi.engine.data.agent.compaction.trigger.tokens` | `128000` | When the predicted next prompt size (real previous prompt tokens + estimated new tail) exceeds this value, older conversation history is summarised into a single message. | +| `kyuubi.engine.data.agent.query.timeout` | `PT3M` | Inner JDBC `Statement.setQueryTimeout` for SQL tools. Should be **lower than** `tool.call.timeout` so the backend has time to react before the outer cap fires. | +| `kyuubi.engine.data.agent.tool.call.timeout` | `PT5M` | Outer wall-clock cap on every tool call, enforced by the agent runtime. | +| `kyuubi.engine.data.agent.memory` | `1g` | Heap allocated to the engine JVM. | +| `kyuubi.engine.data.agent.java.options` | _unset_ | Extra JVM options. | +| `kyuubi.engine.data.agent.extra.classpath` | _unset_ | Extra classpath, e.g. for the LLM SDK or the JDBC driver of the target datasource. 
| +| `kyuubi.frontend.data.agent.operation.timeout` | `PT2M` | Server-side timeout for engine launch and operation start. Surfaces as an error banner if exceeded. | + +## Approval and risk model + +Each built-in tool carries a static risk level. The runtime gates execution on the active approval mode: + +| Tool | Risk | `AUTO_APPROVE` | `NORMAL` (default) | `STRICT` | +|----------------------|---------------|------------------|---------------------------|---------------------------| +| `run_select_query` | `SAFE` | runs immediately | runs immediately | prompts before every call | +| `run_mutation_query` | `DESTRUCTIVE` | runs immediately | prompts before every call | prompts before every call | + +When approval is required the agent pauses and renders an inline approval block in the chat. The block shows the tool name, its risk tag, and the structured arguments the model is about to send to the backend. + +![NORMAL mode prompts before a CREATE TABLE](../imgs/data-agent/03-approval-destructive.png) + +Under `STRICT`, even a read-only `SELECT` requires confirmation: + +![STRICT mode prompts before any SQL](../imgs/data-agent/04-approval-strict.png) + +Pressing **Approve** lets the call run, returns the result to the model, and the loop continues. Pressing **Deny** records the rejection and lets the model react — typically the agent acknowledges the denial and asks the user how to proceed, rather than retrying the same call in a loop. + +The approval mode dropdown on the chat header propagates per turn, so a user can keep `NORMAL` as the day-to-day default and switch to `STRICT` when running a query plan they are reviewing. + +## Web UI behaviour + +A few UI behaviours worth knowing: + +- **Multi-conversation**. The right rail lists every active conversation. Each conversation owns its own session handle, JDBC URL, model override, approval mode, and message history. Switching conversations restores all of them. +- **Session storage persistence**. 
Per-session message history is mirrored in the browser `sessionStorage` so a tab reload restores the conversation without a server round trip. Older messages are slimmed before persisting to keep the entry under the storage quota. +- **Stop**. While a response is streaming, the chat header shows a **Stop** button that aborts the current turn cleanly: any in-flight tool call is cancelled and the input bar re-enables. +- **Error banner**. Engine and LLM errors render inline above the input bar with a dismiss `×`. Engine-fatal errors additionally surface a **Reset session** button that drops the session handle and starts a fresh one. +- **Datasource tag**. The chat header shows either `Data: Server default` (no JDBC URL configured) or the host portion of the configured JDBC URL with a tooltip carrying the full URL. + +The engine surfaces hard-cap errors (max iterations reached, model not found, datasource unreachable) as the same kind of banner: + +![Reached maximum iterations](../imgs/data-agent/05-max-iterations.png) + +![A bad model name surfaces the upstream 404](../imgs/data-agent/06-error-banner.png) + +## Datasources + +The agent attaches to one JDBC URL per session. Set it from any of: + +1. The **JDBC URL** input on the welcome screen. The field is editable until the first message lands; once the session is bound it becomes read-only for the rest of that conversation. +2. `kyuubi.engine.data.agent.jdbc.url` in `kyuubi-defaults.conf` as the cluster default. +3. Leave both empty to attach back to the same Kyuubi cluster — see the [Connecting back to Kyuubi](#connecting-back-to-kyuubi) note below. + +The dialect of the URL is auto-detected from the JDBC scheme and selects the matching SQL prompt fragment that ships with the engine. Out of the box: `Spark`/`Hive`, `Trino`, `MySQL`, `SQLite`, plus a `Generic` fallback. The Kyuubi/Hive, Trino, SQLite, and PostgreSQL JDBC drivers are bundled with the Data Agent engine; for any other dialect (e.g. 
MySQL), add the driver via `kyuubi.engine.data.agent.extra.classpath` (or copy the jar into `$KYUUBI_HOME/externals/engines/data-agent/`). + +### Configuring datasources + +Set the cluster default in `kyuubi-defaults.conf`, or paste the URL into the welcome-screen field per session. + +#### Spark / Kyuubi (the same cluster the UI runs on) + +The default. Leave the JDBC URL empty and the engine connects back to this Kyuubi cluster. **When Kyuubi has authentication enabled**, see [Connecting back to Kyuubi](#connecting-back-to-kyuubi). + +#### Spark / Kyuubi (a different cluster, or pinned host:port) + +``` +kyuubi.engine.data.agent.jdbc.url jdbc:hive2://kyuubi-host:10009/default +``` + +Add `;user=...;password=...` to the URL when the target cluster requires authentication. + +#### Trino + +``` +kyuubi.engine.data.agent.jdbc.url jdbc:trino://trino-coordinator:8080/hive/default?user=alice +``` + +For HTTPS / LDAP-secured Trino add `?SSL=true&user=...&password=...` to the URL. + +![Trino datasource — listing schemas](../imgs/data-agent/07-trino.png) + +#### MySQL + +``` +kyuubi.engine.data.agent.jdbc.url jdbc:mysql://mysql-host:3306/your_db?user=root&password=... +kyuubi.engine.data.agent.extra.classpath /path/to/mysql-connector-java-8.x.jar +``` + +Drop the MySQL JDBC driver jar onto `extra.classpath`, or copy it directly into `$KYUUBI_HOME/externals/engines/data-agent/`. + +![MySQL datasource — discover schema and run COUNT](../imgs/data-agent/08-mysql.png) + +#### SQLite (handy for local demos) + +``` +kyuubi.engine.data.agent.jdbc.url jdbc:sqlite:/absolute/path/to/file.db +``` + +#### Generic JDBC + +Anything else with a JDBC driver works behind the `Generic` dialect — point the URL at it and add the driver via `extra.classpath`. The model loses the dialect-specific syntax hints but the SQL tools still execute through `Statement` exactly the same way. 
+ +### Connecting back to Kyuubi + +When `kyuubi.engine.data.agent.jdbc.url` is unset, Kyuubi derives a default URL: + +- **ZooKeeper HA** (`kyuubi.ha.addresses` set with the default `ZookeeperDiscoveryClient`): use ZK service discovery against the configured addresses. +- **Other deployments** (single-node, or HA with non-ZooKeeper discovery such as etcd): assume Kyuubi is reachable on `localhost` and produce `jdbc:hive2://localhost:<port>/default`. + +If the Data Agent engine runs in a different network namespace from the Kyuubi server, or Kyuubi is not listening on `localhost`, set `kyuubi.engine.data.agent.jdbc.url` (or the welcome-screen field) explicitly. + +This works out of the box when the Kyuubi server is configured with `kyuubi.authentication=NONE`. **It does not work when LDAP authentication is enabled**, because the engine's JDBC connection carries the user identity (e.g. `alice`) but no password, and Kyuubi's LDAP provider rejects the empty bind with `LDAP error code 49`. In an authenticated deployment, embed credentials in the JDBC URL instead: + +``` +jdbc:hive2://kyuubi-host:10009/default;user=alice;password=... +``` + +## Operating notes + +### Engine lifecycle + +The Data Agent engine respects `kyuubi.engine.share.level`, `kyuubi.session.engine.idle.timeout`, and the rest of the standard engine lifecycle knobs, with one Data-Agent-specific addition: a Data Agent engine is bound to a single JDBC datasource, so the engine subdomain is automatically derived from the configured `kyuubi.engine.data.agent.jdbc.url`. With the default `USER` share level this means engines are keyed by **(user, JDBC URL)** — two sessions targeting the same datasource as the same user reuse one engine, and a session pointing at a different JDBC URL gets its own engine. + +### Logs + +Per-engine stdout/stderr lands under `$KYUUBI_LOG_DIR/work/<user>/kyuubi-data-agent-engine.log.0`. 
+ +### Security notes + +- The engine launch command logged by the server has `kyuubi.engine.data.agent.openai.api.key` and `kyuubi.engine.data.agent.jdbc.url` redacted. Other places where these values may surface (for example session events, or `spark.`-prefixed variants rendered into other engine launch events) are not redacted unless you set `kyuubi.server.redaction.regex` to cover them — there is no default regex. +- The chat history (including SQL produced by the model and any data the model read) is held server-side for the lifetime of the session and is mirrored into the browser's `sessionStorage`. Treat the Data Agent UI as a tool with the same access scope as the configured JDBC URL. + diff --git a/externals/kyuubi-data-agent-engine/pom.xml b/externals/kyuubi-data-agent-engine/pom.xml index be29d409208..9b2ea10faf4 100644 --- a/externals/kyuubi-data-agent-engine/pom.xml +++ b/externals/kyuubi-data-agent-engine/pom.xml @@ -125,13 +125,19 @@ ${victools.jsonschema.version} - + org.xerial sqlite-jdbc ${sqlite.version} - test + + + + org.postgresql + postgresql diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/chatcompletion/ChatCompletionProvider.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/chatcompletion/ChatCompletionProvider.java index b9bb30edb7e..8940ab24fda 100644 --- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/chatcompletion/ChatCompletionProvider.java +++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/chatcompletion/ChatCompletionProvider.java @@ -118,15 +118,29 @@ private static DataSource attachJdbcDataSource( if (jdbcUrl == null) { return null; } - LOG.info("Data Agent JDBC URL configured ({})", jdbcUrl.replaceAll("//.*@", "//@")); - - String sessionUser = - ConfUtils.optionalString(conf, KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY()); - - 
DataSource ds = DataSourceFactory.create(jdbcUrl, sessionUser); + JdbcDialect dialect = JdbcDialect.fromUrl(jdbcUrl); + if (dialect == null) { + throw new IllegalArgumentException( + "Invalid " + KyuubiConf.ENGINE_DATA_AGENT_JDBC_URL().key()); + } + LOG.info("Data Agent JDBC datasource configured ({})", dialect.datasourceName()); + + // The kyuubi session user is only meaningful when we connect back to Kyuubi/HiveServer2, + // where it drives proxy-user impersonation for the downstream engine. For external JDBC + // backends (mysql, postgresql, trino, ...) it would shadow URL-supplied credentials and + // cause auth failures (e.g. MySQL Connector/J overriding ?user=root with anonymous), so + // leave the username unset and let the driver read it from the URL. + String subprotocol = JdbcDialect.extractSubprotocol(jdbcUrl); + boolean isKyuubiBackend = "hive2".equals(subprotocol) || "kyuubi".equals(subprotocol); + String hikariUser = + isKyuubiBackend + ? ConfUtils.optionalString(conf, KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY()) + : null; + + DataSource ds = DataSourceFactory.create(jdbcUrl, hikariUser); registry.register(new RunSelectQueryTool(ds, queryTimeoutSeconds)); registry.register(new RunMutationQueryTool(ds, queryTimeoutSeconds)); - promptBuilder.datasource(JdbcDialect.fromUrl(jdbcUrl).datasourceName()); + promptBuilder.datasource(dialect.datasourceName()); return ds; } diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/echo/EchoProvider.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/echo/EchoProvider.java index 77615ce864a..59152471c98 100644 --- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/echo/EchoProvider.java +++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/echo/EchoProvider.java @@ -54,7 +54,7 @@ public void run(String sessionId, 
ProviderRunRequest request, Consumer eventEmitter; private int iteration; - private long promptTokens; - private long completionTokens; - private long totalTokens; + // accumulatedXxx: summed across every LLM call in this run (billing). + // lastXxx: most recent LLM call only; UIs use lastPromptTokens as "current context size". + // total is forwarded from the provider verbatim (not prompt+completion) because reasoning / + // cached tokens don't show up in the split, and CompactionMiddleware needs the real total. + private long accumulatedPromptTokens; + private long accumulatedCompletionTokens; + private long lastPromptTokens; + private long lastCompletionTokens; private ApprovalMode approvalMode; public AgentRunContext(ConversationMemory memory, ApprovalMode approvalMode) { @@ -67,27 +72,34 @@ public void setIteration(int iteration) { this.iteration = iteration; } - public long getPromptTokens() { - return promptTokens; + public long getAccumulatedPromptTokens() { + return accumulatedPromptTokens; } - public long getCompletionTokens() { - return completionTokens; + public long getAccumulatedCompletionTokens() { + return accumulatedCompletionTokens; } - public long getTotalTokens() { - return totalTokens; + public long getLastPromptTokens() { + return lastPromptTokens; + } + + public long getLastCompletionTokens() { + return lastCompletionTokens; } /** * Record one LLM call's usage. Updates both the per-run counters on this context and the * session-level cumulative on the underlying {@link ConversationMemory}, so middlewares that need * a session-wide picture can read it directly from memory without keeping their own bookkeeping. + * The provider's {@code total} is forwarded as-is and may exceed {@code prompt + completion} when + * the provider counts cached or reasoning tokens separately. 
*/ public void addTokenUsage(long prompt, long completion, long total) { - this.promptTokens += prompt; - this.completionTokens += completion; - this.totalTokens += total; + this.accumulatedPromptTokens += prompt; + this.accumulatedCompletionTokens += completion; + this.lastPromptTokens = prompt; + this.lastCompletionTokens = completion; memory.addCumulativeTokens(prompt, completion, total); } diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/LlmStreamClient.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/LlmStreamClient.java index 8d8d6494aaf..f41ba7dc0c9 100644 --- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/LlmStreamClient.java +++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/LlmStreamClient.java @@ -18,6 +18,7 @@ package org.apache.kyuubi.engine.dataagent.runtime; import com.openai.client.OpenAIClient; +import com.openai.core.JsonValue; import com.openai.core.http.StreamResponse; import com.openai.models.chat.completions.ChatCompletionAssistantMessageParam; import com.openai.models.chat.completions.ChatCompletionChunk; @@ -31,6 +32,7 @@ import java.util.List; import java.util.Map; import org.apache.kyuubi.engine.dataagent.runtime.event.ContentDelta; +import org.apache.kyuubi.engine.dataagent.runtime.event.ReasoningDelta; import org.apache.kyuubi.engine.dataagent.tool.ToolRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,10 +92,28 @@ private void consumeChunk(AgentRunContext ctx, ChatCompletionChunk chunk, Stream acc.content.append(text); ctx.emit(new ContentDelta(text)); }); + // qwen3 / DeepSeek-R1 stream chain-of-thought as a non-standard `reasoning_content` + // field on the delta. openai-java exposes it via _additionalProperties(). 
+ String reasoning = extractReasoningDelta(c.delta()._additionalProperties()); + if (reasoning != null && !reasoning.isEmpty()) { + ctx.emit(new ReasoningDelta(reasoning)); + } c.delta().toolCalls().ifPresent(acc::mergeToolCallDeltas); } } + private static String extractReasoningDelta(Map extras) { + if (extras == null || extras.isEmpty()) return null; + JsonValue v = extras.get("reasoning_content"); + if (v == null) v = extras.get("reasoning"); + if (v == null) return null; + try { + return v.convert(String.class); + } catch (RuntimeException e) { + return null; + } + } + /** * Mutable accumulator for a single streaming LLM turn. Tool call fields are keyed by the chunk's * {@code index} because provider SDKs may deliver a single logical call across multiple chunks diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ReactAgent.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ReactAgent.java index b5138e9c661..f78c481d9fc 100644 --- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ReactAgent.java +++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ReactAgent.java @@ -194,9 +194,10 @@ private static void emitFinish(AgentRunContext ctx) { ctx.emit( new AgentFinish( ctx.getIteration(), - ctx.getPromptTokens(), - ctx.getCompletionTokens(), - ctx.getTotalTokens())); + ctx.getAccumulatedPromptTokens(), + ctx.getAccumulatedCompletionTokens(), + ctx.getLastPromptTokens(), + ctx.getLastCompletionTokens())); } /** diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentFinish.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentFinish.java index d9f962bbf0a..e3a05b13636 100644 --- 
a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentFinish.java +++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentFinish.java @@ -17,47 +17,64 @@ package org.apache.kyuubi.engine.dataagent.runtime.event; -/** The agent has finished its analysis. */ +/** + * The agent has finished its analysis. Carries two distinct usage views: {@code accumulated*} sums + * every LLM call in this run (billing / total cost), {@code last*} reflects only the final call + * (current context size, last response size). + */ public final class AgentFinish extends AgentEvent { private final int totalSteps; - private final long promptTokens; - private final long completionTokens; - private final long totalTokens; + private final long accumulatedPromptTokens; + private final long accumulatedCompletionTokens; + private final long lastPromptTokens; + private final long lastCompletionTokens; - public AgentFinish(int totalSteps, long promptTokens, long completionTokens, long totalTokens) { + public AgentFinish( + int totalSteps, + long accumulatedPromptTokens, + long accumulatedCompletionTokens, + long lastPromptTokens, + long lastCompletionTokens) { super(EventType.AGENT_FINISH); this.totalSteps = totalSteps; - this.promptTokens = promptTokens; - this.completionTokens = completionTokens; - this.totalTokens = totalTokens; + this.accumulatedPromptTokens = accumulatedPromptTokens; + this.accumulatedCompletionTokens = accumulatedCompletionTokens; + this.lastPromptTokens = lastPromptTokens; + this.lastCompletionTokens = lastCompletionTokens; } public int totalSteps() { return totalSteps; } - public long promptTokens() { - return promptTokens; + public long accumulatedPromptTokens() { + return accumulatedPromptTokens; + } + + public long accumulatedCompletionTokens() { + return accumulatedCompletionTokens; } - public long completionTokens() { - return completionTokens; + public long 
lastPromptTokens() { + return lastPromptTokens; } - public long totalTokens() { - return totalTokens; + public long lastCompletionTokens() { + return lastCompletionTokens; } @Override public String toString() { return "AgentFinish{totalSteps=" + totalSteps - + ", promptTokens=" - + promptTokens - + ", completionTokens=" - + completionTokens - + ", totalTokens=" - + totalTokens + + ", accumulatedPromptTokens=" + + accumulatedPromptTokens + + ", accumulatedCompletionTokens=" + + accumulatedCompletionTokens + + ", lastPromptTokens=" + + lastPromptTokens + + ", lastCompletionTokens=" + + lastCompletionTokens + "}"; } } diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventType.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventType.java index d58e5de2ee7..96736fffc9e 100644 --- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventType.java +++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventType.java @@ -35,6 +35,9 @@ public enum EventType { /** A single token or chunk from the LLM streaming response. */ CONTENT_DELTA("content_delta"), + /** A chunk of the LLM's reasoning / chain-of-thought stream (provider-specific). */ + REASONING_DELTA("reasoning_delta"), + /** The complete LLM output for one reasoning step. 
*/ CONTENT_COMPLETE("content_complete"), diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ReasoningDelta.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ReasoningDelta.java new file mode 100644 index 00000000000..144ac01c7e5 --- /dev/null +++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ReasoningDelta.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.dataagent.runtime.event; + +/** + * A chunk of the LLM's hidden reasoning / chain-of-thought stream. Emitted alongside {@link + * ContentDelta} when the provider exposes a {@code reasoning_content} delta field (e.g. qwen3, + * DeepSeek-R1). UI clients typically render these in a collapsible "thinking" panel. + */ +public final class ReasoningDelta extends AgentEvent { + private final String text; + + public ReasoningDelta(String text) { + super(EventType.REASONING_DELTA); + this.text = text; + } + + public String text() { + return text; + } + + @Override + public String toString() { + String preview = text != null && text.length() > 200 ? 
text.substring(0, 200) + "..." : text; + return "ReasoningDelta{text='" + preview + "'}"; + } +} diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/middleware/LoggingMiddleware.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/middleware/LoggingMiddleware.java index ef12e42758a..f2d4b27688a 100644 --- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/middleware/LoggingMiddleware.java +++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/middleware/LoggingMiddleware.java @@ -84,13 +84,19 @@ public void onAgentStart(AgentRunContext ctx) { @Override public void onAgentFinish(AgentRunContext ctx) { + long accPrompt = ctx.getAccumulatedPromptTokens(); + long accCompletion = ctx.getAccumulatedCompletionTokens(); LOG.info( - "{}FINISH steps={}, prompt_tokens={}, completion_tokens={}, total_tokens={}", + "{}FINISH steps={}, " + + "accumulated(prompt={}, completion={}, total={}), " + + "last(prompt={}, completion={})", prefix(), ctx.getIteration(), - ctx.getPromptTokens(), - ctx.getCompletionTokens(), - ctx.getTotalTokens()); + accPrompt, + accCompletion, + accPrompt + accCompletion, + ctx.getLastPromptTokens(), + ctx.getLastCompletionTokens()); } @Override @@ -105,16 +111,19 @@ public Decision afterLlmCall( AgentRunContext ctx, ChatCompletionAssistantMessageParam response) { String content = response.content().map(Object::toString).orElse(""); int toolCallCount = response.toolCalls().map(List::size).orElse(0); + long accPrompt = ctx.getAccumulatedPromptTokens(); + long accCompletion = ctx.getAccumulatedCompletionTokens(); LOG.info( "{}LLM response: step={}, content=\"{}\", tool_calls={}, " - + "usage(cumulative): prompt={}, completion={}, total={}", + + "accumulated(prompt={}, completion={}, total={}), last_prompt={}", prefix(), ctx.getIteration(), truncate(content), toolCallCount, - 
ctx.getPromptTokens(), - ctx.getCompletionTokens(), - ctx.getTotalTokens()); + accPrompt, + accCompletion, + accPrompt + accCompletion, + ctx.getLastPromptTokens()); return Decision.proceed(); } diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ExecuteStatement.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ExecuteStatement.scala index cb901462d0f..a0375544b87 100644 --- a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ExecuteStatement.scala +++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ExecuteStatement.scala @@ -25,7 +25,7 @@ import org.slf4j.MDC import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.dataagent.provider.{DataAgentProvider, ProviderRunRequest} -import org.apache.kyuubi.engine.dataagent.runtime.event.{AgentError, AgentEvent, AgentFinish, ApprovalRequest, Compaction, ContentDelta, EventType, StepEnd, StepStart, ToolCall, ToolResult} +import org.apache.kyuubi.engine.dataagent.runtime.event.{AgentError, AgentEvent, AgentFinish, ApprovalRequest, Compaction, ContentDelta, EventType, ReasoningDelta, StepEnd, StepStart, ToolCall, ToolResult} import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -111,6 +111,11 @@ class ExecuteStatement( incrementalIter.append(Array(toJson { n => n.put("type", sseType); n.put("text", delta.text()) })) + case EventType.REASONING_DELTA => + val delta = event.asInstanceOf[ReasoningDelta] + incrementalIter.append(Array(toJson { n => + n.put("type", sseType); n.put("text", delta.text()) + })) case EventType.TOOL_CALL => val toolCall = event.asInstanceOf[ToolCall] incrementalIter.append(Array(toJson { n => @@ -162,6 +167,10 @@ class ExecuteStatement( 
incrementalIter.append(Array(toJson { n => n.put("type", sseType) n.put("steps", finish.totalSteps()) + n.put("accumulatedPromptTokens", finish.accumulatedPromptTokens()) + n.put("accumulatedCompletionTokens", finish.accumulatedCompletionTokens()) + n.put("lastPromptTokens", finish.lastPromptTokens()) + n.put("lastCompletionTokens", finish.lastCompletionTokens()) })) case _ => // CONTENT_COMPLETE — internal to middleware pipeline } diff --git a/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/provider/mock/MockLlmProvider.java b/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/provider/mock/MockLlmProvider.java index 2756b3e2087..5a5775ef65a 100644 --- a/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/provider/mock/MockLlmProvider.java +++ b/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/provider/mock/MockLlmProvider.java @@ -133,7 +133,7 @@ private void runWithToolCall(String sql, Consumer onEvent) { } onEvent.accept(new ContentComplete(answer)); onEvent.accept(new StepEnd(2)); - onEvent.accept(new AgentFinish(2, 100, 50, 150)); + onEvent.accept(new AgentFinish(2, 100, 50, 60, 30)); } private void runWithoutToolCall(String question, Consumer onEvent) { @@ -142,7 +142,7 @@ private void runWithoutToolCall(String question, Consumer onEvent) { onEvent.accept(new ContentDelta(answer)); onEvent.accept(new ContentComplete(answer)); onEvent.accept(new StepEnd(1)); - onEvent.accept(new AgentFinish(1, 50, 20, 70)); + onEvent.accept(new AgentFinish(1, 50, 20, 50, 20)); } @Override diff --git a/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventTest.java b/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventTest.java index b6a5f093b61..10e90906230 100644 --- 
a/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventTest.java +++ b/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventTest.java @@ -107,6 +107,7 @@ public void testEventTypeSseNames() { assertEquals("agent_start", EventType.AGENT_START.sseEventName()); assertEquals("step_start", EventType.STEP_START.sseEventName()); assertEquals("content_delta", EventType.CONTENT_DELTA.sseEventName()); + assertEquals("reasoning_delta", EventType.REASONING_DELTA.sseEventName()); assertEquals("content_complete", EventType.CONTENT_COMPLETE.sseEventName()); assertEquals("tool_call", EventType.TOOL_CALL.sseEventName()); assertEquals("tool_result", EventType.TOOL_RESULT.sseEventName()); @@ -124,6 +125,6 @@ public void testAllEventTypesHaveUniqueSseNames() { for (EventType type : values) { assertTrue("Duplicate SSE name: " + type.sseEventName(), names.add(type.sseEventName())); } - assertEquals(11, values.length); + assertEquals(12, values.length); } } diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ApprovalRequest.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ApprovalRequest.java new file mode 100644 index 00000000000..11d081bdb0d --- /dev/null +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ApprovalRequest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.client.api.v1.dto; + +import java.util.Objects; +import org.apache.commons.lang3.builder.ReflectionToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +/** Request body for approving or denying a tool call in the Data Agent. */ +public class ApprovalRequest { + private String requestId; + private boolean approved; + + public ApprovalRequest() {} + + public ApprovalRequest(String requestId, boolean approved) { + this.requestId = requestId; + this.approved = approved; + } + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public boolean isApproved() { + return approved; + } + + public void setApproved(boolean approved) { + this.approved = approved; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ApprovalRequest that = (ApprovalRequest) o; + return approved == that.approved && Objects.equals(requestId, that.requestId); + } + + @Override + public int hashCode() { + return Objects.hash(requestId, approved); + } + + @Override + public String toString() { + return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE); + } +} diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ChatRequest.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ChatRequest.java new file mode 100644 index 00000000000..b3b2722b260 --- /dev/null +++ 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ChatRequest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.client.api.v1.dto; + +import java.util.Objects; +import org.apache.commons.lang3.builder.ReflectionToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +public class ChatRequest { + private String text; + private String model; + private String approvalMode; + + public ChatRequest() {} + + public ChatRequest(String text) { + this.text = text; + } + + public ChatRequest(String text, String model) { + this.text = text; + this.model = model; + } + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + public String getModel() { + return model; + } + + public void setModel(String model) { + this.model = model; + } + + public String getApprovalMode() { + return approvalMode; + } + + public void setApprovalMode(String approvalMode) { + this.approvalMode = approvalMode; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ChatRequest that = (ChatRequest) o; + return 
Objects.equals(getText(), that.getText()) + && Objects.equals(getModel(), that.getModel()) + && Objects.equals(getApprovalMode(), that.getApprovalMode()); + } + + @Override + public int hashCode() { + return Objects.hash(getText(), getModel(), getApprovalMode()); + } + + @Override + public String toString() { + return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE); + } +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index ab9e19cdb97..c351715b13e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -41,7 +41,7 @@ import org.apache.kyuubi.engine.hive.HiveProcessBuilder import org.apache.kyuubi.engine.jdbc.JdbcProcessBuilder import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.engine.trino.TrinoProcessBuilder -import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_ENGINE_REF_ID, HA_NAMESPACE} +import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_CLIENT_CLASS, HA_ENGINE_REF_ID, HA_NAMESPACE} import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider, DiscoveryPaths, ServiceNodeInfo} import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL} import org.apache.kyuubi.metrics.MetricsSystem @@ -264,12 +264,23 @@ private[kyuubi] class EngineRef( case DATA_AGENT => if (conf.get(ENGINE_DATA_AGENT_JDBC_URL).isEmpty) { val haAddresses = conf.get(HA_ADDRESSES) - if (haAddresses.nonEmpty) { - val jdbcUrl = s"jdbc:hive2://$haAddresses/default;" + + val isZkHa = haAddresses.nonEmpty && + conf.get(HA_CLIENT_CLASS).endsWith("ZookeeperDiscoveryClient") + val jdbcUrl = if (isZkHa) { + s"jdbc:hive2://$haAddresses/default;" + s"serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=$serverSpace" - conf.set(ENGINE_DATA_AGENT_JDBC_URL.key, jdbcUrl) - 
info(s"Data Agent JDBC URL not configured, using Kyuubi server via ZK: $jdbcUrl") + } else { + val port = conf.get(FRONTEND_THRIFT_BINARY_BIND_PORT) + if (port == 0) { + throw KyuubiSQLException( + s"Cannot derive a default Data Agent JDBC URL: " + + s"${FRONTEND_THRIFT_BINARY_BIND_PORT.key} is 0 (random). " + + s"Set ${ENGINE_DATA_AGENT_JDBC_URL.key} explicitly.") + } + s"jdbc:hive2://localhost:$port/default" } + conf.set(ENGINE_DATA_AGENT_JDBC_URL.key, jdbcUrl) + info(s"Data Agent JDBC URL not configured, using Kyuubi server: $jdbcUrl") } new DataAgentProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilder.scala index c53b749a8e5..998fc3ba8af 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilder.scala @@ -102,7 +102,7 @@ class DataAgentProcessBuilder( } else { redactConfValues( Utils.redactCommandLineArgs(conf, commands), - Set(ENGINE_DATA_AGENT_OPENAI_API_KEY.key)).map { + Set(ENGINE_DATA_AGENT_OPENAI_API_KEY.key, ENGINE_DATA_AGENT_JDBC_URL.key)).map { case arg if arg.startsWith("-") || arg == mainClass => s"\\\n\t$arg" case arg => arg }.mkString(" ") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala index 8f9be59a54a..e71ac256c6b 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala @@ -35,7 +35,7 @@ import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import 
org.apache.kyuubi.metrics.MetricsConstants.OPERATION_BATCH_PENDING_MAX_ELAPSE import org.apache.kyuubi.operation.OperationState -import org.apache.kyuubi.server.api.v1.ApiRootResource +import org.apache.kyuubi.server.api.v1.{ApiRootResource, DataAgentResource} import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory} import org.apache.kyuubi.server.ui.{JettyServer, JettyUtils} import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils} @@ -316,6 +316,7 @@ class KyuubiRestFrontendService(override val serverable: Serverable) override def stop(): Unit = synchronized { ThreadUtils.shutdown(batchChecker) + DataAgentResource.shutdown() if (isStarted.getAndSet(false)) { server.stop() } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala index d518e67cae3..bf20b6ac919 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala @@ -62,6 +62,9 @@ private[v1] class ApiRootResource extends ApiRequestContext { @Path("admin") def admin: Class[AdminResource] = classOf[AdminResource] + @Path("data-agent") + def dataAgent: Class[DataAgentResource] = classOf[DataAgentResource] + @GET @Path("exception") @Produces(Array(MediaType.TEXT_PLAIN)) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/DataAgentResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/DataAgentResource.scala new file mode 100644 index 00000000000..60aea983c00 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/DataAgentResource.scala @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server.api.v1 + +import java.io.{IOException, OutputStreamWriter} +import java.nio.charset.StandardCharsets +import java.util.concurrent.{CompletableFuture, ExecutionException, ExecutorService, RejectedExecutionException, TimeoutException, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean +import java.util.regex.Pattern +import javax.servlet.http.HttpServletResponse +import javax.ws.rs._ +import javax.ws.rs.core.{Context, MediaType} + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import com.fasterxml.jackson.databind.ObjectMapper +import io.swagger.v3.oas.annotations.media.Content +import io.swagger.v3.oas.annotations.responses.ApiResponse +import io.swagger.v3.oas.annotations.tags.Tag + +import org.apache.kyuubi.{KyuubiSQLException, Logging} +import org.apache.kyuubi.client.KyuubiSyncThriftClient +import org.apache.kyuubi.client.api.v1.dto.{ApprovalRequest, ChatRequest} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.operation.FetchOrientation +import org.apache.kyuubi.server.api.ApiRequestContext +import org.apache.kyuubi.session.{KyuubiSessionImpl, SessionHandle} +import org.apache.kyuubi.shaded.hive.service.rpc.thrift._ +import org.apache.kyuubi.util.ThreadUtils + +@Tag(name = 
"DataAgent") +@Consumes(Array(MediaType.APPLICATION_JSON)) +private[v1] class DataAgentResource extends ApiRequestContext with Logging { + + import DataAgentResource._ + + private def verifySessionOwnership(session: KyuubiSessionImpl): Unit = { + val userName = fe.getSessionUser(Map.empty[String, String]) + if (!fe.isAdministrator(userName) && session.user != userName) { + throw new ForbiddenException( + s"$userName is not allowed to access session ${session.handle}") + } + } + + /** + * Parses the handle, looks the session up and checks that the caller may use it. + * Validation failures are raised as 4xx responses before any SSE bytes are sent: + * 400 for a malformed handle or an unsupported session type, 404 for an unknown session, + * and whatever verifySessionOwnership throws for a session the caller does not own. + */ + private def resolveAndAuthorize(sessionHandleStr: String): KyuubiSessionImpl = { + val sessionHandle = + try { + SessionHandle.fromUUID(sessionHandleStr) + } catch { + case _: IllegalArgumentException => + throw new WebApplicationException("invalid sessionHandle", 400) + } + val session = + try { + fe.be.sessionManager.getSession(sessionHandle) match { + case s: KyuubiSessionImpl => s + case _ => + // Only KyuubiSessionImpl exposes the launchEngineOp/client members this resource + // uses; an unchecked cast would surface any other session type as a + // ClassCastException, i.e. an HTTP 500 instead of a client error. + throw new WebApplicationException("session does not support data agent", 400) + } + } catch { + case _: KyuubiSQLException => + throw new WebApplicationException("session not found", 404) + } + verifySessionOwnership(session) + session + } + + @ApiResponse( + responseCode = "200", + content = Array(new Content(mediaType = "text/event-stream")), + description = "Send a message to the data agent and receive streaming SSE response") + @POST + @Path("{sessionHandle}/chat") + def chat( + @PathParam("sessionHandle") sessionHandleStr: String, + request: ChatRequest, + @Context response: HttpServletResponse): Unit = { + if (request == null) { + sendPreflightSseError(response, "request body is required") + return + } + val text = request.getText + if (text == null || text.trim.isEmpty) { + sendPreflightSseError(response, "text is required") + return + } + val model = Option(request.getModel).map(_.trim).filter(_.nonEmpty) + if (model.exists(m => m.length > MAX_MODEL_LENGTH || !MODEL_PATTERN.matcher(m).matches())) { + sendPreflightSseError(response, "invalid model name") + return + } + val approvalMode = 
Option(request.getApprovalMode) + // Upper-case with Locale.ROOT: the default-locale variant maps 'i' to a dotted capital + // under Turkish/Azeri locales, so "strict" would never match VALID_APPROVAL_MODES. + .map(_.trim.toUpperCase(java.util.Locale.ROOT)) + .filter(_.nonEmpty) + if (approvalMode.exists(m => !VALID_APPROVAL_MODES.contains(m))) { + sendPreflightSseError( + response, + s"invalid approvalMode, must be one of: ${VALID_APPROVAL_MODES.mkString(", ")}") + return + } + var confOverlay = model + .map(m => Map(KyuubiConf.ENGINE_DATA_AGENT_MODEL.key -> m)) + .getOrElse(Map.empty[String, String]) + approvalMode.foreach { mode => + confOverlay = confOverlay + (KyuubiConf.ENGINE_DATA_AGENT_APPROVAL_MODE.key -> mode) + } + + // Validate and authorize before engine startup; launching can be expensive. + val session = resolveAndAuthorize(sessionHandleStr) + val operationTimeoutMs = fe.getConf.get(KyuubiConf.FRONTEND_DATA_AGENT_OPERATION_TIMEOUT) + val client: KyuubiSyncThriftClient = + try { + val launchOp = session.launchEngineOp + try { + launchOp.getBackgroundHandle.get(operationTimeoutMs, TimeUnit.MILLISECONDS) + } catch { + case _: TimeoutException => + sendPreflightSseError(response, "Engine did not start within timeout") + return + case e: ExecutionException => + // flatMap, not map: a cause whose message is null must still fall back to the + // generic text instead of reaching the client as an empty error message. + val errMsg = Option(e.getCause).flatMap(c => Option(c.getMessage)) + .getOrElse("Engine launch failed") + sendPreflightSseError(response, errMsg) + return + } + val c = session.client + if (c == null) { + sendPreflightSseError(response, "Engine session is not ready after waiting") + return + } + c + } catch { + case NonFatal(e) => + error(s"Error processing chat for session $sessionHandleStr", e) + if (!response.isCommitted) sendPreflightSseError(response, e.getMessage) + return + } + + val stream = new SseStream(response) + val deadlineAt = System.currentTimeMillis() + STREAM_MAX_DURATION_MS + stream.open() + stream.keepalive() + // Avoid logging raw user text; it may contain PII or control characters. + info(s"Data Agent chat: session=$sessionHandleStr, textLen=${text.length}," + + s" textHash=${Integer.toHexString(text.hashCode)}") + + // executeStatement may block before returning an op handle. 
Do not cancel the future on + // timeout; late handles are closed by whenComplete so engine operations do not leak. + val timedOut = new AtomicBoolean(false) + val closed = new AtomicBoolean(false) + val opSubmitFuture: CompletableFuture[TOperationHandle] = + try { + val f = CompletableFuture.supplyAsync( + () => client.executeStatement(text, confOverlay, true, 0L), + opSubmitter) + f.whenComplete((handle, _) => { + if (handle != null && timedOut.get() && closed.compareAndSet(false, true)) { + info(s"Closing orphaned op for session $sessionHandleStr (servlet already timed out)") + closeOperation(client, handle) + } + }) + f + } catch { + case _: RejectedExecutionException => + warn(s"Op-submit pool rejected chat for session $sessionHandleStr (queue full)") + stream.event("error", buildJsonMessage("Server is busy, please retry")) + stream.event("done", "{}") + return + } + + var opHandle: TOperationHandle = null + try { + try { + opHandle = opSubmitFuture.get(operationTimeoutMs, TimeUnit.MILLISECONDS) + } catch { + case _: TimeoutException => + timedOut.set(true) + // The worker may have completed just before timedOut was set. 
+ if (opSubmitFuture.isDone) { + try { + val orphan = opSubmitFuture.getNow(null) + if (orphan != null && closed.compareAndSet(false, true)) { + info(s"Closing orphaned op for session $sessionHandleStr (race recovery)") + closeOperation(client, orphan) + } + } catch { + case NonFatal(_) => // future completed exceptionally; nothing to close + } + } + stream.event("error", buildJsonMessage("Operation submit timed out")) + stream.event("done", "{}") + return + case e: ExecutionException => + val cause = Option(e.getCause).getOrElse(e) + throw cause + } + streamResults(client, opHandle, stream, deadlineAt) + stream.event("done", "{}") + } catch { + case _: IOException => + info(s"Client disconnected during SSE stream for session $sessionHandleStr") + if (opHandle != null) cancelOperation(client, opHandle) + case NonFatal(e) => + warn(s"Error during SSE streaming for session $sessionHandleStr", e) + try { + stream.event("error", buildJsonMessage(e.getMessage)) + stream.event("done", "{}") + } catch { + case _: IOException => // client already gone + } + } finally { + if (opHandle != null) closeOperation(client, opHandle) + } + } + + @ApiResponse( + responseCode = "200", + content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)), + description = "Approve or deny a pending tool call") + @POST + @Path("{sessionHandle}/approve") + @Produces(Array(MediaType.APPLICATION_JSON)) + def approve( + @PathParam("sessionHandle") sessionHandleStr: String, + request: ApprovalRequest): String = { + if (request == null) { + throw new WebApplicationException("request body is required", 400) + } + val requestId = request.getRequestId + if (requestId == null || requestId.trim.isEmpty) { + throw new WebApplicationException("requestId is required", 400) + } + // The id is embedded in `__approve:` / `__deny:`. 
+ if (requestId.length > MAX_REQUEST_ID_LENGTH || + !REQUEST_ID_PATTERN.matcher(requestId).matches()) { + throw new WebApplicationException("invalid requestId", 400) + } + val session = resolveAndAuthorize(sessionHandleStr) + val client = session.client + if (client == null) { + throw new WebApplicationException("Engine session is not ready", 503) + } + + val statement = if (request.isApproved) { + s"__approve:$requestId" + } else { + s"__deny:$requestId" + } + + val opHandle = client.executeStatement( + statement, + Map.empty[String, String], + false, + 60000L) + try { + val rowSet = client.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1, false) + val rows = extractStringRows(rowSet) + rows.headOption.getOrElse { + val node = JSON_MAPPER.createObjectNode() + node.put("status", "error") + node.put("requestId", requestId) + node.put("message", "No result returned") + JSON_MAPPER.writeValueAsString(node) + } + } finally { + closeOperation(client, opHandle) + } + } + + private def streamResults( + client: KyuubiSyncThriftClient, + opHandle: TOperationHandle, + stream: SseStream, + deadlineAt: Long): Unit = { + waitForRunning(client, opHandle, stream, deadlineAt) + + var backoffMs = MIN_BACKOFF_MS + var done = false + while (!done) { + if (System.currentTimeMillis() > deadlineAt) { + cancelOperation(client, opHandle) + stream.event("error", buildJsonMessage("stream exceeded maximum duration")) + done = true + } else { + // Fetch can race terminal-state transitions; status decides retry vs. finish. 
+ val rows = + try { + fetchAndEmit(client, opHandle, stream) + } catch { + case _: KyuubiSQLException => 0 + case _: IllegalStateException => 0 // in-JVM engine path + } + if (rows > 0) { + backoffMs = MIN_BACKOFF_MS + } else { + val status = client.getOperationStatus(opHandle) + val opState = status.getOperationState + if (isTerminalState(opState)) { + if (opState == TOperationState.FINISHED_STATE) { + // FINISHED may still have buffered rows; keep deadline as the cap. + while (System.currentTimeMillis() < deadlineAt && + fetchAndEmit(client, opHandle, stream) > 0) {} + } + opState match { + case TOperationState.FINISHED_STATE => // success -- no error frame + case TOperationState.ERROR_STATE => + val errMsg = Option(status.getErrorMessage).getOrElse("Unknown error") + stream.event("error", buildJsonMessage(errMsg)) + case TOperationState.CANCELED_STATE => + stream.event("error", buildJsonMessage("operation was canceled")) + case TOperationState.TIMEDOUT_STATE => + stream.event("error", buildJsonMessage("operation timed out")) + case TOperationState.CLOSED_STATE => + stream.event("error", buildJsonMessage("operation closed before completion")) + case _ => // any other state isTerminalState() may grow to cover + } + done = true + } else { + if (stream.idleMs >= KEEPALIVE_INTERVAL_MS) stream.keepalive() + Thread.sleep(backoffMs) + backoffMs = math.min(backoffMs * 2, MAX_BACKOFF_MS) + } + } + } + } + } + + private def waitForRunning( + client: KyuubiSyncThriftClient, + opHandle: TOperationHandle, + stream: SseStream, + deadlineAt: Long): Unit = { + var sleepMs = 50L + var ready = false + while (!ready) { + if (System.currentTimeMillis() > deadlineAt) { + throw new IllegalStateException("Operation did not start within timeout") + } + val state = client.getOperationStatus(opHandle).getOperationState + state match { + case TOperationState.INITIALIZED_STATE | TOperationState.PENDING_STATE => + try { + Thread.sleep(sleepMs) + } catch { + case _: InterruptedException => + 
Thread.currentThread().interrupt() + throw new IllegalStateException("Interrupted while waiting for operation to start") + } + sleepMs = math.min(sleepMs * 2, 1000L) + // Cold starts can be quiet longer than the browser watchdog. + if (stream.idleMs >= KEEPALIVE_INTERVAL_MS) stream.keepalive() + case _ => + ready = true + } + } + } + + private def fetchAndEmit( + client: KyuubiSyncThriftClient, + opHandle: TOperationHandle, + stream: SseStream): Int = { + val rowSet = client.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10, false) + val rows = extractStringRows(rowSet) + for (row <- rows) { + stream.event(extractJsonType(row), row) + } + rows.size + } + + private def extractJsonType(json: String): String = { + try { + val node = JSON_MAPPER.readTree(json) + Option(node.get("type")).map(_.asText()).getOrElse("message") + } catch { + case NonFatal(_) => "message" + } + } + + private def extractStringRows(rowSet: TRowSet): Seq[String] = { + if (rowSet == null) return Seq.empty + // HS2 may encode the single string column as column-based or row-based data. 
+ val columns = rowSet.getColumns + if (columns != null && !columns.isEmpty) { + val stringCol = columns.get(0).getStringVal + if (stringCol != null) { + return stringCol.getValues.asScala.toSeq + } + } + val rows = rowSet.getRows + if (rows != null && !rows.isEmpty) { + return rows.asScala.map { row => + val colVals = row.getColVals + if (colVals != null && !colVals.isEmpty) { + colVals.get(0).getStringVal.getValue + } else "" + }.toSeq + } + Seq.empty + } + + private def isTerminalState(state: TOperationState): Boolean = { + state == TOperationState.FINISHED_STATE || + state == TOperationState.CANCELED_STATE || + state == TOperationState.CLOSED_STATE || + state == TOperationState.ERROR_STATE || + state == TOperationState.TIMEDOUT_STATE + } + + private def buildJsonMessage(message: String): String = { + val node = JSON_MAPPER.createObjectNode() + node.put("message", if (message == null) "" else message) + JSON_MAPPER.writeValueAsString(node) + } + + private def sendPreflightSseError(response: HttpServletResponse, message: String): Unit = { + try { + val s = new SseStream(response) + s.open() + s.event("error", buildJsonMessage(message)) + s.event("done", "{}") + } catch { + case _: IOException => // client already gone + } + } + + private def cancelOperation( + client: KyuubiSyncThriftClient, + opHandle: TOperationHandle): Unit = { + try { + client.cancelOperation(opHandle) + } catch { + case NonFatal(e) => + debug(s"Failed to cancel operation on client disconnect", e) + } + } + + private def closeOperation( + client: KyuubiSyncThriftClient, + opHandle: TOperationHandle): Unit = { + try { + client.closeOperation(opHandle) + } catch { + case NonFatal(e) => + debug(s"Failed to close operation", e) + } + } +} + +private[server] object DataAgentResource { + // Bounded pool for blocking executeStatement submissions; rebuildable after service restart. 
+ @volatile private var opSubmitExecutor: ExecutorService = newOpSubmitExecutor() + + private def newOpSubmitExecutor(): ExecutorService = + ThreadUtils.newDaemonQueuedThreadPool( + poolSize = 8, + poolQueueSize = 64, + keepAliveMs = 60000L, + threadPoolName = "data-agent-op-submit") + + private def opSubmitter: ExecutorService = { + val current = opSubmitExecutor + if (current != null && !current.isShutdown) current + else synchronized { + if (opSubmitExecutor == null || opSubmitExecutor.isShutdown) { + opSubmitExecutor = newOpSubmitExecutor() + } + opSubmitExecutor + } + } + + def shutdown(): Unit = synchronized { + val current = opSubmitExecutor + if (current != null) { + ThreadUtils.shutdown(current) + opSubmitExecutor = null + } + } + + private val JSON_MAPPER: ObjectMapper = new ObjectMapper() + + private val MAX_MODEL_LENGTH = 128 + private val MODEL_PATTERN: Pattern = Pattern.compile("^[a-zA-Z0-9._/:@-]+$") + private val VALID_APPROVAL_MODES: Set[String] = Set("AUTO_APPROVE", "NORMAL", "STRICT") + + private val MAX_REQUEST_ID_LENGTH = 256 + private val REQUEST_ID_PATTERN: Pattern = Pattern.compile("^[A-Za-z0-9._:@-]+$") + + private val MIN_BACKOFF_MS = 50L + private val MAX_BACKOFF_MS = 500L + private val KEEPALIVE_INTERVAL_MS = 15000L + private val STREAM_MAX_DURATION_MS = 10L * 60L * 1000L + + final private[v1] class SseStream(response: HttpServletResponse) { + private val out = response.getOutputStream + private val writer = new OutputStreamWriter(out, StandardCharsets.UTF_8) + private var lastWriteMs = System.currentTimeMillis() + + def open(): Unit = { + response.setBufferSize(0) // disable Jetty output buffering for streaming + response.setStatus(HttpServletResponse.SC_OK) + response.setContentType("text/event-stream") + response.setCharacterEncoding("UTF-8") + response.setHeader("Cache-Control", "no-cache") + response.setHeader("Connection", "keep-alive") + response.setHeader("X-Accel-Buffering", "no") + response.flushBuffer() + } + + def event(name: 
String, data: String): Unit = { + writer.write(s"event: $name\n") + // SSE requires one data field per physical payload line. + for (line <- data.split("\n", -1)) { + writer.write(s"data: $line\n") + } + writer.write("\n") + writer.flush() + lastWriteMs = System.currentTimeMillis() + } + + def keepalive(): Unit = { + // Named events reach fetch-event-source; comment frames do not. + writer.write("event: ping\ndata: {}\n\n") + writer.flush() + lastWriteMs = System.currentTimeMillis() + } + + def idleMs: Long = System.currentTimeMillis() - lastWriteMs + } +} diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilderSuite.scala index 32995b7eb32..087de38807d 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilderSuite.scala @@ -43,6 +43,18 @@ class DataAgentProcessBuilderSuite extends KyuubiFunSuite { s"toString should contain a redaction marker, got: $output") } + test("JDBC URL with credentials is redacted in toString") { + val conf = new KyuubiConf(false) + conf.set( + ENGINE_DATA_AGENT_JDBC_URL.key, + "jdbc:hive2://localhost:10009/default;user=alice;password=Alice2026!") + val builder = new DataAgentProcessBuilder("testUser", doAsEnabled = false, conf) + val output = builder.toString + assert( + !output.contains("Alice2026!"), + s"JDBC password should not appear in toString output: $output") + } + test("memory flag uses configured value") { val conf = new KyuubiConf(false) conf.set(ENGINE_DATA_AGENT_MEMORY.key, "2g") diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/DataAgentResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/DataAgentResourceSuite.scala new file mode 100644 index 00000000000..740fd0d9779 --- /dev/null +++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/DataAgentResourceSuite.scala @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server.api.v1 + +import java.io.{ByteArrayOutputStream, OutputStream} +import java.util.UUID +import javax.servlet.{ServletOutputStream, WriteListener} +import javax.servlet.http.HttpServletResponse +import javax.ws.rs.client.Entity +import javax.ws.rs.core.MediaType + +import org.mockito.Mockito.{mock, verify, when} + +import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper} +import org.apache.kyuubi.client.api.v1.dto.{ApprovalRequest, ChatRequest} + +class DataAgentResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { + + // -- chat preflight --------------------------------------------------------- + + test("chat returns 400 for malformed sessionHandle") { + val response = webTarget.path("api/v1/data-agent/not-a-uuid/chat") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(new ChatRequest("hi"), MediaType.APPLICATION_JSON_TYPE)) + assert(response.getStatus === 400) + } + + test("chat returns 404 for unknown sessionHandle") { + val unknown = UUID.randomUUID().toString + val response = 
webTarget.path(s"api/v1/data-agent/$unknown/chat") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(new ChatRequest("hi"), MediaType.APPLICATION_JSON_TYPE)) + assert(response.getStatus === 404) + } + + // -- approve preflight ------------------------------------------------------ + + test("approve returns 400 for null body") { + val unknown = UUID.randomUUID().toString + val response = webTarget.path(s"api/v1/data-agent/$unknown/approve") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.json("")) + assert(response.getStatus === 400) + } + + test("approve returns 400 for empty/whitespace requestId") { + val unknown = UUID.randomUUID().toString + Seq("", " ").foreach { id => + val req = new ApprovalRequest() + req.setRequestId(id) + req.setApproved(true) + val response = webTarget.path(s"api/v1/data-agent/$unknown/approve") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE)) + assert(response.getStatus === 400, s"expected 400 for requestId=[$id]") + } + } + + test("approve returns 400 for requestId with disallowed chars") { + // The id is concatenated into `__approve:` so anything that could break command + // parsing or arrive with newlines must be rejected. 
+ val bad = Seq( + "abc\nxyz", // newline -- log/command injection + "abc xyz", // space + "abc/xyz", // slash + "abc;xyz", // semicolon + "abc'xyz", // quote + " + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/blocks/MarkdownContent.vue b/kyuubi-server/web-ui/src/views/data-agent/blocks/MarkdownContent.vue new file mode 100644 index 00000000000..ea6a5b3228f --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/blocks/MarkdownContent.vue @@ -0,0 +1,43 @@ + + + + + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/blocks/ReasoningBlock.vue b/kyuubi-server/web-ui/src/views/data-agent/blocks/ReasoningBlock.vue new file mode 100644 index 00000000000..abcd901a7ff --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/blocks/ReasoningBlock.vue @@ -0,0 +1,99 @@ + + + + + + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/blocks/TextBlock.vue b/kyuubi-server/web-ui/src/views/data-agent/blocks/TextBlock.vue new file mode 100644 index 00000000000..3a163201753 --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/blocks/TextBlock.vue @@ -0,0 +1,108 @@ + + + + + + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/blocks/ToolBlock.vue b/kyuubi-server/web-ui/src/views/data-agent/blocks/ToolBlock.vue new file mode 100644 index 00000000000..2ff7918610b --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/blocks/ToolBlock.vue @@ -0,0 +1,401 @@ + + + + + + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/blocks/UsageFooter.vue b/kyuubi-server/web-ui/src/views/data-agent/blocks/UsageFooter.vue new file mode 100644 index 00000000000..a69c2c282a7 --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/blocks/UsageFooter.vue @@ -0,0 +1,103 @@ + + + + + + + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/components/ChatMessage.vue b/kyuubi-server/web-ui/src/views/data-agent/components/ChatMessage.vue new file mode 100644 index 00000000000..c0031174873 --- /dev/null +++ 
b/kyuubi-server/web-ui/src/views/data-agent/components/ChatMessage.vue @@ -0,0 +1,301 @@ + + + + + + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/components/ChatPane.vue b/kyuubi-server/web-ui/src/views/data-agent/components/ChatPane.vue new file mode 100644 index 00000000000..7ef81fd120b --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/components/ChatPane.vue @@ -0,0 +1,629 @@ + + + + + + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/components/InputBar.vue b/kyuubi-server/web-ui/src/views/data-agent/components/InputBar.vue new file mode 100644 index 00000000000..ef25c22d2c8 --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/components/InputBar.vue @@ -0,0 +1,165 @@ + + + + + + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/components/SessionsRail.vue b/kyuubi-server/web-ui/src/views/data-agent/components/SessionsRail.vue new file mode 100644 index 00000000000..a5501db0d79 --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/components/SessionsRail.vue @@ -0,0 +1,346 @@ + + + + + + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/composables/useChatScroll.ts b/kyuubi-server/web-ui/src/views/data-agent/composables/useChatScroll.ts new file mode 100644 index 00000000000..b41bcb49989 --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/composables/useChatScroll.ts @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ref, onBeforeUnmount } from 'vue' + +export function useChatScroll() { + const messagesContainer = ref() + let scrollRafId: number | null = null + let stickToBottom = true + let programmaticScroll = false + + function onMessagesScroll() { + if (programmaticScroll) return + const el = messagesContainer.value + if (!el) return + const distanceFromBottom = el.scrollHeight - el.scrollTop - el.clientHeight + stickToBottom = distanceFromBottom < 40 + } + + function scrollToBottom(force = false) { + if (!force && !stickToBottom) return + if (scrollRafId) { + if (!force) return + cancelAnimationFrame(scrollRafId) + } + scrollRafId = requestAnimationFrame(() => { + scrollRafId = null + const el = messagesContainer.value + if (!el) return + programmaticScroll = true + el.scrollTop = el.scrollHeight + stickToBottom = true + requestAnimationFrame(() => { + programmaticScroll = false + }) + }) + } + + function anchorToBottom() { + stickToBottom = true + scrollToBottom(true) + } + + onBeforeUnmount(() => { + if (scrollRafId) cancelAnimationFrame(scrollRafId) + }) + + return { messagesContainer, scrollToBottom, onMessagesScroll, anchorToBottom } +} diff --git a/kyuubi-server/web-ui/src/views/data-agent/composables/useChatStream.ts b/kyuubi-server/web-ui/src/views/data-agent/composables/useChatStream.ts new file mode 100644 index 00000000000..f7f260ec179 --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/composables/useChatStream.ts @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { onBeforeUnmount } from 'vue' +import { useI18n } from 'vue-i18n' +import { chatStream, approveToolCall } from '@/api/data-agent' +import { type DataAgentMessage, useDataAgentStore } from '@/pinia/data-agent' + +// In-flight network state is kept outside Pinia persistence. 
/** In-flight network state for one streaming chat request. */
interface StreamCtx {
  abortController: AbortController
  watchdogTimer: ReturnType<typeof setTimeout> | null
}

/** The stream is aborted when no SSE event of any kind arrives for this long. */
const STREAM_SILENCE_TIMEOUT_MS = 30000

/** Reads a string property from a caught value without assuming it is an `Error`. */
function errorText(e: unknown, key: 'name' | 'message'): string | undefined {
  if (typeof e !== 'object' || e === null) return undefined
  const value = (e as Record<string, unknown>)[key]
  return typeof value === 'string' ? value : undefined
}

/**
 * Creates the registry of in-flight streams, keyed by local session id.
 *
 * @returns the `streamContexts` map and `cancelStream(id)`, which clears the
 *   watchdog, aborts the request and removes the entry (no-op for unknown ids).
 */
export function useStreamRegistry() {
  const streamContexts = new Map<string, StreamCtx>()

  function cancelStream(id: string) {
    const ctx = streamContexts.get(id)
    if (!ctx) return
    if (ctx.watchdogTimer) clearTimeout(ctx.watchdogTimer)
    ctx.abortController.abort()
    streamContexts.delete(id)
  }

  return { streamContexts, cancelStream }
}

export type StreamRegistry = ReturnType<typeof useStreamRegistry>

/**
 * Chat streaming for the Data Agent view: sends a user message, consumes the
 * SSE response into the assistant message's blocks, and relays approval
 * decisions for tool calls.
 *
 * @param opts.store Data Agent Pinia store.
 * @param opts.registry shared registry of in-flight streams.
 * @param opts.ensureSession resolves to true once the session is usable.
 * @param opts.scrollToBottom called after each handled event of the active session.
 * @param opts.anchorToBottom called when a new message is sent in the active session.
 * @returns `sendMessage`, `handleApproval` and `cancelActiveStream`.
 */
export function useChatStream(opts: {
  store: ReturnType<typeof useDataAgentStore>
  registry: StreamRegistry
  ensureSession: (id: string) => Promise<boolean>
  scrollToBottom: () => void
  anchorToBottom: () => void
}) {
  const { store, registry, ensureSession, scrollToBottom, anchorToBottom } =
    opts
  const { t } = useI18n()
  let isUnmounted = false

  /** Appends the user message plus an empty assistant message, then streams the reply. */
  async function sendMessage(text: string) {
    const id = store.activeSessionId
    if (!id) return
    const s = store.sessions[id]
    if (!s || s.streaming || s.initializing) return
    if (!(await ensureSession(id))) return
    // The session may have been closed while ensureSession was awaiting.
    if (!store.sessions[id]) return

    store.appendMessage(id, {
      id: store.nextMsgId(id),
      role: 'user',
      text
    })
    store.appendMessage(id, {
      id: store.nextMsgId(id),
      role: 'assistant',
      blocks: []
    })
    store.setTitleIfEmpty(id, text)
    // Read array and length from the same, current session object.
    const live = store.sessions[id]
    const assistantMsg = live?.messages[live.messages.length - 1]
    if (!assistantMsg) return

    if (id === store.activeSessionId) {
      anchorToBottom()
    }
    await startStreaming(id, assistantMsg, text)
  }

  /**
   * Runs one SSE request for `sessionId`, feeding events into `assistantMsg`.
   * Failures are surfaced through the session's `errorMessage`; nothing is thrown.
   */
  async function startStreaming(
    sessionId: string,
    assistantMsg: DataAgentMessage,
    text: string
  ) {
    const s = store.sessions[sessionId]
    if (!s) return
    store.patchSession(sessionId, {
      streaming: true,
      errorMessage: '',
      errorCanReset: false
    })
    const ctx: StreamCtx = {
      abortController: new AbortController(),
      watchdogTimer: null
    }
    registry.streamContexts.set(sessionId, ctx)
    let gotDone = false
    let watchdogFired = false

    // (Re)starts the silence timer; on expiry the request is aborted and the
    // session is flagged as unresponsive.
    const armWatchdog = () => {
      if (ctx.watchdogTimer) clearTimeout(ctx.watchdogTimer)
      ctx.watchdogTimer = setTimeout(() => {
        watchdogFired = true
        store.patchSession(sessionId, {
          errorMessage: t('data_agent.engine_unresponsive'),
          errorCanReset: true
        })
        ctx.abortController.abort()
      }, STREAM_SILENCE_TIMEOUT_MS)
    }

    armWatchdog()

    try {
      await chatStream(
        s.sessionHandle,
        text,
        (event) => {
          // Every event, pings included, counts as a sign of life.
          armWatchdog()
          if (event.event === 'done') gotDone = true
          handleSseEvent(sessionId, assistantMsg, event)
        },
        ctx.abortController.signal,
        s.approvalMode,
        s.model.trim() || undefined
      )
      if (
        !gotDone &&
        !watchdogFired &&
        !ctx.abortController.signal.aborted &&
        !store.sessions[sessionId]?.errorMessage
      ) {
        // eslint-disable-next-line no-console
        console.warn('SSE closed without `done` event')
        store.patchSession(sessionId, {
          errorMessage: t('data_agent.stream_incomplete'),
          errorCanReset: true
        })
      }
    } catch (e: unknown) {
      // Aborts (user cancel or watchdog) are expected and not reported here.
      if (!watchdogFired && errorText(e, 'name') !== 'AbortError') {
        store.patchSession(sessionId, {
          errorMessage: errorText(e, 'message') || t('data_agent.stream_error')
        })
      }
    } finally {
      if (ctx.watchdogTimer) clearTimeout(ctx.watchdogTimer)
      // If a newer stream registered itself for this session in the meantime,
      // its registry entry and `streaming` flag must be left alone.
      const registered = registry.streamContexts.get(sessionId)
      const superseded = registered !== undefined && registered !== ctx
      if (registered === ctx) registry.streamContexts.delete(sessionId)
      if (!isUnmounted && !superseded) {
        store.patchSession(sessionId, { streaming: false })
      }
    }
  }

  /** Applies one SSE event to the assistant message `msg` of `sessionId`. */
  function handleSseEvent(
    sessionId: string,
    msg: DataAgentMessage,
    event: { event: string; data: string }
  ) {
    if (isUnmounted) return
    if (event.event === 'ping') return
    // Assign, then read back through `msg`, so the array mutated below is the
    // one the message object exposes.
    if (!msg.blocks) msg.blocks = []
    const blocks = msg.blocks
    let raw: unknown
    try {
      raw = JSON.parse(event.data)
    } catch {
      // eslint-disable-next-line no-console
      console.warn('Invalid SSE event data:', event.data)
      store.patchSession(sessionId, {
        errorMessage: t('data_agent.malformed_response'),
        errorCanReset: true
      })
      // Stop before a later `done` event can make the run look successful.
      registry.streamContexts.get(sessionId)?.abortController.abort()
      return
    }
    // `null` and primitives are valid JSON but carry no fields; treat them as
    // an empty payload so the property reads below cannot throw.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const parsed: any = raw !== null && typeof raw === 'object' ? raw : {}

    switch (event.event) {
      case 'content_delta': {
        const text = parsed.text || ''
        if (!text) break
        // Consecutive deltas extend the trailing text block.
        const last = blocks[blocks.length - 1]
        if (last && last.type === 'text') {
          last.text = (last.text || '') + text
        } else {
          blocks.push({ type: 'text', text })
        }
        break
      }
      case 'reasoning_delta': {
        const text = parsed.text || ''
        if (!text) break
        const last = blocks[blocks.length - 1]
        if (last && last.type === 'reasoning') {
          last.text = (last.text || '') + text
        } else {
          blocks.push({ type: 'reasoning', text, expanded: true })
        }
        break
      }
      case 'tool_call': {
        const toolCallId = parsed.id
        if (!toolCallId || !parsed.name) break
        // An approval block already represents this call; do not add a duplicate.
        const hasApproval = blocks.some(
          (b) => b.type === 'approval_request' && b.toolCallId === toolCallId
        )
        if (!hasApproval) {
          blocks.push({
            type: 'tool_call',
            toolCallId,
            name: parsed.name,
            args: parsed.args,
            expanded: false
          })
        }
        break
      }
      case 'tool_result':
        // Attach the result to the most recent block for this tool call.
        for (let i = blocks.length - 1; i >= 0; i--) {
          const b = blocks[i]
          if (
            (b.type === 'tool_call' || b.type === 'approval_request') &&
            b.toolCallId === parsed.id
          ) {
            b.result = parsed.output
            b.isError = !!parsed.isError
            break
          }
        }
        break
      case 'approval_request':
        if (!parsed.requestId || !parsed.id || !parsed.name) break
        blocks.push({
          type: 'approval_request',
          toolCallId: parsed.id,
          name: parsed.name,
          args: parsed.args,
          requestId: parsed.requestId,
          riskLevel: parsed.riskLevel,
          approvalStatus: 'pending'
        })
        break
      case 'agent_finish':
        msg.usage = {
          accumulatedPrompt: Number(parsed.accumulatedPromptTokens) || 0,
          accumulatedCompletion:
            Number(parsed.accumulatedCompletionTokens) || 0,
          lastPrompt: Number(parsed.lastPromptTokens) || 0,
          lastCompletion: Number(parsed.lastCompletionTokens) || 0,
          steps: Number(parsed.steps) || undefined
        }
        // Collapse reasoning once the run has finished.
        for (const b of blocks) {
          if (b.type === 'reasoning') b.expanded = false
        }
        break
      case 'error':
        store.patchSession(sessionId, {
          errorMessage: parsed.message || t('data_agent.unknown_error')
        })
        break
      case 'done':
        break
    }
    if (sessionId === store.activeSessionId) scrollToBottom()
  }

  /**
   * Sends the user's decision for a pending approval request of the active
   * session. The block is updated optimistically and reverted to `pending`
   * when the backend rejects the call or the request fails.
   */
  async function handleApproval(requestId: string, approved: boolean) {
    const id = store.activeSessionId
    if (!id) return
    const s = store.sessions[id]
    if (!s || !s.sessionHandle || s.approvingRequestId) return
    store.patchSession(id, { approvingRequestId: requestId })

    // Returns true when at least one matching block was updated. With
    // `onlyIfPending`, stops at the first matching block that is not pending.
    const setApprovalStatus = (
      status: 'pending' | 'approved' | 'denied',
      onlyIfPending = false
    ): boolean => {
      let changed = false
      for (const m of s.messages) {
        if (!m.blocks) continue
        for (const block of m.blocks) {
          if (
            block.type === 'approval_request' &&
            block.requestId === requestId
          ) {
            if (onlyIfPending && block.approvalStatus !== 'pending') {
              return false
            }
            block.approvalStatus = status
            changed = true
          }
        }
      }
      return changed
    }

    if (!setApprovalStatus(approved ? 'approved' : 'denied', true)) {
      store.patchSession(id, { approvingRequestId: '' })
      return
    }

    try {
      const res = await approveToolCall(s.sessionHandle, requestId, approved)
      if (res && res.status !== 'ok') {
        store.patchSession(id, {
          errorMessage: t('data_agent.approval_not_found')
        })
        setApprovalStatus('pending')
      }
    } catch (e: unknown) {
      store.patchSession(id, {
        errorMessage: t('data_agent.approval_failed', {
          message: errorText(e, 'message')
        })
      })
      setApprovalStatus('pending')
    } finally {
      store.patchSession(id, { approvingRequestId: '' })
    }
  }

  /** Cancels the stream of the active session, if there is one. */
  function cancelActiveStream() {
    if (store.activeSessionId) registry.cancelStream(store.activeSessionId)
  }

  onBeforeUnmount(() => {
    isUnmounted = true
    for (const id of Array.from(registry.streamContexts.keys())) {
      registry.cancelStream(id)
    }
  })

  return { sendMessage, handleApproval, cancelActiveStream }
}
diff --git a/kyuubi-server/web-ui/src/views/data-agent/composables/useSessionLifecycle.ts b/kyuubi-server/web-ui/src/views/data-agent/composables/useSessionLifecycle.ts
new file mode 100644
index 00000000000..6e02eaf3b94
--- /dev/null
+++ b/kyuubi-server/web-ui/src/views/data-agent/composables/useSessionLifecycle.ts
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { useI18n } from 'vue-i18n' +import { ElMessage } from 'element-plus' +import { openSession, closeSession, getSession } from '@/api/data-agent' +import { isValidApprovalMode, useDataAgentStore } from '@/pinia/data-agent' +import { sanitizeJdbcUrl, saveJdbcToHistory } from '../utils/jdbc' +import type { StreamRegistry } from './useChatStream' + +const APPROVAL_MODE_DEFAULT_KEY = 'data-agent-approval-mode' +const MODEL_DEFAULT_KEY = 'data-agent-model' + +export function defaultApprovalMode(): string { + const v = localStorage.getItem(APPROVAL_MODE_DEFAULT_KEY) + return isValidApprovalMode(v) ? (v as string) : 'NORMAL' +} + +export function defaultModel(): string { + return localStorage.getItem(MODEL_DEFAULT_KEY) || '' +} + +export function persistApprovalMode(mode: string) { + try { + localStorage.setItem(APPROVAL_MODE_DEFAULT_KEY, mode) + } catch { + /* ignore */ + } +} + +export function persistModel(model: string) { + try { + localStorage.setItem(MODEL_DEFAULT_KEY, (model || '').trim()) + } catch { + /* ignore */ + } +} + +export function useSessionLifecycle(opts: { + store: ReturnType + registry: StreamRegistry + syncUrl: (id: string | null) => void +}) { + const { store, registry, syncUrl } = opts + const { t } = useI18n() + + async function ensureSession(id: string): Promise { + const s = store.sessions[id] + if (!s) return false + if (s.sessionHandle) { + try { + await getSession(s.sessionHandle) + return true + } catch { + store.patchSession(id, { sessionHandle: '' }) + ElMessage.warning(t('data_agent.session_expired')) + return false + } + } + store.patchSession(id, { initializing: true }) + try { + const configs: Record = { + 'kyuubi.engine.type': 'DATA_AGENT' + } + const rawJdbc = s.jdbcUrl.trim() + if (rawJdbc) { + configs['kyuubi.engine.data.agent.jdbc.url'] = rawJdbc + } + const res: any = await openSession({ configs }) + const 
handle = res.identifier || res.id || '' + if (!handle) throw new Error('No session handle returned') + store.patchSession(id, { sessionHandle: handle }) + saveJdbcToHistory(rawJdbc) + // Backend used the raw URL; UI/persisted state should not keep credentials. + store.patchSession(id, { jdbcUrl: sanitizeJdbcUrl(rawJdbc) }) + return true + } catch (e: any) { + ElMessage.error( + t('data_agent.session_start_failed', { message: e.message }) + ) + return false + } finally { + store.patchSession(id, { initializing: false }) + } + } + + async function resetActiveSession() { + const id = store.activeSessionId + if (!id) return + const s = store.sessions[id] + if (!s) return + if (s.streaming) registry.cancelStream(id) + const handle = s.sessionHandle + if (handle) { + try { + await closeSession(handle) + } catch (e: any) { + // eslint-disable-next-line no-console + console.warn('Failed to close session:', e.message) + ElMessage.warning(t('data_agent.session_close_failed')) + } + } + store.resetSession(id) + } + + function dismissError() { + const id = store.activeSessionId + if (id) store.patchSession(id, { errorMessage: '', errorCanReset: false }) + } + + async function handleResetFromError() { + dismissError() + await resetActiveSession() + } + + function onNewSession() { + const id = store.createSession({ + approvalMode: defaultApprovalMode(), + model: defaultModel() + }) + syncUrl(id) + } + + function setActiveSession(id: string) { + store.setActive(id) + syncUrl(id) + } + + async function onCloseSession(id: string) { + const s = store.sessions[id] + if (!s) return + if (s.streaming) registry.cancelStream(id) + const handle = s.sessionHandle + if (handle) { + try { + await closeSession(handle) + } catch (e: any) { + // eslint-disable-next-line no-console + console.warn('Failed to close session:', e.message) + } + } + store.closeLocalSession(id) + if (!store.hasSessions) { + onNewSession() + } else { + syncUrl(store.activeSessionId) + } + } + + return { + 
ensureSession, + resetActiveSession, + dismissError, + handleResetFromError, + onNewSession, + setActiveSession, + onCloseSession + } +} diff --git a/kyuubi-server/web-ui/src/views/data-agent/composables/useStreamingMarkdown.ts b/kyuubi-server/web-ui/src/views/data-agent/composables/useStreamingMarkdown.ts new file mode 100644 index 00000000000..ed9d881d0b0 --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/composables/useStreamingMarkdown.ts @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ref, watch, onBeforeUnmount, type Ref } from 'vue' +import { renderMarkdown } from '../utils/markdown' + +// Avoid reparsing marked+DOMPurify on every SSE delta. 
/**
 * Renders markdown to HTML, throttled while the text is still streaming.
 *
 * While `streaming()` is true the HTML is refreshed at most once per
 * `STREAM_RENDER_INTERVAL` milliseconds; once streaming stops, or when the
 * text changes outside of streaming, it is rendered immediately.
 *
 * @param text getter for the markdown source.
 * @param streaming getter telling whether `text` is still being appended to.
 * @returns a ref holding the output of `renderMarkdown` for the latest flush.
 */
export function useStreamingMarkdown(
  text: () => string,
  streaming: () => boolean
): Ref<string> {
  const STREAM_RENDER_INTERVAL = 80
  const html = ref('')
  let throttleTimer: ReturnType<typeof setTimeout> | null = null
  let lastRenderAt = 0

  /** Renders now and drops any pending throttled render. */
  function flush() {
    html.value = renderMarkdown(text())
    lastRenderAt = Date.now()
    if (throttleTimer) {
      clearTimeout(throttleTimer)
      throttleTimer = null
    }
  }

  // Initial render so the ref is populated before the first change.
  flush()

  watch(text, () => {
    if (!streaming()) {
      flush()
      return
    }
    const elapsed = Date.now() - lastRenderAt
    if (elapsed >= STREAM_RENDER_INTERVAL) {
      flush()
    } else if (!throttleTimer) {
      // Schedule a single trailing render for the rest of the interval.
      throttleTimer = setTimeout(flush, STREAM_RENDER_INTERVAL - elapsed)
    }
  })

  // Make sure the final text is rendered as soon as streaming ends.
  watch(streaming, (s) => {
    if (!s) flush()
  })

  onBeforeUnmount(() => {
    if (throttleTimer) clearTimeout(throttleTimer)
  })

  return html
}
diff --git a/kyuubi-server/web-ui/src/views/data-agent/composables/useUrlSync.ts b/kyuubi-server/web-ui/src/views/data-agent/composables/useUrlSync.ts
new file mode 100644
index 00000000000..21021c803c6
--- /dev/null
+++ b/kyuubi-server/web-ui/src/views/data-agent/composables/useUrlSync.ts
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +import { watch } from 'vue' +import { useRoute, useRouter } from 'vue-router' +import type { useDataAgentStore } from '@/pinia/data-agent' + +const URL_QUERY_KEY = 'conversation' + +export function useUrlSync(store: ReturnType) { + const route = useRoute() + const router = useRouter() + + function syncUrl(id: string | null) { + const target = id || undefined + const current = + (route.query[URL_QUERY_KEY] as string | undefined) ?? undefined + if (current === target) return + router.replace({ + query: { ...route.query, [URL_QUERY_KEY]: target } + }) + } + + function readUrlSessionId(): string { + return typeof route.query[URL_QUERY_KEY] === 'string' + ? (route.query[URL_QUERY_KEY] as string) + : '' + } + + watch( + () => route.query[URL_QUERY_KEY], + (next) => { + const id = typeof next === 'string' ? next : '' + if (id && store.sessions[id] && id !== store.activeSessionId) { + store.setActive(id) + } + } + ) + + return { syncUrl, readUrlSessionId } +} diff --git a/kyuubi-server/web-ui/src/views/data-agent/index.vue b/kyuubi-server/web-ui/src/views/data-agent/index.vue new file mode 100644 index 00000000000..b4301f25b72 --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/index.vue @@ -0,0 +1,185 @@ + + + + + + + + + diff --git a/kyuubi-server/web-ui/src/views/data-agent/types.ts b/kyuubi-server/web-ui/src/views/data-agent/types.ts new file mode 100644 index 00000000000..6627c7d86e9 --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/types.ts @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export interface ChatBlock { + type: 'text' | 'tool_call' | 'approval_request' | 'reasoning' + toolCallId?: string + text?: string + name?: string + args?: unknown + result?: string + isError?: boolean + expanded?: boolean + requestId?: string + riskLevel?: string + approvalStatus?: 'pending' | 'approved' | 'denied' +} + +export interface TokenUsage { + accumulatedPrompt: number + accumulatedCompletion: number + lastPrompt: number + lastCompletion: number + steps?: number +} diff --git a/kyuubi-server/web-ui/src/views/data-agent/utils/clipboard.ts b/kyuubi-server/web-ui/src/views/data-agent/utils/clipboard.ts new file mode 100644 index 00000000000..22cbfd84aa3 --- /dev/null +++ b/kyuubi-server/web-ui/src/views/data-agent/utils/clipboard.ts @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
import { ElMessage } from 'element-plus'
import { useI18n } from 'vue-i18n'
import { formatArgs } from './format'

/**
 * Copies `text` via a temporary off-screen textarea and
 * `document.execCommand('copy')`.
 *
 * @returns Whatever `execCommand` reports, or false if it throws.
 */
function legacyCopy(text: string): boolean {
  const scratch = document.createElement('textarea')
  scratch.value = text
  scratch.setAttribute('readonly', '')
  scratch.style.position = 'fixed'
  scratch.style.opacity = '0'
  document.body.appendChild(scratch)
  scratch.select()
  try {
    return document.execCommand('copy')
  } catch {
    return false
  } finally {
    // The scratch element is removed whether or not the copy succeeded.
    document.body.removeChild(scratch)
  }
}

/**
 * Composable returning an async `copyText` function.
 *
 * `copyText` accepts any value: strings are copied verbatim, null/undefined
 * as the empty string, and anything else through `formatArgs`. It tries
 * `navigator.clipboard.writeText` first, falls back to `legacyCopy`, and
 * reports the outcome with an Element Plus message.
 */
export function useCopyText() {
  const { t } = useI18n()

  // Resolves true only when the async Clipboard API accepted the write.
  async function writeWithClipboardApi(payload: string): Promise<boolean> {
    try {
      await navigator.clipboard.writeText(payload)
      return true
    } catch {
      return false
    }
  }

  return async function copyText(text: unknown) {
    let payload: string
    if (typeof text === 'string') {
      payload = text
    } else if (text == null) {
      payload = ''
    } else {
      payload = formatArgs(text)
    }

    const viaApi = navigator.clipboard?.writeText
      ? await writeWithClipboardApi(payload)
      : false
    const copied = viaApi || legacyCopy(payload)

    if (!copied) {
      ElMessage.warning(t('data_agent.copy_failed'))
      return
    }
    ElMessage.success({ message: t('data_agent.copied'), duration: 1500 })
  }
}
// Indentation unit used by prettyPrintLoose.
// NOTE(review): the reviewed text shows a single space here, but it had
// whitespace runs collapsed; formatArgs indents JSON by 2 — confirm the
// intended width.
const INDENT_UNIT = ' '

/**
 * Formats a token count compactly: values below 1000 are printed as-is,
 * 1000–9999 with one decimal and a `k` suffix, larger values as whole `k`.
 * A value that would round to "10.0k" is printed as "10k" so the output is
 * consistent across the 10k boundary.
 */
export function formatTokens(n: number): string {
  if (n >= 1000) {
    const thousands = n / 1000
    const oneDecimal = thousands.toFixed(1)
    if (n < 10000 && oneDecimal !== '10.0') return oneDecimal + 'k'
    return thousands.toFixed(0) + 'k'
  }
  return String(n)
}

/**
 * Copy-safe formatter: renders `args` as 2-space-indented JSON.
 *
 * - null/undefined yield ''.
 * - A string is re-indented when it parses as JSON, otherwise returned as-is.
 * - Values JSON cannot serialise (cycles, bigint, functions, symbols) fall
 *   back to `String(args)`, so the result is always a string.
 */
export function formatArgs(args: unknown): string {
  if (args == null) return ''
  if (typeof args === 'string') {
    try {
      return JSON.stringify(JSON.parse(args), null, 2)
    } catch {
      return args
    }
  }
  try {
    // JSON.stringify returns undefined (not a string) for functions/symbols.
    return JSON.stringify(args, null, 2) ?? String(args)
  } catch {
    return String(args)
  }
}

/**
 * Display-only formatter: like `formatArgs`, but multi-line string values are
 * shown with real line breaks instead of `\n` escapes, so the output is not
 * necessarily valid JSON.
 *
 * null/undefined yield ''; a string that is not JSON is returned unchanged.
 * If the value cannot be pretty-printed (e.g. it is cyclic) the result of
 * `formatArgs` is returned instead of throwing.
 */
export function formatArgsForDisplay(args: unknown): string {
  if (args == null) return ''
  let value: unknown = args
  if (typeof args === 'string') {
    try {
      value = JSON.parse(args)
    } catch {
      return args
    }
  }
  try {
    return prettyPrintLoose(value, 0)
  } catch {
    return formatArgs(args)
  }
}

// Display-only formatter; formatArgs remains the copy-safe JSON path.
// `ancestors` holds the objects currently being printed so a cycle raises a
// TypeError instead of recursing until the stack overflows.
function prettyPrintLoose(
  value: unknown,
  indent: number,
  ancestors: Set<object> = new Set()
): string {
  const pad = INDENT_UNIT.repeat(indent)
  const padInner = INDENT_UNIT.repeat(indent + 1)
  if (value === null) return 'null'
  if (typeof value === 'string') {
    if (value.includes('\n')) {
      // Multi-line strings are expanded onto their own indented lines.
      const lines = value.split('\n').map((l) => padInner + l)
      return '"\n' + lines.join('\n') + '\n' + pad + '"'
    }
    return JSON.stringify(value)
  }
  if (typeof value !== 'object') {
    // JSON.stringify throws on bigint and returns undefined for
    // undefined/functions/symbols; always hand back a string.
    if (typeof value === 'bigint') return String(value)
    return JSON.stringify(value) ?? String(value)
  }
  if (ancestors.has(value)) {
    throw new TypeError('Cannot pretty-print a circular structure')
  }
  ancestors.add(value)
  try {
    if (Array.isArray(value)) {
      if (value.length === 0) return '[]'
      const items = value.map(
        (v) => padInner + prettyPrintLoose(v, indent + 1, ancestors)
      )
      return '[\n' + items.join(',\n') + '\n' + pad + ']'
    }
    const entries = Object.entries(value as Record<string, unknown>)
    if (entries.length === 0) return '{}'
    const items = entries.map(
      ([k, v]) =>
        padInner +
        JSON.stringify(k) +
        ': ' +
        prettyPrintLoose(v, indent + 1, ancestors)
    )
    return '{\n' + items.join(',\n') + '\n' + pad + '}'
  } finally {
    // The same object may legitimately appear again in a sibling branch.
    ancestors.delete(value)
  }
}
/** Built-in example JDBC URLs offered as suggestions. */
export const JDBC_TEMPLATES = [
  {
    label: 'Spark / Hive (Thrift)',
    value:
      'jdbc:hive2://localhost:10009/default;user=username;password=password',
    isHistory: false
  },
  {
    label: 'Trino',
    value:
      'jdbc:trino://localhost:8080/catalog/schema?user=username&password=password',
    isHistory: false
  },
  {
    label: 'MySQL',
    value:
      'jdbc:mysql://localhost:3306/mydb?user=username&password=password&useSSL=false',
    isHistory: false
  }
]

const JDBC_HISTORY_KEY = 'data-agent-jdbc-history'
// Maximum number of URLs kept in the history list.
const JDBC_HISTORY_LIMIT = 10
const SENSITIVE_PARAM =
  /^(password|pwd|passwd|token|secret|authtoken|accesstoken|oauth2token)$/i

/**
 * Strips credentials from a JDBC URL: removes a `userinfo@` authority prefix
 * and every `key=value` parameter whose key matches `SENSITIVE_PARAM`, then
 * tidies the separators (`?`, `&`, `;`) left behind.
 */
export function sanitizeJdbcUrl(url: string): string {
  return url
    .replace(/^([a-zA-Z][\w+.-]*:[^/]*\/\/)[^/?#@]*@/, '$1')
    .replace(/([?&;])([^=&;#]+)=([^&;#]*)/g, (match, sep, key) =>
      SENSITIVE_PARAM.test(key) ? sep : match
    )
    .replace(/([?&;])[?&;]+/g, '$1')
    .replace(/[?&;]$/, '')
}

/**
 * Reads the saved JDBC URL history from localStorage.
 *
 * The stored value is validated: anything that is not a JSON array yields an
 * empty list, and non-string entries are dropped, so callers can rely on the
 * declared `string[]` shape even if the stored data is corrupt. Storage or
 * parse errors also yield an empty list.
 */
export function loadJdbcHistory(): string[] {
  try {
    const parsed: unknown = JSON.parse(
      localStorage.getItem(JDBC_HISTORY_KEY) || '[]'
    )
    if (!Array.isArray(parsed)) return []
    return parsed.filter(
      (entry): entry is string => typeof entry === 'string'
    )
  } catch {
    return []
  }
}

/**
 * Puts the sanitized form of `url` at the front of the history, removing any
 * earlier copy and capping the list at `JDBC_HISTORY_LIMIT` entries. A URL
 * that is empty after trimming and sanitizing is ignored.
 */
export function saveJdbcToHistory(url: string) {
  const sanitized = sanitizeJdbcUrl(url.trim())
  if (!sanitized) return
  const history = loadJdbcHistory().filter((u) => u !== sanitized)
  history.unshift(sanitized)
  if (history.length > JDBC_HISTORY_LIMIT) history.length = JDBC_HISTORY_LIMIT
  try {
    localStorage.setItem(JDBC_HISTORY_KEY, JSON.stringify(history))
  } catch {
    /* quota exceeded or storage disabled — drop silently */
  }
}

/** Removes every history entry equal to `url` and persists the result. */
export function removeJdbcFromHistory(url: string) {
  const history = loadJdbcHistory().filter((u) => u !== url)
  try {
    localStorage.setItem(JDBC_HISTORY_KEY, JSON.stringify(history))
  } catch {
    /* ignore */
  }
}

/** One entry in the JDBC URL suggestion list. */
export interface JdbcSuggestion {
  label: string
  value: string
  isHistory: boolean
}

/**
 * Builds the suggestion list: history entries first (labelled with
 * `historyLabel`, skipping any that duplicate a template URL), then the
 * built-in templates. With a non-empty `query`, only items whose value or
 * label contains it (case-insensitively) are returned.
 */
export function buildJdbcSuggestions(
  query: string,
  historyLabel: string
): JdbcSuggestion[] {
  const history = loadJdbcHistory()
  const templateValues = new Set(JDBC_TEMPLATES.map((tpl) => tpl.value))
  const historyItems: JdbcSuggestion[] = history
    .filter((u) => !templateValues.has(u))
    .map((u) => ({
      label: historyLabel,
      value: u,
      isHistory: true
    }))
  const all = [...historyItems, ...JDBC_TEMPLATES]
  if (!query) return all
  const q = query.toLowerCase()
  return all.filter(
    (item) =>
      item.value.toLowerCase().includes(q) ||
      item.label.toLowerCase().includes(q)
  )
}
// Shape of the marker this module stores on the DOMPurify object so the hook
// below is registered at most once, however often the module is evaluated.
type LinkHookMarker = { __kyuubiLinkHook?: boolean }

// Registers an afterSanitizeAttributes hook that gives every sanitized <a>
// element target="_blank" and rel="noopener noreferrer".
function installLinkHook(): void {
  const marker = DOMPurify as unknown as LinkHookMarker
  if (marker.__kyuubiLinkHook) return
  DOMPurify.addHook('afterSanitizeAttributes', (node) => {
    if (!('tagName' in node)) return
    const anchor = node as Element
    if (anchor.tagName !== 'A') return
    anchor.setAttribute('target', '_blank')
    anchor.setAttribute('rel', 'noopener noreferrer')
  })
  marker.__kyuubiLinkHook = true
}

installLinkHook()

/**
 * Converts markdown to sanitized HTML.
 *
 * @param content - Markdown source; an empty value yields ''.
 * @returns The DOMPurify-sanitized output of `marked.parse`. If parsing or
 *   sanitizing that output throws, the sanitized raw `content` is returned.
 */
export function renderMarkdown(content: string): string {
  if (content) {
    try {
      const html = marked.parse(content, { async: false }) as string
      return DOMPurify.sanitize(html)
    } catch {
      return DOMPurify.sanitize(content)
    }
  }
  return ''
}