diff --git a/kafka/admin/client.py b/kafka/admin/client.py index c5d6d0133..61e8d5d7f 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -225,7 +225,7 @@ def __init__(self, **configs): self._manager.start() # Bootstrap on __init__ - self._manager.run(self._manager.bootstrap, self.config['bootstrap_timeout_ms']) + self._manager.bootstrap(self.config['bootstrap_timeout_ms']) self._closed = False self._controller_id = None self._coordinator_cache = {} # {group_id: node_id} diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 6cad5602b..5972d2bdf 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -387,7 +387,7 @@ def __init__(self, *topics, **configs): # it. bootstrap is passed as a deferred coroutine so that once the IO # thread is introduced in a later phase it runs on the IO thread. if self._manager.broker_version_data is None: - self._manager.run(self._manager.bootstrap, self.config['api_version_auto_timeout_ms']) + self._manager.bootstrap(self.config['api_version_auto_timeout_ms']) self.config['api_version'] = self._manager.broker_version # Coordinator configurations are different for older brokers diff --git a/kafka/net/compat.py b/kafka/net/compat.py index 450182c9e..6729ff22f 100644 --- a/kafka/net/compat.py +++ b/kafka/net/compat.py @@ -93,22 +93,23 @@ def throttle_delay(self, node_id): # Bootstrap / version def bootstrap_connected(self): - return self._manager.bootstrapped + bootstrap_future = self._manager._bootstrap_future + return bootstrap_future is not None and not bootstrap_future.is_done def get_broker_version(self, timeout_ms=None): - if self._manager.broker_version_data is not None: - return self._manager.broker_version_data.broker_version - else: - return self.check_version(timeout_ms=timeout_ms) + if self._manager.broker_version_data is None: + self._manager.bootstrap(timeout_ms) + return self._manager.broker_version_data.broker_version def check_version(self, node_id=None, timeout_ms=10000): - f = self._manager.bootstrap() - self._manager.poll(timeout_ms=timeout_ms, future=f) - if f.failed(): - raise f.exception - elif not f.is_done: - raise Errors.KafkaTimeoutError('check_version failed to complete within %s ms' % timeout_ms) - return self._manager.broker_version_data.broker_version + if not self._manager.bootstrapped: + self._manager.bootstrap(timeout_ms) + if node_id is None: + return self._manager.broker_version_data.broker_version + async def _check_version(broker_id): + conn = await self._manager.get_connection(broker_id) + return conn.broker_version + return self._manager.run(_check_version, node_id) # Request sending diff --git a/kafka/net/manager.py b/kafka/net/manager.py index ecf953225..6e70b9d28 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -135,10 +135,10 @@ async def _do_bootstrap(self, deadline): finally: self._conns.pop(bootstrap_broker.node_id, conn).close() else: - raise Errors.KafkaConnectionError( + raise Errors.KafkaTimeoutError( 'Unable to bootstrap from %s' % (self.cluster.config['bootstrap_servers'],)) - def bootstrap(self, timeout_ms=None): + def bootstrap_async(self, timeout_ms=None): if self._bootstrap_future is not None and not self._bootstrap_future.is_done: return self._bootstrap_future deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000 @@ -146,6 +146,9 @@ def bootstrap(self, timeout_ms=None): self._bootstrap_future.add_errback(lambda exc: log.error('Bootstrap failed: %s', exc)) return self._bootstrap_future + def bootstrap(self, timeout_ms=None): + self.run(self.bootstrap_async, timeout_ms) + @property def bootstrapped(self): return self._bootstrap_future is not None and self._bootstrap_future.succeeded() @@ -327,7 +330,7 @@ async def _do_update_metadata(self): node_id = self.least_loaded_node() if node_id is None: if not self.bootstrapped: - await self.bootstrap() + await self.bootstrap_async() continue delay = self.config['reconnect_backoff_ms'] / 1000 log.debug("No node available for metadata request, retrying in %ss", delay) diff --git a/test/admin/conftest.py b/test/admin/conftest.py new file mode 100644 index 000000000..3bd675acb --- /dev/null +++ b/test/admin/conftest.py @@ -0,0 +1,16 @@ +import pytest + +from kafka.admin import KafkaAdminClient + + +@pytest.fixture +def admin(broker): + admin = KafkaAdminClient( + kafka_client=broker.client_factory(), + bootstrap_servers='%s:%d' % (broker.host, broker.port), + request_timeout_ms=5000, + ) + try: + yield admin + finally: + admin.close() diff --git a/test/net/test_compat.py b/test/net/test_compat.py index ba3f5becb..3cce53b50 100644 --- a/test/net/test_compat.py +++ b/test/net/test_compat.py @@ -102,8 +102,9 @@ class TestKafkaNetClientBootstrap: def test_bootstrap_connected(self, client, manager): assert not client.bootstrap_connected() manager._bootstrap_future = Future() - manager._bootstrap_future.success(True) assert client.bootstrap_connected() + manager._bootstrap_future.success(None) + assert not client.bootstrap_connected() def test_get_broker_version(self, client, manager): with pytest.raises(Errors.KafkaTimeoutError): diff --git a/test/net/test_manager.py b/test/net/test_manager.py index cf1325dee..08747c80a 100644 --- a/test/net/test_manager.py +++ b/test/net/test_manager.py @@ -137,14 +137,14 @@ def test_send_no_node_before_bootstrap(self, manager): class TestKafkaConnectionManagerBootstrap: - def test_bootstrap_returns_future(self, manager): - f = manager.bootstrap() + def test_bootstrap_async_returns_future(self, manager): + f = manager.bootstrap_async() assert isinstance(f, Future) assert not f.is_done - def test_bootstrap_idempotent(self, manager): - f1 = manager.bootstrap() - f2 = manager.bootstrap() + def test_bootstrap_async_idempotent(self, manager): + f1 = manager.bootstrap_async() + f2 = manager.bootstrap_async() assert f1 is f2 def test_bootstrap_connection_failure(self, net): @@ -153,10 +153,8 @@ def test_bootstrap_connection_failure(self, net): socket_connection_timeout_ms=500, reconnect_backoff_ms=10, reconnect_backoff_max_ms=100) - f = manager.bootstrap(timeout_ms=2000) - manager.poll(timeout_ms=3000, future=f) - assert f.failed() - assert isinstance(f.exception, Errors.KafkaConnectionError) + with pytest.raises(Errors.KafkaTimeoutError): + manager.bootstrap(timeout_ms=2000) assert not manager._conns failures, _ = manager._backoff['bootstrap-0'] assert failures > 1 @@ -206,10 +204,8 @@ def mock_update_metadata(response): with patch.object(manager, 'get_connection', side_effect=mock_get_connection), \ patch.object(cluster, 'update_metadata', side_effect=mock_update_metadata): - f = manager.bootstrap(timeout_ms=2000) - manager.poll(timeout_ms=2000, future=f) + manager.bootstrap(timeout_ms=2000) - assert f.succeeded() assert call_count[0] >= 2