From a7e3e8ff5bf6313cbc1b1a7058436ecad2dc9a90 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 29 Apr 2026 15:36:10 -0700 Subject: [PATCH] kafka.net: Support get_connection timeout, use for check_version --- kafka/net/compat.py | 6 +++--- kafka/net/manager.py | 9 +++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/kafka/net/compat.py b/kafka/net/compat.py index ac5d00883..cc959d3fe 100644 --- a/kafka/net/compat.py +++ b/kafka/net/compat.py @@ -103,10 +103,10 @@ def check_version(self, node_id=None, timeout_ms=10000): self._manager.bootstrap(timeout_ms) if node_id is None: return self._manager.broker_version - async def _check_version(broker_id): - conn = await self._manager.get_connection(broker_id) + async def _check_version(broker_id, timeout_ms): + conn = await self._manager.get_connection(broker_id, timeout_ms=timeout_ms) return conn.broker_version - return self._manager.run(_check_version, node_id) + return self._manager.run(_check_version, node_id, timeout_ms) # Request sending diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 61a08afaf..138744d38 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -239,7 +239,10 @@ async def _connect(self, node, conn, reset_backoff_on_connect=True): if self.cluster.is_bootstrap(node.node_id): self.broker_version_data = conn.broker_version_data - def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=True, reset_backoff_on_connect=True): + def get_connection(self, node_id, timeout_ms=None, + pop_on_close=True, + refresh_metadata_on_err=True, + reset_backoff_on_connect=True): if node_id is None: raise Errors.NodeNotReadyError('No node_id provided') elif self.connection_delay(node_id) > 0: @@ -258,7 +261,9 @@ def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=Tru conn.close_future.add_errback(lambda _: self.cluster.request_update()) self._conns[node_id] = conn self._net.call_soon(lambda: self._connect(node, conn, reset_backoff_on_connect=reset_backoff_on_connect)) - self._net.call_later(self.config['socket_connection_timeout_ms'] / 1000, + if timeout_ms is None: + timeout_ms = self.config['socket_connection_timeout_ms'] + self._net.call_later(timeout_ms / 1000, lambda: conn.close(Errors.KafkaConnectionError('Connection timed out')) if not conn.init_future.is_done else None) return conn