Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,72 @@ def _enable_incremental_fetch_sessions(self):
return False
return self.config['enable_incremental_fetch_sessions']

def fetch_records(self, max_records=None, update_offsets=True, timeout_ms=None):
    """Drain buffered records, pipeline next fetches, and wait briefly
    for in-flight responses if no records are immediately available.

    Single-call replacement for the legacy
    ``fetched_records → send_fetches → client.poll → fetched_records``
    loop in :meth:`KafkaConsumer._poll_once`. The caller no longer
    drives the event loop; the wait happens inside this method via a
    wakeup Future fired by any in-flight fetch's completion callback.

    Arguments:
        max_records (int, optional): cap on returned records. ``None``
            means no cap (delegated to :meth:`fetched_records`).
        update_offsets (bool): advance subscription positions for
            consumed records.
        timeout_ms (int, optional): wall-clock cap on the wait phase.
            Only applies when no records are immediately available.

    Returns:
        dict[TopicPartition, list[ConsumerRecord]]: records grouped by
        partition; may be empty if no records arrived in the budget.
    """
    # Drain whatever's already buffered from prior fetch responses.
    records, partial = self.fetched_records(
        max_records, update_offsets=update_offsets)
    if not partial:
        # No buffered records remaining for any partition; pipeline the
        # next round of fetch requests now so responses can arrive while
        # the caller processes the records we return below. When the
        # drain was partial we must NOT fetch yet — buffered data is
        # still pending for some partition.
        self.send_fetches()

    if records:
        return records

    # No records yet. Wait for any signal that more work might be
    # ready: an in-flight fetch completing OR a pending offset-reset
    # task completing (positions become available, enabling future
    # fetches). The wait drives the manager's event loop — the
    # consumer has no background IO thread, so call_soon-scheduled
    # tasks (resets, sent fetches) only run inside manager.run.
    # add_both fires synchronously on already-done futures, closing
    # the race where a response arrives between scheduling and the
    # wait setup.
    waited_on = list(self._fetch_futures)
    if self._reset_task is not None and not self._reset_task.is_done:
        waited_on.append(self._reset_task)
    if not waited_on:
        return records  # nothing pending; nothing to wait for

    # One shared wakeup Future: the first completing future (success OR
    # failure, hence add_both) fires it; later completions are no-ops
    # thanks to the is_done guard.
    wakeup = Future()
    def _wake(_):
        if not wakeup.is_done:
            wakeup.success(None)
    for fut in waited_on:
        fut.add_both(_wake)

    # Hold _client._lock so we serialize with HeartbeatThread, which
    # also drives _net.poll under this lock. Drops once Phase D
    # retires HeartbeatThread.
    try:
        with self._client._lock:
            # wait_for raises KafkaTimeoutError when timeout_ms elapses
            # before wakeup fires; an empty result dict is the contract
            # for that case, so the timeout is swallowed deliberately.
            self._manager.run(self._manager.wait_for, wakeup, timeout_ms)
    except Errors.KafkaTimeoutError:
        pass

    # Re-drain after the wait: any response that arrived while the event
    # loop ran is now buffered. The partial flag is irrelevant here —
    # the next call performs the pipelining.
    records, _ = self.fetched_records(
        max_records, update_offsets=update_offsets)
    return records

def send_fetches(self):
"""Send FetchRequests for all assigned partitions that do not already have
an in-flight fetch or pending fetch data.
Expand Down
31 changes: 8 additions & 23 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,35 +721,20 @@ def _poll_once(self, timer, max_records, update_offsets=True):
self._refresh_committed_offsets(timeout_ms=timer.timeout_ms)
# Fire-and-forget: kicks ListOffsets reset for any remaining partitions.
# _reset_offsets_async self-drives metadata refresh + retry-backoff
# within request_timeout_ms; we don't need to wake the user thread
# early to re-trigger it. The result Task is shared across callers
# via Fetcher._reset_task.
# within request_timeout_ms.
self._fetcher.reset_offsets_if_needed()

# If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
log.debug('poll: fetched records: %s, %s', records, partial)
# Before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
# responses to enable pipelining while the user is handling the
# fetched records.
if not partial:
log.debug("poll: Sending fetches")
futures = self._fetcher.send_fetches()
if len(futures):
self._client.poll(timeout_ms=0)

if records:
return records

# Cap the fetch wait by the heartbeat deadline so we don't block past
# when the coordinator wants to send the next heartbeat.
poll_timeout_ms = timer.timeout_ms
if self.config['group_id'] is not None:
poll_timeout_ms = min(poll_timeout_ms, self._coordinator.time_to_next_poll() * 1000)

self._client.poll(timeout_ms=poll_timeout_ms)
records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
return records
# turn the IO crank here until we have permanent io thread
self._client.poll(timeout_ms=0)

return self._fetcher.fetch_records(
max_records, update_offsets=update_offsets, timeout_ms=poll_timeout_ms)

def position(self, partition, timeout_ms=None):
"""Get the offset of the next record that will be fetched
Expand Down
Loading