Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
6b3affd
sea-napi-binding: scaffold native/sea/ crate with version() smoke test
msrathore-db May 15, 2026
0085928
sea-abstraction: introduce IBackend / ISessionBackend / IOperationBac…
msrathore-db May 15, 2026
f7cdb80
sea-errors-logging: kernel ErrorCode → JS error class mapping
msrathore-db May 15, 2026
40d0b57
sea-napi-binding: Database/Connection/Statement/ResultStream methods …
msrathore-db May 15, 2026
64c67a0
auth: merge sea-abstraction dep
msrathore-db May 15, 2026
75cba3a
auth: merge sea-napi-binding dep
msrathore-db May 15, 2026
271b22d
auth: merge sea-errors-logging dep
msrathore-db May 15, 2026
c894742
sea-auth: PAT auth flow through SeaBackend → napi binding
msrathore-db May 15, 2026
5eba37f
sea-auth-u2m: OAuth M2M + U2M through SeaBackend → napi binding → kernel
msrathore-db May 15, 2026
4f6ccc0
sea-auth-u2m: address round-1 M2M review parity — shared fakeBinding …
msrathore-db May 15, 2026
8e99b40
sea-auth-u2m: address round-1 review (HIGH error-mapping wiring + 7 m…
msrathore-db May 15, 2026
98d5ecf
sea-auth-u2m: round-2 fixup — wrap close() in decodeNapiKernelError, …
msrathore-db May 15, 2026
ee5f03e
sea-auth-u2m: round-3 fixup — namespace kernel metadata, dedupe predi…
msrathore-db May 15, 2026
37156db
sea-auth-u2m: flip kernel path-dep to napi-binding worktree (carries …
msrathore-db May 15, 2026
21916fd
sea-auth-u2m: rewire M2M e2e to AAD SP on pecotesting HTTP_PATH2
msrathore-db May 16, 2026
a97b96e
sea-auth-u2m: round-4 fixup — restore M2M-with-bad-secret class, stri…
msrathore-db May 16, 2026
e9131ae
sea-auth-u2m: round-5 fixup — JSDoc selector contract, defense-in-dep…
msrathore-db May 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 46 additions & 74 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import thrift from 'thrift';
import Int64 from 'node-int64';

import { EventEmitter } from 'events';
import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
import IDriver from './contracts/IDriver';
import IClientContext, { ClientConfig } from './contracts/IClientContext';
Expand All @@ -14,9 +12,11 @@ import IDBSQLSession from './contracts/IDBSQLSession';
import IAuthentication from './connection/contracts/IAuthentication';
import HttpConnection from './connection/connections/HttpConnection';
import IConnectionOptions from './connection/contracts/IConnectionOptions';
import Status from './dto/Status';
import HiveDriverError from './errors/HiveDriverError';
import { buildUserAgentString, definedOrError, serializeQueryTags } from './utils';
import { buildUserAgentString } from './utils';
import IBackend from './contracts/IBackend';
import ThriftBackend from './thrift-backend/ThriftBackend';
import SeaBackend from './sea/SeaBackend';
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
import DatabricksOAuth, { OAuthFlow } from './connection/auth/DatabricksOAuth';
import {
Expand All @@ -39,19 +39,6 @@ function prependSlash(str: string): string {
return str;
}

function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
if (!catalogName && !schemaName) {
return {};
}

return {
initialNamespace: {
catalogName,
schemaName,
},
};
}

export type ThriftLibrary = Pick<typeof thrift, 'createClient'>;

export default class DBSQLClient extends EventEmitter implements IDBSQLClient, IClientContext {
Expand All @@ -75,6 +62,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

private readonly sessions = new CloseableCollection<DBSQLSession>();

private backend?: IBackend;

private static getDefaultLogger(): IDBSQLLogger {
if (!this.defaultLogger) {
this.defaultLogger = new DBSQLLogger();
Expand Down Expand Up @@ -248,38 +237,45 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

this.connectionProvider = this.createConnectionProvider(options);

const thriftConnection = await this.connectionProvider.getThriftConnection();

thriftConnection.on('error', (error: Error) => {
// Error.stack already contains error type and message, so log stack if available,
// otherwise fall back to just error type + message
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter will throw unhandled error when emitting 'error' event.
// Since we already logged it few lines above, just suppress this behaviour
}
});

thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
this.emit('reconnecting', params);
});

thriftConnection.on('close', () => {
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
});
this.backend = options.useSEA
? new SeaBackend()
: new ThriftBackend({
context: this,
onConnectionEvent: (event, payload) => this.forwardConnectionEvent(event, payload),
});

thriftConnection.on('timeout', () => {
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
});
await this.backend.connect(options);

return this;
}

private forwardConnectionEvent(event: 'error' | 'reconnecting' | 'close' | 'timeout', payload?: unknown): void {
switch (event) {
case 'error': {
const error = payload as Error;
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter throws when 'error' has no listeners; we've already logged it.
}
return;
}
case 'reconnecting':
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(payload)}`);
this.emit('reconnecting', payload);
return;
case 'close':
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
return;
case 'timeout':
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
// no default
}
}

/**
* Starts new session
* @public
Expand All @@ -290,44 +286,20 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
* const session = await client.openSession();
*/
public async openSession(request: OpenSessionRequest = {}): Promise<IDBSQLSession> {
// Prepare session configuration
const configuration = request.configuration ? { ...request.configuration } : {};

// Add metric view metadata config if enabled
if (this.config.enableMetricViewMetadata) {
configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true';
}

// Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS
if (request.queryTags !== undefined) {
const serialized = serializeQueryTags(request.queryTags);
if (serialized) {
configuration.QUERY_TAGS = serialized;
} else {
delete configuration.QUERY_TAGS;
}
if (!this.backend) {
throw new HiveDriverError('DBSQLClient: not connected');
}

const response = await this.driver.openSession({
client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8),
...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema),
configuration,
canUseMultipleCatalogs: true,
});

Status.assert(response.status);
const session = new DBSQLSession({
handle: definedOrError(response.sessionHandle),
context: this,
serverProtocolVersion: response.serverProtocolVersion,
});
const sessionBackend = await this.backend.openSession(request);
const session = new DBSQLSession({ backend: sessionBackend, context: this });
this.sessions.add(session);
return session;
}

public async close(): Promise<void> {
await this.sessions.closeAll();
await this.backend?.close();

this.backend = undefined;
this.client = undefined;
this.connectionProvider = undefined;
this.authProvider = undefined;
Expand Down
Loading
Loading