Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ def _initiate_join_group(self):
# This ensures that we do not mistakenly attempt to rejoin
# before the pending rebalance has completed.
if self.join_future is None:
log.debug("_initiate_join_group: creating new join_future (state=%s)", self.state)
self.state = MemberState.REBALANCING
self.join_future = self._send_join_group_request()

Expand All @@ -409,6 +410,9 @@ def _initiate_join_group(self):
# If the join completes after having been woken up, the
# exception is ignored and we will rejoin
self.join_future.add_errback(self._handle_join_failure)
else:
log.debug("_initiate_join_group: returning existing join_future (is_done=%s, exception=%s, state=%s)",
self.join_future.is_done, self.join_future.exception, self.state)

return self.join_future

Expand Down Expand Up @@ -490,18 +494,24 @@ def join_group(self, timeout_ms=None):

future = self._initiate_join_group()
self._client.poll(future=future, timeout_ms=timer.timeout_ms)
log.debug("join_group: after poll, future.is_done=%s future.exception=%s future is self.join_future=%s state=%s",
future.is_done, future.exception, future is self.join_future, self.state)
if future.is_done:
self._reset_join_group_future()
else:
return False

log.debug("join_group: checking future.succeeded()=%s (is_done=%s exception=%s)",
future.succeeded(), future.is_done, future.exception)
if future.succeeded():
self.rejoining = False
self.rejoin_needed = False
log.debug("join_group: about to call _on_join_complete (generation=%s)", self._generation)
self._on_join_complete(self._generation.generation_id,
self._generation.member_id,
self._generation.protocol,
future.value)
log.debug("join_group: _on_join_complete returned")
return True
else:
exception = future.exception
Expand Down
11 changes: 10 additions & 1 deletion kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,22 +388,31 @@ def need_rejoin(self):
bool: True if consumer should rejoin group, False otherwise
"""
if not self._subscription.partitions_auto_assigned():
log.debug("need_rejoin: False (partitions not auto-assigned)")
return False

if self._auto_assign_all_partitions():
log.debug("need_rejoin: False (auto-assign all partitions)")
return False

# we need to rejoin if we performed the assignment and metadata has changed
if (self._assignment_snapshot is not None
and self._assignment_snapshot != self._metadata_snapshot):
log.debug("need_rejoin: True (assignment_snapshot != metadata_snapshot: %s != %s)",
self._assignment_snapshot, self._metadata_snapshot)
return True

# we need to join if our subscription has changed since the last join
if (self._joined_subscription is not None
and self._joined_subscription != self._subscription.subscription):
log.debug("need_rejoin: True (joined_subscription != subscription: %s != %s)",
self._joined_subscription, self._subscription.subscription)
return True

return super().need_rejoin()
parent = super().need_rejoin()
log.debug("need_rejoin: %s (from base.rejoin_needed; assignment_snapshot=%s metadata_snapshot=%s joined_subscription=%s)",
parent, self._assignment_snapshot, self._metadata_snapshot, self._joined_subscription)
return parent

def refresh_committed_offsets_if_needed(self, timeout_ms=None):
"""Fetch committed offsets for assigned partitions."""
Expand Down
Loading