diff --git a/kafka/net/compat.py b/kafka/net/compat.py index 450182c9e..7791c98aa 100644 --- a/kafka/net/compat.py +++ b/kafka/net/compat.py @@ -129,15 +129,6 @@ def send_and_receive(self, node_id, request, timeout_ms=30000): def poll(self, timeout_ms=None, future=None): with self._lock: - metadata_ttl = self._manager.cluster.ttl() - if metadata_ttl == 0: - self._manager.update_metadata() - elif self._manager.cluster.need_update: - # A refresh is pending but retry_backoff hasn't elapsed yet. - # Cap timeout_ms so select() returns in time for us to come - # back and re-check ttl. - if timeout_ms is None or metadata_ttl < timeout_ms: - timeout_ms = metadata_ttl return self._manager.poll(timeout_ms=timeout_ms, future=future) def close(self, node_id=None): diff --git a/kafka/net/manager.py b/kafka/net/manager.py index ecf953225..db1c1409c 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -70,6 +70,7 @@ def __init__(self, net, cluster, **configs): self.broker_version_data = None self._bootstrap_future = None self._metadata_future = None + self._refresh_loop_task = None self._io_thread = None self._pending_waiters = {} # event -> state dict, for pending run() waiters self._pending_waiters_lock = threading.Lock() @@ -144,6 +145,8 @@ def bootstrap(self, timeout_ms=None): deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000 self._bootstrap_future = self.call_soon(self._do_bootstrap, deadline) self._bootstrap_future.add_errback(lambda exc: log.error('Bootstrap failed: %s', exc)) + if self._refresh_loop_task is None: + self._refresh_loop_task = self._net.call_soon(self._metadata_refresh_loop) return self._bootstrap_future @property @@ -344,10 +347,25 @@ async def _do_update_metadata(self): except Exception as exc: self.cluster.failed_update(exc) raise - finally: - # Schedule next periodic refresh - ttl = self.cluster.ttl() / 1000 - self._net.call_later(max(0, ttl), self.update_metadata) + + async def _metadata_refresh_loop(self): + """Long-running coroutine that owns periodic metadata refresh. + + Sleeps for cluster.ttl() then triggers update_metadata(). Exits with + the event loop — stop() tears down the loop and this task with it. + """ + while True: + ttl_ms = self.cluster.ttl() + if ttl_ms > 0: + log.debug(f'Metadata loop: sleeping for {ttl_ms / 1000} secs') + await self._net.sleep(ttl_ms / 1000) + continue + try: + log.debug(f'Metadata loop: updating metadata') + await self.update_metadata() + except Exception as exc: + log.debug('Metadata refresh failed: %s', exc) + # failed_update() already set backoff; loop re-reads ttl def close(self, node_id=None): if node_id is not None: diff --git a/test/net/test_manager.py b/test/net/test_manager.py index cf1325dee..1be969e32 100644 --- a/test/net/test_manager.py +++ b/test/net/test_manager.py @@ -1,4 +1,5 @@ import socket +import threading import time from unittest.mock import MagicMock, patch @@ -320,6 +321,45 @@ def test_refresh_metadata_retries_no_node(self, net, cluster): # Should have a scheduled retry assert len(net._scheduled) > 0 + def test_bootstrap_triggers_refresh_loop(self, net, cluster): + """bootstrap() schedules the periodic metadata refresh loop, so + update_metadata fires without anyone calling it from compat.poll().""" + manager = KafkaConnectionManager(net, cluster, + socket_connection_timeout_ms=1000, + reconnect_backoff_ms=10) + assert manager._refresh_loop_task is None + call_count = [0] + done = threading.Event() + orig = manager.update_metadata + + def counting_update(): + call_count[0] += 1 + if call_count[0] >= 1: + done.set() + return orig() + + with patch.object(manager, 'update_metadata', side_effect=counting_update): + manager.bootstrap(timeout_ms=100) + assert manager._refresh_loop_task is not None + manager.start() + try: + assert done.wait(timeout=2), "refresh loop never called update_metadata" + finally: + manager.stop(timeout=2) + assert call_count[0] >= 1 + + def test_refresh_loop_spawned_once(self, net, cluster): + """Calling bootstrap() multiple times must not spawn multiple refresh + loop tasks.""" + manager = KafkaConnectionManager(net, cluster, + socket_connection_timeout_ms=1000, + reconnect_backoff_ms=10) + manager.bootstrap(timeout_ms=100) + task = manager._refresh_loop_task + assert task is not None + manager.bootstrap(timeout_ms=100) + assert manager._refresh_loop_task is task + class TestKafkaConnectionManagerClose: def test_close_single_connection(self, manager):