kafka/admin/client.py: 146 changes (64 additions, 82 deletions)
@@ -225,6 +225,7 @@ def __init__(self, **configs):
self._manager.run(self._manager.bootstrap(timeout_ms=self.config['bootstrap_timeout_ms']))
self._closed = False
self._controller_id = None
self._coordinator_cache = {} # {group_id: node_id}
log.debug("KafkaAdminClient started.")

def close(self):
@@ -268,60 +269,34 @@ async def _refresh_controller_id(self, timeout_ms=30000):
else:
raise Errors.NodeNotReadyError('controller')

def _find_coordinator_id_request(self, group_id):
"""Send a FindCoordinatorRequest to a broker.
async def _find_coordinator_id(self, group_id):
"""Find the broker node_id of the coordinator for a consumer group.

Arguments:
group_id: The consumer group ID. This is typically the group
name as a string.

Returns:
FindCoordinatorRequest
"""
version = self._client.api_version(FindCoordinatorRequest, max_version=2)
if version <= 0:
request = FindCoordinatorRequest[version](group_id)
elif version <= 2:
request = FindCoordinatorRequest[version](group_id, 0)
return request # pylint: disable=E0606

def _find_coordinator_id_process_response(self, response):
"""Process a FindCoordinatorResponse.
Results are cached; subsequent calls for the same group_id return
the cached value without a network round-trip.

Arguments:
response: a FindCoordinatorResponse.
group_id (str): The consumer group ID.

Returns:
The node_id of the broker that is the coordinator.
int: The node_id of the group's coordinator broker.

Raises:
Errors from the FindCoordinatorResponse (e.g., CoordinatorNotAvailableError).
"""
cached = self._coordinator_cache.get(group_id)
if cached is not None:
return cached
request = FindCoordinatorRequest(group_id, 0, max_version=2) # key_type=0 for group
response = await self._manager.send(request)
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
# Note: when error_type.retriable, the Java client retries; see the Java
# KafkaAdminClient's handleFindCoordinatorError method.
raise error_type(
"FindCoordinatorRequest failed with response '{}'."
.format(response))
self._coordinator_cache[group_id] = response.node_id
return response.node_id
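
The cache-then-fetch logic above is the heart of the change: one FindCoordinator round-trip per group, then dictionary hits. A minimal self-contained sketch of the same pattern, with all names illustrative rather than taken from the PR:

import asyncio

class CoordinatorLookup:
    """Illustrative stand-in for _find_coordinator_id's caching logic."""

    def __init__(self, fetch):
        self._fetch = fetch  # async callable: group_id -> node_id
        self._cache = {}     # mirrors _coordinator_cache: {group_id: node_id}

    async def find(self, group_id):
        cached = self._cache.get(group_id)
        if cached is not None:
            return cached    # repeat calls skip the network entirely
        node_id = await self._fetch(group_id)
        self._cache[group_id] = node_id
        return node_id

async def demo():
    calls = []

    async def fake_fetch(group_id):
        calls.append(group_id)  # stands in for the FindCoordinator round-trip
        return 42

    lookup = CoordinatorLookup(fake_fetch)
    assert await lookup.find("g1") == 42
    assert await lookup.find("g1") == 42
    assert calls == ["g1"]      # only the first call hit the "network"

asyncio.run(demo())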

def _find_coordinator_ids(self, group_ids):
"""Find the broker node_ids of the coordinators of the given groups.

Sends a FindCoordinatorRequest message to the cluster for each group_id.
Will block until the FindCoordinatorResponse is received for all groups.
Any errors are immediately raised.

Arguments:
group_ids: A list of consumer group IDs. This is typically the group
name as a string.

Returns:
A dict of {group_id: node_id} where node_id is the id of the
broker that is the coordinator for the corresponding group.
"""
requests = [(self._find_coordinator_id_request(group_id), None) for group_id in group_ids]
coordinator_ids = self.send_requests(requests, response_fn=self._find_coordinator_id_process_response)
return dict(zip(group_ids, coordinator_ids))

def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.

@@ -1337,6 +1312,15 @@ def _describe_consumer_groups_process_response(self, response):
return group_description
assert False, "DescribeGroupsResponse parsing failed"

async def _async_describe_consumer_groups(self, group_ids, group_coordinator_id=None):
results = []
for group_id in group_ids:
coordinator_id = group_coordinator_id or await self._find_coordinator_id(group_id)
request = self._describe_consumer_groups_request(group_id)
response = await self._manager.send(request, node_id=coordinator_id)
results.append(self._describe_consumer_groups_process_response(response))
return results

def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
"""Describe a set of consumer groups.

@@ -1360,16 +1344,7 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include
plan to change this to return namedtuples as well as decoding the
partition assignments.
"""
if group_coordinator_id is not None:
groups_coordinators = {group_id: group_coordinator_id for group_id in group_ids}
else:
groups_coordinators = self._find_coordinator_ids(group_ids)

requests = [
(self._describe_consumer_groups_request(group_id), coordinator_id)
for group_id, coordinator_id in groups_coordinators.items()
]
return self.send_requests(requests, response_fn=self._describe_consumer_groups_process_response)
return self._manager.run(self._async_describe_consumer_groups, group_ids, group_coordinator_id)
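
A hedged usage sketch of the wrapper above. The import path and bootstrap_servers config follow kafka-python's usual public API (an assumption about this codebase); the broker address and group name are placeholders.

from kafka.admin import KafkaAdminClient  # assumed import path

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")  # placeholder broker
# One blocking call: _manager.run drives the async helper, which resolves each
# group's coordinator (cached after the first lookup) and queries it directly.
for description in admin.describe_consumer_groups(["example-group"]):
    print(description)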

def _list_consumer_groups_request(self):
"""Send a ListGroupsRequest to a broker.
@@ -1394,6 +1369,18 @@ def _list_consumer_groups_process_response(self, response):
.format(response.API_VERSION))
return [(group.group_id, group.protocol_type) for group in response.groups]

async def _async_list_consumer_groups(self, broker_ids=None):
if broker_ids is None:
broker_ids = [broker.node_id for broker in self._manager.cluster.brokers()]
# Use a set to prevent duplicates if a group coordinator migrates
# between brokers mid-query.
consumer_groups = set()
for broker_id in broker_ids:
request = self._list_consumer_groups_request()
response = await self._manager.send(request, node_id=broker_id)
consumer_groups.update(self._list_consumer_groups_process_response(response))
return list(consumer_groups)

def list_consumer_groups(self, broker_ids=None):
"""List all consumer groups known to the cluster.

@@ -1424,18 +1411,7 @@ def list_consumer_groups(self, broker_ids=None):
CoordinatorLoadInProgressError: The coordinator is loading and
hence can't process requests.
"""
# While we return a list, internally use a set to prevent duplicates
# because if a group coordinator fails after being queried, and its
# consumer groups move to new brokers that haven't yet been queried,
# then the same group could be returned by multiple brokers.
if broker_ids is None:
broker_ids = [broker.node_id for broker in self._client.cluster.brokers()]
requests = [
(self._list_consumer_groups_request(), broker_id)
for broker_id in broker_ids
]
consumer_groups = self.send_requests(requests, response_fn=self._list_consumer_groups_process_response)
return list(set().union(*consumer_groups))
return self._manager.run(self._async_list_consumer_groups, broker_ids)
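
Continuing the same hypothetical admin client, a sketch of the call above; it fans out to every broker by default, and the set in the async helper dedupes the merged results.

# Query all brokers (default) or pass explicit broker_ids to narrow the scan.
for group_id, protocol_type in admin.list_consumer_groups():
    print(group_id, protocol_type)  # e.g. ('example-group', 'consumer')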

def _list_consumer_group_offsets_request(self, group_id, partitions=None):
"""Send an OffsetFetchRequest to a broker.
@@ -1511,6 +1487,13 @@ def _list_consumer_group_offsets_process_response(self, response):
.format(response.API_VERSION))
return offsets

async def _async_list_consumer_group_offsets(self, group_id, group_coordinator_id=None, partitions=None):
if group_coordinator_id is None:
group_coordinator_id = await self._find_coordinator_id(group_id)
request = self._list_consumer_group_offsets_request(group_id, partitions)
response = await self._manager.send(request, node_id=group_coordinator_id)
return self._list_consumer_group_offsets_process_response(response)

def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
partitions=None):
"""Fetch Consumer Offsets for a single consumer group.
@@ -1542,11 +1525,23 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
TopicPartition. A `-1` can only happen for partitions that are
explicitly specified.
"""
if group_coordinator_id is None:
group_coordinator_id = self._find_coordinator_ids([group_id])[group_id]
request = self._list_consumer_group_offsets_request(group_id, partitions)
response = self.send_request(request, node_id=group_coordinator_id)
return self._list_consumer_group_offsets_process_response(response)
return self._manager.run(self._async_list_consumer_group_offsets, group_id, group_coordinator_id, partitions)
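
With the same hypothetical client, a sketch of fetching one group's committed offsets; per the docstring, the result maps TopicPartition to OffsetAndMetadata.

offsets = admin.list_consumer_group_offsets("example-group")
for tp, om in offsets.items():
    # om.offset is -1 only for explicitly requested partitions with no commit.
    print(tp.topic, tp.partition, om.offset)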

async def _async_delete_consumer_groups(self, group_ids, group_coordinator_id=None):
coordinators_groups = defaultdict(list)
if group_coordinator_id is not None:
coordinators_groups[group_coordinator_id] = group_ids
else:
for group_id in group_ids:
coordinator_id = await self._find_coordinator_id(group_id)
coordinators_groups[coordinator_id].append(group_id)

results = []
for coordinator_id, coordinator_group_ids in coordinators_groups.items():
request = self._delete_consumer_groups_request(coordinator_group_ids)
response = await self._manager.send(request, node_id=coordinator_id)
results.extend(self._convert_delete_groups_response(response))
return results
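
The helper above sends one batched DeleteGroups request per coordinator rather than one request per group. A self-contained sketch of just that bucketing step, with a fake coordinator lookup and illustrative names:

import asyncio
from collections import defaultdict

async def bucket_by_coordinator(group_ids, find_coordinator):
    # One bucket per coordinator, so each broker receives a single batched request.
    buckets = defaultdict(list)
    for group_id in group_ids:
        buckets[await find_coordinator(group_id)].append(group_id)
    return buckets

async def demo():
    async def fake_find(group_id):
        return len(group_id) % 2  # pretend two brokers share the groups
    buckets = await bucket_by_coordinator(["a", "bb", "ccc"], fake_find)
    print(dict(buckets))          # {1: ['a', 'ccc'], 0: ['bb']}

asyncio.run(demo())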

def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
"""Delete Consumer Group Offsets for given consumer groups.
@@ -1571,20 +1566,7 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
Returns:
A list of tuples (group_id, KafkaError)
"""
coordinators_groups = defaultdict(list)
if group_coordinator_id is not None:
coordinators_groups[group_coordinator_id] = group_ids
else:
for group_id, coordinator_id in self._find_coordinator_ids(group_ids).items():
coordinators_groups[coordinator_id].append(group_id)

requests = [
(self._delete_consumer_groups_request(group_ids), coordinator_id)
for coordinator_id, group_ids in coordinators_groups.items()
]

results = self.send_requests(requests, response_fn=self._convert_delete_groups_response)
return list(itertools.chain(*results))
return self._manager.run(self._async_delete_consumer_groups, group_ids, group_coordinator_id)
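
A final hedged sketch for the wrapper above; per the docstring it returns one (group_id, KafkaError) tuple per requested group.

for group_id, error in admin.delete_consumer_groups(["stale-group"]):  # placeholder group
    print(group_id, error)
admin.close()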

def _convert_delete_groups_response(self, response):
"""Parse the DeleteGroupsResponse, mapping group IDs to their respective errors.