|
71 | 71 | # per-request skip-and-warn. |
72 | 72 | _KERNEL_MANAGED_HEADERS = frozenset({"authorization", "x-databricks-org-id"}) |
73 | 73 |
|
| 74 | +# Leading verbs of SQL volume/staging statements. Detected by the |
| 75 | +# leading token (case-insensitive) so the kernel backend can fail loud |
| 76 | +# on staging ops it can't service — see ``execute_command``. |
| 77 | +_STAGING_VERBS = ("PUT", "GET", "REMOVE") |
| 78 | + |
| 79 | + |
| 80 | +def _strip_leading_sql_comments(sql: str) -> str: |
| 81 | + """Strip leading whitespace and SQL comments (``-- …`` line and |
| 82 | + ``/* … */`` block, possibly several) from ``sql``, returning the |
| 83 | + remainder. |
| 84 | +
|
| 85 | + Needed so staging detection sees the real leading verb: a |
| 86 | + comment-prefixed staging op (``-- upload\\nPUT …`` or |
| 87 | + ``/* c */ PUT …``, common in ETL scripts) must still be classified |
| 88 | + as staging, or it would slip past the guard into the silent-no-op |
| 89 | + bug. Block comments do not nest in Databricks SQL, so a simple |
| 90 | + scan-to-``*/`` is correct. |
| 91 | + """ |
| 92 | + i = 0 |
| 93 | + n = len(sql) |
| 94 | + while i < n: |
| 95 | + if sql[i].isspace(): |
| 96 | + i += 1 |
| 97 | + elif sql.startswith("--", i): |
| 98 | + # Line comment: skip to end of line (or string). |
| 99 | + nl = sql.find("\n", i) |
| 100 | + i = n if nl == -1 else nl + 1 |
| 101 | + elif sql.startswith("/*", i): |
| 102 | + # Block comment: skip to closing */ (or end if unterminated). |
| 103 | + close = sql.find("*/", i + 2) |
| 104 | + i = n if close == -1 else close + 2 |
| 105 | + else: |
| 106 | + break |
| 107 | + return sql[i:] |
| 108 | + |
| 109 | + |
def _is_staging_statement(operation: str) -> bool:
    """Return True iff ``operation`` is a volume/staging statement.

    A statement counts as staging when its leading keyword, compared
    case-insensitively, is one of ``_STAGING_VERBS`` (PUT / GET /
    REMOVE).

    Leading whitespace and SQL comments are stripped first, so a
    comment-prefixed staging op is still caught. Only the leading
    keyword is inspected, so a normal query that merely *contains* one
    of the words (e.g. ``SELECT 'GET' AS x``) is not misflagged.

    The keyword is the leading run of identifier characters, not the
    first whitespace-delimited token: SQL does not require whitespace
    between a keyword and a following quote or comment, so
    ``PUT'/tmp/f' INTO …`` and ``REMOVE/* c */'/Volumes/…'`` must be
    classified as staging too. Longer identifiers that merely start
    with a verb (``GETDATE``, ``GET_x``) are not matched.
    """
    stripped = _strip_leading_sql_comments(operation)
    # Measure the leading identifier run (letters, digits, underscore).
    end = 0
    while end < len(stripped) and (
        stripped[end].isalnum() or stripped[end] == "_"
    ):
        end += 1
    # An empty run (blank input, or a statement starting with punctuation)
    # yields "", which is never a staging verb.
    return stripped[:end].upper() in _STAGING_VERBS
