diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 358538938..98690b0a0 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -557,36 +557,21 @@ def _send_join_group_request(self): # send a join group request to the coordinator log.info("(Re-)joining group %s", self.group_id) - version = self._client.api_version(JoinGroupRequest, max_version=5) - if version == 0: - request = JoinGroupRequest[version]( - self.group_id, - self.config['session_timeout_ms'], - self._generation.member_id, - self.protocol_type(), - self.group_protocols()) - elif version <= 4: - request = JoinGroupRequest[version]( - self.group_id, - self.config['session_timeout_ms'], - self.config['max_poll_interval_ms'], - self._generation.member_id, - self.protocol_type(), - self.group_protocols()) - else: - request = JoinGroupRequest[version]( - self.group_id, - self.config['session_timeout_ms'], - self.config['max_poll_interval_ms'], - self._generation.member_id, - self.group_instance_id, - self.protocol_type(), - self.group_protocols()) + max_version = 6 + request = JoinGroupRequest( + group_id=self.group_id, + session_timeout_ms=self.config['session_timeout_ms'], + rebalance_timeout_ms=self.config['max_poll_interval_ms'], + member_id=self._generation.member_id, + group_instance_id=self.group_instance_id, + protocol_type=self.protocol_type(), + protocols=self.group_protocols(), + max_version=max_version) # create the request for the coordinator log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) future = Future() - _f = self._client.send(self.coordinator_id, request) + _f = self._manager.send(request, node_id=self.coordinator_id) _f.add_callback(self._handle_join_group_response, future, time.monotonic()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) @@ -604,7 +589,8 @@ def _failed_request(self, node_id, request, future, error): else: log.debug('Error sending %s to node %s [%s]', 
request.__class__.__name__, node_id, error) - future.failure(error) + if future is not None: + future.failure(error) def _handle_join_group_response(self, future, send_time, response): log.debug("Received JoinGroup response: %s", response) @@ -684,20 +670,14 @@ def _handle_join_group_response(self, future, send_time, response): def _on_join_follower(self): # send follower's sync group with an empty assignment - version = self._client.api_version(SyncGroupRequest, max_version=3) - if version <= 2: - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - []) - else: - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - self.group_instance_id, - []) + max_version = 4 + request = SyncGroupRequest( + group_id=self.group_id, + generation_id=self._generation.generation_id, + member_id=self._generation.member_id, + group_instance_id=self.group_instance_id, + assignments=[], + max_version=max_version) log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) return self._send_sync_group_request(request) @@ -720,20 +700,14 @@ def _on_join_leader(self, response): except Exception as e: return Future().failure(e) - version = self._client.api_version(SyncGroupRequest, max_version=3) - if version <= 2: - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - group_assignment.items()) - else: - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - self.group_instance_id, - group_assignment.items()) + max_version = 4 + request = SyncGroupRequest( + group_id=self.group_id, + generation_id=self._generation.generation_id, + member_id=self._generation.member_id, + group_instance_id=self.group_instance_id, + assignments=group_assignment.items(), + 
max_version=max_version) log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) return self._send_sync_group_request(request) @@ -750,7 +724,7 @@ def _send_sync_group_request(self, request): # itself requests a metadata update future = Future() - _f = self._client.send(self.coordinator_id, request) + _f = self._manager.send(request, node_id=self.coordinator_id) _f.add_callback(self._handle_sync_group_response, future, time.monotonic()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) @@ -808,15 +782,14 @@ def _send_group_coordinator_request(self): e = Errors.NodeNotReadyError(node_id) return Future().failure(e) - version = self._client.api_version(FindCoordinatorRequest, max_version=2) - if version == 0: - request = FindCoordinatorRequest[version](self.group_id) - else: - request = FindCoordinatorRequest[version](self.group_id, 0) + max_version = 3 + request = FindCoordinatorRequest( + key=self.group_id, + max_version=max_version) log.debug("Sending group coordinator request for group %s to broker %s: %s", self.group_id, node_id, request) future = Future() - _f = self._client.send(node_id, request) + _f = self._manager.send(request, node_id=node_id) _f.add_callback(self._handle_find_coordinator_response, future) _f.add_errback(self._failed_request, node_id, request, future) return future @@ -985,7 +958,7 @@ async def _do_heartbeat(self): heartbeat_log.debug('Sending heartbeat for group %s %s', self.group_id, self._generation) self.heartbeat.sent_heartbeat() try: - response = await self._send_heartbeat_request() + await self._send_heartbeat_request() heartbeat_log.debug('Heartbeat success') self.heartbeat.received_heartbeat() except Errors.KafkaError as exc: @@ -1074,80 +1047,64 @@ def _handle_leave_group_response(self, response): log.error("LeaveGroup request for member %s / group instance %s failed with error: %s", member.member_id, member.group_instance_id, error_type()) - 
def _send_heartbeat_request(self): + async def _send_heartbeat_request(self): """Send a heartbeat request""" if self.coordinator_unknown(): - e = Errors.CoordinatorNotAvailableError(self.coordinator_id) - return Future().failure(e) - - elif not self._client.ready(self.coordinator_id, metadata_priority=False): - e = Errors.NodeNotReadyError(self.coordinator_id) - return Future().failure(e) - - version = self._client.api_version(HeartbeatRequest, max_version=3) - if version <=2: - request = HeartbeatRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - ) - else: - request = HeartbeatRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - self.group_instance_id, - ) + raise Errors.CoordinatorNotAvailableError(self.coordinator_id) + + request = HeartbeatRequest( + group_id=self.group_id, + generation_id=self._generation.generation_id, + member_id=self._generation.member_id, + group_instance_id=self.group_instance_id, + ) heartbeat_log.debug("Sending HeartbeatRequest to %s: %s", self.coordinator_id, request) - future = Future() - _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_heartbeat_response, future, time.monotonic()) - _f.add_errback(self._failed_request, self.coordinator_id, - request, future) - return future + try: + send_time = time.monotonic() + response = await self._manager.send(request, node_id=self.coordinator_id) + return self._handle_heartbeat_response(response, send_time) + except Errors.KafkaError as exc: + self._failed_request(self.coordinator_id, request, None, exc) + raise - def _handle_heartbeat_response(self, future, send_time, response): + def _handle_heartbeat_response(self, response, send_time): if self._sensors: self._sensors.heartbeat_latency.record((time.monotonic() - send_time) * 1000) heartbeat_log.debug("Received heartbeat response for group %s: %s", self.group_id, response) error_type = 
Errors.for_code(response.error_code) + error = error_type() if error_type is Errors.NoError: - future.success(None) + return elif error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError): heartbeat_log.warning("Heartbeat failed for group %s: coordinator (node %s)" " is either not started or not valid", self.group_id, self.coordinator_id) - self.coordinator_dead(error_type()) - future.failure(error_type()) + self.coordinator_dead(error) elif error_type is Errors.RebalanceInProgressError: heartbeat_log.warning("Heartbeat failed for group %s because it is" " rebalancing", self.group_id) self.request_rejoin() - future.failure(error_type()) elif error_type is Errors.IllegalGenerationError: heartbeat_log.warning("Heartbeat failed for group %s: generation id is not " " current.", self.group_id) self.reset_generation() - future.failure(error_type()) elif error_type is Errors.FencedInstanceIdError: heartbeat_log.error("Heartbeat failed for group %s due to fenced id error: %s", self.group_id, self.group_instance_id) - future.failure(error_type((self.group_id, self.group_instance_id))) + error = error_type((self.group_id, self.group_instance_id)) elif error_type is Errors.UnknownMemberIdError: heartbeat_log.warning("Heartbeat: local member_id was not recognized;" " this consumer needs to re-join") self.reset_generation() - future.failure(error_type) elif error_type is Errors.GroupAuthorizationFailedError: error = error_type(self.group_id) heartbeat_log.error("Heartbeat failed: authorization error: %s", error) - future.failure(error) else: - error = error_type() heartbeat_log.error("Heartbeat failed: Unhandled error: %s", error) - future.failure(error) + + raise error class GroupCoordinatorMetrics: diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 22051a743..62f2fe390 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -694,11 +694,18 @@ def test_heartbeat(mocker, coordinator): 
coordinator._enable_heartbeat() coordinator.state = MemberState.STABLE - # Replace _send_heartbeat_request with a stub returning a Future we control, - # so the heartbeat coroutine reaches the dispatch and blocks there. The - # Mock's call_count then verifies the loop fired exactly once. + # Replace _send_heartbeat_request with an async stub that suspends on a + # Future we control, so the heartbeat coroutine reaches the dispatch and + # blocks there. patch.object auto-detects the async def and would return + # an AsyncMock whose return_value=Future() is never awaited (the Future + # is just returned as the coroutine's result, which would let the loop + # spin); using side_effect with an async function that awaits the Future + # forces the suspension we want. The Mock's call_count then verifies the + # loop fired exactly once. blocked_send = Future() - mocker.patch.object(coordinator, '_send_heartbeat_request', return_value=blocked_send) + async def _hang(*args, **kwargs): + await blocked_send + mocker.patch.object(coordinator, '_send_heartbeat_request', side_effect=_hang) mocker.patch.object(coordinator.heartbeat, 'should_heartbeat', return_value=True) # Wakeup callback resolves the future on one poll cycle; the heartbeat # coroutine resumes and reaches _send_heartbeat_request on the next.