Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def __init__(self, client, subscriptions, **configs):

self._client = client
self._manager = client._manager
self._net = self._manager._net
self._subscriptions = subscriptions
self._completed_fetches = collections.deque() # Unparsed responses
self._next_partition_records = None # Holds a single PartitionRecords until fully consumed
Expand Down Expand Up @@ -200,7 +201,7 @@ def _wake(_):
# retires HeartbeatThread.
try:
with self._client._lock:
self._manager.run(self._manager.wait_for, wakeup, timeout_ms)
self._net.run(self._manager.wait_for, wakeup, timeout_ms)
except Errors.KafkaTimeoutError:
pass

Expand Down Expand Up @@ -301,7 +302,7 @@ def offsets_by_times(self, timestamps, timeout_ms=None):
KafkaTimeoutError if timeout_ms provided
"""
with self._client._lock:
offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
offsets = self._net.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
for tp in timestamps:
if tp not in offsets:
offsets[tp] = None
Expand Down Expand Up @@ -427,7 +428,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms=None):
"""
timestamps = dict([(tp, timestamp) for tp in partitions])
with self._client._lock:
offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
offsets = self._net.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
for tp in timestamps:
offsets[tp] = offsets[tp].offset
return offsets
Expand Down
7 changes: 4 additions & 3 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ def __init__(self, *topics, **configs):
self._client = self.config['kafka_client'](metrics=self._metrics, **self.config)
self._manager = self._client._manager
self._cluster = self._manager.cluster
self._net = self._manager._net

# If api_version was not passed explicitly, bootstrap to auto-discover
# it. bootstrap is passed as a deferred coroutine so that once the IO
Expand Down Expand Up @@ -620,11 +621,11 @@ def _fetch_all_topic_metadata(self):
"""
if self._cluster.metadata_refresh_in_progress:
future = self._cluster.request_update()
self._manager.run(self._manager.wait_for, future, None)
self._net.run(self._manager.wait_for, future, None)
stash = self._cluster.need_all_topic_metadata
self._cluster.need_all_topic_metadata = True
future = self._cluster.request_update()
self._manager.run(self._manager.wait_for, future, None)
self._net.run(self._manager.wait_for, future, None)
self._cluster.need_all_topic_metadata = stash

def topics(self):
Expand Down Expand Up @@ -760,7 +761,7 @@ def position(self, partition, timeout_ms=None):
reset_task = self._fetcher.reset_offsets_if_needed(timeout_ms=timer.timeout_ms)
if reset_task is not None and not timer.expired:
try:
self._manager.run(self._manager.wait_for, reset_task, timer.timeout_ms)
self._net.run(self._manager.wait_for, reset_task, timer.timeout_ms)
except Errors.KafkaTimeoutError:
pass
position = self._subscription.assignment[partition].position
Expand Down
7 changes: 4 additions & 3 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def __init__(self, client, **configs):
self._client = client
self._manager = client._manager
self._cluster = self._manager.cluster
self._net = self._manager._net
self.heartbeat = Heartbeat(**self.config)
self._heartbeat_wakeup = WakeupNotifier(self._manager._net)
self._heartbeat_loop_future = None
Expand Down Expand Up @@ -292,7 +293,7 @@ def ensure_coordinator_ready(self, timeout_ms=None):
Returns: True if coordinator found before timeout_ms, else False
"""
with self._client._lock:
return self._manager.run(self.ensure_coordinator_ready_async, timeout_ms)
return self._net.run(self.ensure_coordinator_ready_async, timeout_ms)

async def ensure_coordinator_ready_async(self, timeout_ms=None):
"""Async variant of :meth:`ensure_coordinator_ready`.
Expand Down Expand Up @@ -413,7 +414,7 @@ def ensure_active_group(self, timeout_ms=None):
Returns: True if group initialized before timeout_ms, else False
"""
with self._client._lock:
return self._manager.run(self.ensure_active_group_async, timeout_ms)
return self._net.run(self.ensure_active_group_async, timeout_ms)

