From 34e9cda96800204696062fd91040cd3ebaa743df Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 17 Apr 2026 10:36:25 -0700 Subject: [PATCH 01/14] admin cli: consumer-groups => groups --- kafka/cli/admin/__init__.py | 4 ++-- kafka/cli/admin/consumer_groups/__init__.py | 17 ----------------- kafka/cli/admin/groups/__init__.py | 17 +++++++++++++++++ .../admin/{consumer_groups => groups}/delete.py | 4 ++-- .../{consumer_groups => groups}/describe.py | 4 ++-- .../admin/{consumer_groups => groups}/list.py | 4 ++-- .../{consumer_groups => groups}/list_offsets.py | 4 ++-- 7 files changed, 27 insertions(+), 27 deletions(-) delete mode 100644 kafka/cli/admin/consumer_groups/__init__.py create mode 100644 kafka/cli/admin/groups/__init__.py rename kafka/cli/admin/{consumer_groups => groups}/delete.py (70%) rename kafka/cli/admin/{consumer_groups => groups}/describe.py (86%) rename kafka/cli/admin/{consumer_groups => groups}/list.py (58%) rename kafka/cli/admin/{consumer_groups => groups}/list_offsets.py (82%) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 98c27a690..2050c5871 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -7,7 +7,7 @@ from .acls import ACLsSubCommand from .cluster import ClusterSubCommand from .configs import ConfigsSubCommand -from .consumer_groups import ConsumerGroupsSubCommand +from .groups import GroupsSubCommand from .partitions import PartitionsSubCommand from .topics import TopicsSubCommand from .users import UsersSubCommand @@ -51,7 +51,7 @@ def run_cli(args=None): subparsers = parser.add_subparsers(help='subcommands') for cmd in [ACLsSubCommand, ClusterSubCommand, ConfigsSubCommand, TopicsSubCommand, PartitionsSubCommand, - ConsumerGroupsSubCommand, UsersSubCommand]: + GroupsSubCommand, UsersSubCommand]: cmd.add_subparser(subparsers) config = parser.parse_args(args) diff --git a/kafka/cli/admin/consumer_groups/__init__.py b/kafka/cli/admin/consumer_groups/__init__.py deleted file mode 100644 index dea4513cb..000000000 --- a/kafka/cli/admin/consumer_groups/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -import sys - -from .delete import DeleteConsumerGroups -from .describe import DescribeConsumerGroups -from .list import ListConsumerGroups -from .list_offsets import ListConsumerGroupOffsets - - -class ConsumerGroupsSubCommand: - - @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('consumer-groups', help='Manage Kafka Consumer Groups') - commands = parser.add_subparsers() - for cmd in [ListConsumerGroups, DescribeConsumerGroups, ListConsumerGroupOffsets, DeleteConsumerGroups]: - cmd.add_subparser(commands) - parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/groups/__init__.py b/kafka/cli/admin/groups/__init__.py new file mode 100644 index 000000000..14e3f2081 --- /dev/null +++ b/kafka/cli/admin/groups/__init__.py @@ -0,0 +1,17 @@ +import sys + +from .delete import DeleteGroups +from .describe import DescribeGroups +from .list import ListGroups +from .list_offsets import ListGroupOffsets + + +class GroupsSubCommand: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('groups', help='Manage Kafka Groups') + commands = parser.add_subparsers() + for cmd in [ListGroups, DescribeGroups, ListGroupOffsets, DeleteGroups]: + cmd.add_subparser(commands) + parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/consumer_groups/delete.py b/kafka/cli/admin/groups/delete.py similarity index 70% rename from kafka/cli/admin/consumer_groups/delete.py rename to kafka/cli/admin/groups/delete.py index 40fc593ac..143ebfb9a 100644 --- a/kafka/cli/admin/consumer_groups/delete.py +++ b/kafka/cli/admin/groups/delete.py @@ -1,7 +1,7 @@ -class DeleteConsumerGroups: +class DeleteGroups: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('delete', help='Delete Consumer Groups') + parser = subparsers.add_parser('delete', help='Delete Groups') parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True) parser.set_defaults(command=lambda cli, args: cli.delete_consumer_groups(args.groups)) diff --git a/kafka/cli/admin/consumer_groups/describe.py b/kafka/cli/admin/groups/describe.py similarity index 86% rename from kafka/cli/admin/consumer_groups/describe.py rename to kafka/cli/admin/groups/describe.py index e1f3393e2..0cd023638 100644 --- a/kafka/cli/admin/consumer_groups/describe.py +++ b/kafka/cli/admin/groups/describe.py @@ -1,7 +1,7 @@ -class DescribeConsumerGroups: +class DescribeGroups: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe', help='Describe Consumer Groups') + parser = subparsers.add_parser('describe', help='Describe Groups') parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True) parser.set_defaults(command=lambda cli, args: cli.describe_consumer_groups(args.groups)) diff --git a/kafka/cli/admin/consumer_groups/list.py b/kafka/cli/admin/groups/list.py similarity index 58% rename from kafka/cli/admin/consumer_groups/list.py rename to kafka/cli/admin/groups/list.py index dfeb954f3..facc07b06 100644 --- a/kafka/cli/admin/consumer_groups/list.py +++ b/kafka/cli/admin/groups/list.py @@ -1,6 +1,6 @@ -class ListConsumerGroups: +class ListGroups: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('list', help='List Consumer Groups') + parser = subparsers.add_parser('list', help='List Groups') parser.set_defaults(command=lambda cli, _args: cli.list_consumer_groups()) diff --git a/kafka/cli/admin/consumer_groups/list_offsets.py b/kafka/cli/admin/groups/list_offsets.py similarity index 82% rename from kafka/cli/admin/consumer_groups/list_offsets.py rename to kafka/cli/admin/groups/list_offsets.py index ff8f33acb..f53322734 100644 --- a/kafka/cli/admin/consumer_groups/list_offsets.py +++ b/kafka/cli/admin/groups/list_offsets.py @@ -1,7 +1,7 @@ -class ListConsumerGroupOffsets: +class ListGroupOffsets: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('list-offsets', help='List Offsets for Consumer Group') + parser = subparsers.add_parser('list-offsets', help='List Offsets for Group') parser.add_argument('-g', '--group-id', type=str, required=True) parser.set_defaults(command=lambda cli, args: cli.list_consumer_group_offsets(args.group_id)) From 83c79d1f8a70fcdcc53a1c7798066b8195177f2b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 17 Apr 2026 22:17:09 -0700 Subject: [PATCH 02/14] BREAKING: rename admin consumer_groups apis -> groups --- kafka/admin/_groups.py | 72 +++++++++++++------------- kafka/cli/admin/groups/delete.py | 2 +- kafka/cli/admin/groups/describe.py | 2 +- kafka/cli/admin/groups/list.py | 2 +- kafka/cli/admin/groups/list_offsets.py | 2 +- 5 files changed, 40 insertions(+), 40 deletions(-) diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index aee0dd304..6e0a11062 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -1,4 +1,4 @@ -"""Consumer group management mixin for KafkaAdminClient.""" +"""Group management mixin for KafkaAdminClient.""" from __future__ import annotations @@ -28,16 +28,16 @@ class GroupAdminMixin: _coordinator_cache: dict config: dict - # -- Describe consumer groups ---------------------------------------------- + # -- Describe groups ---------------------------------------------- - def _describe_consumer_groups_request(self, group_id): + def _describe_groups_request(self, group_id): request = DescribeGroupsRequest( groups=[group_id], include_authorized_operations=True ) return request - def _describe_consumer_groups_process_response(self, response): + def _describe_groups_process_response(self, response): """Process a DescribeGroupsResponse into a group description.""" assert len(response.groups) == 1 for group in response.groups: @@ -47,17 +47,17 @@ def _describe_consumer_groups_process_response(self, response): # Return dict (key, val) tuples return [(group.group_id, self._process_acl_operations(group.to_dict())) for group in response.groups] - async def _async_describe_consumer_groups(self, group_ids, group_coordinator_id=None): + async def _async_describe_groups(self, group_ids, group_coordinator_id=None): results = [] for group_id in group_ids: coordinator_id = group_coordinator_id or await self._find_coordinator_id(group_id) - request = self._describe_consumer_groups_request(group_id) + request = self._describe_groups_request(group_id) response = await self._manager.send(request, node_id=coordinator_id) - results.append(self._describe_consumer_groups_process_response(response)) + results.append(self._describe_groups_process_response(response)) # Combine key/vals from multiple requests into single dict return dict(itertools.chain(*results)) - def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): + def describe_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): """Describe a set of consumer groups. Any errors are immediately raised. @@ -80,16 +80,16 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include of ConsumerSubscription and ConsumerAssignment metadata, and conversion of acl set ints to semantic enums). """ - return self._manager.run(self._async_describe_consumer_groups, group_ids, group_coordinator_id) + return self._manager.run(self._async_describe_groups, group_ids, group_coordinator_id) - # -- List consumer groups -------------------------------------------------- + # -- List groups -------------------------------------------------- - def _list_consumer_groups_request(self): + def _list_groups_request(self): # TODO: KIP-518: StatesFilter # TODO: KIP-848: TypesFilter return ListGroupsRequest() - def _list_consumer_groups_process_response(self, response): + def _list_groups_process_response(self, response): """Process a ListGroupsResponse into a list of groups.""" error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: @@ -98,20 +98,20 @@ def _list_consumer_groups_process_response(self, response): .format(response)) return [group.to_dict() for group in response.groups] - async def _async_list_consumer_groups(self, broker_ids=None): + async def _async_list_groups(self, broker_ids=None): if broker_ids is None: broker_ids = [broker.node_id for broker in self._manager.cluster.brokers()] - consumer_groups = [] + groups = [] for broker_id in broker_ids: - request = self._list_consumer_groups_request() + request = self._list_groups_request() response = await self._manager.send(request, node_id=broker_id) - consumer_groups.extend(self._list_consumer_groups_process_response(response)) - return consumer_groups + groups.extend(self._list_groups_process_response(response)) + return groups - def list_consumer_groups(self, broker_ids=None): + def list_groups(self, broker_ids=None): """List all consumer groups known to the cluster. - This returns a list of Consumer Group tuples. The tuples are + This returns a list of Group dicts. The tuples are composed of the consumer group name and the consumer group protocol type. @@ -132,11 +132,11 @@ def list_consumer_groups(self, broker_ids=None): Returns: List of group data dicts, with key/vals from ListGroupsRequest """ - return self._manager.run(self._async_list_consumer_groups, broker_ids) + return self._manager.run(self._async_list_groups, broker_ids) - # -- List consumer group offsets ------------------------------------------- + # -- List group offsets ------------------------------------------- - def _list_consumer_group_offsets_request(self, group_id, partitions=None): + def _list_group_offsets_request(self, group_id, partitions=None): _Topic = OffsetFetchRequest.OffsetFetchRequestTopic if partitions is None: min_version = 1 @@ -153,7 +153,7 @@ def _list_consumer_group_offsets_request(self, group_id, partitions=None): return OffsetFetchRequest(group_id=group_id, topics=topics, min_version=min_version, max_version=6) - def _list_consumer_group_offsets_process_response(self, response): + def _list_group_offsets_process_response(self, response): """Process an OffsetFetchResponse.""" if response.API_VERSION > 1: error_type = Errors.for_code(response.error_code) @@ -170,16 +170,16 @@ def _partitions_to_dict(partitions): return {topic.name: _partitions_to_dict(topic.partitions) for topic in response.topics} - async def _async_list_consumer_group_offsets(self, group_id, group_coordinator_id=None, partitions=None): + async def _async_list_group_offsets(self, group_id, group_coordinator_id=None, partitions=None): if group_coordinator_id is None: group_coordinator_id = await self._find_coordinator_id(group_id) - request = self._list_consumer_group_offsets_request(group_id, partitions) + request = self._list_group_offsets_request(group_id, partitions) response = await self._manager.send(request, node_id=group_coordinator_id) - return self._list_consumer_group_offsets_process_response(response) + return self._list_group_offsets_process_response(response) - def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, + def list_group_offsets(self, group_id, group_coordinator_id=None, partitions=None): - """Fetch Consumer Offsets for a single consumer group. + """Fetch committed offsets for a single consumer group. Note: This does not verify that the group_id or partitions actually exist @@ -201,11 +201,11 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, Returns: dict: {topic: [{partition data}]} key/vals from OffsetCommitResponse}]} """ - return self._manager.run(self._async_list_consumer_group_offsets, group_id, group_coordinator_id, partitions) + return self._manager.run(self._async_list_group_offsets, group_id, group_coordinator_id, partitions) - # -- Delete consumer groups ------------------------------------------------ + # -- Delete groups ------------------------------------------------ - def _delete_consumer_groups_request(self, group_ids): + def _delete_groups_request(self, group_ids): return DeleteGroupsRequest(groups_names=group_ids) def _convert_delete_groups_response(self, response): @@ -216,7 +216,7 @@ def _convert_delete_groups_response(self, response): results.append((group_id, res)) return results - async def _async_delete_consumer_groups(self, group_ids, group_coordinator_id=None): + async def _async_delete_groups(self, group_ids, group_coordinator_id=None): coordinators_groups = defaultdict(list) if group_coordinator_id is not None: coordinators_groups[group_coordinator_id] = group_ids @@ -227,13 +227,13 @@ async def _async_delete_consumer_groups(self, group_ids, group_coordinator_id=No results = [] for coordinator_id, coordinator_group_ids in coordinators_groups.items(): - request = self._delete_consumer_groups_request(coordinator_group_ids) + request = self._delete_groups_request(coordinator_group_ids) response = await self._manager.send(request, node_id=coordinator_id) results.extend(self._convert_delete_groups_response(response)) return dict(results) - def delete_consumer_groups(self, group_ids, group_coordinator_id=None): - """Delete Consumer Group Offsets for given consumer groups. + def delete_groups(self, group_ids, group_coordinator_id=None): + """Delete Group Offsets for given consumer groups. Note: This does not verify that the group ids actually exist and @@ -251,4 +251,4 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None): Returns: A list of tuples (group_id, KafkaError) """ - return self._manager.run(self._async_delete_consumer_groups, group_ids, group_coordinator_id) + return self._manager.run(self._async_delete_groups, group_ids, group_coordinator_id) diff --git a/kafka/cli/admin/groups/delete.py b/kafka/cli/admin/groups/delete.py index 143ebfb9a..ec122cd6f 100644 --- a/kafka/cli/admin/groups/delete.py +++ b/kafka/cli/admin/groups/delete.py @@ -4,4 +4,4 @@ class DeleteGroups: def add_subparser(cls, subparsers): parser = subparsers.add_parser('delete', help='Delete Groups') parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True) - parser.set_defaults(command=lambda cli, args: cli.delete_consumer_groups(args.groups)) + parser.set_defaults(command=lambda cli, args: cli.delete_groups(args.groups)) diff --git a/kafka/cli/admin/groups/describe.py b/kafka/cli/admin/groups/describe.py index 0cd023638..5898c891a 100644 --- a/kafka/cli/admin/groups/describe.py +++ b/kafka/cli/admin/groups/describe.py @@ -4,4 +4,4 @@ class DescribeGroups: def add_subparser(cls, subparsers): parser = subparsers.add_parser('describe', help='Describe Groups') parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True) - parser.set_defaults(command=lambda cli, args: cli.describe_consumer_groups(args.groups)) + parser.set_defaults(command=lambda cli, args: cli.describe_groups(args.groups)) diff --git a/kafka/cli/admin/groups/list.py b/kafka/cli/admin/groups/list.py index facc07b06..c0342718d 100644 --- a/kafka/cli/admin/groups/list.py +++ b/kafka/cli/admin/groups/list.py @@ -3,4 +3,4 @@ class ListGroups: @classmethod def add_subparser(cls, subparsers): parser = subparsers.add_parser('list', help='List Groups') - parser.set_defaults(command=lambda cli, _args: cli.list_consumer_groups()) + parser.set_defaults(command=lambda cli, _args: cli.list_groups()) diff --git a/kafka/cli/admin/groups/list_offsets.py b/kafka/cli/admin/groups/list_offsets.py index f53322734..3592ae132 100644 --- a/kafka/cli/admin/groups/list_offsets.py +++ b/kafka/cli/admin/groups/list_offsets.py @@ -4,4 +4,4 @@ class ListGroupOffsets: def add_subparser(cls, subparsers): parser = subparsers.add_parser('list-offsets', help='List Offsets for Group') parser.add_argument('-g', '--group-id', type=str, required=True) - parser.set_defaults(command=lambda cli, args: cli.list_consumer_group_offsets(args.group_id)) + parser.set_defaults(command=lambda cli, args: cli.list_group_offsets(args.group_id)) From 0834e43b1f836be0cb58dbfcbec78b51697f16d9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 17 Apr 2026 22:17:30 -0700 Subject: [PATCH 03/14] update cli TODO re: groups apis --- kafka/cli/admin/__init__.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 2050c5871..e5a13f78c 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -104,17 +104,15 @@ def run_cli(args=None): # [configs] # IncrementalAlterConfigs (not supported yet) - # [consumer-groups] - # remove-members (not supported yet) - # delete-offsets (not supported yet) - # alter-offsets (not supported yet) + # [groups] + # remove-members (not supported yet [uses LeaveGroupRequest]) + # delete-offsets (not supported yet [uses OffsetDeleteRequest]) + # alter-offsets (not supported yet [uses OffsetCommitRequest]) # [client-quotas] # describe (DescribeClientQuotas - not supported yet) # alter (AlterClientQuotas - not supported yet) - # DescribeQuorum (not supported yet) - # [producers] # describe (DescribeProducers - not supported yet) @@ -133,3 +131,4 @@ def run_cli(args=None): # version # api-versions # alter-log-dirs (AlterReplicaLogDirs - not supported yet) + # DescribeQuorum (not supported yet) From b262aa9b8a1b1c0f6de2568f0c1563580d0d9580 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 17 Apr 2026 22:40:00 -0700 Subject: [PATCH 04/14] Admin: alter_group_offsets / delete_group_offsets --- kafka/admin/_groups.py | 140 +++++++- kafka/protocol/consumer/group.py | 4 + kafka/protocol/consumer/group.pyi | 118 ++++++- .../resources/OffsetDeleteRequest.json | 39 +++ .../resources/OffsetDeleteResponse.json | 42 +++ test/admin/test_admin_groups.py | 303 ++++++++++++++++++ test/protocol/consumer/test_group.py | 44 +++ 7 files changed, 688 insertions(+), 2 deletions(-) create mode 100644 kafka/protocol/schemas/resources/OffsetDeleteRequest.json create mode 100644 kafka/protocol/schemas/resources/OffsetDeleteResponse.json create mode 100644 test/admin/test_admin_groups.py diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index 6e0a11062..2d05f0ee8 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -10,7 +10,10 @@ import kafka.errors as Errors from kafka.admin._acls import valid_acl_operations from kafka.protocol.admin import DeleteGroupsRequest, DescribeGroupsRequest, ListGroupsRequest -from kafka.protocol.consumer import OffsetFetchRequest +from kafka.protocol.consumer import ( + OffsetCommitRequest, OffsetDeleteRequest, OffsetFetchRequest, +) +from kafka.protocol.consumer.group import DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID from kafka.protocol.consumer.metadata import ( ConsumerProtocolAssignment, ConsumerProtocolSubscription, ConsumerProtocolType, ) @@ -252,3 +255,138 @@ def delete_groups(self, group_ids, group_coordinator_id=None): A list of tuples (group_id, KafkaError) """ return self._manager.run(self._async_delete_groups, group_ids, group_coordinator_id) + + # -- Alter group offsets ----------------------------------------------- + + @staticmethod + def _alter_group_offsets_request(group_id, offsets): + _Topic = OffsetCommitRequest.OffsetCommitRequestTopic + _Partition = _Topic.OffsetCommitRequestPartition + topic2partitions = defaultdict(list) + for tp, oam in offsets.items(): + topic2partitions[tp.topic].append(_Partition( + partition_index=tp.partition, + committed_offset=oam.offset, + committed_leader_epoch=-1 if oam.leader_epoch is None else oam.leader_epoch, + committed_metadata=oam.metadata, + )) + return OffsetCommitRequest( + group_id=group_id, + generation_id_or_member_epoch=DEFAULT_GENERATION_ID, + member_id=UNKNOWN_MEMBER_ID, + group_instance_id=None, + retention_time_ms=-1, + topics=[_Topic(name=name, partitions=parts) + for name, parts in topic2partitions.items()], + ) + + @staticmethod + def _alter_group_offsets_process_response(response): + results = {} + for topic in response.topics: + for partition in topic.partitions: + results[TopicPartition(topic.name, partition.partition_index)] = \ + Errors.for_code(partition.error_code) + return results + + async def _async_alter_group_offsets(self, group_id, offsets, group_coordinator_id=None): + if not offsets: + return {} + if group_coordinator_id is None: + group_coordinator_id = await self._find_coordinator_id(group_id) + request = self._alter_group_offsets_request(group_id, offsets) + response = await self._manager.send(request, node_id=group_coordinator_id) + return self._alter_group_offsets_process_response(response) + + def alter_group_offsets(self, group_id, offsets, group_coordinator_id=None): + """Alter committed offsets for a consumer group. + + Mirrors Java's ``Admin.alterConsumerGroupOffsets``. The group must + have no active members (i.e. be empty or dead) for the commit to + succeed; otherwise individual partitions may return + ``UNKNOWN_MEMBER_ID`` or similar errors. + + Arguments: + group_id (str): The consumer group id. + offsets (dict): A dict mapping :class:`~kafka.TopicPartition` to + :class:`~kafka.structs.OffsetAndMetadata`. + + Keyword Arguments: + group_coordinator_id (int, optional): The node_id of the group's + coordinator broker. If None, the cluster will be queried to + locate the coordinator. Default: None. + + Returns: + dict: A dict mapping :class:`~kafka.TopicPartition` to the + partition-level :class:`~kafka.errors.KafkaError` class + (``NoError`` on success). + """ + return self._manager.run( + self._async_alter_group_offsets, group_id, offsets, group_coordinator_id) + + # -- Delete group offsets ---------------------------------------------- + + @staticmethod + def _delete_group_offsets_request(group_id, partitions): + _Topic = OffsetDeleteRequest.OffsetDeleteRequestTopic + _Partition = _Topic.OffsetDeleteRequestPartition + topic2partitions = defaultdict(list) + for tp in partitions: + topic2partitions[tp.topic].append( + _Partition(partition_index=tp.partition)) + return OffsetDeleteRequest( + group_id=group_id, + topics=[_Topic(name=name, partitions=parts) + for name, parts in topic2partitions.items()], + ) + + @staticmethod + def _delete_group_offsets_process_response(response): + top_level = Errors.for_code(response.error_code) + if top_level is not Errors.NoError: + raise top_level( + "OffsetDeleteRequest failed with response '{}'.".format(response)) + results = {} + for topic in response.topics: + for partition in topic.partitions: + results[TopicPartition(topic.name, partition.partition_index)] = \ + Errors.for_code(partition.error_code) + return results + + async def _async_delete_group_offsets(self, group_id, partitions, group_coordinator_id=None): + if not partitions: + return {} + if group_coordinator_id is None: + group_coordinator_id = await self._find_coordinator_id(group_id) + request = self._delete_group_offsets_request(group_id, partitions) + response = await self._manager.send(request, node_id=group_coordinator_id) + return self._delete_group_offsets_process_response(response) + + def delete_group_offsets(self, group_id, partitions, group_coordinator_id=None): + """Delete committed offsets for a consumer group. + + Mirrors Java's ``Admin.deleteConsumerGroupOffsets``. The group must + have no active members subscribed to the given topics; otherwise + individual partitions may fail with ``GROUP_SUBSCRIBED_TO_TOPIC``. + + Arguments: + group_id (str): The consumer group id. + partitions: An iterable of :class:`~kafka.TopicPartition` whose + committed offsets should be deleted. + + Keyword Arguments: + group_coordinator_id (int, optional): The node_id of the group's + coordinator broker. If None, the cluster will be queried to + locate the coordinator. Default: None. + + Returns: + dict: A dict mapping :class:`~kafka.TopicPartition` to the + partition-level :class:`~kafka.errors.KafkaError` class + (``NoError`` on success). + + Raises: + KafkaError: If the response contains a top-level error (e.g. + ``GroupIdNotFoundError``, ``NonEmptyGroupError``). + """ + return self._manager.run( + self._async_delete_group_offsets, group_id, partitions, group_coordinator_id) diff --git a/kafka/protocol/consumer/group.py b/kafka/protocol/consumer/group.py index 67b44a7b8..7ab82e0ed 100644 --- a/kafka/protocol/consumer/group.py +++ b/kafka/protocol/consumer/group.py @@ -22,6 +22,9 @@ class OffsetFetchResponse(ApiMessage): pass class OffsetCommitRequest(ApiMessage): pass class OffsetCommitResponse(ApiMessage): pass +class OffsetDeleteRequest(ApiMessage): pass +class OffsetDeleteResponse(ApiMessage): pass + __all__ = [ 'DEFAULT_GENERATION_ID', 'UNKNOWN_MEMBER_ID', @@ -31,4 +34,5 @@ class OffsetCommitResponse(ApiMessage): pass 'HeartbeatRequest', 'HeartbeatResponse', 'OffsetFetchRequest', 'OffsetFetchResponse', 'OffsetCommitRequest', 'OffsetCommitResponse', + 'OffsetDeleteRequest', 'OffsetDeleteResponse', ] diff --git a/kafka/protocol/consumer/group.pyi b/kafka/protocol/consumer/group.pyi index 9b74ad33a..cd8b76936 100644 --- a/kafka/protocol/consumer/group.pyi +++ b/kafka/protocol/consumer/group.pyi @@ -6,7 +6,7 @@ from kafka.protocol.api_message import ApiMessage from kafka.protocol.api_data import ApiData from kafka.protocol.data_container import DataContainer -__all__ = ['DEFAULT_GENERATION_ID', 'UNKNOWN_MEMBER_ID', 'JoinGroupRequest', 'JoinGroupResponse', 'SyncGroupRequest', 'SyncGroupResponse', 'LeaveGroupRequest', 'LeaveGroupResponse', 'HeartbeatRequest', 'HeartbeatResponse', 'OffsetFetchRequest', 'OffsetFetchResponse', 'OffsetCommitRequest', 'OffsetCommitResponse'] +__all__ = ['DEFAULT_GENERATION_ID', 'UNKNOWN_MEMBER_ID', 'JoinGroupRequest', 'JoinGroupResponse', 'SyncGroupRequest', 'SyncGroupResponse', 'LeaveGroupRequest', 'LeaveGroupResponse', 'HeartbeatRequest', 'HeartbeatResponse', 'OffsetFetchRequest', 'OffsetFetchResponse', 'OffsetCommitRequest', 'OffsetCommitResponse', 'OffsetDeleteRequest', 'OffsetDeleteResponse'] DEFAULT_GENERATION_ID: int @@ -698,3 +698,119 @@ class OffsetCommitResponse(ApiMessage): def is_request(cls) -> bool: ... def expect_response(self) -> bool: ... def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class OffsetDeleteRequest(ApiMessage): + class OffsetDeleteRequestTopic(DataContainer): + class OffsetDeleteRequestPartition(DataContainer): + partition_index: int + def __init__( + self, + *args: Any, + partition_index: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + name: str + partitions: list[OffsetDeleteRequestPartition] + def __init__( + self, + *args: Any, + name: str = ..., + partitions: list[OffsetDeleteRequestPartition] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + group_id: str + topics: list[OffsetDeleteRequestTopic] + def __init__( + self, + *args: Any, + group_id: str = ..., + topics: list[OffsetDeleteRequestTopic] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class OffsetDeleteResponse(ApiMessage): + class OffsetDeleteResponseTopic(DataContainer): + class OffsetDeleteResponsePartition(DataContainer): + partition_index: int + error_code: int + def __init__( + self, + *args: Any, + partition_index: int = ..., + error_code: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + name: str + partitions: list[OffsetDeleteResponsePartition] + def __init__( + self, + *args: Any, + name: str = ..., + partitions: list[OffsetDeleteResponsePartition] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + error_code: int + throttle_time_ms: int + topics: list[OffsetDeleteResponseTopic] + def __init__( + self, + *args: Any, + error_code: int = ..., + throttle_time_ms: int = ..., + topics: list[OffsetDeleteResponseTopic] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... diff --git a/kafka/protocol/schemas/resources/OffsetDeleteRequest.json b/kafka/protocol/schemas/resources/OffsetDeleteRequest.json new file mode 100644 index 000000000..1974b67f6 --- /dev/null +++ b/kafka/protocol/schemas/resources/OffsetDeleteRequest.json @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 47, + "type": "request", + "listeners": ["broker"], + "name": "OffsetDeleteRequest", + "validVersions": "0", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The unique group identifier." }, + { "name": "Topics", "type": "[]OffsetDeleteRequestTopic", "versions": "0+", + "about": "The topics to delete offsets for.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]OffsetDeleteRequestPartition", "versions": "0+", + "about": "Each partition to delete offsets for.", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." } + ] + } + ] + } + ] +} diff --git a/kafka/protocol/schemas/resources/OffsetDeleteResponse.json b/kafka/protocol/schemas/resources/OffsetDeleteResponse.json new file mode 100644 index 000000000..d32b36f6f --- /dev/null +++ b/kafka/protocol/schemas/resources/OffsetDeleteResponse.json @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 47, + "type": "response", + "name": "OffsetDeleteResponse", + "validVersions": "0", + "flexibleVersions": "none", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error." }, + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Topics", "type": "[]OffsetDeleteResponseTopic", "versions": "0+", + "about": "The responses for each topic.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]OffsetDeleteResponsePartition", "versions": "0+", + "about": "The responses for each partition in the topic.", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true, + "about": "The partition index." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." } + ] + } + ] + } + ] +} diff --git a/test/admin/test_admin_groups.py b/test/admin/test_admin_groups.py new file mode 100644 index 000000000..b6af8696d --- /dev/null +++ b/test/admin/test_admin_groups.py @@ -0,0 +1,303 @@ +import pytest + +from kafka.admin import KafkaAdminClient +from kafka.errors import ( + GroupIdNotFoundError, + GroupSubscribedToTopicError, + NoError, + UnknownMemberIdError, +) +from kafka.protocol.consumer import ( + OffsetCommitRequest, OffsetCommitResponse, + OffsetDeleteRequest, OffsetDeleteResponse, +) +from kafka.protocol.consumer.group import DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID +from kafka.structs import OffsetAndMetadata, TopicPartition + +from test.mock_broker import MockBroker + + +def _make_admin(broker): + return KafkaAdminClient( + kafka_client=broker.client_factory(), + bootstrap_servers='%s:%d' % (broker.host, broker.port), + api_version=broker.broker_version, + request_timeout_ms=5000, + ) + + +# --------------------------------------------------------------------------- +# alter_group_offsets +# --------------------------------------------------------------------------- + + +class TestAlterGroupOffsetsMockBroker: + + def test_success_returns_tp_to_noerror(self): + broker = MockBroker() + _Topic = OffsetCommitResponse.OffsetCommitResponseTopic + _Partition = _Topic.OffsetCommitResponsePartition + broker.respond( + OffsetCommitRequest, + OffsetCommitResponse( + throttle_time_ms=0, + topics=[ + _Topic(name='topic-a', partitions=[ + _Partition(partition_index=0, error_code=0), + _Partition(partition_index=1, error_code=0), + ]), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.alter_group_offsets( + 'g1', + { + TopicPartition('topic-a', 0): OffsetAndMetadata(10, '', None), + TopicPartition('topic-a', 1): OffsetAndMetadata(20, 'm', 5), + }, + group_coordinator_id=0, + ) + finally: + admin.close() + + assert result == { + TopicPartition('topic-a', 0): NoError, + TopicPartition('topic-a', 1): NoError, + } + + def test_request_uses_standalone_member_fields(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = OffsetCommitRequest.decode( + request_bytes, version=api_version, header=True) + return OffsetCommitResponse(throttle_time_ms=0, topics=[]) + + broker.respond_fn(OffsetCommitRequest, handler) + + admin = _make_admin(broker) + try: + admin.alter_group_offsets( + 'g1', + {TopicPartition('topic-a', 0): OffsetAndMetadata(10, 'meta', 7)}, + group_coordinator_id=0, + ) + finally: + admin.close() + + req = captured['request'] + assert req.group_id == 'g1' + assert req.generation_id_or_member_epoch == DEFAULT_GENERATION_ID + assert req.member_id == UNKNOWN_MEMBER_ID + assert req.group_instance_id is None + assert req.retention_time_ms == -1 + assert len(req.topics) == 1 + topic = req.topics[0] + assert topic.name == 'topic-a' + assert len(topic.partitions) == 1 + p = topic.partitions[0] + assert p.partition_index == 0 + assert p.committed_offset == 10 + assert p.committed_metadata == 'meta' + assert p.committed_leader_epoch == 7 + + def test_leader_epoch_none_sent_as_minus_one(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = OffsetCommitRequest.decode( + request_bytes, version=api_version, header=True) + return OffsetCommitResponse(throttle_time_ms=0, topics=[]) + + broker.respond_fn(OffsetCommitRequest, handler) + + admin = _make_admin(broker) + try: + admin.alter_group_offsets( + 'g1', + {TopicPartition('topic-a', 0): OffsetAndMetadata(10, '', None)}, + group_coordinator_id=0, + ) + finally: + admin.close() + + assert captured['request'].topics[0].partitions[0].committed_leader_epoch == -1 + + def test_partition_level_error_returned_not_raised(self): + broker = MockBroker() + _Topic = OffsetCommitResponse.OffsetCommitResponseTopic + _Partition = _Topic.OffsetCommitResponsePartition + broker.respond( + OffsetCommitRequest, + OffsetCommitResponse( + throttle_time_ms=0, + topics=[ + _Topic(name='topic-a', partitions=[ + _Partition(partition_index=0, + error_code=UnknownMemberIdError.errno), + ]), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.alter_group_offsets( + 'g1', + {TopicPartition('topic-a', 0): OffsetAndMetadata(1, '', None)}, + group_coordinator_id=0, + ) + finally: + admin.close() + + assert result == {TopicPartition('topic-a', 0): UnknownMemberIdError} + + def test_empty_offsets_is_noop(self): + broker = MockBroker() + admin = _make_admin(broker) + try: + result = admin.alter_group_offsets('g1', {}, group_coordinator_id=0) + finally: + admin.close() + assert result == {} + + +# --------------------------------------------------------------------------- +# delete_group_offsets +# --------------------------------------------------------------------------- + + +class TestDeleteGroupOffsetsMockBroker: + + def test_success_returns_tp_to_noerror(self): + broker = MockBroker() + _Topic = OffsetDeleteResponse.OffsetDeleteResponseTopic + _Partition = _Topic.OffsetDeleteResponsePartition + broker.respond( + OffsetDeleteRequest, + OffsetDeleteResponse( + error_code=0, + throttle_time_ms=0, + topics=[ + _Topic(name='topic-a', partitions=[ + _Partition(partition_index=0, error_code=0), + _Partition(partition_index=1, error_code=0), + ]), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.delete_group_offsets( + 'g1', + [TopicPartition('topic-a', 0), TopicPartition('topic-a', 1)], + group_coordinator_id=0, + ) + finally: + admin.close() + + assert result == { + TopicPartition('topic-a', 0): NoError, + TopicPartition('topic-a', 1): NoError, + } + + def test_groups_partitions_by_topic(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = OffsetDeleteRequest.decode( + request_bytes, version=api_version, header=True) + return OffsetDeleteResponse(error_code=0, throttle_time_ms=0, topics=[]) + + broker.respond_fn(OffsetDeleteRequest, handler) + + admin = _make_admin(broker) + try: + admin.delete_group_offsets( + 'g1', + [ + TopicPartition('topic-a', 0), + TopicPartition('topic-b', 2), + TopicPartition('topic-a', 1), + ], + group_coordinator_id=0, + ) + finally: + admin.close() + + req = captured['request'] + assert req.group_id == 'g1' + topics_by_name = {t.name: t for t in req.topics} + assert set(topics_by_name.keys()) == {'topic-a', 'topic-b'} + a_indexes = sorted(p.partition_index for p in topics_by_name['topic-a'].partitions) + b_indexes = sorted(p.partition_index for p in topics_by_name['topic-b'].partitions) + assert a_indexes == [0, 1] + assert b_indexes == [2] + + def test_top_level_error_raises(self): + broker = MockBroker() + broker.respond( + OffsetDeleteRequest, + OffsetDeleteResponse( + error_code=GroupIdNotFoundError.errno, + throttle_time_ms=0, + topics=[], + ), + ) + + admin = _make_admin(broker) + try: + with pytest.raises(GroupIdNotFoundError): + admin.delete_group_offsets( + 'g1', + [TopicPartition('topic-a', 0)], + group_coordinator_id=0, + ) + finally: + admin.close() + + def test_partition_level_error_returned_not_raised(self): + broker = MockBroker() + _Topic = OffsetDeleteResponse.OffsetDeleteResponseTopic + _Partition = _Topic.OffsetDeleteResponsePartition + broker.respond( + OffsetDeleteRequest, + OffsetDeleteResponse( + error_code=0, + throttle_time_ms=0, + topics=[ + _Topic(name='topic-a', partitions=[ + _Partition(partition_index=0, + error_code=GroupSubscribedToTopicError.errno), + ]), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.delete_group_offsets( + 'g1', + [TopicPartition('topic-a', 0)], + group_coordinator_id=0, + ) + finally: + admin.close() + + assert result == {TopicPartition('topic-a', 0): GroupSubscribedToTopicError} + + def test_empty_partitions_is_noop(self): + broker = MockBroker() + admin = _make_admin(broker) + try: + result = admin.delete_group_offsets('g1', [], group_coordinator_id=0) + finally: + admin.close() + assert result == {} diff --git a/test/protocol/consumer/test_group.py b/test/protocol/consumer/test_group.py index c20d7e3ab..258320832 100644 --- a/test/protocol/consumer/test_group.py +++ b/test/protocol/consumer/test_group.py @@ -9,6 +9,7 @@ HeartbeatRequest, HeartbeatResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetCommitRequest, OffsetCommitResponse, + OffsetDeleteRequest, OffsetDeleteResponse, ) from kafka.protocol.consumer.metadata import ConsumerProtocolSubscription @@ -333,6 +334,49 @@ def test_offset_commit_response_roundtrip(version): assert decoded == response +@pytest.mark.parametrize("version", range(OffsetDeleteRequest.min_version, OffsetDeleteRequest.max_version + 1)) +def test_offset_delete_request_roundtrip(version): + Topic = OffsetDeleteRequest.OffsetDeleteRequestTopic + Partition = Topic.OffsetDeleteRequestPartition + request = OffsetDeleteRequest( + group_id="test-group", + topics=[ + Topic( + name="topic-1", + partitions=[ + Partition(partition_index=0), + Partition(partition_index=1), + ], + ), + ], + ) + encoded = request.encode(version=version) + decoded = OffsetDeleteRequest.decode(encoded, version=version) + assert decoded == request + + +@pytest.mark.parametrize("version", range(OffsetDeleteResponse.min_version, OffsetDeleteResponse.max_version + 1)) +def test_offset_delete_response_roundtrip(version): + Topic = OffsetDeleteResponse.OffsetDeleteResponseTopic + Partition = Topic.OffsetDeleteResponsePartition + response = OffsetDeleteResponse( + error_code=0, + throttle_time_ms=100, + topics=[ + Topic( + name="topic-1", + partitions=[ + Partition(partition_index=0, error_code=0), + Partition(partition_index=1, error_code=28), + ], + ), + ], + ) + encoded = response.encode(version=version) + decoded = OffsetDeleteResponse.decode(encoded, version=version) + assert decoded == response + + def test_default_generation_id(): assert DEFAULT_GENERATION_ID == -1 From 96c4e30814ab9a6410af46fd600084b889a537e1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 17 Apr 2026 22:46:33 -0700 Subject: [PATCH 05/14] admin cli: groups alter-offsets / delete-offsets --- kafka/cli/admin/__init__.py | 2 -- kafka/cli/admin/groups/__init__.py | 5 +++- kafka/cli/admin/groups/alter_offsets.py | 32 ++++++++++++++++++++++++ kafka/cli/admin/groups/delete_offsets.py | 31 +++++++++++++++++++++++ 4 files changed, 67 insertions(+), 3 deletions(-) create mode 100644 kafka/cli/admin/groups/alter_offsets.py create mode 100644 kafka/cli/admin/groups/delete_offsets.py diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index e5a13f78c..88fe9d69c 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -106,8 +106,6 @@ def run_cli(args=None): # [groups] # remove-members (not supported yet [uses LeaveGroupRequest]) - # delete-offsets (not supported yet [uses OffsetDeleteRequest]) - # alter-offsets (not supported yet [uses OffsetCommitRequest]) # [client-quotas] # describe (DescribeClientQuotas - not supported yet) diff --git a/kafka/cli/admin/groups/__init__.py b/kafka/cli/admin/groups/__init__.py index 14e3f2081..f3aaca759 100644 --- a/kafka/cli/admin/groups/__init__.py +++ b/kafka/cli/admin/groups/__init__.py @@ -1,6 +1,8 @@ import sys +from .alter_offsets import AlterGroupOffsets from .delete import DeleteGroups +from .delete_offsets import DeleteGroupOffsets from .describe import DescribeGroups from .list import ListGroups from .list_offsets import ListGroupOffsets @@ -12,6 +14,7 @@ class GroupsSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('groups', help='Manage Kafka Groups') commands = parser.add_subparsers() - for cmd in [ListGroups, DescribeGroups, ListGroupOffsets, DeleteGroups]: + for cmd in [ListGroups, DescribeGroups, DeleteGroups, + ListGroupOffsets, AlterGroupOffsets, DeleteGroupOffsets]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/groups/alter_offsets.py b/kafka/cli/admin/groups/alter_offsets.py new file mode 100644 index 000000000..c06cbfffe --- /dev/null +++ b/kafka/cli/admin/groups/alter_offsets.py @@ -0,0 +1,32 @@ +from kafka.structs import OffsetAndMetadata, TopicPartition + + +class AlterGroupOffsets: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser( + 'alter-offsets', + help='Alter committed offsets for a consumer group') + parser.add_argument('-g', '--group-id', type=str, required=True) + parser.add_argument( + '-o', '--offset', type=str, action='append', + dest='offsets', default=[], required=True, + help='TOPIC:PARTITION:OFFSET triple (repeatable).') + parser.add_argument( + '--group-coordinator-id', type=int, default=None, + help='Send directly to this broker id, skipping coordinator lookup') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + offsets = {} + for spec in args.offsets: + topic, partition, offset = spec.rsplit(':', 2) + offsets[TopicPartition(topic, int(partition))] = \ + OffsetAndMetadata(int(offset), '', None) + result = client.alter_group_offsets( + args.group_id, offsets, + group_coordinator_id=args.group_coordinator_id) + return {'%s:%d' % (tp.topic, tp.partition): err.__name__ + for tp, err in result.items()} diff --git a/kafka/cli/admin/groups/delete_offsets.py b/kafka/cli/admin/groups/delete_offsets.py new file mode 100644 index 000000000..e5117dcce --- /dev/null +++ b/kafka/cli/admin/groups/delete_offsets.py @@ -0,0 +1,31 @@ +from kafka.structs import TopicPartition + + +class DeleteGroupOffsets: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser( + 'delete-offsets', + help='Delete committed offsets for a consumer group') + parser.add_argument('-g', '--group-id', type=str, required=True) + parser.add_argument( + '-p', '--partition', type=str, action='append', + dest='partitions', default=[], required=True, + help='TOPIC:PARTITION pair (repeatable).') + parser.add_argument( + '--group-coordinator-id', type=int, default=None, + help='Send directly to this broker id, skipping coordinator lookup') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + partitions = [] + for spec in args.partitions: + topic, partition = spec.rsplit(':', 1) + partitions.append(TopicPartition(topic, int(partition))) + result = client.delete_group_offsets( + args.group_id, partitions, + group_coordinator_id=args.group_coordinator_id) + return {'%s:%d' % (tp.topic, tp.partition): err.__name__ + for tp, err in result.items()} From 62c440bd294bec91240834b77031f68826872156 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 17 Apr 2026 22:47:35 -0700 Subject: [PATCH 06/14] reorder cli partitions commands --- kafka/cli/admin/partitions/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/cli/admin/partitions/__init__.py b/kafka/cli/admin/partitions/__init__.py index d1416f16b..1577e4566 100644 --- a/kafka/cli/admin/partitions/__init__.py +++ b/kafka/cli/admin/partitions/__init__.py @@ -16,11 +16,11 @@ def add_subparser(cls, subparsers): commands = parser.add_subparsers() for cmd in [ CreatePartitions, + DescribeTopicPartitions, + ListPartitionReassignments, + AlterPartitionReassignments, DeleteRecords, ElectLeaders, - AlterPartitionReassignments, - ListPartitionReassignments, - DescribeTopicPartitions, ]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) From 2a46225e30cb0e002c1f6bfa7ab05df562de8595 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 18 Apr 2026 06:50:10 -0700 Subject: [PATCH 07/14] Fixup alter/delete group docstrings --- kafka/admin/_groups.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index 2d05f0ee8..d7d80569b 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -301,9 +301,8 @@ async def _async_alter_group_offsets(self, group_id, offsets, group_coordinator_ def alter_group_offsets(self, group_id, offsets, group_coordinator_id=None): """Alter committed offsets for a consumer group. - Mirrors Java's ``Admin.alterConsumerGroupOffsets``. The group must - have no active members (i.e. be empty or dead) for the commit to - succeed; otherwise individual partitions may return + The group must have no active members (i.e. be empty or dead) for + the commit to succeed; otherwise individual partitions may return ``UNKNOWN_MEMBER_ID`` or similar errors. Arguments: @@ -365,9 +364,8 @@ async def _async_delete_group_offsets(self, group_id, partitions, group_coordina def delete_group_offsets(self, group_id, partitions, group_coordinator_id=None): """Delete committed offsets for a consumer group. - Mirrors Java's ``Admin.deleteConsumerGroupOffsets``. The group must - have no active members subscribed to the given topics; otherwise - individual partitions may fail with ``GROUP_SUBSCRIBED_TO_TOPIC``. + The group must have no active members subscribed to the given topics; + otherwise partitions may fail with ``GROUP_SUBSCRIBED_TO_TOPIC``. Arguments: group_id (str): The consumer group id. From 84421eb1762d3e31af8dfae512fcc578fc424f89 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 18 Apr 2026 06:58:48 -0700 Subject: [PATCH 08/14] Admin: remove_group_members api --- kafka/admin/__init__.py | 4 +- kafka/admin/_groups.py | 145 +++++++++++++++++++++- test/admin/test_admin_groups.py | 205 +++++++++++++++++++++++++++++++- 3 files changed, 351 insertions(+), 3 deletions(-) diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index 405bb482b..59127f68b 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -3,8 +3,9 @@ ResourceType, ACLPermissionType, ACLResourcePatternType) from kafka.admin._configs import ( ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) -from kafka.admin._topics import NewTopic +from kafka.admin._groups import MemberToRemove from kafka.admin._partitions import NewPartitions +from kafka.admin._topics import NewTopic from kafka.admin._users import ( ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion) @@ -13,5 +14,6 @@ 'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType', 'ResourceType', 'ResourcePattern', 'ResourcePatternFilter', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType', + 'MemberToRemove', # NewTopic + NewPartitions are deprecated and not included in __all__ 'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion', ] diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index d7d80569b..864310ded 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -11,7 +11,7 @@ from kafka.admin._acls import valid_acl_operations from kafka.protocol.admin import DeleteGroupsRequest, DescribeGroupsRequest, ListGroupsRequest from kafka.protocol.consumer import ( - OffsetCommitRequest, OffsetDeleteRequest, OffsetFetchRequest, + LeaveGroupRequest, OffsetCommitRequest, OffsetDeleteRequest, OffsetFetchRequest, ) from kafka.protocol.consumer.group import DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID from kafka.protocol.consumer.metadata import ( @@ -388,3 +388,146 @@ def delete_group_offsets(self, group_id, partitions, group_coordinator_id=None): """ return self._manager.run( self._async_delete_group_offsets, group_id, partitions, group_coordinator_id) + + # -- Remove group members --------------------------------------------- + + @staticmethod + def _remove_group_members_batch_request(group_id, members, version): + _Member = LeaveGroupRequest.MemberIdentity + identities = [] + for m in members: + kwargs = { + 'member_id': m.member_id if m.member_id is not None else '', + 'group_instance_id': m.group_instance_id, + } + if version >= 5: + kwargs['reason'] = m.reason + identities.append(_Member(**kwargs)) + return LeaveGroupRequest( + group_id=group_id, + members=identities, + min_version=3, + max_version=version, + ) + + @staticmethod + def _remove_group_members_process_batch_response(response): + top_level = Errors.for_code(response.error_code) + if top_level is not Errors.NoError: + raise top_level( + "LeaveGroupRequest failed with response '{}'.".format(response)) + return { + MemberToRemove( + member_id=m.member_id or None, + group_instance_id=m.group_instance_id, + reason=None, + ): Errors.for_code(m.error_code) + for m in response.members + } + + async def _async_remove_group_members(self, group_id, members, + group_coordinator_id=None): + if not members: + return {} + if group_coordinator_id is None: + group_coordinator_id = await self._find_coordinator_id(group_id) + + version = self._manager.broker_version_data.api_version(LeaveGroupRequest) + batch_supported = version >= 3 + + if batch_supported: + request = self._remove_group_members_batch_request( + group_id, members, version) + response = await self._manager.send(request, node_id=group_coordinator_id) + return self._remove_group_members_process_batch_response(response) + + results = {} + for m in members: + if m.group_instance_id is not None: + raise Errors.UnsupportedVersionError( + "Broker does not support removing members by group.instance.id; " + "requires LeaveGroup v3+ (Kafka 2.3+).") + if not m.member_id: + raise ValueError( + "MemberToRemove.member_id is required when broker does not " + "support batched LeaveGroupRequest (v3+).") + request = LeaveGroupRequest( + group_id=group_id, + member_id=m.member_id, + max_version=2, + ) + response = await self._manager.send(request, node_id=group_coordinator_id) + results[m] = Errors.for_code(response.error_code) + return results + + def remove_group_members(self, group_id, members, group_coordinator_id=None): + """Remove members from a consumer group. + + On brokers supporting LeaveGroup v3+ (Kafka 2.3+), a single batched + request is sent. On older brokers, falls back to one single-member + LeaveGroupRequest per member (in which case ``group_instance_id`` is + not supported and ``member_id`` is required). + + Arguments: + group_id (str): The consumer group id. + members: An iterable of :class:`~kafka.admin.MemberToRemove`. + Each entry must set at least one of ``member_id`` or, + if brokers support LeaveGroup v3+, ``group_instance_id``. + ``reason`` is only sent to brokers supporting + LeaveGroup v5+ (KIP-800). + + Keyword Arguments: + group_coordinator_id (int, optional): The node_id of the group's + coordinator broker. If None, the cluster will be queried to + locate the coordinator. Default: None. + + Returns: + dict: A dict mapping :class:`~kafka.admin.MemberToRemove` to the + per-member :class:`~kafka.errors.KafkaError` class + (``NoError`` on success). The key's ``reason`` is always None in + the result (not echoed by the broker). + + Raises: + KafkaError: If a batched response contains a top-level error. + UnsupportedVersionError: If the broker does not support batched + LeaveGroupRequest and any member uses ``group_instance_id``. + """ + return self._manager.run( + self._async_remove_group_members, group_id, members, group_coordinator_id) + + +class MemberToRemove: + """A consumer group member to remove via Admin.remove_group_members + + At least one of ``member_id`` (identifying a dynamic group member) + or ``group_instance_id`` (identifying a static group member) must be set. + + Keyword Arguments: + member_id (str or None): The dynamic member id (as assigned by the + coordinator in JoinGroupResponse). Use None for static-only removal. + group_instance_id (str or None): The static member instance id (the + ``group.instance.id`` configured on the member). Requires LeaveGroup + v3+ (Kafka 2.3+). + reason (str or None): Optional reason for removal (propagated to the + broker on LeaveGroup v5+; ignored on older brokers). + """ + __slots__ = ('member_id', 'group_instance_id', 'reason') + + def __init__(self, member_id=None, group_instance_id=None, reason=None): + self.member_id = member_id + self.group_instance_id = group_instance_id + self.reason = reason + + def __repr__(self): + return "".format( + self.member_id, self.group_instance_id, self.reason) + + def __eq__(self, other): + return all(( + self.member_id == other.member_id, + self.group_instance_id == other.group_instance_id, + self.reason == other.reason, + )) + + def __hash__(self): + return hash((self.member_id, self.group_instance_id, self.reason)) diff --git a/test/admin/test_admin_groups.py b/test/admin/test_admin_groups.py index b6af8696d..6f924ede8 100644 --- a/test/admin/test_admin_groups.py +++ b/test/admin/test_admin_groups.py @@ -1,13 +1,15 @@ import pytest -from kafka.admin import KafkaAdminClient +from kafka.admin import KafkaAdminClient, MemberToRemove from kafka.errors import ( GroupIdNotFoundError, GroupSubscribedToTopicError, NoError, UnknownMemberIdError, + UnsupportedVersionError, ) from kafka.protocol.consumer import ( + LeaveGroupRequest, LeaveGroupResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, ) @@ -301,3 +303,204 @@ def test_empty_partitions_is_noop(self): finally: admin.close() assert result == {} + + +# --------------------------------------------------------------------------- +# remove_group_members +# --------------------------------------------------------------------------- + + +class TestRemoveGroupMembersMockBroker: + + def test_batch_success_returns_member_to_noerror(self): + broker = MockBroker() # broker_version=(4, 2) -> LeaveGroup v5 + _MemberResp = LeaveGroupResponse.MemberResponse + broker.respond( + LeaveGroupRequest, + LeaveGroupResponse( + throttle_time_ms=0, + error_code=0, + members=[ + _MemberResp(member_id='m1', group_instance_id=None, error_code=0), + _MemberResp(member_id='', group_instance_id='static-1', error_code=0), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.remove_group_members( + 'g1', + [ + MemberToRemove(member_id='m1'), + MemberToRemove(group_instance_id='static-1'), + ], + group_coordinator_id=0, + ) + finally: + admin.close() + + assert result == { + MemberToRemove(member_id='m1'): NoError, + MemberToRemove(group_instance_id='static-1'): NoError, + } + + def test_batch_request_fields(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['version'] = api_version + captured['request'] = LeaveGroupRequest.decode( + request_bytes, version=api_version, header=True) + return LeaveGroupResponse( + throttle_time_ms=0, error_code=0, members=[]) + + broker.respond_fn(LeaveGroupRequest, handler) + + admin = _make_admin(broker) + try: + admin.remove_group_members( + 'g1', + [ + MemberToRemove(member_id='m1', reason='rebalance'), + MemberToRemove(group_instance_id='inst-2', reason='shutdown'), + ], + group_coordinator_id=0, + ) + finally: + admin.close() + + assert captured['version'] >= 3 + req = captured['request'] + assert req.group_id == 'g1' + assert len(req.members) == 2 + m1 = req.members[0] + assert m1.member_id == 'm1' + assert m1.group_instance_id is None + m2 = req.members[1] + assert m2.member_id == '' + assert m2.group_instance_id == 'inst-2' + if captured['version'] >= 5: + assert m1.reason == 'rebalance' + assert m2.reason == 'shutdown' + + def test_batch_top_level_error_raises(self): + broker = MockBroker() + broker.respond( + LeaveGroupRequest, + LeaveGroupResponse( + throttle_time_ms=0, + error_code=GroupIdNotFoundError.errno, + members=[], + ), + ) + + admin = _make_admin(broker) + try: + with pytest.raises(GroupIdNotFoundError): + admin.remove_group_members( + 'g1', + [MemberToRemove(member_id='m1')], + group_coordinator_id=0, + ) + finally: + admin.close() + + def test_batch_per_member_error_returned(self): + broker = MockBroker() + _MemberResp = LeaveGroupResponse.MemberResponse + broker.respond( + LeaveGroupRequest, + LeaveGroupResponse( + throttle_time_ms=0, + error_code=0, + members=[ + _MemberResp(member_id='m1', group_instance_id=None, + error_code=UnknownMemberIdError.errno), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.remove_group_members( + 'g1', + [MemberToRemove(member_id='m1')], + group_coordinator_id=0, + ) + finally: + admin.close() + + assert result == {MemberToRemove(member_id='m1'): UnknownMemberIdError} + + def test_empty_members_is_noop(self): + broker = MockBroker() + admin = _make_admin(broker) + try: + result = admin.remove_group_members('g1', [], group_coordinator_id=0) + finally: + admin.close() + assert result == {} + + def test_fallback_fans_out_one_request_per_member(self): + # (2, 3) broker: LeaveGroup v0-v2 only, no batch support + broker = MockBroker(broker_version=(2, 3)) + captured = [] + + def handler(api_key, api_version, correlation_id, request_bytes): + captured.append(LeaveGroupRequest.decode( + request_bytes, version=api_version, header=True)) + return LeaveGroupResponse( + version=api_version, throttle_time_ms=0, error_code=0) + + broker.respond_fn(LeaveGroupRequest, handler) + broker.respond_fn(LeaveGroupRequest, handler) + + admin = _make_admin(broker) + try: + result = admin.remove_group_members( + 'g1', + [ + MemberToRemove(member_id='m1'), + MemberToRemove(member_id='m2'), + ], + group_coordinator_id=0, + ) + finally: + admin.close() + + assert len(captured) == 2 + assert captured[0].group_id == 'g1' + assert captured[0].member_id == 'm1' + assert captured[1].member_id == 'm2' + assert result == { + MemberToRemove(member_id='m1'): NoError, + MemberToRemove(member_id='m2'): NoError, + } + + def test_fallback_rejects_group_instance_id(self): + broker = MockBroker(broker_version=(2, 3)) + admin = _make_admin(broker) + try: + with pytest.raises(UnsupportedVersionError): + admin.remove_group_members( + 'g1', + [MemberToRemove(group_instance_id='inst-1')], + group_coordinator_id=0, + ) + finally: + admin.close() + + def test_fallback_requires_member_id(self): + broker = MockBroker(broker_version=(2, 3)) + admin = _make_admin(broker) + try: + with pytest.raises(ValueError): + admin.remove_group_members( + 'g1', + [MemberToRemove()], + group_coordinator_id=0, + ) + finally: + admin.close() From 965e28d047dd5af1e209d4eab3d1285e06461de5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 18 Apr 2026 07:23:14 -0700 Subject: [PATCH 09/14] Handle and log metadata decode errors --- kafka/admin/_groups.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index 864310ded..902ca646b 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -5,6 +5,7 @@ import itertools import logging from collections import defaultdict +import struct from typing import TYPE_CHECKING import kafka.errors as Errors @@ -45,8 +46,18 @@ def _describe_groups_process_response(self, response): assert len(response.groups) == 1 for group in response.groups: for member in group.members: - member.member_metadata = ConsumerProtocolSubscription.decode(member.member_metadata) - member.member_assignment = ConsumerProtocolAssignment.decode(member.member_assignment) + if member.member_metadata: + try: + member.member_metadata = ConsumerProtocolSubscription.decode(member.member_metadata) + except struct.error: + log.warn(f'Unable to decode member_metadata for {group}/{member.member_id}') + pass + if member.member_assignment: + try: + member.member_assignment = ConsumerProtocolAssignment.decode(member.member_assignment) + except struct.error: + log.warn(f'Unable to decode member_assignment for {group}/{member.member_id}') + pass # Return dict (key, val) tuples return [(group.group_id, self._process_acl_operations(group.to_dict())) for group in response.groups] From 25fffb8344c21d39134c083006a75937ef01dd4d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 18 Apr 2026 07:23:50 -0700 Subject: [PATCH 10/14] consumer cli: --group-instance-id --- kafka/cli/consumer/__init__.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/kafka/cli/consumer/__init__.py b/kafka/cli/consumer/__init__.py index 2336cad0c..186b17360 100644 --- a/kafka/cli/consumer/__init__.py +++ b/kafka/cli/consumer/__init__.py @@ -18,6 +18,9 @@ def main_parser(): parser.add_argument( '-g', '--group', type=str, required=True, help='consumer group') + parser.add_argument( + '-i', '--group-instance-id', type=str, + help='static group membership identifier') parser.add_argument( '-f', '--format', type=str, default='str', help='output format: str|raw|full') @@ -71,7 +74,11 @@ def run_cli(args=None): logger = logging.getLogger(__name__) kwargs = build_kwargs(config.extra_config) - consumer = KafkaConsumer(bootstrap_servers=config.bootstrap_servers, group_id=config.group, **kwargs) + consumer = KafkaConsumer( + bootstrap_servers=config.bootstrap_servers, + group_id=config.group, + group_instance_id=config.group_instance_id, + **kwargs) consumer.subscribe(config.topics) try: for m in consumer: From 54f07cd414da34f49be4d3eb822911fbdb801a94 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 18 Apr 2026 07:24:27 -0700 Subject: [PATCH 11/14] admin cli: groups remove-members --- kafka/cli/admin/__init__.py | 3 -- kafka/cli/admin/groups/__init__.py | 4 ++- kafka/cli/admin/groups/remove_members.py | 42 ++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 4 deletions(-) create mode 100644 kafka/cli/admin/groups/remove_members.py diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 88fe9d69c..410345d35 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -104,9 +104,6 @@ def run_cli(args=None): # [configs] # IncrementalAlterConfigs (not supported yet) - # [groups] - # remove-members (not supported yet [uses LeaveGroupRequest]) - # [client-quotas] # describe (DescribeClientQuotas - not supported yet) # alter (AlterClientQuotas - not supported yet) diff --git a/kafka/cli/admin/groups/__init__.py b/kafka/cli/admin/groups/__init__.py index f3aaca759..b095e34d4 100644 --- a/kafka/cli/admin/groups/__init__.py +++ b/kafka/cli/admin/groups/__init__.py @@ -6,6 +6,7 @@ from .describe import DescribeGroups from .list import ListGroups from .list_offsets import ListGroupOffsets +from .remove_members import RemoveGroupMembers class GroupsSubCommand: @@ -15,6 +16,7 @@ def add_subparser(cls, subparsers): parser = subparsers.add_parser('groups', help='Manage Kafka Groups') commands = parser.add_subparsers() for cmd in [ListGroups, DescribeGroups, DeleteGroups, - ListGroupOffsets, AlterGroupOffsets, DeleteGroupOffsets]: + ListGroupOffsets, AlterGroupOffsets, DeleteGroupOffsets, + RemoveGroupMembers]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/groups/remove_members.py b/kafka/cli/admin/groups/remove_members.py new file mode 100644 index 000000000..08c37cbc0 --- /dev/null +++ b/kafka/cli/admin/groups/remove_members.py @@ -0,0 +1,42 @@ +from kafka.admin import MemberToRemove + + +class RemoveGroupMembers: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser( + 'remove-members', + help='Remove members from a consumer group') + parser.add_argument('-g', '--group-id', type=str, required=True) + parser.add_argument( + '-m', '--member-id', type=str, action='append', + dest='member_ids', default=[], + help='Dynamic member id to remove (repeatable).') + parser.add_argument( + '-i', '--group-instance-id', type=str, action='append', + dest='group_instance_ids', default=[], + help='Static group.instance.id to remove (repeatable; ' + 'requires broker LeaveGroup v3+).') + parser.add_argument( + '--reason', type=str, default=None, + help='Optional reason; sent to broker on LeaveGroup v5+ ' + '(KIP-800). Applied to all members.') + parser.add_argument( + '--group-coordinator-id', type=int, default=None, + help='Send directly to this broker id, skipping coordinator lookup') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + members = [MemberToRemove(member_id=mid, reason=args.reason) + for mid in args.member_ids] + members.extend(MemberToRemove(group_instance_id=gid, reason=args.reason) + for gid in args.group_instance_ids) + if not members: + raise ValueError( + 'At least one --member-id or --group-instance-id is required.') + result = client.remove_group_members( + args.group_id, members, + group_coordinator_id=args.group_coordinator_id) + return {repr(m): err.__name__ for m, err in result.items()} From 48e3786918d2c347ba0bcb81602de6264571325d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 18 Apr 2026 07:24:39 -0700 Subject: [PATCH 12/14] cli TODO updates --- kafka/cli/admin/__init__.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 410345d35..7374545cc 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -118,7 +118,6 @@ def run_cli(args=None): # [topics] # list-offsets (not supported yet) - # delete-offsets (OffsetDelete - not supported yet) # [cluster] # describe-features (DescribeFeatures - not supported yet) @@ -127,3 +126,12 @@ def run_cli(args=None): # api-versions # alter-log-dirs (AlterReplicaLogDirs - not supported yet) # DescribeQuorum (not supported yet) + # UnregisterBroker + # AddRaftVoter + # RemoveRaftVoter + + # [tokens] *DelegationTokenRequest + # create + # describe + # renew + # expire From bc7b6aee56faf8a9ff388c42ed404e9ac1bc74bb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 18 Apr 2026 07:42:25 -0700 Subject: [PATCH 13/14] fixup integration tests w/ name changes --- test/integration/test_admin_integration.py | 38 +++++++++---------- test/integration/test_producer_integration.py | 2 +- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 0de5baa7c..3d787f4e5 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -168,7 +168,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') -def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): +def test_describe_group_exists(kafka_admin_client, kafka_consumer_factory, topic): """Tests that the describe consumer group call returns valid consumer group information This test takes inspiration from the test 'test_group' in test_consumer_group.py. """ @@ -234,20 +234,20 @@ def consumer_thread(i, group_id): sleep(1) info('Group stabilized; verifying assignment') - output = kafka_admin_client.describe_consumer_groups(group_id_list) + output = kafka_admin_client.describe_groups(group_id_list) assert len(output) == 2 - consumer_groups = set() - for consumer_group in output.values(): - assert(consumer_group['group_id'] in group_id_list) - if consumer_group['group_id'] == group_id_list[0]: - assert(len(consumer_group['members']) == 2) + groups = set() + for group in output.values(): + assert(group['group_id'] in group_id_list) + if group['group_id'] == group_id_list[0]: + assert(len(group['members']) == 2) else: - assert(len(consumer_group['members']) == 1) - for member in consumer_group['members']: + assert(len(group['members']) == 1) + for member in group['members']: assert(member['member_metadata']['topics'] == [topic]) assert(member['member_assignment']['assigned_partitions'][0]['topic'] == topic) - consumer_groups.add(consumer_group['group_id']) - assert(sorted(list(consumer_groups)) == group_id_list) + groups.add(group['group_id']) + assert(sorted(list(groups)) == group_id_list) finally: info('Shutting down %s consumers', num_consumers) for c in range(num_consumers): @@ -260,7 +260,7 @@ def consumer_thread(i, group_id): @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") -def test_delete_consumer_groups(kafka_admin_client, kafka_consumer_factory, send_messages): +def test_delete_groups(kafka_admin_client, kafka_consumer_factory, send_messages): random_group_id = 'test-group-' + random_string(6) group1 = random_group_id + "_1" group2 = random_group_id + "_2" @@ -279,24 +279,24 @@ def test_delete_consumer_groups(kafka_admin_client, kafka_consumer_factory, send next(consumer3) consumer3.close() - groups = {group['group_id'] for group in kafka_admin_client.list_consumer_groups()} + groups = {group['group_id'] for group in kafka_admin_client.list_groups()} assert group1 in groups assert group2 in groups assert group3 in groups - delete_results = kafka_admin_client.delete_consumer_groups([group1, group2]) + delete_results = kafka_admin_client.delete_groups([group1, group2]) assert delete_results[group1] == 'OK' assert delete_results[group2] == 'OK' assert group3 not in delete_results - groups = {group['group_id'] for group in kafka_admin_client.list_consumer_groups()} + groups = {group['group_id'] for group in kafka_admin_client.list_groups()} assert group1 not in groups assert group2 not in groups assert group3 in groups @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") -def test_delete_consumer_groups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): +def test_delete_groups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): random_group_id = 'test-group-' + random_string(6) group1 = random_group_id + "_1" group2 = random_group_id + "_2" @@ -310,17 +310,17 @@ def test_delete_consumer_groups_with_errors(kafka_admin_client, kafka_consumer_f consumer2 = kafka_consumer_factory(group_id=group2) next(consumer2) - groups = {group['group_id'] for group in kafka_admin_client.list_consumer_groups()} + groups = {group['group_id'] for group in kafka_admin_client.list_groups()} assert group1 in groups assert group2 in groups assert group3 not in groups - delete_results = kafka_admin_client.delete_consumer_groups([group1, group2, group3]) + delete_results = kafka_admin_client.delete_groups([group1, group2, group3]) assert delete_results[group1] == 'OK' assert delete_results[group2] == 'NonEmptyGroupError' assert delete_results[group3] == 'GroupIdNotFoundError' - groups = {group['group_id'] for group in kafka_admin_client.list_consumer_groups()} + groups = {group['group_id'] for group in kafka_admin_client.list_groups()} assert group1 not in groups assert group2 in groups assert group3 not in groups diff --git a/test/integration/test_producer_integration.py b/test/integration/test_producer_integration.py index 47fe09975..2adc155a8 100644 --- a/test/integration/test_producer_integration.py +++ b/test/integration/test_producer_integration.py @@ -240,4 +240,4 @@ def test_transactional_producer_offsets(kafka_producer_factory, kafka_admin_clie } if env_kafka_version() >= (2, 1): result[topic][0]['committed_leader_epoch'] = leader_epoch - assert admin.list_consumer_group_offsets('txn-test-group') == result + assert admin.list_group_offsets('txn-test-group') == result From 4890e40b96b890db95f8791dc878af9ddd333ecf Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 18 Apr 2026 08:01:41 -0700 Subject: [PATCH 14/14] Fixup cli remove-members result keys (member_id or group_instance_id) --- kafka/cli/admin/groups/remove_members.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/cli/admin/groups/remove_members.py b/kafka/cli/admin/groups/remove_members.py index 08c37cbc0..c55810a11 100644 --- a/kafka/cli/admin/groups/remove_members.py +++ b/kafka/cli/admin/groups/remove_members.py @@ -39,4 +39,4 @@ def command(cls, client, args): result = client.remove_group_members( args.group_id, members, group_coordinator_id=args.group_coordinator_id) - return {repr(m): err.__name__ for m, err in result.items()} + return {(m.member_id or m.group_instance_id): err.__name__ for m, err in result.items()}