diff --git a/kafka/errors.py b/kafka/errors.py index 09691d536..992b0d21d 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -79,6 +79,10 @@ class NodeNotReadyError(KafkaError): retriable = True +class UnknownBrokerIdError(KafkaError): + pass + + class QuotaViolationError(KafkaError): pass diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 571a6ca19..48d8f4262 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -210,14 +210,14 @@ async def _connect(self, node, conn): def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=True): if node_id is None: - raise Errors.NodeNotReadyError() + raise Errors.NodeNotReadyError('No node_id provided') elif self.connection_delay(node_id) > 0: raise Errors.NodeNotReadyError(node_id) elif node_id in self._conns: return self._conns[node_id] node = self.cluster.broker_metadata(node_id) if node is None: - raise Errors.NodeNotReadyError(node_id) + raise Errors.UnknownBrokerIdError(node_id) conn = KafkaConnection(self._net, node_id=node_id, broker_version_data=self.broker_version_data, **self.config) if pop_on_close: conn.close_future.add_both(lambda _: self._conns.pop(node.node_id, None))