From 9019efba3f96fa71eacddfad7c18a1298d9c0cac Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 11 Apr 2026 08:32:16 -0700 Subject: [PATCH] Consolidate consumer integration tests --- test/integration/test_consumer_group.py | 188 ------------------ test/integration/test_consumer_integration.py | 178 +++++++++++++++++ 2 files changed, 178 insertions(+), 188 deletions(-) delete mode 100644 test/integration/test_consumer_group.py diff --git a/test/integration/test_consumer_group.py b/test/integration/test_consumer_group.py deleted file mode 100644 index 7f888f7d2..000000000 --- a/test/integration/test_consumer_group.py +++ /dev/null @@ -1,188 +0,0 @@ -import collections -import logging -import threading -import time - -import pytest - -from kafka.consumer.group import KafkaConsumer -from kafka.coordinator.base import MemberState -from kafka.structs import TopicPartition - -from test.testutil import env_kafka_version, random_string - - -def test_consumer(consumer): - # The `topic` fixture is included because - # 0.8.2 brokers need a topic to function well - consumer.poll(timeout_ms=500) - assert consumer._client.cluster.brokers() - consumer.close() - - -def test_consumer_topics(consumer, topic): - # Necessary to drive the IO - consumer.poll(timeout_ms=500) - assert topic in consumer.topics() - assert len(consumer.partitions_for_topic(topic)) > 0 - consumer.close() - - -@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') -def test_group(kafka_consumer_factory, topic): - num_partitions = 4 - consumers = {} - stop = {} - threads = {} - messages = collections.defaultdict(lambda: collections.defaultdict(list)) - group_id = 'test-group-' + random_string(6) - def consumer_thread(i): - assert i not in consumers - assert i not in stop - stop[i] = threading.Event() - consumers[i] = kafka_consumer_factory(group_id=group_id, - client_id="consumer_thread-%s" % i, - api_version_auto_timeout_ms=5000) - while not stop[i].is_set(): - for tp, records in consumers[i].poll(timeout_ms=200).items(): - messages[i][tp].extend(records) - consumers[i].close(timeout_ms=500) - consumers[i] = None - stop[i] = None - - num_consumers = 4 - for i in range(num_consumers): - t = threading.Thread(target=consumer_thread, args=(i,)) - t.daemon = True - t.start() - threads[i] = t - - try: - timeout = time.monotonic() + 15 - while True: - assert time.monotonic() < timeout, "timeout waiting for assignments" - # Verify all consumers have been created - missing_consumers = set(range(num_consumers)) - set(consumers.keys()) - if missing_consumers: - logging.info('Waiting on consumer threads: %s', missing_consumers) - time.sleep(1) - continue - - unassigned_consumers = {c for c, consumer in consumers.items() if not consumer.assignment()} - if unassigned_consumers: - logging.info('Waiting for consumer assignments: %s', unassigned_consumers) - time.sleep(1) - continue - - # If all consumers exist and have an assignment - logging.info('All consumers have assignment... checking for stable group') - # Verify all consumers are in the same generation - # then log state and break while loop - generations = set([consumer._coordinator._generation.generation_id - for consumer in consumers.values()]) - - # New generation assignment is not complete until - # coordinator.rejoining = False - rejoining = set([c for c, consumer in consumers.items() if consumer._coordinator.rejoining]) - - if not rejoining and len(generations) == 1: - for c, consumer in consumers.items(): - logging.info("[%s] %s %s: %s", c, - consumer._coordinator._generation.generation_id, - consumer._coordinator._generation.member_id, - consumer.assignment()) - break - else: - logging.info('Rejoining: %s, generations: %s', rejoining, generations) - time.sleep(1) - continue - - logging.info('Group stabilized; verifying assignment') - group_assignment = set() - for c in range(num_consumers): - assert len(consumers[c].assignment()) != 0 - assert set.isdisjoint(consumers[c].assignment(), group_assignment) - group_assignment.update(consumers[c].assignment()) - - assert group_assignment == set([ - TopicPartition(topic, partition) - for partition in range(num_partitions)]) - logging.info('Assignment looks good!') - - logging.info('Verifying heartbeats') - while True: - for c in range(num_consumers): - heartbeat = consumers[c]._coordinator.heartbeat - last_hb = time.monotonic() - 0.5 - if (heartbeat.heartbeat_failed or - heartbeat.last_receive < last_hb or - heartbeat.last_reset > last_hb): - time.sleep(0.1) - continue - else: - break - logging.info('Heartbeats look good') - - finally: - logging.info('Shutting down %s consumers', num_consumers) - for c in range(num_consumers): - logging.info('Stopping consumer %s', c) - stop[c].set() - threads[c].join(timeout=5) - assert not threads[c].is_alive() - threads[c] = None - - -def test_paused(kafka_consumer_factory, topic): - consumer = kafka_consumer_factory(topics=()) - topics = [TopicPartition(topic, 1)] - consumer.assign(topics) - assert set(topics) == consumer.assignment() - assert set() == consumer.paused() - - consumer.pause(topics[0]) - assert set([topics[0]]) == consumer.paused() - - consumer.resume(topics[0]) - assert set() == consumer.paused() - - consumer.unsubscribe() - assert set() == consumer.paused() - consumer.close() - - -@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') -def test_heartbeat_thread(kafka_consumer_factory): - group_id = 'test-group-' + random_string(6) - consumer = kafka_consumer_factory(group_id=group_id) - - # poll until we have joined group / have assignment - start = time.monotonic() - while not consumer.assignment(): - consumer.poll(timeout_ms=100) - - assert consumer._coordinator.state is MemberState.STABLE - last_poll = consumer._coordinator.heartbeat.last_poll - - # wait until we receive first heartbeat - while consumer._coordinator.heartbeat.last_receive < start: - time.sleep(0.1) - - last_send = consumer._coordinator.heartbeat.last_send - last_recv = consumer._coordinator.heartbeat.last_receive - assert last_poll > start - assert last_send > start - assert last_recv > start - - timeout = time.monotonic() + 30 - while True: - if time.monotonic() > timeout: - raise RuntimeError('timeout waiting for heartbeat') - if consumer._coordinator.heartbeat.last_receive > last_recv: - break - time.sleep(0.5) - - assert consumer._coordinator.heartbeat.last_poll == last_poll - consumer.poll(timeout_ms=100) - assert consumer._coordinator.heartbeat.last_poll > last_poll - consumer.close(timeout_ms=100) diff --git a/test/integration/test_consumer_integration.py b/test/integration/test_consumer_integration.py index 2fcff554a..4c63ac297 100644 --- a/test/integration/test_consumer_integration.py +++ b/test/integration/test_consumer_integration.py @@ -1,10 +1,13 @@ +import collections import logging +import threading import time from unittest.mock import patch, ANY import pytest import kafka.codec +from kafka.coordinator.base import MemberState from kafka.errors import KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError from kafka.protocol.broker_version_data import BrokerVersionData from kafka.structs import TopicPartition, OffsetAndTimestamp @@ -12,6 +15,36 @@ from test.testutil import Timer, assert_message_count, env_kafka_version, random_string +def test_consumer(consumer): + # The `topic` fixture is included because + # 0.8.2 brokers need a topic to function well + consumer.poll(timeout_ms=500) + assert consumer._client.cluster.brokers() + + +def test_consumer_topics(consumer, topic): + consumer.poll(timeout_ms=500) + assert topic in consumer.topics() + assert len(consumer.partitions_for_topic(topic)) > 0 + + +def test_paused(kafka_consumer_factory, topic): + consumer = kafka_consumer_factory(topics=()) + topics = [TopicPartition(topic, 1)] + consumer.assign(topics) + assert set(topics) == consumer.assignment() + assert set() == consumer.paused() + + consumer.pause(topics[0]) + assert set([topics[0]]) == consumer.paused() + + consumer.resume(topics[0]) + assert set() == consumer.paused() + + consumer.unsubscribe() + assert set() == consumer.paused() + + @pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10") def test_kafka_version_infer(kafka_consumer_factory): consumer = kafka_consumer_factory(api_version=None) @@ -310,3 +343,148 @@ def test_kafka_consumer_position_after_seek_to_end(kafka_consumer_factory, topic # Verify we got the expected position assert position == 10, "Expected position 10, got {}".format(position) + + +# Consumer group tests (use group_id, exercise the coordinator/join path) + + +@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') +def test_group(kafka_consumer_factory, topic): + num_partitions = 4 + consumers = {} + stop = {} + threads = {} + messages = collections.defaultdict(lambda: collections.defaultdict(list)) + group_id = 'test-group-' + random_string(6) + def consumer_thread(i): + assert i not in consumers + assert i not in stop + stop[i] = threading.Event() + consumers[i] = kafka_consumer_factory(group_id=group_id, + client_id="consumer_thread-%s" % i, + api_version_auto_timeout_ms=5000) + while not stop[i].is_set(): + for tp, records in consumers[i].poll(timeout_ms=200).items(): + messages[i][tp].extend(records) + consumers[i].close(timeout_ms=500) + consumers[i] = None + stop[i] = None + + num_consumers = 4 + for i in range(num_consumers): + t = threading.Thread(target=consumer_thread, args=(i,)) + t.daemon = True + t.start() + threads[i] = t + + try: + timeout = time.monotonic() + 15 + while True: + assert time.monotonic() < timeout, "timeout waiting for assignments" + # Verify all consumers have been created + missing_consumers = set(range(num_consumers)) - set(consumers.keys()) + if missing_consumers: + logging.info('Waiting on consumer threads: %s', missing_consumers) + time.sleep(1) + continue + + unassigned_consumers = {c for c, consumer in consumers.items() if not consumer.assignment()} + if unassigned_consumers: + logging.info('Waiting for consumer assignments: %s', unassigned_consumers) + time.sleep(1) + continue + + # If all consumers exist and have an assignment + logging.info('All consumers have assignment... checking for stable group') + # Verify all consumers are in the same generation + # then log state and break while loop + generations = set([consumer._coordinator._generation.generation_id + for consumer in consumers.values()]) + + # New generation assignment is not complete until + # coordinator.rejoining = False + rejoining = set([c for c, consumer in consumers.items() if consumer._coordinator.rejoining]) + + if not rejoining and len(generations) == 1: + for c, consumer in consumers.items(): + logging.info("[%s] %s %s: %s", c, + consumer._coordinator._generation.generation_id, + consumer._coordinator._generation.member_id, + consumer.assignment()) + break + else: + logging.info('Rejoining: %s, generations: %s', rejoining, generations) + time.sleep(1) + continue + + logging.info('Group stabilized; verifying assignment') + group_assignment = set() + for c in range(num_consumers): + assert len(consumers[c].assignment()) != 0 + assert set.isdisjoint(consumers[c].assignment(), group_assignment) + group_assignment.update(consumers[c].assignment()) + + assert group_assignment == set([ + TopicPartition(topic, partition) + for partition in range(num_partitions)]) + logging.info('Assignment looks good!') + + logging.info('Verifying heartbeats') + while True: + for c in range(num_consumers): + heartbeat = consumers[c]._coordinator.heartbeat + last_hb = time.monotonic() - 0.5 + if (heartbeat.heartbeat_failed or + heartbeat.last_receive < last_hb or + heartbeat.last_reset > last_hb): + time.sleep(0.1) + continue + else: + break + logging.info('Heartbeats look good') + + finally: + logging.info('Shutting down %s consumers', num_consumers) + for c in range(num_consumers): + logging.info('Stopping consumer %s', c) + stop[c].set() + threads[c].join(timeout=5) + assert not threads[c].is_alive() + threads[c] = None + + +@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') +def test_heartbeat_thread(kafka_consumer_factory): + group_id = 'test-group-' + random_string(6) + consumer = kafka_consumer_factory(group_id=group_id) + + # poll until we have joined group / have assignment + start = time.monotonic() + while not consumer.assignment(): + consumer.poll(timeout_ms=100) + + assert consumer._coordinator.state is MemberState.STABLE + last_poll = consumer._coordinator.heartbeat.last_poll + + # wait until we receive first heartbeat + while consumer._coordinator.heartbeat.last_receive < start: + time.sleep(0.1) + + last_send = consumer._coordinator.heartbeat.last_send + last_recv = consumer._coordinator.heartbeat.last_receive + assert last_poll > start + assert last_send > start + assert last_recv > start + + timeout = time.monotonic() + 30 + while True: + if time.monotonic() > timeout: + raise RuntimeError('timeout waiting for heartbeat') + if consumer._coordinator.heartbeat.last_receive > last_recv: + break + time.sleep(0.5) + + assert consumer._coordinator.heartbeat.last_poll == last_poll + consumer.poll(timeout_ms=100) + assert consumer._coordinator.heartbeat.last_poll > last_poll + consumer.close(timeout_ms=100)