Skip to content

Commit 75c9260

Browse files
fix(kernel): stop premature sync statement close (H4); bump KERNEL_REV to batch 2
Connector half of the kernel batch-2 fixes (kernel PR #123). Bumps KERNEL_REV to pick up the batch-2 kernel surface. H4 — don't close the kernel Statement at sync execute-return: execute_command's finally used to `stmt.close()` immediately after `stmt.execute()` succeeded. For a large CloudFetch result with paginated chunk links (all_fetched=false), the kernel fetches later links lazily (get_result_chunks against the LIVE statement) during consumption, so a premature CloseStatement broke those fetches. The kernel now auto-closes the server statement when its result stream drains (ExecutedStatement::next_batch end-of-stream), with the executed-handle Drop as the backstop for partial/abandoned reads. So the connector now flips close_stmt=False on a successful execute and only closes on the error path (no executed handle was produced). The other batch-2 fixes (cancelled-class -> OperationalError, U2M refresh fail-fast, metadata statement close-on-drop, per-binding OAuth client_id) are entirely kernel-side and need no connector code beyond the KERNEL_REV bump. Tests: unit (sync execute does-not-close on success / does-close on failure) + e2e (large multi-chunk result drains without premature close + cursor reuse; server cancel maps to OperationalError not ProgrammingError). All e2e verified live against dogfood. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent a2fe99f commit 75c9260

4 files changed

Lines changed: 143 additions & 6 deletions

File tree

KERNEL_REV

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
cbeaf44c66ebc57c5b3eed2a5c5d0290d4bf035b
1+
eb7da65552c8e7e10b192a7a2577910102e4dc6b

src/databricks/sql/backend/kernel/client.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -473,16 +473,29 @@ def execute_command(
473473
# Canceller is best-effort; never block execute on it.
474474
pass
475475
executed = stmt.execute()
476+
# Execute succeeded: the kernel now owns the statement
477+
# lifecycle. It auto-closes the server statement when the
478+
# result stream is fully drained (``ExecutedStatement::
479+
# next_batch`` end-of-stream), with the executed handle's
480+
# ``Drop`` as the backstop for partial/abandoned reads.
481+
# So we must NOT close ``stmt`` here: a premature
482+
# ``CloseStatement`` at execute-return broke lazy
483+
# CloudFetch chunk-link fetches (``get_result_chunks``
484+
# against the live statement) for large paginated-link
485+
# results — the H4 gap. Closing here is left ONLY for the
486+
# error path below, where no executed handle / result set
487+
# was produced to reap it.
488+
close_stmt = False
476489
except Exception as exc:
477490
raise _wrap_kernel_exception("execute_command", exc) from exc
478491
finally:
479492
with self._sync_cancellers_lock:
480493
self._sync_cancellers.pop(id(cursor), None)
481494
if close_stmt:
482-
# Sync path: ``Statement`` is a lifecycle owner separate
483-
# from the executed handle. Drop it here so the parent
484-
# doesn't outlive its caller. Swallow close errors —
485-
# they're not actionable.
495+
# Reached only when ``stmt.execute()`` did not succeed
496+
# (or async, which flipped the flag earlier): no executed
497+
# handle owns the statement, so close it here to avoid a
498+
# leak. Swallow close errors — not actionable.
486499
try:
487500
stmt.close()
488501
except Exception:

tests/e2e/test_kernel_backend.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424
import pytest
2525

2626
import databricks.sql as sql
27-
from databricks.sql.exc import DatabaseError, NotSupportedError, ServerOperationError
27+
from databricks.sql.exc import (
28+
DatabaseError,
29+
NotSupportedError,
30+
OperationalError,
31+
ServerOperationError,
32+
)
2833

2934
# Skip the whole module unless the kernel wheel is genuinely installed.
3035
# ``pytest.importorskip`` alone isn't enough: the kernel unit tests inject a
@@ -478,3 +483,54 @@ def cancel_after_delay():
478483
finally:
479484
t.join()
480485
cur.close()
486+
487+
488+
# ── Batch 2 ────────────────────────────────────────────────────────
489+
490+
491+
def test_large_result_drains_without_premature_close(conn):
492+
"""H4: a large multi-chunk result fully drains even though the
493+
connector no longer closes the statement at execute-return — the
494+
kernel auto-closes on drain. Guards the regression where a premature
495+
CloseStatement broke lazy CloudFetch chunk-link fetches."""
496+
n = 5_000_000
497+
with conn.cursor() as cur:
498+
cur.execute(f"SELECT id, cast(id AS string) s FROM range({n})")
499+
rows = cur.fetchall()
500+
assert len(rows) == n
501+
# Cursor is reusable after the auto-close fired on the prior result.
502+
cur.execute("SELECT 42 AS n")
503+
assert cur.fetchall()[0][0] == 42
504+
505+
506+
def test_server_cancel_maps_to_operational_error(conn):
507+
"""A server-side cancel surfaces as OperationalError (cancelled
508+
class), not ProgrammingError. We trigger it via a cross-thread
509+
cancel of a running query; the raised exception must be in the
510+
OperationalError family, not ProgrammingError."""
511+
import threading
512+
import time
513+
514+
from databricks.sql.exc import ProgrammingError
515+
516+
cur = conn.cursor()
517+
518+
def cancel_after_delay():
519+
time.sleep(15.0)
520+
cur.cancel()
521+
522+
t = threading.Thread(target=cancel_after_delay)
523+
t.start()
524+
try:
525+
with pytest.raises(Exception) as exc_info:
526+
cur.execute(
527+
"SELECT count(*) FROM range(0, 1000000000000) "
528+
"WHERE pow(rand(), 2) < 0.5 AND sqrt(id) > 1"
529+
)
530+
# The cancellation must not masquerade as a caller-argument
531+
# (ProgrammingError) error. It should be operational.
532+
assert not isinstance(exc_info.value, ProgrammingError)
533+
assert isinstance(exc_info.value, (OperationalError, DatabaseError))
534+
finally:
535+
t.join()
536+
cur.close()

tests/unit/test_kernel_client.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,74 @@ def fake_execute():
572572
assert id(cursor) not in c._sync_cancellers
573573

574574

575+
def test_sync_execute_does_not_close_statement_on_success():
576+
"""H4: on a successful sync execute(), the connector must NOT close
577+
the parent kernel Statement — the kernel now auto-closes the server
578+
statement when the result stream drains (with the executed handle's
579+
Drop as backstop). A premature close() here broke lazy CloudFetch
580+
chunk-link fetches for large paginated-link results."""
581+
c = _make_client()
582+
c._kernel_session = MagicMock()
583+
cursor = MagicMock()
584+
cursor.arraysize = 100
585+
cursor.buffer_size_bytes = 1024
586+
587+
stmt = MagicMock()
588+
stmt.execute.return_value = MagicMock(
589+
statement_id="stmt-id",
590+
arrow_schema=MagicMock(return_value=pa.schema([("x", pa.int64())])),
591+
)
592+
c._kernel_session.statement.return_value = stmt
593+
594+
c.execute_command(
595+
operation="SELECT 1",
596+
session_id=MagicMock(),
597+
max_rows=1,
598+
max_bytes=1,
599+
lz4_compression=False,
600+
cursor=cursor,
601+
use_cloud_fetch=False,
602+
parameters=[],
603+
async_op=False,
604+
enforce_embedded_schema_correctness=False,
605+
)
606+
607+
# The kernel owns the statement lifecycle post-execute; connector
608+
# leaves it alone (kernel auto-close-on-drain + Drop backstop).
609+
stmt.close.assert_not_called()
610+
611+
612+
def test_sync_execute_closes_statement_on_failure():
613+
"""On the error path (execute raised, no executed handle / result
614+
set produced), the connector still closes the parent Statement so
615+
it isn't leaked."""
616+
c = _make_client()
617+
c._kernel_session = MagicMock()
618+
cursor = MagicMock()
619+
cursor.arraysize = 100
620+
cursor.buffer_size_bytes = 1024
621+
622+
stmt = MagicMock()
623+
stmt.execute.side_effect = RuntimeError("boom")
624+
c._kernel_session.statement.return_value = stmt
625+
626+
with pytest.raises(Exception):
627+
c.execute_command(
628+
operation="SELECT 1",
629+
session_id=MagicMock(),
630+
max_rows=1,
631+
max_bytes=1,
632+
lz4_compression=False,
633+
cursor=cursor,
634+
use_cloud_fetch=False,
635+
parameters=[],
636+
async_op=False,
637+
enforce_embedded_schema_correctness=False,
638+
)
639+
640+
stmt.close.assert_called_once_with()
641+
642+
575643
def test_get_columns_accepts_none_catalog():
576644
"""The kernel's `list_columns` honours `catalog=None` by issuing
577645
`SHOW COLUMNS IN ALL CATALOGS` server-side. The connector should

0 commit comments

Comments
 (0)