diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e316ad577..c3691b5ca 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -117,6 +117,7 @@ def __init__(self, client, subscriptions, **configs): self._client = client self._manager = client._manager + self._net = self._manager._net self._subscriptions = subscriptions self._completed_fetches = collections.deque() # Unparsed responses self._next_partition_records = None # Holds a single PartitionRecords until fully consumed @@ -200,7 +201,7 @@ def _wake(_): # retires HeartbeatThread. try: with self._client._lock: - self._manager.run(self._manager.wait_for, wakeup, timeout_ms) + self._net.run(self._manager.wait_for, wakeup, timeout_ms) except Errors.KafkaTimeoutError: pass @@ -301,7 +302,7 @@ def offsets_by_times(self, timestamps, timeout_ms=None): KafkaTimeoutError if timeout_ms provided """ with self._client._lock: - offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) + offsets = self._net.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) for tp in timestamps: if tp not in offsets: offsets[tp] = None @@ -427,7 +428,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms=None): """ timestamps = dict([(tp, timestamp) for tp in partitions]) with self._client._lock: - offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) + offsets = self._net.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms) for tp in timestamps: offsets[tp] = offsets[tp].offset return offsets diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 4572420f6..c9e5b9f4b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -384,6 +384,7 @@ def __init__(self, *topics, **configs): self._client = self.config['kafka_client'](metrics=self._metrics, **self.config) self._manager = self._client._manager self._cluster = self._manager.cluster + self._net = self._manager._net # If api_version was not passed explicitly, bootstrap to auto-discover # it. bootstrap is passed as a deferred coroutine so that once the IO @@ -620,11 +621,11 @@ def _fetch_all_topic_metadata(self): """ if self._cluster.metadata_refresh_in_progress: future = self._cluster.request_update() - self._manager.run(self._manager.wait_for, future, None) + self._net.run(self._manager.wait_for, future, None) stash = self._cluster.need_all_topic_metadata self._cluster.need_all_topic_metadata = True future = self._cluster.request_update() - self._manager.run(self._manager.wait_for, future, None) + self._net.run(self._manager.wait_for, future, None) self._cluster.need_all_topic_metadata = stash def topics(self): @@ -760,7 +761,7 @@ def position(self, partition, timeout_ms=None): reset_task = self._fetcher.reset_offsets_if_needed(timeout_ms=timer.timeout_ms) if reset_task is not None and not timer.expired: try: - self._manager.run(self._manager.wait_for, reset_task, timer.timeout_ms) + self._net.run(self._manager.wait_for, reset_task, timer.timeout_ms) except Errors.KafkaTimeoutError: pass position = self._subscription.assignment[partition].position diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 28a33e5f0..ffce33d57 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -141,6 +141,7 @@ def __init__(self, client, **configs): self._client = client self._manager = client._manager self._cluster = self._manager.cluster + self._net = self._manager._net self.heartbeat = Heartbeat(**self.config) self._heartbeat_wakeup = WakeupNotifier(self._manager._net) self._heartbeat_loop_future = None @@ -292,7 +293,7 @@ def ensure_coordinator_ready(self, timeout_ms=None): Returns: True is coordinator found before timeout_ms, else False """ with self._client._lock: - return self._manager.run(self.ensure_coordinator_ready_async, timeout_ms) + return self._net.run(self.ensure_coordinator_ready_async, timeout_ms) async def ensure_coordinator_ready_async(self, timeout_ms=None): """Async variant of :meth:`ensure_coordinator_ready`. @@ -413,7 +414,7 @@ def ensure_active_group(self, timeout_ms=None): Returns: True if group initialized before timeout_ms, else False """ with self._client._lock: - return self._manager.run(self.ensure_active_group_async, timeout_ms) + return self._net.run(self.ensure_active_group_async, timeout_ms) async def ensure_active_group_async(self, timeout_ms=None): """Async variant of :meth:`ensure_active_group`.""" @@ -949,7 +950,7 @@ def is_dynamic_member(self): def maybe_leave_group(self, reason=None, timeout_ms=None): """Leave the current group and reset local generation/member_id.""" with self._client._lock: - return self._manager.run(self.maybe_leave_group_async, reason, timeout_ms) + return self._net.run(self.maybe_leave_group_async, reason, timeout_ms) async def maybe_leave_group_async(self, reason=None, timeout_ms=None): if not self._use_group_apis: diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a4c39a51d..9416d3836 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -327,7 +327,7 @@ def poll(self, timeout_ms=None): if self._subscription.subscribed_pattern: metadata_update = self._cluster.request_update() try: - self._manager.run( + self._net.run( self._manager.wait_for, metadata_update, timer.timeout_ms) except Errors.KafkaTimeoutError: log.debug('coordinator.poll: timeout updating metadata; returning early') @@ -460,7 +460,7 @@ def need_rejoin(self): def refresh_committed_offsets_if_needed(self, timeout_ms=None): """Fetch committed offsets for assigned partitions.""" with self._client._lock: - return self._manager.run(self.refresh_committed_offsets_if_needed_async, timeout_ms) + return self._net.run(self.refresh_committed_offsets_if_needed_async, timeout_ms) async def refresh_committed_offsets_if_needed_async(self, timeout_ms=None): missing_fetch_positions = set(self._subscription.missing_fetch_positions()) @@ -488,7 +488,7 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None): if not partitions: return {} with self._client._lock: - return self._manager.run(self.fetch_committed_offsets_async, partitions, timeout_ms) + return self._net.run(self.fetch_committed_offsets_async, partitions, timeout_ms) async def fetch_committed_offsets_async(self, partitions, timeout_ms=None): """Async variant of :meth:`fetch_committed_offsets`.""" @@ -531,7 +531,7 @@ async def fetch_committed_offsets_async(self, partitions, timeout_ms=None): if timer.timeout_ms is not None: delay_ms = min(delay_ms, timer.timeout_ms) if delay_ms > 0: - await self._manager._net.sleep(delay_ms / 1000) + await self._net.sleep(delay_ms / 1000) timer.maybe_raise() def close(self, autocommit=True, timeout_ms=None): @@ -621,7 +621,7 @@ def commit_offsets_sync(self, offsets, timeout_ms=None): offsets.values())) self._invoke_completed_offset_commit_callbacks() with self._client._lock: - return self._manager.run(self._commit_offsets_sync_async, offsets, timeout_ms) + return self._net.run(self._commit_offsets_sync_async, offsets, timeout_ms) async def _commit_offsets_sync_async(self, offsets, timeout_ms=None): if not offsets: @@ -651,7 +651,7 @@ async def _commit_offsets_sync_async(self, offsets, timeout_ms=None): if timer.timeout_ms is not None: delay_ms = min(delay_ms, timer.timeout_ms) if delay_ms > 0: - await self._manager._net.sleep(delay_ms / 1000) + await self._net.sleep(delay_ms / 1000) timer.maybe_raise() def _maybe_auto_commit_offsets_sync(self, timeout_ms=None): diff --git a/kafka/net/compat.py b/kafka/net/compat.py index 8297d2786..7d26691f4 100644 --- a/kafka/net/compat.py +++ b/kafka/net/compat.py @@ -106,7 +106,7 @@ def check_version(self, node_id=None, timeout_ms=10000): async def _check_version(broker_id, timeout_ms): conn = await self._manager.get_connection(broker_id, timeout_ms=timeout_ms) return conn.broker_version - return self._manager.run(_check_version, node_id, timeout_ms) + return self._net.run(_check_version, node_id, timeout_ms) # Request sending diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 113965015..b81c74f79 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -152,7 +152,7 @@ def bootstrap_async(self, timeout_ms=None): return self._bootstrap_future def bootstrap(self, timeout_ms=None): - self.run(self.bootstrap_async, timeout_ms) + self._net.run(self.bootstrap_async, timeout_ms) @property def bootstrapped(self):