Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from kafka.admin._configs import (
ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType)
from kafka.admin._groups import MemberToRemove
from kafka.admin._partitions import NewPartitions
from kafka.admin._partitions import NewPartitions, OffsetSpec
from kafka.admin._topics import NewTopic
from kafka.admin._users import (
ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion)
Expand All @@ -14,6 +14,6 @@
'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType',
'ResourceType', 'ResourcePattern', 'ResourcePatternFilter',
'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType',
'MemberToRemove', # NewTopic + NewPartitions are deprecated and not included in __all__
'MemberToRemove', 'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__
'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion',
]
110 changes: 104 additions & 6 deletions kafka/admin/_partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
ElectionType,
ListPartitionReassignmentsRequest,
)
from kafka.structs import TopicPartition
from kafka.protocol.consumer import ListOffsetsRequest, IsolationLevel, OffsetSpec
from kafka.structs import TopicPartition, OffsetAndTimestamp


if TYPE_CHECKING:
from kafka.net.manager import KafkaConnectionManager
Expand Down Expand Up @@ -94,22 +96,21 @@ async def _async_get_leader_for_partitions(self, partitions):

metadata = await self._get_cluster_metadata(topics)

leader2partitions = defaultdict(list)
leader2partitions = defaultdict(set)
valid_partitions = set()
for topic in metadata.get("topics", ()):
for partition in topic.get("partitions", ()):
t2p = TopicPartition(topic=topic["name"], partition=partition["partition_index"])
if t2p in partitions:
leader2partitions[partition["leader_id"]].append(t2p)
leader2partitions[partition["leader_id"]].add(t2p)
valid_partitions.add(t2p)

if len(partitions) != len(valid_partitions):
unknown = set(partitions) - valid_partitions
if partitions != valid_partitions:
unknown = partitions - valid_partitions
raise UnknownTopicOrPartitionError(
"The following partitions are not known: %s"
% ", ".join(str(x) for x in unknown)
)

return leader2partitions

