From e51a10795d595b62c2244056991ca9f96b0804a7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 10 Apr 2026 23:08:07 -0700 Subject: [PATCH] debug: add join/rejoin diagnostic logging Add temporary debug logging to investigate consumer rejoin flakiness where _handle_join_success runs (logging "Enabling heartbeat thread") but _on_join_complete never runs (missing "Setting newly assigned partitions"), causing the consumer to spin without an assignment. - _initiate_join_group: log create vs reuse path + future state - join_group: log future.is_done/exception/identity after the poll, succeeded() branch, and entry/exit of _on_join_complete - ConsumerCoordinator.need_rejoin: log which branch is taken with the relevant snapshot/subscription state Co-Authored-By: Claude Opus 4.6 (1M context) --- kafka/coordinator/base.py | 10 ++++++++++ kafka/coordinator/consumer.py | 11 ++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 185549b5b..1b5321c97 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -397,6 +397,7 @@ def _initiate_join_group(self): # This ensures that we do not mistakenly attempt to rejoin # before the pending rebalance has completed. if self.join_future is None: + log.debug("_initiate_join_group: creating new join_future (state=%s)", self.state) self.state = MemberState.REBALANCING self.join_future = self._send_join_group_request() @@ -409,6 +410,9 @@ def _initiate_join_group(self): # If the join completes after having been woken up, the # exception is ignored and we will rejoin self.join_future.add_errback(self._handle_join_failure) + else: + log.debug("_initiate_join_group: returning existing join_future (is_done=%s, exception=%s, state=%s)", + self.join_future.is_done, self.join_future.exception, self.state) return self.join_future @@ -490,18 +494,24 @@ def join_group(self, timeout_ms=None): future = self._initiate_join_group() self._client.poll(future=future, timeout_ms=timer.timeout_ms) + log.debug("join_group: after poll, future.is_done=%s future.exception=%s future is self.join_future=%s state=%s", + future.is_done, future.exception, future is self.join_future, self.state) if future.is_done: self._reset_join_group_future() else: return False + log.debug("join_group: checking future.succeeded()=%s (is_done=%s exception=%s)", + future.succeeded(), future.is_done, future.exception) if future.succeeded(): self.rejoining = False self.rejoin_needed = False + log.debug("join_group: about to call _on_join_complete (generation=%s)", self._generation) self._on_join_complete(self._generation.generation_id, self._generation.member_id, self._generation.protocol, future.value) + log.debug("join_group: _on_join_complete returned") return True else: exception = future.exception diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index da2ed2b97..6592988c9 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -388,22 +388,31 @@ def need_rejoin(self): bool: True if consumer should rejoin group, False otherwise """ if not self._subscription.partitions_auto_assigned(): + log.debug("need_rejoin: False (partitions not auto-assigned)") return False if self._auto_assign_all_partitions(): + log.debug("need_rejoin: False (auto-assign all partitions)") return False # we need to rejoin if we performed the assignment and metadata has changed if (self._assignment_snapshot is not None and self._assignment_snapshot != self._metadata_snapshot): + log.debug("need_rejoin: True (assignment_snapshot != metadata_snapshot: %s != %s)", + self._assignment_snapshot, self._metadata_snapshot) return True # we need to join if our subscription has changed since the last join if (self._joined_subscription is not None and self._joined_subscription != self._subscription.subscription): + log.debug("need_rejoin: True (joined_subscription != subscription: %s != %s)", + self._joined_subscription, self._subscription.subscription) return True - return super().need_rejoin() + parent = super().need_rejoin() + log.debug("need_rejoin: %s (from base.rejoin_needed; assignment_snapshot=%s metadata_snapshot=%s joined_subscription=%s)", + parent, self._assignment_snapshot, self._metadata_snapshot, self._joined_subscription) + return parent def refresh_committed_offsets_if_needed(self, timeout_ms=None): """Fetch committed offsets for assigned partitions."""