diff --git a/test/consumer/test_consumer.py b/test/consumer/test_consumer.py index 41f4dd15b..75f41c1c7 100644 --- a/test/consumer/test_consumer.py +++ b/test/consumer/test_consumer.py @@ -26,6 +26,7 @@ def test_subscription_copy(): assert sub == set(['foo']) sub.add('fizz') assert consumer.subscription() == set(['foo']) + consumer.close() def test_assign(): @@ -37,6 +38,7 @@ def test_assign(): consumer.assign([TopicPartition('foo', 0)]) assert 'foo' in consumer._client.cluster._topics + consumer.close() consumer = KafkaConsumer(api_version=(0, 10, 0)) assert consumer.assignment() == set() @@ -48,3 +50,4 @@ def test_assign(): consumer.subscribe(topics=['foo']) consumer.assign([]) assert consumer.assignment() == set() + consumer.close() diff --git a/test/integration/test_consumer_integration.py b/test/integration/test_consumer_integration.py index f5c848f17..7911cfbb4 100644 --- a/test/integration/test_consumer_integration.py +++ b/test/integration/test_consumer_integration.py @@ -187,6 +187,8 @@ def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_mes for partition, msgs in poll_res.items(): for msg in msgs: seen_partitions.add(partition) + if seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}: + break # Check that we fetched at least 1 message from both partitions assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)} diff --git a/test/integration/test_producer_integration.py b/test/integration/test_producer_integration.py index fc4c13140..b0a2fb7a3 100644 --- a/test/integration/test_producer_integration.py +++ b/test/integration/test_producer_integration.py @@ -1,10 +1,9 @@ -from contextlib import contextmanager import platform import time import pytest -from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer, TopicPartition, OffsetAndMetadata +from kafka import TopicPartition, OffsetAndMetadata from test.testutil import env_kafka_version, random_string, maybe_skip_unsupported_compression diff --git a/test/integration/test_sasl_integration.py b/test/integration/test_sasl_integration.py index 3cba1f5d1..d36100bb1 100644 --- a/test/integration/test_sasl_integration.py +++ b/test/integration/test_sasl_integration.py @@ -40,6 +40,7 @@ def test_admin(request, sasl_kafka): admin = KafkaAdminClient(**client_params(sasl_kafka, 'admin')) admin.create_topics([NewTopic(topic_name, 1, 1)]) assert topic_name in sasl_kafka.get_topic_names() + admin.close() def test_produce_and_consume(request, sasl_kafka): @@ -53,6 +54,7 @@ def test_produce_and_consume(request, sasl_kafka): future = producer.send(topic_name, value=encoded_msg, partition=i % 2) messages_and_futures.append((encoded_msg, future)) producer.flush() + producer.close() for (msg, f) in messages_and_futures: assert f.succeeded() @@ -67,6 +69,7 @@ def test_produce_and_consume(request, sasl_kafka): assert_message_count(messages[0], 50) assert_message_count(messages[1], 50) + consumer.close() def test_client(request, sasl_kafka): @@ -85,3 +88,4 @@ def test_client(request, sasl_kafka): raise future.exception result = future.value assert topic_name in [t[1] for t in result.topics] + client.close()