diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index 1356c33fb..e7937d8cd 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -59,17 +59,25 @@ def _describe_groups_process_response(self, response): log.warn(f'Unable to decode member_assignment for {group}/{member.member_id}') pass # Return dict (key, val) tuples - return [(group.group_id, self._process_acl_operations(group.to_dict())) for group in response.groups] + results = {} + for group in response.groups: + group_id = group.group_id + result = self._process_acl_operations(group.to_dict()) + error_code = result.pop('error_code') + error_message = result.pop('error_message', '') # v6+ + result['error'] = str(Errors.for_code(error_code)(error_message)) if error_code else None + results[group_id] = result + return results async def _async_describe_groups(self, group_ids, group_coordinator_id=None): - results = [] + results = {} for group_id in group_ids: coordinator_id = group_coordinator_id or await self._find_coordinator_id(group_id) request = self._describe_groups_request(group_id) response = await self._manager.send(request, node_id=coordinator_id) - results.append(self._describe_groups_process_response(response)) + results.update(self._describe_groups_process_response(response)) # Combine key/vals from multiple requests into single dict - return dict(itertools.chain(*results)) + return results def describe_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): """Describe a set of consumer groups. @@ -175,14 +183,20 @@ def _list_group_offsets_process_response(self, response): raise error_type( "OffsetFetchResponse failed with response '{}'." .format(response)) - def _partitions_to_dict(partitions): - d = {} - for p in partitions: - d[p.partition_index] = p.to_dict() - d[p.partition_index].pop('partition_index') - return d - return {topic.name: _partitions_to_dict(topic.partitions) - for topic in response.topics} + results = {} + for topic in response.topics: + for partition in topic.partitions: + tp = TopicPartition(topic.name, partition.partition_index) + error_type = Errors.for_code(partition.error_code) + if error_type is not Errors.NoError: + raise error_type( + f"OffsetFetchResponse failed for partition {tp.partition}") + results[tp] = OffsetAndMetadata( + offset=partition.committed_offset, + metadata=partition.metadata, + leader_epoch=partition.committed_leader_epoch + ) + return results async def _async_list_group_offsets(self, group_id, group_coordinator_id=None, partitions=None): if group_coordinator_id is None: @@ -191,8 +205,7 @@ async def _async_list_group_offsets(self, group_id, group_coordinator_id=None, p response = await self._manager.send(request, node_id=group_coordinator_id) return self._list_group_offsets_process_response(response) - def list_group_offsets(self, group_id, group_coordinator_id=None, - partitions=None): + def list_group_offsets(self, group_id, group_coordinator_id=None, partitions=None): """Fetch committed offsets for a single consumer group. Note: @@ -213,7 +226,8 @@ def list_group_offsets(self, group_id, group_coordinator_id=None, known offsets for the consumer group. Default: None. Returns: - dict: {topic: [{partition data}]} key/vals from OffsetCommitResponse}]} + A dict mapping :class:`~kafka.TopicPartition` to + :class:`~kafka.structs.OffsetAndMetadata`. """ return self._manager.run(self._async_list_group_offsets, group_id, group_coordinator_id, partitions) @@ -334,6 +348,58 @@ def alter_group_offsets(self, group_id, offsets, group_coordinator_id=None): return self._manager.run( self._async_alter_group_offsets, group_id, offsets, group_coordinator_id) + # -- Reset group offsets ---------------------------------------------- + + @staticmethod + def _reset_group_offsets_process_response(response, to_reset): + results = {} + for topic in response.topics: + for partition in topic.partitions: + tp = TopicPartition(topic.name, partition.partition_index) + results[tp] = { + 'error': Errors.for_code(partition.error_code), + 'offset': to_reset[tp].offset + } + return results + + async def _async_reset_group_offsets(self, group_id, offset_specs, group_coordinator_id=None): + if not offset_specs: + return {} + if group_coordinator_id is None: + group_coordinator_id = await self._find_coordinator_id(group_id) + current = await self._async_list_group_offsets(group_id, group_coordinator_id, offset_specs.keys()) + offsets = await self._async_list_partition_offsets(offset_specs) + to_reset = {} + for tp in offsets: + to_reset[tp] = current[tp]._replace(offset=offsets[tp].offset) + request = self._alter_group_offsets_request(group_id, to_reset) + response = await self._manager.send(request, node_id=group_coordinator_id) + return self._reset_group_offsets_process_response(response, to_reset) + + def reset_group_offsets(self, group_id, offset_specs, group_coordinator_id=None): + """Reset committed offsets for a consumer group to earliest or latest. + + The group must have no active members (i.e. be empty or dead) for + the reset to succeed; otherwise individual partitions may return + ``UNKNOWN_MEMBER_ID`` or similar errors. + + Arguments: + group_id (str): The consumer group id. + offset_specs (dict): A dict mapping :class:`~kafka.TopicPartition` to + :class:`~kafka.admin.OffsetSpec`. + + Keyword Arguments: + group_coordinator_id (int, optional): The node_id of the group's + coordinator broker. If None, the cluster will be queried to + locate the coordinator. Default: None. + + Returns: + dict: A dict mapping :class:`~kafka.TopicPartition` to dict of + {'error': :class:`~kafka.errors.KafkaError` class, 'offset': int} + """ + return self._manager.run( + self._async_reset_group_offsets, group_id, offset_specs, group_coordinator_id) + # -- Delete group offsets ---------------------------------------------- @staticmethod diff --git a/kafka/admin/_partitions.py b/kafka/admin/_partitions.py index a17ee658b..d9a6832d3 100644 --- a/kafka/admin/_partitions.py +++ b/kafka/admin/_partitions.py @@ -487,14 +487,14 @@ async def _async_list_partition_offsets(self, topic_partition_specs, isolation_l continue return results - def list_partition_offsets(self, topic_partitions, isolation_level='read_uncommitted'): + def list_partition_offsets(self, topic_partition_specs, isolation_level='read_uncommitted'): """Look up offsets for the given partitions by spec. Partitions are routed to their respective leader brokers via cluster metadata; one ``ListOffsetsRequest`` is sent per leader. Arguments: - topic_partitions: dict mapping :class:`~kafka.TopicPartition` to + topic_partition_specs: dict mapping :class:`~kafka.TopicPartition` to :class:`OffsetSpec` (or a raw integer timestamp / wire-level sentinel). @@ -515,7 +515,7 @@ def list_partition_offsets(self, topic_partitions, isolation_level='read_uncommi of ListOffsetsRequest compatible with the requested specs. """ return self._manager.run( - self._async_list_partition_offsets, topic_partitions, isolation_level) + self._async_list_partition_offsets, topic_partition_specs, isolation_level) class NewPartitions: diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index d10deed4f..5748e33a0 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -109,6 +109,9 @@ def run_cli(args=None): # Commands TODO: + # --dry-run support + # --trace ? + # [configs] # IncrementalAlterConfigs (not supported yet) diff --git a/kafka/cli/admin/groups/__init__.py b/kafka/cli/admin/groups/__init__.py index b095e34d4..602eba5df 100644 --- a/kafka/cli/admin/groups/__init__.py +++ b/kafka/cli/admin/groups/__init__.py @@ -7,6 +7,7 @@ from .list import ListGroups from .list_offsets import ListGroupOffsets from .remove_members import RemoveGroupMembers +from .reset_offsets import ResetGroupOffsets class GroupsSubCommand: @@ -16,7 +17,7 @@ def add_subparser(cls, subparsers): parser = subparsers.add_parser('groups', help='Manage Kafka Groups') commands = parser.add_subparsers() for cmd in [ListGroups, DescribeGroups, DeleteGroups, - ListGroupOffsets, AlterGroupOffsets, DeleteGroupOffsets, + ListGroupOffsets, AlterGroupOffsets, ResetGroupOffsets, DeleteGroupOffsets, RemoveGroupMembers]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/groups/list_offsets.py b/kafka/cli/admin/groups/list_offsets.py index 47f8da05b..e568958b0 100644 --- a/kafka/cli/admin/groups/list_offsets.py +++ b/kafka/cli/admin/groups/list_offsets.py @@ -1,3 +1,5 @@ +from collections import defaultdict + from kafka.structs import TopicPartition from kafka.admin import OffsetSpec @@ -13,10 +15,15 @@ def add_subparser(cls, subparsers): @classmethod def command(cls, client, args): offsets = client.list_group_offsets(args.group_id) - partitions = [TopicPartition(topic, partition) for topic in offsets for partition in offsets[topic]] - latest = client.list_partition_offsets({tp: OffsetSpec.LATEST for tp in partitions}) + latest = client.list_partition_offsets({tp: OffsetSpec.LATEST for tp in offsets}) + results = defaultdict(dict) for tp in latest: - part_res = offsets[tp.topic][tp.partition] - part_res['latest_offset'] = latest[tp].offset - part_res['lag'] = latest[tp].offset - part_res['committed_offset'] - return offsets + committed = offsets[tp] + results[tp.topic][tp.partition] = { + 'offset': committed.offset, + 'leader_epoch': committed.leader_epoch, + 'metadata': committed.metadata, + 'latest_offset': latest[tp].offset, + 'lag': latest[tp].offset - committed.offset, + } + return dict(results) diff --git a/kafka/cli/admin/groups/reset_offsets.py b/kafka/cli/admin/groups/reset_offsets.py new file mode 100644 index 000000000..f54e5b122 --- /dev/null +++ b/kafka/cli/admin/groups/reset_offsets.py @@ -0,0 +1,78 @@ +from collections import defaultdict + +from kafka.admin import OffsetSpec +from kafka.structs import OffsetAndMetadata, TopicPartition + + +class ResetGroupOffsets: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser( + 'reset-offsets', + help='Reset committed offsets for a consumer group') + parser.add_argument('-g', '--group-id', type=str, required=True) + parser.add_argument( + '-s', '--spec', type=str, + help='Spec may be one of earliest, latest, max-timestamp, earliest-local, ' + 'latest-tiered, or a millisecond timestamp. ' + 'Applies to all topic/partitions currently in group. Mutually exclusive ' + 'with --partition') + parser.add_argument( + '-p', '--partition', type=str, action='append', + dest='partitions', default=[], + help='TOPIC:PARTITION:SPEC triple (repeatable). PARTITION may be a ' + 'single partition, a closed range (0-2), an open range (1-), or ' + 'a single wildcard "*" for all partitions. SPEC may be one of ' + 'earliest, latest, max-timestamp, earliest-local, latest-tiered, ' + 'or a millisecond timestamp.') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + if not args.spec and not args.partitions: + raise ValueError('One of --spec or --partition is required') + elif args.spec and args.partitions: + raise ValueError('Only one of --spec and --partition are allowed') + group = client.describe_groups([args.group_id]) + state = group[args.group_id]['group_state'] + if state not in ('Empty', 'Dead'): + raise RuntimeError(f'Group {args.group_id} is {state}, expecting Empty or Dead!') + offset_specs = {} + if args.spec: + offsets = client.list_group_offsets(args.group_id) + spec = cls._parse_spec(args.spec) + offset_specs = {tp: spec for tp in offsets} + else: + offset_specs = cls._parse_partition_specs(args.partitions) + result = client.reset_group_offsets(args.group_id, offset_specs) + output = defaultdict(dict) + for tp, res in result.items(): + res['error'] = res['error'].__name__ + output[tp.topic][tp.partition] = res + return dict(output) + + @staticmethod + def _parse_spec(spec): + try: + return int(spec) + except ValueError: + pass + try: + spec_key = spec.upper().replace('-', '_') + return OffsetSpec[spec_key] + except KeyError: + raise ValueError(f'{spec_key} is not a valid OffsetSpec') + + @classmethod + def _parse_partition_specs(cls, partitions): + tp_offsets = {} + for entry in partitions: + topic, partition, spec_str = entry.rsplit(':', 2) + spec = cls._parse_spec(spec_str) + tp = TopicPartition(topic, int(partition)) + if tp in tp_offsets: + # Passing multiple specs for a single partition results in an InvalidRequestError + raise ValueError('Only one spec allowed per partition') + tp_offsets[tp] = spec + return tp_offsets diff --git a/test/integration/test_producer_integration.py b/test/integration/test_producer_integration.py index 2adc155a8..fc4c13140 100644 --- a/test/integration/test_producer_integration.py +++ b/test/integration/test_producer_integration.py @@ -216,7 +216,9 @@ def test_transactional_producer_offsets(kafka_producer_factory, kafka_admin_clie else: leader_epoch = -1 topic = 'transactional_test_topic' - offsets = {TopicPartition(topic, 0): OffsetAndMetadata(0, 'metadata', leader_epoch)} + tp0 = TopicPartition(topic, 0) + tp1 = TopicPartition(topic, 1) + offsets = {tp0: OffsetAndMetadata(0, 'metadata', leader_epoch)} producer = kafka_producer_factory(transactional_id='testing') producer.init_transactions() producer.begin_transaction() @@ -225,19 +227,15 @@ def test_transactional_producer_offsets(kafka_producer_factory, kafka_admin_clie producer.commit_transaction() producer.begin_transaction() - producer.send_offsets_to_transaction({TopicPartition(topic, 1): OffsetAndMetadata(1, 'bad', 1)}, 'txn-test-group') + producer.send_offsets_to_transaction({tp1: OffsetAndMetadata(1, 'bad', 1)}, 'txn-test-group') producer.abort_transaction() admin = kafka_admin_client_factory() result = { - topic: { - 0: { - 'committed_offset': 0, - 'error_code': 0, - 'metadata': 'metadata', - }, - } + tp0: OffsetAndMetadata( + offset=0, + metadata='metadata', + leader_epoch=leader_epoch, + ) } - if env_kafka_version() >= (2, 1): - result[topic][0]['committed_leader_epoch'] = leader_epoch assert admin.list_group_offsets('txn-test-group') == result