Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def __init__(self, **configs):
self._manager.start()

# Bootstrap on __init__
self._manager.run(self._manager.bootstrap, self.config['bootstrap_timeout_ms'])
self._manager.bootstrap(self.config['bootstrap_timeout_ms'])
self._closed = False
self._controller_id = None
self._coordinator_cache = {} # {group_id: node_id}
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def __init__(self, *topics, **configs):
# it. bootstrap is passed as a deferred coroutine so that once the IO
# thread is introduced in a later phase it runs on the IO thread.
if self._manager.broker_version_data is None:
self._manager.run(self._manager.bootstrap, self.config['api_version_auto_timeout_ms'])
self._manager.bootstrap(self.config['api_version_auto_timeout_ms'])
self.config['api_version'] = self._manager.broker_version

# Coordinator configurations are different for older brokers
Expand Down
25 changes: 13 additions & 12 deletions kafka/net/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,23 @@ def throttle_delay(self, node_id):
# Bootstrap / version

def bootstrap_connected(self):
return self._manager.bootstrapped
bootstrap_future = self._manager._bootstrap_future
return bootstrap_future is not None and not bootstrap_future.is_done

def get_broker_version(self, timeout_ms=None):
if self._manager.broker_version_data is not None:
return self._manager.broker_version_data.broker_version
else:
return self.check_version(timeout_ms=timeout_ms)
if self._manager.broker_version_data is None:
self._manager.bootstrap(timeout_ms)
return self._manager.broker_version_data.broker_version

def check_version(self, node_id=None, timeout_ms=10000):
f = self._manager.bootstrap()
self._manager.poll(timeout_ms=timeout_ms, future=f)
if f.failed():
raise f.exception
elif not f.is_done:
raise Errors.KafkaTimeoutError('check_version failed to complete within %s ms' % timeout_ms)
return self._manager.broker_version_data.broker_version
if not self._manager.bootstrapped:
self._manager.bootstrap(timeout_ms)
if node_id is None:
return self._manager.broker_version_data.broker_version
async def _check_version(broker_id):
conn = await self._manager.get_connection(broker_id)
return conn.broker_version
return self._manager.run(_check_version, node_id)

# Request sending

Expand Down
9 changes: 6 additions & 3 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,20 @@ async def _do_bootstrap(self, deadline):
finally:
self._conns.pop(bootstrap_broker.node_id, conn).close()
else:
raise Errors.KafkaConnectionError(
raise Errors.KafkaTimeoutError(
'Unable to bootstrap from %s' % (self.cluster.config['bootstrap_servers'],))

def bootstrap(self, timeout_ms=None):
def bootstrap_async(self, timeout_ms=None):
if self._bootstrap_future is not None and not self._bootstrap_future.is_done:
return self._bootstrap_future
deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000
self._bootstrap_future = self.call_soon(self._do_bootstrap, deadline)
self._bootstrap_future.add_errback(lambda exc: log.error('Bootstrap failed: %s', exc))
return self._bootstrap_future

def bootstrap(self, timeout_ms=None):
self.run(self.bootstrap_async, timeout_ms)

@property
def bootstrapped(self):
return self._bootstrap_future is not None and self._bootstrap_future.succeeded()
Expand Down Expand Up @@ -327,7 +330,7 @@ async def _do_update_metadata(self):
node_id = self.least_loaded_node()
if node_id is None:
if not self.bootstrapped:
await self.bootstrap()
await self.bootstrap_async()
continue
delay = self.config['reconnect_backoff_ms'] / 1000
log.debug("No node available for metadata request, retrying in %ss", delay)
Expand Down
16 changes: 16 additions & 0 deletions test/admin/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import pytest

from kafka.admin import KafkaAdminClient


@pytest.fixture
def admin(broker):
admin = KafkaAdminClient(
kafka_client=broker.client_factory(),
bootstrap_servers='%s:%d' % (broker.host, broker.port),
request_timeout_ms=5000,
)
try:
yield admin
finally:
admin.close()
3 changes: 2 additions & 1 deletion test/net/test_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ class TestKafkaNetClientBootstrap:
def test_bootstrap_connected(self, client, manager):
assert not client.bootstrap_connected()
manager._bootstrap_future = Future()
manager._bootstrap_future.success(True)
assert client.bootstrap_connected()
manager._bootstrap_future.success(None)
assert not client.bootstrap_connected()

def test_get_broker_version(self, client, manager):
with pytest.raises(Errors.KafkaTimeoutError):
Expand Down
20 changes: 8 additions & 12 deletions test/net/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ def test_send_no_node_before_bootstrap(self, manager):


class TestKafkaConnectionManagerBootstrap:
def test_bootstrap_returns_future(self, manager):
f = manager.bootstrap()
def test_bootstrap_async_returns_future(self, manager):
f = manager.bootstrap_async()
assert isinstance(f, Future)
assert not f.is_done

def test_bootstrap_idempotent(self, manager):
f1 = manager.bootstrap()
f2 = manager.bootstrap()
def test_bootstrap_async_idempotent(self, manager):
f1 = manager.bootstrap_async()
f2 = manager.bootstrap_async()
assert f1 is f2

def test_bootstrap_connection_failure(self, net):
Expand All @@ -153,10 +153,8 @@ def test_bootstrap_connection_failure(self, net):
socket_connection_timeout_ms=500,
reconnect_backoff_ms=10,
reconnect_backoff_max_ms=100)
f = manager.bootstrap(timeout_ms=2000)
manager.poll(timeout_ms=3000, future=f)
assert f.failed()
assert isinstance(f.exception, Errors.KafkaConnectionError)
with pytest.raises(Errors.KafkaTimeoutError):
manager.bootstrap(timeout_ms=2000)
assert not manager._conns
failures, _ = manager._backoff['bootstrap-0']
assert failures > 1
Expand Down Expand Up @@ -206,10 +204,8 @@ def mock_update_metadata(response):

with patch.object(manager, 'get_connection', side_effect=mock_get_connection), \
patch.object(cluster, 'update_metadata', side_effect=mock_update_metadata):
f = manager.bootstrap(timeout_ms=2000)
manager.poll(timeout_ms=2000, future=f)
manager.bootstrap(timeout_ms=2000)

assert f.succeeded()
assert call_count[0] >= 2


Expand Down
Loading