Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions kafka/net/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ async def _sasl_authenticate(self):

error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
log.error('%s: SaslHandshake failed: %s', self, error_type.__name__)
self.close(error_type())
return

Expand All @@ -399,8 +400,12 @@ async def _sasl_authenticate(self):
return

# Step 2: SASL authentication exchange
mechanism = get_sasl_mechanism(self.config['sasl_mechanism'])(
host=self.transport.getPeer()[0], **self.config)
try:
mechanism = get_sasl_mechanism(self.config['sasl_mechanism'])(
host=self.transport.getPeer()[0], **self.config)
except Exception as exc:
self.close(exc)
return

while not mechanism.is_done():
token = mechanism.auth_bytes()
Expand Down
25 changes: 16 additions & 9 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ async def _do_bootstrap(self, deadline):
while deadline is None or time.monotonic() < deadline:
bootstrap_broker = random.choice(self.cluster.bootstrap_brokers())
try:
conn = self.get_connection(bootstrap_broker.node_id, pop_on_close=False, refresh_metadata_on_err=False)
conn = self.get_connection(bootstrap_broker.node_id,
pop_on_close=False,
refresh_metadata_on_err=False,
reset_backoff_on_connect=False)
except Errors.NodeNotReadyError:
delay = self.connection_delay(bootstrap_broker.node_id)
if deadline is not None:
Expand All @@ -109,14 +112,17 @@ async def _do_bootstrap(self, deadline):
self.cluster.update_metadata(response)
if not self.cluster.brokers():
log.warning('Bootstrap metadata response has no brokers. Retrying.')
await self._net.sleep(self.config['reconnect_backoff_ms'] / 1000)
continue
self._conns.pop(bootstrap_broker.node_id, conn).close()
self.reset_backoff(bootstrap_broker.node_id)
log.info('Bootstrap complete: %s', self.cluster)
return True
except Exception as e:
self.cluster.failed_update(e)
except Exception as exc:
log.error(f'Bootstrap attempt to {bootstrap_broker.node_id} failed: {exc}')
self.update_backoff(bootstrap_broker.node_id)
self.cluster.failed_update(exc)
continue
finally:
self._conns.pop(bootstrap_broker.node_id, conn).close()
else:
raise Errors.KafkaConnectionError(
'Unable to bootstrap from %s' % (self.cluster.config['bootstrap_servers'],))
Expand Down Expand Up @@ -191,7 +197,7 @@ async def _build_transport(self, node):
else:
return transport

async def _connect(self, node, conn):
async def _connect(self, node, conn, reset_backoff_on_connect=True):
try:
transport = await self._build_transport(node)
conn.connection_made(transport)
Expand All @@ -203,12 +209,13 @@ async def _connect(self, node, conn):

if self._sensors:
self._sensors.connection_created.record()
self.reset_backoff(node.node_id)
if reset_backoff_on_connect:
self.reset_backoff(node.node_id)
if conn.broker_version_data is not None:
if self.cluster.is_bootstrap(node.node_id):
self.broker_version_data = conn.broker_version_data

def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=True):
def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=True, reset_backoff_on_connect=True):
if node_id is None:
raise Errors.NodeNotReadyError('No node_id provided')
elif self.connection_delay(node_id) > 0:
Expand All @@ -226,7 +233,7 @@ def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=Tru
if refresh_metadata_on_err:
conn.close_future.add_errback(lambda _: self.cluster.request_update())
self._conns[node_id] = conn
self._net.call_soon(lambda: self._connect(node, conn))
self._net.call_soon(lambda: self._connect(node, conn, reset_backoff_on_connect=reset_backoff_on_connect))
self._net.call_later(self.config['socket_connection_timeout_ms'] / 1000,
lambda: conn.close(Errors.KafkaConnectionError('Connection timed out'))
if not conn.init_future.is_done else None)
Expand Down
4 changes: 2 additions & 2 deletions kafka/sasl/scram.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def xor_bytes(left, right):

class SaslMechanismScram(SaslMechanism):
def __init__(self, **config):
assert 'sasl_plain_username' in config, 'sasl_plain_username required for SCRAM sasl'
assert 'sasl_plain_password' in config, 'sasl_plain_password required for SCRAM sasl'
assert config.get('sasl_plain_username', ''), 'sasl_plain_username required for SCRAM sasl'
assert config.get('sasl_plain_password', ''), 'sasl_plain_password required for SCRAM sasl'
assert config.get('sasl_mechanism', '') in ScramClient.MECHANISMS, 'Unrecognized SCRAM mechanism'
if config.get('security_protocol', '') == 'SASL_PLAINTEXT':
log.warning('Exchanging credentials in the clear during Sasl Authentication')
Expand Down
17 changes: 13 additions & 4 deletions test/net/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,23 @@ def mock_send_request(request):
return f
conn.send_request.side_effect = mock_send_request

manager._conns['bootstrap-0'] = conn

# First update_metadata leaves brokers empty; second populates them
# The bootstrap loop closes/pops the conn in a `finally` on every
# iteration, so patch get_connection to hand back the same mock on
# each retry instead of letting the manager open a fresh real socket.
def mock_get_connection(node_id, **kwargs):
manager._conns[node_id] = conn
return conn

# First update_metadata leaves brokers empty; second populates them.
# The real update_metadata also clears metadata_refresh_in_progress
# so subsequent metadata_request() calls don't raise.
def mock_update_metadata(response):
cluster.metadata_refresh_in_progress = False
if call_count[0] >= 2:
cluster._brokers = {1: MagicMock(node_id=1)}

with patch.object(cluster, 'update_metadata', side_effect=mock_update_metadata):
with patch.object(manager, 'get_connection', side_effect=mock_get_connection), \
patch.object(cluster, 'update_metadata', side_effect=mock_update_metadata):
f = manager.bootstrap(timeout_ms=2000)
manager.poll(timeout_ms=2000, future=f)

Expand Down
Loading