diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 156fa70da..769be843b 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -1,6 +1,6 @@ from logging import info from threading import Event, Thread -from time import time, sleep +from time import monotonic as time, sleep import pytest @@ -14,6 +14,7 @@ BrokerResponseError, NoError, CoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError, ElectionNotNeededError, + KafkaTimeoutError ) from kafka.structs import TopicPartition from test.testutil import env_kafka_version, random_string @@ -411,9 +412,16 @@ def test_create_partitions(kafka_admin_client, topic): assert result[0] == topic assert result[1] == 0 # NoError - # Verify the new partition count - topic_metadata = kafka_admin_client.describe_topics([topic]) - assert len(topic_metadata[0]['partitions']) == new_total + timeout_at = time() + 30 + while time() < timeout_at: + # Verify the new partition count + topic_metadata = kafka_admin_client.describe_topics([topic]) + if len(topic_metadata[0]['partitions']) == new_total: + break + else: + sleep(1) + else: + raise KafkaTimeoutError('Failed to create partitions') @pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2")