diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 12eee44bf..fcea90a64 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -225,6 +225,7 @@ def __init__(self, **configs): self._manager.run(self._manager.bootstrap(timeout_ms=self.config['bootstrap_timeout_ms'])) self._closed = False self._controller_id = None + self._coordinator_cache = {} # {group_id: node_id} log.debug("KafkaAdminClient started.") def close(self): @@ -268,60 +269,34 @@ async def _refresh_controller_id(self, timeout_ms=30000): else: raise Errors.NodeNotReadyError('controller') - def _find_coordinator_id_request(self, group_id): - """Send a FindCoordinatorRequest to a broker. + async def _find_coordinator_id(self, group_id): + """Find the broker node_id of the coordinator for a consumer group. - Arguments: - group_id: The consumer group ID. This is typically the group - name as a string. - - Returns: - FindCoordinatorRequest - """ - version = self._client.api_version(FindCoordinatorRequest, max_version=2) - if version <= 0: - request = FindCoordinatorRequest[version](group_id) - elif version <= 2: - request = FindCoordinatorRequest[version](group_id, 0) - return request # pylint: disable=E0606 - - def _find_coordinator_id_process_response(self, response): - """Process a FindCoordinatorResponse. + Results are cached; subsequent calls for the same group_id return + the cached value without a network round-trip. Arguments: - response: a FindCoordinatorResponse. + group_id (str): The consumer group ID. Returns: - The node_id of the broker that is the coordinator. + int: The node_id of the group's coordinator broker. + + Raises: + Errors from the FindCoordinatorResponse (e.g., CoordinatorNotAvailableError). 
""" + cached = self._coordinator_cache.get(group_id) + if cached is not None: + return cached + request = FindCoordinatorRequest(group_id, 0, max_version=2) # key_type=0 for group + response = await self._manager.send(request) error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: - # Note: When error_type.retriable, Java will retry... see - # KafkaAdminClient's handleFindCoordinatorError method raise error_type( "FindCoordinatorRequest failed with response '{}'." .format(response)) + self._coordinator_cache[group_id] = response.node_id return response.node_id - def _find_coordinator_ids(self, group_ids): - """Find the broker node_ids of the coordinators of the given groups. - - Sends a FindCoordinatorRequest message to the cluster for each group_id. - Will block until the FindCoordinatorResponse is received for all groups. - Any errors are immediately raised. - - Arguments: - group_ids: A list of consumer group IDs. This is typically the group - name as a string. - - Returns: - A dict of {group_id: node_id} where node_id is the id of the - broker that is the coordinator for the corresponding group. - """ - requests = [(self._find_coordinator_id_request(group_id), None) for group_id in group_ids] - coordinator_ids = self.send_requests(requests, response_fn=self._find_coordinator_id_process_response) - return dict(zip(group_ids, coordinator_ids)) - def _send_request_to_node(self, node_id, request): """Send a Kafka protocol message to a specific broker. 
@@ -1337,6 +1312,15 @@ def _describe_consumer_groups_process_response(self, response): return group_description assert False, "DescribeGroupsResponse parsing failed" + async def _async_describe_consumer_groups(self, group_ids, group_coordinator_id=None): + results = [] + for group_id in group_ids: + coordinator_id = group_coordinator_id if group_coordinator_id is not None else await self._find_coordinator_id(group_id) + request = self._describe_consumer_groups_request(group_id) + response = await self._manager.send(request, node_id=coordinator_id) + results.append(self._describe_consumer_groups_process_response(response)) + return results + def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): """Describe a set of consumer groups. @@ -1360,16 +1344,7 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include plan to change this to return namedtuples as well as decoding the partition assignments. """ - if group_coordinator_id is not None: - groups_coordinators = {group_id: group_coordinator_id for group_id in group_ids} - else: - groups_coordinators = self._find_coordinator_ids(group_ids) - - requests = [ - (self._describe_consumer_groups_request(group_id), coordinator_id) - for group_id, coordinator_id in groups_coordinators.items() - ] - return self.send_requests(requests, response_fn=self._describe_consumer_groups_process_response) + return self._manager.run(self._async_describe_consumer_groups, group_ids, group_coordinator_id) def _list_consumer_groups_request(self): """Send a ListGroupsRequest to a broker. 
@@ -1394,6 +1369,18 @@ def _list_consumer_groups_process_response(self, response): .format(response.API_VERSION)) return [(group.group_id, group.protocol_type) for group in response.groups] + async def _async_list_consumer_groups(self, broker_ids=None): + if broker_ids is None: + broker_ids = [broker.node_id for broker in self._manager.cluster.brokers()] + # Use a set to prevent duplicates if a group coordinator migrates + # between brokers mid-query. + consumer_groups = set() + for broker_id in broker_ids: + request = self._list_consumer_groups_request() + response = await self._manager.send(request, node_id=broker_id) + consumer_groups.update(self._list_consumer_groups_process_response(response)) + return list(consumer_groups) + def list_consumer_groups(self, broker_ids=None): """List all consumer groups known to the cluster. @@ -1424,18 +1411,7 @@ def list_consumer_groups(self, broker_ids=None): CoordinatorLoadInProgressError: The coordinator is loading and hence can't process requests. """ - # While we return a list, internally use a set to prevent duplicates - # because if a group coordinator fails after being queried, and its - # consumer groups move to new brokers that haven't yet been queried, - # then the same group could be returned by multiple brokers. - if broker_ids is None: - broker_ids = [broker.node_id for broker in self._client.cluster.brokers()] - requests = [ - (self._list_consumer_groups_request(), broker_id) - for broker_id in broker_ids - ] - consumer_groups = self.send_requests(requests, response_fn=self._list_consumer_groups_process_response) - return list(set().union(*consumer_groups)) + return self._manager.run(self._async_list_consumer_groups, broker_ids) def _list_consumer_group_offsets_request(self, group_id, partitions=None): """Send an OffsetFetchRequest to a broker. 
@@ -1511,6 +1487,13 @@ def _list_consumer_group_offsets_process_response(self, response): .format(response.API_VERSION)) return offsets + async def _async_list_consumer_group_offsets(self, group_id, group_coordinator_id=None, partitions=None): + if group_coordinator_id is None: + group_coordinator_id = await self._find_coordinator_id(group_id) + request = self._list_consumer_group_offsets_request(group_id, partitions) + response = await self._manager.send(request, node_id=group_coordinator_id) + return self._list_consumer_group_offsets_process_response(response) + def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, partitions=None): """Fetch Consumer Offsets for a single consumer group. @@ -1542,11 +1525,23 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, TopicPartition. A `-1` can only happen for partitions that are explicitly specified. """ - if group_coordinator_id is None: - group_coordinator_id = self._find_coordinator_ids([group_id])[group_id] - request = self._list_consumer_group_offsets_request(group_id, partitions) - response = self.send_request(request, node_id=group_coordinator_id) - return self._list_consumer_group_offsets_process_response(response) + return self._manager.run(self._async_list_consumer_group_offsets, group_id, group_coordinator_id, partitions) + + async def _async_delete_consumer_groups(self, group_ids, group_coordinator_id=None): + coordinators_groups = defaultdict(list) + if group_coordinator_id is not None: + coordinators_groups[group_coordinator_id] = group_ids + else: + for group_id in group_ids: + coordinator_id = await self._find_coordinator_id(group_id) + coordinators_groups[coordinator_id].append(group_id) + + results = [] + for coordinator_id, coordinator_group_ids in coordinators_groups.items(): + request = self._delete_consumer_groups_request(coordinator_group_ids) + response = await self._manager.send(request, node_id=coordinator_id) + 
results.extend(self._convert_delete_groups_response(response)) + return results def delete_consumer_groups(self, group_ids, group_coordinator_id=None): """Delete Consumer Group Offsets for given consumer groups. @@ -1571,20 +1566,7 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None): Returns: A list of tuples (group_id, KafkaError) """ - coordinators_groups = defaultdict(list) - if group_coordinator_id is not None: - coordinators_groups[group_coordinator_id] = group_ids - else: - for group_id, coordinator_id in self._find_coordinator_ids(group_ids).items(): - coordinators_groups[coordinator_id].append(group_id) - - requests = [ - (self._delete_consumer_groups_request(group_ids), coordinator_id) - for coordinator_id, group_ids in coordinators_groups.items() - ] - - results = self.send_requests(requests, response_fn=self._convert_delete_groups_response) - return list(itertools.chain(*results)) + return self._manager.run(self._async_delete_consumer_groups, group_ids, group_coordinator_id) def _convert_delete_groups_response(self, response): """Parse the DeleteGroupsResponse, mapping group IDs to their respective errors.