| 123 | + |
74 | 124 |
|
75 | 125 | # ─── Client ───────────────────────────────────────────────────────────────── |
76 | 126 |
|
@@ -172,6 +222,17 @@ def __init__( |
172 | 222 | # path. Same lock as ``_async_handles``. |
173 | 223 | self._closed_commands: Set[str] = set() |
174 | 224 | self._async_handles_lock = threading.RLock() |
| 225 | + # Sync-execute cancellers keyed by ``id(cursor)``. A blocking |
| 226 | + # ``execute()`` sets ``cursor.active_command_id`` only AFTER it |
| 227 | + # returns, so a concurrent ``cursor.cancel()`` (the documented |
| 228 | + # cross-thread PEP-249 shape) has no command id to target while |
| 229 | + # the query runs. We register a detached kernel |
| 230 | + # ``StatementCanceller`` here just before the blocking call and |
| 231 | + # drop it after; ``cancel_running_cursor`` (invoked by |
| 232 | + # ``Cursor.cancel`` when there's no command id yet) fires it. |
| 233 | + # Guarded by its own lock — cancel can race execute teardown. |
| 234 | + self._sync_cancellers: Dict[int, Any] = {} |
| 235 | + self._sync_cancellers_lock = threading.RLock() |
175 | 236 |
|
176 | 237 | # ── Session lifecycle ────────────────────────────────────────── |
177 | 238 |
|
@@ -354,6 +415,24 @@ def execute_command( |
354 | 415 | # ``_async_statements`` and closed by ``close_command``); the sync |
355 | 416 | # path drops it in finally. ``close_stmt`` is the post-success |
356 | 417 | # decision flag — it stays True on sync, flips to False on async. |
| 418 | + # Volume/staging (PUT/GET/REMOVE) is not supported on the kernel |
| 419 | + # path: the kernel returns the staging control row as a normal |
| 420 | + # result set (``KernelResultSet.is_staging_operation`` is always |
| 421 | + # False), so the connector's ``_handle_staging_operation`` never |
| 422 | + # fires and NO file is transferred. Rather than silently no-op |
| 423 | + # (the Thrift path performs the presigned-URL upload/download), |
| 424 | + # fail loud at the call site so ETL scripts don't ingest |
| 425 | + # stale/missing data. Detected by the leading SQL verb — the |
| 426 | + # only signal available pre-execute, since the kernel exposes no |
| 427 | + # staging marker today. |
| 428 | + if _is_staging_statement(operation): |
| 429 | + raise NotSupportedError( |
| 430 | + "Volume / staging operations (PUT / GET / REMOVE) are not " |
| 431 | + "supported on the kernel backend (use_kernel=True); the file " |
| 432 | + "transfer would silently not happen. Use the Thrift backend " |
| 433 | + "for staging operations." |
| 434 | + ) |
| 435 | + |
357 | 436 | close_stmt = True |
358 | 437 | try: |
359 | 438 | try: |
@@ -382,10 +461,23 @@ def execute_command( |
382 | 461 | self._async_statements[command_id.guid] = stmt |
383 | 462 | close_stmt = False |
384 | 463 | return None |
| 464 | + # Register a detached canceller BEFORE the blocking |
| 465 | + # execute so a concurrent ``cursor.cancel()`` can reach |
| 466 | + # the running statement (its server id is populated mid- |
| 467 | + # execute). Keyed by ``id(cursor)`` since no command id |
| 468 | + # exists yet. Dropped in the finally. |
| 469 | + try: |
| 470 | + with self._sync_cancellers_lock: |
| 471 | + self._sync_cancellers[id(cursor)] = stmt.canceller() |
| 472 | + except Exception: |
| 473 | + # Canceller is best-effort; never block execute on it. |
| 474 | + pass |
385 | 475 | executed = stmt.execute() |
386 | 476 | except Exception as exc: |
387 | 477 | raise _wrap_kernel_exception("execute_command", exc) from exc |
388 | 478 | finally: |
| 479 | + with self._sync_cancellers_lock: |
| 480 | + self._sync_cancellers.pop(id(cursor), None) |
389 | 481 | if close_stmt: |
390 | 482 | # Sync path: ``Statement`` is a lifecycle owner separate |
391 | 483 | # from the executed handle. Drop it here so the parent |
@@ -422,6 +514,46 @@ def cancel_command(self, command_id: CommandId) -> None: |
422 | 514 | except Exception as exc: |
423 | 515 | raise _wrap_kernel_exception("cancel_command", exc) from exc |
424 | 516 |
|
def cancel_running_cursor(self, cursor: "Cursor") -> bool:
    """Best-effort cancel of a sync ``execute()`` still running on ``cursor``.

    Looks up the canceller stored in ``self._sync_cancellers`` under
    ``id(cursor)`` and, if one is present, calls its ``cancel()``.

    Returns:
        ``False`` when no canceller is registered for ``cursor``;
        ``True`` when one was found and a cancel was attempted, even if
        that attempt raised.

    Never propagates an exception from the canceller: a failure is
    logged at WARNING level with the traceback and then swallowed, so
    the caller's ``cancel()`` stays non-raising.
    """
    key = id(cursor)
    # NOTE(review): the reviewed view had lost its indentation; this assumes
    # only the registry lookup runs under the lock and the cancel call itself
    # happens after the lock is released — confirm against the real file.
    with self._sync_cancellers_lock:
        pending = self._sync_cancellers.get(key)
    if pending is None:
        return False
    try:
        pending.cancel()
    except Exception:
        logger.warning(
            "cancel_running_cursor: best-effort cancel of in-flight "
            "sync statement failed; swallowing (cursor.cancel() is "
            "tolerant by PEP-249 contract)",
            exc_info=True,
        )
    return True
| 556 | + |
425 | 557 | def close_command(self, command_id: CommandId) -> None: |
426 | 558 | with self._async_handles_lock: |
427 | 559 | handle = self._async_handles.pop(command_id.guid, None) |
|
0 commit comments