Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 93 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,8 @@ def _convert_new_topic_request(new_topic):
]
)

def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True):
def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True,
wait_for_metadata=False):
"""Create new topics in the cluster.

Arguments:
Expand All @@ -460,10 +461,20 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
validate_only (bool, optional): If True, don't actually create new topics.
Not supported by all versions. Default: False
raise_errors (bool, optional): Whether to raise errors as exceptions. Default True.
wait_for_metadata (bool, optional): If True, after the broker successfully
accepts the create request, block until each new topic is visible in
broker metadata with a leader assigned for every partition. Useful on
KRaft clusters, where CreateTopicsResponse returning NoError does not
guarantee that a subsequent MetadataRequest will see the topic. Uses a
fixed 10-second timeout; call :meth:`wait_for_topics` directly for finer
control. Mutually exclusive with ``validate_only``: passing both raises
ValueError. Default: False

Returns:
Appropriate version of CreateTopicResponse class.
"""
if validate_only and wait_for_metadata:
raise ValueError('validate_only and wait_for_metadata are mutually exclusive')
version = self._client.api_version(CreateTopicsRequest, max_version=3)
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
Expand All @@ -487,7 +498,87 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
def get_response_errors(r):
for topic in r.topics:
yield Errors.for_code(topic[1])
return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)
response = self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)
if wait_for_metadata: # implies not validate_only
self.wait_for_topics([new_topic.name for new_topic in new_topics])
return response

def wait_for_topics(self, topic_names, timeout_ms=10000):
    """Block until each of the given topics is ready to use.

    CreateTopicsResponse only confirms that the broker accepted the create
    request; propagating the new topics into the broker's metadata cache --
    and electing a leader for every partition -- can lag behind, especially
    on KRaft clusters. This method polls :meth:`describe_topics` (roughly
    every 100ms) until :meth:`_topic_not_ready_reason` reports no problem
    for any requested topic, i.e. every topic both:

    - is returned with ``error_code == 0`` (topic exists and is
      visible in metadata), and
    - has ``error_code == 0`` and a leader assigned (``leader_id >= 0``)
      for every partition.

    Useful after :meth:`create_topics` (including implicit creation via
    ``allow_auto_topic_creation``) or after a delete+recreate sequence.

    Arguments:
        topic_names ([str]): Topic names to wait for. An empty value
            returns immediately without querying the cluster.

    Keyword Arguments:
        timeout_ms (numeric, optional): Maximum milliseconds to wait.
            Default: 10000.

    Raises:
        KafkaTimeoutError: if any topic is still not ready when the
            deadline expires. The exception message includes the
            per-topic state from the final ``describe_topics`` attempt;
            if that attempt itself raised, the error it raised is
            reported for every topic instead.
    """
    if not topic_names:
        return
    topic_names = list(topic_names)
    deadline = time.monotonic() + (timeout_ms / 1000.0)
    poll_interval = 0.1  # seconds between describe_topics attempts
    while True:
        try:
            topics = self.describe_topics(topics=topic_names)
        except Exception as exc:
            # Deliberately best-effort: a failed poll is retried until the
            # deadline. Keep the failure as the per-topic reason so that a
            # timeout reports the real cause rather than "missing".
            log.debug('describe_topics failed while waiting for topic visibility: %s', exc)
            pending = {name: 'describe_topics failed: %r' % (exc,)
                       for name in topic_names}
        else:
            by_name = {t.get('name'): t for t in topics}
            pending = {}
            for name in topic_names:
                reason = self._topic_not_ready_reason(by_name.get(name))
                if reason is not None:
                    pending[name] = reason
        if not pending:
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise Errors.KafkaTimeoutError(
                'Topics not ready after %sms: %s' % (timeout_ms, pending))
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))

