From f05fd1a4408c872f57f614497377034abd441598 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 7 May 2026 11:18:23 -0700 Subject: [PATCH 1/5] Consumer: replace client.poll with fetcher.fetch_records --- kafka/consumer/fetcher.py | 59 +++++++++++++++++++++++++++++++++++++++ kafka/consumer/group.py | 28 ++++--------------- 2 files changed, 64 insertions(+), 23 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index a2da71166..996c9b637 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -142,6 +142,65 @@ def _enable_incremental_fetch_sessions(self): return False return self.config['enable_incremental_fetch_sessions'] + def fetch_records(self, max_records=None, update_offsets=True, timeout_ms=None): + """Drain buffered records, pipeline next fetches, and wait briefly + for in-flight responses if no records are immediately available. + + Single-call replacement for the legacy + ``fetched_records → send_fetches → client.poll → fetched_records`` + loop in :meth:`KafkaConsumer._poll_once`. The caller no longer + drives the event loop; the wait happens inside this method via a + wakeup Future fired by any in-flight fetch's completion callback. + + Arguments: + max_records (int, optional): cap on returned records. + update_offsets (bool): advance subscription positions for + consumed records. + timeout_ms (int, optional): wall-clock cap on the wait phase. + Only applies when no records are immediately available. + + Returns: + dict[TopicPartition, list[ConsumerRecord]]: records grouped by + partition; may be empty if no records arrived in the budget. + """ + # Drain whatever's already buffered from prior fetch responses. + records, partial = self.fetched_records( + max_records, update_offsets=update_offsets) + if records and partial: + # Same partition has more buffered; hold off on more fetches + # so we drain the buffer in order across calls. 
+ return records + + # Pipeline next fetches for partitions without an in-flight request. + self.send_fetches() + + if records: + return records + + # No records yet. Wait for any in-flight fetch to complete (or + # timeout). add_both fires synchronously on already-done futures, + # closing the race where a response arrives between send_fetches + # and the wait setup. + in_flight = list(self._fetch_futures) + if not in_flight: + return records # nothing in flight; nothing to wait for + + wakeup = Future() + def _wake(_): + if not wakeup.is_done: + wakeup.success(None) + for fut in in_flight: + fut.add_both(_wake) + + try: + self._manager.run(self._manager.wait_for, wakeup, timeout_ms) + except Errors.KafkaTimeoutError: + pass + + records, _ = self.fetched_records( + max_records, update_offsets=update_offsets) + return records + def send_fetches(self): """Send FetchRequests for all assigned partitions that do not already have an in-flight fetch or pending fetch data. diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 028b97ba6..93e0ea09d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -721,35 +721,17 @@ def _poll_once(self, timer, max_records, update_offsets=True): self._refresh_committed_offsets(timeout_ms=timer.timeout_ms) # Fire-and-forget: kicks ListOffsets reset for any remaining partitions. # _reset_offsets_async self-drives metadata refresh + retry-backoff - # within request_timeout_ms; we don't need to wake the user thread - # early to re-trigger it. The result Task is shared across callers - # via Fetcher._reset_task. + # within request_timeout_ms. self._fetcher.reset_offsets_if_needed() - # If data is available already, e.g. 
from a previous network client - # poll() call to commit, then just return it immediately - records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) - log.debug('poll: fetched records: %s, %s', records, partial) - # Before returning the fetched records, we can send off the - # next round of fetches and avoid block waiting for their - # responses to enable pipelining while the user is handling the - # fetched records. - if not partial: - log.debug("poll: Sending fetches") - futures = self._fetcher.send_fetches() - if len(futures): - self._client.poll(timeout_ms=0) - - if records: - return records - + # Cap the fetch wait by the heartbeat deadline so we don't block past + # when the coordinator wants to send the next heartbeat. poll_timeout_ms = timer.timeout_ms if self.config['group_id'] is not None: poll_timeout_ms = min(poll_timeout_ms, self._coordinator.time_to_next_poll() * 1000) - self._client.poll(timeout_ms=poll_timeout_ms) - records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) - return records + return self._fetcher.fetch_records( + max_records, update_offsets=update_offsets, timeout_ms=poll_timeout_ms) def position(self, partition, timeout_ms=None): """Get the offset of the next record that will be fetched From bfa63800370a2008ab88d0cff3d7c06c8e19db2b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 7 May 2026 11:18:53 -0700 Subject: [PATCH 2/5] minor simplifications --- kafka/consumer/fetcher.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 996c9b637..e1f813cd7 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -166,13 +166,9 @@ def fetch_records(self, max_records=None, update_offsets=True, timeout_ms=None): # Drain whatever's already buffered from prior fetch responses. 
records, partial = self.fetched_records( max_records, update_offsets=update_offsets) - if records and partial: - # Same partition has more buffered; hold off on more fetches - # so we drain the buffer in order across calls. - return records - - # Pipeline next fetches for partitions without an in-flight request. - self.send_fetches() + if not partial: + # No buffered records remaining; send next batch of fetch requests. + self.send_fetches() if records: return records From 562bc0497ed5beabbbd0d1d7e13de63eb78bc1d9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 May 2026 18:24:05 -0700 Subject: [PATCH 3/5] also wait on reset_task --- kafka/consumer/fetcher.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e1f813cd7..2db0553a4 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -173,19 +173,26 @@ def fetch_records(self, max_records=None, update_offsets=True, timeout_ms=None): if records: return records - # No records yet. Wait for any in-flight fetch to complete (or - # timeout). add_both fires synchronously on already-done futures, - # closing the race where a response arrives between send_fetches - # and the wait setup. - in_flight = list(self._fetch_futures) - if not in_flight: - return records # nothing in flight; nothing to wait for + # No records yet. Wait for any signal that more work might be + # ready: an in-flight fetch completing OR a pending offset-reset + # task completing (positions become available, enabling future + # fetches). The wait drives the manager's event loop — the + # consumer has no background IO thread, so call_soon-scheduled + # tasks (resets, sent fetches) only run inside manager.run. + # add_both fires synchronously on already-done futures, closing + # the race where a response arrives between scheduling and the + # wait setup. 
+ waited_on = list(self._fetch_futures) + if self._reset_task is not None and not self._reset_task.is_done: + waited_on.append(self._reset_task) + if not waited_on: + return records # nothing pending; nothing to wait for wakeup = Future() def _wake(_): if not wakeup.is_done: wakeup.success(None) - for fut in in_flight: + for fut in waited_on: fut.add_both(_wake) try: From 006730709c783c8032102d14b6a2ca88708238e4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 May 2026 18:24:21 -0700 Subject: [PATCH 4/5] hold client._lock during manager.run --- kafka/consumer/fetcher.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 2db0553a4..e316ad577 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -195,8 +195,12 @@ def _wake(_): for fut in waited_on: fut.add_both(_wake) + # Hold _client._lock so we serialize with HeartbeatThread, which + # also drives _net.poll under this lock. No longer needed once Phase D + # retires HeartbeatThread. 
try: - self._manager.run(self._manager.wait_for, wakeup, timeout_ms) + with self._client._lock: + self._manager.run(self._manager.wait_for, wakeup, timeout_ms) except Errors.KafkaTimeoutError: pass From d08939a3e7f8c6b9bba60871fca4313e15f58fb9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 May 2026 18:24:37 -0700 Subject: [PATCH 5/5] client.poll until permanent io thread --- kafka/consumer/group.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 93e0ea09d..4572420f6 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -730,6 +730,9 @@ def _poll_once(self, timer, max_records, update_offsets=True): if self.config['group_id'] is not None: poll_timeout_ms = min(poll_timeout_ms, self._coordinator.time_to_next_poll() * 1000) + # Turn the IO crank here until we have a permanent IO thread + self._client.poll(timeout_ms=0) + return self._fetcher.fetch_records( max_records, update_offsets=update_offsets, timeout_ms=poll_timeout_ms)