From ab602c0054117827aecc54d9256c70eee9d93662 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 11 Apr 2026 09:33:00 -0700 Subject: [PATCH] AdminClient: wait_for_topics() and create_topics() wait_for_metadata option --- kafka/admin/client.py | 95 ++++++++++++++++++++++++- test/admin/test_admin.py | 145 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 237 insertions(+), 3 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 0f5daa677..db5b0551a 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -448,7 +448,8 @@ def _convert_new_topic_request(new_topic): ] ) - def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True): + def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True, + wait_for_metadata=False): """Create new topics in the cluster. Arguments: @@ -460,10 +461,20 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ validate_only (bool, optional): If True, don't actually create new topics. Not supported by all versions. Default: False raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. + wait_for_metadata (bool, optional): If True, after the broker successfully + accepts the create request, block until each new topic is visible in + broker metadata with a leader assigned for every partition. Useful on + KRaft clusters, where CreateTopicsResponse returning NoError does not + guarantee that a subsequent MetadataRequest will see the topic. Uses a + fixed 10-second timeout; call :meth:`wait_for_topics` directly for + finer control. Mutually exclusive with ``validate_only``: passing both + raises ValueError. Default: False Returns: Appropriate version of CreateTopicResponse class. 
""" + if validate_only and wait_for_metadata: + raise ValueError('validate_only and wait_for_metadata are mutually exclusive') version = self._client.api_version(CreateTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) if version == 0: @@ -487,7 +498,87 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ def get_response_errors(r): for topic in r.topics: yield Errors.for_code(topic[1]) - return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors) + response = self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors) + if wait_for_metadata: # implies not validate_only + self.wait_for_topics([new_topic.name for new_topic in new_topics]) + return response + + def wait_for_topics(self, topic_names, timeout_ms=10000): + """Block until each of the given topics is ready to use. + + CreateTopicsResponse only confirms that the broker accepted the create + request; propagating the new topics into the broker's metadata cache -- + and electing a leader for every partition -- can lag behind, especially + on KRaft clusters. This method polls :meth:`describe_topics` at a fixed + interval until every requested topic both: + + - is returned with ``error_code == 0`` (topic exists and is + visible in metadata), and + - has ``error_code == 0`` and a leader assigned (``leader_id >= 0``) + for every partition. + + Useful after :meth:`create_topics` (including implicit creation via + ``allow_auto_topic_creation``) or after a delete+recreate sequence. + + Arguments: + topic_names ([str]): Topic names to wait for. + + Keyword Arguments: + timeout_ms (numeric, optional): Maximum milliseconds to wait. + Default: 10000. + + Raises: + KafkaTimeoutError: if any topic is still not ready when the + deadline expires. The exception message includes the + per-topic state from the final ``describe_topics`` call. 
+ """ + if not topic_names: + return + topic_names = list(topic_names) + deadline = time.monotonic() + (timeout_ms / 1000.0) + pending = {name: 'not yet queried' for name in topic_names} + while True: + try: + topics = self.describe_topics(topics=topic_names) + except Exception as exc: + log.debug('describe_topics failed while waiting for topic visibility: %s', exc) + topics = [] + by_name = {t.get('name'): t for t in topics} + pending = {} + for name in topic_names: + reason = self._topic_not_ready_reason(by_name.get(name)) + if reason is not None: + pending[name] = reason + if not pending: + return + if time.monotonic() >= deadline: + raise Errors.KafkaTimeoutError( + 'Topics not ready after %sms: %s' % (timeout_ms, pending)) + time.sleep(0.1) + + @staticmethod + def _topic_not_ready_reason(topic_info): + """Return a string reason if ``topic_info`` isn't ready, else None.""" + if topic_info is None: + return 'missing from metadata response' + error_code = topic_info.get('error_code', 0) + if error_code != 0: + return Errors.for_code(error_code).__name__ + partitions = topic_info.get('partitions') or [] + if not partitions: + return 'no partitions reported' + bad = [] + for p in partitions: + p_err = p.get('error_code', 0) + idx = p.get('partition_index') + if p_err != 0: + bad.append('p%s=%s' % (idx, Errors.for_code(p_err).__name__)) + continue + if p.get('leader_id', -1) < 0: + bad.append('p%s=no leader' % idx) + if bad: + return ','.join(bad) + return None def delete_topics(self, topics, timeout_ms=None, raise_errors=True): """Delete topics from the cluster. 
# test/admin/test_admin.py -- tests added by the surrounding patch, extracted
# from the diff and reformatted. They belong after the existing test_new_topic().
# The file's pre-existing ``import kafka.admin`` / ``IllegalArgumentError``
# imports (used by earlier tests) are untouched and not repeated here.
import pytest

from kafka.admin.client import KafkaAdminClient
# NOTE(review): the patch also added UnknownTopicOrPartitionError to this
# import, but the new tests only match that name as a *string* in exception
# messages -- the class itself is never referenced, so the unused import is
# dropped.
from kafka.errors import KafkaTimeoutError


# ---------------------------------------------------------------------------
# _topic_not_ready_reason (pure function, no network)
# ---------------------------------------------------------------------------