@staticmethod
def _topic_not_ready_reason(topic_info):
    """Explain why a described topic is not usable yet.

    Arguments:
        topic_info (dict or None): One topic entry as a dict with optional
            ``error_code`` and ``partitions`` keys, or None when the topic
            was absent from the response.

    Returns:
        str or None: A short human-readable reason, or None when the topic
        has no topic-level error, at least one partition, and every
        partition has no error and a non-negative ``leader_id``.
    """
    if topic_info is None:
        return 'missing from metadata response'

    topic_error = topic_info.get('error_code', 0)
    if topic_error != 0:
        return Errors.for_code(topic_error).__name__

    partition_list = topic_info.get('partitions') or []
    if not partition_list:
        return 'no partitions reported'

    problems = []
    for part in partition_list:
        part_error = part.get('error_code', 0)
        index = part.get('partition_index')
        if part_error != 0:
            # A partition-level error takes precedence over the leader check.
            problems.append('p%s=%s' % (index, Errors.for_code(part_error).__name__))
        elif part.get('leader_id', -1) < 0:
            problems.append('p%s=no leader' % index)
    return ','.join(problems) if problems else None

def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
"""Delete topics from the cluster.
Expand Down
145 changes: 144 additions & 1 deletion test/admin/test_admin.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest

import kafka.admin
from kafka.errors import IllegalArgumentError
from kafka.admin.client import KafkaAdminClient
from kafka.errors import IllegalArgumentError, KafkaTimeoutError, UnknownTopicOrPartitionError


def test_config_resource():
Expand Down Expand Up @@ -92,3 +93,145 @@ def test_new_topic():
assert good_topic.replication_factor == -1
assert good_topic.replica_assignments == {1: [1, 2, 3]}
assert good_topic.topic_configs == {'key': 'value'}


# ---------------------------------------------------------------------------
# _topic_not_ready_reason (pure function, no network)
# ---------------------------------------------------------------------------


def _ready_topic(name='foo', num_partitions=1):
    """Build a topic description dict in which every partition is healthy.

    Each of the ``num_partitions`` partitions has error code 0 and leader 0,
    and the topic itself has error code 0.
    """
    partitions = []
    for index in range(num_partitions):
        partitions.append(
            {'error_code': 0, 'partition_index': index, 'leader_id': 0})
    return {'name': name, 'error_code': 0, 'partitions': partitions}


def test_topic_not_ready_reason_missing():
    """A topic absent from the response (None) is reported as missing."""
    reason = KafkaAdminClient._topic_not_ready_reason(None)
    assert reason == 'missing from metadata response'


def test_topic_not_ready_reason_topic_error():
    """A non-zero topic-level error code is reported by its error class name."""
    topic_info = {'name': 'foo', 'error_code': 3, 'partitions': []}
    reason = KafkaAdminClient._topic_not_ready_reason(topic_info)
    assert reason == 'UnknownTopicOrPartitionError'


def test_topic_not_ready_reason_no_partitions():
    """An error-free topic with an empty partition list is not ready."""
    topic_info = {'name': 'foo', 'error_code': 0, 'partitions': []}
    reason = KafkaAdminClient._topic_not_ready_reason(topic_info)
    assert reason == 'no partitions reported'


def test_topic_not_ready_reason_no_leader():
    """Only the partition whose leader_id is negative is named in the reason."""
    leaderless = {'error_code': 0, 'partition_index': 0, 'leader_id': -1}
    healthy = {'error_code': 0, 'partition_index': 1, 'leader_id': 0}
    topic_info = {
        'name': 'foo',
        'error_code': 0,
        'partitions': [leaderless, healthy],
    }
    reason = KafkaAdminClient._topic_not_ready_reason(topic_info)
    assert reason == 'p0=no leader'


def test_topic_not_ready_reason_partition_error():
    """A partition-level error code is reported by its error class name."""
    failing = {'error_code': 5, 'partition_index': 0, 'leader_id': -1}
    topic_info = {'name': 'foo', 'error_code': 0, 'partitions': [failing]}
    reason = KafkaAdminClient._topic_not_ready_reason(topic_info)
    assert reason == 'p0=LeaderNotAvailableError'


def test_topic_not_ready_reason_partial_partition_errors():
    """Every problematic partition contributes its own entry to the reason."""
    # Multiple partitions each with their own issue -> all reasons joined.
    leaderless = {'error_code': 0, 'partition_index': 0, 'leader_id': -1}
    failing = {'error_code': 5, 'partition_index': 1, 'leader_id': -1}
    topic_info = {
        'name': 'foo',
        'error_code': 0,
        'partitions': [leaderless, failing],
    }
    reason = KafkaAdminClient._topic_not_ready_reason(topic_info)
    for expected in ('p0=no leader', 'p1=LeaderNotAvailableError'):
        assert expected in reason


def test_topic_not_ready_reason_ready():
    """A fully healthy topic yields no reason at all."""
    healthy_topic = _ready_topic()
    reason = KafkaAdminClient._topic_not_ready_reason(healthy_topic)
    assert reason is None


# ---------------------------------------------------------------------------
# wait_for_topics (mocks describe_topics; does not hit the network)
# ---------------------------------------------------------------------------


def _bare_admin():
    """Create a KafkaAdminClient without invoking ``__init__``.

    Running ``__init__`` would try to bootstrap against a real broker, so
    the instance is allocated directly; each test supplies whatever
    attributes the method under test needs.
    """
    instance = object.__new__(KafkaAdminClient)
    return instance


def test_wait_for_topics_empty_list_returns_immediately():
    """An empty topic list must short-circuit before any describe call."""
    # describe_topics is intentionally not monkey-patched: were it called,
    # the test would crash with AttributeError, which proves the
    # empty-list fast path was taken.
    _bare_admin().wait_for_topics([])


def test_wait_for_topics_ready_on_first_call(monkeypatch):
    """When every topic is ready immediately, exactly one poll is made."""
    admin = _bare_admin()
    requested = []

    def fake_describe_topics(topics):
        requested.append(topics)
        described = []
        for topic_name in topics:
            described.append(_ready_topic(name=topic_name, num_partitions=2))
        return described

    monkeypatch.setattr(admin, 'describe_topics', fake_describe_topics)
    admin.wait_for_topics(['foo', 'bar'], timeout_ms=1000)

    assert len(requested) == 1
    first_request = requested[0]
    assert set(first_request) == {'foo', 'bar'}


def test_wait_for_topics_becomes_ready_after_retry(monkeypatch):
    """Polling continues through 'missing' and 'no leader' until ready."""
    admin = _bare_admin()

    topic_missing = []
    topic_without_leader = [{
        'name': 'foo',
        'error_code': 0,
        'partitions': [
            {'error_code': 0, 'partition_index': 0, 'leader_id': -1},
        ],
    }]
    topic_ready = [_ready_topic(name='foo')]
    # Served in order, one entry per describe_topics call.
    responses = [topic_missing, topic_without_leader, topic_ready]

    def fake_describe_topics(topics):
        next_response = responses.pop(0)
        return next_response

    monkeypatch.setattr(admin, 'describe_topics', fake_describe_topics)
    admin.wait_for_topics(['foo'], timeout_ms=5000)

    # All three scripted responses must have been consumed.
    assert responses == []


def test_wait_for_topics_timeout(monkeypatch):
    """A topic that never becomes ready raises KafkaTimeoutError naming it."""
    admin = _bare_admin()
    never_ready = {'name': 'foo', 'error_code': 3, 'partitions': []}

    def fake_describe_topics(topics):
        return [never_ready]

    monkeypatch.setattr(admin, 'describe_topics', fake_describe_topics)
    with pytest.raises(KafkaTimeoutError) as exc_info:
        admin.wait_for_topics(['foo'], timeout_ms=200)

    message = str(exc_info.value)
    assert 'foo' in message
    assert 'UnknownTopicOrPartitionError' in message


def test_wait_for_topics_describe_exception_keeps_retrying(monkeypatch):
    """An exception raised by describe_topics is retried, not propagated.

    Otherwise a flaky broker could turn a recoverable wait into a hard
    failure.
    """
    admin = _bare_admin()
    attempts = []

    def fake_describe_topics(topics):
        attempts.append(topics)
        if len(attempts) == 1:
            raise RuntimeError('transient')
        return [_ready_topic(name=topic_name) for topic_name in topics]

    monkeypatch.setattr(admin, 'describe_topics', fake_describe_topics)
    admin.wait_for_topics(['foo'], timeout_ms=5000)

    assert len(attempts) == 2
Loading