Skip to content
96 changes: 81 additions & 15 deletions kafka/admin/_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,25 @@ def _describe_groups_process_response(self, response):
log.warn(f'Unable to decode member_assignment for {group}/{member.member_id}')
pass
# Return dict (key, val) tuples
return [(group.group_id, self._process_acl_operations(group.to_dict())) for group in response.groups]
results = {}
for group in response.groups:
group_id = group.group_id
result = self._process_acl_operations(group.to_dict())
error_code = result.pop('error_code')
error_message = result.pop('error_message', '') # v6+
result['error'] = str(Errors.for_code(error_code)(error_message)) if error_code else None
results[group_id] = result
return results

async def _async_describe_groups(self, group_ids, group_coordinator_id=None):
results = []
results = {}
for group_id in group_ids:
coordinator_id = group_coordinator_id or await self._find_coordinator_id(group_id)
request = self._describe_groups_request(group_id)
response = await self._manager.send(request, node_id=coordinator_id)
results.append(self._describe_groups_process_response(response))
results.update(self._describe_groups_process_response(response))
# Combine key/vals from multiple requests into single dict
return dict(itertools.chain(*results))
return results

def describe_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
"""Describe a set of consumer groups.
Expand Down Expand Up @@ -175,14 +183,20 @@ def _list_group_offsets_process_response(self, response):
raise error_type(
"OffsetFetchResponse failed with response '{}'."
.format(response))
def _partitions_to_dict(partitions):
d = {}
for p in partitions:
d[p.partition_index] = p.to_dict()
d[p.partition_index].pop('partition_index')
return d
return {topic.name: _partitions_to_dict(topic.partitions)
for topic in response.topics}
results = {}
for topic in response.topics:
for partition in topic.partitions:
tp = TopicPartition(topic.name, partition.partition_index)
error_type = Errors.for_code(partition.error_code)
if error_type is not Errors.NoError:
raise error_type(
f"OffsetFetchResponse failed for partition {tp.partition}")
results[tp] = OffsetAndMetadata(
offset=partition.committed_offset,
metadata=partition.metadata,
leader_epoch=partition.committed_leader_epoch
)
return results

async def _async_list_group_offsets(self, group_id, group_coordinator_id=None, partitions=None):
if group_coordinator_id is None:
Expand All @@ -191,8 +205,7 @@ async def _async_list_group_offsets(self, group_id, group_coordinator_id=None, p
response = await self._manager.send(request, node_id=group_coordinator_id)
return self._list_group_offsets_process_response(response)

def list_group_offsets(self, group_id, group_coordinator_id=None,
partitions=None):
def list_group_offsets(self, group_id, group_coordinator_id=None, partitions=None):
"""Fetch committed offsets for a single consumer group.

