diff --git a/kafka/net/compat.py b/kafka/net/compat.py index 7fdf24cad..aaef65c54 100644 --- a/kafka/net/compat.py +++ b/kafka/net/compat.py @@ -129,8 +129,15 @@ def send_and_receive(self, node_id, request, timeout_ms=30000): def poll(self, timeout_ms=None, future=None): with self._lock: - if self._manager.cluster.need_update: + metadata_ttl = self._manager.cluster.ttl() + if metadata_ttl == 0: self._manager.update_metadata() + elif self._manager.cluster.need_update: + # A refresh is pending but retry_backoff hasn't elapsed yet. + # Cap timeout_ms so select() returns in time for us to come + # back and re-check ttl. + if timeout_ms is None or metadata_ttl < timeout_ms: + timeout_ms = metadata_ttl return self._manager.poll(timeout_ms=timeout_ms, future=future) def close(self, node_id=None):