diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index d4cfeb03a..358538938 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -282,51 +282,52 @@ def ensure_coordinator_ready(self, timeout_ms=None):
 
         Returns: True if coordinator found before timeout_ms, else False
         """
-        timer = Timer(timeout_ms)
-        with self._client._lock, self._lock:
-            while self.coordinator_unknown():
-
-                # Prior to 0.8.2 there was no group coordinator
-                # so we will just pick a node at random and treat
-                # it as the "coordinator"
-                if self.config['api_version'] < (0, 8, 2):
-                    maybe_coordinator_id = self._client.least_loaded_node()
-                    if maybe_coordinator_id is None:
-                        future = Future().failure(Errors.NodeNotReadyError('coordinator'))
-                    else:
-                        self.coordinator_id = maybe_coordinator_id
-                        self._client.maybe_connect(self.coordinator_id)
-                        if timer.expired:
-                            return False
-                        else:
-                            continue
-                else:
-                    future = self.lookup_coordinator()
+        with self._client._lock:
+            return self._manager.run(self.ensure_coordinator_ready_async, timeout_ms)
 
-                self._client.poll(future=future, timeout_ms=timer.timeout_ms)
+    async def ensure_coordinator_ready_async(self, timeout_ms=None):
+        """Async variant of :meth:`ensure_coordinator_ready`.
 
-                if not future.is_done:
-                    return False
-
-                if future.failed():
-                    if future.retriable():
-                        if getattr(future.exception, 'invalid_metadata', False):
-                            log.debug('Requesting metadata for group coordinator request: %s', future.exception)
-                            metadata_update = self._client.cluster.request_update()
-                            self._client.poll(future=metadata_update, timeout_ms=timer.timeout_ms)
-                            if not metadata_update.is_done:
-                                return False
-                        else:
-                            if timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
-                                time.sleep(self.config['retry_backoff_ms'] / 1000)
-                            else:
-                                time.sleep(timer.timeout_ms / 1000)
-                    else:
-                        raise future.exception # pylint: disable-msg=raising-bad-type
-                if timer.expired:
-                    return False
+        Awaits until the coordinator for this group is known, or until the
+        timeout (if any) expires.
+        """
+        timer = Timer(timeout_ms)
+        while self.coordinator_unknown():
+            # Prior to 0.8.2 there was no group coordinator
+            # so we will just pick a node at random and treat
+            # it as the "coordinator"
+            if self.config['api_version'] < (0, 8, 2):
+                maybe_coordinator_id = self._client.least_loaded_node()
+                if maybe_coordinator_id is None:
+                    future = Future().failure(Errors.NodeNotReadyError('coordinator'))
+                else:
+                    self.coordinator_id = maybe_coordinator_id
+                    return not timer.expired
             else:
-                return True
+                future = self.lookup_coordinator()
+
+            try:
+                await self._manager.wait_for(future, timer.timeout_ms)
+            except Errors.KafkaTimeoutError:
+                return False
+            except Errors.KafkaError as exc:
+                if not future.retriable():
+                    raise
+                if exc.invalid_metadata:
+                    log.debug('Requesting metadata for group coordinator request: %s', exc)
+                    metadata_update = self._client.cluster.request_update()
+                    try:
+                        await self._manager.wait_for(metadata_update, timer.timeout_ms)
+                    except Errors.KafkaTimeoutError:
+                        return False
+                else:
+                    delay_ms = self.config['retry_backoff_ms']
+                    if timer.timeout_ms is not None:
+                        delay_ms = min(delay_ms, timer.timeout_ms)
+                    await self._manager._net.sleep(delay_ms / 1000)
+            if timer.expired:
+                return False
+        return True
 
     def _reset_find_coordinator_future(self, result):
         self._find_coordinator_future = None
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index a7a1a6fbb..7eb828c2f 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -408,9 +408,13 @@ def need_rejoin(self):
 
     def refresh_committed_offsets_if_needed(self, timeout_ms=None):
         """Fetch committed offsets for assigned partitions."""
+        with self._client._lock:
+            return self._manager.run(self.refresh_committed_offsets_if_needed_async, timeout_ms)
+
+    async def refresh_committed_offsets_if_needed_async(self, timeout_ms=None):
         missing_fetch_positions = set(self._subscription.missing_fetch_positions())
         try:
-            offsets = self.fetch_committed_offsets(missing_fetch_positions, timeout_ms=timeout_ms)
+            offsets = await self.fetch_committed_offsets_async(missing_fetch_positions, timeout_ms=timeout_ms)
         except Errors.KafkaTimeoutError:
             return False
         for partition, offset in offsets.items():
@@ -430,13 +434,20 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
 
         Raises:
             KafkaTimeoutError if timeout_ms provided
         """
+        if not partitions:
+            return {}
+        with self._client._lock:
+            return self._manager.run(self.fetch_committed_offsets_async, partitions, timeout_ms)
+
+    async def fetch_committed_offsets_async(self, partitions, timeout_ms=None):
+        """Async variant of :meth:`fetch_committed_offsets`."""
         if not partitions:
             return {}
 
         future_key = frozenset(partitions)
         timer = Timer(timeout_ms)
         while True:
-            if not self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms):
+            if not await self.ensure_coordinator_ready_async(timeout_ms=timer.timeout_ms):
                 timer.maybe_raise()
 
             # contact coordinator to fetch committed offsets
@@ -446,7 +457,13 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
                 future = self._send_offset_fetch_request(partitions)
                 self._offset_fetch_futures[future_key] = future
 
-            self._client.poll(future=future, timeout_ms=timer.timeout_ms)
+            try:
+                await self._manager.wait_for(future, timer.timeout_ms)
+            except Errors.KafkaTimeoutError:
+                pass
+            except BaseException:
+                # handled below via future.is_done / retriable; cleanup happens too
+                pass
 
             if future.is_done:
                 if future_key in self._offset_fetch_futures:
@@ -456,13 +473,14 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
                     return future.value
                 elif not future.retriable():
-                    raise future.exception # pylint: disable-msg=raising-bad-type
+                    raise future.exception  # pylint: disable-msg=raising-bad-type
 
             # future failed but is retriable, or is not done yet
-            if timer.timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
-                time.sleep(self.config['retry_backoff_ms'] / 1000)
-            else:
-                time.sleep(timer.timeout_ms / 1000)
+            delay_ms = self.config['retry_backoff_ms']
+            if timer.timeout_ms is not None:
+                delay_ms = min(delay_ms, timer.timeout_ms)
+            if delay_ms > 0:
+                await self._manager._net.sleep(delay_ms / 1000)
             timer.maybe_raise()
 
     def close(self, autocommit=True, timeout_ms=None):
@@ -516,13 +534,6 @@ def commit_offsets_async(self, offsets, callback=None):
             future.add_callback(lambda r: functools.partial(self._do_commit_offsets_async, offsets, callback)())
             if callback:
                 future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e)))
-
-        # ensure the commit has a chance to be transmitted (without blocking on
-        # its completion). Note that commits are treated as heartbeats by the
-        # coordinator, so there is no need to explicitly allow heartbeats
-        # through delayed task execution.
-        self._client.poll(timeout_ms=0) # no wakeup if we add that feature
-
         return future
 
     def _do_commit_offsets_async(self, offsets, callback=None):
@@ -560,26 +571,36 @@ def commit_offsets_sync(self, offsets, timeout_ms=None):
         self._invoke_completed_offset_commit_callbacks()
         if not offsets:
             return
+        with self._client._lock:
+            return self._manager.run(self._commit_offsets_sync_async, offsets, timeout_ms)
 
+    async def _commit_offsets_sync_async(self, offsets, timeout_ms=None):
         timer = Timer(timeout_ms)
         while True:
-            self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms)
+            await self.ensure_coordinator_ready_async(timeout_ms=timer.timeout_ms)
 
             future = self._send_offset_commit_request(offsets)
-            self._client.poll(future=future, timeout_ms=timer.timeout_ms)
+            try:
+                await self._manager.wait_for(future, timer.timeout_ms)
+            except Errors.KafkaTimeoutError:
+                pass
+            except BaseException:
+                # handled below via future.is_done / retriable
+                pass
 
             if future.is_done:
                 if future.succeeded():
                     return future.value
                 elif not future.retriable():
-                    raise future.exception # pylint: disable-msg=raising-bad-type
+                    raise future.exception  # pylint: disable-msg=raising-bad-type
 
             # future failed but is retriable, or it is still pending
-            if timer.timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
-                time.sleep(self.config['retry_backoff_ms'] / 1000)
-            else:
-                time.sleep(timer.timeout_ms / 1000)
+            delay_ms = self.config['retry_backoff_ms']
+            if timer.timeout_ms is not None:
+                delay_ms = min(delay_ms, timer.timeout_ms)
+            if delay_ms > 0:
+                await self._manager._net.sleep(delay_ms / 1000)
             timer.maybe_raise()
 
     def _maybe_auto_commit_offsets_sync(self, timeout_ms=None):
@@ -626,12 +647,6 @@ def _send_offset_commit_request(self, offsets):
         if node_id is None:
             return Future().failure(Errors.CoordinatorNotAvailableError)
 
-        # Verify node is ready
-        if not self._client.ready(node_id, metadata_priority=False):
-            log.debug("Node %s not ready -- failing offset commit request",
-                      node_id)
-            return Future().failure(Errors.NodeNotReadyError)
-
         # create the offset commit request
         offset_data = collections.defaultdict(dict)
         for tp, offset in offsets.items():
@@ -858,12 +873,6 @@ def _send_offset_fetch_request(self, partitions):
         if node_id is None:
             return Future().failure(Errors.CoordinatorNotAvailableError)
 
-        # Verify node is ready
-        if not self._client.ready(node_id, metadata_priority=False):
-            log.debug("Node %s not ready -- failing offset fetch request",
-                      node_id)
-            return Future().failure(Errors.NodeNotReadyError)
-
         log.debug("Group %s fetching committed offsets for partitions: %s",
                   self.group_id, partitions)
         # construct the request
diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py
index b198e7b76..9f9cc50ca 100644
--- a/test/consumer/test_coordinator.py
+++ b/test/consumer/test_coordinator.py
@@ -238,10 +238,12 @@ def test_need_rejoin(coordinator):
 def test_refresh_committed_offsets_if_needed(mocker, coordinator):
     tp0 = TopicPartition('foobar', 0)
     tp1 = TopicPartition('foobar', 1)
-    mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets',
-                        return_value = {
-                            tp0: OffsetAndMetadata(123, '', -1),
-                            tp1: OffsetAndMetadata(234, '', -1)})
+    async def _fake_fetch(self, partitions, timeout_ms=None):
+        return {
+            tp0: OffsetAndMetadata(123, '', -1),
+            tp1: OffsetAndMetadata(234, '', -1),
+        }
+    mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets_async', _fake_fetch)
     coordinator._subscription.assign_from_user([tp0, tp1])
     coordinator._subscription.request_offset_reset(tp0)
     coordinator._subscription.request_offset_reset(tp1)
@@ -258,39 +260,35 @@ def test_refresh_committed_offsets_if_needed(mocker, coordinator):
 
 def test_fetch_committed_offsets(mocker, coordinator):
     # No partitions, no IO polling
-    mocker.patch.object(coordinator._client, 'poll')
     assert coordinator.fetch_committed_offsets([]) == {}
-    assert coordinator._client.poll.call_count == 0
 
     # general case -- send offset fetch request, get successful future
-    mocker.patch.object(coordinator, 'ensure_coordinator_ready')
+    async def _ready(*args, **kwargs):
+        return True
+    mocker.patch.object(coordinator, 'ensure_coordinator_ready_async', side_effect=_ready)
     mocker.patch.object(coordinator, '_send_offset_fetch_request',
                         return_value=Future().success('foobar'))
     partitions = [TopicPartition('foobar', 0)]
 
     ret = coordinator.fetch_committed_offsets(partitions)
     assert ret == 'foobar'
     coordinator._send_offset_fetch_request.assert_called_with(partitions)
-    assert coordinator._client.poll.call_count == 1
 
     # Failed future is raised if not retriable
    coordinator._send_offset_fetch_request.return_value = Future().failure(AssertionError)
-    coordinator._client.poll.reset_mock()
     try:
         coordinator.fetch_committed_offsets(partitions)
     except AssertionError:
         pass
     else:
         assert False, 'Exception not raised when expected'
-    assert coordinator._client.poll.call_count == 1
 
-    coordinator._client.poll.reset_mock()
     coordinator._send_offset_fetch_request.side_effect = [
         Future().failure(Errors.RequestTimedOutError),
         Future().success('fizzbuzz')]
 
     ret = coordinator.fetch_committed_offsets(partitions)
     assert ret == 'fizzbuzz'
-    assert coordinator._client.poll.call_count == 2  # call + retry
+    assert coordinator._send_offset_fetch_request.call_count == 4  # success, non-retriable failure, retriable failure + retry
 
 
 def test_close(mocker, coordinator):
@@ -333,41 +331,35 @@ def test_commit_offsets_async(mocker, coordinator, offsets):
 
 def test_commit_offsets_sync(mocker, coordinator, offsets):
-    mocker.patch.object(coordinator, 'ensure_coordinator_ready')
+    async def _ready(*args, **kwargs):
+        return True
+    mocker.patch.object(coordinator, 'ensure_coordinator_ready_async', side_effect=_ready)
     mocker.patch.object(coordinator, '_send_offset_commit_request',
                         return_value=Future().success('fizzbuzz'))
-    cli = coordinator._client
-    mocker.patch.object(cli, 'poll')
 
     # No offsets, no calls
     assert coordinator.commit_offsets_sync({}) is None
     assert coordinator._send_offset_commit_request.call_count == 0
-    assert cli.poll.call_count == 0
 
     ret = coordinator.commit_offsets_sync(offsets)
     assert coordinator._send_offset_commit_request.call_count == 1
-    assert cli.poll.call_count == 1
     assert ret == 'fizzbuzz'
 
     # Failed future is raised if not retriable
     coordinator._send_offset_commit_request.return_value = Future().failure(AssertionError)
-    coordinator._client.poll.reset_mock()
     try:
         coordinator.commit_offsets_sync(offsets)
     except AssertionError:
         pass
     else:
         assert False, 'Exception not raised when expected'
-    assert coordinator._client.poll.call_count == 1
 
-    coordinator._client.poll.reset_mock()
     coordinator._send_offset_commit_request.side_effect = [
         Future().failure(Errors.RequestTimedOutError),
         Future().success('fizzbuzz')]
 
     ret = coordinator.commit_offsets_sync(offsets)
     assert ret == 'fizzbuzz'
-    assert coordinator._client.poll.call_count == 2  # call + retry
 
 
 @pytest.mark.parametrize(
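
Note on the pattern above: each blocking entry point now takes the client lock and hands an `_async` coroutine to `self._manager.run(...)`, and the coroutines suspend on `self._manager.wait_for(future, timeout_ms)` (which is expected to raise `KafkaTimeoutError` on deadline and the future's exception on failure) and back off via `await self._manager._net.sleep(...)`. The manager itself is not part of this diff, so the sketch below is only a hypothetical illustration of that contract, assuming an `asyncio` loop and a poll-driven wait; the `CoordinationManager` and `_Net` names are invented here, not the project's implementation:

    import asyncio

    from kafka.errors import KafkaTimeoutError
    from kafka.util import Timer


    class _Net:
        """Placeholder for whatever transport helper backs `_manager._net`."""
        async def sleep(self, secs):
            await asyncio.sleep(secs)


    class CoordinationManager:  # hypothetical name -- not in this diff
        """Drives coordinator coroutines against a synchronous KafkaClient."""

        def __init__(self, client):
            self._client = client
            self._net = _Net()

        def run(self, coro_fn, *args):
            # Bridge: drive an async coordinator method to completion
            # synchronously, as the sync wrappers above appear to expect.
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(coro_fn(*args))
            finally:
                loop.close()

        async def wait_for(self, future, timeout_ms=None):
            # Suspend until `future` resolves, polling the client so requests
            # can make progress; raise on timeout or failure, mirroring how
            # the call sites catch KafkaTimeoutError / KafkaError.
            timer = Timer(timeout_ms)
            while not future.is_done:
                if timer.expired:
                    raise KafkaTimeoutError('Timed out waiting for future')
                self._client.poll(future=future, timeout_ms=timer.timeout_ms)
                await asyncio.sleep(0)  # yield between poll iterations
            if future.failed():
                raise future.exception
            return future.value

Under that reading, `run` serializes all coordinator I/O behind `_client._lock`, which would explain why the explicit `self._client.poll(...)` calls and the node-readiness prechecks could be dropped from the call sites.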
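The retry loops also change the backoff rule. The old code chose between sleeping the full `retry_backoff_ms` or the entire remaining timeout; the new code clamps the backoff to the time remaining on the timer, and the consumer paths additionally skip a zero-length sleep once the deadline is reached. The rule in isolation, as a small self-contained check:

    def next_backoff_ms(retry_backoff_ms, remaining_timeout_ms):
        """Clamp the retry backoff to the time left before the deadline."""
        delay_ms = retry_backoff_ms
        if remaining_timeout_ms is not None:
            delay_ms = min(delay_ms, remaining_timeout_ms)
        return delay_ms


    assert next_backoff_ms(100, None) == 100  # no deadline: full backoff
    assert next_backoff_ms(100, 40) == 40     # deadline close: clamp to remainder
    assert next_backoff_ms(100, 0) == 0       # deadline reached: no sleep at all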
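On the test side, the patch swaps synchronous mocks for coroutines in two ways: passing a coroutine function as the replacement attribute (`_fake_fetch`, which is bound like a normal method and so receives `self`), or passing it as `side_effect` so the `MagicMock` returns a fresh coroutine for the caller to await (`_ready`). A standalone sketch of the `side_effect` form, using only the standard library and hypothetical names:

    import asyncio
    from unittest import mock


    class FakeCoordinator:  # hypothetical stand-in for the real class
        async def ensure_ready_async(self, timeout_ms=None):
            raise NotImplementedError('would do real I/O')

        async def do_work(self):
            # Awaits whatever ensure_ready_async returns -- here, the
            # coroutine produced by the side_effect function below.
            return await self.ensure_ready_async(timeout_ms=500)


    async def _ready(*args, **kwargs):
        return True


    coordinator = FakeCoordinator()
    with mock.patch.object(FakeCoordinator, 'ensure_ready_async', side_effect=_ready):
        # The MagicMock calls _ready(...), returning a coroutine; awaiting it
        # inside do_work() yields True without touching the network.
        assert asyncio.run(coordinator.do_work()) is True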