Note:
Expand All @@ -213,7 +226,8 @@ def list_group_offsets(self, group_id, group_coordinator_id=None,
known offsets for the consumer group. Default: None.

Returns:
dict: {topic: [{partition data}]} key/vals from OffsetCommitResponse}]}
A dict mapping :class:`~kafka.TopicPartition` to
:class:`~kafka.structs.OffsetAndMetadata`.
"""
return self._manager.run(self._async_list_group_offsets, group_id, group_coordinator_id, partitions)

Expand Down Expand Up @@ -334,6 +348,58 @@ def alter_group_offsets(self, group_id, offsets, group_coordinator_id=None):
return self._manager.run(
self._async_alter_group_offsets, group_id, offsets, group_coordinator_id)

# -- Reset group offsets ----------------------------------------------

@staticmethod
def _reset_group_offsets_process_response(response, to_reset):
    """Translate an OffsetCommitResponse into per-partition reset results.

    Arguments:
        response: OffsetCommitResponse returned by the coordinator.
        to_reset: dict of TopicPartition -> OffsetAndMetadata that was
            submitted; supplies the offset echoed back in the result.

    Returns:
        dict of TopicPartition -> {'error': KafkaError class,
        'offset': the offset that was requested for that partition}.
    """
    outcome = {}
    for topic in response.topics:
        topic_name = topic.name
        for part in topic.partitions:
            key = TopicPartition(topic_name, part.partition_index)
            outcome[key] = {
                'error': Errors.for_code(part.error_code),
                'offset': to_reset[key].offset,
            }
    return outcome

async def _async_reset_group_offsets(self, group_id, offset_specs, group_coordinator_id=None):
if not offset_specs:
return {}
if group_coordinator_id is None:
group_coordinator_id = await self._find_coordinator_id(group_id)
current = await self._async_list_group_offsets(group_id, group_coordinator_id, offset_specs.keys())
offsets = await self._async_list_partition_offsets(offset_specs)
to_reset = {}
for tp in offsets:
to_reset[tp] = current[tp]._replace(offset=offsets[tp].offset)
request = self._alter_group_offsets_request(group_id, to_reset)
response = await self._manager.send(request, node_id=group_coordinator_id)
return self._reset_group_offsets_process_response(response, to_reset)

def reset_group_offsets(self, group_id, offset_specs, group_coordinator_id=None):
    """Reset committed offsets for a consumer group to earliest or latest.

    The group must have no active members (i.e. be empty or dead) for
    the reset to succeed; otherwise individual partitions may return
    ``UNKNOWN_MEMBER_ID`` or similar errors.

    Arguments:
        group_id (str): The consumer group id.
        offset_specs (dict): A dict mapping :class:`~kafka.TopicPartition` to
            :class:`~kafka.admin.OffsetSpec`.

    Keyword Arguments:
        group_coordinator_id (int, optional): The node_id of the group's
            coordinator broker. If None, the cluster will be queried to
            locate the coordinator. Default: None.

    Returns:
        dict: A dict mapping :class:`~kafka.TopicPartition` to dict of
        {'error': :class:`~kafka.errors.KafkaError` class, 'offset': int}
    """
    # Synchronous facade: drive the async implementation on the manager's loop.
    return self._manager.run(
        self._async_reset_group_offsets,
        group_id,
        offset_specs,
        group_coordinator_id,
    )

# -- Delete group offsets ----------------------------------------------

@staticmethod
Expand Down
6 changes: 3 additions & 3 deletions kafka/admin/_partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,14 +487,14 @@ async def _async_list_partition_offsets(self, topic_partition_specs, isolation_l
continue
return results

def list_partition_offsets(self, topic_partitions, isolation_level='read_uncommitted'):
def list_partition_offsets(self, topic_partition_specs, isolation_level='read_uncommitted'):
"""Look up offsets for the given partitions by spec.

Partitions are routed to their respective leader brokers via cluster
metadata; one ``ListOffsetsRequest`` is sent per leader.

Arguments:
topic_partitions: dict mapping :class:`~kafka.TopicPartition` to
topic_partition_specs: dict mapping :class:`~kafka.TopicPartition` to
:class:`OffsetSpec` (or a raw integer timestamp /
wire-level sentinel).

