From 367bb570081e76a6c42b7a3c9af95c7f01e912df Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 10 Apr 2026 22:21:47 -0700 Subject: [PATCH 1/2] Respect metadata backoff in KafkaNetClient --- kafka/net/compat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/net/compat.py b/kafka/net/compat.py index 7fdf24cad..a56bfd841 100644 --- a/kafka/net/compat.py +++ b/kafka/net/compat.py @@ -129,7 +129,7 @@ def send_and_receive(self, node_id, request, timeout_ms=30000): def poll(self, timeout_ms=None, future=None): with self._lock: - if self._manager.cluster.need_update: + if self._manager.cluster.ttl() == 0: self._manager.update_metadata() return self._manager.poll(timeout_ms=timeout_ms, future=future) From f594e81320e99594b39067363108970a54c3cf12 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 11 Apr 2026 11:29:05 -0700 Subject: [PATCH 2/2] Clamp poll timeout w/ metadata_ttl --- kafka/net/compat.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/kafka/net/compat.py b/kafka/net/compat.py index a56bfd841..aaef65c54 100644 --- a/kafka/net/compat.py +++ b/kafka/net/compat.py @@ -129,8 +129,15 @@ def send_and_receive(self, node_id, request, timeout_ms=30000): def poll(self, timeout_ms=None, future=None): with self._lock: - if self._manager.cluster.ttl() == 0: + metadata_ttl = self._manager.cluster.ttl() + if metadata_ttl == 0: self._manager.update_metadata() + elif self._manager.cluster.need_update: + # A refresh is pending but retry_backoff hasn't elapsed yet. + # Cap timeout_ms so select() returns in time for us to come + # back and re-check ttl. + if timeout_ms is None or metadata_ttl < timeout_ms: + timeout_ms = metadata_ttl return self._manager.poll(timeout_ms=timeout_ms, future=future) def close(self, node_id=None):