diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index 59127f68b..2e55f0881 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -4,7 +4,7 @@ from kafka.admin._configs import ( ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) from kafka.admin._groups import MemberToRemove -from kafka.admin._partitions import NewPartitions +from kafka.admin._partitions import NewPartitions, OffsetSpec from kafka.admin._topics import NewTopic from kafka.admin._users import ( ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion) @@ -14,6 +14,6 @@ 'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType', 'ResourceType', 'ResourcePattern', 'ResourcePatternFilter', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType', - 'MemberToRemove', # NewTopic + NewPartitions are deprecated and not included in __all__ + 'MemberToRemove', 'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__ 'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion', ] diff --git a/kafka/admin/_partitions.py b/kafka/admin/_partitions.py index 37fcc470d..a17ee658b 100644 --- a/kafka/admin/_partitions.py +++ b/kafka/admin/_partitions.py @@ -20,7 +20,9 @@ ElectionType, ListPartitionReassignmentsRequest, ) -from kafka.structs import TopicPartition +from kafka.protocol.consumer import ListOffsetsRequest, IsolationLevel, OffsetSpec +from kafka.structs import TopicPartition, OffsetAndTimestamp + if TYPE_CHECKING: from kafka.net.manager import KafkaConnectionManager @@ -94,22 +96,21 @@ async def _async_get_leader_for_partitions(self, partitions): metadata = await self._get_cluster_metadata(topics) - leader2partitions = defaultdict(list) + leader2partitions = defaultdict(set) valid_partitions = set() for topic in metadata.get("topics", ()): for partition in topic.get("partitions", ()): t2p = TopicPartition(topic=topic["name"], 
@staticmethod
def _list_partition_offsets_request(partition_timestamps, isolation_level_int):
    """Build a single ``ListOffsetsRequest`` for the given partitions.

    Arguments:
        partition_timestamps (dict): Maps each partition (an object with
            ``topic`` and ``partition`` attributes) to an int timestamp or
            an ``OffsetSpec`` member.
        isolation_level_int (int): Value sent as the request's
            ``isolation_level`` field.

    Returns:
        ListOffsetsRequest: Request with ``replica_id=-1`` and the
        partitions grouped by topic name.

    Raises:
        TypeError: If a timestamp is neither an int nor an OffsetSpec.
    """
    # Lowest request version able to carry this isolation level; raised
    # further below by any negative (sentinel) timestamp that needs more.
    required_version = max(
        ListOffsetsRequest.min_version_for_isolation_level(isolation_level_int), 0)
    topic_cls = ListOffsetsRequest.ListOffsetsTopic
    partition_cls = topic_cls.ListOffsetsPartition

    grouped = defaultdict(list)
    for topic_partition, timestamp in partition_timestamps.items():
        if not isinstance(timestamp, (int, OffsetSpec)):
            raise TypeError(f'Unsupported ts type {type(timestamp)}, expected int or OffsetSpec')
        if int(timestamp) < 0:
            spec_version = ListOffsetsRequest.min_version_for_timestamp(timestamp)
            required_version = max(spec_version, required_version)
        grouped[topic_partition.topic].append(
            partition_cls(partition_index=topic_partition.partition, timestamp=timestamp))

    request_topics = []
    for topic_name, topic_parts in grouped.items():
        request_topics.append(topic_cls(name=topic_name, partitions=topic_parts))

    return ListOffsetsRequest(
        replica_id=-1,
        isolation_level=isolation_level_int,
        topics=request_topics,
        min_version=required_version,
    )
async def _async_list_partition_offsets(self, topic_partition_specs, isolation_level='read_uncommitted'):
    """Look up offsets for each partition by querying its current leader.

    Arguments:
        topic_partition_specs (dict): Maps TopicPartition to an OffsetSpec
            or an int timestamp.

    Keyword Arguments:
        isolation_level (str or int): An ``IsolationLevel`` member name
            (case-insensitive) or value. Default: ``'read_uncommitted'``.

    Returns:
        dict: The merged per-leader results of
        ``_list_partition_offsets_process_response``.

    Raises:
        ValueError: If ``isolation_level`` is an unknown name or value.
        TypeError: If ``isolation_level`` is neither a str nor an int.
    """
    if isinstance(isolation_level, str):
        try:
            isolation_level = IsolationLevel[isolation_level.upper()]
        except KeyError:
            raise ValueError(f'Unrecognized isolation_level: {isolation_level}') from None
    elif isinstance(isolation_level, int):
        # Also covers IsolationLevel members (IntEnum is an int subclass);
        # an int that is not a member raises ValueError here.
        isolation_level = IsolationLevel(isolation_level)
    else:
        # Fail up front instead of with an AttributeError on `.value` later.
        raise TypeError(
            f'Unsupported isolation_level type {type(isolation_level)}, expected str or int')

    results = {}
    pending = set(topic_partition_specs)
    while pending:
        leader2partitions = await self._async_get_leader_for_partitions(pending)

        for leader, partitions in leader2partitions.items():
            request = self._list_partition_offsets_request(
                {tp: spec for tp, spec in topic_partition_specs.items() if tp in partitions},
                isolation_level.value)
            try:
                response = await self._manager.send(request, node_id=leader)
                offsets = self._list_partition_offsets_process_response(response)
            except Errors.NotLeaderForPartitionError:
                # Keep these partitions pending; the next pass of the outer
                # loop looks their leaders up again.
                # NOTE(review): there is no retry limit or backoff here, so a
                # leader lookup that keeps returning the same node would spin
                # -- confirm the metadata lookup refreshes.
                continue
            results.update(offsets)
            pending -= partitions
    return results
+ + Keyword Arguments: + isolation_level (str, optional): One of ``'read_uncommitted'`` + (default) or ``'read_committed'``. ``read_committed`` requires + broker support for ListOffsets v2+. + + Returns: + dict: A dict mapping :class:`~kafka.TopicPartition` to + :class:`~kafka.structs.OffsetAndTimestamp` + + Raises: + KafkaError: If any partition response carries an error code. + UnknownTopicOrPartitionError: If a requested partition is not + known to the cluster. + UnsupportedVersionError: If the broker does not support a version + of ListOffsetsRequest compatible with the requested specs. + """ + return self._manager.run( + self._async_list_partition_offsets, topic_partitions, isolation_level) + class NewPartitions: """DEPRECATED: A class for new partition creation on existing topics. diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 024e6c437..d10deed4f 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -124,9 +124,6 @@ def run_cli(args=None): # list (ListTransactions - not supported yet) # abort (not supported yet) - # [topics] - # list-offsets (not supported yet) - # [cluster] # describe-features (DescribeFeatures - not supported yet) # update-features (UpdateFeatures - not supported yet) diff --git a/kafka/cli/admin/groups/list_offsets.py b/kafka/cli/admin/groups/list_offsets.py index 3592ae132..47f8da05b 100644 --- a/kafka/cli/admin/groups/list_offsets.py +++ b/kafka/cli/admin/groups/list_offsets.py @@ -1,7 +1,22 @@ +from kafka.structs import TopicPartition +from kafka.admin import OffsetSpec + + class ListGroupOffsets: @classmethod def add_subparser(cls, subparsers): parser = subparsers.add_parser('list-offsets', help='List Offsets for Group') parser.add_argument('-g', '--group-id', type=str, required=True) - parser.set_defaults(command=lambda cli, args: cli.list_group_offsets(args.group_id)) + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + offsets = 
class ListPartitionOffsets:
    """CLI sub-command ``list-offsets``: look up partition offsets by spec.

    Partitions are selected either with ``--topic`` plus ``--spec`` (all
    partitions of one topic) or with repeatable ``--partition``
    ``TOPIC:PARTITION:SPEC`` triples, but not both.
    """

    @classmethod
    def add_subparser(cls, subparsers):
        """Register the ``list-offsets`` parser and its arguments."""
        parser = subparsers.add_parser(
            'list-offsets',
            help='List offsets for partitions by spec (earliest/latest/timestamp)')
        parser.add_argument(
            '-t', '--topic', type=str)
        parser.add_argument(
            '-s', '--spec', type=str,
            help='Spec may be one of earliest, latest, max-timestamp, earliest-local, '
                 'latest-tiered, or a millisecond timestamp.')
        parser.add_argument(
            '-p', '--partition', type=str, action='append',
            dest='partitions', default=[],
            help='TOPIC:PARTITION:SPEC triple (repeatable). PARTITION may be a '
                 'single partition, a closed range (0-2), an open range (1-), or '
                 'a single wildcard "*" for all partitions. SPEC may be one of '
                 'earliest, latest, max-timestamp, earliest-local, latest-tiered, '
                 'or a millisecond timestamp.')
        parser.set_defaults(command=cls.command)

    @classmethod
    def command(cls, client, args):
        """Run the lookup and return ``{topic: {partition: info_dict}}``.

        Each ``info_dict`` holds ``offset``, ``timestamp``, ``leader_epoch``
        and the ``spec`` that was requested for that partition.
        """
        tp_offsets = cls._parse_partition_specs(client, args)
        result = client.list_partition_offsets(tp_offsets)
        output = defaultdict(dict)
        for tp, info in result.items():
            output[tp.topic][tp.partition] = {
                'offset': info.offset,
                'timestamp': info.timestamp,
                'leader_epoch': info.leader_epoch,
                'spec': tp_offsets[tp],
            }
        return dict(output)

    @staticmethod
    def _parse_spec(spec):
        """Convert a spec string to an int timestamp or an OffsetSpec member.

        Integer strings are returned as ints; anything else is upper-cased,
        has ``-`` replaced by ``_`` and is looked up by name in OffsetSpec.

        Raises:
            ValueError: If the string is neither an integer nor a member name.
        """
        try:
            return int(spec)
        except ValueError:
            pass
        spec_key = spec.upper().replace('-', '_')
        try:
            return OffsetSpec[spec_key]
        except KeyError:
            raise ValueError(f'{spec_key} is not a valid OffsetSpec') from None

    @classmethod
    def _parse_partition_specs(cls, client, args):
        """Expand the CLI arguments into a ``{TopicPartition: spec}`` dict.

        Raises:
            ValueError: If the argument combination is invalid, an entry is
                not a ``TOPIC:PARTITION:SPEC`` triple, or one partition is
                given more than one spec.
        """
        # Explicit raises rather than `assert`: asserts vanish under `-O`.
        if args.partitions:
            if args.topic or args.spec:
                raise ValueError(
                    "Either --partition or (--topic and --spec) is supported, but not both.")
            entries = args.partitions
        else:
            if not (args.topic and args.spec):
                raise ValueError("Both --topic and --spec must be provided.")
            entries = [f'{args.topic}:*:{args.spec}']

        # One metadata cache per invocation, so each topic is described once.
        cache = {}
        tp_offsets = {}
        for entry in entries:
            # rsplit keeps any ':' that is part of the topic name.
            fields = entry.rsplit(':', 2)
            if len(fields) != 3:
                raise ValueError(f'Expected TOPIC:PARTITION:SPEC, got: {entry}')
            topic, partition, spec_str = fields
            spec = cls._parse_spec(spec_str)
            for tp in cls._parse_tp(client, topic, partition, cache):
                if tp in tp_offsets:
                    # Passing multiple specs for a single partition results in an InvalidRequestError
                    raise ValueError('Only one spec allowed per partition')
                tp_offsets[tp] = spec
        return tp_offsets

    @classmethod
    def _parse_tp(cls, client, topic, partition, cache=None):
        """Expand a PARTITION expression into a list of TopicPartition.

        ``partition`` may be a single index (``3``), a closed range
        (``0-2``), an open range (``1-`` or ``-3``) or ``*`` for every
        partition reported by ``client.describe_topics``.

        Arguments:
            cache (dict, optional): topic -> sorted partition indexes, filled
                in on demand. A fresh dict is used when omitted.

        Raises:
            ValueError: If the expression cannot be parsed, or an open range
                is given for a topic that reports no partitions.
        """
        try:
            index = int(partition)
        except ValueError:
            index = None
        # A negative integer such as '-3' is an open-start range, not an index.
        if index is not None and index >= 0:
            return [TopicPartition(topic, index)]
        if partition != '*' and '-' not in partition:
            raise ValueError(f'Unrecognized partition: {partition}')

        if cache is None:
            cache = {}
        if topic not in cache:
            cache[topic] = sorted(
                p['partition_index']
                for p in client.describe_topics([topic])[0]['partitions'])
        known = cache[topic]

        if partition == '*':
            return [TopicPartition(topic, p) for p in known]

        start, _, end = partition.partition('-')
        if not start and not end:
            raise ValueError(f'Unrecognized partition: {partition}')
        if not known and not (start and end):
            raise ValueError(f'No partitions found for topic: {topic}')
        try:
            first = int(start) if start else known[0]
            last = int(end) if end else known[-1]
        except ValueError:
            raise ValueError(f'Unrecognized partition: {partition}') from None
        return [TopicPartition(topic, p) for p in range(first, last + 1)]
class IsolationLevel(IntEnum):
    """Integer values for the ``isolation_level`` field of a request.

    Members can be looked up by name (``IsolationLevel['READ_COMMITTED']``)
    or by value (``IsolationLevel(1)``).
    """
    READ_UNCOMMITTED = 0
    READ_COMMITTED = 1
class ListOffsetsRequest(ApiMessage):
    """ListOffsets request, with helpers that report the minimum request
    version needed for a given timestamp sentinel or isolation level."""

    @classmethod
    def min_version_for_timestamp(cls, ts):
        """Return the minimum request version able to carry timestamp ``ts``.

        Arguments:
            ts (int or OffsetSpec): A non-negative timestamp or a negative
                ``OffsetSpec`` sentinel.

        Returns:
            int: 7 for ``MAX_TIMESTAMP``, 8 for ``EARLIEST_LOCAL``, 9 for
            ``LATEST_TIERED``, otherwise 0.

        Raises:
            ValueError: If ``ts`` is negative and not an OffsetSpec value.
        """
        # Ordinary timestamps are not OffsetSpec members; without this guard
        # OffsetSpec(ts) would raise ValueError for every real timestamp.
        if int(ts) >= 0:
            return 0
        spec = OffsetSpec(ts)
        if spec == OffsetSpec.MAX_TIMESTAMP:
            return 7
        elif spec == OffsetSpec.EARLIEST_LOCAL:
            return 8
        elif spec == OffsetSpec.LATEST_TIERED:
            return 9
        else:
            return 0

    @classmethod
    def min_version_for_isolation_level(cls, il):
        """Return the minimum request version for isolation level ``il``.

        Returns:
            int: 2 when ``int(il) > 0``, otherwise 0.
        """
        if int(il) > 0:
            return 2
        else:
            return 0
EARLIEST: int + MAX_TIMESTAMP: int + EARLIEST_LOCAL: int + LATEST_TIERED: int + class ListOffsetsRequest(ApiMessage): class ListOffsetsTopic(DataContainer): class ListOffsetsPartition(DataContainer): @@ -79,6 +91,10 @@ class ListOffsetsRequest(ApiMessage): def is_request(cls) -> bool: ... def expect_response(self) -> bool: ... def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + @classmethod + def min_version_for_timestamp(cls, ts: Any) -> Any: ... + @classmethod + def min_version_for_isolation_level(cls, il: Any) -> Any: ... class ListOffsetsResponse(ApiMessage): class ListOffsetsTopicResponse(DataContainer): diff --git a/test/admin/test_admin_partitions.py b/test/admin/test_admin_partitions.py index f4bb64738..e593d8a92 100644 --- a/test/admin/test_admin_partitions.py +++ b/test/admin/test_admin_partitions.py @@ -2,14 +2,19 @@ import pytest -from kafka.admin import KafkaAdminClient +from kafka.admin import KafkaAdminClient, OffsetSpec +from kafka.errors import ( + UnknownTopicOrPartitionError, + IncompatibleBrokerVersion, +) from kafka.protocol.admin import ( AlterPartitionReassignmentsRequest, AlterPartitionReassignmentsResponse, ListPartitionReassignmentsRequest, ListPartitionReassignmentsResponse, DescribeTopicPartitionsRequest, DescribeTopicPartitionsResponse, ) +from kafka.protocol.consumer import ListOffsetsRequest, ListOffsetsResponse from kafka.protocol.metadata import MetadataResponse -from kafka.structs import TopicPartition +from kafka.structs import TopicPartition, OffsetAndTimestamp from test.mock_broker import MockBroker @@ -433,3 +438,296 @@ def handler(api_key, api_version, correlation_id, request_bytes): assert cursor is not None assert cursor.topic_name == 'topic-a' assert cursor.partition_index == 3 + + +# --------------------------------------------------------------------------- +# list_partition_offsets +# --------------------------------------------------------------------------- + + +def 
def _list_offsets_response(per_partition):
    """Build a ListOffsetsResponse from flat per-partition tuples.

    per_partition: list of (topic, partition, offset, timestamp, leader_epoch, error_code).
    Partitions are grouped under their topic in first-seen order.
    """
    topic_cls = ListOffsetsResponse.ListOffsetsTopicResponse
    partition_cls = topic_cls.ListOffsetsPartitionResponse

    grouped = {}
    for topic_name, index, offset, timestamp, leader_epoch, error_code in per_partition:
        if topic_name not in grouped:
            grouped[topic_name] = []
        entry = partition_cls(
            partition_index=index, error_code=error_code, timestamp=timestamp,
            offset=offset, leader_epoch=leader_epoch)
        grouped[topic_name].append(entry)

    response_topics = []
    for topic_name, entries in grouped.items():
        response_topics.append(topic_cls(name=topic_name, partitions=entries))
    return ListOffsetsResponse(throttle_time_ms=0, topics=response_topics)
TopicPartition('topic-a', 1): OffsetAndTimestamp( + offset=200, timestamp=5678, leader_epoch=7), + } + + def test_request_uses_spec_timestamp_sentinels(self): + broker = MockBroker() + _set_metadata_for_topic(broker, 'topic-a', num_partitions=2) + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['version'] = api_version + captured['request'] = ListOffsetsRequest.decode( + request_bytes, version=api_version, header=True) + return _list_offsets_response([ + ('topic-a', 0, 0, -1, -1, 0), + ('topic-a', 1, 0, -1, -1, 0), + ]) + + broker.respond_fn(ListOffsetsRequest, handler) + + admin = _make_admin(broker) + try: + admin.list_partition_offsets({ + TopicPartition('topic-a', 0): OffsetSpec.EARLIEST, + TopicPartition('topic-a', 1): OffsetSpec.LATEST, + }) + finally: + admin.close() + + req = captured['request'] + assert req.replica_id == -1 + assert req.isolation_level == 0 + topic = req.topics[0] + timestamps = {p.partition_index: p.timestamp for p in topic.partitions} + assert timestamps == {0: -2, 1: -1} + + def test_offset_timestamp_passed_through(self): + broker = MockBroker() + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = ListOffsetsRequest.decode( + request_bytes, version=api_version, header=True) + return _list_offsets_response([('topic-a', 0, 42, 1700000000, 0, 0)]) + + broker.respond_fn(ListOffsetsRequest, handler) + + admin = _make_admin(broker) + try: + result = admin.list_partition_offsets({ + TopicPartition('topic-a', 0): 1700000000, + }) + finally: + admin.close() + + assert captured['request'].topics[0].partitions[0].timestamp == 1700000000 + assert result[TopicPartition('topic-a', 0)].offset == 42 + + def test_groups_partitions_by_leader(self): + # Two partitions, two different leaders => two requests. 
+ broker = MockBroker() + Broker = MetadataResponse.MetadataResponseBroker + Topic = MetadataResponse.MetadataResponseTopic + Partition = Topic.MetadataResponsePartition + broker.set_metadata( + topics=[Topic( + version=8, error_code=0, name='topic-a', is_internal=False, + partitions=[ + Partition(version=8, error_code=0, partition_index=0, + leader_id=0, leader_epoch=0, + replica_nodes=[0], isr_nodes=[0], offline_replicas=[]), + Partition(version=8, error_code=0, partition_index=1, + leader_id=1, leader_epoch=0, + replica_nodes=[1], isr_nodes=[1], offline_replicas=[]), + ])], + brokers=[ + Broker(node_id=0, host=broker.host, port=broker.port, rack=None), + Broker(node_id=1, host=broker.host, port=broker.port, rack=None), + ], + ) + + captured = [] + + def handler(api_key, api_version, correlation_id, request_bytes): + req = ListOffsetsRequest.decode( + request_bytes, version=api_version, header=True) + captured.append(req) + partitions = [(t.name, p.partition_index, 0, -1, -1, 0) + for t in req.topics for p in t.partitions] + return _list_offsets_response(partitions) + + broker.respond_fn(ListOffsetsRequest, handler) + broker.respond_fn(ListOffsetsRequest, handler) + + admin = _make_admin(broker) + try: + admin.list_partition_offsets({ + TopicPartition('topic-a', 0): OffsetSpec.LATEST, + TopicPartition('topic-a', 1): OffsetSpec.LATEST, + }) + finally: + admin.close() + + # One request per leader, each carrying exactly one partition. 
+ assert len(captured) == 2 + for req in captured: + assert sum(len(t.partitions) for t in req.topics) == 1 + + def test_partition_error_raises(self): + broker = MockBroker() + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + broker.respond( + ListOffsetsRequest, + _list_offsets_response([ + ('topic-a', 0, -1, -1, -1, UnknownTopicOrPartitionError.errno), + ]), + ) + + admin = _make_admin(broker) + try: + with pytest.raises(UnknownTopicOrPartitionError): + admin.list_partition_offsets({ + TopicPartition('topic-a', 0): OffsetSpec.LATEST, + }) + finally: + admin.close() + + def test_unknown_partition_raises(self): + broker = MockBroker() + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + admin = _make_admin(broker) + try: + with pytest.raises(UnknownTopicOrPartitionError): + admin.list_partition_offsets({ + TopicPartition('topic-a', 99): OffsetSpec.LATEST, + }) + finally: + admin.close() + + def test_max_timestamp_requires_v7(self): + # (2, 7) broker -> ListOffsets max v6 -> MAX_TIMESTAMP unsupported + broker = MockBroker(broker_version=(2, 7)) + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + admin = _make_admin(broker) + try: + with pytest.raises(IncompatibleBrokerVersion): + admin.list_partition_offsets({ + TopicPartition('topic-a', 0): OffsetSpec.MAX_TIMESTAMP, + }) + finally: + admin.close() + + def test_invalid_timestamp_raises(self): + broker = MockBroker() + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + admin = _make_admin(broker) + try: + with pytest.raises(ValueError): + admin.list_partition_offsets({TopicPartition('topic-a', 0): -100}, 0) + finally: + admin.close() + + def test_invalid_isolation_level_raises(self): + broker = MockBroker() + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + admin = _make_admin(broker) + try: + with pytest.raises(ValueError): + admin.list_partition_offsets( + {TopicPartition('topic-a', 0): OffsetSpec.LATEST}, + isolation_level='wat', + ) + finally: + 
admin.close() + + def test_empty_input_is_noop(self): + broker = MockBroker() + admin = _make_admin(broker) + try: + assert admin.list_partition_offsets({}) == {} + finally: + admin.close() + + def test_int_timestamp_accepted_directly(self): + broker = MockBroker() + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = ListOffsetsRequest.decode( + request_bytes, version=api_version, header=True) + return _list_offsets_response([('topic-a', 0, 0, -1, -1, 0)]) + + broker.respond_fn(ListOffsetsRequest, handler) + + admin = _make_admin(broker) + try: + admin.list_partition_offsets({TopicPartition('topic-a', 0): -2}) + finally: + admin.close() + + assert captured['request'].topics[0].partitions[0].timestamp == -2 + + def test_read_committed_uses_isolation_level_1(self): + broker = MockBroker() + _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = ListOffsetsRequest.decode( + request_bytes, version=api_version, header=True) + return _list_offsets_response([('topic-a', 0, 0, -1, -1, 0)]) + + broker.respond_fn(ListOffsetsRequest, handler) + + admin = _make_admin(broker) + try: + admin.list_partition_offsets( + {TopicPartition('topic-a', 0): OffsetSpec.LATEST}, + isolation_level='read_committed', + ) + finally: + admin.close() + + assert captured['request'].isolation_level == 1 diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 3d787f4e5..742208092 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -8,7 +8,7 @@ ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType, - NewPartitions, NewTopic, + NewPartitions, NewTopic, OffsetSpec ) from kafka.errors import ( 
def test_list_partition_offsets(kafka_admin_client, topic):
    """A LATEST lookup for partition 0 yields an OffsetAndTimestamp entry."""
    partition = TopicPartition(topic, 0)
    specs = {partition: OffsetSpec.LATEST}
    offsets = kafka_admin_client.list_partition_offsets(specs)
    assert partition in offsets
    entry = offsets[partition]
    assert isinstance(entry, OffsetAndTimestamp)