From 2628bd54a3284bd72ef942f4a2cb1ef9872a7601 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 12 Apr 2026 21:53:31 -0700 Subject: [PATCH] kafka.net: Raise UnknownBrokerIdError when connection fails because node_id is not in metadata --- kafka/errors.py | 4 ++++ kafka/net/manager.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/kafka/errors.py b/kafka/errors.py index 09691d536..992b0d21d 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -79,6 +79,10 @@ class NodeNotReadyError(KafkaError): retriable = True +class UnknownBrokerIdError(KafkaError): + pass + + class QuotaViolationError(KafkaError): pass diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 571a6ca19..48d8f4262 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -210,14 +210,14 @@ async def _connect(self, node, conn): def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=True): if node_id is None: - raise Errors.NodeNotReadyError() + raise Errors.NodeNotReadyError('No node_id provided') elif self.connection_delay(node_id) > 0: raise Errors.NodeNotReadyError(node_id) elif node_id in self._conns: return self._conns[node_id] node = self.cluster.broker_metadata(node_id) if node is None: - raise Errors.NodeNotReadyError(node_id) + raise Errors.UnknownBrokerIdError(node_id) conn = KafkaConnection(self._net, node_id=node_id, broker_version_data=self.broker_version_data, **self.config) if pop_on_close: conn.close_future.add_both(lambda _: self._conns.pop(node.node_id, None))