Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 59 additions & 102 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,36 +557,21 @@ def _send_join_group_request(self):

# send a join group request to the coordinator
log.info("(Re-)joining group %s", self.group_id)
version = self._client.api_version(JoinGroupRequest, max_version=5)
if version == 0:
request = JoinGroupRequest[version](
self.group_id,
self.config['session_timeout_ms'],
self._generation.member_id,
self.protocol_type(),
self.group_protocols())
elif version <= 4:
request = JoinGroupRequest[version](
self.group_id,
self.config['session_timeout_ms'],
self.config['max_poll_interval_ms'],
self._generation.member_id,
self.protocol_type(),
self.group_protocols())
else:
request = JoinGroupRequest[version](
self.group_id,
self.config['session_timeout_ms'],
self.config['max_poll_interval_ms'],
self._generation.member_id,
self.group_instance_id,
self.protocol_type(),
self.group_protocols())
max_version = 6
request = JoinGroupRequest(
group_id=self.group_id,
session_timeout_ms=self.config['session_timeout_ms'],
rebalance_timeout_ms=self.config['max_poll_interval_ms'],
member_id=self._generation.member_id,
group_instance_id=self.group_instance_id,
protocol_type=self.protocol_type(),
protocols=self.group_protocols(),
max_version=max_version)

# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f = self._manager.send(request, node_id=self.coordinator_id)
_f.add_callback(self._handle_join_group_response, future, time.monotonic())
_f.add_errback(self._failed_request, self.coordinator_id,
request, future)
Expand All @@ -604,7 +589,8 @@ def _failed_request(self, node_id, request, future, error):
else:
log.debug('Error sending %s to node %s [%s]',
request.__class__.__name__, node_id, error)
future.failure(error)
if future is not None:
future.failure(error)

def _handle_join_group_response(self, future, send_time, response):
log.debug("Received JoinGroup response: %s", response)
Expand Down Expand Up @@ -684,20 +670,14 @@ def _handle_join_group_response(self, future, send_time, response):

def _on_join_follower(self):
# send follower's sync group with an empty assignment
version = self._client.api_version(SyncGroupRequest, max_version=3)
if version <= 2:
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
self._generation.member_id,
[])
else:
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
self._generation.member_id,
self.group_instance_id,
[])
max_version = 4
request = SyncGroupRequest(
group_id=self.group_id,
generation_id=self._generation.generation_id,
member_id=self._generation.member_id,
group_instance_id=self.group_instance_id,
assignments=[],
max_version=max_version)
log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
Expand All @@ -720,20 +700,14 @@ def _on_join_leader(self, response):
except Exception as e:
return Future().failure(e)

version = self._client.api_version(SyncGroupRequest, max_version=3)
if version <= 2:
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
self._generation.member_id,
group_assignment.items())
else:
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
self._generation.member_id,
self.group_instance_id,
group_assignment.items())
max_version = 4
request = SyncGroupRequest(
group_id=self.group_id,
generation_id=self._generation.generation_id,
member_id=self._generation.member_id,
group_instance_id=self.group_instance_id,
assignments=group_assignment.items(),
max_version=max_version)
log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
Expand All @@ -750,7 +724,7 @@ def _send_sync_group_request(self, request):
# itself requests a metadata update

future = Future()
_f = self._client.send(self.coordinator_id, request)
_f = self._manager.send(request, node_id=self.coordinator_id)
_f.add_callback(self._handle_sync_group_response, future, time.monotonic())
_f.add_errback(self._failed_request, self.coordinator_id,
request, future)
Expand Down Expand Up @@ -808,15 +782,14 @@ def _send_group_coordinator_request(self):
e = Errors.NodeNotReadyError(node_id)
return Future().failure(e)

version = self._client.api_version(FindCoordinatorRequest, max_version=2)
if version == 0:
request = FindCoordinatorRequest[version](self.group_id)
else:
request = FindCoordinatorRequest[version](self.group_id, 0)
max_version = 3
request = FindCoordinatorRequest(
key=self.group_id,
max_version=max_version)
log.debug("Sending group coordinator request for group %s to broker %s: %s",
self.group_id, node_id, request)
future = Future()
_f = self._client.send(node_id, request)
_f = self._manager.send(request, node_id=node_id)
_f.add_callback(self._handle_find_coordinator_response, future)
_f.add_errback(self._failed_request, node_id, request, future)
return future
Expand Down Expand Up @@ -985,7 +958,7 @@ async def _do_heartbeat(self):
heartbeat_log.debug('Sending heartbeat for group %s %s', self.group_id, self._generation)
self.heartbeat.sent_heartbeat()
try:
response = await self._send_heartbeat_request()
await self._send_heartbeat_request()
heartbeat_log.debug('Heartbeat success')
self.heartbeat.received_heartbeat()
except Errors.KafkaError as exc:
Expand Down Expand Up @@ -1074,80 +1047,64 @@ def _handle_leave_group_response(self, response):
log.error("LeaveGroup request for member %s / group instance %s failed with error: %s",
member.member_id, member.group_instance_id, error_type())

