diff --git a/kafka/net/connection.py b/kafka/net/connection.py index f7e778237..d3a4534b2 100644 --- a/kafka/net/connection.py +++ b/kafka/net/connection.py @@ -389,6 +389,7 @@ async def _sasl_authenticate(self): error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: + log.error('%s: SaslHandshake failed: %s', self, error_type.__name__) self.close(error_type()) return @@ -399,8 +400,12 @@ async def _sasl_authenticate(self): return # Step 2: SASL authentication exchange - mechanism = get_sasl_mechanism(self.config['sasl_mechanism'])( - host=self.transport.getPeer()[0], **self.config) + try: + mechanism = get_sasl_mechanism(self.config['sasl_mechanism'])( + host=self.transport.getPeer()[0], **self.config) + except Exception as exc: + self.close(exc) + return while not mechanism.is_done(): token = mechanism.auth_bytes() diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 48d8f4262..4645ab669 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -83,7 +83,10 @@ async def _do_bootstrap(self, deadline): while deadline is None or time.monotonic() < deadline: bootstrap_broker = random.choice(self.cluster.bootstrap_brokers()) try: - conn = self.get_connection(bootstrap_broker.node_id, pop_on_close=False, refresh_metadata_on_err=False) + conn = self.get_connection(bootstrap_broker.node_id, + pop_on_close=False, + refresh_metadata_on_err=False, + reset_backoff_on_connect=False) except Errors.NodeNotReadyError: delay = self.connection_delay(bootstrap_broker.node_id) if deadline is not None: @@ -109,14 +112,17 @@ async def _do_bootstrap(self, deadline): self.cluster.update_metadata(response) if not self.cluster.brokers(): log.warning('Bootstrap metadata response has no brokers. 
Retrying.') - await self._net.sleep(self.config['reconnect_backoff_ms'] / 1000) continue - self._conns.pop(bootstrap_broker.node_id, conn).close() + self.reset_backoff(bootstrap_broker.node_id) log.info('Bootstrap complete: %s', self.cluster) return True - except Exception as e: - self.cluster.failed_update(e) + except Exception as exc: + log.error('Bootstrap attempt to %s failed: %s', bootstrap_broker.node_id, exc) + self.update_backoff(bootstrap_broker.node_id) + self.cluster.failed_update(exc) continue + finally: + self._conns.pop(bootstrap_broker.node_id, conn).close() else: raise Errors.KafkaConnectionError( 'Unable to bootstrap from %s' % (self.cluster.config['bootstrap_servers'],)) @@ -191,7 +197,7 @@ async def _build_transport(self, node): else: return transport - async def _connect(self, node, conn): + async def _connect(self, node, conn, reset_backoff_on_connect=True): try: transport = await self._build_transport(node) conn.connection_made(transport) @@ -203,12 +209,13 @@ async def _connect(self, node, conn): if self._sensors: self._sensors.connection_created.record() - self.reset_backoff(node.node_id) + if reset_backoff_on_connect: + self.reset_backoff(node.node_id) if conn.broker_version_data is not None: if self.cluster.is_bootstrap(node.node_id): self.broker_version_data = conn.broker_version_data - def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=True): + def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=True, reset_backoff_on_connect=True): if node_id is None: raise Errors.NodeNotReadyError('No node_id provided') elif self.connection_delay(node_id) > 0: @@ -226,7 +233,7 @@ def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=Tru if refresh_metadata_on_err: conn.close_future.add_errback(lambda _: self.cluster.request_update()) self._conns[node_id] = conn - self._net.call_soon(lambda: self._connect(node, conn)) + self._net.call_soon(lambda: self._connect(node, conn, 
reset_backoff_on_connect=reset_backoff_on_connect)) self._net.call_later(self.config['socket_connection_timeout_ms'] / 1000, lambda: conn.close(Errors.KafkaConnectionError('Connection timed out')) if not conn.init_future.is_done else None) diff --git a/kafka/sasl/scram.py b/kafka/sasl/scram.py index 420d88327..57624ec0d 100644 --- a/kafka/sasl/scram.py +++ b/kafka/sasl/scram.py @@ -17,8 +17,8 @@ def xor_bytes(left, right): class SaslMechanismScram(SaslMechanism): def __init__(self, **config): - assert 'sasl_plain_username' in config, 'sasl_plain_username required for SCRAM sasl' - assert 'sasl_plain_password' in config, 'sasl_plain_password required for SCRAM sasl' + assert config.get('sasl_plain_username', ''), 'sasl_plain_username required for SCRAM sasl' + assert config.get('sasl_plain_password', ''), 'sasl_plain_password required for SCRAM sasl' assert config.get('sasl_mechanism', '') in ScramClient.MECHANISMS, 'Unrecognized SCRAM mechanism' if config.get('security_protocol', '') == 'SASL_PLAINTEXT': log.warning('Exchanging credentials in the clear during Sasl Authentication') diff --git a/test/net/test_manager.py b/test/net/test_manager.py index b3b360da0..cf1325dee 100644 --- a/test/net/test_manager.py +++ b/test/net/test_manager.py @@ -189,14 +189,23 @@ def mock_send_request(request): return f conn.send_request.side_effect = mock_send_request - manager._conns['bootstrap-0'] = conn - - # First update_metadata leaves brokers empty; second populates them + # The bootstrap loop closes/pops the conn in a `finally` on every + # iteration, so patch get_connection to hand back the same mock on + # each retry instead of letting the manager open a fresh real socket. + def mock_get_connection(node_id, **kwargs): + manager._conns[node_id] = conn + return conn + + # First update_metadata leaves brokers empty; second populates them. + # The real update_metadata also clears metadata_refresh_in_progress + # so subsequent metadata_request() calls don't raise. 
def mock_update_metadata(response): + cluster.metadata_refresh_in_progress = False if call_count[0] >= 2: cluster._brokers = {1: MagicMock(node_id=1)} - with patch.object(cluster, 'update_metadata', side_effect=mock_update_metadata): + with patch.object(manager, 'get_connection', side_effect=mock_get_connection), \ + patch.object(cluster, 'update_metadata', side_effect=mock_update_metadata): f = manager.bootstrap(timeout_ms=2000) manager.poll(timeout_ms=2000, future=f)