Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from kafka import errors as Errors
from kafka.future import Future
from kafka.net.selector import WakeupNotifier
from kafka.protocol.metadata import MetadataRequest, MetadataResponse
from kafka.structs import TopicPartition
from kafka.util import ensure_valid_topic_name
Expand Down Expand Up @@ -66,7 +67,7 @@ def __init__(self, **configs):

self._refresh_loop_future = None
self._refresh_future = None
self._notify_wakeup = None
self._wakeup = None

self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
Expand All @@ -90,6 +91,7 @@ def attach(self, manager):
cycle; manager.close() still calls cluster.close() to clear eagerly.
"""
self._manager = weakref.proxy(manager)
self._wakeup = WakeupNotifier(self._manager._net)

def close(self):
# Drop manager reference cycle
Expand Down Expand Up @@ -123,8 +125,7 @@ async def _refresh_loop(self):
continue
try:
log.debug('Sleeping %s for next Metadata refresh', ttl_ms / 1000)
wakeup, self._notify_wakeup = self._manager.wakeup_pair(ttl_ms / 1000)
await wakeup()
await self._wakeup(ttl_ms / 1000)
except Exception as exc:
log.error('Metadata refresh loop error: %s', exc)

Expand Down Expand Up @@ -359,9 +360,7 @@ def request_update(self):
ret = self._future
if self._manager:
self.start_refresh_loop()
if self._notify_wakeup:
self._notify_wakeup()
self._notify_wakeup = None
self._wakeup.notify()
return ret

@property
Expand Down
28 changes: 0 additions & 28 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,31 +471,3 @@ async def waiter():
if state['exception'] is not None:
raise state['exception']
return state['value']

def wakeup_pair(self, timeout_secs):
"""Returns (awaitable, threadsafe_notifier) for an interruptible sleep.

The awaitable resolves when either ``timeout_secs`` elapses or the
notifier is called -- whichever first. The notifier is safe to call
from any thread (it routes through call_soon_threadsafe).

Used by the metadata refresh loop to sleep on its TTL while remaining
interruptible by external callers (e.g. KafkaProducer / KafkaConsumer
invoking cluster.request_update() from another thread).
"""
fut = Future()
wakeup = lambda f=fut: f.success(None) if not f.is_done else None
timer = self._net.call_later(timeout_secs, wakeup)
async def _wakeup():
try:
await fut
finally:
# early wakeup via _notify_wakeup
if not timer.is_done:
try:
self._net.unschedule(timer)
except (ValueError, RuntimeError):
pass
def _notify_wakeup():
self._net.call_soon_threadsafe(wakeup)
return _wakeup, _notify_wakeup
47 changes: 47 additions & 0 deletions kafka/net/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import selectors
import socket
import time
import weakref

from kafka.future import Future

Expand Down Expand Up @@ -387,3 +388,49 @@ def _process_events(self, event_list):
self._ready.append(reader)
else:
log.warning("Selector got READ event without reader...")


class WakeupNotifier:
"""await wakeup(timeout_secs) when either ``timeout_secs`` elapses or
notify() is called -- whichever first. The notifier is safe to call
from any thread (it routes through call_soon_threadsafe).

notify() is a noop unless there is a task currently waiting.

Used by the metadata refresh loop to sleep on its TTL while remaining
interruptible by external callers (e.g. KafkaProducer / KafkaConsumer
invoking cluster.request_update() from another thread).
"""
def __init__(self, net):
self._net = weakref.proxy(net)
self._fut = None

def _wakeup(self):
if self._fut is not None and not self._fut.is_done:
self._fut.success(None)

async def __call__(self, timeout_secs=None):
self._fut = Future()
if timeout_secs is not None:
try:
timer = self._net.call_later(timeout_secs, self._wakeup)
except ReferenceError:
return
else:
timer = None
try:
await self._fut
finally:
self._fut = None
if timer is not None and not timer.is_done:
try:
self._net.unschedule(timer)
except (ValueError, RuntimeError, ReferenceError):
pass

def notify(self):
if self._fut is not None:
try:
self._net.call_soon_threadsafe(self._wakeup)
except ReferenceError:
pass
Loading