From 044c1902b478475bbfbe06a82c019a0b230f62e1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 27 Apr 2026 08:43:48 -0700 Subject: [PATCH] manager.wakeup_pair -> kafka.net.selector.WakeupNotifier --- kafka/cluster.py | 11 +++++----- kafka/net/manager.py | 28 -------------------------- kafka/net/selector.py | 47 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 34 deletions(-) diff --git a/kafka/cluster.py b/kafka/cluster.py index 57d0821dd..3f5e4ccf2 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -10,6 +10,7 @@ from kafka import errors as Errors from kafka.future import Future +from kafka.net.selector import WakeupNotifier from kafka.protocol.metadata import MetadataRequest, MetadataResponse from kafka.structs import TopicPartition from kafka.util import ensure_valid_topic_name @@ -66,7 +67,7 @@ def __init__(self, **configs): self._refresh_loop_future = None self._refresh_future = None - self._notify_wakeup = None + self._wakeup = None self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -90,6 +91,7 @@ def attach(self, manager): cycle; manager.close() still calls cluster.close() to clear eagerly. """ self._manager = weakref.proxy(manager) + self._wakeup = WakeupNotifier(self._manager._net) def close(self): # Drop manager reference cycle @@ -123,8 +125,7 @@ async def _refresh_loop(self): continue try: log.debug('Sleeping %s for next Metadata refresh', ttl_ms / 1000) - wakeup, self._notify_wakeup = self._manager.wakeup_pair(ttl_ms / 1000) - await wakeup() + await self._wakeup(ttl_ms / 1000) except Exception as exc: log.error('Metadata refresh loop error: %s', exc) @@ -359,9 +360,7 @@ def request_update(self): ret = self._future if self._manager: self.start_refresh_loop() - if self._notify_wakeup: - self._notify_wakeup() - self._notify_wakeup = None + self._wakeup.notify() return ret @property diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 2ef8c0f33..96ee515fe 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -471,31 +471,3 @@ async def waiter(): if state['exception'] is not None: raise state['exception'] return state['value'] - - def wakeup_pair(self, timeout_secs): - """Returns (awaitable, threadsafe_notifier) for an interruptible sleep. - - The awaitable resolves when either ``timeout_secs`` elapses or the - notifier is called -- whichever first. The notifier is safe to call - from any thread (it routes through call_soon_threadsafe). - - Used by the metadata refresh loop to sleep on its TTL while remaining - interruptible by external callers (e.g. KafkaProducer / KafkaConsumer - invoking cluster.request_update() from another thread). - """ - fut = Future() - wakeup = lambda f=fut: f.success(None) if not f.is_done else None - timer = self._net.call_later(timeout_secs, wakeup) - async def _wakeup(): - try: - await fut - finally: - # early wakeup via _notify_wakeup - if not timer.is_done: - try: - self._net.unschedule(timer) - except (ValueError, RuntimeError): - pass - def _notify_wakeup(): - self._net.call_soon_threadsafe(wakeup) - return _wakeup, _notify_wakeup diff --git a/kafka/net/selector.py b/kafka/net/selector.py index bbb1e2853..57e19ab2a 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -6,6 +6,7 @@ import selectors import socket import time +import weakref from kafka.future import Future @@ -387,3 +388,49 @@ def _process_events(self, event_list): self._ready.append(reader) else: log.warning("Selector got READ event without reader...") + + +class WakeupNotifier: + """await wakeup(timeout_secs) when either ``timeout_secs`` elapses or + notify() is called -- whichever first. The notifier is safe to call + from any thread (it routes through call_soon_threadsafe). + + notify() is a noop unless there is a task currently waiting. + + Used by the metadata refresh loop to sleep on its TTL while remaining + interruptible by external callers (e.g. KafkaProducer / KafkaConsumer + invoking cluster.request_update() from another thread). + """ + def __init__(self, net): + self._net = weakref.proxy(net) + self._fut = None + + def _wakeup(self): + if self._fut is not None and not self._fut.is_done: + self._fut.success(None) + + async def __call__(self, timeout_secs=None): + self._fut = Future() + if timeout_secs is not None: + try: + timer = self._net.call_later(timeout_secs, self._wakeup) + except ReferenceError: + return + else: + timer = None + try: + await self._fut + finally: + self._fut = None + if timer is not None and not timer.is_done: + try: + self._net.unschedule(timer) + except (ValueError, RuntimeError, ReferenceError): + pass + + def notify(self): + if self._fut is not None: + try: + self._net.call_soon_threadsafe(self._wakeup) + except ReferenceError: + pass