Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 0 additions & 188 deletions test/integration/test_consumer_group.py

This file was deleted.

178 changes: 178 additions & 0 deletions test/integration/test_consumer_integration.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,50 @@
import collections
import logging
import threading
import time
from unittest.mock import patch, ANY

import pytest

import kafka.codec
from kafka.coordinator.base import MemberState
from kafka.errors import KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError
from kafka.protocol.broker_version_data import BrokerVersionData
from kafka.structs import TopicPartition, OffsetAndTimestamp

from test.testutil import Timer, assert_message_count, env_kafka_version, random_string


def test_consumer(consumer):
# The `topic` fixture is included because
# 0.8.2 brokers need a topic to function well
consumer.poll(timeout_ms=500)
assert consumer._client.cluster.brokers()


def test_consumer_topics(consumer, topic):
consumer.poll(timeout_ms=500)
assert topic in consumer.topics()
assert len(consumer.partitions_for_topic(topic)) > 0


def test_paused(kafka_consumer_factory, topic):
consumer = kafka_consumer_factory(topics=())
topics = [TopicPartition(topic, 1)]
consumer.assign(topics)
assert set(topics) == consumer.assignment()
assert set() == consumer.paused()

consumer.pause(topics[0])
assert set([topics[0]]) == consumer.paused()

consumer.resume(topics[0])
assert set() == consumer.paused()

consumer.unsubscribe()
assert set() == consumer.paused()


@pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10")
def test_kafka_version_infer(kafka_consumer_factory):
consumer = kafka_consumer_factory(api_version=None)
Expand Down Expand Up @@ -310,3 +343,148 @@ def test_kafka_consumer_position_after_seek_to_end(kafka_consumer_factory, topic

# Verify we got the expected position
assert position == 10, "Expected position 10, got {}".format(position)


# Consumer group tests (use group_id, exercise the coordinator/join path)


@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_group(kafka_consumer_factory, topic):
num_partitions = 4
consumers = {}
stop = {}
threads = {}
messages = collections.defaultdict(lambda: collections.defaultdict(list))
group_id = 'test-group-' + random_string(6)
def consumer_thread(i):
assert i not in consumers
assert i not in stop
stop[i] = threading.Event()
consumers[i] = kafka_consumer_factory(group_id=group_id,
client_id="consumer_thread-%s" % i,
api_version_auto_timeout_ms=5000)
while not stop[i].is_set():
for tp, records in consumers[i].poll(timeout_ms=200).items():
messages[i][tp].extend(records)
consumers[i].close(timeout_ms=500)
consumers[i] = None
stop[i] = None

num_consumers = 4
for i in range(num_consumers):
t = threading.Thread(target=consumer_thread, args=(i,))
t.daemon = True
t.start()
threads[i] = t

try:
timeout = time.monotonic() + 15
while True:
assert time.monotonic() < timeout, "timeout waiting for assignments"
# Verify all consumers have been created
missing_consumers = set(range(num_consumers)) - set(consumers.keys())
if missing_consumers:
logging.info('Waiting on consumer threads: %s', missing_consumers)
time.sleep(1)
continue

unassigned_consumers = {c for c, consumer in consumers.items() if not consumer.assignment()}
if unassigned_consumers:
logging.info('Waiting for consumer assignments: %s', unassigned_consumers)
time.sleep(1)
continue

# If all consumers exist and have an assignment
logging.info('All consumers have assignment... checking for stable group')
# Verify all consumers are in the same generation
# then log state and break while loop
generations = set([consumer._coordinator._generation.generation_id
for consumer in consumers.values()])

# New generation assignment is not complete until
# coordinator.rejoining = False
rejoining = set([c for c, consumer in consumers.items() if consumer._coordinator.rejoining])

if not rejoining and len(generations) == 1:
for c, consumer in consumers.items():
logging.info("[%s] %s %s: %s", c,
consumer._coordinator._generation.generation_id,
consumer._coordinator._generation.member_id,
consumer.assignment())
break
else:
logging.info('Rejoining: %s, generations: %s', rejoining, generations)
time.sleep(1)
continue

logging.info('Group stabilized; verifying assignment')
group_assignment = set()
for c in range(num_consumers):
assert len(consumers[c].assignment()) != 0
assert set.isdisjoint(consumers[c].assignment(), group_assignment)
group_assignment.update(consumers[c].assignment())

assert group_assignment == set([
TopicPartition(topic, partition)
for partition in range(num_partitions)])
logging.info('Assignment looks good!')

logging.info('Verifying heartbeats')
while True:
for c in range(num_consumers):
heartbeat = consumers[c]._coordinator.heartbeat
last_hb = time.monotonic() - 0.5
if (heartbeat.heartbeat_failed or
heartbeat.last_receive < last_hb or
heartbeat.last_reset > last_hb):
time.sleep(0.1)
continue
else:
break
logging.info('Heartbeats look good')

finally:
logging.info('Shutting down %s consumers', num_consumers)
for c in range(num_consumers):
logging.info('Stopping consumer %s', c)
stop[c].set()
threads[c].join(timeout=5)
assert not threads[c].is_alive()
threads[c] = None


@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_heartbeat_thread(kafka_consumer_factory):
group_id = 'test-group-' + random_string(6)
consumer = kafka_consumer_factory(group_id=group_id)

# poll until we have joined group / have assignment
start = time.monotonic()
while not consumer.assignment():
consumer.poll(timeout_ms=100)

assert consumer._coordinator.state is MemberState.STABLE
last_poll = consumer._coordinator.heartbeat.last_poll

# wait until we receive first heartbeat
while consumer._coordinator.heartbeat.last_receive < start:
time.sleep(0.1)

last_send = consumer._coordinator.heartbeat.last_send
last_recv = consumer._coordinator.heartbeat.last_receive
assert last_poll > start
assert last_send > start
assert last_recv > start

timeout = time.monotonic() + 30
while True:
if time.monotonic() > timeout:
raise RuntimeError('timeout waiting for heartbeat')
if consumer._coordinator.heartbeat.last_receive > last_recv:
break
time.sleep(0.5)

assert consumer._coordinator.heartbeat.last_poll == last_poll
consumer.poll(timeout_ms=100)
assert consumer._coordinator.heartbeat.last_poll > last_poll
consumer.close(timeout_ms=100)
Loading