Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 111 additions & 13 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import socket
import threading
import time
import weakref

from kafka import errors as Errors
from kafka.future import Future
Expand All @@ -20,9 +21,6 @@ class ClusterMetadata:
"""
A class to manage kafka cluster metadata.

This class does not perform any IO. It simply updates internal state
given API responses (MetadataResponse, FindCoordinatorResponse).

Keyword Arguments:
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
Expand All @@ -48,6 +46,7 @@ class ClusterMetadata:
}

def __init__(self, **configs):
self._manager = None
self._topics = set()
self._brokers = {} # node_id -> MetadataResponseBroker
self._partitions = {} # topic -> partition -> PartitionMetadata
Expand All @@ -64,7 +63,10 @@ def __init__(self, **configs):
self.internal_topics = set()
self.controller = None
self.cluster_id = None
self.metadata_refresh_in_progress = False

self._refresh_loop_future = None
self._refresh_future = None
self._notify_wakeup = None

self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
Expand All @@ -74,6 +76,101 @@ def __init__(self, **configs):
self._bootstrap_brokers = self._generate_bootstrap_brokers()
self._coordinator_brokers = {}

@property
def metadata_refresh_in_progress(self):
    """Whether a MetadataRequest is currently outstanding."""
    future = self._refresh_future
    if future is None:
        return False
    return not future.is_done

def attach(self, manager):
    """Bind this cluster to the connection manager that drives its IO.

    Kept separate from construction so a ClusterMetadata can be built
    without a live manager (unit tests, snapshots).  A weakref.proxy is
    stored so the manager <-> cluster pair never forms a GC cycle; eager
    teardown still happens because manager.close() calls cluster.close().
    """
    proxy = weakref.proxy(manager)
    self._manager = proxy

def close(self):
    """Detach from the connection manager; safe to call more than once."""
    # Clearing the (weak) manager reference lets both sides be
    # collected independently.
    self._manager = None

def start_refresh_loop(self):
    """Schedule the periodic metadata-refresh coroutine on the manager.

    Idempotent: once the loop future exists, further calls are no-ops.
    The loop itself performs bootstrap if the manager has not yet
    bootstrapped.

    Raises:
        RuntimeError: if attach() was never called.
    """
    if self._manager is None:
        raise RuntimeError('start_refresh_loop requires prior attach()')
    if self._refresh_loop_future is None:
        log.debug('Starting metadata refresh loop')
        self._refresh_loop_future = self._manager.call_soon(self._refresh_loop)

async def _refresh_loop(self):
    """Awaits ttl() then triggers refresh_metadata(); request_update() wakes early.

    Runs forever once scheduled by start_refresh_loop().  Performs the
    initial bootstrap if the manager has not bootstrapped yet.
    """
    if self._manager is None:
        raise RuntimeError('start_refresh_loop requires prior attach()')
    if not self._manager.bootstrapped:
        await self._manager.bootstrap_async()
    while True:
        if self.metadata_refresh_in_progress:
            # Someone else started a refresh; wait for it to settle
            # before deciding whether we need one of our own.
            await self._refresh_future
        ttl_ms = self.ttl()
        if ttl_ms == 0:
            try:
                await self.refresh_metadata()
            except Exception as exc:
                # Swallow failures here: the next loop pass re-reads
                # ttl() (which reflects the manager's backoff) and
                # retries.  NOTE(review): indentation of this hunk was
                # reconstructed from a diff paste -- confirm `continue`
                # sits after the try/except, not inside it.
                log.debug('Metadata refresh failed: %s', exc)
                log.exception(exc)
            continue
        try:
            log.debug('Sleeping %s for next Metadata refresh', ttl_ms / 1000)
            # wakeup_pair returns (awaitable sleep, notifier callable);
            # stashing the notifier lets request_update() cut the sleep
            # short so a forced refresh happens promptly.
            wakeup, self._notify_wakeup = self._manager.wakeup_pair(ttl_ms / 1000)
            await wakeup()
        except Exception as exc:
            log.error('Metadata refresh loop error: %s', exc)

async def refresh_metadata(self, node_id=None):
    """Send one MetadataRequest and apply the response.

    Concurrent callers share a single in-flight request: if a refresh is
    already underway, additional callers await the same Future and see the
    same outcome (success or exception). This avoids duplicate broker
    requests when bootstrap and the refresh loop race, or when external
    callers invoke refresh while the loop is mid-flight.

    Arguments:
        node_id (optional): broker to query; when None the manager's
            least-loaded node is chosen (see _do_refresh_metadata).

    Raises:
        RuntimeError: if attach() has not been called.
        Exception: whatever _do_refresh_metadata raised; the shared
            future is failed with the same exception first.
    """
    if self._manager is None:
        raise RuntimeError('refresh_metadata requires prior attach()')
    if self.metadata_refresh_in_progress:
        # Piggy-back on the existing request rather than issuing another.
        log.debug('Metadata refresh already in flight; awaiting existing')
        await self._refresh_future
        return
    # No await between the in-progress check above and this assignment,
    # so two coroutines cannot both create a future on one event loop.
    self._refresh_future = Future()
    try:
        await self._do_refresh_metadata(node_id)
    except Exception as exc:
        # Fail the shared future first so concurrent awaiters observe
        # the same exception, then re-raise to our own caller.
        self._refresh_future.failure(exc)
        raise
    else:
        self._refresh_future.success(None)

async def _do_refresh_metadata(self, node_id):
    """Pick a node, send one MetadataRequest, and apply the response.

    Arguments:
        node_id: broker to query, or None to use the manager's
            least-loaded node.

    Raises:
        Errors.NodeNotReadyError: if no node is available (manager
            backoff for 'metadata' is bumped first).
        Exception: any send failure, after recording it via
            failed_update() so waiters on the metadata future see it.
    """
    # Lazy %-formatting keeps the message cheap when DEBUG is disabled
    # and matches the logging style used elsewhere in this module
    # (was: an eagerly-evaluated f-string).
    log.debug('Metadata refresh (node_id=%s)', node_id)
    if node_id is None:
        node_id = self._manager.least_loaded_node()
    if node_id is None:
        # No usable broker right now: bump the KIP-580 exponential
        # backoff keyed on 'metadata' and let the caller retry later.
        self._manager.update_backoff('metadata')
        raise Errors.NodeNotReadyError('metadata')
    # A node is available (chosen or caller-supplied): clear the backoff.
    self._manager.reset_backoff('metadata')
    try:
        request = self.metadata_request()
        log.debug('Sending metadata request %s to node %s', request, node_id)
        response = await self._manager.send(request, node_id)
    except Exception as exc:
        log.error('Metadata refresh: failed %s', exc)
        # Record the failure so the pending metadata future is failed too.
        self.failed_update(exc)
        raise
    log.debug('Metadata refresh: success')
    self.update_metadata(response)

def _generate_bootstrap_brokers(self):
# collect_hosts does not perform DNS, so we should be fine to re-use
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
Expand Down Expand Up @@ -228,8 +325,9 @@ def coordinator_for_group(self, group):
def ttl(self):
"""Milliseconds until metadata should be refreshed"""
now = time.monotonic() * 1000
if self.metadata_refresh_in_progress:
ttl = self.config['retry_backoff_ms']
if self._manager is not None and self._manager.connection_delay('metadata'):
# Exponential backoff - KIP-580
return self._manager.connection_delay('metadata') * 1000
elif self._need_update:
ttl = 0
else:
Expand Down Expand Up @@ -258,7 +356,13 @@ def request_update(self):
self._need_update = True
if not self._future or self._future.is_done:
self._future = Future()
return self._future
ret = self._future
if self._manager:
self.start_refresh_loop()
if self._notify_wakeup:
self._notify_wakeup()
self._notify_wakeup = None
return ret

@property
def need_update(self):
Expand All @@ -283,10 +387,6 @@ def topics(self, exclude_internal_topics=True):
return topics

def metadata_request(self):
if self.metadata_refresh_in_progress:
raise RuntimeError('MetadataRequest currently in-flight!')
else:
self.metadata_refresh_in_progress = True
if self.need_all_topic_metadata:
topics = MetadataRequest.ALL_TOPICS
elif not self._topics:
Expand All @@ -309,7 +409,6 @@ def failed_update(self, exception):
f = self._future
self._future = None
self._last_refresh_ms = time.monotonic() * 1000
self.metadata_refresh_in_progress = False
if f:
f.failure(exception)

Expand Down Expand Up @@ -403,7 +502,6 @@ def update_metadata(self, metadata):
now = time.monotonic() * 1000
self._last_refresh_ms = now
self._last_successful_refresh_ms = now
self.metadata_refresh_in_progress = False

if f:
# In the common case where we ask for a single topic and get back an
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ async def _fetch_offsets_by_times_async(self, timestamps, timeout_ms=None):
if not getattr(exc, 'retriable', False):
raise
if getattr(exc, 'invalid_metadata', False) or self._client._manager.cluster.need_update:
refresh_future = self._client._manager.update_metadata()
refresh_future = self._client.cluster.request_update()
try:
await self._client._manager.wait_for(refresh_future, timer.timeout_ms)
except Errors.KafkaTimeoutError:
Expand Down
22 changes: 7 additions & 15 deletions kafka/net/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time

import kafka.errors as Errors
from kafka.cluster import ClusterMetadata
from kafka.net.manager import KafkaConnectionManager
from kafka.net.selector import NetworkSelector

Expand All @@ -20,13 +19,11 @@ class KafkaNetClient:
KafkaConnectionManager directly (fire-and-forget via _request_buffer).
"""
def __init__(self, **configs):
# _lock is still used by the legacy Coordinator (kafka/coordinator/base.py).
# Remove once Coordinator moves to the IO thread (Phase D).
self._lock = threading.RLock()
self._net = NetworkSelector(**configs)
cluster = ClusterMetadata(
bootstrap_servers=configs.get('bootstrap_servers', ['localhost:9092']),
metadata_max_age_ms=configs.get('metadata_max_age_ms', 300000),
)
self._manager = KafkaConnectionManager(self._net, cluster, **configs)
self._manager = KafkaConnectionManager(self._net, **configs)

@property
def cluster(self):
Expand Down Expand Up @@ -129,16 +126,11 @@ def send_and_receive(self, node_id, request, timeout_ms=30000):
# Delegation

def poll(self, timeout_ms=None, future=None):
# _lock serializes with HeartbeatThread, which also drives poll()
# while holding this lock. Without it, both threads would call
# _net.poll() concurrently and race on selector / task state.
# The lock goes away once HeartbeatThread does (Phase D).
with self._lock:
metadata_ttl = self._manager.cluster.ttl()
if metadata_ttl == 0:
self._manager.update_metadata()
elif self._manager.cluster.need_update:
# A refresh is pending but retry_backoff hasn't elapsed yet.
# Cap timeout_ms so select() returns in time for us to come
# back and re-check ttl.
if timeout_ms is None or metadata_ttl < timeout_ms:
timeout_ms = metadata_ttl
return self._manager.poll(timeout_ms=timeout_ms, future=future)

def close(self, node_id=None):
Expand Down
Loading
Loading