From e75a68ab6ba751e23ae304e4e662f7f8562d3cd2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 29 Apr 2026 07:08:11 -0700 Subject: [PATCH] kafka.net.wakeup_notifier --- kafka/cluster.py | 2 +- kafka/net/selector.py | 47 ---------------------------------- kafka/net/wakeup_notifier.py | 49 ++++++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 48 deletions(-) create mode 100644 kafka/net/wakeup_notifier.py diff --git a/kafka/cluster.py b/kafka/cluster.py index 3f5e4ccf2..722181137 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -10,7 +10,7 @@ from kafka import errors as Errors from kafka.future import Future -from kafka.net.selector import WakeupNotifier +from kafka.net.wakeup_notifier import WakeupNotifier from kafka.protocol.metadata import MetadataRequest, MetadataResponse from kafka.structs import TopicPartition from kafka.util import ensure_valid_topic_name diff --git a/kafka/net/selector.py b/kafka/net/selector.py index 57e19ab2a..bbb1e2853 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -6,7 +6,6 @@ import selectors import socket import time -import weakref from kafka.future import Future @@ -388,49 +387,3 @@ def _process_events(self, event_list): self._ready.append(reader) else: log.warning("Selector got READ event without reader...") - - -class WakeupNotifier: - """await wakeup(timeout_secs) when either ``timeout_secs`` elapses or - notify() is called -- whichever first. The notifier is safe to call - from any thread (it routes through call_soon_threadsafe). - - notify() is a noop unless there is a task currently waiting. - - Used by the metadata refresh loop to sleep on its TTL while remaining - interruptible by external callers (e.g. KafkaProducer / KafkaConsumer - invoking cluster.request_update() from another thread). - """ - def __init__(self, net): - self._net = weakref.proxy(net) - self._fut = None - - def _wakeup(self): - if self._fut is not None and not self._fut.is_done: - self._fut.success(None) - - async def __call__(self, timeout_secs=None): - self._fut = Future() - if timeout_secs is not None: - try: - timer = self._net.call_later(timeout_secs, self._wakeup) - except ReferenceError: - return - else: - timer = None - try: - await self._fut - finally: - self._fut = None - if timer is not None and not timer.is_done: - try: - self._net.unschedule(timer) - except (ValueError, RuntimeError, ReferenceError): - pass - - def notify(self): - if self._fut is not None: - try: - self._net.call_soon_threadsafe(self._wakeup) - except ReferenceError: - pass diff --git a/kafka/net/wakeup_notifier.py b/kafka/net/wakeup_notifier.py new file mode 100644 index 000000000..d4757adb1 --- /dev/null +++ b/kafka/net/wakeup_notifier.py @@ -0,0 +1,49 @@ +import weakref + +from kafka.future import Future + + +class WakeupNotifier: + """await wakeup(timeout_secs) when either ``timeout_secs`` elapses or + notify() is called -- whichever first. The notifier is safe to call + from any thread (it routes through call_soon_threadsafe). + + notify() is a noop unless there is a task currently waiting. + + Used by the metadata refresh loop to sleep on its TTL while remaining + interruptible by external callers (e.g. KafkaProducer / KafkaConsumer + invoking cluster.request_update() from another thread). + """ + def __init__(self, net): + self._net = weakref.proxy(net) + self._fut = None + + def _wakeup(self): + if self._fut is not None and not self._fut.is_done: + self._fut.success(None) + + async def __call__(self, timeout_secs=None): + self._fut = Future() + if timeout_secs is not None: + try: + timer = self._net.call_later(timeout_secs, self._wakeup) + except ReferenceError: + return + else: + timer = None + try: + await self._fut + finally: + self._fut = None + if timer is not None and not timer.is_done: + try: + self._net.unschedule(timer) + except (ValueError, RuntimeError, ReferenceError): + pass + + def notify(self): + if self._fut is not None: + try: + self._net.call_soon_threadsafe(self._wakeup) + except ReferenceError: + pass