Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions test/integration/test_admin_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from logging import info
from threading import Event, Thread
from time import time, sleep
from time import monotonic as time, sleep

import pytest

Expand All @@ -14,6 +14,7 @@
BrokerResponseError, NoError, CoordinatorNotAvailableError,
NonEmptyGroupError, GroupIdNotFoundError, OffsetOutOfRangeError,
UnknownTopicOrPartitionError, ElectionNotNeededError,
KafkaTimeoutError
)
from kafka.structs import TopicPartition
from test.testutil import env_kafka_version, random_string
Expand Down Expand Up @@ -411,9 +412,16 @@ def test_create_partitions(kafka_admin_client, topic):
assert result[0] == topic
assert result[1] == 0 # NoError

# Verify the new partition count
topic_metadata = kafka_admin_client.describe_topics([topic])
assert len(topic_metadata[0]['partitions']) == new_total
timeout_at = time() + 30
while time() < timeout_at:
# Verify the new partition count
topic_metadata = kafka_admin_client.describe_topics([topic])
if len(topic_metadata[0]['partitions']) == new_total:
break
else:
sleep(1)
else:
raise KafkaTimeoutError('Failed to create partitions')


@pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2")
Expand Down
Loading