diff --git a/kafka/cluster.py b/kafka/cluster.py index 2c4a4424a..57d0821dd 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -6,6 +6,7 @@ import socket import threading import time +import weakref from kafka import errors as Errors from kafka.future import Future @@ -20,9 +21,6 @@ class ClusterMetadata: """ A class to manage kafka cluster metadata. - This class does not perform any IO. It simply updates internal state - given API responses (MetadataResponse, FindCoordinatorResponse). - Keyword Arguments: retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. @@ -48,6 +46,7 @@ class ClusterMetadata: } def __init__(self, **configs): + self._manager = None self._topics = set() self._brokers = {} # node_id -> MetadataResponseBroker self._partitions = {} # topic -> partition -> PartitionMetadata @@ -64,7 +63,10 @@ def __init__(self, **configs): self.internal_topics = set() self.controller = None self.cluster_id = None - self.metadata_refresh_in_progress = False + + self._refresh_loop_future = None + self._refresh_future = None + self._notify_wakeup = None self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -74,6 +76,101 @@ def __init__(self, **configs): self._bootstrap_brokers = self._generate_bootstrap_brokers() self._coordinator_brokers = {} + @property + def metadata_refresh_in_progress(self): + """True if a refresh is mid-flight.""" + return self._refresh_future is not None and not self._refresh_future.is_done + + def attach(self, manager): + """Wire this cluster to its connection manager. + + Construction is split from attach so ClusterMetadata can be built + standalone (tests, snapshots) without a live manager. The reference is + held via weakref.proxy so that manager <-> cluster does not form a GC + cycle; manager.close() still calls cluster.close() to clear eagerly. + """ + self._manager = weakref.proxy(manager) + + def close(self): + # Drop manager reference cycle + self._manager = None + + def start_refresh_loop(self): + """Spawn the periodic refresh coroutine. Idempotent. Triggers bootstrap if needed.""" + if self._manager is None: + raise RuntimeError('start_refresh_loop requires prior attach()') + if self._refresh_loop_future is not None: + return + log.debug('Starting metadata refresh loop') + self._refresh_loop_future = self._manager.call_soon(self._refresh_loop) + + async def _refresh_loop(self): + """Awaits ttl() then triggers refresh_metadata(); request_update() wakes early.""" + if self._manager is None: + raise RuntimeError('start_refresh_loop requires prior attach()') + if not self._manager.bootstrapped: + await self._manager.bootstrap_async() + while True: + if self.metadata_refresh_in_progress: + await self._refresh_future + ttl_ms = self.ttl() + if ttl_ms == 0: + try: + await self.refresh_metadata() + except Exception as exc: + log.debug('Metadata refresh failed: %s', exc) + log.exception(exc) + continue + try: + log.debug('Sleeping %s for next Metadata refresh', ttl_ms / 1000) + wakeup, self._notify_wakeup = self._manager.wakeup_pair(ttl_ms / 1000) + await wakeup() + except Exception as exc: + log.error('Metadata refresh loop error: %s', exc) + + async def refresh_metadata(self, node_id=None): + """Send one MetadataRequest and apply the response. + + Concurrent callers share a single in-flight request: if a refresh is + already underway, additional callers await the same Future and see the + same outcome (success or exception). 
+        This avoids duplicate broker
+        requests when bootstrap and the refresh loop race, or when external
+        callers invoke refresh while the loop is mid-flight.
+        """
+        if self._manager is None:
+            raise RuntimeError('refresh_metadata requires prior attach()')
+        if self.metadata_refresh_in_progress:
+            log.debug('Metadata refresh already in flight; awaiting existing')
+            await self._refresh_future
+            return
+        self._refresh_future = Future()
+        try:
+            await self._do_refresh_metadata(node_id)
+        except Exception as exc:
+            self._refresh_future.failure(exc)
+            raise
+        else:
+            self._refresh_future.success(None)
+
+    async def _do_refresh_metadata(self, node_id):
+        log.debug('Metadata refresh (node_id=%s)', node_id)
+        node_id = self._manager.least_loaded_node() if node_id is None else node_id
+        if node_id is None:
+            self._manager.update_backoff('metadata')
+            raise Errors.NodeNotReadyError('metadata')
+        else:
+            self._manager.reset_backoff('metadata')
+        try:
+            request = self.metadata_request()
+            log.debug("Sending metadata request %s to node %s", request, node_id)
+            response = await self._manager.send(request, node_id)
+        except Exception as exc:
+            log.error('Metadata refresh failed: %s', exc)
+            self.failed_update(exc)
+            raise
+        log.debug('Metadata refresh: success')
+        self.update_metadata(response)
+
     def _generate_bootstrap_brokers(self):
         # collect_hosts does not perform DNS, so we should be fine to re-use
         bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
@@ -228,8 +325,9 @@ def coordinator_for_group(self, group):
     def ttl(self):
         """Milliseconds until metadata should be refreshed"""
         now = time.monotonic() * 1000
-        if self.metadata_refresh_in_progress:
-            ttl = self.config['retry_backoff_ms']
+        if self._manager is not None and self._manager.connection_delay('metadata'):
+            # Exponential backoff - KIP-580
+            return self._manager.connection_delay('metadata') * 1000
         elif self._need_update:
            ttl = 0
         else:
@@ -258,7 +356,13 @@ def request_update(self):
         self._need_update = True
         if not self._future or self._future.is_done:
             self._future = Future()
-        return self._future
+        ret = self._future
+        if self._manager:
+            self.start_refresh_loop()
+            if self._notify_wakeup:
+                self._notify_wakeup()
+                self._notify_wakeup = None
+        return ret
 
     @property
     def need_update(self):
@@ -283,10 +387,6 @@ def topics(self, exclude_internal_topics=True):
         return topics
 
     def metadata_request(self):
-        if self.metadata_refresh_in_progress:
-            raise RuntimeError('MetadataRequest currently in-flight!')
-        else:
-            self.metadata_refresh_in_progress = True
         if self.need_all_topic_metadata:
             topics = MetadataRequest.ALL_TOPICS
         elif not self._topics:
@@ -309,7 +409,6 @@ def failed_update(self, exception):
         f = self._future
         self._future = None
         self._last_refresh_ms = time.monotonic() * 1000
-        self.metadata_refresh_in_progress = False
         if f:
             f.failure(exception)
 
@@ -403,7 +502,6 @@ def update_metadata(self, metadata):
         now = time.monotonic() * 1000
         self._last_refresh_ms = now
         self._last_successful_refresh_ms = now
-        self.metadata_refresh_in_progress = False
 
         if f:
             # In the common case where we ask for a single topic and get back an
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 50f07b764..586340c56 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -247,7 +247,7 @@ async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None):
                 if not getattr(exc, 'retriable', False):
                     raise
                 if getattr(exc, 'invalid_metadata', False) or self._client._manager.cluster.need_update:
-                    refresh_future = self._client._manager.update_metadata()
+                    refresh_future = self._client.cluster.request_update()
                     try:
                         await self._client._manager.wait_for(refresh_future, timer.timeout_ms)
                     except Errors.KafkaTimeoutError:
diff --git a/kafka/net/compat.py b/kafka/net/compat.py
index 6729ff22f..8a001eb3d 100644
--- a/kafka/net/compat.py
+++ b/kafka/net/compat.py
@@ -4,7 +4,6 @@
 import time
 
 import kafka.errors as Errors
-from kafka.cluster import ClusterMetadata
 from kafka.net.manager import KafkaConnectionManager
 from kafka.net.selector import NetworkSelector
 
@@ -20,13 +19,11 @@ class KafkaNetClient:
     KafkaConnectionManager directly (fire-and-forget via _request_buffer).
     """
     def __init__(self, **configs):
+        # _lock is still used by the legacy Coordinator (kafka/coordinator/base.py).
+        # Remove once Coordinator moves to the IO thread (Phase D).
         self._lock = threading.RLock()
         self._net = NetworkSelector(**configs)
-        cluster = ClusterMetadata(
-            bootstrap_servers=configs.get('bootstrap_servers', ['localhost:9092']),
-            metadata_max_age_ms=configs.get('metadata_max_age_ms', 300000),
-        )
-        self._manager = KafkaConnectionManager(self._net, cluster, **configs)
+        self._manager = KafkaConnectionManager(self._net, **configs)
 
     @property
     def cluster(self):
@@ -129,16 +126,11 @@ def send_and_receive(self, node_id, request, timeout_ms=30000):
 
     # Delegation
     def poll(self, timeout_ms=None, future=None):
+        # _lock serializes with HeartbeatThread, which also drives poll()
+        # while holding this lock. Without it, both threads would call
+        # _net.poll() concurrently and race on selector / task state.
+        # The lock goes away once HeartbeatThread does (Phase D).
         with self._lock:
-            metadata_ttl = self._manager.cluster.ttl()
-            if metadata_ttl == 0:
-                self._manager.update_metadata()
-            elif self._manager.cluster.need_update:
-                # A refresh is pending but retry_backoff hasn't elapsed yet.
-                # Cap timeout_ms so select() returns in time for us to come
-                # back and re-check ttl.
-                if timeout_ms is None or metadata_ttl < timeout_ms:
-                    timeout_ms = metadata_ttl
             return self._manager.poll(timeout_ms=timeout_ms, future=future)
 
     def close(self, node_id=None):
diff --git a/kafka/net/manager.py b/kafka/net/manager.py
index 6e70b9d28..bc044e897 100644
--- a/kafka/net/manager.py
+++ b/kafka/net/manager.py
@@ -11,6 +11,7 @@
 from .connection import KafkaConnection
 from .metrics import KafkaManagerMetrics
 from .transport import KafkaSSLTransport, KafkaTCPTransport
+from kafka.cluster import ClusterMetadata
 import kafka.errors as Errors
 from kafka.protocol.broker_version_data import BrokerVersionData
 from kafka.future import Future
@@ -21,6 +22,7 @@ class KafkaConnectionManager:
 
     DEFAULT_CONFIG = {
+        'bootstrap_servers': 'localhost:9092',
         'client_id': 'kafka-python-' + __version__,
         'client_software_name': 'kafka-python',
         'client_software_version': __version__,
@@ -54,22 +56,26 @@ class KafkaConnectionManager:
         'api_version_auto_timeout_ms': 2000,
         'metrics': None,
         'metric_group_prefix': '',
+        'metadata_max_age_ms': 300000,
     }
 
-    def __init__(self, net, cluster, **configs):
+    def __init__(self, net, **configs):
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
                 self.config[key] = configs[key]
         self._net = net
-        self.cluster = cluster
+        self.cluster = ClusterMetadata(
+            bootstrap_servers=self.config['bootstrap_servers'],
+            metadata_max_age_ms=self.config['metadata_max_age_ms'],
+        )
+        self.cluster.attach(self)
         self._conns = {}
         self._backoff = dict()  # node_id => (failures, backoff_until)
         self._idle_check_delay = self.config['connections_max_idle_ms'] / 1000
         self.close_idle_connections()
         self.broker_version_data = None
         self._bootstrap_future = None
-        self._metadata_future = None
         self._io_thread = None
         self._pending_waiters = {}  # event -> state dict, for pending run() waiters
         self._pending_waiters_lock = threading.Lock()
@@ -93,6 +99,7 @@ def least_used_connections(self):
     async def _do_bootstrap(self, deadline):
         while deadline is None or time.monotonic() < deadline:
             bootstrap_broker = random.choice(self.cluster.bootstrap_brokers())
+            log.debug('Attempting bootstrap with %s', bootstrap_broker)
             try:
                 conn = self.get_connection(bootstrap_broker.node_id,
                                            pop_on_close=False,
@@ -106,32 +113,30 @@
                 await self._net.sleep(delay)
                 continue
 
-            if not conn.connected:
-                log.debug('Attempting bootstrap with %s', bootstrap_broker)
-                self.cluster.request_update()
-                try:
-                    await conn.init_future
-                except Errors.IncompatibleBrokerVersion:
-                    log.error('Did you attempt to connect to a kafka controller (no metadata support)?')
-                    raise
-                except Exception as exc:
-                    self._conns.pop(bootstrap_broker.node_id, conn).close(exc)
-                    continue
+            try:
+                await conn
+            except Errors.IncompatibleBrokerVersion:
+                log.error('Did you attempt to connect to a kafka controller (no metadata support)?')
+                raise
+            except Exception as exc:
+                self._conns.pop(bootstrap_broker.node_id, conn).close(exc)
+                continue
 
             try:
-                response = await conn.send_request(self.cluster.metadata_request())
-                self.cluster.update_metadata(response)
+                await self.cluster.refresh_metadata(bootstrap_broker.node_id)
                 if not self.cluster.brokers():
                     log.warning('Bootstrap metadata response has no brokers. Retrying.')
+                    self.update_backoff(bootstrap_broker.node_id)
                     continue
-                self.reset_backoff(bootstrap_broker.node_id)
-                log.info('Bootstrap complete: %s', self.cluster)
-                return True
             except Exception as exc:
                 log.error(f'Bootstrap attempt to {bootstrap_broker.node_id} failed: {exc}')
                 self.update_backoff(bootstrap_broker.node_id)
-                self.cluster.failed_update(exc)
                 continue
+            else:
+                self.reset_backoff(bootstrap_broker.node_id)
+                self.cluster.start_refresh_loop()
+                log.info('Bootstrap complete: %s', self.cluster)
+                return True
             finally:
                 self._conns.pop(bootstrap_broker.node_id, conn).close()
         else:
@@ -142,6 +147,7 @@ def bootstrap_async(self, timeout_ms=None):
         if self._bootstrap_future is not None and not self._bootstrap_future.is_done:
             return self._bootstrap_future
         deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000
+        log.debug('Starting new bootstrap')
         self._bootstrap_future = self.call_soon(self._do_bootstrap, deadline)
         self._bootstrap_future.add_errback(lambda exc: log.error('Bootstrap failed: %s', exc))
         return self._bootstrap_future
@@ -216,8 +222,9 @@ async def _connect(self, node, conn, reset_backoff_on_connect=True):
             transport = await self._build_transport(node)
             conn.connection_made(transport)
             await conn.init_future
-        except Exception as e:
-            conn.connection_lost(e)
+        except Exception as exc:
+            log.error('Connection failed: %s', exc)
+            conn.connection_lost(exc)
             self.update_backoff(node.node_id)
             return
 
@@ -314,44 +321,14 @@ def update_backoff(self, node_id):
         self._backoff[node_id] = (failures, backoff_until_time)
 
     def connection_delay(self, node_id):
+        """Connection delay in seconds.
+
+        Uses exponential backoff/retry with jitter. See KIP-144.
+        """
        if node_id not in self._backoff:
            return 0
        return max(0, self._backoff[node_id][1] - time.monotonic())
 
-    def update_metadata(self):
-        if self._metadata_future is not None and not self._metadata_future.is_done:
-            return self._metadata_future
-        self.cluster.request_update()
-        self._metadata_future = self.call_soon(self._do_update_metadata)
-        return self._metadata_future
-
-    async def _do_update_metadata(self):
-        while True:
-            node_id = self.least_loaded_node()
-            if node_id is None:
-                if not self.bootstrapped:
-                    await self.bootstrap_async()
-                    continue
-                delay = self.config['reconnect_backoff_ms'] / 1000
-                log.debug("No node available for metadata request, retrying in %ss", delay)
-                await self._net.sleep(delay)
-                continue
-            conn = self.get_connection(node_id)
-            try:
-                await conn.init_future
-                request = self.cluster.metadata_request()
-                log.debug("Sending metadata request %s to node %s", request, node_id)
-                response = await conn.send_request(request)
-                self.cluster.update_metadata(response)
-                return True
-            except Exception as exc:
-                self.cluster.failed_update(exc)
-                raise
-            finally:
-                # Schedule next periodic refresh
-                ttl = self.cluster.ttl() / 1000
-                self._net.call_later(max(0, ttl), self.update_metadata)
-
     def close(self, node_id=None):
         if node_id is not None:
             conn = self._conns.get(node_id)
@@ -360,6 +337,7 @@ def close(self, node_id=None):
         else:
             for conn in list(self._conns.values()):
                 conn.close()
+        self.cluster.close()
 
     def start(self):
         """Spawn a daemon IO thread that owns the event loop. Idempotent."""
@@ -491,3 +469,31 @@ async def waiter():
         if state['exception'] is not None:
             raise state['exception']
         return state['value']
+
+    def wakeup_pair(self, timeout_secs):
+        """Returns (wakeup coroutine, thread-safe notifier) for an interruptible sleep.
+
+        Awaiting ``wakeup()`` resolves when either ``timeout_secs`` elapses
+        or the notifier is called, whichever comes first. The notifier is
+        safe to call from any thread (it routes through call_soon_threadsafe).
+
+        Used by the metadata refresh loop to sleep on its TTL while remaining
+        interruptible by external callers (e.g. KafkaProducer / KafkaConsumer
+        invoking cluster.request_update() from another thread).
+        """
+        fut = Future()
+        wakeup = lambda f=fut: f.success(None) if not f.is_done else None
+        timer = self._net.call_later(timeout_secs, wakeup)
+        async def _wakeup():
+            try:
+                await fut
+            finally:
+                # Cancel the timer if we were woken early via _notify_wakeup
+                if not timer.is_done:
+                    try:
+                        self._net.unschedule(timer)
+                    except (ValueError, RuntimeError):
+                        pass
+        def _notify_wakeup():
+            self._net.call_soon_threadsafe(wakeup)
+        return _wakeup, _notify_wakeup
diff --git a/kafka/net/selector.py b/kafka/net/selector.py
index d6ae000a7..bbb1e2853 100644
--- a/kafka/net/selector.py
+++ b/kafka/net/selector.py
@@ -231,9 +231,9 @@ def register_event(self, fileobj, event, task):
             key = self._selector.get_key(fileobj)
             reader, writer = key.data
             if event == selectors.EVENT_READ and reader:
-                raise RuntimeError("EVENT_READ already registered for fileobj")
+                raise RuntimeError("EVENT_READ already registered for fileobj %s by %s (new: %s)" % (fileobj, reader, task))
             if event == selectors.EVENT_WRITE and writer:
-                raise RuntimeError("EVENT_WRITE already registered for fileobj")
+                raise RuntimeError("EVENT_WRITE already registered for fileobj %s by %s (new: %s)" % (fileobj, writer, task))
             self._selector.modify(fileobj, key.events | event, (task, writer) if event == selectors.EVENT_READ else (reader, task))
         except KeyError:
             self._selector.register(fileobj, event, (task, None) if event == selectors.EVENT_READ else (None, task))
diff --git a/test/conftest.py b/test/conftest.py
index 4917e7f8e..43580f4bb 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,7 +1,10 @@
 import pytest
 
 from .mock_broker import MockBroker
+from kafka.cluster import ClusterMetadata
 from kafka.net.compat import KafkaNetClient
+from kafka.net.manager import KafkaConnectionManager
+from kafka.net.selector import NetworkSelector
 from kafka.protocol.metadata import MetadataResponse
 
 
@@ -49,3 +52,23 @@ def client(broker):
         yield cli
     finally:
         cli.close()
+
+
+@pytest.fixture
+def manager(broker):
+    manager = KafkaConnectionManager(
+        NetworkSelector(),
+        bootstrap_servers='%s:%d' % (broker.host, broker.port),
+        api_version=broker.broker_version,
+        request_timeout_ms=5000,
+    )
+    broker.attach(manager)
+    try:
+        yield manager
+    finally:
+        manager.close()
+
+
+@pytest.fixture
+def cluster():
+    return ClusterMetadata()
diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py
index cf72dc8a5..325cc235c 100644
--- a/test/consumer/test_fetcher.py
+++ b/test/consumer/test_fetcher.py
@@ -868,7 +868,7 @@ def test_retriable_invalid_metadata_triggers_refresh(self, net_client, mocker):
 
         refresh_future = Future().success(None)
         update_metadata_mock = mocker.patch.object(
-            fetcher._client._manager, 'update_metadata', return_value=refresh_future)
+            fetcher._client.cluster, 'request_update', return_value=refresh_future)
 
         result = fetcher.offsets_by_times(timestamps, timeout_ms=10000)
         assert result == {tp: expected_offset}
diff --git a/test/net/test_manager.py b/test/net/test_manager.py
index 08747c80a..1af7051b9 100644
--- a/test/net/test_manager.py
+++ b/test/net/test_manager.py
@@ -1,4 +1,5 @@
 import socket
+import threading
 import time
 
 from unittest.mock import MagicMock, patch
@@ -18,39 +19,27 @@ def net():
     return NetworkSelector()
 
 
-@pytest.fixture
-def cluster():
-    return ClusterMetadata(bootstrap_servers=['localhost:9092'])
-
-
-@pytest.fixture
-def manager(net, cluster):
-    return KafkaConnectionManager(net, cluster,
-                                  socket_connection_timeout_ms=1000,
-                                  reconnect_backoff_ms=10,
-                                  reconnect_backoff_max_ms=100)
-
-
 class TestKafkaConnectionManagerConfig:
-    def test_default_config(self, net, cluster):
-        m = KafkaConnectionManager(net, cluster)
+    def test_default_config(self, net):
+        m = KafkaConnectionManager(net)
         assert m.config['reconnect_backoff_ms'] == 50
         assert m.config['reconnect_backoff_max_ms'] == 30000
         assert m.config['socket_connection_timeout_ms'] == 5000
         assert m.config['max_in_flight_requests_per_connection'] == 5
 
-    def test_config_override(self, net, cluster):
-        m = KafkaConnectionManager(net, cluster, reconnect_backoff_ms=100)
+    def test_config_override(self, net):
+        m = KafkaConnectionManager(net, reconnect_backoff_ms=100)
         assert m.config['reconnect_backoff_ms'] == 100
 
-    def test_initial_state(self, manager):
+    def test_initial_state(self, net):
+        manager = KafkaConnectionManager(net)
         assert manager._conns == {}
         assert manager._backoff == {}
         assert manager.broker_version_data is None
         assert not manager.bootstrapped
 
-    def test_api_versions(self, net, cluster):
-        m = KafkaConnectionManager(net, cluster, api_version=(1, 0))
+    def test_api_versions(self, net):
+        m = KafkaConnectionManager(net, api_version=(1, 0))
         assert m.broker_version_data == BrokerVersionData((1, 0))
 
 
@@ -95,7 +84,8 @@ def test_get_connection_during_backoff_raises(self, manager):
         with pytest.raises(Errors.NodeNotReadyError):
             manager.get_connection('bootstrap-0')
 
-    def test_get_connection_creates_connection(self, manager):
+    def test_get_connection_creates_connection(self, net):
+        manager = KafkaConnectionManager(net)
         conn = manager.get_connection('bootstrap-0')
         assert isinstance(conn, KafkaConnection)
         assert conn.node_id == 'bootstrap-0'
@@ -148,8 +138,8 @@ def test_bootstrap_async_idempotent(self, manager):
         assert f1 is f2
 
     def test_bootstrap_connection_failure(self, net):
-        cluster = ClusterMetadata(bootstrap_servers=['localhost:1'])
-        manager = KafkaConnectionManager(net, cluster,
+        manager = KafkaConnectionManager(net,
+                                         bootstrap_servers=['localhost:1'],
                                          socket_connection_timeout_ms=500,
                                          reconnect_backoff_ms=10,
                                          reconnect_backoff_max_ms=100)
@@ -166,46 +156,20 @@ def test_bootstrapped_property(self, manager):
         manager._bootstrap_future.success(True)
         assert manager.bootstrapped
 
-    def test_bootstrap_retries_empty_brokers(self, net):
-        cluster = ClusterMetadata(bootstrap_servers=['localhost:9092'])
-        manager = KafkaConnectionManager(net, cluster,
-                                         socket_connection_timeout_ms=1000,
-                                         reconnect_backoff_ms=10)
-        conn = MagicMock()
-        conn.connected = True
-        conn.init_future = Future()
-        conn.init_future.success(True)
-        conn.close_future = Future()
-        conn.paused = set()
-        conn.in_flight_requests = []
+    def test_bootstrap_retries_empty_brokers(self, manager):
+        cluster = manager.cluster
+        # First metadata update leaves brokers empty; second populates them.
         call_count = [0]
-        def mock_send_request(request):
-            call_count[0] += 1
-            f = Future()
-            f.success(MagicMock())
-            return f
-        conn.send_request.side_effect = mock_send_request
-
-        # The bootstrap loop closes/pops the conn in a `finally` on every
-        # iteration, so patch get_connection to hand back the same mock on
-        # each retry instead of letting the manager open a fresh real socket.
-        def mock_get_connection(node_id, **kwargs):
-            manager._conns[node_id] = conn
-            return conn
-
-        # First update_metadata leaves brokers empty; second populates them.
-        # The real update_metadata also clears metadata_refresh_in_progress
-        # so subsequent metadata_request() calls don't raise.
         def mock_update_metadata(response):
-            cluster.metadata_refresh_in_progress = False
+            call_count[0] += 1
             if call_count[0] >= 2:
                 cluster._brokers = {1: MagicMock(node_id=1)}
 
-        with patch.object(manager, 'get_connection', side_effect=mock_get_connection), \
-             patch.object(cluster, 'update_metadata', side_effect=mock_update_metadata):
+        with patch.object(cluster, 'update_metadata', side_effect=mock_update_metadata):
             manager.bootstrap(timeout_ms=2000)
+
         assert manager.bootstrapped
         assert call_count[0] >= 2
 
@@ -253,8 +217,7 @@ def test_connect_to_timeout_fires(self, net):
         blocker.connect(('127.0.0.1', port))
 
         try:
-            cluster = ClusterMetadata(bootstrap_servers=['127.0.0.1:%d' % port])
-            manager = KafkaConnectionManager(net, cluster, socket_connection_timeout_ms=100)
+            manager = KafkaConnectionManager(net, bootstrap_servers=['127.0.0.1:%d' % port], socket_connection_timeout_ms=100)
             conn = manager.get_connection('bootstrap-0')
             manager.poll(timeout_ms=1000, future=conn.init_future)
             assert conn.init_future.is_done
@@ -263,60 +226,6 @@ def test_connect_to_timeout_fires(self, net):
             listener.close()
 
 
-class TestKafkaConnectionManagerMetadataRefresh:
-    def test_update_metadata_returns_future(self, manager):
-        f = manager.update_metadata()
-        assert isinstance(f, Future)
-        assert not f.is_done
-
-    def test_update_metadata_deduplicates(self, manager):
-        f1 = manager.update_metadata()
-        f2 = manager.update_metadata()
-        assert f1 is f2
-
-    def test_update_metadata_new_future_after_done(self, manager):
-        f1 = manager.update_metadata()
-        f1.success(True)
-        f2 = manager.update_metadata()
-        assert f2 is not f1
-
-    def test_update_metadata_sets_cluster_need_update(self, manager):
-        manager.update_metadata()
-        assert manager.cluster._need_update
-
-    def test_refresh_metadata_schedules_next(self, net, cluster):
-        manager = KafkaConnectionManager(net, cluster,
-                                         socket_connection_timeout_ms=1000,
-                                         reconnect_backoff_ms=10)
-        # Simulate a connected node
-        conn = MagicMock()
-        conn.connected = True
-        conn.init_future = Future().success(True)
-        conn.paused = set()
-        conn.in_flight_requests = []
-        conn.send_request.return_value = Future()
-        manager._conns['node-1'] = conn
-        with patch.object(cluster, 'brokers', return_value=[MagicMock(node_id='node-1')]):
-            f = manager.update_metadata()
-            # Run the scheduled _refresh_metadata task
-            net.poll(timeout_ms=100)
-            # Should have called send_request on the connection
-            assert conn.send_request.called
-
-    def test_refresh_metadata_retries_no_node(self, net, cluster):
-        manager = KafkaConnectionManager(net, cluster,
-                                         socket_connection_timeout_ms=1000,
-                                         reconnect_backoff_ms=50)
-        # No connected nodes, empty cluster
-        with patch.object(cluster, 'brokers', return_value=[]):
-            f = manager.update_metadata()
-            net.poll(timeout_ms=0)
-            # Should not have resolved yet (retry scheduled)
-            assert not f.is_done
-            # Should have a scheduled retry
-            assert len(net._scheduled) > 0
-
-
 class TestKafkaConnectionManagerClose:
     def test_close_single_connection(self, manager):
         conn = manager.get_connection('bootstrap-0')
diff --git a/test/producer/test_producer.py b/test/producer/test_producer.py
index 970932733..a3a1736c0 100644
--- a/test/producer/test_producer.py
+++ b/test/producer/test_producer.py
@@ -5,7 +5,6 @@
 import pytest
 
 from kafka import KafkaProducer
-from kafka.cluster import ClusterMetadata
 from kafka.producer.transaction_manager import TransactionManager, ProducerIdAndEpoch
 
 
@@ -17,13 +16,13 @@ def test_kafka_producer_thread_close():
     assert threading.active_count() == threads
 
 
-def test_idempotent_producer_reset_producer_id():
+def test_idempotent_producer_reset_producer_id(cluster):
     transaction_manager = TransactionManager(
         transactional_id=None,
         transaction_timeout_ms=1000,
         retry_backoff_ms=100,
         api_version=(0, 11),
-        metadata=ClusterMetadata(),
+        metadata=cluster,
     )
 
     test_producer_id_and_epoch = ProducerIdAndEpoch(123, 456)
diff --git a/test/producer/test_sender.py b/test/producer/test_sender.py
index b00a66066..cb0bb9533 100644
--- a/test/producer/test_sender.py
+++ b/test/producer/test_sender.py
@@ -86,13 +86,13 @@ def producer_batch(topic='foo', partition=0, magic=2):
 
 
 @pytest.fixture
-def transaction_manager():
+def transaction_manager(cluster):
     return TransactionManager(
         transactional_id=None,
         transaction_timeout_ms=60000,
         retry_backoff_ms=100,
         api_version=(2, 1),
-        metadata=ClusterMetadata())
+        metadata=cluster)
 
 
 @pytest.mark.parametrize(("api_version", "produce_version"), [
@@ -1017,6 +1017,7 @@ def test_unknown_producer_id_without_transaction_manager_fails(self, sender, acc
 
 
 class TestKip360SenderIntegration:
+
     def _make_txn_manager(self, transactional_id=None):
         """Transaction manager on a KIP-360-capable broker version with a
         valid producer_id already set (simulating post-InitProducerId state)."""
diff --git a/test/test_cluster.py b/test/test_cluster.py
index b08189582..85c6245db 100644
--- a/test/test_cluster.py
+++ b/test/test_cluster.py
@@ -1,10 +1,12 @@
 # pylint: skip-file
 import socket
+from unittest.mock import MagicMock, patch
 
 import pytest
 
-from kafka.cluster import ClusterMetadata, collect_hosts
+from kafka.cluster import collect_hosts
+from kafka.future import Future
 from kafka.protocol.metadata import MetadataResponse
 
 Broker = MetadataResponse.MetadataResponseBroker
@@ -12,9 +14,39 @@
 Partition = Topic.MetadataResponsePartition
 
 
+def _make_metadata_response(version):
+    topic = Topic(
+        version=version,
+        error_code=0,
+        name='topic-1',
+        is_internal=False,
+        partitions=[
+            Partition(
+                version=version,
+                error_code=0,
+                partition_index=0,
+                leader_id=0,
+                leader_epoch=0,
+                replica_nodes=[0],
+                isr_nodes=[0],
+                offline_replicas=[12],
+            ),
+        ],
+    )
+    return MetadataResponse(
+        version=version,
+        throttle_time_ms=0,
+        brokers=[
+            Broker(node_id=0, host='foo', port=12, rack='rack-1', version=version),
+            Broker(node_id=1, host='bar', port=34, rack='rack-2', version=version),
+        ],
+        cluster_id='cluster-foo',
+        controller_id=0,
+        topics=[topic])
+
+
 class TestClusterMetadataUpdateMetadata:
-    def test_empty_broker_list(self):
-        cluster = ClusterMetadata()
+    def test_empty_broker_list(self, cluster):
         assert len(cluster.brokers()) == 0
 
         cluster.update_metadata(MetadataResponse[0](
@@ -28,36 +60,8 @@ def test_empty_broker_list(self):
         assert len(cluster.brokers()) == 2
 
     @pytest.mark.parametrize('version', range(0, MetadataResponse.max_version + 1))
-    def test_metadata(self, version):
-        cluster = ClusterMetadata()
-        topic = Topic(
-            version=version,
-            error_code=0,
-            name='topic-1',
-            is_internal=False,
-            partitions=[
-                Partition(
-                    version=version,
-                    error_code=0,
-                    partition_index=0,
-                    leader_id=0,
-                    leader_epoch=0,
-                    replica_nodes=[0],
-                    isr_nodes=[0],
-                    offline_replicas=[12],
-                ),
-            ],
-        )
-        response = MetadataResponse(
-            version=version,
-            throttle_time_ms=0,
-            brokers=[
-                Broker(node_id=0, host='foo', port=12, rack='rack-1', version=version),
-                Broker(node_id=1, host='bar', port=34, rack='rack-2', version=version),
-            ],
-            cluster_id='cluster-foo',
-            controller_id=0,
-            topics=[topic])
+    def test_metadata(self, cluster, version):
+        response = _make_metadata_response(version)
         response = MetadataResponse.decode(response.encode(), version=version)
         cluster.update_metadata(response)
         assert len(cluster.topics()) == 1
@@ -78,8 +82,7 @@ def test_metadata(self, version):
         else:
             assert cluster._partitions['topic-1'][0].leader_epoch == -1
 
-    def test_unauthorized_topic(self):
-        cluster = ClusterMetadata()
+    def test_unauthorized_topic(self, cluster):
         cluster.set_topics(['unauthorized-topic'])
 
         assert len(cluster.brokers()) == 0
@@ -95,8 +98,7 @@ def test_unauthorized_topic(self):
 
 
 class TestClusterMetadataTopics:
-    def test_set_topics(self):
-        cluster = ClusterMetadata()
+    def test_set_topics(self, cluster):
         cluster._need_update = False
 
         fut = cluster.set_topics(['t1', 't2'])
@@ -169,3 +171,68 @@ def test_collect_hosts__protocol(self):
             ('foo.bar', 1234, socket.AF_UNSPEC),
             ('fizz.buzz', 5678, socket.AF_UNSPEC),
         ])
+
+
+class TestClusterMetadataRefresh:
+    def test_request_update_returns_future(self, cluster):
+        f = cluster.request_update()
+        assert isinstance(f, Future)
+        assert not f.is_done
+
+    def test_request_update_deduplicates(self, cluster):
+        f1 = cluster.request_update()
+        f2 = cluster.request_update()
+        assert f1 is f2
+
+    def test_request_update_new_future_after_done(self, cluster):
+        f1 = cluster.request_update()
+        f1.success(True)
+        f2 = cluster.request_update()
+        assert f2 is not f1
+
+    def test_request_update_sets_cluster_need_update(self, cluster):
+        f = cluster.request_update()
+        assert cluster._need_update
+
+    def test_request_update_sends_metadata_request(self, manager):
+        manager.bootstrap()
+        manager.cluster.config['retry_backoff_ms'] = 10  # reduce loop delay when metadata in progress
+
+        response = _make_metadata_response(8)
+        with patch.object(manager, 'send', return_value=Future().success(response)):
+            f = manager.cluster.request_update()
+            # Drive the cluster refresh loop
+            manager.poll(timeout_ms=100, future=f)
+            assert manager.send.called
+
+    def test_refresh_metadata_retries_no_node(self, manager):
+        # No connected nodes, empty cluster
+        cluster = manager.cluster
+        with patch.object(cluster, 'brokers', return_value=[]):
+            cluster.start_refresh_loop()
+            f = cluster.request_update()
+            manager.poll(timeout_ms=0)
+            # Should not have resolved yet (retry scheduled)
+            assert not f.is_done
+            # Should have a scheduled retry
+            assert len(manager._net._scheduled) > 0
+
+    def test_bootstrap_triggers_refresh_loop(self, manager, mocker):
+        """bootstrap() schedules the periodic metadata refresh loop on the
+        cluster, so refresh fires without anyone calling it from compat.poll()."""
+        cluster = manager.cluster
+        assert cluster._refresh_loop_future is None
+        spy = mocker.spy(cluster, 'refresh_metadata')
+        manager.bootstrap(timeout_ms=100)
+        assert cluster._refresh_loop_future is not None
+        assert spy.call_count >= 1
+
+    def test_refresh_loop_spawned_once(self, manager):
+        """Calling bootstrap() multiple times must not spawn multiple refresh
+        loop tasks."""
+        cluster = manager.cluster
+        manager.bootstrap(timeout_ms=100)
+        future = cluster._refresh_loop_future
+        assert future is not None
+        manager.bootstrap(timeout_ms=100)
+        assert cluster._refresh_loop_future is future
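
Reviewer note on wakeup_pair (kafka/net/manager.py above): the refresh loop needs a sleep that normally lasts the metadata TTL but can be cut short when another thread calls cluster.request_update(). The sketch below shows the same timeout-or-notify pattern on plain asyncio; the names and the asyncio.Event / wait_for machinery are illustrative assumptions for a self-contained demo, not part of the patch, which runs on the manager's internal selector loop rather than asyncio.

```python
import asyncio
import threading
import time

def wakeup_pair(loop, timeout_secs):
    """Return (wakeup coroutine, thread-safe notifier), mirroring the patch's shape."""
    event = asyncio.Event()

    async def wakeup():
        try:
            # Resolve on notify() or after timeout_secs, whichever comes first.
            await asyncio.wait_for(event.wait(), timeout_secs)
        except asyncio.TimeoutError:
            pass  # TTL elapsed with no early wakeup

    def notify():
        # Safe from any thread: hop onto the event loop before touching the Event.
        loop.call_soon_threadsafe(event.set)

    return wakeup, notify

async def main():
    loop = asyncio.get_running_loop()
    wakeup, notify = wakeup_pair(loop, timeout_secs=30.0)
    threading.Timer(0.2, notify).start()  # stand-in for an external request_update()
    start = time.monotonic()
    await wakeup()  # returns after ~0.2s, not the full 30s TTL
    print('woke after %.2fs' % (time.monotonic() - start))

asyncio.run(main())
```

The same trade-off appears in the patch: the notifier only marks a Future done (idempotently), so racing a timeout against an external wakeup is harmless, and the early-wakeup path unschedules the pending timer so stale callbacks do not accumulate.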