diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index f00482554..5cf5e34b9 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -10,7 +10,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.consumer import FetchRequest from kafka.protocol.consumer import ( - ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET + ListOffsetsRequest, OffsetSpec, UNKNOWN_OFFSET, IsolationLevel ) from kafka.record import MemoryRecords from kafka.serializer import Deserializer @@ -20,15 +20,6 @@ log = logging.getLogger(__name__) -# Isolation levels -READ_UNCOMMITTED = 0 -READ_COMMITTED = 1 - -ISOLATION_LEVEL_CONFIG = { - 'read_uncommitted': READ_UNCOMMITTED, - 'read_committed': READ_COMMITTED, -} - ConsumerRecord = collections.namedtuple("ConsumerRecord", ["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type", "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"]) @@ -43,8 +34,11 @@ ["partition", "fetched_offset", "exception"]) -class NoOffsetForPartitionError(Errors.KafkaError): - pass +_FetchTopic = FetchRequest.FetchTopic +_FetchPartition = _FetchTopic.FetchPartition +_ForgottenTopic = FetchRequest.ForgottenTopic +_ListOffsetsTopic = ListOffsetsRequest.ListOffsetsTopic +_ListOffsetsPartition = _ListOffsetsTopic.ListOffsetsPartition class RecordTooLargeError(Errors.KafkaError): @@ -116,8 +110,10 @@ def __init__(self, client, subscriptions, **configs): if key in configs: self.config[key] = configs[key] - if self.config['isolation_level'] not in ISOLATION_LEVEL_CONFIG: - raise Errors.KafkaConfigurationError('Unrecognized isolation_level') + try: + self._isolation_level = IsolationLevel.build_from(self.config['isolation_level']) + except ValueError: + raise Errors.KafkaConfigurationError('Unrecognized isolation_level') from None self._client = client self._manager = client._manager @@ -130,12 +126,17 @@ def __init__(self, client, subscriptions, **configs): self._sensors = FetchManagerMetrics(self.config['metrics'], self.config['metric_group_prefix']) else: self._sensors = None - self._isolation_level = ISOLATION_LEVEL_CONFIG[self.config['isolation_level']] self._session_handlers = {} self._nodes_with_pending_fetch_requests = set() self._cached_list_offsets_exception = None self._next_in_line_exception_metadata = None + @property + def _enable_incremental_fetch_sessions(self): + if self._manager.broker_version is None or self._manager.broker_version < (1, 1): + return False + return self.config['enable_incremental_fetch_sessions'] + def send_fetches(self): """Send FetchRequests for all assigned partitions that do not already have an in-flight fetch or pending fetch data. @@ -147,7 +148,7 @@ def send_fetches(self): for node_id, (request, fetch_offsets) in self._create_fetch_requests().items(): log.debug("Sending FetchRequest to node %s", node_id) self._nodes_with_pending_fetch_requests.add(node_id) - future = self._client.send(node_id, request, wakeup=False) + future = self._manager.send(request, node_id=node_id) future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.monotonic()) future.add_errback(self._handle_fetch_error, node_id) future.add_both(self._clear_pending_fetch_request, node_id) @@ -172,15 +173,11 @@ def in_flight_fetches(self): def reset_offsets_if_needed(self): """Reset offsets for the given partitions using the offset reset strategy. 
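An aside on the isolation-level change at the top of this hunk: the string-keyed `ISOLATION_LEVEL_CONFIG` dict is gone in favor of `IsolationLevel.build_from`. The enum definition itself is not part of this diff; the sketch below is an assumption modeled on the `build_from` helper shown in kafka/util.py further down, with the member values taken from the deleted `READ_UNCOMMITTED`/`READ_COMMITTED` constants.

```python
# Hedged sketch of IsolationLevel (assumed shape; not defined in this diff).
# Values mirror the deleted READ_UNCOMMITTED/READ_COMMITTED constants, and
# build_from mirrors the kafka/util.py helper shown later in this diff.
import enum

class IsolationLevel(enum.IntEnum):
    READ_UNCOMMITTED = 0
    READ_COMMITTED = 1

    @classmethod
    def build_from(cls, val):
        if isinstance(val, cls):
            return val
        try:
            return cls[str(val).strip().upper().replace('-', '_')]
        except KeyError:
            raise ValueError(f'Unrecognized {cls.__name__}: {val}') from None

# Accepts the same config strings the old dict did, case-insensitively:
assert IsolationLevel.build_from('read_committed') is IsolationLevel.READ_COMMITTED
assert IsolationLevel.build_from('READ_UNCOMMITTED') is IsolationLevel.READ_UNCOMMITTED
```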
- Arguments: - partitions ([TopicPartition]): the partitions that need offsets reset - Returns: bool: True if any partitions need reset; otherwise False (no reset pending) Raises: NoOffsetForPartitionError: if no offset reset strategy is defined - KafkaTimeoutError if timeout_ms provided """ # Raise exception from previous offset fetch if there is one exc, self._cached_list_offsets_exception = self._cached_list_offsets_exception, None @@ -214,10 +211,10 @@ def offsets_by_times(self, timestamps, timeout_ms=None): timeout_ms (int, optional): The maximum time in milliseconds to block. Returns: - {TopicPartition: OffsetAndTimestamp}: Mapping of partition to - retrieved offset, timestamp, and leader_epoch. If offset does not exist for - the provided timestamp, that partition will be missing from - this mapping. + {TopicPartition: OffsetAndTimestamp or None}: Mapping of partition to + retrieved offset, timestamp, and leader_epoch. If offset does not + exist for the provided timestamp, the value for the TopicPartition + will be None. Raises: KafkaTimeoutError if timeout_ms provided @@ -229,6 +226,24 @@ def offsets_by_times(self, timestamps, timeout_ms=None): return offsets async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None): + """Fetch offsets for each partition in timestamps dict. This may send + request to multiple nodes, based on who is Leader for partition. + + Per-node requests are dispatched concurrently; if any fails, the first + exception encountered propagates and the remaining results are dropped. + + Arguments: + timestamps (dict): {TopicPartition: int} mapping of partitions to + timestamps or OffsetSpec sentinels. + + Returns: + (fetched_offsets, partitions_to_retry): + dict[TopicPartition, OffsetAndTimestamp], + set[TopicPartition] + + Raises: + KafkaTimeoutError: if offsets cannot be fully fetched before timeout_ms + """ if not timestamps: return {} @@ -269,15 +284,65 @@ async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None): raise Errors.KafkaTimeoutError( "Failed to get offsets by timestamps in %s ms" % (timeout_ms,)) - def beginning_offsets(self, partitions, timeout_ms): + def beginning_offsets(self, partitions, timeout_ms=None): + """Fetch earliest (oldest) offset for each partition. + + Blocks until offsets are obtained, a non-retriable exception is raised + or ``timeout_ms`` passed. + + Arguments: + partitions ([TopicPartition]): List of partitions for list offsets. + timeout_ms (int, optional): The maximum time in milliseconds to block. + + Returns: + {TopicPartition: int}: Mapping of partition to retrieved offset. + + Raises: + KafkaTimeoutError if timeout_ms provided. + """ return self.beginning_or_end_offset( - partitions, OffsetResetStrategy.EARLIEST, timeout_ms) + partitions, OffsetSpec.EARLIEST, timeout_ms) + + def end_offsets(self, partitions, timeout_ms=None): + """Fetch latest (most recent) offset for each partition. + + Blocks until offsets are obtained, a non-retriable exception is raised + or ``timeout_ms`` passed. + + Arguments: + partitions ([TopicPartition]): List of partitions for list offsets. + timeout_ms (int, optional): The maximum time in milliseconds to block. - def end_offsets(self, partitions, timeout_ms): + Returns: + {TopicPartition: int}: Mapping of partition to retrieved offset. + + Raises: + KafkaTimeoutError if timeout_ms provided. 
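Since the public `KafkaConsumer` methods (see the kafka/consumer/group.py hunks further down) gain matching `timeout_ms` parameters that delegate to these fetcher methods, a hedged usage sketch may help; the bootstrap address and topic name are illustrative, not part of this diff.

```python
# Hypothetical usage of the reworked offset APIs; the broker address and
# topic are assumptions for illustration only.
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
partitions = [TopicPartition('my-topic', 0), TopicPartition('my-topic', 1)]

# Both calls now accept an optional timeout_ms; passing None falls back
# to request_timeout_ms (see the kafka/consumer/group.py hunks below).
earliest = consumer.beginning_offsets(partitions, timeout_ms=5000)
latest = consumer.end_offsets(partitions, timeout_ms=5000)
for tp in partitions:
    print(tp, earliest[tp], latest[tp])
```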
+ """ return self.beginning_or_end_offset( - partitions, OffsetResetStrategy.LATEST, timeout_ms) + partitions, OffsetSpec.LATEST, timeout_ms) + + def beginning_or_end_offset(self, partitions, timestamp, timeout_ms=None): + """Fetch offset for each partition using ``timestamp``. + + Blocks until offsets are obtained, a non-retriable exception is raised + or ``timeout_ms`` passed. + + Arguments: + partitions ([TopicPartition]): List of partitions for list offsets. + timestamp (int or OffsetSpec): OffsetSpec.LATEST (-1) for the latest + available, OffsetSpec.EARLIEST (-2) for the earliest available. + Otherwise timestamp is treated as epoch milliseconds. + timeout_ms (int, optional): The maximum time in milliseconds to block. + + Returns: + {TopicPartition: int}: Mapping of partition to retrieved offset. - def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): + Raises: + UnsupportedVersionError if broker does not support any compatible + ListOffsetsRequest api version. + KafkaTimeoutError if timeout_ms provided. + """ timestamps = dict([(tp, timestamp) for tp in partitions]) offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) for tp in timestamps: @@ -451,7 +516,9 @@ async def _send_list_offsets_requests(self, timestamps): timestamps. Returns: - (fetched_offsets, partitions_to_retry) + (fetched_offsets, partitions_to_retry): + dict[TopicPartition, OffsetAndTimestamp], + set[TopicPartition] Raises: StaleMetadata: if no node has known leader for any partition. @@ -476,53 +543,70 @@ async def _send_list_offsets_requests(self, timestamps): def _group_list_offset_requests(self, timestamps): timestamps_by_node = collections.defaultdict(dict) for partition, timestamp in timestamps.items(): - node_id = self._client.cluster.leader_for_partition(partition) + node_id = self._manager.cluster.leader_for_partition(partition) if node_id is None: - self._client.cluster.add_topic(partition.topic) + self._manager.cluster.add_topic(partition.topic) log.debug("Partition %s is unknown for fetching offset", partition) - self._client.cluster.request_update() + self._manager.cluster.request_update() elif node_id == -1: log.debug("Leader for partition %s unavailable for fetching " "offset, wait for metadata refresh", partition) - self._client.cluster.request_update() + self._manager.cluster.request_update() else: leader_epoch = -1 timestamps_by_node[node_id][partition] = (timestamp, leader_epoch) return dict(timestamps_by_node) async def _send_list_offsets_request(self, node_id, timestamps_and_epochs): - version = self._client.api_version(ListOffsetsRequest, max_version=5) - if self.config['isolation_level'] == 'read_committed' and version < 2: - raise Errors.UnsupportedVersionError('read_committed isolation level requires ListOffsetsRequest >= v2') + """Send single ListOffsetsResponse to node_id + + Returns: + (fetched_offsets, partitions_to_retry): + dict[TopicPartition, OffsetAndTimestamp], + set[TopicPartition] + + Raises: + TopicAuthorizationFailedError: if any topic returned an auth error + """ + # TODO: + # v6 flexible + # v7 MAX_TIMESTAMP (KIP-734) + # v8 EARLIEST_LOCAL (KIP-405) + # v9 LATEST_TIERED (KIP-1005) + # v10 async remote (KIP-1075) + max_version = 5 + min_version = 1 if any(res[0] >= 0 for res in timestamps_and_epochs.values()) else 0 + min_version = max(min_version, ListOffsetsRequest.min_version_for_isolation_level(self._isolation_level)) by_topic = collections.defaultdict(list) for tp, (timestamp, leader_epoch) in timestamps_and_epochs.items(): 
- if version >= 4: - data = (tp.partition, leader_epoch, timestamp) - elif version >= 1: - data = (tp.partition, timestamp) - else: - data = (tp.partition, timestamp, 1) + data = _ListOffsetsPartition( + partition_index=tp.partition, + current_leader_epoch=leader_epoch, + timestamp=timestamp) by_topic[tp.topic].append(data) - if version >= 2: - request = ListOffsetsRequest[version]( - -1, - self._isolation_level, - list(by_topic.items())) - else: - request = ListOffsetsRequest[version]( - -1, - list(by_topic.items())) + request = ListOffsetsRequest( + isolation_level=self._isolation_level, + topics=list(by_topic.items()), + min_version=min_version, + max_version=max_version, + ) log.debug("Sending ListOffsetRequest %s to broker %s", request, node_id) - response = await self._manager.send(request, node_id=node_id) + try: + response = await self._manager.send(request, node_id=node_id) + except Errors.IncompatibleBrokerVersion as exc: + # TODO: push this down to connection or bvd + raise Errors.UnsupportedVersionError(exc.args[0]) from None return self._handle_list_offsets_response(response) def _handle_list_offsets_response(self, response): """Parse a ListOffsets response. Returns: - (fetched_offsets, partitions_to_retry): dict[TopicPartition, OffsetAndTimestamp], set[TopicPartition] + (fetched_offsets, partitions_to_retry): + dict[TopicPartition, OffsetAndTimestamp], + set[TopicPartition] Raises: TopicAuthorizationFailedError: if any topic returned an auth error @@ -530,57 +614,57 @@ def _handle_list_offsets_response(self, response): fetched_offsets = dict() partitions_to_retry = set() unauthorized_topics = set() - for topic, part_data in response.topics: - for partition_info in part_data: - partition, error_code = partition_info[:2] - partition = TopicPartition(topic, partition) + for topic_data in response.topics: + for partition_info in topic_data.partitions: + tp = TopicPartition(topic_data.name, partition_info.partition_index) + error_code = partition_info.error_code error_type = Errors.for_code(error_code) if error_type is Errors.NoError: if response.API_VERSION == 0: - offsets = partition_info[2] + offsets = partition_info.old_style_offsets assert len(offsets) <= 1, 'Expected ListOffsetsResponse with one offset' - if not offsets: - offset = UNKNOWN_OFFSET - else: - offset = offsets[0] - timestamp = None - leader_epoch = -1 - elif response.API_VERSION <= 3: - timestamp, offset = partition_info[2:] - leader_epoch = -1 + offset = offsets[0] if offsets else UNKNOWN_OFFSET else: - timestamp, offset, leader_epoch = partition_info[2:] + offset = partition_info.offset + timestamp = partition_info.timestamp + leader_epoch = partition_info.leader_epoch + # DataContainer currently does not set default for + # out-of-version fields; so we need to handle explicitly + if timestamp is None: + timestamp = -1 + if leader_epoch is None: + leader_epoch = -1 log.debug("Handling ListOffsetsResponse response for %s. " "Fetched offset %s, timestamp %s, leader_epoch %s", - partition, offset, timestamp, leader_epoch) + tp, offset, timestamp, leader_epoch) if offset != UNKNOWN_OFFSET: - fetched_offsets[partition] = OffsetAndTimestamp(offset, timestamp, leader_epoch) + fetched_offsets[tp] = OffsetAndTimestamp(offset, timestamp, leader_epoch) elif error_type is Errors.UnsupportedForMessageFormatError: # The message format on the broker side is before 0.10.0, which means it does not # support timestamps. 
We treat this case the same as if we weren't able to find an # offset corresponding to the requested timestamp and leave it out of the result. log.debug("Cannot search by timestamp for partition %s because the" - " message format version is before 0.10.0", partition) + " message format version is before 0.10.0", tp) elif error_type in (Errors.NotLeaderForPartitionError, Errors.ReplicaNotAvailableError, Errors.KafkaStorageError, Errors.OffsetNotAvailableError, Errors.LeaderNotAvailableError): log.debug("Attempt to fetch offsets for partition %s failed due" - " to %s, retrying.", error_type.__name__, partition) - partitions_to_retry.add(partition) + " to %s, retrying.", error_type.__name__, tp) + partitions_to_retry.add(tp) elif error_type is Errors.UnknownTopicOrPartitionError: log.warning("Received unknown topic or partition error in ListOffsets " "request for partition %s. The topic/partition " + "may not exist or the user may not have Describe access " - "to it.", partition) - partitions_to_retry.add(partition) + "to it.", tp) + partitions_to_retry.add(tp) elif error_type is Errors.TopicAuthorizationFailedError: - unauthorized_topics.add(topic) + unauthorized_topics.add(tp.topic) else: log.warning("Attempt to fetch offsets for partition %s failed due to:" - " %s", partition, error_type.__name__) - partitions_to_retry.add(partition) + " %s", tp, error_type.__name__) + partitions_to_retry.add(tp) if unauthorized_topics: raise Errors.TopicAuthorizationFailedError(unauthorized_topics) return fetched_offsets, partitions_to_retry @@ -601,77 +685,60 @@ def _create_fetch_requests(self): FetchRequests skipped if no leader, or node has requests in flight Returns: - dict: {node_id: (FetchRequest, {TopicPartition: fetch_offset}), ...} (version depends on client api_versions) + dict: {node_id: (FetchRequest, {TopicPartition: fetch_offset}), ...} """ - # create the fetch info as a dict of lists of partition info tuples - # which can be passed to FetchRequest() via .items() - version = self._client.api_version(FetchRequest, max_version=10) + # TODO: + # v12 epoch detection / validation + # v13 topic ids (KIP-516) + # v14 tiered storage (KIP-405) + # v15 replica state (KIP-903) + # v16 node endpoints (KIP-951) + # v17 directory id (KIP-853) + max_version = 10 fetchable = collections.defaultdict(collections.OrderedDict) + for tp in self._fetchable_partitions(): + node_id = self._manager.cluster.leader_for_partition(tp) - for partition in self._fetchable_partitions(): - node_id = self._client.cluster.leader_for_partition(partition) - - position = self._subscriptions.assignment[partition].position + position = self._subscriptions.assignment[tp].position # fetch if there is a leader and no in-flight requests if node_id is None or node_id == -1: log.debug("No leader found for partition %s." 
- " Requesting metadata update", partition) - self._client.cluster.request_update() + " Requesting metadata update", tp) + self._manager.cluster.request_update() - elif not self._client.connected(node_id) and self._client.connection_delay(node_id) > 0: + elif self._manager.connection_delay(node_id) > 0: # If we try to send during the reconnect backoff window, then the request is just # going to be failed anyway before being sent, so skip the send for now log.debug("Skipping fetch for partition %s because node %s is awaiting reconnect backoff", - partition, node_id) + tp, node_id) + # TODO: handle throttle_delay in kafka.net elif self._client.throttle_delay(node_id) > 0: # If we try to send while throttled, then the request is just # going to be failed anyway before being sent, so skip the send for now log.debug("Skipping fetch for partition %s because node %s is throttled", - partition, node_id) - - elif not self._client.ready(node_id): - # Until we support send request queues, any attempt to send to a not-ready node will be - # immediately failed with NodeNotReadyError. - log.debug("Skipping fetch for partition %s because connection to leader node is not ready yet", - partition) + tp, node_id) elif node_id in self._nodes_with_pending_fetch_requests: log.debug("Skipping fetch for partition %s because there is a pending fetch request to node %s", - partition, node_id) + tp, node_id) else: # Leader is connected and does not have a pending fetch request - if version < 5: - partition_info = ( - partition.partition, - position.offset, - self.config['max_partition_fetch_bytes'] - ) - elif version <= 8: - partition_info = ( - partition.partition, - position.offset, - -1, # log_start_offset is used internally by brokers / replicas only - self.config['max_partition_fetch_bytes'], - ) - else: - partition_info = ( - partition.partition, - position.leader_epoch, - position.offset, - -1, # log_start_offset is used internally by brokers / replicas only - self.config['max_partition_fetch_bytes'], - ) - - fetchable[node_id][partition] = partition_info + partition_info = _FetchPartition( + partition=tp.partition, + current_leader_epoch=position.leader_epoch, + fetch_offset=position.offset, + partition_max_bytes=self.config['max_partition_fetch_bytes'] + ) + fetchable[node_id][tp] = partition_info log.debug("Adding fetch request for partition %s at offset %d", - partition, position.offset) + tp, position.offset) requests = {} for node_id, next_partitions in fetchable.items(): - if version >= 7 and self.config['enable_incremental_fetch_sessions']: + if self._enable_incremental_fetch_sessions: if node_id not in self._session_handlers: self._session_handlers[node_id] = FetchSessionHandler(node_id) session = self._session_handlers[node_id].build_next(next_partitions) @@ -679,77 +746,57 @@ def _create_fetch_requests(self): # No incremental fetch support session = FetchRequestData(next_partitions, None, FetchMetadata.LEGACY) - if version <= 2: - request = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - session.to_send) - elif version == 3: - request = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - self.config['fetch_max_bytes'], - session.to_send) - elif version <= 6: - request = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - self.config['fetch_max_bytes'], - self._isolation_level, - session.to_send) - else: - # 
Through v8 - request = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - self.config['fetch_max_bytes'], - self._isolation_level, - session.id, - session.epoch, - session.to_send, - session.to_forget) - - fetch_offsets = {} - for tp, partition_data in next_partitions.items(): - if version <= 8: - offset = partition_data[1] - else: - offset = partition_data[2] - fetch_offsets[tp] = offset + min_version = FetchRequest.min_version_for_isolation_level(self._isolation_level) + request = FetchRequest( + max_wait_ms=self.config['fetch_max_wait_ms'], + min_bytes=self.config['fetch_min_bytes'], + max_bytes=self.config['fetch_max_bytes'], + isolation_level=self._isolation_level, + session_id=session.id, + session_epoch=session.epoch, + topics=session.to_send, + forgotten_topics_data=session.to_forget, + min_version=min_version, + max_version=max_version, + ) + fetch_offsets = {tp: next_partitions[tp].fetch_offset for tp in next_partitions} requests[node_id] = (request, fetch_offsets) return requests def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response): """The callback for fetch completion""" - if response.API_VERSION >= 7 and self.config['enable_incremental_fetch_sessions']: + if response.API_VERSION >= 7 and self._enable_incremental_fetch_sessions: if node_id not in self._session_handlers: log.error("Unable to find fetch session handler for node %s. Ignoring fetch response", node_id) return if not self._session_handlers[node_id].handle_response(response): return - partitions = set([TopicPartition(topic, partition_data[0]) - for topic, partitions in response.responses - for partition_data in partitions]) + partitions = set([ + TopicPartition( + topic_data.topic, + partition_data.partition_index) + for topic_data in response.responses + for partition_data in topic_data.partitions + ]) if self._sensors: metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions) else: metric_aggregator = None - for topic, partitions in response.responses: - for partition_data in partitions: - tp = TopicPartition(topic, partition_data[0]) + for topic_data in response.responses: + for partition_data in topic_data.partitions: + tp = TopicPartition( + topic_data.topic, + partition_data.partition_index + ) fetch_offset = fetch_offsets[tp] completed_fetch = CompletedFetch( tp, fetch_offset, response.API_VERSION, - partition_data[1:], + partition_data, metric_aggregator ) self._completed_fetches.append(completed_fetch) @@ -772,7 +819,8 @@ def _clear_pending_fetch_request(self, node_id, _): def _parse_fetched_data(self, completed_fetch): tp = completed_fetch.topic_partition fetch_offset = completed_fetch.fetched_offset - error_code, highwater = completed_fetch.partition_data[:2] + error_code = completed_fetch.partition_data.error_code + highwater = completed_fetch.partition_data.high_watermark error_type = Errors.for_code(error_code) parsed_records = None @@ -796,12 +844,8 @@ def _parse_fetched_data(self, completed_fetch): position.offset) return None - records = MemoryRecords(completed_fetch.partition_data[-1]) - aborted_transactions = None - if completed_fetch.response_version >= 11: - aborted_transactions = completed_fetch.partition_data[-3] - elif completed_fetch.response_version >= 4: - aborted_transactions = completed_fetch.partition_data[-2] + records = MemoryRecords(completed_fetch.partition_data.records) + aborted_transactions = completed_fetch.partition_data.aborted_transactions log.debug("Preparing to read %s 
bytes of data for partition %s with offset %d", records.size_in_bytes(), tp, fetch_offset) parsed_records = self.PartitionRecords(fetch_offset, tp, records, @@ -839,7 +883,7 @@ def _parse_fetched_data(self, completed_fetch): Errors.UnknownTopicOrPartitionError, Errors.KafkaStorageError): log.debug("Error fetching partition %s: %s", tp, error_type.__name__) - self._client.cluster.request_update() + self._manager.cluster.request_update() elif error_type is Errors.OffsetOutOfRangeError: position = self._subscriptions.assignment[tp].position if position is None or position.offset != fetch_offset: @@ -858,7 +902,7 @@ def _parse_fetched_data(self, completed_fetch): elif getattr(error_type, 'retriable', False): log.debug("Retriable error fetching partition %s: %s", tp, error_type()) if getattr(error_type, 'invalid_metadata', False): - self._client.cluster.request_update() + self._manager.cluster.request_update() else: raise error_type('Unexpected error while fetching data') @@ -887,7 +931,8 @@ def close(self): class PartitionRecords: def __init__(self, fetch_offset, tp, records, key_deserializer=None, value_deserializer=None, - check_crcs=True, isolation_level=READ_UNCOMMITTED, + check_crcs=True, + isolation_level=IsolationLevel.READ_UNCOMMITTED, aborted_transactions=None, # AbortedTransaction data from FetchResponse metric_aggregator=None, on_drain=lambda x: None): self.fetch_offset = fetch_offset @@ -968,7 +1013,7 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer): # base_offset, last_offset_delta, aborted transactions, and control batches if batch.magic == 2: self.leader_epoch = batch.leader_epoch - if self.isolation_level == READ_COMMITTED and batch.has_producer_id(): + if self.isolation_level == IsolationLevel.READ_COMMITTED and batch.has_producer_id(): # remove from the aborted transaction queue all aborted transactions which have begun # before the current batch's last offset and add the associated producerIds to the # aborted producer set @@ -1182,9 +1227,9 @@ def handle_error(self, _exception): self.next_metadata = self.next_metadata.next_close_existing() def _response_partitions(self, response): - return {TopicPartition(topic, partition_data[0]) - for topic, partitions in response.responses - for partition_data in partitions} + return {TopicPartition(topic_data.topic, partition_data.partition_index) + for topic_data in response.responses + for partition_data in topic_data.partitions} class FetchMetadata: @@ -1249,21 +1294,27 @@ def epoch(self): @property def to_send(self): - # Return as list of [(topic, [(partition, ...), ...]), ...] + # Return as list of _FetchTopic data objects # so it can be passed directly to encoder partition_data = collections.defaultdict(list) for tp, partition_info in self._to_send.items(): partition_data[tp.topic].append(partition_info) - return list(partition_data.items()) + return [ + _FetchTopic(topic=topic, partitions=partitions) + for topic, partitions in partition_data.items() + ] @property def to_forget(self): - # Return as list of [(topic, (partiiton, ...)), ...] 
- # so it an be passed directly to encoder + # Return as list of _ForgottenTopic data objects + # so it can be passed directly to encoder partition_data = collections.defaultdict(list) for tp in self._to_forget: partition_data[tp.topic].append(tp.partition) - return list(partition_data.items()) + return [ + _ForgottenTopic(topic=topic, partitions=partitions) + for topic, partitions in partition_data.items() + ] class FetchMetrics: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 5972d2bdf..89ca93fa5 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -1027,7 +1027,7 @@ def metrics(self, raw=False): metrics[k.group][k.name] = v.value() return metrics - def offsets_for_times(self, timestamps): + def offsets_for_times(self, timestamps, timeout_ms=None): """Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the @@ -1041,17 +1041,18 @@ def offsets_for_times(self, timestamps): partition. ``None`` will also be returned for the partition if there are no messages in it. - Note: - This method may block indefinitely if the partition does not exist. + Note: This method may block indefinitely if the partition does not exist + and no timeout_ms provided. Arguments: timestamps (dict): ``{TopicPartition: int}`` mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)) + timeout_ms (int, optional): Milliseconds to block fetching offsets. Returns: ``{TopicPartition: OffsetAndTimestamp}``: mapping from partition - to the timestamp and offset of the first message with timestamp + to the offset and timestamp of the first message with timestamp greater than or equal to the target timestamp. Raises: @@ -1060,31 +1061,28 @@ def offsets_for_times(self, timestamps): up the offsets by timestamp. KafkaTimeoutError: If fetch failed in request_timeout_ms """ - if self.config['api_version'] <= (0, 10, 0): - raise UnsupportedVersionError( - "offsets_for_times API not supported for cluster version {}" - .format(self.config['api_version'])) + timeout_ms = self.config['request_timeout_ms'] if timeout_ms is None else timeout_ms for tp, ts in timestamps.items(): timestamps[tp] = int(ts) if ts < 0: raise ValueError( "The target time for partition {} is {}. The target time " "cannot be negative.".format(tp, ts)) - return self._fetcher.offsets_by_times( - timestamps, self.config['request_timeout_ms']) + return self._fetcher.offsets_by_times(timestamps, timeout_ms) - def beginning_offsets(self, partitions): + def beginning_offsets(self, partitions, timeout_ms=None): """Get the first offset for the given partitions. This method does not change the current consumer position of the partitions. - Note: - This method may block indefinitely if the partition does not exist. + Note: This method may block indefinitely if the partition does not exist + and no timeout_ms provided. Arguments: partitions (list): List of TopicPartition instances to fetch offsets for. + timeout_ms (int, optional): Milliseconds to block fetching offsets. Returns: ``{TopicPartition: int}``: The earliest available offsets for the @@ -1093,13 +1091,13 @@ def beginning_offsets(self, partitions): Raises: UnsupportedVersionError: If the broker does not support looking up the offsets by timestamp. - KafkaTimeoutError: If fetch failed in request_timeout_ms. + KafkaTimeoutError: If fetch failed in timeout_ms. 
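The per-call timeout defaulting used across `offsets_for_times`, `beginning_offsets`, and `end_offsets` is a one-liner worth calling out; a sketch follows, with the config dict shape assumed from the surrounding code.

```python
# Sketch of the timeout fall-back pattern in these hunks: an explicit
# timeout_ms wins; only None falls back to request_timeout_ms.
config = {'request_timeout_ms': 30000}  # assumed default, for illustration

def resolve_timeout_ms(timeout_ms=None):
    return config['request_timeout_ms'] if timeout_ms is None else timeout_ms

assert resolve_timeout_ms() == 30000
assert resolve_timeout_ms(5000) == 5000
assert resolve_timeout_ms(0) == 0  # falsy but explicit values are honored
```

Testing `is None` rather than truthiness is deliberate here: `timeout_ms=0` (poll without blocking) would otherwise silently become the 30-second default.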
""" - offsets = self._fetcher.beginning_offsets( - partitions, self.config['request_timeout_ms']) + timeout_ms = self.config['request_timeout_ms'] if timeout_ms is None else timeout_ms + offsets = self._fetcher.beginning_offsets(partitions, timeout_ms) return offsets - def end_offsets(self, partitions): + def end_offsets(self, partitions, timeout_ms=None): """Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1. @@ -1107,12 +1105,13 @@ def end_offsets(self, partitions): This method does not change the current consumer position of the partitions. - Note: - This method may block indefinitely if the partition does not exist. + Note: This method may block indefinitely if the partition does not exist + and no timeout_ms provided. Arguments: partitions (list): List of TopicPartition instances to fetch offsets for. + timeout_ms (int, optional): Milliseconds to block fetching offsets. Returns: ``{TopicPartition: int}``: The end offsets for the given partitions. @@ -1120,10 +1119,10 @@ def end_offsets(self, partitions): Raises: UnsupportedVersionError: If the broker does not support looking up the offsets by timestamp. - KafkaTimeoutError: If fetch failed in request_timeout_ms + KafkaTimeoutError: If fetch failed in timeout_ms """ - offsets = self._fetcher.end_offsets( - partitions, self.config['request_timeout_ms']) + timeout_ms = self.config['request_timeout_ms'] if timeout_ms is None else timeout_ms + offsets = self._fetcher.end_offsets(partitions, timeout_ms) return offsets def _update_fetch_positions(self, timeout_ms=None): @@ -1131,8 +1130,8 @@ def _update_fetch_positions(self, timeout_ms=None): or reset it using the offset reset policy the user has configured. Arguments: - partitions (List[TopicPartition]): The partitions that need - updating fetch positions. + timeout_ms (int, optional): Milliseconds to block refreshing committed + offsets. 
Returns True if fetch positions updated, False if timeout or async reset is pending diff --git a/kafka/net/compat.py b/kafka/net/compat.py index 8a001eb3d..1d5b22fe7 100644 --- a/kafka/net/compat.py +++ b/kafka/net/compat.py @@ -94,15 +94,15 @@ def bootstrap_connected(self): return bootstrap_future is not None and not bootstrap_future.is_done def get_broker_version(self, timeout_ms=None): - if self._manager.broker_version_data is None: + if self._manager.broker_version is None: self._manager.bootstrap(timeout_ms) - return self._manager.broker_version_data.broker_version + return self._manager.broker_version def check_version(self, node_id=None, timeout_ms=10000): if not self._manager.bootstrapped: self._manager.bootstrap(timeout_ms) if node_id is None: - return self._manager.broker_version_data.broker_version + return self._manager.broker_version async def _check_version(broker_id): conn = await self._manager.get_connection(broker_id) return conn.broker_version diff --git a/kafka/util.py b/kafka/util.py index 5ba42be9a..bb7516a2d 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -145,7 +145,7 @@ def build_from(cls, val): try: return cls[str(val).strip().upper().replace('-', '_')] # pylint: disable=E1136 except KeyError: - raise ValueError(f'Unrecognized {cls.__name__}: {val}') + raise ValueError(f'Unrecognized {cls.__name__}: {val}') from None @classmethod def value_for(cls, val): @@ -156,4 +156,4 @@ def value_for(cls, val): try: return cls[str(val).upper().replace('-', '_')].value # pylint: disable=E1136 except KeyError: - raise ValueError(f'Unrecognized {cls.__name__}: {val}') + raise ValueError(f'Unrecognized {cls.__name__}: {val}') from None diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py index f5c1aa450..05d05db50 100644 --- a/test/consumer/test_fetcher.py +++ b/test/consumer/test_fetcher.py @@ -6,6 +6,7 @@ from collections import OrderedDict import itertools import time +from unittest.mock import MagicMock from kafka.consumer.fetcher import ( CompletedFetch, ConsumerRecord, Fetcher @@ -27,6 +28,12 @@ from kafka.structs import OffsetAndMetadata, OffsetAndTimestamp, TopicPartition +_ResponseTopic = FetchResponse.FetchableTopicResponse +_ResponsePartition = _ResponseTopic.PartitionData +_ListResponseTopic = ListOffsetsResponse.ListOffsetsTopicResponse +_ListResponsePartition = _ListResponseTopic.ListOffsetsPartitionResponse + + @pytest.fixture def subscription_state(): return SubscriptionState() @@ -38,9 +45,13 @@ def topic(): @pytest.fixture -def fetcher(client, metrics, subscription_state, topic): +def assignment(topic): + return [TopicPartition(topic, i) for i in range(3)] + + +@pytest.fixture +def fetcher(client, metrics, subscription_state, topic, assignment): subscription_state.subscribe(topics=[topic]) - assignment = [TopicPartition(topic, i) for i in range(3)] subscription_state.assign_from_subscribed(assignment) for tp in assignment: subscription_state.seek(tp, 0) @@ -57,6 +68,21 @@ def _build_record_batch(msgs, compression=0, offset=0, magic=2): return builder.buffer() +def _build_completed_fetch(tp, msgs, error=None, offset=0): + if error is not None: + partition_data = _ResponsePartition( + error_code=error.errno, + high_watermark=-1, + records=None) + else: + partition_data = _ResponsePartition( + error_code=0, + high_watermark=100, + records=_build_record_batch(msgs, offset=offset)) + return CompletedFetch( + tp, offset, 0, partition_data, MagicMock()) + + def test_send_fetches(fetcher, topic, mocker): fetch_requests = [ FetchRequest[0]( @@ 
-87,27 +113,28 @@ def build_fetch_offsets(request): return_value=(dict(enumerate(map(lambda r: (r, build_fetch_offsets(r)), fetch_requests))))) mocker.patch.object(fetcher._client, 'ready', return_value=True) - mocker.patch.object(fetcher._client, 'send') + mocker.patch.object(fetcher._manager, 'send') ret = fetcher.send_fetches() for node, request in enumerate(fetch_requests): - fetcher._client.send.assert_any_call(node, request, wakeup=False) + fetcher._manager.send.assert_any_call(request, node_id=node) assert len(ret) == len(fetch_requests) -@pytest.mark.parametrize(("api_version", "fetch_version"), [ - ((0, 10, 1), 3), - ((0, 10, 0), 2), - ((0, 9), 1), - ((0, 8, 2), 0) -]) -def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): - fetcher._client._manager.broker_version_data = BrokerVersionData(api_version) - mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) - mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) +def test_create_fetch_requests(fetcher, mocker, assignment): + mocker.patch.object(fetcher._manager.cluster, "leader_for_partition", return_value=0) + mocker.patch.object(fetcher._manager.cluster, "leader_epoch_for_partition", return_value=0) mocker.patch.object(fetcher._client, "ready", return_value=True) by_node = fetcher._create_fetch_requests() - requests_and_offsets = by_node.values() - assert set([r.API_VERSION for (r, _offsets) in requests_and_offsets]) == set([fetch_version]) + assert len(by_node) == 1 + assert 0 in by_node + request, offsets = by_node[0] + assert isinstance(request, FetchRequest) + requested = set([ + TopicPartition(topic.topic, partition.partition) + for topic in request.topics + for partition in topic.partitions + ]) + assert requested == set(assignment) def test_reset_offsets_if_needed(fetcher, topic, mocker): @@ -299,32 +326,32 @@ def wait_for_send_futures(n): def test__handle_list_offsets_response_v1(fetcher, mocker): # Broker returns UnsupportedForMessageFormatError, will omit partition res = ListOffsetsResponse[1]([ - ("topic", [(0, 43, -1, -1)]), - ("topic", [(1, 0, 1000, 9999)]) + _ListResponseTopic("topic", [_ListResponsePartition(0, 43, -1, -1, version=1)], version=1), + _ListResponseTopic("topic", [_ListResponsePartition(1, 0, 1000, 9999, version=1)], version=1) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 1): OffsetAndTimestamp(9999, 1000, -1)}, set()) # Broker returns NotLeaderForPartitionError res = ListOffsetsResponse[1]([ - ("topic", [(0, 6, -1, -1)]), + _ListResponseTopic("topic", [_ListResponsePartition(0, 6, -1, -1, version=1)], version=1), ]) assert fetcher._handle_list_offsets_response(res) == ( {}, set([TopicPartition("topic", 0)])) # Broker returns UnknownTopicOrPartitionError res = ListOffsetsResponse[1]([ - ("topic", [(0, 3, -1, -1)]), + _ListResponseTopic("topic", [_ListResponsePartition(0, 3, -1, -1, version=1)], version=1), ]) assert fetcher._handle_list_offsets_response(res) == ( {}, set([TopicPartition("topic", 0)])) # Broker returns many errors and 1 result res = ListOffsetsResponse[1]([ - ("topic", [(0, 43, -1, -1)]), # not retriable - ("topic", [(1, 6, -1, -1)]), # retriable - ("topic", [(2, 3, -1, -1)]), # retriable - ("topic", [(3, 0, 1000, 9999)]) + _ListResponseTopic("topic", [_ListResponsePartition(0, 43, -1, -1, version=1)], version=1), # not retriable + _ListResponseTopic("topic", [_ListResponsePartition(1, 6, -1, -1, version=1)], version=1), # retriable + 
_ListResponseTopic("topic", [_ListResponsePartition(2, 3, -1, -1, version=1)], version=1), # retriable + _ListResponseTopic("topic", [_ListResponsePartition(3, 0, 1000, 9999, version=1)], version=1) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 3): OffsetAndTimestamp(9999, 1000, -1)}, @@ -335,7 +362,7 @@ def test__handle_list_offsets_response_v2_v3(fetcher, mocker): # including a throttle_time shouldnt cause issues res = ListOffsetsResponse[2]( 123, # throttle_time_ms - [("topic", [(0, 0, 1000, 9999)]) + [_ListResponseTopic("topic", [_ListResponsePartition(0, 0, 1000, 9999, version=2)], version=2) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)}, set()) @@ -343,7 +370,7 @@ def test__handle_list_offsets_response_v2_v3(fetcher, mocker): # v3 response is the same format res = ListOffsetsResponse[3]( 123, # throttle_time_ms - [("topic", [(0, 0, 1000, 9999)]) + [_ListResponseTopic("topic", [_ListResponsePartition(0, 0, 1000, 9999, version=3)], version=3) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)}, set()) @@ -353,7 +380,7 @@ def test__handle_list_offsets_response_v4_v5(fetcher, mocker): # includes leader_epoch res = ListOffsetsResponse[4]( 123, # throttle_time_ms - [("topic", [(0, 0, 1000, 9999, 1234)]) + [_ListResponseTopic("topic", [_ListResponsePartition(0, 0, 1000, 9999, 1234, version=4)], version=4) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)}, set()) @@ -361,7 +388,7 @@ def test__handle_list_offsets_response_v4_v5(fetcher, mocker): # v5 response is the same format res = ListOffsetsResponse[5]( 123, # throttle_time_ms - [("topic", [(0, 0, 1000, 9999, 1234)]) + [_ListResponseTopic("topic", [_ListResponsePartition(0, 0, 1000, 9999, 1234, version=5)], version=5) ]) assert fetcher._handle_list_offsets_response(res) == ( {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)}, set()) @@ -374,10 +401,7 @@ def test_fetched_records(fetcher, topic, mocker): msgs = [] for i in range(10): msgs.append((None, b"foo", None)) - completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, _build_record_batch(msgs)], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, msgs) fetcher._completed_fetches.append(completed_fetch) records, partial = fetcher.fetched_records() assert tp in records @@ -390,42 +414,50 @@ def test_fetched_records(fetcher, topic, mocker): ( {TopicPartition('foo', 0): 0}, FetchResponse[0]( - [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, _build_record_batch([(None, b'xxx', None)]), version=0)], version=0)]), 1, ), ( {TopicPartition('foo', 0): 0, TopicPartition('foo', 1): 0}, FetchResponse[1]( 0, - [("foo", [ - (0, 0, 1000, [(0, b'xxx'),]), - (1, 0, 1000, [(0, b'xxx'),]), - ]),]), + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, _build_record_batch([(None, b'xxx', None)]), version=1), + _ResponsePartition(1, 0, 1000, _build_record_batch([(None, b'xxx', None)]), version=1)], version=1)]), 2, ), ( {TopicPartition('foo', 0): 0}, FetchResponse[2]( - 0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]), + 0, + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, _build_record_batch([(None, b'xxx', None)]), version=2)], version=2)]), 1, ), ( {TopicPartition('foo', 0): 0}, FetchResponse[3]( - 0, [("foo", [(0, 0, 1000, [(0, 
b'xxx'),])]),]), + 0, + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, _build_record_batch([(None, b'xxx', None)]), version=3)], version=3)]), 1, ), ( {TopicPartition('foo', 0): 0}, FetchResponse[4]( - 0, [("foo", [(0, 0, 1000, 0, [], [(0, b'xxx'),])]),]), + 0, + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, 0, [], _build_record_batch([(None, b'xxx', None)]), version=4)], version=4)]), 1, ), ( # This may only be used in broker-broker api calls {TopicPartition('foo', 0): 0}, FetchResponse[5]( - 0, [("foo", [(0, 0, 1000, 0, 0, [], [(0, b'xxx'),])]),]), + 0, + [_ResponseTopic("foo", [ + _ResponsePartition(0, 0, 1000, 0, 0, [], _build_record_batch([(None, b'xxx', None)]), version=5)], version=5)]), 1, ), ]) @@ -495,10 +527,7 @@ def test__parse_fetched_data(fetcher, topic, mocker): msgs = [] for i in range(10): msgs.append((None, b"foo", None)) - completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, _build_record_batch(msgs)], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, msgs) partition_record = fetcher._parse_fetched_data(completed_fetch) assert isinstance(partition_record, fetcher.PartitionRecords) assert partition_record @@ -511,10 +540,7 @@ def test__parse_fetched_data__paused(fetcher, topic, mocker): msgs = [] for i in range(10): msgs.append((None, b"foo", None)) - completed_fetch = CompletedFetch( - tp, 0, 0, [0, 100, _build_record_batch(msgs)], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, msgs) fetcher._subscriptions.pause(tp) partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None @@ -526,10 +552,8 @@ def test__parse_fetched_data__stale_offset(fetcher, topic, mocker): msgs = [] for i in range(10): msgs.append((None, b"foo", None)) - completed_fetch = CompletedFetch( - tp, 10, 0, [0, 100, _build_record_batch(msgs)], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, msgs) + completed_fetch = completed_fetch._replace(fetched_offset=10) partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None @@ -537,10 +561,7 @@ def test__parse_fetched_data__stale_offset(fetcher, topic, mocker): def test__parse_fetched_data__not_leader(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) - completed_fetch = CompletedFetch( - tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, [], error=NotLeaderForPartitionError) mocker.patch.object(fetcher._client.cluster, 'request_update') partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None @@ -550,10 +571,7 @@ def test__parse_fetched_data__not_leader(fetcher, topic, mocker): def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) - completed_fetch = CompletedFetch( - tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, [], error=UnknownTopicOrPartitionError) mocker.patch.object(fetcher._client.cluster, 'request_update') partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None @@ -563,10 +581,7 @@ def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker): def test__parse_fetched_data__out_of_range(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) - completed_fetch = CompletedFetch( - tp, 0, 0, 
[OffsetOutOfRangeError.errno, -1, None], - mocker.MagicMock() - ) + completed_fetch = _build_completed_fetch(tp, [], error=OffsetOutOfRangeError) partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None assert fetcher._subscriptions.assignment[tp].awaiting_reset is True @@ -707,7 +722,7 @@ def test_reset_offsets_paused_with_valid(subscription_state, client, mocker): assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1) -def test_fetch_position_after_exception(client, mocker): +def test_fetch_position_after_exception(client): subscription_state = SubscriptionState(offset_reset_strategy='NONE') fetcher = Fetcher(client, subscription_state) @@ -721,12 +736,11 @@ def test_fetch_position_after_exception(client, mocker): assert len(fetcher._fetchable_partitions()) == 2 - empty_records = _build_record_batch([], offset=1) - three_records = _build_record_batch([(None, b'msg', None) for _ in range(3)], offset=1) + msgs = [(None, b'msg', None) for _ in range(3)] fetcher._completed_fetches.append( - CompletedFetch(tp1, 1, 0, [0, 100, three_records], mocker.MagicMock())) + _build_completed_fetch(tp1, msgs, offset=1)) fetcher._completed_fetches.append( - CompletedFetch(tp0, 1, 0, [1, 100, empty_records], mocker.MagicMock())) + _build_completed_fetch(tp0, [], error=OffsetOutOfRangeError, offset=1)) records, partial = fetcher.fetched_records() assert len(records) == 1 @@ -746,7 +760,7 @@ def test_fetch_position_after_exception(client, mocker): assert exceptions[0].args == ({tp0: 1},) -def test_seek_before_exception(client, mocker): +def test_seek_before_exception(client): subscription_state = SubscriptionState(offset_reset_strategy='NONE') fetcher = Fetcher(client, subscription_state, max_poll_records=2) @@ -757,9 +771,9 @@ def test_seek_before_exception(client, mocker): assert len(fetcher._fetchable_partitions()) == 1 - three_records = _build_record_batch([(None, b'msg', None) for _ in range(3)], offset=1) + msgs = [(None, b'msg', None) for _ in range(3)] fetcher._completed_fetches.append( - CompletedFetch(tp0, 1, 0, [0, 100, three_records], mocker.MagicMock())) + _build_completed_fetch(tp0, msgs, offset=1)) records, partial = fetcher.fetched_records() assert len(records) == 1 @@ -774,7 +788,7 @@ def test_seek_before_exception(client, mocker): empty_records = _build_record_batch([], offset=1) fetcher._completed_fetches.append( - CompletedFetch(tp1, 1, 0, [1, 100, empty_records], mocker.MagicMock())) + _build_completed_fetch(tp1, [], error=OffsetOutOfRangeError, offset=1)) records, partial = fetcher.fetched_records() assert len(records) == 1
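One small pattern recurs throughout this diff: re-raising a friendlier error with `from None` (in `Fetcher.__init__` and twice in kafka/util.py) to suppress the implicit exception chain. A standalone illustration, with names that are purely illustrative:

```python
# Why the `from None` additions matter: without it, the KeyError from the
# enum lookup leaks into tracebacks as "During handling of the above
# exception, another exception occurred". Names here are illustrative.
import enum

class Color(enum.Enum):
    RED = 1

def build_from(val):
    try:
        return Color[str(val).upper()]
    except KeyError:
        raise ValueError(f'Unrecognized Color: {val}') from None

try:
    build_from('blue')
except ValueError as exc:
    assert exc.__suppress_context__       # chained context is suppressed
    assert exc.__context__ is not None    # the KeyError is still recorded
```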