|
71 | 71 | # per-request skip-and-warn. |
72 | 72 | _KERNEL_MANAGED_HEADERS = frozenset({"authorization", "x-databricks-org-id"}) |
73 | 73 |
|
| 74 | +# Leading verbs of SQL volume/staging statements. Detected by the |
| 75 | +# leading token (case-insensitive) so the kernel backend can fail loud |
| 76 | +# on staging ops it can't service — see ``execute_command``. |
| 77 | +_STAGING_VERBS = ("PUT", "GET", "REMOVE") |
| 78 | + |
| 79 | + |
| 80 | +def _is_staging_statement(operation: str) -> bool: |
| 81 | + """True iff ``operation`` is a volume/staging statement (PUT / GET / |
| 82 | + REMOVE). |
| 83 | +
|
| 84 | + Matches the leading token only, so a normal query that merely |
| 85 | + *contains* the word (e.g. ``SELECT 'GET' AS x``) isn't misflagged. |
| 86 | + """ |
| 87 | + stripped = operation.lstrip() |
| 88 | + # First whitespace-delimited token, uppercased. |
| 89 | + verb = stripped.split(None, 1)[0].upper() if stripped else "" |
| 90 | + return verb in _STAGING_VERBS |
| 91 | + |
74 | 92 |
|
75 | 93 | # ─── Client ───────────────────────────────────────────────────────────────── |
76 | 94 |
|
@@ -172,6 +190,17 @@ def __init__( |
172 | 190 | # path. Same lock as ``_async_handles``. |
173 | 191 | self._closed_commands: Set[str] = set() |
174 | 192 | self._async_handles_lock = threading.RLock() |
| 193 | + # Sync-execute cancellers keyed by ``id(cursor)``. A blocking |
| 194 | + # ``execute()`` sets ``cursor.active_command_id`` only AFTER it |
| 195 | + # returns, so a concurrent ``cursor.cancel()`` (the documented |
| 196 | + # cross-thread PEP-249 shape) has no command id to target while |
| 197 | + # the query runs. We register a detached kernel |
| 198 | + # ``StatementCanceller`` here just before the blocking call and |
| 199 | + # drop it after; ``cancel_running_cursor`` (invoked by |
| 200 | + # ``Cursor.cancel`` when there's no command id yet) fires it. |
| 201 | + # Guarded by its own lock — cancel can race execute teardown. |
| 202 | + self._sync_cancellers: Dict[int, Any] = {} |
| 203 | + self._sync_cancellers_lock = threading.RLock() |
175 | 204 |
|
176 | 205 | # ── Session lifecycle ────────────────────────────────────────── |
177 | 206 |
|
@@ -354,6 +383,24 @@ def execute_command( |
354 | 383 | # ``_async_statements`` and closed by ``close_command``); the sync |
355 | 384 | # path drops it in finally. ``close_stmt`` is the post-success |
356 | 385 | # decision flag — it stays True on sync, flips to False on async. |
| 386 | + # Volume/staging (PUT/GET/REMOVE) is not supported on the kernel |
| 387 | + # path: the kernel returns the staging control row as a normal |
| 388 | + # result set (``KernelResultSet.is_staging_operation`` is always |
| 389 | + # False), so the connector's ``_handle_staging_operation`` never |
| 390 | + # fires and NO file is transferred. Rather than silently no-op |
| 391 | + # (the Thrift path performs the presigned-URL upload/download), |
| 392 | + # fail loud at the call site so ETL scripts don't ingest |
| 393 | + # stale/missing data. Detected by the leading SQL verb — the |
| 394 | + # only signal available pre-execute, since the kernel exposes no |
| 395 | + # staging marker today. |
| 396 | + if _is_staging_statement(operation): |
| 397 | + raise NotSupportedError( |
| 398 | + "Volume / staging operations (PUT / GET / REMOVE) are not " |
| 399 | + "supported on the kernel backend (use_kernel=True); the file " |
| 400 | + "transfer would silently not happen. Use the Thrift backend " |
| 401 | + "for staging operations." |
| 402 | + ) |
| 403 | + |
357 | 404 | close_stmt = True |
358 | 405 | try: |
359 | 406 | try: |
@@ -382,10 +429,23 @@ def execute_command( |
382 | 429 | self._async_statements[command_id.guid] = stmt |
383 | 430 | close_stmt = False |
384 | 431 | return None |
| 432 | + # Register a detached canceller BEFORE the blocking |
| 433 | + # execute so a concurrent ``cursor.cancel()`` can reach |
| 434 | + # the running statement (its server id is populated mid- |
| 435 | + # execute). Keyed by ``id(cursor)`` since no command id |
| 436 | + # exists yet. Dropped in the finally. |
| 437 | + try: |
| 438 | + with self._sync_cancellers_lock: |
| 439 | + self._sync_cancellers[id(cursor)] = stmt.canceller() |
| 440 | + except Exception: |
| 441 | + # Canceller is best-effort; never block execute on it. |
| 442 | + pass |
385 | 443 | executed = stmt.execute() |
386 | 444 | except Exception as exc: |
387 | 445 | raise _wrap_kernel_exception("execute_command", exc) from exc |
388 | 446 | finally: |
| 447 | + with self._sync_cancellers_lock: |
| 448 | + self._sync_cancellers.pop(id(cursor), None) |
389 | 449 | if close_stmt: |
390 | 450 | # Sync path: ``Statement`` is a lifecycle owner separate |
391 | 451 | # from the executed handle. Drop it here so the parent |
@@ -422,6 +482,30 @@ def cancel_command(self, command_id: CommandId) -> None: |
422 | 482 | except Exception as exc: |
423 | 483 | raise _wrap_kernel_exception("cancel_command", exc) from exc |
424 | 484 |
|
| 485 | + def cancel_running_cursor(self, cursor: "Cursor") -> bool: |
| 486 | + """Cancel an in-flight SYNC ``execute()`` on ``cursor``. |
| 487 | +
|
| 488 | + Invoked by ``Cursor.cancel()`` when ``active_command_id`` is |
| 489 | + still ``None`` — i.e. a blocking ``execute()`` hasn't returned, |
| 490 | + so the command id isn't set yet but the server statement may be |
| 491 | + running. Fires the detached ``StatementCanceller`` registered in |
| 492 | + ``execute_command`` before the blocking call. |
| 493 | +
|
| 494 | + Returns ``True`` if a canceller was found and fired (the |
| 495 | + statement was in flight), ``False`` otherwise so ``Cursor`` can |
| 496 | + emit its "no executing command" warning. Safe to call from |
| 497 | + another thread. |
| 498 | + """ |
| 499 | + with self._sync_cancellers_lock: |
| 500 | + canceller = self._sync_cancellers.get(id(cursor)) |
| 501 | + if canceller is None: |
| 502 | + return False |
| 503 | + try: |
| 504 | + canceller.cancel() |
| 505 | + except Exception as exc: |
| 506 | + raise _wrap_kernel_exception("cancel_running_cursor", exc) from exc |
| 507 | + return True |
| 508 | + |
425 | 509 | def close_command(self, command_id: CommandId) -> None: |
426 | 510 | with self._async_handles_lock: |
427 | 511 | handle = self._async_handles.pop(command_id.guid, None) |
|
0 commit comments