Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions kafka/net/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,6 @@ def send_and_receive(self, node_id, request, timeout_ms=30000):

def poll(self, timeout_ms=None, future=None):
with self._lock:
metadata_ttl = self._manager.cluster.ttl()
if metadata_ttl == 0:
self._manager.update_metadata()
elif self._manager.cluster.need_update:
# A refresh is pending but retry_backoff hasn't elapsed yet.
# Cap timeout_ms so select() returns in time for us to come
# back and re-check ttl.
if timeout_ms is None or metadata_ttl < timeout_ms:
timeout_ms = metadata_ttl
return self._manager.poll(timeout_ms=timeout_ms, future=future)

def close(self, node_id=None):
Expand Down
26 changes: 22 additions & 4 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self, net, cluster, **configs):
self.broker_version_data = None
self._bootstrap_future = None
self._metadata_future = None
self._refresh_loop_task = None
self._io_thread = None
self._pending_waiters = {} # event -> state dict, for pending run() waiters
self._pending_waiters_lock = threading.Lock()
Expand Down Expand Up @@ -144,6 +145,8 @@ def bootstrap(self, timeout_ms=None):
deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000
self._bootstrap_future = self.call_soon(self._do_bootstrap, deadline)
self._bootstrap_future.add_errback(lambda exc: log.error('Bootstrap failed: %s', exc))
if self._refresh_loop_task is None:
self._refresh_loop_task = self._net.call_soon(self._metadata_refresh_loop)
return self._bootstrap_future

@property
Expand Down Expand Up @@ -344,10 +347,25 @@ async def _do_update_metadata(self):
except Exception as exc:
self.cluster.failed_update(exc)
raise
finally:
# Schedule next periodic refresh
ttl = self.cluster.ttl() / 1000
self._net.call_later(max(0, ttl), self.update_metadata)

async def _metadata_refresh_loop(self):
"""Long-running coroutine that owns periodic metadata refresh.

Sleeps for cluster.ttl() then triggers update_metadata(). Exits with
the event loop — stop() tears down the loop and this task with it.
"""
while True:
ttl_ms = self.cluster.ttl()
if ttl_ms > 0:
log.debug(f'Metadata loop: sleeping for {ttl_ms / 1000} secs')
await self._net.sleep(ttl_ms / 1000)
continue
try:
log.debug(f'Metadata loop: updating metadata')
await self.update_metadata()
except Exception as exc:
log.debug('Metadata refresh failed: %s', exc)
# failed_update() already set backoff; loop re-reads ttl

def close(self, node_id=None):
if node_id is not None:
Expand Down
40 changes: 40 additions & 0 deletions test/net/test_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import socket
import threading
import time
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -320,6 +321,45 @@ def test_refresh_metadata_retries_no_node(self, net, cluster):
# Should have a scheduled retry
assert len(net._scheduled) > 0

def test_bootstrap_triggers_refresh_loop(self, net, cluster):
"""bootstrap() schedules the periodic metadata refresh loop, so
update_metadata fires without anyone calling it from compat.poll()."""
manager = KafkaConnectionManager(net, cluster,
socket_connection_timeout_ms=1000,
reconnect_backoff_ms=10)
assert manager._refresh_loop_task is None
call_count = [0]
done = threading.Event()
orig = manager.update_metadata

def counting_update():
call_count[0] += 1
if call_count[0] >= 1:
done.set()
return orig()

with patch.object(manager, 'update_metadata', side_effect=counting_update):
manager.bootstrap(timeout_ms=100)
assert manager._refresh_loop_task is not None
manager.start()
try:
assert done.wait(timeout=2), "refresh loop never called update_metadata"
finally:
manager.stop(timeout=2)
assert call_count[0] >= 1

def test_refresh_loop_spawned_once(self, net, cluster):
"""Calling bootstrap() multiple times must not spawn multiple refresh
loop tasks."""
manager = KafkaConnectionManager(net, cluster,
socket_connection_timeout_ms=1000,
reconnect_backoff_ms=10)
manager.bootstrap(timeout_ms=100)
task = manager._refresh_loop_task
assert task is not None
manager.bootstrap(timeout_ms=100)
assert manager._refresh_loop_task is task


class TestKafkaConnectionManagerClose:
def test_close_single_connection(self, manager):
Expand Down
Loading