diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 38c605ac0..0944d2743 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -223,7 +223,7 @@ def __init__(self, **configs): # Bootstrap on __init__ self._manager.run(self._manager.bootstrap(timeout_ms=self.config['bootstrap_timeout_ms'])) self._closed = False - self._refresh_controller_id() + self._controller_id = None log.debug("KafkaAdminClient started.") def close(self): @@ -248,32 +248,22 @@ def _validate_timeout(self, timeout_ms): """ return timeout_ms or self.config['request_timeout_ms'] - def _refresh_controller_id(self, timeout_ms=30000): + async def _refresh_controller_id(self, timeout_ms=30000): """Determine the Kafka cluster controller.""" - version = self._client.api_version(MetadataRequest, max_version=8) - if version == 0: + if self._manager.broker_version < (0, 10): raise UnrecognizedBrokerVersion( - "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}." - .format(version)) - # use defaults for allow_auto_topic_creation / include_authorized_operations in v6+ - request = MetadataRequest[version]() + "Kafka Admin Client controller requests requires broker version >= (0, 10)") + request = MetadataRequest() timeout_at = time.monotonic() + timeout_ms / 1000 while time.monotonic() < timeout_at: - response = self.send_request(request) + response = await self._manager.send(request) controller_id = response.controller_id if controller_id == -1: log.warning("Controller ID not available, got -1") - time.sleep(1) + await self._manager._net.sleep(1) continue - # verify the controller is new enough to support our requests - controller_version = self._client.check_version(node_id=controller_id) - if controller_version < (0, 10, 0): - raise IncompatibleBrokerVersion( - "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0." - .format(controller_version)) - self._controller_id = controller_id - return + return controller_id else: raise Errors.NodeNotReadyError('controller') @@ -385,7 +375,7 @@ def send_requests(self, requests_and_node_ids, response_fn=lambda x: x): self._wait_for_futures(futures) return [response_fn(future.value) for future in futures] - def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), ignore_errors=(), raise_errors=True, tries=2): + async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), raise_errors=True, ignore_errors=()): """Send a Kafka protocol message to the cluster controller. Will block until the message result is received. @@ -397,31 +387,31 @@ def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), ignor get_errors_fn (func): Function to process response and return an iterable of Error types. ignore_errors (tuple): Any non-zero error codes that should be ignored. Not used if raise_errors=False. raise_errors (bool): Whether to raise unhandled errors (True, default) or return response with errors (False). - tries (int): Number of times to refresh controller id and retry on NotControllerIdError. Returns: The Kafka protocol response for the message. """ # retry in case our controller_id is out of date - while tries: - tries -= 1 - response = self.send_request(request, node_id=self._controller_id) - for error_type in get_errors_fn(response): - if tries and error_type is Errors.NotControllerError: - # No need to inspect the rest of the errors for - # non-retriable errors because NotControllerError should - # either be thrown for all errors or no errors. - self._refresh_controller_id() - break - elif raise_errors: - if error_type is not Errors.NoError and error_type not in ignore_errors: - raise error_type( - "Request '{}' failed with response '{}'." - .format(request, response)) - else: - # No controller refresh needed - return response - raise RuntimeError("Failed to find active controller id!") + if self._controller_id is None or self._controller_id == -1: + self._controller_id = await self._refresh_controller_id() + + response = await self._manager.send(request, node_id=self._controller_id) + + # Refresh controller and retry on NotControllerError + if Errors.NotControllerError in get_errors_fn(response): + self._controller_id = await self._refresh_controller_id() + response = await self._manager.send(request, node_id=self._controller_id) + + for error_type in get_errors_fn(response): + if error_type is Errors.NoError: + continue + elif error_type is Errors.NotControllerError: + raise RuntimeError("Failed to find active controller id!") + elif raise_errors and error_type not in ignore_errors: + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + return response @staticmethod def _convert_new_topic_request(new_topic): @@ -471,8 +461,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ call :meth:`wait_for_topics` directly for finer control. Mutually exclusive with validate_only. Default: False - Returns: - Appropriate version of CreateTopicResponse class. + Returns: CreateTopicResponse """ if validate_only and wait_for_metadata: raise ValueError('validate_only and wait_for_metadata are mutually exclusive') @@ -499,7 +488,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ def get_response_errors(r): for topic in r.topics: yield Errors.for_code(topic[1]) - response = self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors) + response = self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors) if wait_for_metadata: # implies not validate_only self.wait_for_topics([new_topic.name for new_topic in new_topics]) return response @@ -601,7 +590,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): def get_response_errors(r): for response in r.responses: yield Errors.for_code(response[1]) - return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors) + return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors) def _process_acl_operations(self, obj): if obj.get('authorized_operations', None) is not None: @@ -1138,7 +1127,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal def get_response_errors(r): for result in r.results: yield Errors.for_code(result[1]) - return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors) + return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors) def _get_leader_for_partitions(self, partitions, timeout_ms=None): """Finds ID of the leader node for every given topic partition. @@ -1685,10 +1674,7 @@ def get_response_errors(r): for partition in result[1]: yield Errors.for_code(partition[1]) ignore_errors = (Errors.ElectionNotNeededError,) - return self._send_request_to_controller(request, - get_errors_fn=get_response_errors, - ignore_errors=ignore_errors, - raise_errors=raise_errors) + return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors, ignore_errors) def describe_log_dirs(self): """Send a DescribeLogDirsRequest request to a broker.