From 92f0d183b2ac679a65093493244eba42ab33cbf1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 12 Apr 2026 09:43:33 -0700 Subject: [PATCH 1/2] Admin: send_request -> manager.run --- kafka/admin/client.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 38c605ac0..c3db27e8c 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -368,12 +368,7 @@ def _wait_for_futures(self, futures): raise future.exception # pylint: disable-msg=raising-bad-type def send_request(self, request, node_id=None): - if node_id is None: - node_id = self._client.least_loaded_node(bootstrap_fallback=True) - self._client.await_ready(node_id) - future = self._client.send(node_id, request) - self._wait_for_futures([future]) # raises exception on failure - return future.value + return self._manager.run(self._manager.send(request, node_id=node_id)) def send_requests(self, requests_and_node_ids, response_fn=lambda x: x): futures = [] From 82f121c74a50d188c951c32aa00c43599a6416f2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 12 Apr 2026 10:32:28 -0700 Subject: [PATCH 2/2] Admin: use manager to send requests --- kafka/admin/client.py | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index c3db27e8c..fef0435e5 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -331,25 +331,17 @@ def _find_coordinator_ids(self, group_ids): coordinator_ids = self.send_requests(requests, response_fn=self._find_coordinator_id_process_response) return dict(zip(group_ids, coordinator_ids)) - def _send_request_to_node(self, node_id, request, wakeup=True): + def _send_request_to_node(self, node_id, request): """Send a Kafka protocol message to a specific broker. Arguments: node_id: The broker id to which to send the message. request: The message to send. - - Keyword Arguments: - wakeup (bool, optional): Optional flag to disable thread-wakeup. - Returns: A future object that may be polled for status and results. """ - try: - self._client.await_ready(node_id) - except Errors.KafkaConnectionError as e: - return Future().failure(e) - return self._client.send(node_id, request, wakeup) + return self._manager.send(request, node_id=node_id) def _wait_for_futures(self, futures): """Block until all futures complete. If any fail, raise the encountered exception. @@ -360,12 +352,13 @@ def _wait_for_futures(self, futures): Raises: The first encountered exception if a future fails. """ - while not all(future.succeeded() for future in futures): - for future in futures: - self._client.poll(future=future) - - if future.failed(): - raise future.exception # pylint: disable-msg=raising-bad-type + failed = None + for future in futures: + self._manager.poll(future=future) + if failed is None and future.failed(): + failed = future + if failed: + raise failed.exception # pylint: disable-msg=raising-bad-type def send_request(self, request, node_id=None): return self._manager.run(self._manager.send(request, node_id=node_id)) @@ -373,10 +366,7 @@ def send_request(self, request, node_id=None): def send_requests(self, requests_and_node_ids, response_fn=lambda x: x): futures = [] for request, node_id in requests_and_node_ids: - if node_id is None: - node_id = self._client.least_loaded_node(bootstrap_fallback=True) - self._client.await_ready(node_id) - futures.append(self._client.send(node_id, request)) + futures.append(self._manager.send(request, node_id=node_id)) self._wait_for_futures(futures) return [response_fn(future.value) for future in futures]