Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 59 additions & 77 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(self, client, subscriptions, **configs):
raise Errors.KafkaConfigurationError('Unrecognized isolation_level')

self._client = client
self._manager = client._manager
self._subscriptions = subscriptions
self._completed_fetches = collections.deque() # Unparsed responses
self._next_partition_records = None # Holds a single PartitionRecords until fully consumed
Expand Down Expand Up @@ -197,7 +198,7 @@ def reset_offsets_if_needed(self):
if ts:
offset_resets[tp] = ts

self._reset_offsets_async(offset_resets)
self._manager.call_soon(self._reset_offsets_async, offset_resets)
return True

def offsets_by_times(self, timestamps, timeout_ms=None):
Expand All @@ -221,7 +222,7 @@ def offsets_by_times(self, timestamps, timeout_ms=None):
Raises:
KafkaTimeoutError if timeout_ms provided
"""
offsets = self._client._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
for tp in timestamps:
if tp not in offsets:
offsets[tp] = None
Expand All @@ -238,25 +239,25 @@ async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None):
if not timestamps:
return {}

future = self._send_list_offsets_requests(timestamps)
future = self._manager.call_soon(self._send_list_offsets_requests, timestamps)
try:
offsets, retry = await self._client._manager.wait_for(future, timer.timeout_ms)
offsets, retry = await self._manager.wait_for(future, timer.timeout_ms)
except Errors.KafkaTimeoutError:
break
except Exception as exc:
if not getattr(exc, 'retriable', False):
raise
if getattr(exc, 'invalid_metadata', False) or self._client._manager.cluster.need_update:
refresh_future = self._client.cluster.request_update()
if getattr(exc, 'invalid_metadata', False) or self._manager.cluster.need_update:
refresh_future = self._manager.cluster.request_update()
try:
await self._client._manager.wait_for(refresh_future, timer.timeout_ms)
await self._manager.wait_for(refresh_future, timer.timeout_ms)
except Errors.KafkaTimeoutError:
break
else:
delay = self.config['retry_backoff_ms'] / 1000
if timer.timeout_ms is not None:
delay = min(delay, timer.timeout_ms / 1000)
await self._client._manager._net.sleep(delay)
await self._manager._net.sleep(delay)
else:
fetched_offsets.update(offsets)
if not retry:
Expand All @@ -278,7 +279,7 @@ def end_offsets(self, partitions, timeout_ms):

def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
timestamps = dict([(tp, timestamp) for tp in partitions])
offsets = self._client._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
for tp in timestamps:
offsets[tp] = offsets[tp].offset
return offsets
Expand Down Expand Up @@ -410,77 +411,67 @@ def _reset_offset_if_needed(self, partition, timestamp, offset):
log.info("Resetting offset for partition %s to offset %s.", partition, offset)
self._subscriptions.seek(partition, offset)

def _reset_offsets_async(self, timestamps):
async def _reset_offsets_async(self, timestamps):
timestamps_by_node = self._group_list_offset_requests(timestamps)

for node_id, timestamps_and_epochs in timestamps_by_node.items():
if not self._client.ready(node_id):
continue
partitions = set(timestamps_and_epochs.keys())
expire_at = time.monotonic() + self.config['request_timeout_ms'] / 1000
self._subscriptions.set_reset_pending(partitions, expire_at)
self._manager.call_soon(self._reset_offsets_for_node, node_id, timestamps_and_epochs, partitions)

def on_success(timestamps_and_epochs, result):
fetched_offsets, partitions_to_retry = result
if partitions_to_retry:
self._subscriptions.reset_failed(partitions_to_retry, time.monotonic() + self.config['retry_backoff_ms'] / 1000)
self._client.cluster.request_update()

for partition, offset in fetched_offsets.items():
ts, _epoch = timestamps_and_epochs[partition]
self._reset_offset_if_needed(partition, ts, offset.offset)

def on_failure(partitions, error):
self._subscriptions.reset_failed(partitions, time.monotonic() + self.config['retry_backoff_ms'] / 1000)
self._client.cluster.request_update()

if not getattr(error, 'retriable', False):
if not self._cached_list_offsets_exception:
self._cached_list_offsets_exception = error
else:
log.error("Discarding error in ListOffsetResponse because another error is pending: %s", error)
async def _reset_offsets_for_node(self, node_id, timestamps_and_epochs, partitions):
try:
fetched_offsets, partitions_to_retry = await self._send_list_offsets_request(node_id, timestamps_and_epochs)
except Exception as error:
self._subscriptions.reset_failed(partitions, time.monotonic() + self.config['retry_backoff_ms'] / 1000)
self._manager.cluster.request_update()
if not getattr(error, 'retriable', False):
if not self._cached_list_offsets_exception:
self._cached_list_offsets_exception = error
else:
log.error("Discarding error in ListOffsetResponse because another error is pending: %s", error)
return

future = self._send_list_offsets_request(node_id, timestamps_and_epochs)
future.add_callback(on_success, timestamps_and_epochs)
future.add_errback(on_failure, partitions)
if partitions_to_retry:
self._subscriptions.reset_failed(partitions_to_retry, time.monotonic() + self.config['retry_backoff_ms'] / 1000)
self._manager.cluster.request_update()
for partition, offset in fetched_offsets.items():
ts, _epoch = timestamps_and_epochs[partition]
self._reset_offset_if_needed(partition, ts, offset.offset)

def _send_list_offsets_requests(self, timestamps):
async def _send_list_offsets_requests(self, timestamps):
"""Fetch offsets for each partition in timestamps dict. This may send
request to multiple nodes, based on who is Leader for partition.