def _ready_topic(name='foo', num_partitions=1):
    """Build a describe_topics-style dict for a fully-ready topic."""
    return {
        'name': name,
        'error_code': 0,
        'partitions': [
            {'error_code': 0, 'partition_index': i, 'leader_id': 0}
            for i in range(num_partitions)
        ],
    }


def test_topic_not_ready_reason_missing():
    assert KafkaAdminClient._topic_not_ready_reason(None) == 'missing from metadata response'


def test_topic_not_ready_reason_topic_error():
    assert KafkaAdminClient._topic_not_ready_reason(
        {'name': 'foo', 'error_code': 3, 'partitions': []}
    ) == 'UnknownTopicOrPartitionError'


def test_topic_not_ready_reason_no_partitions():
    assert KafkaAdminClient._topic_not_ready_reason(
        {'name': 'foo', 'error_code': 0, 'partitions': []}
    ) == 'no partitions reported'


def test_topic_not_ready_reason_no_leader():
    assert KafkaAdminClient._topic_not_ready_reason(
        {'name': 'foo', 'error_code': 0, 'partitions': [
            {'error_code': 0, 'partition_index': 0, 'leader_id': -1},
            {'error_code': 0, 'partition_index': 1, 'leader_id': 0},
        ]}
    ) == 'p0=no leader'


def test_topic_not_ready_reason_partition_error():
    assert KafkaAdminClient._topic_not_ready_reason(
        {'name': 'foo', 'error_code': 0, 'partitions': [
            {'error_code': 5, 'partition_index': 0, 'leader_id': -1},
        ]}
    ) == 'p0=LeaderNotAvailableError'


def test_topic_not_ready_reason_partial_partition_errors():
    # Multiple partitions each with their own issue -> all reasons joined.
    reason = KafkaAdminClient._topic_not_ready_reason(
        {'name': 'foo', 'error_code': 0, 'partitions': [
            {'error_code': 0, 'partition_index': 0, 'leader_id': -1},
            {'error_code': 5, 'partition_index': 1, 'leader_id': -1},
        ]}
    )
    assert 'p0=no leader' in reason
    assert 'p1=LeaderNotAvailableError' in reason


def test_topic_not_ready_reason_ready():
    assert KafkaAdminClient._topic_not_ready_reason(_ready_topic()) is None


# ---------------------------------------------------------------------------
# wait_for_topics (mocks describe_topics; does not hit the network)
# ---------------------------------------------------------------------------


def _bare_admin():
    """Return a KafkaAdminClient instance without running __init__ (which
    would try to bootstrap a real broker). All attributes needed by the
    method under test are provided by the test.
    """
    return object.__new__(KafkaAdminClient)


def test_wait_for_topics_empty_list_returns_immediately():
    admin = _bare_admin()
    # No describe_topics monkey-patch: if it were called the test would
    # crash with AttributeError, proving the empty-list fast path.
    admin.wait_for_topics([])


def test_wait_for_topics_ready_on_first_call(monkeypatch):
    admin = _bare_admin()
    calls = []
    def fake_describe_topics(topics):
        calls.append(topics)
        return [_ready_topic(name=t, num_partitions=2) for t in topics]
    monkeypatch.setattr(admin, 'describe_topics', fake_describe_topics)
    admin.wait_for_topics(['foo', 'bar'], timeout_ms=1000)
    assert len(calls) == 1
    assert set(calls[0]) == {'foo', 'bar'}


def test_wait_for_topics_becomes_ready_after_retry(monkeypatch):
    admin = _bare_admin()
    responses = [
        # First call: topic missing
        [],
        # Second call: topic exists but no leader yet
        [{'name': 'foo', 'error_code': 0, 'partitions': [
            {'error_code': 0, 'partition_index': 0, 'leader_id': -1}]}],
        # Third call: ready
        [_ready_topic(name='foo')],
    ]
    def fake_describe_topics(topics):
        return responses.pop(0)
    monkeypatch.setattr(admin, 'describe_topics', fake_describe_topics)
    admin.wait_for_topics(['foo'], timeout_ms=5000)
    assert responses == []  # all three calls consumed


def test_wait_for_topics_timeout(monkeypatch):
    admin = _bare_admin()
    def fake_describe_topics(topics):
        return [{'name': 'foo', 'error_code': 3, 'partitions': []}]
    monkeypatch.setattr(admin, 'describe_topics', fake_describe_topics)
    with pytest.raises(KafkaTimeoutError) as exc_info:
        admin.wait_for_topics(['foo'], timeout_ms=200)
    assert 'foo' in str(exc_info.value)
    assert 'UnknownTopicOrPartitionError' in str(exc_info.value)


def test_wait_for_topics_describe_exception_keeps_retrying(monkeypatch):
    """A transient exception from describe_topics should be logged and
    retried, not propagated - otherwise a flaky broker could turn a
    recoverable wait into a hard failure."""
    admin = _bare_admin()
    state = {'calls': 0}
    def fake_describe_topics(topics):
        state['calls'] += 1
        if state['calls'] == 1:
            raise RuntimeError('transient')
        return [_ready_topic(name=t) for t in topics]
    monkeypatch.setattr(admin, 'describe_topics', fake_describe_topics)
    admin.wait_for_topics(['foo'], timeout_ms=5000)
    assert state['calls'] == 2