def _send_heartbeat_request(self):
async def _send_heartbeat_request(self):
"""Send a heartbeat request"""
if self.coordinator_unknown():
e = Errors.CoordinatorNotAvailableError(self.coordinator_id)
return Future().failure(e)

elif not self._client.ready(self.coordinator_id, metadata_priority=False):
e = Errors.NodeNotReadyError(self.coordinator_id)
return Future().failure(e)

version = self._client.api_version(HeartbeatRequest, max_version=3)
if version <=2:
request = HeartbeatRequest[version](
self.group_id,
self._generation.generation_id,
self._generation.member_id,
)
else:
request = HeartbeatRequest[version](
self.group_id,
self._generation.generation_id,
self._generation.member_id,
self.group_instance_id,
)
raise Errors.CoordinatorNotAvailableError(self.coordinator_id)

request = HeartbeatRequest(
group_id=self.group_id,
generation_id=self._generation.generation_id,
member_id=self._generation.member_id,
group_instance_id=self.group_instance_id,
)
heartbeat_log.debug("Sending HeartbeatRequest to %s: %s", self.coordinator_id, request)
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_heartbeat_response, future, time.monotonic())
_f.add_errback(self._failed_request, self.coordinator_id,
request, future)
return future
try:
send_time = time.monotonic()
response = await self._manager.send(request, node_id=self.coordinator_id)
return self._handle_heartbeat_response(response, send_time)
except Errors.KafkaError as exc:
self._failed_request(self.coordinator_id, request, None, exc)
raise

def _handle_heartbeat_response(self, future, send_time, response):
def _handle_heartbeat_response(self, response, send_time):
if self._sensors:
self._sensors.heartbeat_latency.record((time.monotonic() - send_time) * 1000)
heartbeat_log.debug("Received heartbeat response for group %s: %s",
self.group_id, response)
error_type = Errors.for_code(response.error_code)
error = error_type()
if error_type is Errors.NoError:
future.success(None)
return
elif error_type in (Errors.CoordinatorNotAvailableError,
Errors.NotCoordinatorError):
heartbeat_log.warning("Heartbeat failed for group %s: coordinator (node %s)"
" is either not started or not valid", self.group_id,
self.coordinator_id)
self.coordinator_dead(error_type())
future.failure(error_type())
self.coordinator_dead(error)
elif error_type is Errors.RebalanceInProgressError:
heartbeat_log.warning("Heartbeat failed for group %s because it is"
" rebalancing", self.group_id)
self.request_rejoin()
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
heartbeat_log.warning("Heartbeat failed for group %s: generation id is not "
" current.", self.group_id)
self.reset_generation()
future.failure(error_type())
elif error_type is Errors.FencedInstanceIdError:
heartbeat_log.error("Heartbeat failed for group %s due to fenced id error: %s",
self.group_id, self.group_instance_id)
future.failure(error_type((self.group_id, self.group_instance_id)))
error = error_type((self.group_id, self.group_instance_id))
elif error_type is Errors.UnknownMemberIdError:
heartbeat_log.warning("Heartbeat: local member_id was not recognized;"
" this consumer needs to re-join")
self.reset_generation()
future.failure(error_type)
elif error_type is Errors.GroupAuthorizationFailedError:
error = error_type(self.group_id)
heartbeat_log.error("Heartbeat failed: authorization error: %s", error)
future.failure(error)
else:
error = error_type()
heartbeat_log.error("Heartbeat failed: Unhandled error: %s", error)
future.failure(error)

raise error


class GroupCoordinatorMetrics:
Expand Down
15 changes: 11 additions & 4 deletions test/consumer/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,11 +694,18 @@ def test_heartbeat(mocker, coordinator):

coordinator._enable_heartbeat()
coordinator.state = MemberState.STABLE
# Replace _send_heartbeat_request with a stub returning a Future we control,
# so the heartbeat coroutine reaches the dispatch and blocks there. The
# Mock's call_count then verifies the loop fired exactly once.
# Replace _send_heartbeat_request with an async stub that suspends on a
# Future we control, so the heartbeat coroutine reaches the dispatch and
# blocks there. patch.object auto-detects the async def and creates an
# AsyncMock. With return_value=Future(), awaiting the mock would simply
# return the Future as its result without ever awaiting it, so the
# heartbeat loop would spin instead of blocking. Using side_effect with an
# async function that awaits the Future forces the suspension we want.
# The mock's call_count then verifies the loop fired exactly once.
blocked_send = Future()
mocker.patch.object(coordinator, '_send_heartbeat_request', return_value=blocked_send)
async def _hang(*args, **kwargs):
await blocked_send
mocker.patch.object(coordinator, '_send_heartbeat_request', side_effect=_hang)
mocker.patch.object(coordinator.heartbeat, 'should_heartbeat', return_value=True)
# Wakeup callback resolves the future on one poll cycle; the heartbeat
# coroutine resumes and reaches _send_heartbeat_request on the next.
Expand Down
Loading