async def _async_delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
Expand Down Expand Up @@ -419,6 +420,103 @@ def describe_topic_partitions(self, topics, response_partition_limit=2000, curso
return self._manager.run(
self._async_describe_topic_partitions, topics, response_partition_limit, cursor)

# -- List partition offsets --------------------------------------------

@staticmethod
def _list_partition_offsets_request(partition_timestamps, isolation_level_int):
    """Build a ListOffsetsRequest covering the given partition -> spec map.

    Chooses the lowest protocol version compatible with both the requested
    isolation level and any negative (sentinel) timestamp specs present.

    Arguments:
        partition_timestamps: dict of TopicPartition -> int timestamp or
            OffsetSpec sentinel.
        isolation_level_int: wire-level isolation level integer.

    Raises:
        TypeError: if a spec value is neither int nor OffsetSpec.
    """
    version = max(ListOffsetsRequest.min_version_for_isolation_level(isolation_level_int), 0)
    topic_cls = ListOffsetsRequest.ListOffsetsTopic
    partition_cls = topic_cls.ListOffsetsPartition
    partitions_by_topic = defaultdict(list)
    for tp, ts in partition_timestamps.items():
        if not isinstance(ts, (int, OffsetSpec)):
            raise TypeError(f'Unsupported ts type {type(ts)}, expected int or OffsetSpec')
        if int(ts) < 0:
            # Sentinel specs (e.g. max-timestamp) may require a newer request version.
            version = max(ListOffsetsRequest.min_version_for_timestamp(ts), version)
        partitions_by_topic[tp.topic].append(
            partition_cls(partition_index=tp.partition, timestamp=ts))
    return ListOffsetsRequest(
        replica_id=-1,
        isolation_level=isolation_level_int,
        topics=[topic_cls(name=name, partitions=parts)
                for name, parts in partitions_by_topic.items()],
        min_version=version,
    )

@staticmethod
def _list_partition_offsets_process_response(response):
    """Convert a ListOffsetsResponse into {TopicPartition: OffsetAndTimestamp}.

    Raises the mapped KafkaError on the first partition-level error code
    encountered; otherwise returns one entry per partition in the response.
    """
    offsets = {}
    for topic in response.topics:
        for part in topic.partitions:
            tp = TopicPartition(topic.name, part.partition_index)
            error = Errors.for_code(part.error_code)
            if error is not Errors.NoError:
                raise error(
                    "ListOffsetsRequest failed for %s: %s" % (tp, error.__name__))
            # Older response versions lack leader_epoch; -1 means "unknown".
            epoch = getattr(part, 'leader_epoch', -1)
            offsets[tp] = OffsetAndTimestamp(
                offset=part.offset,
                timestamp=part.timestamp,
                leader_epoch=epoch if epoch >= 0 else None,
            )
    return offsets

async def _async_list_partition_offsets(self, topic_partition_specs, isolation_level='read_uncommitted'):
    """Resolve partition leaders and fan out one ListOffsetsRequest per leader.

    Arguments:
        topic_partition_specs: dict of TopicPartition -> spec (OffsetSpec
            member or raw integer timestamp).
        isolation_level: an IsolationLevel member, its integer value, or its
            name as a case-insensitive string.

    Returns:
        dict of TopicPartition -> OffsetAndTimestamp.

    Raises:
        ValueError: for an unrecognized isolation_level string.
    """
    # Normalize isolation_level to an IsolationLevel enum member.
    if isinstance(isolation_level, str):
        try:
            isolation_level = IsolationLevel[isolation_level.upper()]
        except KeyError:
            raise ValueError(f'Unrecognized isolation_level: {isolation_level}')
    elif isinstance(isolation_level, int):
        isolation_level = IsolationLevel(isolation_level)

    results = {}
    topic_partitions = set(topic_partition_specs.keys())
    # Re-fetch leadership metadata and retry any partitions whose leader
    # moved between the metadata lookup and the request.
    while topic_partitions:
        leader2partitions = await self._async_get_leader_for_partitions(topic_partitions)

        for leader, partitions in leader2partitions.items():
            # NOTE(review): a partition with no elected leader presumably
            # reports leader_id -1 here — confirm _manager.send handles
            # node_id=-1 rather than sending to a bogus node.
            request = self._list_partition_offsets_request(
                {tp: spec for tp, spec in topic_partition_specs.items() if tp in partitions},
                isolation_level.value)
            try:
                response = await self._manager.send(request, node_id=leader)
                results.update(self._list_partition_offsets_process_response(response))
                # Only successfully answered partitions leave the retry set.
                topic_partitions -= partitions
            except Errors.NotLeaderForPartitionError:
                # Stale metadata: loop again to re-resolve the leader.
                # NOTE(review): there is no attempt cap or backoff, so a
                # partition whose leadership keeps moving retries forever —
                # confirm this is intended.
                continue
    return results

def list_partition_offsets(self, topic_partitions, isolation_level='read_uncommitted'):
    """Query offsets for each partition according to its requested spec.

    Cluster metadata is consulted to locate each partition's leader and a
    single ``ListOffsetsRequest`` is issued per leader broker.

    Arguments:
        topic_partitions: dict mapping :class:`~kafka.TopicPartition` to an
            :class:`OffsetSpec` member (or a raw integer timestamp /
            wire-level sentinel).

    Keyword Arguments:
        isolation_level (str, optional): ``'read_uncommitted'`` (default)
            or ``'read_committed'``; the latter needs broker support for
            ListOffsets v2+.

    Returns:
        dict: A dict mapping :class:`~kafka.TopicPartition` to
        :class:`~kafka.structs.OffsetAndTimestamp`

    Raises:
        KafkaError: for any partition-level error code in the response.
        UnknownTopicOrPartitionError: when a requested partition does not
            exist in cluster metadata.
        UnsupportedVersionError: when no broker-supported request version
            satisfies the requested specs.
    """
    return self._manager.run(
        self._async_list_partition_offsets, topic_partitions, isolation_level)


class NewPartitions:
"""DEPRECATED: A class for new partition creation on existing topics.
Expand Down
3 changes: 0 additions & 3 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ def run_cli(args=None):
# list (ListTransactions - not supported yet)
# abort (not supported yet)

# [topics]
# list-offsets (not supported yet)

# [cluster]
# describe-features (DescribeFeatures - not supported yet)
# update-features (UpdateFeatures - not supported yet)
Expand Down
17 changes: 16 additions & 1 deletion kafka/cli/admin/groups/list_offsets.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
from kafka.structs import TopicPartition
from kafka.admin import OffsetSpec


class ListGroupOffsets:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('list-offsets', help='List Offsets for Group')
parser.add_argument('-g', '--group-id', type=str, required=True)
parser.set_defaults(command=lambda cli, args: cli.list_group_offsets(args.group_id))
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
    """Annotate the group's committed offsets with latest offset and lag."""
    # Committed offsets for the consumer group; shape appears to be
    # {topic: {partition: {...committed_offset...}}} — TODO confirm against
    # list_group_offsets.
    offsets = client.list_group_offsets(args.group_id)
    partitions = [TopicPartition(topic, partition) for topic in offsets for partition in offsets[topic]]
    # Log-end offset for every partition the group has commits for.
    latest = client.list_partition_offsets({tp: OffsetSpec.LATEST for tp in partitions})
    for tp in latest:
        part_res = offsets[tp.topic][tp.partition]
        part_res['latest_offset'] = latest[tp].offset
        # NOTE(review): if committed_offset can be a sentinel (e.g. -1 for
        # "no commit"), this lag figure is off by the sentinel — verify.
        part_res['lag'] = latest[tp].offset - part_res['committed_offset']
    return offsets
2 changes: 2 additions & 0 deletions kafka/cli/admin/partitions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .delete_records import DeleteRecords
from .describe import DescribeTopicPartitions
from .elect_leaders import ElectLeaders
from .list_offsets import ListPartitionOffsets
from .list_reassignments import ListPartitionReassignments


Expand All @@ -17,6 +18,7 @@ def add_subparser(cls, subparsers):
for cmd in [
CreatePartitions,
DescribeTopicPartitions,
ListPartitionOffsets,
ListPartitionReassignments,
AlterPartitionReassignments,
DeleteRecords,
Expand Down
100 changes: 100 additions & 0 deletions kafka/cli/admin/partitions/list_offsets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from collections import defaultdict

from kafka.protocol.consumer import OffsetSpec
from kafka.structs import TopicPartition


class ListPartitionOffsets:
    """CLI handler for ``partitions list-offsets``.

    Resolves partition selectors to :class:`TopicPartition` objects and
    queries offsets by spec (earliest, latest, max-timestamp, earliest-local,
    latest-tiered, or a raw millisecond timestamp) via
    ``client.list_partition_offsets``.
    """

    # topic -> sorted partition indexes, filled lazily from cluster metadata.
    # Deliberately class-level so repeated --partition entries for the same
    # topic cost only one describe_topics() round trip.
    _partitions_cache = {}

    @classmethod
    def add_subparser(cls, subparsers):
        """Register the 'list-offsets' subcommand and its arguments."""
        parser = subparsers.add_parser(
            'list-offsets',
            help='List offsets for partitions by spec (earliest/latest/timestamp)')
        parser.add_argument(
            '-t', '--topic', type=str)
        parser.add_argument(
            '-s', '--spec', type=str,
            help='Spec may be one of earliest, latest, max-timestamp, earliest-local, '
            'latest-tiered, or a millisecond timestamp.')
        parser.add_argument(
            '-p', '--partition', type=str, action='append',
            dest='partitions', default=[],
            help='TOPIC:PARTITION:SPEC triple (repeatable). PARTITION may be a '
            'single partition, a closed range (0-2), an open range (1-), or '
            'a single wildcard "*" for all partitions. SPEC may be one of '
            'earliest, latest, max-timestamp, earliest-local, latest-tiered, '
            'or a millisecond timestamp.')
        parser.set_defaults(command=cls.command)

    @classmethod
    def command(cls, client, args):
        """Return {topic: {partition: offset-info dict}} for the requested specs."""
        tp_offsets = cls._parse_partition_specs(client, args)
        output = defaultdict(dict)
        result = client.list_partition_offsets(tp_offsets)
        for tp, info in result.items():
            output[tp.topic][tp.partition] = {
                'offset': info.offset,
                'timestamp': info.timestamp,
                'leader_epoch': info.leader_epoch,
                'spec': tp_offsets[tp]
            }
        return dict(output)

    @staticmethod
    def _parse_spec(spec):
        """Parse a spec string into an int timestamp or an OffsetSpec member.

        Raises:
            ValueError: if the string is neither an integer nor a spec name.
        """
        try:
            return int(spec)
        except ValueError:
            pass
        try:
            # 'max-timestamp' -> OffsetSpec.MAX_TIMESTAMP, etc.
            return OffsetSpec[spec.upper().replace('-', '_')]
        except KeyError:
            # Report the user's original input, not the transformed key.
            raise ValueError(f'{spec} is not a valid OffsetSpec') from None

    @classmethod
    def _parse_partition_specs(cls, client, args):
        """Build {TopicPartition: spec} from CLI args.

        Raises:
            ValueError: on conflicting/missing arguments, malformed
                TOPIC:PARTITION:SPEC entries, or duplicate partitions.
        """
        # Real ValueErrors, not asserts: asserts vanish under `python -O`.
        if args.partitions:
            if args.topic or args.spec:
                raise ValueError(
                    "Either --partition or (--topic and --spec) is supported, but not both.")
            partitions = args.partitions
        else:
            if not (args.topic and args.spec):
                raise ValueError("Both --topic and --spec must be provided.")
            partitions = [f'{args.topic}:*:{args.spec}']
        tp_offsets = {}
        for entry in partitions:
            try:
                # rsplit so topic names containing ':' still parse.
                topic, partition, spec_str = entry.rsplit(':', 2)
            except ValueError:
                raise ValueError(
                    f'Malformed partition entry {entry!r}; expected TOPIC:PARTITION:SPEC') from None
            spec = cls._parse_spec(spec_str)
            for tp in cls._parse_tp(client, topic, partition):
                if tp in tp_offsets:
                    # Passing multiple specs for a single partition results in an InvalidRequestError
                    raise ValueError('Only one spec allowed per partition')
                tp_offsets[tp] = spec
        return tp_offsets

    @classmethod
    def _parse_tp(cls, client, topic, partition, cache=None):
        """Expand a partition selector into a list of TopicPartition objects.

        ``partition`` may be a single index, a range ('0-2', '1-', '-3'),
        or '*' for all partitions. Metadata lookups are memoized in ``cache``
        (defaults to the shared class-level cache).

        Raises:
            ValueError: for a selector that is not an index, range, or '*'.
        """
        if cache is None:
            cache = cls._partitions_cache
        try:
            return [TopicPartition(topic, int(partition))]
        except ValueError:
            pass
        if partition != '*' and '-' not in partition:
            raise ValueError(f'Unrecognized partition: {partition}')

        if topic not in cache:
            # Wildcards and open ranges need the topic's actual partition list.
            cache[topic] = sorted([p['partition_index'] for p in client.describe_topics([topic])[0]['partitions']])

        if partition == '*':
            return [TopicPartition(topic, p)
                    for p in cache[topic]]

        try:
            start, end = partition.split('-')
        except ValueError:
            # e.g. '1-2-3'
            raise ValueError(f'Unrecognized partition: {partition}') from None
        if not start and not end:
            raise ValueError(f'Unrecognized partition: {partition}')
        if not start:
            start = cache[topic][0]
        if not end:
            end = cache[topic][-1]
        try:
            return [TopicPartition(topic, p)
                    for p in range(int(start), int(end) + 1)]
        except ValueError:
            # e.g. 'a-b'
            raise ValueError(f'Unrecognized partition: {partition}') from None
9 changes: 8 additions & 1 deletion kafka/protocol/consumer/fetch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from ..api_message import ApiMessage


class FetchRequest(ApiMessage): pass
class FetchRequest(ApiMessage):
    @classmethod
    def min_version_for_isolation_level(cls, il):
        """Minimum FetchRequest version supporting the given isolation level.

        A non-zero isolation level (read_committed) needs v4; otherwise v0.
        """
        return 4 if int(il) > 0 else 0

class FetchResponse(ApiMessage): pass

__all__ = [
Expand Down
2 changes: 2 additions & 0 deletions kafka/protocol/consumer/fetch.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ class FetchRequest(ApiMessage):
def is_request(cls) -> bool: ...
def expect_response(self) -> bool: ...
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...
@classmethod
def min_version_for_isolation_level(cls, il: Any) -> Any: ...

class FetchResponse(ApiMessage):
class FetchableTopicResponse(DataContainer):
Expand Down
41 changes: 39 additions & 2 deletions kafka/protocol/consumer/offsets.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from enum import IntEnum

from ..api_message import ApiMessage


Expand All @@ -8,15 +10,50 @@ class OffsetResetStrategy:
EARLIEST = -2
NONE = 0

class ListOffsetsRequest(ApiMessage): pass

class IsolationLevel(IntEnum):
    """Wire-level isolation levels for fetch/list-offsets requests."""

    READ_UNCOMMITTED = 0
    READ_COMMITTED = 1


class OffsetSpec(IntEnum):
    """Sentinel timestamp values for ListOffsets requests.

    Any value >= 0 is treated as a real millisecond timestamp: the broker
    returns the earliest offset whose record timestamp is >= that value,
    along with that record's timestamp.
    """

    LATEST = -1           # offset of the next message to be appended; timestamp -1
    EARLIEST = -2         # first offset on the partition, including remote storage; timestamp -1
    MAX_TIMESTAMP = -3    # offset/timestamp of the record with the highest timestamp (KIP-734)
    EARLIEST_LOCAL = -4   # first offset local to the leader, excluding remote storage; timestamp -1 (KIP-405)
    LATEST_TIERED = -5    # latest offset of the partition held in remote storage (KIP-1005)


class ListOffsetsRequest(ApiMessage):
    @classmethod
    def min_version_for_timestamp(cls, ts):
        """Return the minimum request version needed for a timestamp/spec.

        Real (>= 0) millisecond timestamps are supported from v0; the
        sentinel specs added by later KIPs need newer versions.

        Raises:
            ValueError: for a negative value that is not a known OffsetSpec
                sentinel.
        """
        if int(ts) >= 0:
            # Bug fix: real timestamps are not OffsetSpec members — the
            # previous OffsetSpec(ts) conversion raised ValueError for any
            # non-negative timestamp.
            return 0
        ts = OffsetSpec(ts)
        if ts == OffsetSpec.MAX_TIMESTAMP:
            return 7
        elif ts == OffsetSpec.EARLIEST_LOCAL:
            return 8
        elif ts == OffsetSpec.LATEST_TIERED:
            return 9
        else:
            # LATEST / EARLIEST have been supported since v0.
            return 0

    @classmethod
    def min_version_for_isolation_level(cls, il):
        """Return the minimum request version for the given isolation level.

        read_committed (non-zero) requires ListOffsets v2+; otherwise v0.
        """
        if int(il) > 0:
            return 2
        else:
            return 0

class ListOffsetsResponse(ApiMessage): pass


class OffsetForLeaderEpochRequest(ApiMessage): pass
class OffsetForLeaderEpochResponse(ApiMessage): pass


__all__ = [
'UNKNOWN_OFFSET', 'OffsetResetStrategy',
'UNKNOWN_OFFSET', 'OffsetResetStrategy', 'IsolationLevel', 'OffsetSpec',
'ListOffsetsRequest', 'ListOffsetsResponse',
'OffsetForLeaderEpochRequest', 'OffsetForLeaderEpochResponse',
]
Loading
Loading