Skip to content

Commit 368945a

Browse files
committed
[SEA-NodeJS] Address code-review findings on async/TLS/options (#413)
Code-review #413 (81/100). Validated each against the code + a live warehouse: - F1 (HIGH): the async poll loop threw plain HiveDriverError for server-driven Cancelled/Closed/Unknown. The DBSQLOperation facade only mirrors its cancelled/closed flags when `err instanceof OperationStateError` (and OperationStateError extends HiveDriverError, not the reverse), so a server-side cancel/close/admin-kill left the facade desynced. Now throws OperationStateError(Canceled/Closed/Unknown) — matching the Thrift backend. The Failed branch still surfaces the kernel SQL-error envelope via awaitResult. - F2 (MED): the server-Cancelled test asserted only instanceOf(HiveDriverError), which passes for both the correct and incorrect type — it couldn't catch F1. Now asserts instanceOf(OperationStateError) + errorCode, plus a new Closed test. - F3 (MED): queryTimeout was forwarded to submitStatement but the kernel ignores queryTimeoutSecs on submit (always wait_timeout=0s), so the documented public option was a silent no-op, and the poll loop had no client-side deadline (a stalled Running statement polled forever). Now enforced client-side: the poll loop tracks a deadline, best-effort cancels the statement on expiry, and throws OperationStateError(Timeout) — matching Thrift's server TIMEDOUT outcome. Stopped forwarding the ignored queryTimeoutSecs to the napi options. Validated live: a 2s timeout interrupts a slow cross-join with TIMEOUT. - F4 (LOW): customCaCert PEM string check now requires the END marker too (a truncated/headerless cert no longer passes), consistent with the Buffer path. - F5 (LOW): SeaAuth reads the SEA-only fields (checkServerCertificate / customCaCert / maxConnections) through `InternalConnectionOptions` instead of ad-hoc inline casts, so a typo'd key fails to compile. - F6 (LOW): corrected the poll-loop comment — the prior text justified polling by an incorrect "blocking awaitResult holds the mutex and queues cancel" claim; cancel() is documented lock-free. The real rationale (real-time status to the progress callback + cancel observed between ticks) is now stated. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 32929aa commit 368945a

4 files changed

Lines changed: 120 additions & 32 deletions

File tree

lib/sea/SeaAuth.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
import { ConnectionOptions } from '../contracts/IDBSQLClient';
16+
import { InternalConnectionOptions } from '../contracts/InternalConnectionOptions';
1617
import AuthenticationError from '../errors/AuthenticationError';
1718
import HiveDriverError from '../errors/HiveDriverError';
1819

@@ -181,10 +182,10 @@ const MAX_U32 = 0xffffffff;
181182
* (for strings) lacks a PEM certificate header.
182183
*/
183184
export function buildSeaTlsOptions(options: ConnectionOptions): SeaTlsOptions {
184-
const { checkServerCertificate, customCaCert } = options as {
185-
checkServerCertificate?: boolean;
186-
customCaCert?: Buffer | string;
187-
};
185+
// Read the SEA-only fields through the purpose-built internal options type
186+
// rather than an ad-hoc inline cast, so the shape can't silently drift from
187+
// its declaration and a typo'd key fails to compile.
188+
const { checkServerCertificate, customCaCert } = options as ConnectionOptions & InternalConnectionOptions;
188189

189190
const tls: SeaTlsOptions = {};
190191

@@ -194,10 +195,17 @@ export function buildSeaTlsOptions(options: ConnectionOptions): SeaTlsOptions {
194195

195196
if (customCaCert !== undefined) {
196197
if (typeof customCaCert === 'string') {
197-
if (!customCaCert.includes('-----BEGIN CERTIFICATE-----')) {
198+
// Light PEM sanity check — require both the BEGIN and END markers so a
199+
// truncated/headerless cert is rejected here rather than surfacing as an
200+
// opaque kernel TLS error. Full parsing is deferred to the kernel.
201+
if (
202+
!customCaCert.includes('-----BEGIN CERTIFICATE-----') ||
203+
!customCaCert.includes('-----END CERTIFICATE-----')
204+
) {
198205
throw new HiveDriverError(
199206
'SEA backend: `customCaCert` string does not look like a PEM certificate ' +
200-
"(missing '-----BEGIN CERTIFICATE-----'). Pass PEM text or a Buffer of PEM bytes.",
207+
"(missing the '-----BEGIN CERTIFICATE-----' / '-----END CERTIFICATE-----' markers). " +
208+
'Pass PEM text or a Buffer of PEM bytes.',
201209
);
202210
}
203211
tls.customCaCert = Buffer.from(customCaCert, 'utf8');
@@ -293,7 +301,7 @@ export function buildSeaConnectionOptions(options: ConnectionOptions): SeaNative
293301
// SEA-only pool sizing; read via cast to match how this function reads the
294302
// other SEA-specific options (TLS) — they live on the internal options
295303
// surface, not the published public `ConnectionOptions` `.d.ts`.
296-
const { maxConnections } = options as { maxConnections?: number };
304+
const { maxConnections } = options as ConnectionOptions & InternalConnectionOptions;
297305
if (maxConnections !== undefined) {
298306
if (!Number.isInteger(maxConnections) || maxConnections < 1) {
299307
throw new HiveDriverError(`SEA backend: \`maxConnections\` must be a positive integer; got ${maxConnections}.`);

lib/sea/SeaOperationBackend.ts

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import IClientContext from '../contracts/IClientContext';
4545
import { LogLevel } from '../contracts/IDBSQLLogger';
4646
import Status from '../dto/Status';
4747
import HiveDriverError from '../errors/HiveDriverError';
48+
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';
4849
import ArrowResultConverter from '../result/ArrowResultConverter';
4950
import ResultSlicer from '../result/ResultSlicer';
5051
import SeaResultsProvider from './SeaResultsProvider';
@@ -121,6 +122,15 @@ export interface SeaOperationBackendOptions {
121122
* handle exposes one, else a fresh UUIDv4.
122123
*/
123124
id?: string;
125+
/**
126+
* Client-side query timeout in whole seconds (the public `queryTimeout`).
127+
* The kernel ignores `queryTimeoutSecs` on the async submit path
128+
* (`submitStatement` always sends `wait_timeout=0s`), so the JS poll loop
129+
* enforces it as a deadline — on expiry it best-effort cancels the statement
130+
* and throws `OperationStateError(Timeout)`, matching the Thrift path's
131+
* server-side TIMEDOUT outcome. Omitted ⇒ no client-side deadline.
132+
*/
133+
queryTimeoutSecs?: number;
124134
}
125135

126136
export default class SeaOperationBackend implements IOperationBackend {
@@ -154,7 +164,11 @@ export default class SeaOperationBackend implements IOperationBackend {
154164
// already-terminal statement. Drives both fetch and result-metadata.
155165
private fetchHandlePromise?: Promise<SeaFetchHandle>;
156166

157-
constructor({ asyncStatement, statement, context, id }: SeaOperationBackendOptions) {
167+
// Client-side query-timeout deadline in ms (the public `queryTimeout`),
168+
// undefined when unset. Enforced in the async poll loop.
169+
private readonly queryTimeoutMs?: number;
170+
171+
constructor({ asyncStatement, statement, context, id, queryTimeoutSecs }: SeaOperationBackendOptions) {
158172
if ((asyncStatement === undefined) === (statement === undefined)) {
159173
throw new HiveDriverError('SeaOperationBackend: exactly one of `asyncStatement` or `statement` must be provided');
160174
}
@@ -163,6 +177,7 @@ export default class SeaOperationBackend implements IOperationBackend {
163177
this.lifecycleHandle = (asyncStatement ?? statement) as SeaStatementHandle;
164178
this.context = context;
165179
this._id = id ?? asyncStatement?.statementId ?? statement?.statementId ?? uuidv4();
180+
this.queryTimeoutMs = queryTimeoutSecs !== undefined && queryTimeoutSecs > 0 ? queryTimeoutSecs * 1000 : undefined;
166181
}
167182

168183
public get id(): string {
@@ -329,20 +344,31 @@ export default class SeaOperationBackend implements IOperationBackend {
329344
// ---------------------------------------------------------------------------
330345

331346
/**
332-
* Poll the kernel `AsyncStatement` to a terminal state, mirroring the Thrift
333-
* backend's `waitUntilReady` loop (100ms cadence). Polling `status()` rather
334-
* than awaiting `awaitResult()` directly is deliberate: a blocking
335-
* `awaitResult()` holds the kernel statement mutex for the whole query and
336-
* would queue a concurrent `cancel()` behind it, whereas the poll loop
337-
* releases the mutex between ticks so `cancel()` stays responsive. On
338-
* success it materialises the result handle (so the first fetch is free);
339-
* on a bad terminal state it surfaces the real kernel error.
347+
* Poll the kernel `AsyncStatement` to a terminal state on a fixed 100ms
348+
* cadence, mirroring the Thrift backend's `waitUntilReady` loop. We poll
349+
* `status()` (a cheap GetStatementStatus RPC) rather than awaiting
350+
* `awaitResult()` directly so that `status()` reports the real
351+
* Pending/Running/Succeeded state to a progress callback each tick, and so a
352+
* JS-initiated `cancel()`/`close()` is observed between ticks via
353+
* `failIfNotActive`. On success it materialises the result handle (so the
354+
* first fetch is free); on a server-driven terminal state it throws the typed
355+
* error the `IOperationBackend` contract requires.
356+
*
357+
* Terminal errors are thrown as `OperationStateError` (NOT plain
358+
* `HiveDriverError`) for Cancelled/Closed/Unknown, because the DBSQLOperation
359+
* facade only mirrors its `cancelled`/`closed` flags when
360+
* `err instanceof OperationStateError` — exactly as the Thrift backend does.
361+
* The Failed branch surfaces the kernel's typed SQL-error envelope via
362+
* `awaitResult()`.
340363
*/
341364
private async waitUntilReadyAsync(options?: IOperationBackendWaitOptions): Promise<void> {
342365
// Already materialised → terminal-and-ready, nothing to wait for.
343366
if (this.fetchHandlePromise) {
344367
return;
345368
}
369+
// Client-side timeout deadline: the kernel ignores queryTimeoutSecs on the
370+
// async submit path, so we enforce the public `queryTimeout` here.
371+
const deadline = this.queryTimeoutMs !== undefined ? Date.now() + this.queryTimeoutMs : undefined;
346372
for (;;) {
347373
// A JS-initiated cancel/close short-circuits before the next poll.
348374
failIfNotActive(this.lifecycle);
@@ -373,11 +399,20 @@ export default class SeaOperationBackend implements IOperationBackend {
373399
await this.throwAsyncError();
374400
break;
375401
case OperationState.Cancelled:
376-
throw new HiveDriverError(`SEA operation ${this._id} was cancelled server-side.`);
402+
throw new OperationStateError(OperationStateErrorCode.Canceled);
377403
case OperationState.Closed:
378-
throw new HiveDriverError(`SEA operation ${this._id} was closed before it produced a result.`);
404+
throw new OperationStateError(OperationStateErrorCode.Closed);
379405
default:
380-
throw new HiveDriverError(`SEA operation ${this._id} reached an unexpected state: ${state}.`);
406+
throw new OperationStateError(OperationStateErrorCode.Unknown);
407+
}
408+
409+
// Still Pending/Running — enforce the client-side timeout before sleeping.
410+
if (deadline !== undefined && Date.now() >= deadline) {
411+
// Best-effort server-side cancel so the statement doesn't keep running
412+
// after we stop waiting; never mask the timeout with a cancel failure.
413+
// eslint-disable-next-line no-await-in-loop
414+
await this.cancel().catch(() => undefined);
415+
throw new OperationStateError(OperationStateErrorCode.Timeout);
381416
}
382417

383418
// eslint-disable-next-line no-await-in-loop

lib/sea/SeaSessionBackend.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,15 @@ export default class SeaSessionBackend implements ISessionBackend {
169169
} catch (err) {
170170
throw this.logAndMapError('executeStatement', err);
171171
}
172-
return new SeaOperationBackend({ asyncStatement: asyncStatement!, context: this.context });
172+
// `queryTimeout` is enforced client-side by the operation backend's poll
173+
// loop: the kernel ignores `queryTimeoutSecs` on the async submit path
174+
// (`submitStatement` always sends `wait_timeout=0s`), so we do NOT forward
175+
// it to the napi options — passing it there would be a silent no-op.
176+
return new SeaOperationBackend({
177+
asyncStatement: asyncStatement!,
178+
context: this.context,
179+
queryTimeoutSecs: options.queryTimeout !== undefined ? Number(options.queryTimeout) : undefined,
180+
});
173181
}
174182

175183
/**
@@ -195,11 +203,9 @@ export default class SeaSessionBackend implements ISessionBackend {
195203
if (namedParams !== undefined) {
196204
execOptions.namedParams = namedParams;
197205
}
198-
// JDBC `setQueryTimeout` is whole seconds; the kernel's `queryTimeoutSecs`
199-
// (SEA wait timeout) is the native equivalent. The SEA wire caps it at 50s.
200-
if (options.queryTimeout !== undefined) {
201-
execOptions.queryTimeoutSecs = Number(options.queryTimeout);
202-
}
206+
// `queryTimeout` is intentionally NOT forwarded here — the kernel ignores
207+
// `queryTimeoutSecs` on `submitStatement`, so it is enforced client-side by
208+
// the operation backend's poll-loop deadline instead (see executeStatement).
203209
if (options.rowLimit !== undefined) {
204210
execOptions.rowLimit = Number(options.rowLimit);
205211
}

tests/unit/sea/execution.test.ts

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import IClientContext, { ClientConfig } from '../../../lib/contracts/IClientCont
2222
import IDBSQLLogger, { LogLevel } from '../../../lib/contracts/IDBSQLLogger';
2323
import HiveDriverError from '../../../lib/errors/HiveDriverError';
2424
import ParameterError from '../../../lib/errors/ParameterError';
25+
import OperationStateError, { OperationStateErrorCode } from '../../../lib/errors/OperationStateError';
2526
import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient';
2627
import { OperationState } from '../../../lib/contracts/OperationStatus';
2728

@@ -515,11 +516,14 @@ describe('SeaSessionBackend', () => {
515516
expect((thrown as Error).message).to.equal('Driver does not support both ordinal and named parameters.');
516517
});
517518

518-
it('executeStatement forwards queryTimeout as queryTimeoutSecs', async () => {
519+
it('executeStatement does NOT forward queryTimeout to submit (kernel ignores it; enforced client-side)', async () => {
519520
const connection = new FakeNativeConnection();
520521
const session = makeSession(connection);
521522
await session.executeStatement('SELECT 1', { queryTimeout: 30 });
522-
expect((connection.lastOptions as { queryTimeoutSecs?: number }).queryTimeoutSecs).to.equal(30);
523+
// queryTimeout alone must not produce napi submit options — the kernel
524+
// ignores queryTimeoutSecs on submitStatement, so it's enforced client-side
525+
// by the operation backend's poll deadline instead (covered below).
526+
expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined);
523527
});
524528

525529
it('executeStatement forwards rowLimit', async () => {
@@ -734,9 +738,9 @@ describe('SeaOperationBackend', () => {
734738
});
735739

736740
describe('SeaOperationBackend — async (submitStatement) path', () => {
737-
const makeAsyncOp = (asyncStatement: FakeAsyncStatement) =>
741+
const makeAsyncOp = (asyncStatement: FakeAsyncStatement, queryTimeoutSecs?: number) =>
738742
// eslint-disable-next-line @typescript-eslint/no-explicit-any
739-
new SeaOperationBackend({ asyncStatement: asyncStatement as any, context: makeContext() });
743+
new SeaOperationBackend({ asyncStatement: asyncStatement as any, context: makeContext(), queryTimeoutSecs });
740744

741745
it('rejects when neither asyncStatement nor statement is provided', () => {
742746
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -794,16 +798,51 @@ describe('SeaOperationBackend — async (submitStatement) path', () => {
794798
expect((thrown as Error).message).to.match(/TABLE_OR_VIEW_NOT_FOUND/);
795799
});
796800

797-
it('waitUntilReady() throws on a server-side Cancelled statement', async () => {
801+
// A server-driven terminal state MUST throw OperationStateError (not a plain
802+
// HiveDriverError) so the DBSQLOperation facade — which only mirrors its
803+
// cancelled/closed flags when `err instanceof OperationStateError` — stays in
804+
// sync. Asserting the subclass + errorCode is what catches a regression to
805+
// the bare HiveDriverError (which would pass an `instanceOf HiveDriverError`
806+
// check since OperationStateError extends it).
807+
it('waitUntilReady() throws OperationStateError(Canceled) on a server-side Cancelled statement', async () => {
798808
const op = makeAsyncOp(new FakeAsyncStatement('Cancelled'));
799809
let thrown: unknown;
800810
try {
801811
await op.waitUntilReady();
802812
} catch (err) {
803813
thrown = err;
804814
}
805-
expect(thrown).to.be.instanceOf(HiveDriverError);
806-
expect((thrown as Error).message).to.match(/cancelled/i);
815+
expect(thrown).to.be.instanceOf(OperationStateError);
816+
expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Canceled);
817+
});
818+
819+
it('waitUntilReady() throws OperationStateError(Closed) on a server-side Closed statement', async () => {
820+
const op = makeAsyncOp(new FakeAsyncStatement('Closed'));
821+
let thrown: unknown;
822+
try {
823+
await op.waitUntilReady();
824+
} catch (err) {
825+
thrown = err;
826+
}
827+
expect(thrown).to.be.instanceOf(OperationStateError);
828+
expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Closed);
829+
});
830+
831+
it('waitUntilReady() enforces queryTimeout client-side: throws Timeout and cancels a stuck Running statement', async function timeoutTest() {
832+
// eslint-disable-next-line no-invalid-this
833+
this.timeout(5000);
834+
const stmt = new FakeAsyncStatement('Running'); // never reaches a terminal state
835+
const op = makeAsyncOp(stmt, 0.05); // 50ms client-side deadline
836+
let thrown: unknown;
837+
try {
838+
await op.waitUntilReady();
839+
} catch (err) {
840+
thrown = err;
841+
}
842+
expect(thrown).to.be.instanceOf(OperationStateError);
843+
expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Timeout);
844+
// Best-effort server-side cancel fired so the statement doesn't keep running.
845+
expect(stmt.cancelled).to.equal(true);
807846
});
808847

809848
it('cancel() forwards to the async statement and short-circuits a subsequent poll', async () => {

0 commit comments

Comments
 (0)