From 81b0bf03a75c079d28dc5c4305b16f6bca527ee2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 09:14:46 -0700 Subject: [PATCH 01/15] Consumer: use new proto attrs in Fetcher (ListOffsets/Fetch) --- kafka/consumer/fetcher.py | 274 +++++++++++++++------------------- test/consumer/test_fetcher.py | 154 ++++++++++--------- 2 files changed, 206 insertions(+), 222 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index f00482554..fa563922d 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -10,7 +10,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.consumer import FetchRequest from kafka.protocol.consumer import ( - ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET + ListOffsetsRequest, OffsetSpec, UNKNOWN_OFFSET, IsolationLevel ) from kafka.record import MemoryRecords from kafka.serializer import Deserializer @@ -20,15 +20,6 @@ log = logging.getLogger(__name__) -# Isolation levels -READ_UNCOMMITTED = 0 -READ_COMMITTED = 1 - -ISOLATION_LEVEL_CONFIG = { - 'read_uncommitted': READ_UNCOMMITTED, - 'read_committed': READ_COMMITTED, -} - ConsumerRecord = collections.namedtuple("ConsumerRecord", ["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type", "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"]) @@ -43,6 +34,13 @@ ["partition", "fetched_offset", "exception"]) +_FetchTopic = FetchRequest.FetchTopic +_FetchPartition = _FetchTopic.FetchPartition +_ForgottenTopic = FetchRequest.ForgottenTopic +_ListOffsetsTopic = ListOffsetsRequest.ListOffsetsTopic +_ListOffsetsPartition = _ListOffsetsTopic.ListOffsetsPartition + + class NoOffsetForPartitionError(Errors.KafkaError): pass @@ -116,8 +114,10 @@ def __init__(self, client, subscriptions, **configs): if key in configs: self.config[key] = configs[key] - if self.config['isolation_level'] not in ISOLATION_LEVEL_CONFIG: - raise 
Errors.KafkaConfigurationError('Unrecognized isolation_level') + try: + self._isolation_level = IsolationLevel.build_from(self.config['isolation_level']) + except ValueError: + raise Errors.KafkaConfigurationError('Unrecognized isolation_level') from None self._client = client self._manager = client._manager @@ -130,12 +130,17 @@ def __init__(self, client, subscriptions, **configs): self._sensors = FetchManagerMetrics(self.config['metrics'], self.config['metric_group_prefix']) else: self._sensors = None - self._isolation_level = ISOLATION_LEVEL_CONFIG[self.config['isolation_level']] self._session_handlers = {} self._nodes_with_pending_fetch_requests = set() self._cached_list_offsets_exception = None self._next_in_line_exception_metadata = None + @property + def _enable_incremental_fetch_sessions(self): + if self._manager.broker_version is None or self._manager.broker_version < (1, 1): + return False + return self.config['enable_incremental_fetch_sessions'] + def send_fetches(self): """Send FetchRequests for all assigned partitions that do not already have an in-flight fetch or pending fetch data. 
@@ -271,11 +276,11 @@ async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None): def beginning_offsets(self, partitions, timeout_ms): return self.beginning_or_end_offset( - partitions, OffsetResetStrategy.EARLIEST, timeout_ms) + partitions, OffsetSpec.EARLIEST, timeout_ms) def end_offsets(self, partitions, timeout_ms): return self.beginning_or_end_offset( - partitions, OffsetResetStrategy.LATEST, timeout_ms) + partitions, OffsetSpec.LATEST, timeout_ms) def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): timestamps = dict([(tp, timestamp) for tp in partitions]) @@ -476,43 +481,37 @@ async def _send_list_offsets_requests(self, timestamps): def _group_list_offset_requests(self, timestamps): timestamps_by_node = collections.defaultdict(dict) for partition, timestamp in timestamps.items(): - node_id = self._client.cluster.leader_for_partition(partition) + node_id = self._manager.cluster.leader_for_partition(partition) if node_id is None: - self._client.cluster.add_topic(partition.topic) + self._manager.cluster.add_topic(partition.topic) log.debug("Partition %s is unknown for fetching offset", partition) - self._client.cluster.request_update() + self._manager.cluster.request_update() elif node_id == -1: log.debug("Leader for partition %s unavailable for fetching " "offset, wait for metadata refresh", partition) - self._client.cluster.request_update() + self._manager.cluster.request_update() else: leader_epoch = -1 timestamps_by_node[node_id][partition] = (timestamp, leader_epoch) return dict(timestamps_by_node) async def _send_list_offsets_request(self, node_id, timestamps_and_epochs): - version = self._client.api_version(ListOffsetsRequest, max_version=5) - if self.config['isolation_level'] == 'read_committed' and version < 2: - raise Errors.UnsupportedVersionError('read_committed isolation level requires ListOffsetsRequest >= v2') + max_version = 6 # TODO: support 7-10 via OffsetSpec + min_version = 
ListOffsetsRequest.min_version_for_isolation_level(self._isolation_level) by_topic = collections.defaultdict(list) for tp, (timestamp, leader_epoch) in timestamps_and_epochs.items(): - if version >= 4: - data = (tp.partition, leader_epoch, timestamp) - elif version >= 1: - data = (tp.partition, timestamp) - else: - data = (tp.partition, timestamp, 1) + data = _ListOffsetsPartition( + partition_index=tp.partition, + current_leader_epoch=leader_epoch, + timestamp=timestamp) by_topic[tp.topic].append(data) - if version >= 2: - request = ListOffsetsRequest[version]( - -1, - self._isolation_level, - list(by_topic.items())) - else: - request = ListOffsetsRequest[version]( - -1, - list(by_topic.items())) + request = ListOffsetsRequest( + isolation_level=self._isolation_level, + topics=list(by_topic.items()), + min_version=min_version, + max_version=max_version, + ) log.debug("Sending ListOffsetRequest %s to broker %s", request, node_id) response = await self._manager.send(request, node_id=node_id) @@ -530,57 +529,53 @@ def _handle_list_offsets_response(self, response): fetched_offsets = dict() partitions_to_retry = set() unauthorized_topics = set() - for topic, part_data in response.topics: - for partition_info in part_data: - partition, error_code = partition_info[:2] - partition = TopicPartition(topic, partition) + for topic_data in response.topics: + for partition_info in topic_data.partitions: + tp = TopicPartition(topic_data.name, partition_info.partition_index) + error_code = partition_info.error_code error_type = Errors.for_code(error_code) if error_type is Errors.NoError: if response.API_VERSION == 0: - offsets = partition_info[2] + offsets = partition_info.old_style_offsets assert len(offsets) <= 1, 'Expected ListOffsetsResponse with one offset' - if not offsets: - offset = UNKNOWN_OFFSET - else: - offset = offsets[0] - timestamp = None - leader_epoch = -1 - elif response.API_VERSION <= 3: - timestamp, offset = partition_info[2:] - leader_epoch = -1 + offset = 
offsets[0] if offsets else UNKNOWN_OFFSET else: - timestamp, offset, leader_epoch = partition_info[2:] + offset = partition_info.offset + timestamp = partition_info.timestamp + if timestamp == -1: + timestamp = None + leader_epoch = partition_info.leader_epoch log.debug("Handling ListOffsetsResponse response for %s. " "Fetched offset %s, timestamp %s, leader_epoch %s", - partition, offset, timestamp, leader_epoch) + tp, offset, timestamp, leader_epoch) if offset != UNKNOWN_OFFSET: - fetched_offsets[partition] = OffsetAndTimestamp(offset, timestamp, leader_epoch) + fetched_offsets[tp] = OffsetAndTimestamp(offset, timestamp, leader_epoch) elif error_type is Errors.UnsupportedForMessageFormatError: # The message format on the broker side is before 0.10.0, which means it does not # support timestamps. We treat this case the same as if we weren't able to find an # offset corresponding to the requested timestamp and leave it out of the result. log.debug("Cannot search by timestamp for partition %s because the" - " message format version is before 0.10.0", partition) + " message format version is before 0.10.0", tp) elif error_type in (Errors.NotLeaderForPartitionError, Errors.ReplicaNotAvailableError, Errors.KafkaStorageError, Errors.OffsetNotAvailableError, Errors.LeaderNotAvailableError): log.debug("Attempt to fetch offsets for partition %s failed due" - " to %s, retrying.", error_type.__name__, partition) - partitions_to_retry.add(partition) + " to %s, retrying.", error_type.__name__, tp) + partitions_to_retry.add(tp) elif error_type is Errors.UnknownTopicOrPartitionError: log.warning("Received unknown topic or partition error in ListOffsets " "request for partition %s. 
The topic/partition " + "may not exist or the user may not have Describe access " - "to it.", partition) - partitions_to_retry.add(partition) + "to it.", tp) + partitions_to_retry.add(tp) elif error_type is Errors.TopicAuthorizationFailedError: - unauthorized_topics.add(topic) + unauthorized_topics.add(tp.topic) else: log.warning("Attempt to fetch offsets for partition %s failed due to:" - " %s", partition, error_type.__name__) - partitions_to_retry.add(partition) + " %s", tp, error_type.__name__) + partitions_to_retry.add(tp) if unauthorized_topics: raise Errors.TopicAuthorizationFailedError(unauthorized_topics) return fetched_offsets, partitions_to_retry @@ -603,11 +598,18 @@ def _create_fetch_requests(self): Returns: dict: {node_id: (FetchRequest, {TopicPartition: fetch_offset}), ...} (version depends on client api_versions) """ + # TODO: + # v12 epoch detection / validation + # v13 topic ids (KIP-516) + # v14 tiered storage (KIP-405) + # v15 replica state (KIP-903) + # v16 node endpoints (KIP-951) + # v17 directory id (KIP-853) + max_version = 10 + # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() - version = self._client.api_version(FetchRequest, max_version=10) fetchable = collections.defaultdict(collections.OrderedDict) - for partition in self._fetchable_partitions(): node_id = self._client.cluster.leader_for_partition(partition) @@ -643,35 +645,19 @@ def _create_fetch_requests(self): else: # Leader is connected and does not have a pending fetch request - if version < 5: - partition_info = ( - partition.partition, - position.offset, - self.config['max_partition_fetch_bytes'] - ) - elif version <= 8: - partition_info = ( - partition.partition, - position.offset, - -1, # log_start_offset is used internally by brokers / replicas only - self.config['max_partition_fetch_bytes'], - ) - else: - partition_info = ( - partition.partition, - position.leader_epoch, - position.offset, - -1, # 
log_start_offset is used internally by brokers / replicas only - self.config['max_partition_fetch_bytes'], - ) - + partition_info = _FetchPartition( + partition=partition.partition, + current_leader_epoch=position.leader_epoch, + fetch_offset=position.offset, + partition_max_bytes=self.config['max_partition_fetch_bytes'] + ) fetchable[node_id][partition] = partition_info log.debug("Adding fetch request for partition %s at offset %d", partition, position.offset) requests = {} for node_id, next_partitions in fetchable.items(): - if version >= 7 and self.config['enable_incremental_fetch_sessions']: + if self._enable_incremental_fetch_sessions: if node_id not in self._session_handlers: self._session_handlers[node_id] = FetchSessionHandler(node_id) session = self._session_handlers[node_id].build_next(next_partitions) @@ -679,77 +665,57 @@ def _create_fetch_requests(self): # No incremental fetch support session = FetchRequestData(next_partitions, None, FetchMetadata.LEGACY) - if version <= 2: - request = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - session.to_send) - elif version == 3: - request = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - self.config['fetch_max_bytes'], - session.to_send) - elif version <= 6: - request = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - self.config['fetch_max_bytes'], - self._isolation_level, - session.to_send) - else: - # Through v8 - request = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - self.config['fetch_max_bytes'], - self._isolation_level, - session.id, - session.epoch, - session.to_send, - session.to_forget) - - fetch_offsets = {} - for tp, partition_data in next_partitions.items(): - if version <= 8: - offset = partition_data[1] - else: - offset = 
partition_data[2] - fetch_offsets[tp] = offset + min_version = FetchRequest.min_version_for_isolation_level(self._isolation_level) + request = FetchRequest( + max_wait_ms=self.config['fetch_max_wait_ms'], + min_bytes=self.config['fetch_min_bytes'], + max_bytes=self.config['fetch_max_bytes'], + isolation_level=self._isolation_level, + session_id=session.id, + session_epoch=session.epoch, + topics=session.to_send, + forgotten_topics_data=session.to_forget, + min_version=min_version, + max_version=max_version, + ) + fetch_offsets = {tp: next_partitions[tp].fetch_offset for tp in next_partitions} requests[node_id] = (request, fetch_offsets) return requests def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response): """The callback for fetch completion""" - if response.API_VERSION >= 7 and self.config['enable_incremental_fetch_sessions']: + if response.API_VERSION >= 7 and self._enable_incremental_fetch_sessions: if node_id not in self._session_handlers: log.error("Unable to find fetch session handler for node %s. 
Ignoring fetch response", node_id) return if not self._session_handlers[node_id].handle_response(response): return - partitions = set([TopicPartition(topic, partition_data[0]) - for topic, partitions in response.responses - for partition_data in partitions]) + partitions = set([ + TopicPartition( + topic_data.topic, + partition_data.partition_index) + for topic_data in response.responses + for partition_data in topic_data.partitions + ]) if self._sensors: metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions) else: metric_aggregator = None - for topic, partitions in response.responses: - for partition_data in partitions: - tp = TopicPartition(topic, partition_data[0]) + for topic_data in response.responses: + for partition_data in topic_data.partitions: + tp = TopicPartition( + topic_data.topic, + partition_data.partition_index + ) fetch_offset = fetch_offsets[tp] completed_fetch = CompletedFetch( tp, fetch_offset, response.API_VERSION, - partition_data[1:], + partition_data, metric_aggregator ) self._completed_fetches.append(completed_fetch) @@ -772,7 +738,8 @@ def _clear_pending_fetch_request(self, node_id, _): def _parse_fetched_data(self, completed_fetch): tp = completed_fetch.topic_partition fetch_offset = completed_fetch.fetched_offset - error_code, highwater = completed_fetch.partition_data[:2] + error_code = completed_fetch.partition_data.error_code + highwater = completed_fetch.partition_data.high_watermark error_type = Errors.for_code(error_code) parsed_records = None @@ -796,12 +763,8 @@ def _parse_fetched_data(self, completed_fetch): position.offset) return None - records = MemoryRecords(completed_fetch.partition_data[-1]) - aborted_transactions = None - if completed_fetch.response_version >= 11: - aborted_transactions = completed_fetch.partition_data[-3] - elif completed_fetch.response_version >= 4: - aborted_transactions = completed_fetch.partition_data[-2] + records = MemoryRecords(completed_fetch.partition_data.records) + 
aborted_transactions = completed_fetch.partition_data.aborted_transactions log.debug("Preparing to read %s bytes of data for partition %s with offset %d", records.size_in_bytes(), tp, fetch_offset) parsed_records = self.PartitionRecords(fetch_offset, tp, records, @@ -887,7 +850,8 @@ def close(self): class PartitionRecords: def __init__(self, fetch_offset, tp, records, key_deserializer=None, value_deserializer=None, - check_crcs=True, isolation_level=READ_UNCOMMITTED, + check_crcs=True, + isolation_level=IsolationLevel.READ_UNCOMMITTED, aborted_transactions=None, # AbortedTransaction data from FetchResponse metric_aggregator=None, on_drain=lambda x: None): self.fetch_offset = fetch_offset @@ -968,7 +932,7 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer): # base_offset, last_offset_delta, aborted transactions, and control batches if batch.magic == 2: self.leader_epoch = batch.leader_epoch - if self.isolation_level == READ_COMMITTED and batch.has_producer_id(): + if self.isolation_level == IsolationLevel.READ_COMMITTED and batch.has_producer_id(): # remove from the aborted transaction queue all aborted transactions which have begun # before the current batch's last offset and add the associated producerIds to the # aborted producer set @@ -1182,9 +1146,9 @@ def handle_error(self, _exception): self.next_metadata = self.next_metadata.next_close_existing() def _response_partitions(self, response): - return {TopicPartition(topic, partition_data[0]) - for topic, partitions in response.responses - for partition_data in partitions} + return {TopicPartition(topic_data.topic, partition_data.partition_index) + for topic_data in response.responses + for partition_data in topic_data.partitions} class FetchMetadata: @@ -1249,21 +1213,27 @@ def epoch(self): @property def to_send(self): - # Return as list of [(topic, [(partition, ...), ...]), ...] 
+ # Return as list of _FetchTopic data objects # so it can be passed directly to encoder partition_data = collections.defaultdict(list) for tp, partition_info in self._to_send.items(): partition_data[tp.topic].append(partition_info) - return list(partition_data.items()) + return [ + _FetchTopic(topic=topic, partitions=partitions) + for topic, partitions in partition_data.items() + ] @property def to_forget(self): - # Return as list of [(topic, (partiiton, ...)), ...] + # Return as list of _ForgottenTopic data objects # so it an be passed directly to encoder partition_data = collections.defaultdict(list) for tp in self._to_forget: partition_data[tp.topic].append(tp.partition) - return list(partition_data.items()) + return [ + _ForgottenTopic(topic=topic, partitions=partitions) + for topic, partitions in partition_data.items() + ] class FetchMetrics: diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py index f5c1aa450..ebf84823f 100644 --- a/test/consumer/test_fetcher.py +++ b/test/consumer/test_fetcher.py @@ -6,6 +6,7 @@ from collections import OrderedDict import itertools import time +from unittest.mock import MagicMock from kafka.consumer.fetcher import ( CompletedFetch, ConsumerRecord, Fetcher @@ -27,6 +28,12 @@ from kafka.structs import OffsetAndMetadata, OffsetAndTimestamp, TopicPartition +_ResponseTopic = FetchResponse.FetchableTopicResponse +_ResponsePartition = _ResponseTopic.PartitionData +_ListResponseTopic = ListOffsetsResponse.ListOffsetsTopicResponse +_ListResponsePartition = _ListResponseTopic.ListOffsetsPartitionResponse + + @pytest.fixture def subscription_state(): return SubscriptionState() @@ -38,9 +45,13 @@ def topic(): @pytest.fixture -def fetcher(client, metrics, subscription_state, topic): +def assignment(topic): + return [TopicPartition(topic, i) for i in range(3)] + + +@pytest.fixture +def fetcher(client, metrics, subscription_state, topic, assignment): subscription_state.subscribe(topics=[topic]) - assignment = 
[TopicPartition(topic, i) for i in range(3)] subscription_state.assign_from_subscribed(assignment) for tp in assignment: subscription_state.seek(tp, 0) @@ -57,6 +68,21 @@ def _build_record_batch(msgs, compression=0, offset=0, magic=2): return builder.buffer() +def _build_completed_fetch(tp, msgs, error=None, offset=0): + if error is not None: + partition_data = _ResponsePartition( + error_code=error.errno, + high_watermark=-1, + records=None) + else: + partition_data = _ResponsePartition( + error_code=0, + high_watermark=100, + records=_build_record_batch(msgs, offset=offset)) + return CompletedFetch( + tp, offset, 0, partition_data, MagicMock()) + + def test_send_fetches(fetcher, topic, mocker): fetch_requests = [ FetchRequest[0]( @@ -94,20 +120,21 @@ def build_fetch_offsets(request): assert len(ret) == len(fetch_requests) -@pytest.mark.parametrize(("api_version", "fetch_version"), [ - ((0, 10, 1), 3), - ((0, 10, 0), 2), - ((0, 9), 1), - ((0, 8, 2), 0) -]) -def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): - fetcher._client._manager.broker_version_data = BrokerVersionData(api_version) +def test_create_fetch_requests(fetcher, mocker, assignment): mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) mocker.patch.object(fetcher._client, "ready", return_value=True) by_node = fetcher._create_fetch_requests() - requests_and_offsets = by_node.values() - assert set([r.API_VERSION for (r, _offsets) in requests_and_offsets]) == set([fetch_version]) + assert len(by_node) == 1 + assert 0 in by_node + request, offsets = by_node[0] + assert isinstance(request, FetchRequest) + requested = set([ + TopicPartition(topic.topic, partition.partition) + for topic in request.topics + for partition in topic.partitions + ]) + assert requested == set(assignment) def test_reset_offsets_if_needed(fetcher, topic, mocker): @@ -299,32 +326,32 @@ 
def wait_for_send_futures(n): def test__handle_list_offsets_response_v1(fetcher, mocker): # Broker returns UnsupportedForMessageFormatError, will omit partition res = ListOffsetsResponse[1]([ - ("topic", [(0, 43, -1, -1)]), - ("topic", [(1, 0, 1000, 9999)]) + _ListResponseTopic("topic", [_ListResponsePartition(0, 43, -1, -1, version=1)], version=1), + _ListResponseTopic("topic", [_ListResponsePartition(1, 0, 1000, 9999, version=1)], version=1) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 1): OffsetAndTimestamp(9999, 1000, -1)}, set()) # Broker returns NotLeaderForPartitionError res = ListOffsetsResponse[1]([ - ("topic", [(0, 6, -1, -1)]), + _ListResponseTopic("topic", [_ListResponsePartition(0, 6, -1, -1, version=1)], version=1), ]) assert fetcher._handle_list_offsets_response(res) == ( {}, set([TopicPartition("topic", 0)])) # Broker returns UnknownTopicOrPartitionError res = ListOffsetsResponse[1]([ - ("topic", [(0, 3, -1, -1)]), + _ListResponseTopic("topic", [_ListResponsePartition(0, 3, -1, -1, version=1)], version=1), ]) assert fetcher._handle_list_offsets_response(res) == ( {}, set([TopicPartition("topic", 0)])) # Broker returns many errors and 1 result res = ListOffsetsResponse[1]([ - ("topic", [(0, 43, -1, -1)]), # not retriable - ("topic", [(1, 6, -1, -1)]), # retriable - ("topic", [(2, 3, -1, -1)]), # retriable - ("topic", [(3, 0, 1000, 9999)]) + _ListResponseTopic("topic", [_ListResponsePartition(0, 43, -1, -1, version=1)], version=1), # not retriable + _ListResponseTopic("topic", [_ListResponsePartition(1, 6, -1, -1, version=1)], version=1), # retriable + _ListResponseTopic("topic", [_ListResponsePartition(2, 3, -1, -1, version=1)], version=1), # retriable + _ListResponseTopic("topic", [_ListResponsePartition(3, 0, 1000, 9999, version=1)], version=1) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 3): OffsetAndTimestamp(9999, 1000, -1)}, @@ -335,7 +362,7 @@ def 
test__handle_list_offsets_response_v2_v3(fetcher, mocker): # including a throttle_time shouldnt cause issues res = ListOffsetsResponse[2]( 123, # throttle_time_ms - [("topic", [(0, 0, 1000, 9999)]) + [_ListResponseTopic("topic", [_ListResponsePartition(0, 0, 1000, 9999, version=2)], version=2) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)}, set()) @@ -343,7 +370,7 @@ def test__handle_list_offsets_response_v2_v3(fetcher, mocker): # v3 response is the same format res = ListOffsetsResponse[3]( 123, # throttle_time_ms - [("topic", [(0, 0, 1000, 9999)]) + [_ListResponseTopic("topic", [_ListResponsePartition(0, 0, 1000, 9999, version=3)], version=3) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)}, set()) @@ -353,7 +380,7 @@ def test__handle_list_offsets_response_v4_v5(fetcher, mocker): # includes leader_epoch res = ListOffsetsResponse[4]( 123, # throttle_time_ms - [("topic", [(0, 0, 1000, 9999, 1234)]) + [_ListResponseTopic("topic", [_ListResponsePartition(0, 0, 1000, 9999, 1234, version=4)], version=4) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)}, set()) @@ -361,7 +388,7 @@ def test__handle_list_offsets_response_v4_v5(fetcher, mocker): # v5 response is the same format res = ListOffsetsResponse[5]( 123, # throttle_time_ms - [("topic", [(0, 0, 1000, 9999, 1234)]) + [_ListResponseTopic("topic", [_ListResponsePartition(0, 0, 1000, 9999, 1234, version=5)], version=5) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)}, set()) @@ -374,10 +401,7 @@ def test_fetched_records(fetcher, topic, mocker): msgs = [] for i in range(10): msgs.append((None, b"foo", None)) - completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, _build_record_batch(msgs)], - mocker.MagicMock() - ) + 
completed_fetch = _build_completed_fetch(tp, msgs) fetcher._completed_fetches.append(completed_fetch) records, partial = fetcher.fetched_records() assert tp in records @@ -390,42 +414,50 @@ def test_fetched_records(fetcher, topic, mocker): ( {TopicPartition('foo', 0): 0}, FetchResponse[0]( - [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, _build_record_batch([(None, b'xxx', None)]), version=0)], version=0)]), 1, ), ( {TopicPartition('foo', 0): 0, TopicPartition('foo', 1): 0}, FetchResponse[1]( 0, - [("foo", [ - (0, 0, 1000, [(0, b'xxx'),]), - (1, 0, 1000, [(0, b'xxx'),]), - ]),]), + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, _build_record_batch([(None, b'xxx', None)]), version=1), + _ResponsePartition(1, 0, 1000, _build_record_batch([(None, b'xxx', None)]), version=1)], version=1)]), 2, ), ( {TopicPartition('foo', 0): 0}, FetchResponse[2]( - 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + 0, + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, _build_record_batch([(None, b'xxx', None)]), version=2)], version=2)]), 1, ), ( {TopicPartition('foo', 0): 0}, FetchResponse[3]( - 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + 0, + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, _build_record_batch([(None, b'xxx', None)]), version=3)], version=3)]), 1, ), ( {TopicPartition('foo', 0): 0}, FetchResponse[4]( - 0, [("foo", [(0, 0, 1000, 0, [], [(0, b'xxx'),])]),]), + 0, + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, 0, [], _build_record_batch([(None, b'xxx', None)]), version=4)], version=4)]), 1, ), ( # This may only be used in broker-broker api calls {TopicPartition('foo', 0): 0}, FetchResponse[5]( - 0, [("foo", [(0, 0, 1000, 0, 0, [], [(0, b'xxx'),])]),]), + 0, + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, 0, 0, [], _build_record_batch([(None, b'xxx', None)]), version=5)], version=5)]), 1, ), ]) @@ -495,10 +527,7 @@ def test__parse_fetched_data(fetcher, 
topic, mocker): msgs = [] for i in range(10): msgs.append((None, b"foo", None)) - completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, _build_record_batch(msgs)], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, msgs) partition_record = fetcher._parse_fetched_data(completed_fetch) assert isinstance(partition_record, fetcher.PartitionRecords) assert partition_record @@ -511,10 +540,7 @@ def test__parse_fetched_data__paused(fetcher, topic, mocker): msgs = [] for i in range(10): msgs.append((None, b"foo", None)) - completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, _build_record_batch(msgs)], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, msgs) fetcher._subscriptions.pause(tp) partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None @@ -526,10 +552,8 @@ def test__parse_fetched_data__stale_offset(fetcher, topic, mocker): msgs = [] for i in range(10): msgs.append((None, b"foo", None)) - completed_fetch = CompletedFetch( - tp, 10, 0, [0, 100, _build_record_batch(msgs)], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, msgs) + completed_fetch = completed_fetch._replace(fetched_offset=10) partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None @@ -537,10 +561,7 @@ def test__parse_fetched_data__stale_offset(fetcher, topic, mocker): def test__parse_fetched_data__not_leader(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) - completed_fetch = CompletedFetch( - tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, [], error=NotLeaderForPartitionError) mocker.patch.object(fetcher._client.cluster, 'request_update') partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None @@ -550,10 +571,7 @@ def test__parse_fetched_data__not_leader(fetcher, topic, mocker): def 
test__parse_fetched_data__unknown_tp(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) - completed_fetch = CompletedFetch( - tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, [], error=UnknownTopicOrPartitionError) mocker.patch.object(fetcher._client.cluster, 'request_update') partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None @@ -563,10 +581,7 @@ def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker): def test__parse_fetched_data__out_of_range(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) - completed_fetch = CompletedFetch( - tp, 0, 0, [OffsetOutOfRangeError.errno, -1, None], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, [], error=OffsetOutOfRangeError) partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None assert fetcher._subscriptions.assignment[tp].awaiting_reset is True @@ -707,7 +722,7 @@ def test_reset_offsets_paused_with_valid(subscription_state, client, mocker): assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1) -def test_fetch_position_after_exception(client, mocker): +def test_fetch_position_after_exception(client): subscription_state = SubscriptionState(offset_reset_strategy='NONE') fetcher = Fetcher(client, subscription_state) @@ -721,12 +736,11 @@ def test_fetch_position_after_exception(client, mocker): assert len(fetcher._fetchable_partitions()) == 2 - empty_records = _build_record_batch([], offset=1) - three_records = _build_record_batch([(None, b'msg', None) for _ in range(3)], offset=1) + msgs = [(None, b'msg', None) for _ in range(3)] fetcher._completed_fetches.append( - CompletedFetch(tp1, 1, 0, [0, 100, three_records], mocker.MagicMock())) + _build_completed_fetch(tp1, msgs, offset=1)) fetcher._completed_fetches.append( - 
CompletedFetch(tp0, 1, 0, [1, 100, empty_records], mocker.MagicMock())) + _build_completed_fetch(tp0, [], error=OffsetOutOfRangeError, offset=1)) records, partial = fetcher.fetched_records() assert len(records) == 1 @@ -746,7 +760,7 @@ def test_fetch_position_after_exception(client, mocker): assert exceptions[0].args == ({tp0: 1},) -def test_seek_before_exception(client, mocker): +def test_seek_before_exception(client): subscription_state = SubscriptionState(offset_reset_strategy='NONE') fetcher = Fetcher(client, subscription_state, max_poll_records=2) @@ -757,9 +771,9 @@ def test_seek_before_exception(client, mocker): assert len(fetcher._fetchable_partitions()) == 1 - three_records = _build_record_batch([(None, b'msg', None) for _ in range(3)], offset=1) + msgs = [(None, b'msg', None) for _ in range(3)] fetcher._completed_fetches.append( - CompletedFetch(tp0, 1, 0, [0, 100, three_records], mocker.MagicMock())) + _build_completed_fetch(tp0, msgs, offset=1)) records, partial = fetcher.fetched_records() assert len(records) == 1 @@ -774,7 +788,7 @@ def test_seek_before_exception(client, mocker): empty_records = _build_record_batch([], offset=1) fetcher._completed_fetches.append( - CompletedFetch(tp1, 1, 0, [1, 100, empty_records], mocker.MagicMock())) + _build_completed_fetch(tp1, [], error=OffsetOutOfRangeError, offset=1)) records, partial = fetcher.fetched_records() assert len(records) == 1 From bf0487660dae1e8744402a739b6ee60ce37be56c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 09:16:12 -0700 Subject: [PATCH 02/15] KafkaNetClient: use manager.broker_version property --- kafka/net/compat.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/net/compat.py b/kafka/net/compat.py index 8a001eb3d..1d5b22fe7 100644 --- a/kafka/net/compat.py +++ b/kafka/net/compat.py @@ -94,15 +94,15 @@ def bootstrap_connected(self): return bootstrap_future is not None and not bootstrap_future.is_done def get_broker_version(self, 
timeout_ms=None): - if self._manager.broker_version_data is None: + if self._manager.broker_version is None: self._manager.bootstrap(timeout_ms) - return self._manager.broker_version_data.broker_version + return self._manager.broker_version def check_version(self, node_id=None, timeout_ms=10000): if not self._manager.bootstrapped: self._manager.bootstrap(timeout_ms) if node_id is None: - return self._manager.broker_version_data.broker_version + return self._manager.broker_version async def _check_version(broker_id): conn = await self._manager.get_connection(broker_id) return conn.broker_version From 1bf8bc14525ee05ef8db597b523e2d6f09cabafd Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 09:17:15 -0700 Subject: [PATCH 03/15] kafka.util: raise from None to clean KeyError --- kafka/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/util.py b/kafka/util.py index 5ba42be9a..bb7516a2d 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -145,7 +145,7 @@ def build_from(cls, val): try: return cls[str(val).strip().upper().replace('-', '_')] # pylint: disable=E1136 except KeyError: - raise ValueError(f'Unrecognized {cls.__name__}: {val}') + raise ValueError(f'Unrecognized {cls.__name__}: {val}') from None @classmethod def value_for(cls, val): @@ -156,4 +156,4 @@ def value_for(cls, val): try: return cls[str(val).upper().replace('-', '_')].value # pylint: disable=E1136 except KeyError: - raise ValueError(f'Unrecognized {cls.__name__}: {val}') + raise ValueError(f'Unrecognized {cls.__name__}: {val}') from None From 828feac4283948d03f270cd079a8762f52e6032f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 09:17:51 -0700 Subject: [PATCH 04/15] fetcher: prefer _manager to _client --- kafka/consumer/fetcher.py | 10 +++++----- test/consumer/test_fetcher.py | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index fa563922d..c07c2dacf 100644 --- 
a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -152,7 +152,7 @@ def send_fetches(self): for node_id, (request, fetch_offsets) in self._create_fetch_requests().items(): log.debug("Sending FetchRequest to node %s", node_id) self._nodes_with_pending_fetch_requests.add(node_id) - future = self._client.send(node_id, request, wakeup=False) + future = self._manager.send(request, node_id=node_id) future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.monotonic()) future.add_errback(self._handle_fetch_error, node_id) future.add_both(self._clear_pending_fetch_request, node_id) @@ -611,7 +611,7 @@ def _create_fetch_requests(self): # which can be passed to FetchRequest() via .items() fetchable = collections.defaultdict(collections.OrderedDict) for partition in self._fetchable_partitions(): - node_id = self._client.cluster.leader_for_partition(partition) + node_id = self._manager.cluster.leader_for_partition(partition) position = self._subscriptions.assignment[partition].position @@ -619,7 +619,7 @@ def _create_fetch_requests(self): if node_id is None or node_id == -1: log.debug("No leader found for partition %s." 
" Requesting metadata update", partition) - self._client.cluster.request_update() + self._manager.cluster.request_update() elif not self._client.connected(node_id) and self._client.connection_delay(node_id) > 0: # If we try to send during the reconnect backoff window, then the request is just @@ -802,7 +802,7 @@ def _parse_fetched_data(self, completed_fetch): Errors.UnknownTopicOrPartitionError, Errors.KafkaStorageError): log.debug("Error fetching partition %s: %s", tp, error_type.__name__) - self._client.cluster.request_update() + self._manager.cluster.request_update() elif error_type is Errors.OffsetOutOfRangeError: position = self._subscriptions.assignment[tp].position if position is None or position.offset != fetch_offset: @@ -821,7 +821,7 @@ def _parse_fetched_data(self, completed_fetch): elif getattr(error_type, 'retriable', False): log.debug("Retriable error fetching partition %s: %s", tp, error_type()) if getattr(error_type, 'invalid_metadata', False): - self._client.cluster.request_update() + self._manager.cluster.request_update() else: raise error_type('Unexpected error while fetching data') diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py index ebf84823f..05d05db50 100644 --- a/test/consumer/test_fetcher.py +++ b/test/consumer/test_fetcher.py @@ -113,16 +113,16 @@ def build_fetch_offsets(request): return_value=(dict(enumerate(map(lambda r: (r, build_fetch_offsets(r)), fetch_requests))))) mocker.patch.object(fetcher._client, 'ready', return_value=True) - mocker.patch.object(fetcher._client, 'send') + mocker.patch.object(fetcher._manager, 'send') ret = fetcher.send_fetches() for node, request in enumerate(fetch_requests): - fetcher._client.send.assert_any_call(node, request, wakeup=False) + fetcher._manager.send.assert_any_call(request, node_id=node) assert len(ret) == len(fetch_requests) def test_create_fetch_requests(fetcher, mocker, assignment): - mocker.patch.object(fetcher._client.cluster, "leader_for_partition", 
return_value=0) - mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) + mocker.patch.object(fetcher._manager.cluster, "leader_for_partition", return_value=0) + mocker.patch.object(fetcher._manager.cluster, "leader_epoch_for_partition", return_value=0) mocker.patch.object(fetcher._client, "ready", return_value=True) by_node = fetcher._create_fetch_requests() assert len(by_node) == 1 From 82e239ff60adc4467761a87c422e93a3196b50bb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:15:16 -0700 Subject: [PATCH 05/15] fix typo --- kafka/consumer/fetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c07c2dacf..6a1b69967 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -686,7 +686,7 @@ def _create_fetch_requests(self): def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response): """The callback for fetch completion""" - if response.API_VERSION >= 7 and self_enable_incremental_fetch_sessions: + if response.API_VERSION >= 7 and self._enable_incremental_fetch_sessions: if node_id not in self._session_handlers: log.error("Unable to find fetch session handler for node %s. 
Ignoring fetch response", node_id) return From dffce7c2c4e067f2c624c0b6dc034ff7338a910d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:16:07 -0700 Subject: [PATCH 06/15] handle timestamp / leader_epoch None -> defaults --- kafka/consumer/fetcher.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 6a1b69967..16aad91d5 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -542,9 +542,13 @@ def _handle_list_offsets_response(self, response): else: offset = partition_info.offset timestamp = partition_info.timestamp - if timestamp == -1: - timestamp = None leader_epoch = partition_info.leader_epoch + # DataContainer currently does not set default for + # out-of-version fields; so we need to handle explicitly + if timestamp is None: + timestamp = -1 + if leader_epoch is None: + leader_epoch = -1 log.debug("Handling ListOffsetsResponse response for %s. " "Fetched offset %s, timestamp %s, leader_epoch %s", tp, offset, timestamp, leader_epoch) From 6d8eb64bda8d726929f2766a31b9371906c19cb8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:17:11 -0700 Subject: [PATCH 07/15] Drop _client.connected and _client.ready checks in _create_fetch_requests --- kafka/consumer/fetcher.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 16aad91d5..55a0a005c 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -625,24 +625,19 @@ def _create_fetch_requests(self): " Requesting metadata update", partition) self._manager.cluster.request_update() - elif not self._client.connected(node_id) and self._client.connection_delay(node_id) > 0: + elif self._manager.connection_delay(node_id) > 0: # If we try to send during the reconnect backoff window, then the request is just # going to be failed anyway before being sent, so skip the send for now 
log.debug("Skipping fetch for partition %s because node %s is awaiting reconnect backoff", partition, node_id) + # TODO: handle throttle_delay in kafka.net elif self._client.throttle_delay(node_id) > 0: # If we try to send while throttled, then the request is just # going to be failed anyway before being sent, so skip the send for now log.debug("Skipping fetch for partition %s because node %s is throttled", partition, node_id) - elif not self._client.ready(node_id): - # Until we support send request queues, any attempt to send to a not-ready node will be - # immediately failed with NodeNotReadyError. - log.debug("Skipping fetch for partition %s because connection to leader node is not ready yet", - partition) - elif node_id in self._nodes_with_pending_fetch_requests: log.debug("Skipping fetch for partition %s because there is a pending fetch request to node %s", partition, node_id) From 25949edb628791327dfdadc5714e4c614c11b4d8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:18:03 -0700 Subject: [PATCH 08/15] Fix to_send / to_forget encoding --- kafka/consumer/fetcher.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 55a0a005c..f58ec2393 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -1218,19 +1218,19 @@ def to_send(self): for tp, partition_info in self._to_send.items(): partition_data[tp.topic].append(partition_info) return [ - _FetchTopic(topic=tp.topic, partitions=partitions) + _FetchTopic(topic=topic, partitions=partitions) for topic, partitions in partition_data.items() ] @property def to_forget(self): # Return as list of _ForgottenTopic data objects - # so it an be passed directly to encoder + # so it can be passed directly to encoder partition_data = collections.defaultdict(list) for tp in self._to_forget: partition_data[tp.topic].append(tp.partition) return [ - _ForgottenTopic(topic=tp.topic, partitions=partitions) + 
_ForgottenTopic(topic=topic, partitions=partitions) for topic, partitions in partition_data.items() ] From cbf9ca2ef08b442aeffdbafb538d0977bd02a185 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:18:20 -0700 Subject: [PATCH 09/15] Drop unused NoOffsetForPartitionError --- kafka/consumer/fetcher.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index f58ec2393..ef654c38b 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -41,10 +41,6 @@ _ListOffsetsPartition = _ListOffsetsTopic.ListOffsetsPartition -class NoOffsetForPartitionError(Errors.KafkaError): - pass - - class RecordTooLargeError(Errors.KafkaError): pass From 84ca8cbebcc5f7a39cfee6054b379a8156873218 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:24:51 -0700 Subject: [PATCH 10/15] optional timeout_ms for offset lookups --- kafka/consumer/fetcher.py | 6 +++--- kafka/consumer/group.py | 43 +++++++++++++++++++++------------------ 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index ef654c38b..7bdc566d0 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -270,15 +270,15 @@ async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None): raise Errors.KafkaTimeoutError( "Failed to get offsets by timestamps in %s ms" % (timeout_ms,)) - def beginning_offsets(self, partitions, timeout_ms): + def beginning_offsets(self, partitions, timeout_ms=None): return self.beginning_or_end_offset( partitions, OffsetSpec.EARLIEST, timeout_ms) - def end_offsets(self, partitions, timeout_ms): + def end_offsets(self, partitions, timeout_ms=None): return self.beginning_or_end_offset( partitions, OffsetSpec.LATEST, timeout_ms) - def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): + def beginning_or_end_offset(self, partitions, timestamp, timeout_ms=None): timestamps = dict([(tp, timestamp) for 
tp in partitions]) offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) for tp in timestamps: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 5972d2bdf..1d239d097 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -1027,7 +1027,7 @@ def metrics(self, raw=False): metrics[k.group][k.name] = v.value() return metrics - def offsets_for_times(self, timestamps): + def offsets_for_times(self, timestamps, timeout_ms=None): """Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the @@ -1041,17 +1041,18 @@ def offsets_for_times(self, timestamps): partition. ``None`` will also be returned for the partition if there are no messages in it. - Note: - This method may block indefinitely if the partition does not exist. + Note: This method may block indefinitely if the partition does not exist + and no timeout_ms provided. Arguments: timestamps (dict): ``{TopicPartition: int}`` mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)) + timeout_ms (int, optional): Milliseconds to block fetching offsets. Returns: ``{TopicPartition: OffsetAndTimestamp}``: mapping from partition - to the timestamp and offset of the first message with timestamp + to the offset and timestamp of the first message with timestamp greater than or equal to the target timestamp. Raises: @@ -1060,6 +1061,7 @@ def offsets_for_times(self, timestamps): up the offsets by timestamp. 
KafkaTimeoutError: If fetch failed in request_timeout_ms """ + timeout_ms = self.config['request_timeout_ms'] if timeout_ms is None else timeout_ms if self.config['api_version'] <= (0, 10, 0): raise UnsupportedVersionError( "offsets_for_times API not supported for cluster version {}" @@ -1070,21 +1072,21 @@ def offsets_for_times(self, timestamps): raise ValueError( "The target time for partition {} is {}. The target time " "cannot be negative.".format(tp, ts)) - return self._fetcher.offsets_by_times( - timestamps, self.config['request_timeout_ms']) + return self._fetcher.offsets_by_times(timestamps, timeout_ms) - def beginning_offsets(self, partitions): + def beginning_offsets(self, partitions, timeout_ms=None): """Get the first offset for the given partitions. This method does not change the current consumer position of the partitions. - Note: - This method may block indefinitely if the partition does not exist. + Note: This method may block indefinitely if the partition does not exist + and no timeout_ms provided. Arguments: partitions (list): List of TopicPartition instances to fetch offsets for. + timeout_ms (int, optional): Milliseconds to block fetching offsets. Returns: ``{TopicPartition: int}``: The earliest available offsets for the @@ -1093,13 +1095,13 @@ def beginning_offsets(self, partitions): Raises: UnsupportedVersionError: If the broker does not support looking up the offsets by timestamp. - KafkaTimeoutError: If fetch failed in request_timeout_ms. + KafkaTimeoutError: If fetch failed in timeout_ms. """ - offsets = self._fetcher.beginning_offsets( - partitions, self.config['request_timeout_ms']) + timeout_ms = self.config['request_timeout_ms'] if timeout_ms is None else timeout_ms + offsets = self._fetcher.beginning_offsets(partitions, timeout_ms) return offsets - def end_offsets(self, partitions): + def end_offsets(self, partitions, timeout_ms=None): """Get the last offset for the given partitions. 
The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1. @@ -1107,12 +1109,13 @@ def end_offsets(self, partitions): This method does not change the current consumer position of the partitions. - Note: - This method may block indefinitely if the partition does not exist. + Note: This method may block indefinitely if the partition does not exist + and no timeout_ms provided. Arguments: partitions (list): List of TopicPartition instances to fetch offsets for. + timeout_ms (int, optional): Milliseconds to block fetching offsets. Returns: ``{TopicPartition: int}``: The end offsets for the given partitions. @@ -1120,10 +1123,10 @@ def end_offsets(self, partitions): Raises: UnsupportedVersionError: If the broker does not support looking up the offsets by timestamp. - KafkaTimeoutError: If fetch failed in request_timeout_ms + KafkaTimeoutError: If fetch failed in timeout_ms """ - offsets = self._fetcher.end_offsets( - partitions, self.config['request_timeout_ms']) + timeout_ms = self.config['request_timeout_ms'] if timeout_ms is None else timeout_ms + offsets = self._fetcher.end_offsets(partitions, timeout_ms) return offsets def _update_fetch_positions(self, timeout_ms=None): @@ -1131,8 +1134,8 @@ def _update_fetch_positions(self, timeout_ms=None): or reset it using the offset reset policy the user has configured. Arguments: - partitions (List[TopicPartition]): The partitions that need - updating fetch positions. + timeout_ms (int, optional): Milliseconds to block refreshing committed + offsets. 
Returns True if fetch positions updated, False if timeout or async reset is pending From 2b296b0240f6e56abe6958ae636d350a0c9e6ac0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:25:35 -0700 Subject: [PATCH 11/15] revert ListOffsets max_version = 5; add detailed TODO --- kafka/consumer/fetcher.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 7bdc566d0..dc7ff5e09 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -492,7 +492,13 @@ def _group_list_offset_requests(self, timestamps): return dict(timestamps_by_node) async def _send_list_offsets_request(self, node_id, timestamps_and_epochs): - max_version = 6 # TODO: support 7-10 via OffsetSpec + # TODO: + # v6 flexible + # v7 MAX_TIMESTAMP (KIP-734) + # v8 EARLIEST_LOCAL (KIP-405) + # v9 LATEST_TIERED (KIP-1005) + # v10 async remote (KIP-1075) + max_version = 5 min_version = ListOffsetsRequest.min_version_for_isolation_level(self._isolation_level) by_topic = collections.defaultdict(list) for tp, (timestamp, leader_epoch) in timestamps_and_epochs.items(): From 69f34cc14c517bb3ab5f8050b75ecb0ba4c0f020 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:25:54 -0700 Subject: [PATCH 12/15] docstrings --- kafka/consumer/fetcher.py | 98 ++++++++++++++++++++++++++++++++++----- 1 file changed, 87 insertions(+), 11 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index dc7ff5e09..3961c2b64 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -173,15 +173,11 @@ def in_flight_fetches(self): def reset_offsets_if_needed(self): """Reset offsets for the given partitions using the offset reset strategy. 
- Arguments: - partitions ([TopicPartition]): the partitions that need offsets reset - Returns: bool: True if any partitions need reset; otherwise False (no reset pending) Raises: NoOffsetForPartitionError: if no offset reset strategy is defined - KafkaTimeoutError if timeout_ms provided """ # Raise exception from previous offset fetch if there is one exc, self._cached_list_offsets_exception = self._cached_list_offsets_exception, None @@ -215,10 +211,10 @@ def offsets_by_times(self, timestamps, timeout_ms=None): timeout_ms (int, optional): The maximum time in milliseconds to block. Returns: - {TopicPartition: OffsetAndTimestamp}: Mapping of partition to - retrieved offset, timestamp, and leader_epoch. If offset does not exist for - the provided timestamp, that partition will be missing from - this mapping. + {TopicPartition: OffsetAndTimestamp or None}: Mapping of partition to + retrieved offset, timestamp, and leader_epoch. If offset does not + exist for the provided timestamp, the value for the TopicPartition + will be None. Raises: KafkaTimeoutError if timeout_ms provided @@ -230,6 +226,24 @@ def offsets_by_times(self, timestamps, timeout_ms=None): return offsets async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None): + """Fetch offsets for each partition in timestamps dict. This may send + request to multiple nodes, based on who is Leader for partition. + + Per-node requests are dispatched concurrently; if any fails, the first + exception encountered propagates and the remaining results are dropped. + + Arguments: + timestamps (dict): {TopicPartition: int} mapping of partitions to + timestamps or OffsetSpec sentinels. 
+ + Returns: + {TopicPartition: OffsetAndTimestamp}: mapping of partition to + retrieved offset, timestamp, and leader_epoch. + + Raises: + KafkaTimeoutError: if offsets cannot be fully fetched before timeout_ms + """ if not timestamps: return {} @@ -271,14 +285,62 @@ async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None): "Failed to get offsets by timestamps in %s ms" % (timeout_ms,)) def beginning_offsets(self, partitions, timeout_ms=None): + """Fetch earliest (oldest) offset for each partition. + + Blocks until offsets are obtained, a non-retriable exception is raised + or ``timeout_ms`` passed. + + Arguments: + partitions ([TopicPartition]): List of partitions for list offsets. + timeout_ms (int, optional): The maximum time in milliseconds to block. + + Returns: + {TopicPartition: int}: Mapping of partition to retrieved offset. + + Raises: + KafkaTimeoutError if timeout_ms provided. + """ return self.beginning_or_end_offset( partitions, OffsetSpec.EARLIEST, timeout_ms) def end_offsets(self, partitions, timeout_ms=None): + """Fetch latest (most recent) offset for each partition. + + Blocks until offsets are obtained, a non-retriable exception is raised + or ``timeout_ms`` passed. + + Arguments: + partitions ([TopicPartition]): List of partitions for list offsets. + timeout_ms (int, optional): The maximum time in milliseconds to block. + + Returns: + {TopicPartition: int}: Mapping of partition to retrieved offset. + + Raises: + KafkaTimeoutError if timeout_ms provided. + """ return self.beginning_or_end_offset( partitions, OffsetSpec.LATEST, timeout_ms) def beginning_or_end_offset(self, partitions, timestamp, timeout_ms=None): + """Fetch offset for each partition using ``timestamp``. + + Blocks until offsets are obtained, a non-retriable exception is raised + or ``timeout_ms`` passed. + + Arguments: + partitions ([TopicPartition]): List of partitions for list offsets. 
+ timestamp (int or OffsetSpec): OffsetSpec.LATEST (-1) for the latest + available, OffsetSpec.EARLIEST (-2) for the earliest available. + Otherwise timestamp is treated as epoch milliseconds. + timeout_ms (int, optional): The maximum time in milliseconds to block. + + Returns: + {TopicPartition: int}: Mapping of partition to retrieved offset. + + Raises: + KafkaTimeoutError if timeout_ms provided. + """ timestamps = dict([(tp, timestamp) for tp in partitions]) offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) for tp in timestamps: @@ -452,7 +514,9 @@ async def _send_list_offsets_requests(self, timestamps): timestamps. Returns: - (fetched_offsets, partitions_to_retry) + (fetched_offsets, partitions_to_retry): + dict[TopicPartition, OffsetAndTimestamp], + set[TopicPartition] Raises: StaleMetadata: if no node has known leader for any partition. @@ -492,6 +556,16 @@ def _group_list_offset_requests(self, timestamps): return dict(timestamps_by_node) async def _send_list_offsets_request(self, node_id, timestamps_and_epochs): + """Send single ListOffsetsResponse to node_id + + Returns: + (fetched_offsets, partitions_to_retry): + dict[TopicPartition, OffsetAndTimestamp], + set[TopicPartition] + + Raises: + TopicAuthorizationFailedError: if any topic returned an auth error + """ # TODO: # v6 flexible # v7 MAX_TIMESTAMP (KIP-734) @@ -523,7 +597,9 @@ def _handle_list_offsets_response(self, response): """Parse a ListOffsets response. 
Returns: - (fetched_offsets, partitions_to_retry): dict[TopicPartition, OffsetAndTimestamp], set[TopicPartition] + (fetched_offsets, partitions_to_retry): + dict[TopicPartition, OffsetAndTimestamp], + set[TopicPartition] Raises: TopicAuthorizationFailedError: if any topic returned an auth error @@ -602,7 +678,7 @@ def _create_fetch_requests(self): FetchRequests skipped if no leader, or node has requests in flight Returns: - dict: {node_id: (FetchRequest, {TopicPartition: fetch_offset}), ...} (version depends on client api_versions) + dict: {node_id: (FetchRequest, {TopicPartition: fetch_offset}), ...} """ # TODO: # v12 epoch detection / validation From 4efe32d051deef7507d4f5f6f09ee2f30054c32a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:26:16 -0700 Subject: [PATCH 13/15] partition => tp --- kafka/consumer/fetcher.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 3961c2b64..f590b3b0c 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -688,49 +688,46 @@ def _create_fetch_requests(self): # v16 node endpoints (KIP-951) # v17 directory id (KIP-853) max_version = 10 - - # create the fetch info as a dict of lists of partition info tuples - # which can be passed to FetchRequest() via .items() fetchable = collections.defaultdict(collections.OrderedDict) - for partition in self._fetchable_partitions(): - node_id = self._manager.cluster.leader_for_partition(partition) + for tp in self._fetchable_partitions(): + node_id = self._manager.cluster.leader_for_partition(tp) - position = self._subscriptions.assignment[partition].position + position = self._subscriptions.assignment[tp].position # fetch if there is a leader and no in-flight requests if node_id is None or node_id == -1: log.debug("No leader found for partition %s." 
- " Requesting metadata update", partition) + " Requesting metadata update", tp) self._manager.cluster.request_update() elif self._manager.connection_delay(node_id) > 0: # If we try to send during the reconnect backoff window, then the request is just # going to be failed anyway before being sent, so skip the send for now log.debug("Skipping fetch for partition %s because node %s is awaiting reconnect backoff", - partition, node_id) + tp, node_id) # TODO: handle throttle_delay in kafka.net elif self._client.throttle_delay(node_id) > 0: # If we try to send while throttled, then the request is just # going to be failed anyway before being sent, so skip the send for now log.debug("Skipping fetch for partition %s because node %s is throttled", - partition, node_id) + tp, node_id) elif node_id in self._nodes_with_pending_fetch_requests: log.debug("Skipping fetch for partition %s because there is a pending fetch request to node %s", - partition, node_id) + tp, node_id) else: # Leader is connected and does not have a pending fetch request partition_info = _FetchPartition( - partition=partition.partition, + partition=tp.partition, current_leader_epoch=position.leader_epoch, fetch_offset=position.offset, partition_max_bytes=self.config['max_partition_fetch_bytes'] ) - fetchable[node_id][partition] = partition_info + fetchable[node_id][tp] = partition_info log.debug("Adding fetch request for partition %s at offset %d", - partition, position.offset) + tp, position.offset) requests = {} for node_id, next_partitions in fetchable.items(): From d3b19d6bbd7273cead7c853d323a4b2257c1aa21 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:35:09 -0700 Subject: [PATCH 14/15] Drop version guard from offsets_for_times (protocol will handle if necessary) --- kafka/consumer/group.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 1d239d097..89ca93fa5 100644 --- a/kafka/consumer/group.py +++ 
b/kafka/consumer/group.py @@ -1062,10 +1062,6 @@ def offsets_for_times(self, timestamps, timeout_ms=None): KafkaTimeoutError: If fetch failed in request_timeout_ms """ timeout_ms = self.config['request_timeout_ms'] if timeout_ms is None else timeout_ms - if self.config['api_version'] <= (0, 10, 0): - raise UnsupportedVersionError( - "offsets_for_times API not supported for cluster version {}" - .format(self.config['api_version'])) for tp, ts in timestamps.items(): timestamps[tp] = int(ts) if ts < 0: From b6ea49d1ed5cc02e8bfa1197390bded431fc9778 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Apr 2026 12:58:32 -0700 Subject: [PATCH 15/15] ListOffsetsRequest min_version 1 for >=0 timestamp; IncompatibleBrokerVersion->UnsupportedVersionError --- kafka/consumer/fetcher.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index f590b3b0c..5cf5e34b9 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -339,6 +339,8 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms=None): {TopicPartition: int}: Mapping of partition to retrieved offset. Raises: + UnsupportedVersionError if broker does not support any compatible + ListOffsetsRequest api version. KafkaTimeoutError if timeout_ms provided. 
""" timestamps = dict([(tp, timestamp) for tp in partitions]) @@ -573,7 +575,8 @@ async def _send_list_offsets_request(self, node_id, timestamps_and_epochs): # v9 LATEST_TIERED (KIP-1005) # v10 async remote (KIP-1075) max_version = 5 - min_version = ListOffsetsRequest.min_version_for_isolation_level(self._isolation_level) + min_version = 1 if any(res[0] >= 0 for res in timestamps_and_epochs.values()) else 0 + min_version = max(min_version, ListOffsetsRequest.min_version_for_isolation_level(self._isolation_level)) by_topic = collections.defaultdict(list) for tp, (timestamp, leader_epoch) in timestamps_and_epochs.items(): data = _ListOffsetsPartition( @@ -590,7 +593,11 @@ async def _send_list_offsets_request(self, node_id, timestamps_and_epochs): ) log.debug("Sending ListOffsetRequest %s to broker %s", request, node_id) - response = await self._manager.send(request, node_id=node_id) + try: + response = await self._manager.send(request, node_id=node_id) + except Errors.IncompatibleBrokerVersion as exc: + # TODO: push this down to connection or bvd + raise Errors.UnsupportedVersionError(exc.args[0]) from None return self._handle_list_offsets_response(response) def _handle_list_offsets_response(self, response):