async def ensure_active_group_async(self, timeout_ms=None):
"""Async variant of :meth:`ensure_active_group`."""
Expand Down Expand Up @@ -949,7 +950,7 @@ def is_dynamic_member(self):
def maybe_leave_group(self, reason=None, timeout_ms=None):
"""Leave the current group and reset local generation/member_id."""
with self._client._lock:
return self._manager.run(self.maybe_leave_group_async, reason, timeout_ms)
return self._net.run(self.maybe_leave_group_async, reason, timeout_ms)

async def maybe_leave_group_async(self, reason=None, timeout_ms=None):
if not self._use_group_apis:
Expand Down
12 changes: 6 additions & 6 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def poll(self, timeout_ms=None):
if self._subscription.subscribed_pattern:
metadata_update = self._cluster.request_update()
try:
self._manager.run(
self._net.run(
self._manager.wait_for, metadata_update, timer.timeout_ms)
except Errors.KafkaTimeoutError:
log.debug('coordinator.poll: timeout updating metadata; returning early')
Expand Down Expand Up @@ -460,7 +460,7 @@ def need_rejoin(self):
def refresh_committed_offsets_if_needed(self, timeout_ms=None):
"""Fetch committed offsets for assigned partitions."""
with self._client._lock:
return self._manager.run(self.refresh_committed_offsets_if_needed_async, timeout_ms)
return self._net.run(self.refresh_committed_offsets_if_needed_async, timeout_ms)

async def refresh_committed_offsets_if_needed_async(self, timeout_ms=None):
missing_fetch_positions = set(self._subscription.missing_fetch_positions())
Expand Down Expand Up @@ -488,7 +488,7 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
if not partitions:
return {}
with self._client._lock:
return self._manager.run(self.fetch_committed_offsets_async, partitions, timeout_ms)
return self._net.run(self.fetch_committed_offsets_async, partitions, timeout_ms)

async def fetch_committed_offsets_async(self, partitions, timeout_ms=None):
"""Async variant of :meth:`fetch_committed_offsets`."""
Expand Down Expand Up @@ -531,7 +531,7 @@ async def fetch_committed_offsets_async(self, partitions, timeout_ms=None):
if timer.timeout_ms is not None:
delay_ms = min(delay_ms, timer.timeout_ms)
if delay_ms > 0:
await self._manager._net.sleep(delay_ms / 1000)
await self._net.sleep(delay_ms / 1000)
timer.maybe_raise()

def close(self, autocommit=True, timeout_ms=None):
Expand Down Expand Up @@ -621,7 +621,7 @@ def commit_offsets_sync(self, offsets, timeout_ms=None):
offsets.values()))
self._invoke_completed_offset_commit_callbacks()
with self._client._lock:
return self._manager.run(self._commit_offsets_sync_async, offsets, timeout_ms)
return self._net.run(self._commit_offsets_sync_async, offsets, timeout_ms)

async def _commit_offsets_sync_async(self, offsets, timeout_ms=None):
if not offsets:
Expand Down Expand Up @@ -651,7 +651,7 @@ async def _commit_offsets_sync_async(self, offsets, timeout_ms=None):
if timer.timeout_ms is not None:
delay_ms = min(delay_ms, timer.timeout_ms)
if delay_ms > 0:
await self._manager._net.sleep(delay_ms / 1000)
await self._net.sleep(delay_ms / 1000)
timer.maybe_raise()

def _maybe_auto_commit_offsets_sync(self, timeout_ms=None):
Expand Down
2 changes: 1 addition & 1 deletion kafka/net/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def check_version(self, node_id=None, timeout_ms=10000):
async def _check_version(broker_id, timeout_ms):
conn = await self._manager.get_connection(broker_id, timeout_ms=timeout_ms)
return conn.broker_version
return self._manager.run(_check_version, node_id, timeout_ms)
return self._net.run(_check_version, node_id, timeout_ms)

# Request sending

Expand Down
2 changes: 1 addition & 1 deletion kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def bootstrap_async(self, timeout_ms=None):
return self._bootstrap_future

def bootstrap(self, timeout_ms=None):
self.run(self.bootstrap_async, timeout_ms)
self._net.run(self.bootstrap_async, timeout_ms)

@property
def bootstrapped(self):
Expand Down
Loading