Skip to content

Commit 5d573a4

Browse files
authored
Merge pull request #288 from asonas/databrics
Add Databricks Data Source Support
2 parents be1f72c + dc0d1bc commit 5d573a4

6 files changed

Lines changed: 502 additions & 3 deletions

File tree

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"@fortawesome/fontawesome-free": "5.12.0",
3333
"@google-cloud/bigquery": "5.6.0",
3434
"@hokaccha/sql-formatter": "^1.4.0",
35+
"axios": "^1.11.0",
3536
"classnames": "2.2.5",
3637
"codemirror": "~5.58.2",
3738
"compare-versions": "^4.1.3",

src/lib/DataSource.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import TreasureData from "./DataSourceDefinition/TreasureData";
77
import Athena from "./DataSourceDefinition/Athena";
88
import SQLite3 from "./DataSourceDefinition/SQLite3";
99
import Timestream from "./DataSourceDefinition/Timestream";
10+
import Databricks from "./DataSourceDefinition/Databricks";
1011
import { DataSourceType } from "../renderer/pages/DataSource/DataSourceStore";
1112

1213
export type DataSourceClasses =
@@ -16,7 +17,8 @@ export type DataSourceClasses =
1617
| typeof TreasureData
1718
| typeof Athena
1819
| typeof SQLite3
19-
| typeof Timestream;
20+
| typeof Timestream
21+
| typeof Databricks;
2022

2123
export default class DataSource {
2224
static dataSources: { [dataSourceName: string]: DataSourceClasses };
@@ -44,4 +46,4 @@ export default class DataSource {
4446
}
4547
}
4648

