diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 586340c56..f00482554 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -120,6 +120,7 @@ def __init__(self, client, subscriptions, **configs):
             raise Errors.KafkaConfigurationError('Unrecognized isolation_level')
 
         self._client = client
+        self._manager = client._manager
         self._subscriptions = subscriptions
         self._completed_fetches = collections.deque()  # Unparsed responses
         self._next_partition_records = None  # Holds a single PartitionRecords until fully consumed
@@ -197,7 +198,7 @@ def reset_offsets_if_needed(self):
             if ts:
                 offset_resets[tp] = ts
 
-        self._reset_offsets_async(offset_resets)
+        self._manager.call_soon(self._reset_offsets_async, offset_resets)
         return True
 
     def offsets_by_times(self, timestamps, timeout_ms=None):
@@ -221,7 +222,7 @@ def offsets_by_times(self, timestamps, timeout_ms=None):
         Raises:
             KafkaTimeoutError if timeout_ms provided
         """
-        offsets = self._client._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
+        offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
         for tp in timestamps:
             if tp not in offsets:
                 offsets[tp] = None
@@ -238,25 +239,25 @@ async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None):
             if not timestamps:
                 return {}
 
-            future = self._send_list_offsets_requests(timestamps)
+            future = self._manager.call_soon(self._send_list_offsets_requests, timestamps)
             try:
-                offsets, retry = await self._client._manager.wait_for(future, timer.timeout_ms)
+                offsets, retry = await self._manager.wait_for(future, timer.timeout_ms)
             except Errors.KafkaTimeoutError:
                 break
             except Exception as exc:
                 if not getattr(exc, 'retriable', False):
                     raise
-                if getattr(exc, 'invalid_metadata', False) or self._client._manager.cluster.need_update:
-                    refresh_future = self._client.cluster.request_update()
+                if getattr(exc, 'invalid_metadata', False) or self._manager.cluster.need_update:
+                    refresh_future = self._manager.cluster.request_update()
                     try:
-                        await self._client._manager.wait_for(refresh_future, timer.timeout_ms)
+                        await self._manager.wait_for(refresh_future, timer.timeout_ms)
                     except Errors.KafkaTimeoutError:
                         break
                 else:
                     delay = self.config['retry_backoff_ms'] / 1000
                     if timer.timeout_ms is not None:
                         delay = min(delay, timer.timeout_ms / 1000)
-                    await self._client._manager._net.sleep(delay)
+                    await self._manager._net.sleep(delay)
             else:
                 fetched_offsets.update(offsets)
                 if not retry:
@@ -278,7 +279,7 @@ def end_offsets(self, partitions, timeout_ms):
 
     def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
         timestamps = dict([(tp, timestamp) for tp in partitions])
-        offsets = self._client._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
+        offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
         for tp in timestamps:
             offsets[tp] = offsets[tp].offset
         return offsets
@@ -410,77 +411,67 @@ def _reset_offset_if_needed(self, partition, timestamp, offset):
             log.info("Resetting offset for partition %s to offset %s.", partition, offset)
             self._subscriptions.seek(partition, offset)
 
-    def _reset_offsets_async(self, timestamps):
+    async def _reset_offsets_async(self, timestamps):
         timestamps_by_node = self._group_list_offset_requests(timestamps)
 
         for node_id, timestamps_and_epochs in timestamps_by_node.items():
-            if not self._client.ready(node_id):
-                continue
             partitions = set(timestamps_and_epochs.keys())
             expire_at = time.monotonic() + self.config['request_timeout_ms'] / 1000
             self._subscriptions.set_reset_pending(partitions, expire_at)
+            self._manager.call_soon(self._reset_offsets_for_node, node_id, timestamps_and_epochs, partitions)
 
-            def on_success(timestamps_and_epochs, result):
-                fetched_offsets, partitions_to_retry = result
-                if partitions_to_retry:
-                    self._subscriptions.reset_failed(partitions_to_retry, time.monotonic() + self.config['retry_backoff_ms'] / 1000)
-                    self._client.cluster.request_update()
-
-                for partition, offset in fetched_offsets.items():
-                    ts, _epoch = timestamps_and_epochs[partition]
-                    self._reset_offset_if_needed(partition, ts, offset.offset)
-
-            def on_failure(partitions, error):
-                self._subscriptions.reset_failed(partitions, time.monotonic() + self.config['retry_backoff_ms'] / 1000)
-                self._client.cluster.request_update()
-
-                if not getattr(error, 'retriable', False):
-                    if not self._cached_list_offsets_exception:
-                        self._cached_list_offsets_exception = error
-                    else:
-                        log.error("Discarding error in ListOffsetResponse because another error is pending: %s", error)
+    async def _reset_offsets_for_node(self, node_id, timestamps_and_epochs, partitions):
+        try:
+            fetched_offsets, partitions_to_retry = await self._send_list_offsets_request(node_id, timestamps_and_epochs)
+        except Exception as error:
+            self._subscriptions.reset_failed(partitions, time.monotonic() + self.config['retry_backoff_ms'] / 1000)
+            self._manager.cluster.request_update()
+            if not getattr(error, 'retriable', False):
+                if not self._cached_list_offsets_exception:
+                    self._cached_list_offsets_exception = error
+                else:
+                    log.error("Discarding error in ListOffsetResponse because another error is pending: %s", error)
+            return
 
-            future = self._send_list_offsets_request(node_id, timestamps_and_epochs)
-            future.add_callback(on_success, timestamps_and_epochs)
-            future.add_errback(on_failure, partitions)
+        if partitions_to_retry:
+            self._subscriptions.reset_failed(partitions_to_retry, time.monotonic() + self.config['retry_backoff_ms'] / 1000)
+            self._manager.cluster.request_update()
+        for partition, offset in fetched_offsets.items():
+            ts, _epoch = timestamps_and_epochs[partition]
+            self._reset_offset_if_needed(partition, ts, offset.offset)
 
-    def _send_list_offsets_requests(self, timestamps):
+    async def _send_list_offsets_requests(self, timestamps):
         """Fetch offsets for each partition in timestamps dict. This may send
         request to multiple nodes, based on who is Leader for partition.
 
+        Per-node requests are dispatched concurrently; if any fails, the first
+        exception encountered propagates and the remaining results are dropped.
+
         Arguments:
             timestamps (dict): {TopicPartition: int} mapping of fetching
                 timestamps.
 
         Returns:
-            Future: resolves to a mapping of retrieved offsets
+            (fetched_offsets, partitions_to_retry)
+
+        Raises:
+            StaleMetadata: if no node has known leader for any partition.
""" timestamps_by_node = self._group_list_offset_requests(timestamps) if not timestamps_by_node: - return Future().failure(Errors.StaleMetadata()) + raise Errors.StaleMetadata() + + futures = [ + self._manager.call_soon(self._send_list_offsets_request, node_id, ts) + for node_id, ts in timestamps_by_node.items() + ] - # Aggregate results until we have all responses - list_offsets_future = Future() fetched_offsets = dict() partitions_to_retry = set() - remaining_responses = [len(timestamps_by_node)] # list for mutable / 2.7 hack - - def on_success(remaining_responses, value): - remaining_responses[0] -= 1 # noqa: F823 - fetched_offsets.update(value[0]) - partitions_to_retry.update(value[1]) - if not remaining_responses[0] and not list_offsets_future.is_done: - list_offsets_future.success((fetched_offsets, partitions_to_retry)) - - def on_fail(err): - if not list_offsets_future.is_done: - list_offsets_future.failure(err) - - for node_id, timestamps in timestamps_by_node.items(): - _f = self._send_list_offsets_request(node_id, timestamps) - _f.add_callback(on_success, remaining_responses) - _f.add_errback(on_fail) - return list_offsets_future + for f in futures: + offs, retry = await f + fetched_offsets.update(offs) + partitions_to_retry.update(retry) + return fetched_offsets, partitions_to_retry def _group_list_offset_requests(self, timestamps): timestamps_by_node = collections.defaultdict(dict) @@ -499,7 +490,7 @@ def _group_list_offset_requests(self, timestamps): timestamps_by_node[node_id][partition] = (timestamp, leader_epoch) return dict(timestamps_by_node) - def _send_list_offsets_request(self, node_id, timestamps_and_epochs): + async def _send_list_offsets_request(self, node_id, timestamps_and_epochs): version = self._client.api_version(ListOffsetsRequest, max_version=5) if self.config['isolation_level'] == 'read_committed' and version < 2: raise Errors.UnsupportedVersionError('read_committed isolation level requires ListOffsetsRequest >= v2') @@ -523,26 +514,18 @@ def _send_list_offsets_request(self, node_id, timestamps_and_epochs): -1, list(by_topic.items())) - # Client returns a future that only fails on network issues - # so create a separate future and attach a callback to update it - # based on response error codes - future = Future() - log.debug("Sending ListOffsetRequest %s to broker %s", request, node_id) - _f = self._client.send(node_id, request) - _f.add_callback(self._handle_list_offsets_response, future) - _f.add_errback(lambda e: future.failure(e)) - return future + response = await self._manager.send(request, node_id=node_id) + return self._handle_list_offsets_response(response) - def _handle_list_offsets_response(self, future, response): - """Callback for the response of the ListOffsets api call + def _handle_list_offsets_response(self, response): + """Parse a ListOffsets response. 
 
-        Arguments:
-            future (Future): the future to update based on response
-            response (ListOffsetsResponse): response from the server
+        Returns:
+            (fetched_offsets, partitions_to_retry): dict[TopicPartition, OffsetAndTimestamp], set[TopicPartition]
 
         Raises:
-            AssertionError: if response does not match partition
+            TopicAuthorizationFailedError: if any topic returned an auth error
         """
         fetched_offsets = dict()
         partitions_to_retry = set()
@@ -599,9 +582,8 @@
                             " %s", partition, error_type.__name__)
                 partitions_to_retry.add(partition)
         if unauthorized_topics:
-            future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
-        else:
-            future.success((fetched_offsets, partitions_to_retry))
+            raise Errors.TopicAuthorizationFailedError(unauthorized_topics)
+        return fetched_offsets, partitions_to_retry
 
     def _fetchable_partitions(self):
         fetchable = self._subscriptions.fetchable_partitions()
diff --git a/kafka/net/manager.py b/kafka/net/manager.py
index bc044e897..2ef8c0f33 100644
--- a/kafka/net/manager.py
+++ b/kafka/net/manager.py
@@ -412,6 +412,8 @@ async def _invoke(self, coro, args):
             result = await coro
         else:
             result = coro(*args)
+        if inspect.iscoroutine(result) or hasattr(result, '__await__'):
+            result = await result
         while isinstance(result, Future):
             result = await result
         return result
diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py
index 04b87521d..f5c1aa450 100644
--- a/test/consumer/test_fetcher.py
+++ b/test/consumer/test_fetcher.py
@@ -111,27 +111,27 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
 
 
 def test_reset_offsets_if_needed(fetcher, topic, mocker):
-    mocker.patch.object(fetcher, '_reset_offsets_async')
+    call_soon = mocker.patch.object(fetcher._client._manager, 'call_soon')
     partition = TopicPartition(topic, 0)
 
     # fetchable partition (has offset, not paused)
     fetcher.reset_offsets_if_needed()
-    assert fetcher._reset_offsets_async.call_count == 0
+    assert call_soon.call_count == 0
 
     # partition needs reset, no valid position
     fetcher._subscriptions.request_offset_reset(partition)
     fetcher.reset_offsets_if_needed()
-    fetcher._reset_offsets_async.assert_called_with({partition: OffsetResetStrategy.EARLIEST})
+    call_soon.assert_called_with(fetcher._reset_offsets_async, {partition: OffsetResetStrategy.EARLIEST})
     assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
     fetcher.reset_offsets_if_needed()
-    fetcher._reset_offsets_async.assert_called_with({partition: OffsetResetStrategy.EARLIEST})
+    call_soon.assert_called_with(fetcher._reset_offsets_async, {partition: OffsetResetStrategy.EARLIEST})
 
     # partition needs reset, has valid position
-    fetcher._reset_offsets_async.reset_mock()
+    call_soon.reset_mock()
     fetcher._subscriptions.request_offset_reset(partition)
     fetcher._subscriptions.seek(partition, 123)
     fetcher.reset_offsets_if_needed()
-    assert fetcher._reset_offsets_async.call_count == 0
+    assert call_soon.call_count == 0
 
 
 def test__reset_offsets_async(fetcher, mocker):
@@ -144,15 +144,26 @@
     leaders = {tp0: 0, tp1: 1}
     mocker.patch.object(fetcher._client.cluster, "leader_for_partition", side_effect=lambda tp: leaders[tp])
     mocker.patch.object(fetcher._client, 'ready', return_value=True)
-    future1 = Future()
-    future2 = Future()
-    mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=[future1, future2])
-    fetcher._reset_offsets_async({
+
+    results = {
+        0: ({tp0: OffsetAndTimestamp(1001, None, -1)}, set()),
+        1: ({tp1: OffsetAndTimestamp(1002, None, -1)}, set()),
+    }
+    pending = []
+    async def fake_send(node_id, timestamps_and_epochs):
+        pending.append(node_id)
+        return results[node_id]
+    mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=fake_send)
+
+    manager = fetcher._client._manager
+    manager.run(fetcher._reset_offsets_async, {
         tp0: OffsetResetStrategy.EARLIEST,
         tp1: OffsetResetStrategy.EARLIEST,
     })
-    future1.success(({tp0: OffsetAndTimestamp(1001, None, -1)}, set())),
-    future2.success(({tp1: OffsetAndTimestamp(1002, None, -1)}, set())),
+    # _reset_offsets_async is fire-and-forget; drain the spawned per-node tasks
+    while len(pending) < 2 or fetcher._subscriptions.assignment[tp0].awaiting_reset or fetcher._subscriptions.assignment[tp1].awaiting_reset:
+        manager.poll(timeout_ms=10)
+
     assert not fetcher._subscriptions.assignment[tp0].awaiting_reset
     assert not fetcher._subscriptions.assignment[tp1].awaiting_reset
     assert fetcher._subscriptions.assignment[tp0].position.offset == 1001
@@ -161,14 +172,13 @@ def test__send_list_offsets_requests(fetcher, mocker):
     tp = TopicPartition("topic_send_list_offsets", 1)
-    mocked_send = mocker.patch.object(fetcher, "_send_list_offsets_request")
-    send_futures = []
-    def send_side_effect(*args, **kw):
+    pending = []
+    async def fake_send(node_id, timestamps):
         f = Future()
-        send_futures.append(f)
-        return f
-    mocked_send.side_effect = send_side_effect
+        pending.append(f)
+        return await f
+    mocked_send = mocker.patch.object(fetcher, "_send_list_offsets_request", side_effect=fake_send)
 
     mocked_leader = mocker.patch.object(
         fetcher._client.cluster, "leader_for_partition")
@@ -178,33 +188,36 @@ def send_side_effect(*args, **kw):
         [None, -1], itertools.cycle([0]))
     mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0)
 
+    manager = fetcher._client._manager
+
     # Leader == None
-    fut = fetcher._send_list_offsets_requests({tp: 0})
-    assert fut.failed()
-    assert isinstance(fut.exception, StaleMetadata)
+    with pytest.raises(StaleMetadata):
+        manager.run(fetcher._send_list_offsets_requests, {tp: 0})
     assert not mocked_send.called
 
     # Leader == -1
-    fut = fetcher._send_list_offsets_requests({tp: 0})
-    assert fut.failed()
-    assert isinstance(fut.exception, StaleMetadata)
+    with pytest.raises(StaleMetadata):
+        manager.run(fetcher._send_list_offsets_requests, {tp: 0})
    assert not mocked_send.called
 
     # Leader == 0, send failed
-    fut = fetcher._send_list_offsets_requests({tp: 0})
+    fut = manager.call_soon(fetcher._send_list_offsets_requests, {tp: 0})
+    while not pending:
+        manager.poll(timeout_ms=10)
     assert not fut.is_done
     assert mocked_send.called
-    # Check that we bound the futures correctly to chain failure
-    send_futures.pop().failure(NotLeaderForPartitionError(tp))
+    pending.pop().failure(NotLeaderForPartitionError(tp))
+    manager.poll(future=fut)
     assert fut.failed()
     assert isinstance(fut.exception, NotLeaderForPartitionError)
 
     # Leader == 0, send success
-    fut = fetcher._send_list_offsets_requests({tp: 0})
+    fut = manager.call_soon(fetcher._send_list_offsets_requests, {tp: 0})
+    while not pending:
+        manager.poll(timeout_ms=10)
     assert not fut.is_done
-    assert mocked_send.called
-    # Check that we bound the futures correctly to chain success
-    send_futures.pop().success(({tp: (10, 10000)}, set()))
+    pending.pop().success(({tp: (10, 10000)}, set()))
+    manager.poll(future=fut)
     assert fut.succeeded()
     assert fut.value == ({tp: (10, 10000)}, set())
 
@@ -214,23 +227,29 @@ def test__send_list_offsets_requests_multiple_nodes(fetcher, mocker):
     tp2 = TopicPartition("topic_send_list_offsets", 2)
     tp3 = TopicPartition("topic_send_list_offsets", 3)
     tp4 = TopicPartition("topic_send_list_offsets", 4)
-    mocked_send = mocker.patch.object(fetcher, "_send_list_offsets_request")
-    send_futures = []
-    def send_side_effect(node_id, timestamps):
+    send_futures = []
+    async def fake_send(node_id, timestamps):
         f = Future()
         send_futures.append((node_id, timestamps, f))
-        return f
-    mocked_send.side_effect = send_side_effect
+        return await f
+    mocked_send = mocker.patch.object(fetcher, "_send_list_offsets_request", side_effect=fake_send)
 
     mocked_leader = mocker.patch.object(
         fetcher._client.cluster, "leader_for_partition")
     mocked_leader.side_effect = itertools.cycle([0, 1])
     mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0)
 
+    manager = fetcher._client._manager
+
+    def wait_for_send_futures(n):
+        while len(send_futures) < n:
+            manager.poll(timeout_ms=10)
+
     # -- All node succeeded case
     tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
-    fut = fetcher._send_list_offsets_requests(tss)
+    fut = manager.call_soon(fetcher._send_list_offsets_requests, tss)
+    wait_for_send_futures(2)
     assert not fut.is_done
     assert mocked_send.call_count == 2
@@ -249,115 +268,103 @@ def send_side_effect(node_id, timestamps):
     }
 
     # We only resolved 1 future so far, so result future is not yet ready
+    manager.poll(timeout_ms=10)
     assert not fut.is_done
     second_future.success(({tp2: (12, 1002), tp4: (14, 1004)}, set()))
+    manager.poll(future=fut)
     assert fut.succeeded()
     assert fut.value == ({tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}, set())
 
     # -- First succeeded second not
     del send_futures[:]
-    fut = fetcher._send_list_offsets_requests(tss)
-    assert len(send_futures) == 2
+    fut = manager.call_soon(fetcher._send_list_offsets_requests, tss)
+    wait_for_send_futures(2)
     send_futures[0][2].success(({tp1: (11, 1001)}, set()))
     send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1))
+    manager.poll(future=fut)
     assert fut.failed()
     assert isinstance(fut.exception, UnknownTopicOrPartitionError)
 
     # -- First fails second succeeded
     del send_futures[:]
-    fut = fetcher._send_list_offsets_requests(tss)
-    assert len(send_futures) == 2
+    fut = manager.call_soon(fetcher._send_list_offsets_requests, tss)
+    wait_for_send_futures(2)
     send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1))
     send_futures[1][2].success(({tp1: (11, 1001)}, set()))
+    manager.poll(future=fut)
     assert fut.failed()
     assert isinstance(fut.exception, UnknownTopicOrPartitionError)
 
 
 def test__handle_list_offsets_response_v1(fetcher, mocker):
     # Broker returns UnsupportedForMessageFormatError, will omit partition
-    fut = Future()
     res = ListOffsetsResponse[1]([
         ("topic", [(0, 43, -1, -1)]),
         ("topic", [(1, 0, 1000, 9999)])
     ])
-    fetcher._handle_list_offsets_response(fut, res)
-    assert fut.succeeded()
-    assert fut.value == ({TopicPartition("topic", 1): OffsetAndTimestamp(9999, 1000, -1)}, set())
+    assert fetcher._handle_list_offsets_response(res) == (
+        {TopicPartition("topic", 1): OffsetAndTimestamp(9999, 1000, -1)}, set())
 
     # Broker returns NotLeaderForPartitionError
-    fut = Future()
     res = ListOffsetsResponse[1]([
         ("topic", [(0, 6, -1, -1)]),
     ])
-    fetcher._handle_list_offsets_response(fut, res)
-    assert fut.succeeded()
-    assert fut.value == ({}, set([TopicPartition("topic", 0)]))
+    assert fetcher._handle_list_offsets_response(res) == (
+        {}, set([TopicPartition("topic", 0)]))
 
     # Broker returns UnknownTopicOrPartitionError
-    fut = Future()
     res = ListOffsetsResponse[1]([
         ("topic", [(0, 3, -1, -1)]),
     ])
-    fetcher._handle_list_offsets_response(fut, res)
-    assert fut.succeeded()
-    assert fut.value == ({}, set([TopicPartition("topic", 0)]))
+    assert fetcher._handle_list_offsets_response(res) == (
+        {}, set([TopicPartition("topic", 0)]))
 
     # Broker returns many errors and 1 result
-    fut = Future()
     res = ListOffsetsResponse[1]([
         ("topic", [(0, 43, -1, -1)]),  # not retriable
         ("topic", [(1, 6, -1, -1)]),   # retriable
         ("topic", [(2, 3, -1, -1)]),   # retriable
         ("topic", [(3, 0, 1000, 9999)])
     ])
-    fetcher._handle_list_offsets_response(fut, res)
-    assert fut.succeeded()
-    assert fut.value == ({TopicPartition("topic", 3): OffsetAndTimestamp(9999, 1000, -1)},
-                         set([TopicPartition("topic", 1), TopicPartition("topic", 2)]))
+    assert fetcher._handle_list_offsets_response(res) == (
+        {TopicPartition("topic", 3): OffsetAndTimestamp(9999, 1000, -1)},
+        set([TopicPartition("topic", 1), TopicPartition("topic", 2)]))
 
 
 def test__handle_list_offsets_response_v2_v3(fetcher, mocker):
     # including a throttle_time shouldnt cause issues
-    fut = Future()
     res = ListOffsetsResponse[2](
         123,  # throttle_time_ms
         [("topic", [(0, 0, 1000, 9999)])
         ])
-    fetcher._handle_list_offsets_response(fut, res)
-    assert fut.succeeded()
-    assert fut.value == ({TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)}, set())
+    assert fetcher._handle_list_offsets_response(res) == (
+        {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)}, set())
 
     # v3 response is the same format
-    fut = Future()
     res = ListOffsetsResponse[3](
         123,  # throttle_time_ms
         [("topic", [(0, 0, 1000, 9999)])
         ])
-    fetcher._handle_list_offsets_response(fut, res)
-    assert fut.succeeded()
-    assert fut.value == ({TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)}, set())
+    assert fetcher._handle_list_offsets_response(res) == (
+        {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)}, set())
 
 
 def test__handle_list_offsets_response_v4_v5(fetcher, mocker):
     # includes leader_epoch
-    fut = Future()
     res = ListOffsetsResponse[4](
         123,  # throttle_time_ms
         [("topic", [(0, 0, 1000, 9999, 1234)])
         ])
-    fetcher._handle_list_offsets_response(fut, res)
-    assert fut.succeeded()
-    assert fut.value == ({TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)}, set())
+    assert fetcher._handle_list_offsets_response(res) == (
+        {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)}, set())
 
     # v5 response is the same format
-    fut = Future()
     res = ListOffsetsResponse[5](
         123,  # throttle_time_ms
         [("topic", [(0, 0, 1000, 9999, 1234)])
         ])
-    fetcher._handle_list_offsets_response(fut, res)
-    assert fut.succeeded()
-    assert fut.value == ({TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)}, set())
+    assert fetcher._handle_list_offsets_response(res) == (
+        {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)}, set())
 
 
 def test_fetched_records(fetcher, topic, mocker):
@@ -643,11 +650,15 @@ def test_reset_offsets_paused(subscription_state, client, mocker):
     subscription_state.request_offset_reset(tp, OffsetResetStrategy.LATEST)
 
     fetched_offsets = {tp: OffsetAndTimestamp(10, 1, -1)}
+    async def fake_send(node_id, timestamps_and_epochs):
+        return (fetched_offsets, set())
     mocker.patch.object(fetcher._client, 'ready', return_value=True)
-    mocker.patch.object(fetcher, '_send_list_offsets_request',
-                        return_value=Future().success((fetched_offsets, set())))
+    mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=fake_send)
     mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0)
-    fetcher.reset_offsets_if_needed()
+    manager = fetcher._client._manager
+    manager.run(fetcher._reset_offsets_async, {tp: OffsetResetStrategy.LATEST})
+    while subscription_state.is_offset_reset_needed(tp):
+        manager.poll(timeout_ms=10)
 
     assert not subscription_state.is_offset_reset_needed(tp)
     assert not subscription_state.is_fetchable(tp)  # because tp is paused
@@ -663,11 +674,15 @@ def test_reset_offsets_paused_without_valid(subscription_state, client, mocker):
     subscription_state.reset_missing_positions()
 
     fetched_offsets = {tp: OffsetAndTimestamp(0, 1, -1)}
+    async def fake_send(node_id, timestamps_and_epochs):
+        return (fetched_offsets, set())
     mocker.patch.object(fetcher._client, 'ready', return_value=True)
-    mocker.patch.object(fetcher, '_send_list_offsets_request',
-                        return_value=Future().success((fetched_offsets, set())))
+    mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=fake_send)
     mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0)
-    fetcher.reset_offsets_if_needed()
+    manager = fetcher._client._manager
+    manager.run(fetcher._reset_offsets_async, {tp: OffsetResetStrategy.EARLIEST})
+    while subscription_state.is_offset_reset_needed(tp):
+        manager.poll(timeout_ms=10)
 
     assert not subscription_state.is_offset_reset_needed(tp)
     assert not subscription_state.is_fetchable(tp)  # because tp is paused
@@ -791,9 +806,9 @@ def test_success_no_retry(self, fetcher, mocker):
         timestamps = {tp: 1000}
         expected_offset = OffsetAndTimestamp(10, 1000, -1)
 
-        future = Future()
-        future.success(({tp: expected_offset}, set()))
-        mocker.patch.object(fetcher, '_send_list_offsets_requests', return_value=future)
+        async def fake_send(ts):
+            return ({tp: expected_offset}, set())
+        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=fake_send)
 
         result = fetcher.offsets_by_times(timestamps, timeout_ms=10000)
         assert result == {tp: expected_offset}
@@ -805,10 +820,10 @@ def test_success_with_retry(self, fetcher, mocker):
         offset0 = OffsetAndTimestamp(10, 1000, -1)
         offset1 = OffsetAndTimestamp(20, 2000, -1)
 
-        future1 = Future().success(({tp0: offset0}, {tp1}))
-        future2 = Future().success(({tp1: offset1}, set()))
-        futures = iter([future1, future2])
-        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=lambda ts: next(futures))
+        results = iter([({tp0: offset0}, {tp1}), ({tp1: offset1}, set())])
+        async def fake_send(ts):
+            return next(results)
+        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=fake_send)
 
         result = fetcher.offsets_by_times(timestamps, timeout_ms=10000)
         assert result == {tp0: offset0, tp1: offset1}
@@ -817,9 +832,10 @@ def test_timeout_raises(self, fetcher, mocker):
         tp = TopicPartition('test', 0)
         timestamps = {tp: 1000}
 
-        # Return a future that never completes
-        future = Future()
-        mocker.patch.object(fetcher, '_send_list_offsets_requests', return_value=future)
+        # Awaits a future that never completes
+        async def fake_send(ts):
+            await Future()
+        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=fake_send)
 
         with pytest.raises(Errors.KafkaTimeoutError):
             fetcher.offsets_by_times(timestamps, timeout_ms=50)
@@ -829,9 +845,9 @@ def test_non_retriable_error_raises(self, fetcher, mocker):
         tp = TopicPartition('test', 0)
         timestamps = {tp: 1000}
         # AuthorizationError is not retriable
-        error = Errors.TopicAuthorizationFailedError()
-        future = Future().failure(error)
-        mocker.patch.object(fetcher, '_send_list_offsets_requests', return_value=future)
+        async def fake_send(ts):
+            raise Errors.TopicAuthorizationFailedError()
+        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=fake_send)
 
         with pytest.raises(Errors.TopicAuthorizationFailedError):
             fetcher.offsets_by_times(timestamps, timeout_ms=10000)
@@ -842,10 +858,13 @@ def test_retriable_invalid_metadata_triggers_refresh(self, fetcher, mocker):
         expected_offset = OffsetAndTimestamp(10, 1000, -1)
 
         # First call fails with invalid_metadata error, second succeeds
-        future1 = Future().failure(NotLeaderForPartitionError())
-        future2 = Future().success(({tp: expected_offset}, set()))
-        futures = iter([future1, future2])
-        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=lambda ts: next(futures))
+        results = iter([NotLeaderForPartitionError(), ({tp: expected_offset}, set())])
+        async def fake_send(ts):
+            r = next(results)
+            if isinstance(r, BaseException):
+                raise r
+            return r
+        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=fake_send)
 
         refresh_future = Future().success(None)
         update_metadata_mock = mocker.patch.object(
@@ -861,10 +880,13 @@ def test_retriable_non_metadata_error_sleeps(self, fetcher, mocker):
         expected_offset = OffsetAndTimestamp(10, 1000, -1)
 
         # RequestTimedOutError is retriable but not invalid_metadata
-        future1 = Future().failure(Errors.RequestTimedOutError())
-        future2 = Future().success(({tp: expected_offset}, set()))
-        futures = iter([future1, future2])
-        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=lambda ts: next(futures))
+        results = iter([Errors.RequestTimedOutError(), ({tp: expected_offset}, set())])
+        async def fake_send(ts):
+            r = next(results)
+            if isinstance(r, BaseException):
+                raise r
+            return r
+        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=fake_send)
 
         # Ensure cluster does not need update
         mocker.patch.object(type(fetcher._client._manager.cluster), 'need_update',
@@ -889,10 +911,10 @@ def test_success_does_not_check_exception(self, fetcher, mocker):
         # Succeeds but has retry partitions -- the bug was that code
         # would fall through to check future.exception (which is None),
         # causing an AttributeError
-        future1 = Future().success(({tp0: offset0}, {tp1}))
-        future2 = Future().success(({tp1: offset1}, set()))
-        futures = iter([future1, future2])
-        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=lambda ts: next(futures))
+        results = iter([({tp0: offset0}, {tp1}), ({tp1: offset1}, set())])
+        async def fake_send(ts):
+            return next(results)
+        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=fake_send)
 
         # Should not raise AttributeError
         result = fetcher.offsets_by_times(timestamps, timeout_ms=10000)
@@ -903,8 +925,9 @@ def test_no_timeout_passes_none(self, fetcher, mocker):
         timestamps = {tp: 1000}
         expected_offset = OffsetAndTimestamp(10, 1000, -1)
 
-        future = Future().success(({tp: expected_offset}, set()))
-        mocker.patch.object(fetcher, '_send_list_offsets_requests', return_value=future)
+        async def fake_send(ts):
+            return ({tp: expected_offset}, set())
+        mocker.patch.object(fetcher, '_send_list_offsets_requests', side_effect=fake_send)
 
         result = fetcher.offsets_by_times(timestamps, timeout_ms=None)
         assert result == {tp: expected_offset}
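
Note (illustrative, not part of the diff): the sketch below shows how the reworked reset flow is expected to be driven from synchronous code, mirroring the pattern the updated tests use. It assumes only the manager API exercised above (call_soon / run / poll) and the existing subscription-state attributes; the helper name is hypothetical.

    # Hypothetical helper, for illustration only -- not added by this patch.
    def drive_offset_reset(fetcher, tp):
        manager = fetcher._client._manager
        # reset_offsets_if_needed() no longer blocks: it schedules
        # _reset_offsets_async on the manager via call_soon and returns.
        fetcher.reset_offsets_if_needed()
        # The per-node ListOffsets requests run as manager tasks, so keep
        # polling until the partition has a valid position again.
        while fetcher._subscriptions.assignment[tp].awaiting_reset:
            manager.poll(timeout_ms=10)
        return fetcher._subscriptions.assignment[tp].position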