Commit 4ba2175

[SEA-NodeJS] (2/3) SEA execution + result fetching (#410)
* feat(sea): SEA execution + result fetching [2/3]

  Second of three stacked PRs (base: [1/3] connect + auth). Wires the statement-execution + result-read path:

  - SeaSessionBackend.executeStatement: real implementation — runs SQL via the napi Connection and returns a SeaOperationBackend (replaces [1/3]'s stub).
  - SeaOperationBackend: fetch pipeline (napi Statement.fetchNextBatch → SeaResultsProvider → ArrowResultConverter → ResultSlicer) plus operation cancel/close/finished via the SeaOperationLifecycle helpers.
  - SeaResultsProvider / SeaArrowIpc / SeaArrowIpcDurationFix: Arrow IPC decode for inline + CloudFetch result batches (the duration-fix pre-processor rewrites Arrow Duration → Int64 so apache-arrow@13 can read it).
  - ArrowResultConverter: constructor now takes the neutral { schema? } shape so both the Thrift and SEA backends construct it without an adapter.
  - flatbuffers pinned to 23.5.26 to match apache-arrow@13's nested copy.

  Tests: executeStatement + openSession forwarding, M0 datatype round-trip through the shared converter (primitives + ARRAY/MAP/STRUCT), multi-batch streaming, and the neutral-metadata converter contract. Full INTERVAL-type value parity + exhaustive operation-lifecycle coverage land in [3/3].

  Co-authored-by: Isaac
  Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>

* fix(sea): address #410 review — fetch cleanup, cooperative cancel, parity, docs

  Validated each finding against a live pecotesting warehouse first; the headline INTERVAL story turned out to be split-artifact, not breakage.

  - F7: getResultMetadata stored the *unpatched* Duration IPC bytes in meta.arrowSchema while advertising ArrowBased — store the patched bytes so an ArrowBased consumer doesn't hit `Unrecognized type "Duration" (18)`.
  - F3: fetchChunk now honors the `isClosed` cooperative-cancel probe (parity with ThriftOperationBackend) at its yield points.
  - F6: on a fetch error, best-effort close the statement (napi contract: stream is unspecified after Err) and surface a typed kernel error via decodeNapiKernelError.
  - F9: cancel-after-fetch now throws the canonical OperationStateError(Canceled) ("The operation was canceled by a client") — byte-matches the Thrift message.
  - F10: typed HiveDriverError (not raw Error) in the schema/fetchNextBatch guards.
  - F1: corrected SeaArrowIpcDurationFix docs — on this layer the rewriter only makes Duration *decodable* (raw Int64); the duration_unit formatter lands in #411 (verified live: byte-identical to Thrift).
  - F5: documented that nested Duration is a SHARED apache-arrow@13 limitation — verified the Thrift backend throws the identical error, so SEA matches parity.
  - F2: added a live e2e that drives a real Arrow Duration column through the rewriter (asserts no "Duration (18)" crash + raw-Int64 on this layer).
  - F8: pinned the no-`Failed` invariant in status() (failures reject at submit).
  - F12: renamed SeaResultsProvider's SeaStatementHandle → SeaFetchHandle (was a name collision with the lifecycle interface of a different shape).
  - F13: dropped the no-op await on the synchronous statement.schema().
  - F14: fixed the Float-precision comment (Precision enum, not bit-width).
  - F15: SeaResultsProvider.prime loops instead of self-recursing on empty batches.

  Deferred (noted on the PR): F4 (per-batch triple-decode perf) and F11 (hasResultSet() hard-coded true for M0).

  Co-authored-by: Isaac
  Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>

* fix(sea): address #410 review — error fidelity, fetch perf, validation, tests

  Addresses the 8 review threads on PR #410. Validated against a live warehouse (results-e2e parity gate + interval-duration-e2e + execution-e2e all pass) plus new/updated unit tests.

  - SeaOperationLifecycle: rethrowKernelError now delegates to the canonical decodeNapiKernelError, so cancel/close errors get the same fidelity as fetch errors — the sqlState remap (envelope field is `sqlState`, the old code read `sqlstate` and dropped it), the kernelMetadata namespace, and the strict `startsWith` sentinel match (was a loose `indexOf >= 0`).
  - SeaArrowIpc: replace decodeIpcBatch (full RecordBatchReader materialization just to sum row counts) with countRowsInIpc, which reads RecordBatch header `length` via MessageReader and skips bodies — no vector decode. Removes ~2x Arrow decode CPU + transient allocation on the fetch hot path (the converter still re-decodes for values). SeaResultsProvider switched to it.
  - SeaArrowIpc: hermetic unit tests (tests/unit/sea/SeaArrowIpc.test.ts) for the framing walk, no-op/garbage rewrite paths, the row-count path, and the empty-schema guard. (The Duration-positive rewrite stays covered by the live e2e — apache-arrow@13 can't construct a Duration column hermetically.)
  - SeaOperationBackend: on a fetch-error cleanup close() that also fails, log the failure at warn (statement may leak) instead of fully swallowing it — the original fetch error is still surfaced.
  - SeaSessionBackend: reject queryTags / useLZ4Compression / stagingAllowedLocalPath with M0-style errors instead of silently ignoring them (+ unit tests). Silent no-ops are the worst failure mode for callers.
  - SeaArrowIpc: throw a typed HiveDriverError (not a raw TypeError) when an IPC payload carries no schema.
  - SeaArrowIpcDurationFix.readMessageAt: fail-closed guards for negative metadataLength / bodyLength (was relying on subarray clamping).
  - Fix stale `tests/integration/sea/...` doc refs → `tests/e2e/sea/`.

  Co-authored-by: Isaac
  Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>

* test(telemetry): de-flake FeatureFlagCache placeholder test (network seam)

  The "fetchFeatureFlag should return false as placeholder implementation" test called the real fetchFeatureFlag, which makes an HTTP call via fetchWithRetry (10s timeout) to a bogus host. Under mocha's 2s default it only passed when the DNS failure happened to resolve quickly — flaky across runners and Node versions (it timed out on Node 14/16/18 in CI, fail-fast-canceling the rest of the matrix; Node 20 passed). Stub the fetchWithRetry network seam so the test deterministically exercises the behavior under test (fetchFeatureFlag resolves to false) with no real network call. No production code change.

  Co-authored-by: Isaac
  Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>

---------

Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
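
The error-fidelity bullet above (strict `startsWith` sentinel match, and reading `sqlState` rather than `sqlstate`) can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: the `SENTINEL` string, the `KernelEnvelope` shape, and the function names are hypothetical and are not taken from `decodeNapiKernelError`, whose source is not shown on this page.

```typescript
// Hypothetical sentinel prefix; the real marker used by the napi binding is not shown here.
const SENTINEL = '__kernel_error__:';

// Hypothetical envelope shape. Note the camelCase `sqlState` field name.
interface KernelEnvelope {
  message: string;
  sqlState?: string;
}

// Loose match: true even when the sentinel merely appears somewhere inside
// an unrelated message (for example, echoed user text).
function looseMatch(message: string): boolean {
  return message.indexOf(SENTINEL) >= 0;
}

// Strict decode: only messages that begin with the sentinel are treated as envelopes.
function decodeEnvelope(message: string): KernelEnvelope | undefined {
  if (!message.startsWith(SENTINEL)) {
    return undefined;
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(message.slice(SENTINEL.length));
  } catch (_err) {
    return undefined; // malformed payload: not an envelope
  }
  if (typeof parsed !== 'object' || parsed === null) {
    return undefined;
  }
  const record = parsed as Record<string, unknown>;
  return {
    message: typeof record.message === 'string' ? record.message : '',
    // Reading `record.sqlstate` (all lowercase) here would silently drop the value.
    sqlState: typeof record.sqlState === 'string' ? record.sqlState : undefined,
  };
}
```

Under these assumptions, a message that only mentions the sentinel mid-string passes the loose check but is rejected by the strict decoder, which is the failure mode the stricter match is meant to rule out.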
1 parent 623a140 commit 4ba2175

18 files changed

Lines changed: 2891 additions & 22 deletions

lib/result/ArrowResultConverter.ts

Lines changed: 7 additions & 2 deletions
@@ -13,7 +13,7 @@ import {
   RecordBatchReader,
   util as arrowUtils,
 } from 'apache-arrow';
-import { TGetResultSetMetadataResp, TColumnDesc } from '../../thrift/TCLIService_types';
+import { TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
 import IClientContext from '../contracts/IClientContext';
 import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
 import { ArrowBatch, getSchemaColumns, convertThriftValue } from './utils';
@@ -42,7 +42,12 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
   // actually return a non-empty result
   private prefetchedRecordBatch?: RecordBatch<TypeMap>;
 
-  constructor(context: IClientContext, source: IResultsProvider<ArrowBatch>, { schema }: TGetResultSetMetadataResp) {
+  // Only the column `schema` is consumed here. Typed as the minimal shape
+  // (not the full Thrift `TGetResultSetMetadataResp`) so both the Thrift
+  // operation backend and the SEA backend's neutral `ResultMetadata` —
+  // which both carry `schema?: TTableSchema` — can construct the converter
+  // without an adapter at the call site.
+  constructor(context: IClientContext, source: IResultsProvider<ArrowBatch>, { schema }: { schema?: TTableSchema }) {
     this.context = context;
     this.source = source;
     this.schema = getSchemaColumns(schema);
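
The constructor change above relies on TypeScript's structural typing: any object that carries an optional `schema` of the right type satisfies `{ schema?: TTableSchema }`. A minimal sketch of that idea follows. The interfaces are simplified, hypothetical stand-ins (not the generated Thrift types or the real SEA `ResultMetadata`), and `columnNames` is an illustrative helper, not driver code.

```typescript
// Simplified, hypothetical stand-ins for the Thrift-generated schema types.
interface ColumnDesc {
  columnName: string;
  position: number;
}
interface TableSchema {
  columns: ColumnDesc[];
}

// Stand-in for a full Thrift metadata response (the real one has more fields).
interface ThriftMetadataResp {
  status: { statusCode: number };
  schema?: TableSchema;
  lz4Compressed?: boolean;
}

// Stand-in for a backend-neutral metadata object.
interface NeutralResultMetadata {
  schema?: TableSchema;
  arrowSchema?: Uint8Array;
}

// Accepts the minimal shape only, so either metadata type can be passed directly.
function columnNames({ schema }: { schema?: TableSchema }): string[] {
  return (schema?.columns ?? []).map((column) => column.columnName);
}

const thriftMeta: ThriftMetadataResp = {
  status: { statusCode: 0 },
  schema: { columns: [{ columnName: 'id', position: 1 }] },
};
const seaMeta: NeutralResultMetadata = {
  schema: { columns: [{ columnName: 'name', position: 1 }] },
};

const fromThrift = columnNames(thriftMeta); // no adapter needed
const fromSea = columnNames(seaMeta); // no adapter needed
const fromEmpty = columnNames({}); // a missing schema yields no columns
```

Narrowing the parameter to the one field that is actually read keeps the consumer independent of either backend's full metadata type.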

lib/sea/SeaArrowIpc.ts

Lines changed: 271 additions & 0 deletions
@@ -0,0 +1,271 @@
+// Copyright (c) 2026 Databricks, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+import { RecordBatchReader, MessageReader, MessageHeader, Schema, Field, DataType, TypeMap } from 'apache-arrow';
+import { TTableSchema, TTypeId, TPrimitiveTypeEntry } from '../../thrift/TCLIService_types';
+import { rewriteDurationToInt64, DURATION_UNIT_METADATA_KEY } from './SeaArrowIpcDurationFix';
+import HiveDriverError from '../errors/HiveDriverError';
+
+/**
+ * Field metadata key used by the kernel to attach the original Databricks
+ * SQL type name to each Arrow field. See `databricks-sql-kernel/src/reader/mod.rs`.
+ */
+const DATABRICKS_TYPE_NAME = 'databricks.type_name';
+
+/**
+ * Sum the row counts of every RecordBatch message in an Arrow IPC
+ * stream, WITHOUT materializing the Arrow vector tree.
+ *
+ * Why this exists: `ArrowResultConverter` consumes `ArrowBatch` objects
+ * that carry an explicit `rowCount`, but the kernel's IPC payload only
+ * carries per-RecordBatch `length` (no separate total). `SeaResultsProvider`
+ * needs that count to build the `ArrowBatch` it hands to the converter —
+ * which then re-decodes the same bytes for the actual values.
+ *
+ * The previous implementation used `RecordBatchReader` and iterated the
+ * batches, which calls `_loadVectors` and materializes the full vector
+ * tree for every batch just to read `numRows` and discard everything
+ * else — ~2x Arrow decode CPU + transient allocation on the fetch hot
+ * path. `MessageReader` instead reads only each message's FlatBuffer
+ * metadata header (where `RecordBatch.length` lives) and skips the body
+ * bytes, so no vectors are decoded here. The converter's later re-decode
+ * is the only real materialization.
+ *
+ * `ipcBytes` must already be Duration-patched (row count is unaffected
+ * by the Duration→Int64 rewrite, and the framing is unchanged). Returns
+ * 0 for an empty / schema-only stream.
+ */
+export function countRowsInIpc(ipcBytes: Buffer): number {
+  const reader = new MessageReader(ipcBytes);
+  let rowCount = 0;
+  for (const message of reader) {
+    if (message.headerType === MessageHeader.RecordBatch) {
+      // header() for a RecordBatch message carries `length` (the row
+      // count) in the FlatBuffer metadata — no body decode needed.
+      rowCount += Number((message.header() as { length: number | bigint }).length);
+    }
+    // Advance past the (undecoded) body so the next message reads at the
+    // correct offset. readMessageBody returns a view; it does not decode
+    // the body into Arrow vectors.
+    const bodyLength = Number(message.bodyLength);
+    if (bodyLength > 0) {
+      reader.readMessageBody(bodyLength);
+    }
+  }
+  return rowCount;
+}
+
+/**
+ * Decode an Arrow IPC schema payload (no record batches) into the
+ * apache-arrow Schema object.
+ */
+export function decodeIpcSchema(ipcBytes: Buffer): Schema<TypeMap> {
+  const patched = rewriteDurationToInt64(ipcBytes);
+  const reader = RecordBatchReader.from<TypeMap>(patched);
+  reader.open();
+  // `RecordBatchReader.from(emptyBuffer).open()` does not throw — it
+  // leaves `schema` undefined. Without this guard a downstream
+  // `arrowSchemaToThriftSchema(undefined)` would hit `undefined.fields`
+  // and surface a raw TypeError instead of a typed driver error. The
+  // real kernel always materialises a schema, so this is defensive.
+  if (!reader.schema) {
+    throw new HiveDriverError('SEA result: Arrow IPC stream carried no schema (empty or truncated payload)');
+  }
+  return reader.schema;
+}
+
+/**
+ * Pre-process raw IPC bytes from the kernel so they're consumable by
+ * `apache-arrow@13`. The current transformation is `Duration → Int64`
+ * with the original duration unit preserved in field metadata (see
+ * `SeaArrowIpcDurationFix.ts`). Returned bytes are byte-identical to
+ * the input when no transformation is needed.
+ *
+ * Exposed so callers can pre-patch the buffer **once** and pass the
+ * result through both `decodeIpcBatch` (for row-count extraction in
+ * `SeaResultsProvider`) and `ArrowResultConverter.fetchNext` (which
+ * re-decodes the same bytes via `RecordBatchReader.from`). Without
+ * this, the converter would re-throw on `Duration` because it never
+ * sees the patched bytes.
+ */
+export function patchIpcBytes(ipcBytes: Buffer): Buffer {
+  return rewriteDurationToInt64(ipcBytes);
+}
+
+/**
+ * Map an Arrow `DataType` (with optional `databricks.type_name`
+ * metadata) onto the closest Thrift `TTypeId`.
+ *
+ * This is the synthesis step that lets the existing
+ * `ArrowResultConverter` Phase-2 dispatch (`convertThriftValue` in
+ * `lib/result/utils.ts:61-98`) keep working unchanged for the SEA
+ * path. Phase-2 keys exclusively off `TPrimitiveTypeEntry.type` per
+ * column, so we synthesize a `TColumnDesc` whose `TTypeId` matches the
+ * server-emitted Arrow type as closely as possible.
+ *
+ * Resolution order:
+ * 1. The kernel attaches `databricks.type_name` (e.g. "DECIMAL",
+ * "INTERVAL", "STRUCT") to each field's metadata. Prefer that when
+ * present — it carries the original SQL semantic that the Arrow
+ * type alone can lose (e.g. INTERVAL → Utf8 with metadata).
+ * 2. Fall back to the Arrow `DataType.typeId` for primitive types.
+ *
+ * This matches the JDBC and Python drivers' policy of trusting the
+ * server's logical type assignment over the wire-level Arrow encoding.
+ */
+function arrowTypeToTTypeId(field: Field<DataType>): TTypeId {
+  const typeName = field.metadata.get(DATABRICKS_TYPE_NAME)?.toUpperCase();
+
+  switch (typeName) {
+    case 'BOOLEAN':
+      return TTypeId.BOOLEAN_TYPE;
+    case 'TINYINT':
+    case 'BYTE':
+      return TTypeId.TINYINT_TYPE;
+    case 'SMALLINT':
+    case 'SHORT':
+      return TTypeId.SMALLINT_TYPE;
+    case 'INT':
+    case 'INTEGER':
+      return TTypeId.INT_TYPE;
+    case 'BIGINT':
+    case 'LONG':
+      return TTypeId.BIGINT_TYPE;
+    case 'FLOAT':
+    case 'REAL':
+      return TTypeId.FLOAT_TYPE;
+    case 'DOUBLE':
+      return TTypeId.DOUBLE_TYPE;
+    case 'STRING':
+      return TTypeId.STRING_TYPE;
+    case 'VARCHAR':
+      return TTypeId.VARCHAR_TYPE;
+    case 'CHAR':
+      return TTypeId.CHAR_TYPE;
+    case 'BINARY':
+      return TTypeId.BINARY_TYPE;
+    case 'DATE':
+      return TTypeId.DATE_TYPE;
+    case 'TIMESTAMP':
+    case 'TIMESTAMP_NTZ':
+      return TTypeId.TIMESTAMP_TYPE;
+    case 'DECIMAL':
+      return TTypeId.DECIMAL_TYPE;
+    case 'INTERVAL':
+    case 'INTERVAL DAY':
+    case 'INTERVAL DAY TO HOUR':
+    case 'INTERVAL DAY TO MINUTE':
+    case 'INTERVAL DAY TO SECOND':
+    case 'INTERVAL HOUR':
+    case 'INTERVAL HOUR TO MINUTE':
+    case 'INTERVAL HOUR TO SECOND':
+    case 'INTERVAL MINUTE':
+    case 'INTERVAL MINUTE TO SECOND':
+    case 'INTERVAL SECOND':
+      return TTypeId.INTERVAL_DAY_TIME_TYPE;
+    case 'INTERVAL YEAR':
+    case 'INTERVAL YEAR TO MONTH':
+    case 'INTERVAL MONTH':
+      return TTypeId.INTERVAL_YEAR_MONTH_TYPE;
+    case 'ARRAY':
+      return TTypeId.ARRAY_TYPE;
+    case 'MAP':
+      return TTypeId.MAP_TYPE;
+    case 'STRUCT':
+      return TTypeId.STRUCT_TYPE;
+    case 'NULL':
+    case 'VOID':
+      return TTypeId.NULL_TYPE;
+    default:
+      break;
+  }
+
+  // Fall back to Arrow's own type id when no databricks metadata is set
+  // (e.g. unit tests constructing batches without metadata).
+  const arrowType = field.type;
+  if (DataType.isBool(arrowType)) return TTypeId.BOOLEAN_TYPE;
+  if (DataType.isInt(arrowType)) {
+    // Duration columns are rewritten to Int64 with a
+    // `databricks.arrow.duration_unit` metadata marker (see
+    // `SeaArrowIpcDurationFix.ts`). Surface them as INTERVAL_DAY_TIME
+    // so the converter formats them back into the thrift string form.
+    if (arrowType.bitWidth === 64 && field.metadata.has(DURATION_UNIT_METADATA_KEY)) {
+      return TTypeId.INTERVAL_DAY_TIME_TYPE;
+    }
+    switch (arrowType.bitWidth) {
+      case 8:
+        return TTypeId.TINYINT_TYPE;
+      case 16:
+        return TTypeId.SMALLINT_TYPE;
+      case 32:
+        return TTypeId.INT_TYPE;
+      case 64:
+        return TTypeId.BIGINT_TYPE;
+      default:
+        return TTypeId.BIGINT_TYPE;
+    }
+  }
+  if (DataType.isFloat(arrowType)) {
+    // `precision` is the Arrow `Precision` enum (HALF=0, SINGLE=1, DOUBLE=2),
+    // NOT a bit-width — `=== 2` is DOUBLE. Everything else (HALF/SINGLE) maps
+    // to the thrift FLOAT type.
+    return arrowType.precision === 2 ? TTypeId.DOUBLE_TYPE : TTypeId.FLOAT_TYPE;
+  }
+  if (DataType.isDecimal(arrowType)) return TTypeId.DECIMAL_TYPE;
+  if (DataType.isUtf8(arrowType)) return TTypeId.STRING_TYPE;
+  if (DataType.isBinary(arrowType)) return TTypeId.BINARY_TYPE;
+  if (DataType.isDate(arrowType)) return TTypeId.DATE_TYPE;
+  if (DataType.isTimestamp(arrowType)) return TTypeId.TIMESTAMP_TYPE;
+  // Native Arrow Interval types. The server-side INTERVAL YEAR-MONTH
+  // (and the legacy IntervalDayTime variant) come through with type
+  // id 11 / -25 / -26 — apache-arrow@13 surfaces them as `Int32Array`
+  // pairs which the converter formats to thrift's `"Y-M"` / day-time
+  // strings.
+  if (DataType.isInterval(arrowType)) {
+    // unit 0 = YEAR_MONTH, unit 1 = DAY_TIME, unit 2 = MONTH_DAY_NANO
+    return arrowType.unit === 0 ? TTypeId.INTERVAL_YEAR_MONTH_TYPE : TTypeId.INTERVAL_DAY_TIME_TYPE;
+  }
+  if (DataType.isList(arrowType)) return TTypeId.ARRAY_TYPE;
+  if (DataType.isMap(arrowType)) return TTypeId.MAP_TYPE;
+  if (DataType.isStruct(arrowType)) return TTypeId.STRUCT_TYPE;
+  if (DataType.isNull(arrowType)) return TTypeId.NULL_TYPE;
+
+  return TTypeId.STRING_TYPE;
+}
+
+/**
+ * Synthesize a Thrift `TTableSchema` from an Arrow schema decoded out
+ * of the kernel's IPC stream. Used by `SeaOperationBackend.getResultMetadata`
+ * to drive `ArrowResultConverter.convertThriftTypes` (Phase 2) without
+ * changing that code.
+ */
+export function arrowSchemaToThriftSchema(arrowSchema: Schema<TypeMap>): TTableSchema {
+  const columns = arrowSchema.fields.map((field, index) => {
+    const primitiveEntry: TPrimitiveTypeEntry = {
+      type: arrowTypeToTTypeId(field),
+    };
+    return {
+      columnName: field.name,
+      typeDesc: {
+        types: [
+          {
+            primitiveEntry,
+          },
+        ],
+      },
+      position: index + 1,
+    };
+  });
+  return { columns };
+}
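
The resolution order documented for `arrowTypeToTTypeId` (server-attached type name first, wire-level Arrow type second) can be sketched without the apache-arrow dependency. This is a deliberately reduced model under stated assumptions: `SketchField`, `ThriftType`, and `resolveType` are hypothetical names, and only a handful of the type names from the diff are covered.

```typescript
// Reduced, hypothetical model of the two-step lookup; not the driver's actual types.
type ThriftType = 'DECIMAL' | 'INTERVAL_DAY_TIME' | 'BIGINT' | 'INT' | 'DOUBLE' | 'FLOAT' | 'STRING';

interface SketchField {
  arrowKind: 'int' | 'float' | 'utf8';
  bitWidth?: number; // only meaningful for 'int'
  precision?: 0 | 1 | 2; // Arrow Precision enum: HALF=0, SINGLE=1, DOUBLE=2
  metadata: Map<string, string>;
}

const TYPE_NAME_KEY = 'databricks.type_name';

function resolveType(field: SketchField): ThriftType {
  // Step 1: prefer the logical type name attached as field metadata.
  const typeName = field.metadata.get(TYPE_NAME_KEY)?.toUpperCase();
  switch (typeName) {
    case 'DECIMAL':
      return 'DECIMAL';
    case 'INTERVAL DAY TO SECOND':
      return 'INTERVAL_DAY_TIME';
    case 'BIGINT':
    case 'LONG':
      return 'BIGINT';
    default:
      break; // unknown or absent name: fall through to the Arrow type
  }
  // Step 2: fall back to the wire-level Arrow type (simplified to two int widths).
  if (field.arrowKind === 'int') {
    return field.bitWidth === 32 ? 'INT' : 'BIGINT';
  }
  if (field.arrowKind === 'float') {
    // `precision` is an enum value, not a bit-width: 2 means DOUBLE.
    return field.precision === 2 ? 'DOUBLE' : 'FLOAT';
  }
  return 'STRING';
}

// A Utf8 column tagged as an interval resolves by metadata, not by its Arrow type.
const taggedInterval = resolveType({
  arrowKind: 'utf8',
  metadata: new Map([[TYPE_NAME_KEY, 'interval day to second']]),
});
// An untagged 32-bit integer resolves by its Arrow type.
const untaggedInt = resolveType({ arrowKind: 'int', bitWidth: 32, metadata: new Map() });
```

The metadata-first order matters for the tagged case: the Arrow type alone would report a string column, while the attached name preserves the SQL-level interval semantics.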
