Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 54 additions & 64 deletions lib/sea/SeaSessionBackend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import ParameterError from '../errors/ParameterError';
import { LogLevel } from '../contracts/IDBSQLLogger';
import { SeaConnection, SeaNativeExecuteOptions, SeaStatement } from './SeaNativeLoader';
import { decodeNapiKernelError } from './SeaErrorMapping';
import { numberToInt64 } from '../thrift-backend/ThriftSessionBackend';
import SeaOperationBackend from './SeaOperationBackend';
import { buildSeaPositionalParams, buildSeaNamedParams } from './SeaPositionalParams';
import { seaServerInfoValue } from './SeaServerInfo';
Expand Down Expand Up @@ -122,38 +121,41 @@ export default class SeaSessionBackend implements ISessionBackend {
* Per-statement options forwarded to the kernel `ExecuteOptions`:
* - `ordinalParameters` / `namedParameters` → bound params (mutually
* exclusive — the kernel binds one placeholder style per statement);
* - `queryTimeout` → enforced client-side by the operation backend's poll
* deadline (the kernel ignores `queryTimeoutSecs` on the async submit
* path), NOT forwarded to the napi options;
* - `rowLimit` → `rowLimit` (SEA-only server-side row cap);
* - `queryTags` → serialised into the conf overlay's reserved
* `query_tags` key (the same wire shape Thrift's `serializeQueryTags`
* produces), merged with any explicit `statementConf`.
*
* Still rejected (genuinely unsupported on SEA, rather than silently
* dropped): `useCloudFetch` (governed by the kernel `ResultConfig`, not a
* per-statement knob), `useLZ4Compression` (kernel owns result compression),
* and `stagingAllowedLocalPath` (volume operations). `maxRows` is applied by
* the facade at fetch time, so it is intentionally not handled here.
* Accepted but IGNORED (no-op — the kernel exposes no per-statement knob, so
* we drop rather than reject; see the body for details and TODOs):
* `useCloudFetch`, `useLZ4Compression`, `stagingAllowedLocalPath`, and
* `queryTimeout`. `maxRows` is applied by the facade at fetch time, so it is
* intentionally not handled here.
*/
public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise<IOperationBackend> {
this.failIfClosed();

if (options.useCloudFetch !== undefined) {
throw new HiveDriverError(
'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA',
);
}
if (options.useLZ4Compression !== undefined) {
throw new HiveDriverError(
'SEA executeStatement: useLZ4Compression is not supported on SEA (result compression is governed by the kernel)',
);
}
if (options.stagingAllowedLocalPath !== undefined) {
throw new HiveDriverError(
'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported on SEA',
);
}
// `useCloudFetch`, `useLZ4Compression`, and `stagingAllowedLocalPath` are
// accepted and IGNORED (no-op) on the kernel-backed SEA path rather than
// rejected — the kernel exposes no per-statement knob for any of them, so a
// hard failure would break callers that set these options globally. This
// mirrors the Python connector's kernel backend
// (`KernelDatabricksClient.execute_command`), which takes the same flags and
// never reads them.
//
// - `useCloudFetch`: result transport is governed by the session-level
// kernel `ResultConfig.cloudfetch_enabled` (default: CloudFetch on);
// there is no per-statement override on the napi surface.
// - `useLZ4Compression`: the kernel transparently decodes whatever
// compression the server returns (`manifest.result_compression`) and
// exposes no compression-request knob.
// - `stagingAllowedLocalPath`: the kernel has no Volume (PUT/GET/REMOVE)
// API yet, so `SeaOperationBackend` always reports
// `isStagingOperation: false` and `DBSQLSession` treats such statements
// as ordinary queries. Non-staging queries that set the option run
// normally (parity with Thrift).
// TODO(SEA): wire real volume operations once the kernel exposes a
// Volume API + napi `is_volume_operation`.

// `runAsync` selects the kernel execution path. NOTE: this is a SEA/kernel-
// specific use of the option — the Thrift backend hardcodes `runAsync: true`
Expand All @@ -166,26 +168,27 @@ export default class SeaSessionBackend implements ISessionBackend {
// - DEFAULT (`runAsync` false/undefined) — SYNC. Route through
// `executeStatementCancellable`: the kernel blocks on `execute()`
// (server-side direct-results / poll-to-terminal), which is faster and,
// with the napi sync canceller, fully cancellable mid-COMPUTE. The
// blocking drive runs in the operation backend's `result()` (inside
// `waitUntilReady`, which the facade invokes lazily at first fetch).
// `queryTimeoutSecs` IS honoured on this path (forwarded to the napi
// options below) since the kernel `execute()` consults it.
// with the napi sync canceller, fully cancellable mid-COMPUTE.
//
// - `runAsync: true` — ASYNC. Submit (`wait_timeout=0s`): the server
// returns a pending `AsyncStatement` immediately while the query runs;
// the backend polls `status()` to terminal in `waitUntilReady()` and
// materialises results via `awaitResult()`. `queryTimeoutSecs` is
// ignored by the kernel on submit, so it is enforced client-side by the
// operation backend's poll-loop deadline instead.
// materialises results via `awaitResult()`.
//
// TODO(SEA): `queryTimeout` is intentionally a NO-OP here. It must NOT be
// mapped to the SEA `wait_timeout` wire field: `wait_timeout` is the
// inline-result wait knob (valid range {0} ∪ [5,50]s, paired with
// `on_wait_timeout`), a different concept from a server statement-execution
// timeout, and out-of-range values fail with HTTP 400. The correct SEA
// mechanism is the `STATEMENT_TIMEOUT` session configuration (seconds); the
// Python connector forwards no per-statement timeout at all. Wiring this
// properly (STATEMENT_TIMEOUT and/or a client-side poll deadline) is
// deferred — until then the option is accepted and ignored.
const runAsync = options.runAsync ?? false;
const queryTimeoutSecs =
options.queryTimeout !== undefined ? numberToInt64(options.queryTimeout).toNumber() : undefined;

const execOptions = this.buildExecuteOptions(options);

if (!runAsync) {
// Sync path: forward `queryTimeoutSecs` to the napi options — the kernel
// `execute()` honours it (server statement timeout).
const execOptions = this.buildExecuteOptions(options, queryTimeoutSecs);
let cancellableExecution;
try {
cancellableExecution =
Expand All @@ -195,19 +198,22 @@ export default class SeaSessionBackend implements ISessionBackend {
} catch (err) {
throw this.logAndMapError('executeStatement', err);
}
return new SeaOperationBackend({
const op = new SeaOperationBackend({
cancellableExecution: cancellableExecution!,
context: this.context,
// The kernel honours `queryTimeoutSecs` on the sync `execute` path, so
// it is forwarded via the napi options (see `buildExecuteOptions`); the
// backend also keeps it as a deadline guard for parity with async.
queryTimeoutSecs,
});
// Option A (matches JDBC / ADBC C# / Python use_kernel): the default sync
// path drives the statement to terminal HERE, so by the time
// executeStatement resolves the side effect is committed and the result is
// materialised. This makes dependent statements + fire-and-forget DDL/DML
// "just work" and keeps `close()` a trivial non-blocking cleanup. Mid-run
// cancellation of a long query is done via `runAsync: true` (below).
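// Errors are swallowed here and are expected to re-surface at first fetch, to
// preserve the existing error-timing contract.
// NOTE(review): whether finished()/status() also re-surface a swallowed error
// is unverified — confirm against SeaOperationBackend.waitUntilReady().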
await op.waitUntilReady().catch(() => undefined);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [MAJOR] logical reviewer

Swallowed errors are not re-surfaced through finished() / status() as the comment claims. The comment at lines 211-212 says "Errors are swallowed here and re-surfaced at first fetch/finished to preserve the existing error-timing contract." However, this is only true for the fetch path. When waitUntilReady() fails (e.g. a SQL syntax error, kernel failure), it sets fetchHandlePromise to a rejected promise. On subsequent calls through finished() → waitUntilReadyThroughBackend() → backend.waitUntilReady() → waitUntilReadyCancellable(), the early-return guard at SeaOperationBackend.ts:541 (if (this.fetchHandlePromise) { return; }) fires — it checks truthiness of the promise, not whether it resolved or rejected, so it returns successfully without re-throwing the error. This means: (1) finished() silently succeeds for a failed sync query, (2) status() reports Succeeded (line 389 uses the same truthiness check). The fetch path (fetchChunk → getResultSlicer → getFetchHandle) does re-surface the error because it awaits the rejected promise, but callers using finished() to confirm DDL/DML completion — the exact fire-and-forget pattern this PR is designed to support — will miss the failure.

💡 Suggested Fix: In waitUntilReadyCancellable, instead of only checking whether fetchHandlePromise is set, await it so that a rejected promise re-throws. Alternatively, track a separate fetchError field and re-throw it in the early-return path. The status() method similarly needs to distinguish a resolved vs rejected fetchHandlePromise for the sync path (e.g. by storing a terminalState field set in the .then() / .catch() of getFetchHandle).

return op;
}

// Async path: do NOT forward `queryTimeoutSecs` (the kernel ignores it on
// submit — `wait_timeout=0s`); it is enforced client-side by the poll loop.
const execOptions = this.buildExecuteOptions(options);
let asyncStatement;
try {
asyncStatement =
Expand All @@ -217,16 +223,9 @@ export default class SeaSessionBackend implements ISessionBackend {
} catch (err) {
throw this.logAndMapError('executeStatement', err);
}
// `queryTimeout` is enforced client-side by the operation backend's poll
// loop: the kernel ignores `queryTimeoutSecs` on the async submit path
// (`submitStatement` always sends `wait_timeout=0s`), so we do NOT forward
// it to the napi options — passing it there would be a silent no-op.
return new SeaOperationBackend({
asyncStatement: asyncStatement!,
context: this.context,
// `queryTimeout` is typed `number | bigint | Int64`; `numberToInt64(...).toNumber()`
// coerces all three (a bare `Number(int64)` yields NaN — node-int64 has no valueOf).
queryTimeoutSecs,
});
}

Expand All @@ -235,10 +234,7 @@ export default class SeaSessionBackend implements ISessionBackend {
* `ExecuteOptions`, returning `undefined` when nothing is set so the
* no-options call shape (`executeStatement(sql)`) is preserved.
*/
private buildExecuteOptions(
options: ExecuteStatementOptions,
queryTimeoutSecs?: number,
): SeaNativeExecuteOptions | undefined {
private buildExecuteOptions(options: ExecuteStatementOptions): SeaNativeExecuteOptions | undefined {
// Positional (`?`) and named (`:name`) parameters are mutually exclusive —
// the kernel binds one placeholder style per statement. Use the SAME error
// type and message as the Thrift backend (`ThriftSessionBackend`) so a
Expand All @@ -256,14 +252,8 @@ export default class SeaSessionBackend implements ISessionBackend {
if (namedParams !== undefined) {
execOptions.namedParams = namedParams;
}
// `queryTimeoutSecs` is forwarded only on the SYNC path (the caller passes
// it in): the kernel `execute()` consults it as the server statement
// timeout. On the async submit path the caller omits it (the kernel ignores
// it under `wait_timeout=0s`), so it is enforced client-side by the
// operation backend's poll-loop deadline instead (see executeStatement).
if (queryTimeoutSecs !== undefined) {
execOptions.queryTimeoutSecs = queryTimeoutSecs;
}
// `queryTimeout` is intentionally NOT forwarded — it is a no-op on SEA (see
// the TODO in executeStatement). It must not become the SEA `wait_timeout`.
if (options.rowLimit !== undefined) {
execOptions.rowLimit = Number(options.rowLimit);
}
Expand Down
98 changes: 57 additions & 41 deletions tests/unit/sea/execution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -591,44 +591,59 @@ describe('SeaSessionBackend', () => {
expect((thrown as Error).message).to.equal('Driver does not support both ordinal and named parameters.');
});

it('executeStatement (sync default) DOES forward queryTimeout to the napi options', async () => {
it('executeStatement does NOT forward queryTimeout — it is a no-op on SEA', async () => {
const connection = new FakeNativeConnection();
const session = makeSession(connection);
await session.executeStatement('SELECT 1', { queryTimeout: 30 });
// Sync path: the kernel `execute()` honours queryTimeoutSecs (server
// statement timeout), so the backend forwards it onto the napi options.
expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(30);
// queryTimeout is intentionally a NO-OP on SEA (see SeaSessionBackend): the
// kernel would otherwise map queryTimeoutSecs onto the SEA `wait_timeout` wire
// field (valid range {0} ∪ [5,50]s) — a different concept from a statement
// timeout, and out-of-range values fail HTTP 400. The proper mechanism is the
// `STATEMENT_TIMEOUT` session config (deferred). So it is neither forwarded
// nor allowed to synthesise an options object.
expect(connection.lastOptions).to.equal(undefined);
});

it('executeStatement (runAsync: true) does NOT forward queryTimeout to submit (kernel ignores it; enforced client-side)', async () => {
it('executeStatement (runAsync: true) does NOT forward queryTimeout (no-op on SEA)', async () => {
const connection = new FakeNativeConnection();
const session = makeSession(connection);
await session.executeStatement('SELECT 1', { queryTimeout: 30, runAsync: true });
// Async submit path: the kernel ignores queryTimeoutSecs under
// `wait_timeout=0s`, so it's enforced client-side by the poll deadline
// instead — never forwarded to the napi options.
expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined);
});

it('coerces an Int64 queryTimeout into the client-side deadline on the async path (not NaN)', async function int64Timeout() {
// Regression: `Number(new Int64(...))` yields NaN (node-int64 has no valueOf),
// which would silently disable the deadline. The backend must coerce via
// numberToInt64(...).toNumber() so an Int64 queryTimeout still bounds the poll.
// Exercised on the async path, where the client-side poll deadline applies.
// eslint-disable-next-line no-invalid-this
this.timeout(5000);
it('accepts an Int64 queryTimeout without forwarding it or crashing (no-op on SEA)', async () => {
// queryTimeout is a no-op on SEA, but the option must still be ACCEPTED for
// any value type the public API allows (number | bigint | Int64) without
// throwing or synthesising napi options. Int64 is the awkward case
// (node-int64 has no valueOf), so assert it specifically.
const connection = new FakeNativeConnection();
connection.submitStatusValue = 'Running'; // never reaches a terminal state
const session = makeSession(connection);
const op = await session.executeStatement('SELECT 1', { queryTimeout: new Int64(1), runAsync: true });
let thrown: unknown;
try {
await op.waitUntilReady();
} catch (err) {
thrown = err;
}
expect(thrown, 'Int64(1) timeout must fire — NaN would poll forever').to.be.instanceOf(OperationStateError);
expect((thrown as OperationStateError).errorCode).to.equal(OperationStateErrorCode.Timeout);
await session.executeStatement('SELECT 1', { queryTimeout: new Int64(1) });
expect((connection.lastOptions as { queryTimeoutSecs?: number } | undefined)?.queryTimeoutSecs).to.equal(undefined);
expect(connection.lastOptions).to.equal(undefined);
});

it('Option A: sync executeStatement drives the statement to terminal before returning', async () => {
const connection = new FakeNativeConnection();
const session = makeSession(connection);
const op = await session.executeStatement('SELECT 1', {});
// The default sync path blocks to terminal in executeStatement (matching
// JDBC / ADBC C# / Python use_kernel), so the returned op is already finished
// — status reports Succeeded with no explicit waitUntilReady()/fetch. This is
// what makes fire-and-forget DDL/DML and dependent statements "just work".
const status = await op.status(false);
expect(status.state).to.equal(OperationState.Succeeded);
});

it('runAsync: true does NOT drive to terminal in executeStatement (returns a pending, cancellable handle)', async () => {
const connection = new FakeNativeConnection();
connection.submitStatusValue = 'Running';
const session = makeSession(connection);
const op = await session.executeStatement('SELECT 1', { runAsync: true });
// The async path returns immediately with a running handle so the caller can
// poll / cancel mid-run (the place to do mid-run cancellation under Option A).
const status = await op.status(false);
expect(status.state).to.equal(OperationState.Running);
});

it('executeStatement forwards rowLimit', async () => {
Expand Down Expand Up @@ -690,25 +705,26 @@ describe('SeaSessionBackend', () => {
}
});

// Genuinely unsupported on SEA — rejected (rather than silently ignored) so
// a caller/agent gets signal instead of a no-op. queryTags / queryTimeout /
// rowLimit are NOT here — they are forwarded (asserted above).
for (const { name, options, re } of [
{ name: 'useCloudFetch', options: { useCloudFetch: true }, re: /useCloudFetch/ },
{ name: 'useLZ4Compression', options: { useLZ4Compression: true }, re: /useLZ4Compression/ },
{ name: 'stagingAllowedLocalPath', options: { stagingAllowedLocalPath: '/tmp' }, re: /stagingAllowedLocalPath/ },
// Not per-statement knobs on the kernel-backed SEA path — ACCEPTED and IGNORED
// (no-op) rather than rejected, matching the Python connector's kernel backend
// (`KernelDatabricksClient.execute_command`, which takes the same flags and
// never reads them). A hard failure would break callers that set these
// globally; the kernel exposes no per-statement override for any of them
// (CloudFetch is the session-level ResultConfig, compression is decoded from
// the manifest, and volume operations have no kernel API yet).
for (const { name, options } of [
{ name: 'useCloudFetch', options: { useCloudFetch: true } },
{ name: 'useLZ4Compression', options: { useLZ4Compression: true } },
{ name: 'stagingAllowedLocalPath', options: { stagingAllowedLocalPath: '/tmp' } },
] as const) {
it(`executeStatement rejects ${name} rather than silently ignoring it`, async () => {
it(`executeStatement accepts and ignores ${name} (no throw, not forwarded)`, async () => {
const connection = new FakeNativeConnection();
const session = makeSession(connection);
let thrown: unknown;
try {
await session.executeStatement('SELECT 1', options);
} catch (err) {
thrown = err;
}
expect(thrown).to.be.instanceOf(HiveDriverError);
expect((thrown as Error).message).to.match(re);
// Must not throw...
await session.executeStatement('SELECT 1', options);
// ...and must not be forwarded to the napi options (it has no kernel knob),
// so a SELECT-1 with only this option still uses the no-options fast path.
expect(connection.lastOptions).to.equal(undefined);
});
}

Expand Down
Loading