Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from kafka import errors as Errors
from kafka.future import Future
from kafka.net.selector import WakeupNotifier
from kafka.net.wakeup_notifier import WakeupNotifier
from kafka.protocol.metadata import MetadataRequest, MetadataResponse
from kafka.structs import TopicPartition
from kafka.util import ensure_valid_topic_name
Expand Down
47 changes: 0 additions & 47 deletions kafka/net/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import selectors
import socket
import time
import weakref

from kafka.future import Future

Expand Down Expand Up @@ -388,49 +387,3 @@ def _process_events(self, event_list):
self._ready.append(reader)
else:
log.warning("Selector got READ event without reader...")


class WakeupNotifier:
"""await wakeup(timeout_secs) when either ``timeout_secs`` elapses or
notify() is called -- whichever first. The notifier is safe to call
from any thread (it routes through call_soon_threadsafe).

notify() is a noop unless there is a task currently waiting.

Used by the metadata refresh loop to sleep on its TTL while remaining
interruptible by external callers (e.g. KafkaProducer / KafkaConsumer
invoking cluster.request_update() from another thread).
"""
def __init__(self, net):
self._net = weakref.proxy(net)
self._fut = None

def _wakeup(self):
if self._fut is not None and not self._fut.is_done:
self._fut.success(None)

async def __call__(self, timeout_secs=None):
self._fut = Future()
if timeout_secs is not None:
try:
timer = self._net.call_later(timeout_secs, self._wakeup)
except ReferenceError:
return
else:
timer = None
try:
await self._fut
finally:
self._fut = None
if timer is not None and not timer.is_done:
try:
self._net.unschedule(timer)
except (ValueError, RuntimeError, ReferenceError):
pass

def notify(self):
if self._fut is not None:
try:
self._net.call_soon_threadsafe(self._wakeup)
except ReferenceError:
pass
49 changes: 49 additions & 0 deletions kafka/net/wakeup_notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import weakref

from kafka.future import Future


class WakeupNotifier:
"""await wakeup(timeout_secs) when either ``timeout_secs`` elapses or
notify() is called -- whichever first. The notifier is safe to call
from any thread (it routes through call_soon_threadsafe).

notify() is a noop unless there is a task currently waiting.

Used by the metadata refresh loop to sleep on its TTL while remaining
interruptible by external callers (e.g. KafkaProducer / KafkaConsumer
invoking cluster.request_update() from another thread).
"""
def __init__(self, net):
self._net = weakref.proxy(net)
self._fut = None

def _wakeup(self):
if self._fut is not None and not self._fut.is_done:
self._fut.success(None)

async def __call__(self, timeout_secs=None):
self._fut = Future()
if timeout_secs is not None:
try:
timer = self._net.call_later(timeout_secs, self._wakeup)
except ReferenceError:
return
else:
timer = None
try:
await self._fut
finally:
self._fut = None
if timer is not None and not timer.is_done:
try:
self._net.unschedule(timer)
except (ValueError, RuntimeError, ReferenceError):
pass

def notify(self):
if self._fut is not None:
try:
self._net.call_soon_threadsafe(self._wakeup)
except ReferenceError:
pass
Loading