47-
DataSource.register(Mysql, Postgres, Redshift, BigQuery, TreasureData, Athena, SQLite3, Timestream);
49+
DataSource.register(Mysql, Postgres, Redshift, BigQuery, TreasureData, Athena, SQLite3, Timestream, Databricks);
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
import axios, { AxiosInstance } from "axios";
2+
import Base, { ConfigSchemasType, TableSummary } from "./Base";
3+
import { DataSourceKeys } from "../../renderer/pages/DataSource/DataSourceStore";
4+
import { Language } from "@hokaccha/sql-formatter";
5+
6+
interface DatabricksStatementResponse {
7+
statement_id: string;
8+
status: {
9+
state: "PENDING" | "RUNNING" | "SUCCEEDED" | "FAILED" | "CANCELED";
10+
};
11+
manifest?: {
12+
format: string;
13+
schema: {
14+
columns: Array<{
15+
name: string;
16+
type_name: string;
17+
}>;
18+
};
19+
};
20+
result?: {
21+
data_array?: any[][];
22+
chunk_index?: number;
23+
row_count?: number;
24+
row_offset?: number;
25+
};
26+
}
27+
28+
export default class Databricks extends Base {
29+
private static readonly POLL_MAX_ATTEMPTS = 60; // 5 minutes with 5 second intervals
30+
private static readonly POLL_INTERVAL_MS = 5000; // 5 seconds
31+
private static readonly QUERY_TIMEOUT_MS = 10000; // 10 seconds wait timeout for statement execution
32+
33+
private client: AxiosInstance;
34+
private currentStatementId: string | null = null;
35+
36+
static override get key(): DataSourceKeys {
37+
return "databricks";
38+
}
39+
40+
static override get label(): string {
41+
return "Databricks";
42+
}
43+
44+
static override get formatType(): Language {
45+
return "sql";
46+
}
47+
48+
static override get configSchema(): ConfigSchemasType {
49+
return [
50+
{
51+
name: "serverHostname",
52+
label: "Server Hostname",
53+
type: "string",
54+
placeholder: "your-workspace.databricks.com",
55+
required: true,
56+
},
57+
{
58+
name: "httpPath",
59+
label: "HTTP Path",
60+
type: "string",
61+
placeholder: "/sql/1.0/warehouses/your-warehouse-id",
62+
required: true,
63+
},
64+
{
65+
name: "personalAccessToken",
66+
label: "Personal Access Token",
67+
type: "password",
68+
required: true,
69+
},
70+
{
71+
name: "catalog",
72+
label: "Catalog",
73+
type: "string",
74+
placeholder: "main",
75+
default: "main",
76+
},
77+
{
78+
name: "schema",
79+
label: "Schema",
80+
type: "string",
81+
placeholder: "default",
82+
default: "default",
83+
},
84+
];
85+
}
86+
87+
constructor(config: any) {
88+
super(config);
89+
this.client = axios.create({
90+
baseURL: `https://${this.config.serverHostname}/api/2.0`,
91+
headers: {
92+
Authorization: `Bearer ${this.config.personalAccessToken}`,
93+
"Content-Type": "application/json",
94+
},
95+
});
96+
}
97+
98+
async execute(query: string, options: any = {}): Promise<any> {
99+
let statementId: string | null = null;
100+
101+
try {
102+
// Execute SQL statement
103+
const executeResponse = await this.client.post("/sql/statements", {
104+
statement: query,
105+
warehouse_id: this.extractWarehouseId(),
106+
catalog: this.config.catalog || "main",
107+
schema: this.config.schema || "default",
108+
wait_timeout: `${Databricks.QUERY_TIMEOUT_MS / 1000}s`,
109+
});
110+
111+
statementId = executeResponse.data.statement_id;
112+
this.currentStatementId = statementId;
113+
114+
// Poll for completion
115+
if (!statementId) {
116+
throw new Error("Statement ID is null");
117+
}
118+
const result = await this.pollForCompletion(statementId);
119+
120+
if (result.status.state === "FAILED") {
121+
throw new Error(`Query failed: ${JSON.stringify(result)}`);
122+
}
123+
124+
if (!result.manifest || !result.result) {
125+
throw new Error("No result data returned from Databricks");
126+
}
127+
128+
const fields = result.manifest.schema.columns.map((col) => col.name);
129+
const rows = result.result.data_array || [];
130+
131+
return { fields, rows };
132+
} catch (err: any) {
133+
throw this._errorWithLine(err, query, options.startLine || 1);
134+
} finally {
135+
// Only clear currentStatementId if it matches the current statement
136+
if (this.currentStatementId === statementId) {
137+
this.currentStatementId = null;
138+
}
139+
}
140+
}
141+
142+
async cancel(): Promise<void> {
143+
if (!this.currentStatementId) {
144+
console.log("Databricks cancel: No active statement to cancel");
145+
return;
146+
}
147+
148+
const statementIdToCancel = this.currentStatementId;
149+
console.log(`Databricks cancel: Attempting to cancel statement ${statementIdToCancel}`);
150+
151+
try {
152+
await this.client.post(`/sql/statements/${statementIdToCancel}/cancel`);
153+
console.log(`Databricks cancel: Successfully sent cancel request for statement ${statementIdToCancel}`);
154+
} catch (err: any) {
155+
console.warn(`Databricks cancel: Failed to cancel statement ${statementIdToCancel}:`, err.message);
156+
} finally {
157+
this.currentStatementId = null;
158+
}
159+
}
160+
161+
async connectionTest(): Promise<any> {
162+
await this.execute("SELECT 1 as test");
163+
return true;
164+
}
165+
166+
async fetchTables(): Promise<{ name: string; type: string; schema?: string }[]> {
167+
const catalog = this.config.catalog || "main";
168+
169+
try {
170+
const schemasQuery = `SHOW SCHEMAS IN ${catalog}`;
171+
const { rows: schemaRows } = await this.execute(schemasQuery);
172+
173+
const schemaPromises = schemaRows.map(async (schemaRow) => {
174+
const schemaName = schemaRow[0]; // schema name is in first column
175+
176+
try {
177+
const tablesQuery = `SHOW TABLES IN ${catalog}.${schemaName}`;
178+
const { rows: tableRows } = await this.execute(tablesQuery);
179+
180+
return tableRows.map((row) => ({
181+
name: row[1], // table name is in second column
182+
type: "TABLE", // SHOW TABLES doesn't distinguish types, assume TABLE
183+
schema: schemaName,
184+
}));
185+
} catch (err) {
186+
console.warn(`Could not access tables in schema ${schemaName}:`, err);
187+
return [];
188+
}
189+
});
190+
191+
const results = await Promise.all(schemaPromises);
192+
return results.flat();
193+
} catch (err) {
194+
console.warn("Could not fetch tables:", err);
195+
return [];
196+
}
197+
}
198+
199+
async fetchTableSummary({ schema, name }: { schema: string; name: string }): Promise<TableSummary> {
200+
const catalog = this.config.catalog || "main";
201+
const query = `DESCRIBE TABLE ${catalog}.${schema}.${name}`;
202+
203+
const result = await this.execute(query);
204+
const defs = {
205+
fields: ["col_name", "data_type", "comment"],
206+
rows: result.rows,
207+
};
208+
209+
return { schema, name, defs };
210+
}
211+
212+
dataSourceInfo(): Record<string, any> {
213+
return {
214+
type: Databricks.label,
215+
serverHostname: this.config.serverHostname,
216+
httpPath: this.config.httpPath,
217+
catalog: this.config.catalog || "main",
218+
schema: this.config.schema || "default",
219+
};
220+
}
221+
222+
private extractWarehouseId(): string {
223+
// Extract warehouse ID from HTTP path like "/sql/1.0/warehouses/abc123def456"
224+
const match = this.config.httpPath.match(/\/warehouses\/([^/]+)/);
225+
if (!match) {
226+
throw new Error("Could not extract warehouse ID from HTTP path");
227+
}
228+
return match[1];
229+
}
230+
231+
private async pollForCompletion(statementId: string): Promise<DatabricksStatementResponse> {
232+
let attempts = 0;
233+
234+
while (attempts < Databricks.POLL_MAX_ATTEMPTS) {
235+
// Check if this statement was cancelled
236+
if (this.currentStatementId !== statementId) {
237+
throw new Error("Query was cancelled");
238+
}
239+
240+
const response = await this.client.get(`/sql/statements/${statementId}`);
241+
const data: DatabricksStatementResponse = response.data;
242+
243+
if (data.status.state === "SUCCEEDED" || data.status.state === "FAILED" || data.status.state === "CANCELED") {
244+
return data;
245+
}
246+
247+
// Wait before next poll
248+
await new Promise((resolve) => setTimeout(resolve, Databricks.POLL_INTERVAL_MS));
249+
attempts++;
250+
}
251+
252+
throw new Error(
253+
`Query timed out after ${(Databricks.POLL_MAX_ATTEMPTS * Databricks.POLL_INTERVAL_MS) / 60000} minutes`
254+
);
255+
}
256+
257+
private _errorWithLine(err: any, _query: string, startLine: number): Error {
258+
let message = err.message || "Unknown error";
259+
260+
if (err.response?.data?.message) {
261+
message = err.response.data.message;
262+
}
263+
264+
// Try to extract line number from Databricks error if available
265+
const lineMatch = message.match(/line (\d+)/i);
266+
if (lineMatch) {
267+
const errorLine = parseInt(lineMatch[1]) + startLine - 1;
268+
message += ` (line: ${errorLine})`;
269+
}
270+
271+
return new Error(message);
272+
}
273+
}

src/renderer/pages/DataSource/DataSourceStore.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { TableSummary } from "../../../lib/DataSourceDefinition/Base";
55
export type DataSourceKeys =
66
| "athena"
77
| "bigquery"
8+
| "databricks"
89
| "mysql"
910
| "postgres"
1011
| "redshift"

0 commit comments

Comments
 (0)