85 changes: 43 additions & 42 deletions kafka/coordinator/base.py
@@ -282,51 +282,52 @@ def ensure_coordinator_ready(self, timeout_ms=None):

Returns: True if coordinator found before timeout_ms, else False
"""
timer = Timer(timeout_ms)
with self._client._lock, self._lock:
while self.coordinator_unknown():

# Prior to 0.8.2 there was no group coordinator
# so we will just pick a node at random and treat
# it as the "coordinator"
if self.config['api_version'] < (0, 8, 2):
maybe_coordinator_id = self._client.least_loaded_node()
if maybe_coordinator_id is None:
future = Future().failure(Errors.NodeNotReadyError('coordinator'))
else:
self.coordinator_id = maybe_coordinator_id
self._client.maybe_connect(self.coordinator_id)
if timer.expired:
return False
else:
continue
else:
future = self.lookup_coordinator()
with self._client._lock:
return self._manager.run(self.ensure_coordinator_ready_async, timeout_ms)

self._client.poll(future=future, timeout_ms=timer.timeout_ms)
async def ensure_coordinator_ready_async(self, timeout_ms=None):
"""Async variant of :meth:`ensure_coordinator_ready`.

if not future.is_done:
return False

if future.failed():
if future.retriable():
if getattr(future.exception, 'invalid_metadata', False):
log.debug('Requesting metadata for group coordinator request: %s', future.exception)
metadata_update = self._client.cluster.request_update()
self._client.poll(future=metadata_update, timeout_ms=timer.timeout_ms)
if not metadata_update.is_done:
return False
else:
if timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
time.sleep(self.config['retry_backoff_ms'] / 1000)
else:
time.sleep(timer.timeout_ms / 1000)
else:
raise future.exception # pylint: disable-msg=raising-bad-type
if timer.expired:
return False
Awaits until the coordinator for this group is known, or until the
timeout (if any) expires.
"""
timer = Timer(timeout_ms)
while self.coordinator_unknown():
# Prior to 0.8.2 there was no group coordinator
# so we will just pick a node at random and treat
# it as the "coordinator"
if self.config['api_version'] < (0, 8, 2):
maybe_coordinator_id = self._client.least_loaded_node()
if maybe_coordinator_id is None:
future = Future().failure(Errors.NodeNotReadyError('coordinator'))
else:
self.coordinator_id = maybe_coordinator_id
return not timer.expired
else:
return True
future = self.lookup_coordinator()

try:
await self._manager.wait_for(future, timer.timeout_ms)
except Errors.KafkaTimeoutError:
return False
except Errors.KafkaError as exc:
if not future.retriable():
raise
if exc.invalid_metadata:
log.debug('Requesting metadata for group coordinator request: %s', exc)
metadata_update = self._client.cluster.request_update()
try:
await self._manager.wait_for(metadata_update, timer.timeout_ms)
except Errors.KafkaTimeoutError:
return False
else:
delay_ms = self.config['retry_backoff_ms']
if timer.timeout_ms is not None:
    delay_ms = min(delay_ms, timer.timeout_ms)
await self._manager._net.sleep(delay_ms / 1000)
if timer.expired:
return False
return True

def _reset_find_coordinator_future(self, result):
self._find_coordinator_future = None
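Note: the rewritten methods above route every blocking call through a new _manager object -- _manager.run(...), _manager.wait_for(...), and _manager._net.sleep(...) -- whose implementation is not shown in this diff. Purely as a sketch of the contract those call sites imply (the class name, the _Net shim, and all signatures below are assumptions, not the PR's actual code), an asyncio-backed bridge could look like:

import asyncio

import kafka.errors as Errors


class _Net:
    """Minimal async time shim; only sleep() is exercised by the diff above."""

    async def sleep(self, seconds):
        await asyncio.sleep(seconds)


class AsyncManager:
    """Hypothetical sync/async bridge matching the three call sites above."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._net = _Net()

    def run(self, coro_fn, *args, **kwargs):
        # Drive an async coordinator method to completion from sync callers,
        # e.g. run(coordinator.ensure_coordinator_ready_async, timeout_ms).
        return self._loop.run_until_complete(coro_fn(*args, **kwargs))

    async def wait_for(self, future, timeout_ms):
        # Await a kafka.future.Future: raise KafkaTimeoutError on timeout,
        # re-raise the future's exception on failure, return value on success.
        deadline = (None if timeout_ms is None
                    else self._loop.time() + timeout_ms / 1000.0)
        while not future.is_done:
            if deadline is not None and self._loop.time() >= deadline:
                raise Errors.KafkaTimeoutError('timed out awaiting future')
            # Cooperative poll; a real bridge would pump network I/O here.
            await asyncio.sleep(0.005)
        if future.failed():
            raise future.exception
        return future.value

The timeout and error-propagation semantics are inferred from the call sites in ensure_coordinator_ready_async: wait_for must raise KafkaTimeoutError on expiry and surface the future's own exception so the caller can test future.retriable().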
77 changes: 43 additions & 34 deletions kafka/coordinator/consumer.py
@@ -408,9 +408,13 @@ def need_rejoin(self):

def refresh_committed_offsets_if_needed(self, timeout_ms=None):
"""Fetch committed offsets for assigned partitions."""
with self._client._lock:
return self._manager.run(self.refresh_committed_offsets_if_needed_async, timeout_ms)

async def refresh_committed_offsets_if_needed_async(self, timeout_ms=None):
missing_fetch_positions = set(self._subscription.missing_fetch_positions())
try:
offsets = self.fetch_committed_offsets(missing_fetch_positions, timeout_ms=timeout_ms)
offsets = await self.fetch_committed_offsets_async(missing_fetch_positions, timeout_ms=timeout_ms)
except Errors.KafkaTimeoutError:
return False
for partition, offset in offsets.items():
@@ -430,13 +434,20 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
Raises:
KafkaTimeoutError if timeout_ms provided
"""
if not partitions:
return {}
with self._client._lock:
return self._manager.run(self.fetch_committed_offsets_async, partitions, timeout_ms)

async def fetch_committed_offsets_async(self, partitions, timeout_ms=None):
"""Async variant of :meth:`fetch_committed_offsets`."""
if not partitions:
return {}

future_key = frozenset(partitions)
timer = Timer(timeout_ms)
while True:
if not self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms):
if not await self.ensure_coordinator_ready_async(timeout_ms=timer.timeout_ms):
timer.maybe_raise()

# contact coordinator to fetch committed offsets
@@ -446,7 +457,13 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
future = self._send_offset_fetch_request(partitions)
self._offset_fetch_futures[future_key] = future

self._client.poll(future=future, timeout_ms=timer.timeout_ms)
try:
await self._manager.wait_for(future, timer.timeout_ms)
except Errors.KafkaTimeoutError:
pass
except BaseException:
    # failure is examined below via future.is_done / future.retriable();
    # the _offset_fetch_futures cleanup below still runs either way
    pass

if future.is_done:
if future_key in self._offset_fetch_futures:
@@ -456,13 +473,14 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
return future.value

elif not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
raise future.exception # pylint: disable-msg=raising-bad-type

# future failed but is retriable, or is not done yet
if timer.timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
time.sleep(self.config['retry_backoff_ms'] / 1000)
else:
time.sleep(timer.timeout_ms / 1000)
delay_ms = self.config['retry_backoff_ms']
if timer.timeout_ms is not None:
delay_ms = min(delay_ms, timer.timeout_ms)
if delay_ms > 0:
await self._manager._net.sleep(delay_ms / 1000)
timer.maybe_raise()

def close(self, autocommit=True, timeout_ms=None):
@@ -516,13 +534,6 @@ def commit_offsets_async(self, offsets, callback=None):
future.add_callback(lambda r: functools.partial(self._do_commit_offsets_async, offsets, callback)())
if callback:
future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e)))

# ensure the commit has a chance to be transmitted (without blocking on
# its completion). Note that commits are treated as heartbeats by the
# coordinator, so there is no need to explicitly allow heartbeats
# through delayed task execution.
self._client.poll(timeout_ms=0) # no wakeup if we add that feature

return future

def _do_commit_offsets_async(self, offsets, callback=None):
@@ -560,26 +571,36 @@ def commit_offsets_sync(self, offsets, timeout_ms=None):
self._invoke_completed_offset_commit_callbacks()
if not offsets:
return
with self._client._lock:
return self._manager.run(self._commit_offsets_sync_async, offsets, timeout_ms)

async def _commit_offsets_sync_async(self, offsets, timeout_ms=None):
timer = Timer(timeout_ms)
while True:
self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms)
await self.ensure_coordinator_ready_async(timeout_ms=timer.timeout_ms)

future = self._send_offset_commit_request(offsets)
self._client.poll(future=future, timeout_ms=timer.timeout_ms)
try:
await self._manager.wait_for(future, timer.timeout_ms)
except Errors.KafkaTimeoutError:
pass
except BaseException:
    # failure is examined below via future.is_done / future.retriable()
    pass

if future.is_done:
if future.succeeded():
return future.value

elif not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
raise future.exception # pylint: disable-msg=raising-bad-type

# future failed but is retriable, or it is still pending
if timer.timeout_ms is None or timer.timeout_ms > self.config['retry_backoff_ms']:
time.sleep(self.config['retry_backoff_ms'] / 1000)
else:
time.sleep(timer.timeout_ms / 1000)
delay_ms = self.config['retry_backoff_ms']
if timer.timeout_ms is not None:
delay_ms = min(delay_ms, timer.timeout_ms)
if delay_ms > 0:
await self._manager._net.sleep(delay_ms / 1000)
timer.maybe_raise()

def _maybe_auto_commit_offsets_sync(self, timeout_ms=None):
@@ -626,12 +647,6 @@ def _send_offset_commit_request(self, offsets):
if node_id is None:
return Future().failure(Errors.CoordinatorNotAvailableError)

# Verify node is ready
if not self._client.ready(node_id, metadata_priority=False):
log.debug("Node %s not ready -- failing offset commit request",
node_id)
return Future().failure(Errors.NodeNotReadyError)

# create the offset commit request
offset_data = collections.defaultdict(dict)
for tp, offset in offsets.items():
@@ -858,12 +873,6 @@ def _send_offset_fetch_request(self, partitions):
if node_id is None:
return Future().failure(Errors.CoordinatorNotAvailableError)

# Verify node is ready
if not self._client.ready(node_id, metadata_priority=False):
log.debug("Node %s not ready -- failing offset fetch request",
node_id)
return Future().failure(Errors.NodeNotReadyError)

log.debug("Group %s fetching committed offsets for partitions: %s",
self.group_id, partitions)
# construct the request
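Note: fetch_committed_offsets_async and _commit_offsets_sync_async above share the same wait / inspect / back-off loop. As an illustration only -- the PR inlines this logic, and the free-standing helper below (name included) is an assumption, not part of the change -- the common shape is:

import kafka.errors as Errors


async def retry_coordinator_request(coordinator, send_request, timer):
    """Sketch of the retry loop shared by the two async methods above."""
    while True:
        await coordinator.ensure_coordinator_ready_async(
            timeout_ms=timer.timeout_ms)

        future = send_request()
        try:
            await coordinator._manager.wait_for(future, timer.timeout_ms)
        except Errors.KafkaError:
            # Inspected below via future.is_done / future.retriable();
            # KafkaTimeoutError is a KafkaError, so it is swallowed here too.
            pass

        if future.is_done:
            if future.succeeded():
                return future.value
            elif not future.retriable():
                raise future.exception

        # Retriable failure, or still pending: back off, bounded by the timer.
        delay_ms = coordinator.config['retry_backoff_ms']
        if timer.timeout_ms is not None:
            delay_ms = min(delay_ms, timer.timeout_ms)
        if delay_ms > 0:
            await coordinator._manager._net.sleep(delay_ms / 1000.0)
        timer.maybe_raise()

With a helper of this shape, fetch_committed_offsets_async would reduce to its _offset_fetch_futures bookkeeping around a single call.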
34 changes: 13 additions & 21 deletions test/consumer/test_coordinator.py
@@ -238,10 +238,12 @@ def test_need_rejoin(coordinator):
def test_refresh_committed_offsets_if_needed(mocker, coordinator):
tp0 = TopicPartition('foobar', 0)
tp1 = TopicPartition('foobar', 1)
mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets',
return_value = {
tp0: OffsetAndMetadata(123, '', -1),
tp1: OffsetAndMetadata(234, '', -1)})
async def _fake_fetch(self, partitions, timeout_ms=None):
return {
tp0: OffsetAndMetadata(123, '', -1),
tp1: OffsetAndMetadata(234, '', -1),
}
mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets_async', _fake_fetch)
coordinator._subscription.assign_from_user([tp0, tp1])
coordinator._subscription.request_offset_reset(tp0)
coordinator._subscription.request_offset_reset(tp1)
@@ -258,39 +260,35 @@ def test_refresh_committed_offsets_if_needed(mocker, coordinator):
def test_fetch_committed_offsets(mocker, coordinator):

# No partitions, no IO polling
mocker.patch.object(coordinator._client, 'poll')
assert coordinator.fetch_committed_offsets([]) == {}
assert coordinator._client.poll.call_count == 0

# general case -- send offset fetch request, get successful future
mocker.patch.object(coordinator, 'ensure_coordinator_ready')
async def _ready(*args, **kwargs):
return True
mocker.patch.object(coordinator, 'ensure_coordinator_ready_async', side_effect=_ready)
mocker.patch.object(coordinator, '_send_offset_fetch_request',
return_value=Future().success('foobar'))
partitions = [TopicPartition('foobar', 0)]
ret = coordinator.fetch_committed_offsets(partitions)
assert ret == 'foobar'
coordinator._send_offset_fetch_request.assert_called_with(partitions)
assert coordinator._client.poll.call_count == 1

# Failed future is raised if not retriable
coordinator._send_offset_fetch_request.return_value = Future().failure(AssertionError)
coordinator._client.poll.reset_mock()
try:
coordinator.fetch_committed_offsets(partitions)
except AssertionError:
pass
else:
assert False, 'Exception not raised when expected'
assert coordinator._client.poll.call_count == 1

coordinator._client.poll.reset_mock()
coordinator._send_offset_fetch_request.side_effect = [
Future().failure(Errors.RequestTimedOutError),
Future().success('fizzbuzz')]

ret = coordinator.fetch_committed_offsets(partitions)
assert ret == 'fizzbuzz'
assert coordinator._client.poll.call_count == 2 # call + retry
assert coordinator._send_offset_fetch_request.call_count == 4 # successful, failed, retried+success


def test_close(mocker, coordinator):
@@ -333,41 +331,35 @@ def test_commit_offsets_async(mocker, coordinator, offsets):


def test_commit_offsets_sync(mocker, coordinator, offsets):
mocker.patch.object(coordinator, 'ensure_coordinator_ready')
async def _ready(*args, **kwargs):
return True
mocker.patch.object(coordinator, 'ensure_coordinator_ready_async', side_effect=_ready)
mocker.patch.object(coordinator, '_send_offset_commit_request',
return_value=Future().success('fizzbuzz'))
cli = coordinator._client
mocker.patch.object(cli, 'poll')

# No offsets, no calls
assert coordinator.commit_offsets_sync({}) is None
assert coordinator._send_offset_commit_request.call_count == 0
assert cli.poll.call_count == 0

ret = coordinator.commit_offsets_sync(offsets)
assert coordinator._send_offset_commit_request.call_count == 1
assert cli.poll.call_count == 1
assert ret == 'fizzbuzz'

# Failed future is raised if not retriable
coordinator._send_offset_commit_request.return_value = Future().failure(AssertionError)
coordinator._client.poll.reset_mock()
try:
coordinator.commit_offsets_sync(offsets)
except AssertionError:
pass
else:
assert False, 'Exception not raised when expected'
assert coordinator._client.poll.call_count == 1

coordinator._client.poll.reset_mock()
coordinator._send_offset_commit_request.side_effect = [
Future().failure(Errors.RequestTimedOutError),
Future().success('fizzbuzz')]

ret = coordinator.commit_offsets_sync(offsets)
assert ret == 'fizzbuzz'
assert coordinator._client.poll.call_count == 2 # call + retry


@pytest.mark.parametrize(
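Note: the updated tests exercise the coroutines through the unchanged sync entry points (fetch_committed_offsets, commit_offsets_sync), which dispatch via _manager.run. If the suite adopted pytest-asyncio -- an assumption; nothing in this PR adds that dependency -- the async variants could also be awaited directly, along these lines (the coordinator fixture is the one already defined in this module):

import pytest

from kafka.future import Future
from kafka.structs import TopicPartition


@pytest.mark.asyncio
async def test_fetch_committed_offsets_async_direct(mocker, coordinator):
    # Stub out coordinator discovery with a coroutine, as the tests above do.
    async def _ready(*args, **kwargs):
        return True
    mocker.patch.object(coordinator, 'ensure_coordinator_ready_async',
                        side_effect=_ready)

    # An already-successful Future lets wait_for() return immediately.
    mocker.patch.object(coordinator, '_send_offset_fetch_request',
                        return_value=Future().success('foobar'))

    ret = await coordinator.fetch_committed_offsets_async(
        [TopicPartition('foobar', 0)])
    assert ret == 'foobar'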