Per-node requests are dispatched concurrently; if any fails, the first
exception encountered propagates and the remaining results are dropped.

Arguments:
timestamps (dict): {TopicPartition: int} mapping of fetching
timestamps.

Returns:
Future: resolves to a mapping of retrieved offsets
(fetched_offsets, partitions_to_retry)

Raises:
StaleMetadata: if no node has known leader for any partition.
"""
timestamps_by_node = self._group_list_offset_requests(timestamps)
if not timestamps_by_node:
return Future().failure(Errors.StaleMetadata())
raise Errors.StaleMetadata()

futures = [
self._manager.call_soon(self._send_list_offsets_request, node_id, ts)
for node_id, ts in timestamps_by_node.items()
]

# Aggregate results until we have all responses
list_offsets_future = Future()
fetched_offsets = dict()
partitions_to_retry = set()
remaining_responses = [len(timestamps_by_node)] # list for mutable / 2.7 hack

def on_success(remaining_responses, value):
remaining_responses[0] -= 1 # noqa: F823
fetched_offsets.update(value[0])
partitions_to_retry.update(value[1])
if not remaining_responses[0] and not list_offsets_future.is_done:
list_offsets_future.success((fetched_offsets, partitions_to_retry))

def on_fail(err):
if not list_offsets_future.is_done:
list_offsets_future.failure(err)

for node_id, timestamps in timestamps_by_node.items():
_f = self._send_list_offsets_request(node_id, timestamps)
_f.add_callback(on_success, remaining_responses)
_f.add_errback(on_fail)
return list_offsets_future
for f in futures:
offs, retry = await f
fetched_offsets.update(offs)
partitions_to_retry.update(retry)
return fetched_offsets, partitions_to_retry

def _group_list_offset_requests(self, timestamps):
timestamps_by_node = collections.defaultdict(dict)
Expand All @@ -499,7 +490,7 @@ def _group_list_offset_requests(self, timestamps):
timestamps_by_node[node_id][partition] = (timestamp, leader_epoch)
return dict(timestamps_by_node)

def _send_list_offsets_request(self, node_id, timestamps_and_epochs):
async def _send_list_offsets_request(self, node_id, timestamps_and_epochs):
version = self._client.api_version(ListOffsetsRequest, max_version=5)
if self.config['isolation_level'] == 'read_committed' and version < 2:
raise Errors.UnsupportedVersionError('read_committed isolation level requires ListOffsetsRequest >= v2')
Expand All @@ -523,26 +514,18 @@ def _send_list_offsets_request(self, node_id, timestamps_and_epochs):
-1,
list(by_topic.items()))

# Client returns a future that only fails on network issues
# so create a separate future and attach a callback to update it
# based on response error codes
future = Future()

log.debug("Sending ListOffsetRequest %s to broker %s", request, node_id)
_f = self._client.send(node_id, request)
_f.add_callback(self._handle_list_offsets_response, future)
_f.add_errback(lambda e: future.failure(e))
return future
response = await self._manager.send(request, node_id=node_id)
return self._handle_list_offsets_response(response)

def _handle_list_offsets_response(self, future, response):
"""Callback for the response of the ListOffsets api call
def _handle_list_offsets_response(self, response):
"""Parse a ListOffsets response.

Arguments:
future (Future): the future to update based on response
response (ListOffsetsResponse): response from the server
Returns:
(fetched_offsets, partitions_to_retry): dict[TopicPartition, OffsetAndTimestamp], set[TopicPartition]

Raises:
AssertionError: if response does not match partition
TopicAuthorizationFailedError: if any topic returned an auth error
"""
fetched_offsets = dict()
partitions_to_retry = set()
Expand Down Expand Up @@ -599,9 +582,8 @@ def _handle_list_offsets_response(self, future, response):
" %s", partition, error_type.__name__)
partitions_to_retry.add(partition)
if unauthorized_topics:
future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
else:
future.success((fetched_offsets, partitions_to_retry))
raise Errors.TopicAuthorizationFailedError(unauthorized_topics)
return fetched_offsets, partitions_to_retry

def _fetchable_partitions(self):
fetchable = self._subscriptions.fetchable_partitions()
Expand Down
2 changes: 2 additions & 0 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@ async def _invoke(self, coro, args):
result = await coro
else:
result = coro(*args)
if inspect.iscoroutine(result) or hasattr(result, '__await__'):
result = await result
while isinstance(result, Future):
result = await result
return result
Expand Down
Loading
Loading