diff --git a/kafka/admin/client.py b/kafka/admin/client.py index fcea90a64..329db3f26 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -297,45 +297,6 @@ async def _find_coordinator_id(self, group_id): self._coordinator_cache[group_id] = response.node_id return response.node_id - def _send_request_to_node(self, node_id, request): - """Send a Kafka protocol message to a specific broker. - - Arguments: - node_id: The broker id to which to send the message. - request: The message to send. - - Returns: - A future object that may be polled for status and results. - """ - return self._manager.send(request, node_id=node_id) - - def _wait_for_futures(self, futures): - """Block until all futures complete. If any fail, raise the encountered exception. - - Arguments: - futures: A list of Future objects awaiting results. - - Raises: - The first encountered exception if a future fails. - """ - failed = None - for future in futures: - self._manager.poll(future=future) - if failed is None and future.failed(): - failed = future - if failed: - raise failed.exception # pylint: disable-msg=raising-bad-type - - def send_request(self, request, node_id=None): - return self._manager.run(self._manager.send(request, node_id=node_id)) - - def send_requests(self, requests_and_node_ids, response_fn=lambda x: x): - futures = [] - for request, node_id in requests_and_node_ids: - futures.append(self._manager.send(request, node_id=node_id)) - self._wait_for_futures(futures) - return [response_fn(future.value) for future in futures] - async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), raise_errors=True, ignore_errors=()): """Send a Kafka protocol message to the cluster controller. @@ -693,7 +654,7 @@ def describe_acls(self, acl_filter): operation=acl_filter.operation, permission_type=acl_filter.permission_type ) - response = self.send_request(request) # pylint: disable=E0606 + response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: # optionally we could retry if error_type.retriable @@ -806,7 +767,7 @@ def create_acls(self, acls): request = CreateAclsRequest[version]( creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls] ) - response = self.send_request(request) # pylint: disable=E0606 + response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 return self._convert_create_acls_response_to_acls(acls, response) @staticmethod @@ -920,7 +881,7 @@ def delete_acls(self, acl_filters): request = DeleteAclsRequest[version]( filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters] ) - response = self.send_request(request) # pylint: disable=E0606 + response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 return self._convert_delete_acls_response_to_matching_acls(acl_filters, response) @staticmethod @@ -941,23 +902,7 @@ def _convert_describe_config_resource_request(config_resource): ] if config_resource.configs else None ) - def describe_configs(self, config_resources, include_synonyms=False): - """Fetch configuration parameters for one or more Kafka resources. - - Arguments: - config_resources: An list of ConfigResource objects. - Any keys in ConfigResource.configs dict will be used to filter the - result. Setting the configs dict to None will get all values. An - empty dict will get zero values (as per Kafka protocol). - - Keyword Arguments: - include_synonyms (bool, optional): If True, return synonyms in response. Not - supported by all versions. Default: False. - - Returns: - List of DescribeConfigsResponses. - """ - + async def _async_describe_configs(self, config_resources, include_synonyms=False): # Break up requests by type - a broker config request must be sent to the specific broker. # All other (currently just topic resources) can be sent to any broker. broker_resources = [] @@ -975,30 +920,46 @@ def describe_configs(self, config_resources, include_synonyms=False): "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." .format(self._manager.broker_version)) - requests = [] - if len(broker_resources) > 0: - for broker_resource in broker_resources: - try: - broker_id = int(broker_resource[1]) - except ValueError: - raise ValueError("Broker resource names must be an integer or a string represented integer") - - if version == 0: - request = DescribeConfigsRequest[version](resources=[broker_resource]) - else: - request = DescribeConfigsRequest[version]( - resources=[broker_resource], - include_synonyms=include_synonyms) - requests.append((request, broker_id)) + results = [] + for broker_resource in broker_resources: + try: + broker_id = int(broker_resource[1]) + except ValueError: + raise ValueError("Broker resource names must be an integer or a string represented integer") + if version == 0: + request = DescribeConfigsRequest[version](resources=[broker_resource]) + else: + request = DescribeConfigsRequest[version]( + resources=[broker_resource], + include_synonyms=include_synonyms) + results.append(await self._manager.send(request, node_id=broker_id)) - if len(topic_resources) > 0: + if topic_resources: if version == 0: request = DescribeConfigsRequest[version](resources=topic_resources) else: request = DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms) - requests.append((request, None)) + results.append(await self._manager.send(request)) + + return results - return self.send_requests(requests) + def describe_configs(self, config_resources, include_synonyms=False): + """Fetch configuration parameters for one or more Kafka resources. + + Arguments: + config_resources: An list of ConfigResource objects. + Any keys in ConfigResource.configs dict will be used to filter the + result. Setting the configs dict to None will get all values. An + empty dict will get zero values (as per Kafka protocol). + + Keyword Arguments: + include_synonyms (bool, optional): If True, return synonyms in response. Not + supported by all versions. Default: False. + + Returns: + List of DescribeConfigsResponses. + """ + return self._manager.run(self._async_describe_configs, config_resources, include_synonyms) @staticmethod def _convert_alter_config_resource_request(config_resource): @@ -1018,6 +979,19 @@ def _convert_alter_config_resource_request(config_resource): ] ) + async def _async_alter_configs(self, config_resources): + version = self._client.api_version(AlterConfigsRequest, max_version=1) + request = AlterConfigsRequest[version]( + resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] + ) + # TODO the Java client has the note: + # // We must make a separate AlterConfigs request for every BROKER resource we want to alter + # // and send the request to that specific broker. Other resources are grouped together into + # // a single request that may be sent to any broker. + # + # So this is currently broken as it always sends to the least_loaded_node() + return await self._manager.send(request) + def alter_configs(self, config_resources): """Alter configuration parameters of one or more Kafka resources. @@ -1033,17 +1007,7 @@ def alter_configs(self, config_resources): Returns: Appropriate version of AlterConfigsResponse class. """ - version = self._client.api_version(AlterConfigsRequest, max_version=1) - request = AlterConfigsRequest[version]( - resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] - ) - # TODO the Java client has the note: - # // We must make a separate AlterConfigs request for every BROKER resource we want to alter - # // and send the request to that specific broker. Other resources are grouped together into - # // a single request that may be sent to any broker. - # - # So this is currently broken as it always sends to the least_loaded_node() - return self.send_request(request) + return self._manager.run(self._async_alter_configs, config_resources) # alter replica logs dir protocol not yet implemented # Note: have to lookup the broker with the replica assignment and send the request to that broker @@ -1102,27 +1066,12 @@ def response_errors(r): yield Errors.for_code(result.error_code) return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) - def _get_leader_for_partitions(self, partitions, timeout_ms=None): - """Finds ID of the leader node for every given topic partition. - - Will raise UnknownTopicOrPartitionError if for some partition no leader can be found. - - Arguments: - partitions ([TopicPartition]): partitions for which to find leaders. - - Keyword Arguments: - timeout_ms (numeric, optional): Timeout in milliseconds, if None (default), will be read from - config. - - Returns: - dict with ``{leader_id -> {partitions}}`` - """ - timeout_ms = self._validate_timeout(timeout_ms) - + async def _async_get_leader_for_partitions(self, partitions): + """Finds ID of the leader node for every given topic partition.""" partitions = set(partitions) topics = set(tp.topic for tp in partitions) - metadata = self._manager.run(self._get_cluster_metadata, topics) + metadata = await self._get_cluster_metadata(topics) leader2partitions = defaultdict(list) valid_partitions = set() @@ -1142,40 +1091,16 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None): return leader2partitions - def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): - """Delete records whose offset is smaller than the given offset of the corresponding partition. - - Arguments: - records_to_delete ({TopicPartition: int}): The earliest available offsets for the - given partitions. - - Keyword Arguments: - timeout_ms (numeric, optional): Timeout in milliseconds, if None (default), will be read from - config. - partition_leader_id (node_id / int, optional): If specified, all deletion requests will be sent to - this node. No check is performed verifying that this is indeed the leader for all - listed partitions: use with caution. - - Returns: - dict {topicPartition -> metadata}, where metadata is returned by the broker. - See DeleteRecordsResponse for possible fields. error_code for all partitions is - guaranteed to be zero, otherwise an exception is raised. - """ + async def _async_delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): timeout_ms = self._validate_timeout(timeout_ms) - responses = [] version = self._client.api_version(DeleteRecordsRequest, max_version=0) - # We want to make as few requests as possible - # If a single node serves as a partition leader for multiple partitions (and/or - # topics), we can send all of those in a single request. - # For that we store {leader -> {partitions for leader}}, and do 1 request per leader if partition_leader_id is None: - leader2partitions = self._get_leader_for_partitions( - set(records_to_delete), timeout_ms - ) + leader2partitions = await self._async_get_leader_for_partitions(set(records_to_delete)) else: leader2partitions = {partition_leader_id: set(records_to_delete)} + responses = [] for leader, partitions in leader2partitions.items(): topic2partitions = defaultdict(list) for partition in partitions: @@ -1188,7 +1113,7 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id ], timeout_ms=timeout_ms ) - response = self.send_request(request, node_id=leader) + response = await self._manager.send(request, node_id=leader) responses.append(response.to_dict()) partition2result = {} @@ -1219,6 +1144,27 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id return partition2result + def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): + """Delete records whose offset is smaller than the given offset of the corresponding partition. + + Arguments: + records_to_delete ({TopicPartition: int}): The earliest available offsets for the + given partitions. + + Keyword Arguments: + timeout_ms (numeric, optional): Timeout in milliseconds, if None (default), will be read from + config. + partition_leader_id (node_id / int, optional): If specified, all deletion requests will be sent to + this node. No check is performed verifying that this is indeed the leader for all + listed partitions: use with caution. + + Returns: + dict {topicPartition -> metadata}, where metadata is returned by the broker. + See DeleteRecordsResponse for possible fields. error_code for all partitions is + guaranteed to be zero, otherwise an exception is raised. + """ + return self._manager.run(self._async_delete_records, records_to_delete, timeout_ms, partition_leader_id) + # create delegation token protocol not yet implemented # Note: send the request to the least_loaded_node() @@ -1653,11 +1599,14 @@ def response_errors(r): ignore_errors = (Errors.ElectionNotNeededError,) return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) + async def _async_describe_log_dirs(self): + version = self._client.api_version(DescribeLogDirsRequest, max_version=0) + return await self._manager.send(DescribeLogDirsRequest[version]()) + def describe_log_dirs(self): """Send a DescribeLogDirsRequest request to a broker. Returns: DescribeLogDirsResponse object """ - version = self._client.api_version(DescribeLogDirsRequest, max_version=0) - return self.send_request(DescribeLogDirsRequest[version]()) + return self._manager.run(self._async_describe_log_dirs)