Expand All @@ -515,7 +515,7 @@ def list_partition_offsets(self, topic_partitions, isolation_level='read_uncommi
of ListOffsetsRequest compatible with the requested specs.
"""
return self._manager.run(
self._async_list_partition_offsets, topic_partitions, isolation_level)
self._async_list_partition_offsets, topic_partition_specs, isolation_level)


class NewPartitions:
Expand Down
3 changes: 3 additions & 0 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ def run_cli(args=None):


# Commands TODO:
# --dry-run support
# --trace ?

# [configs]
# IncrementalAlterConfigs (not supported yet)

Expand Down
3 changes: 2 additions & 1 deletion kafka/cli/admin/groups/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .list import ListGroups
from .list_offsets import ListGroupOffsets
from .remove_members import RemoveGroupMembers
from .reset_offsets import ResetGroupOffsets


class GroupsSubCommand:
Expand All @@ -16,7 +17,7 @@ def add_subparser(cls, subparsers):
parser = subparsers.add_parser('groups', help='Manage Kafka Groups')
commands = parser.add_subparsers()
for cmd in [ListGroups, DescribeGroups, DeleteGroups,
ListGroupOffsets, AlterGroupOffsets, DeleteGroupOffsets,
ListGroupOffsets, AlterGroupOffsets, ResetGroupOffsets, DeleteGroupOffsets,
RemoveGroupMembers]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
19 changes: 13 additions & 6 deletions kafka/cli/admin/groups/list_offsets.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections import defaultdict

from kafka.structs import TopicPartition
from kafka.admin import OffsetSpec

Expand All @@ -13,10 +15,15 @@ def add_subparser(cls, subparsers):
@classmethod
def command(cls, client, args):
    """List committed offsets, latest offsets, and lag for a group.

    Returns:
        dict of {topic: {partition: {'offset', 'leader_epoch', 'metadata',
        'latest_offset', 'lag'}}} suitable for CLI rendering.
    """
    # NOTE: the pasted diff interleaved the old dict-of-dicts code path with
    # the new {TopicPartition: OffsetAndMetadata} one; this is the coherent
    # new version (list_group_offsets now keys by TopicPartition).
    offsets = client.list_group_offsets(args.group_id)
    latest = client.list_partition_offsets({tp: OffsetSpec.LATEST for tp in offsets})
    results = defaultdict(dict)
    for tp in latest:
        committed = offsets[tp]
        results[tp.topic][tp.partition] = {
            'offset': committed.offset,
            'leader_epoch': committed.leader_epoch,
            'metadata': committed.metadata,
            'latest_offset': latest[tp].offset,
            # Lag = log-end offset minus the group's committed offset.
            'lag': latest[tp].offset - committed.offset,
        }
    return dict(results)
78 changes: 78 additions & 0 deletions kafka/cli/admin/groups/reset_offsets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from collections import defaultdict

from kafka.admin import OffsetSpec
from kafka.structs import OffsetAndMetadata, TopicPartition


class ResetGroupOffsets:
    """CLI sub-command: reset a consumer group's committed offsets."""

    @classmethod
    def add_subparser(cls, subparsers):
        """Register the ``reset-offsets`` sub-command and its arguments."""
        parser = subparsers.add_parser(
            'reset-offsets',
            help='Reset committed offsets for a consumer group')
        parser.add_argument('-g', '--group-id', type=str, required=True)
        parser.add_argument(
            '-s', '--spec', type=str,
            help='Spec may be one of earliest, latest, max-timestamp, earliest-local, '
                 'latest-tiered, or a millisecond timestamp. '
                 'Applies to all topic/partitions currently in group. Mutually exclusive '
                 'with --partition')
        # Help text aligned with what _parse_partition_specs actually supports:
        # single partitions and closed ranges. ("*" and open ranges would
        # require cluster metadata to expand.)
        parser.add_argument(
            '-p', '--partition', type=str, action='append',
            dest='partitions', default=[],
            help='TOPIC:PARTITION:SPEC triple (repeatable). PARTITION may be a '
                 'single partition or a closed range (0-2). SPEC may be one of '
                 'earliest, latest, max-timestamp, earliest-local, latest-tiered, '
                 'or a millisecond timestamp.')
        parser.set_defaults(command=cls.command)

    @classmethod
    def command(cls, client, args):
        """Validate arguments, require an inactive group, then reset offsets.

        Returns:
            dict of {topic: {partition: {'error': error class name,
            'offset': int}}} for CLI rendering.

        Raises:
            ValueError: on conflicting/missing --spec / --partition arguments.
            RuntimeError: if the group still has active members.
        """
        if not args.spec and not args.partitions:
            raise ValueError('One of --spec or --partition is required')
        elif args.spec and args.partitions:
            raise ValueError('Only one of --spec and --partition are allowed')
        group = client.describe_groups([args.group_id])
        state = group[args.group_id]['group_state']
        # Resetting offsets for an active group would be rejected per-partition;
        # fail fast with a clear message instead.
        if state not in ('Empty', 'Dead'):
            raise RuntimeError(f'Group {args.group_id} is {state}, expecting Empty or Dead!')
        if args.spec:
            # One spec applied to every partition the group has offsets for.
            offsets = client.list_group_offsets(args.group_id)
            spec = cls._parse_spec(args.spec)
            offset_specs = {tp: spec for tp in offsets}
        else:
            offset_specs = cls._parse_partition_specs(args.partitions)
        result = client.reset_group_offsets(args.group_id, offset_specs)
        output = defaultdict(dict)
        for tp, res in result.items():
            # Render the KafkaError class as its name for display.
            res['error'] = res['error'].__name__
            output[tp.topic][tp.partition] = res
        return dict(output)

    @staticmethod
    def _parse_spec(spec):
        """Parse a CLI spec string into an int timestamp or an OffsetSpec member."""
        try:
            return int(spec)
        except ValueError:
            pass
        try:
            # CLI uses lowercase/hyphenated names; enum members are UPPER_SNAKE.
            return OffsetSpec[spec.upper().replace('-', '_')]
        except KeyError:
            # Report the user's original input (not the mangled lookup key) and
            # suppress the internal KeyError from the traceback.
            raise ValueError(f'{spec} is not a valid OffsetSpec') from None

    @classmethod
    def _parse_partition_specs(cls, partitions):
        """Parse repeated TOPIC:PARTITION:SPEC triples into {TopicPartition: spec}.

        PARTITION may be a single index or a closed inclusive range like 0-2.
        """
        tp_offsets = {}
        for entry in partitions:
            # rsplit so topic names containing ':' still parse correctly.
            topic, partition, spec_str = entry.rsplit(':', 2)
            spec = cls._parse_spec(spec_str)
            for index in cls._parse_partition_selector(partition):
                tp = TopicPartition(topic, index)
                if tp in tp_offsets:
                    # Passing multiple specs for a single partition results in an InvalidRequestError
                    raise ValueError('Only one spec allowed per partition')
                tp_offsets[tp] = spec
        return tp_offsets

    @staticmethod
    def _parse_partition_selector(partition):
        """Expand a PARTITION field into a list of concrete partition indexes.

        Supports a single index ("3") or a closed inclusive range ("0-2").
        The "*" wildcard and open ranges ("1-") would need cluster metadata
        to expand and are rejected with a clear error.
        """
        try:
            return [int(partition)]
        except ValueError:
            pass
        start, sep, end = partition.partition('-')
        if sep and start.isdigit() and end.isdigit() and int(start) <= int(end):
            return list(range(int(start), int(end) + 1))
        raise ValueError(
            f'Unsupported partition selector {partition!r}: use a single index '
            'or a closed range like 0-2')
20 changes: 9 additions & 11 deletions test/integration/test_producer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ def test_transactional_producer_offsets(kafka_producer_factory, kafka_admin_clie
else:
leader_epoch = -1
topic = 'transactional_test_topic'
offsets = {TopicPartition(topic, 0): OffsetAndMetadata(0, 'metadata', leader_epoch)}
tp0 = TopicPartition(topic, 0)
tp1 = TopicPartition(topic, 1)
offsets = {tp0: OffsetAndMetadata(0, 'metadata', leader_epoch)}
producer = kafka_producer_factory(transactional_id='testing')
producer.init_transactions()
producer.begin_transaction()
Expand All @@ -225,19 +227,15 @@ def test_transactional_producer_offsets(kafka_producer_factory, kafka_admin_clie
producer.commit_transaction()

producer.begin_transaction()
producer.send_offsets_to_transaction({TopicPartition(topic, 1): OffsetAndMetadata(1, 'bad', 1)}, 'txn-test-group')
producer.send_offsets_to_transaction({tp1: OffsetAndMetadata(1, 'bad', 1)}, 'txn-test-group')
producer.abort_transaction()

admin = kafka_admin_client_factory()
result = {
topic: {
0: {
'committed_offset': 0,
'error_code': 0,
'metadata': 'metadata',
},
}
tp0: OffsetAndMetadata(
offset=0,
metadata='metadata',
leader_epoch=leader_epoch,
)
}
if env_kafka_version() >= (2, 1):
result[topic][0]['committed_leader_epoch'] = leader_epoch
assert admin.list_group_offsets('txn-test-group') == result
Loading