Add gRPC client and worker connection resiliency#135

Open
berndverst wants to merge 30 commits into main from grpc-resiliency-pr708-impl

@berndverst
Member

@berndverst berndverst commented Apr 24, 2026

Summary

  • Add public gRPC resiliency option types and shared transport helpers, then wire them through core and Azure Managed constructors.
  • Harden worker, sync client, and async client connection recovery to better survive silent disconnects, transport failures, and channel recreation scenarios.
  • Add focused regression coverage plus user-facing changelog and design/plan updates for the new resiliency behavior.

Test Plan

  • Focused pytest run covering grpc resiliency, worker resiliency, worker concurrency loop, client resiliency, and Azure Managed wrapper wiring.
  • flake8 on all changed source and test files.

Bernd Verst and others added 26 commits April 23, 2026 17:19
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Extend worker resiliency coverage with an end-to-end silent-disconnect recovery test and an explicit reconnect backoff assertion.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings April 24, 2026 09:35
Comment thread tests/durabletask/test_client.py Fixed
Comment thread tests/durabletask/test_client.py Fixed
Comment thread durabletask/client.py Fixed
Comment thread durabletask/worker.py Fixed
Contributor

Copilot AI left a comment

Pull request overview

Adds first-class gRPC connection resiliency to the Durable Task Python SDK (core durabletask/) and threads the new configuration through Azure Managed wrappers, with extensive regression tests and design docs.

Changes:

  • Introduces GrpcWorkerResiliencyOptions / GrpcClientResiliencyOptions and shared internal resiliency helpers (backoff, failure tracking, transport-failure classification).
  • Updates worker stream loop and sync/async clients to detect transport-shaped failures and safely recreate/retire SDK-owned channels while preserving caller-owned channel semantics.
  • Adds comprehensive unit tests (core + azuremanaged) plus docs/specs and changelog entries.

Reviewed changes

Copilot reviewed 16 out of 17 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| durabletask/worker.py | Implements worker stream monitoring, backoff, failure tracking, and safe SDK-owned channel retirement with in-flight tracking. |
| durabletask/client.py | Adds sync/async unary invocation wrappers and SDK-owned channel recreation + retirement handling. |
| durabletask/grpc_options.py | Adds public resiliency option dataclasses with validation. |
| durabletask/internal/grpc_resiliency.py | Adds shared backoff, FailureTracker, and transport-failure classification helpers. |
| durabletask-azuremanaged/durabletask/azuremanaged/client.py | Forwards client resiliency options through Azure Managed client wrappers. |
| durabletask-azuremanaged/durabletask/azuremanaged/worker.py | Forwards worker resiliency options through Azure Managed worker wrapper. |
| tests/durabletask/test_worker_resiliency.py | New worker resiliency tests (silent disconnect, graceful close, recreation thresholds, in-flight close deferral). |
| tests/durabletask/test_grpc_resiliency.py | New tests for option validation, backoff, FailureTracker, and transport-failure classification. |
| tests/durabletask/test_client.py | Adds sync/async client channel recreation/retirement tests and wrapper verification. |
| tests/durabletask/test_worker_concurrency_loop.py | Updates tests to call prepare_for_run() before reusing the worker manager. |
| tests/durabletask/test_worker_concurrency_loop_async.py | Updates async loop tests to call prepare_for_run() before reusing the worker manager. |
| tests/durabletask-azuremanaged/test_azuremanaged_grpc_resiliency.py | New tests validating Azure Managed wrapper pass-through of resiliency options. |
| CHANGELOG.md | Documents new resiliency options and behavior changes in core SDK. |
| durabletask-azuremanaged/CHANGELOG.md | Documents pass-through resiliency options in Azure Managed package. |
| docs/superpowers/specs/2026-04-23-grpc-resiliency-design.md | Adds design spec for resiliency behavior and public API. |
| docs/superpowers/plans/2026-04-23-grpc-resiliency.md | Adds implementation plan document for the work. |
| .gitignore | Ignores .worktrees/ and normalizes coverage.lcov entry formatting. |

Comment thread durabletask/internal/grpc_resiliency.py Outdated
Comment thread docs/superpowers/plans/2026-04-23-grpc-resiliency.md Outdated
Comment thread durabletask/worker.py Outdated
Comment thread durabletask/client.py Outdated
Comment thread durabletask/client.py Outdated
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Bernd Verst and others added 2 commits April 24, 2026 11:49
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@andystaples
Contributor

🤖 The following comment was generated by an AI agent (GitHub Copilot CLI) as part of a code review pass. Treat as a reviewer suggestion, not authoritative.

[1/10] Centralize resiliency in a UnaryUnaryClientInterceptor instead of _invoke_unary

The PR rewrites ~38 stub call sites in durabletask/client.py (sync + async) to go through self._invoke_unary("MethodName", req). That has two ongoing-maintenance costs:

  • The "MethodName" string keys lose static type help; a future rename of a stub method silently bypasses resiliency.
  • Any new RPC added later must remember to be wrapped, or it won't participate in failure tracking / channel recreation.

The dotnet counterpart (PR #708) solves this with a single ChannelRecreatingCallInvoker that wraps the channel once. The Python equivalent is grpc.intercept_channel + a UnaryUnaryClientInterceptor (and grpc.aio.UnaryUnaryClientInterceptor for the async client). Doing it that way would let you:

  • Delete _invoke_unary and revert all 38 call-site edits.
  • Keep LONG_POLL_METHODS keyed on the gRPC method path (e.g. /TaskHubSidecarService/WaitForInstanceStart), matching the dotnet implementation.
  • Have exactly one place to maintain for any future stub additions.

@andystaples
Contributor

🤖 The following comment was generated by an AI agent (GitHub Copilot CLI) as part of a code review pass. Treat as a reviewer suggestion, not authoritative.

[2/10] Align naming for the "SDK-owned channel" concept

durabletask/client.py already exposes self._owns_channel to mean "the SDK created the channel."

durabletask/worker.py introduces a new helper for the same concept but inverts the framing:

def _can_recreate_channel(self) -> bool:
    return self._channel is None

Here self._channel is the caller-provided channel, so _can_recreate_channel() actually tests "no caller-provided channel" — the same thing _owns_channel already names in the client. The opposite-sounding names for the same idea make the cross-file code harder to read.

Suggestion: pick one name (e.g., _uses_sdk_channel or just _owns_channel) and use it consistently in both client.py and worker.py.

@andystaples
Contributor

🤖 The following comment was generated by an AI agent (GitHub Copilot CLI) as part of a code review pass. Treat as a reviewer suggestion, not authoritative.

[3/10] FailureTracker mutates state without a lock; sync client is multi-threaded

durabletask/internal/grpc_resiliency.py:

@dataclass
class FailureTracker:
    threshold: int
    consecutive_failures: int = 0

    def record_failure(self) -> bool:
        ...
        self.consecutive_failures += 1
        return self.consecutive_failures >= self.threshold

    def record_success(self) -> None:
        self.consecutive_failures = 0

The sync TaskHubGrpcClient is commonly called from multiple worker threads (gRPC sync clients are typically used that way), and the read-modify-write self.consecutive_failures += 1 is not atomic. The CPython GIL makes a corrupted counter unlikely in practice, but races can:

  • Cause two threads to simultaneously cross the threshold and double-recreate (the cooldown mostly hides this).
  • Cause record_success to lose a concurrent failure increment, masking a real transport problem.

Suggestion: add a threading.Lock inside FailureTracker (the async client only ever touches it from the single event loop, so an asyncio.Lock is unnecessary for that side). Alternatively split sync/async tracker variants.
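
To make the suggestion concrete, here is a minimal sketch of a lock-guarded tracker. The class name `LockedFailureTracker` and the `hammer` helper are hypothetical and exist only for illustration; the field names follow the snippet quoted above, and the PR's real implementation may differ.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class LockedFailureTracker:
    """Hypothetical lock-guarded variant of the FailureTracker quoted above."""

    threshold: int
    consecutive_failures: int = 0
    # One lock per tracker; kept out of __init__, repr, and comparisons.
    _lock: threading.Lock = field(
        default_factory=threading.Lock, init=False, repr=False, compare=False
    )

    def record_failure(self) -> bool:
        # Increment and threshold check happen as one atomic step.
        with self._lock:
            self.consecutive_failures += 1
            return self.consecutive_failures >= self.threshold

    def record_success(self) -> None:
        with self._lock:
            self.consecutive_failures = 0


def hammer(tracker: LockedFailureTracker, threads: int, per_thread: int) -> int:
    """Record failures from several threads and return the final count."""

    def work() -> None:
        for _ in range(per_thread):
            tracker.record_failure()

    workers = [threading.Thread(target=work) for _ in range(threads)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return tracker.consecutive_failures
```

With the lock in place, `hammer(tracker, 8, 500)` should always end at 4000 regardless of how the threads interleave, which is the kind of property a concurrency regression test can assert.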

@andystaples
Contributor

🤖 The following comment was generated by an AI agent (GitHub Copilot CLI) as part of a code review pass. Treat as a reviewer suggestion, not authoritative.

[4/10] Don't depend on the private ThreadPoolExecutor._shutdown attribute

durabletask/worker.py (in _AsyncWorkerManager) reads CPython-private state in four places:

if getattr(self.thread_pool, "_shutdown", False):
    self.thread_pool = self._create_thread_pool()
...
if not getattr(self.thread_pool, "_shutdown", False):
    self.thread_pool.shutdown(wait=True)

_shutdown is implementation-private — it isn't part of concurrent.futures.ThreadPoolExecutor's public API and could be renamed or removed in a future Python release without notice.

Suggestion: track an explicit self._pool_is_shutdown: bool flag set inside shutdown() / prepare_for_run() and read everywhere instead of getattr(..., "_shutdown", False).
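
A rough sketch of the explicit-flag approach, using only the public `ThreadPoolExecutor` API. `PoolOwner` is a hypothetical stand-in for the pool-owning part of `_AsyncWorkerManager`; the method names mirror those mentioned in this thread, but the bodies are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor


class PoolOwner:
    """Hypothetical stand-in for the part of _AsyncWorkerManager that owns the pool."""

    def __init__(self, max_workers: int = 4) -> None:
        self._max_workers = max_workers
        self.thread_pool = self._create_thread_pool()
        self._pool_is_shutdown = False

    def _create_thread_pool(self) -> ThreadPoolExecutor:
        return ThreadPoolExecutor(max_workers=self._max_workers)

    def prepare_for_run(self) -> None:
        # Recreate the pool only if a previous run shut it down.
        if self._pool_is_shutdown:
            self.thread_pool = self._create_thread_pool()
            self._pool_is_shutdown = False

    def shutdown(self) -> None:
        # Idempotent: our own flag, not the executor's private state, guards this.
        if not self._pool_is_shutdown:
            self.thread_pool.shutdown(wait=True)
            self._pool_is_shutdown = True
```

Because the flag is owned by the manager, the behavior no longer depends on any attribute that CPython could rename.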

@andystaples
Contributor

🤖 The following comment was generated by an AI agent (GitHub Copilot CLI) as part of a code review pass. Treat as a reviewer suggestion, not authoritative.

[5/10] wrap_execution and wrap_cancellation in _async_run_loop are identical

In durabletask/worker.py._async_run_loop:

def wrap_execution(handler, release):
    def wrapped(*args, **kwargs):
        try:
            return handler(*args, **kwargs)
        finally:
            release()
    return wrapped

def wrap_cancellation(handler, release):
    def wrapped(*args, **kwargs):
        try:
            return handler(*args, **kwargs)
        finally:
            release()
    return wrapped

These are byte-for-byte the same function with different names. Collapse to one helper (e.g. _with_release(handler, release)) and use it for both the execution and cancellation paths in submit_work_item.
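
One possible shape for the collapsed helper is sketched below. The name `with_release` is illustrative, and `functools.wraps` is an addition not present in the quoted closures.

```python
import functools
from typing import Any, Callable


def with_release(
    handler: Callable[..., Any], release: Callable[[], None]
) -> Callable[..., Any]:
    """Wrap handler so release() runs once per call, even if handler raises."""

    @functools.wraps(handler)
    def wrapped(*args: Any, **kwargs: Any) -> Any:
        try:
            return handler(*args, **kwargs)
        finally:
            release()

    return wrapped
```

The same helper can then wrap both the execution handler and the cancellation handler, since neither path needs different release behavior.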

@andystaples
Copy link
Copy Markdown
Contributor

🤖 The following comment was generated by an AI agent (GitHub Copilot CLI) as part of a code review pass. Treat as a reviewer suggestion, not authoritative.

[6/10] _async_run_loop is hard to navigate; consider extracting a _WorkItemStreamConsumer

After this PR, TaskHubGrpcWorker._async_run_loop is roughly 250 lines containing several nested closures (create_fresh_connection, wrap_execution, wrap_cancellation, submit_work_item, invalidate_connection, stream_reader, get_reconnect_delay_seconds). The function captures a lot of mutable state via nonlocal and is now central to the worker's correctness on reconnects, drains, and silent disconnects.

The merged dotnet PR (#708) split the equivalent code into WorkItemStreamConsumer, ReconnectBackoff, and ProcessorExitReason. That structure is what makes its behavior-level tests possible without monkey-patching internals.

Suggestion (not necessarily for this PR, but a strong follow-up):

  • Extract a _WorkItemStreamConsumer class that owns the reader thread, sentinel handling, and outcome classification.
  • Move create_fresh_connection / invalidate_connection to methods on TaskHubGrpcWorker.
  • Move the in-flight tracker acquire/release wrapping into a small helper class alongside _InFlightChannelTracker.

The resulting _async_run_loop should read closer to the dotnet processor: a small loop that asks each helper for the next step.

@andystaples
Copy link
Copy Markdown
Contributor

🤖 The following comment was generated by an AI agent (GitHub Copilot CLI) as part of a code review pass. Treat as a reviewer suggestion, not authoritative.

[7/10] Promote the 30.0s grace period and 30 jitter cap to named constants

Two semantically meaningful numbers are hard-coded:

  • durabletask/client.py — the retired-channel close timer in both sync and async clients:
    close_timer = threading.Timer(30.0, self._close_retired_channel, args=(old_channel,))
    ...
    await asyncio.sleep(30.0)
  • durabletask/internal/grpc_resiliency.py:
    capped_attempt = min(attempt, 30)

These both encode design choices (in-flight RPC drain window; exponent ceiling to keep 2 ** n bounded) that mirror the dotnet PR. Promoting them to module-level constants with a short comment makes the intent discoverable to future readers and lets tests reference the same constant.

Suggested names:

  • _RETIRED_CHANNEL_CLOSE_DELAY_SECONDS = 30.0
  • _MAX_JITTER_ATTEMPT_EXPONENT = 30

@andystaples
Contributor

🤖 The following comment was generated by an AI agent (GitHub Copilot CLI) as part of a code review pass. Treat as a reviewer suggestion, not authoritative.

[9/10] _InFlightChannelTracker keys on id(channel); consider WeakKeyDictionary

class _InFlightChannelTracker:
    def __init__(self):
        self._lock = Lock()
        self._states: dict[int, _TrackedChannelState] = {}

    def acquire(self, channel: Any):
        channel_key = id(channel)
        ...

This is safe as written because _TrackedChannelState holds a strong reference to the channel, so id() reuse-after-GC can't bite here. But id-keyed dicts are an unusual idiom in Python and are a well-known footgun if anyone later changes the lifetime model (e.g., drops the strong reference in _TrackedChannelState). A reader has to verify the invariant by reading the whole class.

Suggestion: either use the channel itself as the key if grpc.Channel is hashable (it is, by identity), or use weakref.WeakKeyDictionary if you want explicit lifecycle semantics. Both make the invariant local instead of structural.
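
The first option could look roughly like the following simplified sketch. `ChannelTracker`, `TrackedState`, and the `retire` method are hypothetical names; the real `_InFlightChannelTracker` is only partially shown above and likely has more behavior.

```python
import threading
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class TrackedState:
    channel: Any  # strong reference keeps the channel alive while tracked
    in_flight: int = 0
    close_requested: bool = False


class ChannelTracker:
    """Hypothetical simplification of _InFlightChannelTracker, keyed on the channel."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # The channel object itself is the key (hashed by identity).
        self._states: Dict[Any, TrackedState] = {}

    def acquire(self, channel: Any) -> None:
        with self._lock:
            state = self._states.setdefault(channel, TrackedState(channel))
            state.in_flight += 1

    def release(self, channel: Any) -> None:
        close_now = False
        with self._lock:
            state = self._states[channel]
            state.in_flight -= 1
            if state.in_flight == 0:
                del self._states[channel]
                close_now = state.close_requested
        if close_now:
            channel.close()  # close outside the lock

    def retire(self, channel: Any) -> None:
        """Close now if idle, otherwise defer until the last release."""
        with self._lock:
            state = self._states.get(channel)
            if state is not None:
                state.close_requested = True
                return
        channel.close()
```

Keying on the object means the dictionary itself holds the reference, so a reader does not need to check a separate class to see why the key stays valid.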

@andystaples
Contributor

🤖 The following comment was generated by an AI agent (GitHub Copilot CLI) as part of a code review pass. Treat as a reviewer suggestion, not authoritative.

[10/10] Worker tests reach deep into private state; consider extracting _WorkItemStreamConsumer first

tests/durabletask/test_worker_resiliency.py is ~841 lines and exercises the worker by monkey-patching private attributes:

  • worker._async_worker_manager = DummyWorkerManager()
  • worker._shutdown.wait = lambda timeout: False
  • worker._execute_activity = ...
  • worker._cancel_activity = ...
  • monkeypatch.setattr("durabletask.worker.shared.get_grpc_channel", ...)
  • monkeypatch.setattr("durabletask.worker.stubs.TaskHubSidecarServiceStub", ...)

This mirrors the merged dotnet PR's style and gives strong behavioral coverage today, but it tightly couples the tests to the current shape of TaskHubGrpcWorker._async_run_loop and its module-level imports. Any of the follow-ups suggested elsewhere in this review (extracting the stream consumer, switching the client to a UnaryUnaryClientInterceptor, renaming _can_recreate_channel) will require fan-out edits across these tests.

Suggestion: pair the extraction proposed in comment [6/10] with a rewrite of the largest test cases against the new helper class. The remaining worker-level tests should only need to verify wiring, not transport semantics.

Comment thread durabletask/worker.py
async def reset_for_new_run(self):
    """Reset the manager state for a new run."""
    self._shutdown = False
    self.prepare_for_run()
Contributor

Nit: prepare_for_run seems to be abstracted from reset_for_new_run only for testing purposes, is this necessary and/or best practice?

Member Author

Reply generated by GitHub Copilot CLI on behalf of @berndverst.

Not test-only: prepare_for_run is also called from TaskHubGrpcWorker._async_run_loop at the start of each run (line 706). The split exists because reset_for_new_run additionally drains the queues (cancelling any in-flight items), which is only valid when there's a running event loop, while prepare_for_run is the lightweight "this manager is about to run again" path that just resets _shutdown and the thread pool. The resiliency feature relies on the manager being reusable across reconnects, so we need a reset path that doesn't require draining live queues. The tests call prepare_for_run for the same reason: they exercise the manager-reuse path, not a test-only seam.


import grpc

LONG_POLL_METHODS = {"WaitForInstanceStart", "WaitForInstanceCompletion"}
Contributor

Nit: Hardcoded list of method names, though I'm not sure if that is preferable vs a string matching approach like WaitFor* - just thinking ahead to when we add new methods that would qualify, do we need to go back and maintain this list

Member Author

Reply generated by GitHub Copilot CLI on behalf of @berndverst.

Good point. I considered prefix matching but kept the explicit set deliberately. The risk with WaitFor* is that a future RPC like WaitForApproval or WaitForExternalEvent might match the prefix but not actually be a server-side long-poll (i.e., its DEADLINE_EXCEEDED really would mean a transport failure). The explicit set forces an intentional decision when a new long-poll method ships, which is cheap because the list is exactly two entries and only changes when the proto changes.

I've added a short comment above the constant explaining the semantics. Happy to switch to a prefix-based match if you'd prefer; let me know.
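
The trade-off described above can be sketched as follows. The `Status` enum is a stand-in that borrows three member names from `grpc.StatusCode` so the example runs without gRPC installed; `is_transport_failure` is a hypothetical helper, and treating `UNAVAILABLE` as a transport failure is an assumption, since the PR's actual classification rules are not shown here.

```python
from enum import Enum


class Status(Enum):
    """Stand-in for a few grpc.StatusCode members (illustration only)."""

    UNAVAILABLE = "UNAVAILABLE"
    DEADLINE_EXCEEDED = "DEADLINE_EXCEEDED"
    NOT_FOUND = "NOT_FOUND"


# Explicit allow-list: adding a long-poll RPC requires a deliberate edit here.
LONG_POLL_METHODS = frozenset({"WaitForInstanceStart", "WaitForInstanceCompletion"})


def is_transport_failure(method_name: str, status: Status) -> bool:
    """Classify a failed unary call as transport-shaped or not."""
    if status is Status.UNAVAILABLE:
        return True
    if status is Status.DEADLINE_EXCEEDED:
        # A long-poll that times out just means "nothing happened yet".
        return method_name not in LONG_POLL_METHODS
    return False
```

Under this sketch, a hypothetical `WaitForApproval` call that hits `DEADLINE_EXCEEDED` still counts as a transport failure, which is the behavior a `WaitFor*` prefix match would silently change.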

Comment thread durabletask/client.py
Comment on lines 203 to 208
  channel = shared.get_grpc_channel(
-     host_address=host_address,
+     host_address=self._host_address,
      secure_channel=secure_channel,
-     interceptors=interceptors,
+     interceptors=self._interceptors,
      channel_options=channel_options,
  )
Contributor

Uncertain: Pass-by-reference - does get_grpc_channel modify the interceptor list or the interceptors, and do we want those changes in the client's copy when the channel is recreated? If yes, disregard

Member Author

Reply generated by GitHub Copilot CLI on behalf of @berndverst.

Checked: shared.get_grpc_channel does not mutate the interceptor sequence. It passes the values through grpc.intercept_channel(channel, *interceptors) (unpacking, no in-place modification) for sync, and forwards the sequence directly to grpc.aio.{secure,insecure}_channel(..., interceptors=...) on the async side. Neither call retains a write-through reference.

We also defensively copy in the client __init__ (self._interceptors = list(resolved_interceptors)), so even if gRPC ever started mutating the argument, the client's copy would not be affected, and the same list is reused on every recreate so the recreated channel always gets the same interceptors. So this one's fine as-is.
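
A small sketch of the snapshot-then-reuse pattern described here. `RecreatingClient` and its `channel_factory` parameter are hypothetical and stand in for the real client and `shared.get_grpc_channel`; the example only shows that later edits to the caller's list do not reach a recreated channel.

```python
from typing import Any, Callable, List, Optional, Sequence


class RecreatingClient:
    """Hypothetical client that rebuilds its channel from stored settings."""

    def __init__(
        self,
        host_address: str,
        interceptors: Optional[Sequence[Any]],
        channel_factory: Callable[[str, List[Any]], Any],
    ) -> None:
        self._host_address = host_address
        # Snapshot the caller's sequence so later caller-side edits cannot leak in.
        self._interceptors = list(interceptors or [])
        self._channel_factory = channel_factory
        self.channel = self._create_channel()

    def _create_channel(self) -> Any:
        return self._channel_factory(self._host_address, self._interceptors)

    def recreate_channel(self) -> None:
        # Every recreation reuses the same stored host and interceptors.
        self.channel = self._create_channel()
```

In this sketch the factory receives the client's own list, so the guarantee covers caller-side mutation; isolation from a factory that mutates its argument would need an extra copy at the call site.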

- Make FailureTracker thread-safe with an internal lock so multi-threaded
  sync clients can't race the consecutive-failure counter (review [3/10]).
- Track _AsyncWorkerManager pool shutdown via an explicit _pool_is_shutdown
  flag instead of reading ThreadPoolExecutor._shutdown (CPython private API,
  review [4/10]).
- Collapse identical wrap_execution/wrap_cancellation closures in the worker
  stream loop into a single wrap_with_release helper (review [5/10]).
- Promote the retired-channel close delay and jitter exponent cap to named
  module-level constants (review [7/10]).
- Key _InFlightChannelTracker on the channel object instead of id(channel)
  so the lifetime invariant is local to the tracker (review [9/10]).
- Rename TaskHubGrpcWorker._can_recreate_channel() to the existing
  _owns_channel attribute used by the clients, so both files use the same
  name for the same concept (review [2/10]).
- Add regression tests for FailureTracker concurrency and for thread-pool
  recreation after manager shutdown.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@berndverst
Member Author

The following replies were generated by GitHub Copilot CLI on behalf of @berndverst as part of a review-response pass.

Thanks for the very detailed pass — really appreciated. Here's what I did for each item; pushed as a follow-up commit on the same branch.

[1/10] Centralize resiliency in a UnaryUnaryClientInterceptor: Deferring as a follow-up. Agree this is the right long-term shape (matches ChannelRecreatingCallInvoker on the .NET side), but it would require reverting all 38 call-site changes and reworking the failure-tracking integration around ClientCallDetails.method, and we'd want to add a new layer of interceptor-level tests before trusting it. I'll file it as a follow-up issue rather than expand this PR's scope further.

[2/10] Align naming for "SDK-owned channel": Done. Added self._owns_channel = channel is None to TaskHubGrpcWorker.__init__ (mirroring the clients) and removed _can_recreate_channel(). All four call sites and the worker test now use self._owns_channel directly.

[3/10] FailureTracker thread safety: Done. FailureTracker now owns an internal threading.Lock and record_failure / record_success execute their read-modify-write under it. Async callers pay a negligible cost because they only touch the tracker from the event loop, but the sync path is no longer racy. Added a regression test that fires 8 threads × 500 increments and asserts the counter ends at 4000.

[4/10] Don't depend on ThreadPoolExecutor._shutdown: Done. _AsyncWorkerManager now tracks an explicit self._pool_is_shutdown flag, set in __init__, reset in _ensure_thread_pool, and flipped to True in run()'s finally after shutdown(wait=True). The two getattr(self.thread_pool, "_shutdown", False) reads are gone. Added a test that runs the manager once, then calls prepare_for_run and asserts a fresh pool was created.

[5/10] Collapse wrap_execution / wrap_cancellation: Done. Replaced with a single wrap_with_release(handler, release) helper used for both the execution and cancellation paths in submit_work_item.

[6/10] Extract _WorkItemStreamConsumer: Deferring per your "not necessarily for this PR" note. Agree the long-term shape should mirror the .NET WorkItemStreamConsumer / ReconnectBackoff / ProcessorExitReason split, and that will make the worker tests in [10/10] much smaller. I'll track it together with [1/10] as the follow-up refactor.

[7/10] Promote 30.0s grace period and 30 jitter cap to named constants: Done. Added _RETIRED_CHANNEL_CLOSE_DELAY_SECONDS = 30.0 at module scope in durabletask/client.py (used by both the sync threading.Timer and the async asyncio.sleep) and _MAX_JITTER_ATTEMPT_EXPONENT = 30 in durabletask/internal/grpc_resiliency.py, each with a short comment explaining the intent.

[9/10] _InFlightChannelTracker keying: Done. The tracker now keys directly on the channel object (dict[Any, _TrackedChannelState]) instead of id(channel). gRPC channels are hashable by identity, and _TrackedChannelState continues to hold the strong reference, so the lifetime invariant is now local to the class and a reader doesn't have to chase the strong-reference argument.

[10/10] Worker tests reach deep into private state: Deferring with [6/10]. Once the stream consumer is extracted, the largest blocks in test_worker_resiliency.py should collapse into thin wiring checks plus consumer-level behavior tests, and we won't need the _async_worker_manager/_execute_activity/get_grpc_channel monkey-patching.

(No [8/10] block landed on the PR, so I assumed that one was intentionally skipped.)
