Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

from kafka.admin import KafkaAdminClient
from kafka.consumer import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.consumer.subscription_state import (
AsyncConsumerRebalanceListener, ConsumerRebalanceListener,
)
from kafka.producer import KafkaProducer
from kafka.serializer import Serializer, Deserializer
from kafka.structs import TopicPartition, TopicPartitionReplica, OffsetAndMetadata
Expand All @@ -21,7 +23,8 @@

__all__ = [
'KafkaAdminClient', 'KafkaConsumer', 'KafkaProducer',
'ConsumerRebalanceListener', 'Serializer', 'Deserializer',
'AsyncConsumerRebalanceListener', 'ConsumerRebalanceListener',
'Serializer', 'Deserializer',
'TopicPartition', 'TopicPartitionReplica', 'OffsetAndMetadata',
'IsolationLevel', 'OffsetSpec',
]
62 changes: 58 additions & 4 deletions kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ def subscribe(self, topics=(), pattern=None, listener=None):
self._set_subscription_type(SubscriptionType.AUTO_TOPICS)
self.change_subscription(topics)

if listener and not isinstance(listener, ConsumerRebalanceListener):
raise TypeError('listener must be a ConsumerRebalanceListener')
if listener and not isinstance(
listener, (ConsumerRebalanceListener, AsyncConsumerRebalanceListener)):
raise TypeError(
'listener must be a ConsumerRebalanceListener or AsyncConsumerRebalanceListener')
self.rebalance_listener = listener

@synchronized
Expand Down Expand Up @@ -528,8 +530,15 @@ class ConsumerRebalanceListener(metaclass=abc.ABCMeta):
it may want to automatically trigger a flush of this cache, before the new
owner takes over consumption.

This callback will execute in the user thread as part of the Consumer.poll()
whenever partition assignment changes.
Threading: callbacks run on the consumer's IO event loop, the same loop
that drives heartbeats. Sync listener methods must return promptly --
blocking IO inside a sync listener will block heartbeats for the duration
and can cause the consumer to be kicked from the group if the listener
runs longer than ``session_timeout_ms``. For listeners that need to do
blocking work (e.g. flushing state to a database), prefer
:class:`AsyncConsumerRebalanceListener`, which lets you ``await`` while
keeping the loop responsive, or wrap the blocking call in your own
worker thread.

It is guaranteed that all consumer processes will invoke
on_partitions_revoked() prior to any process invoking
Expand Down Expand Up @@ -574,3 +583,48 @@ def on_partitions_assigned(self, assigned):
consumer (may include partitions that were previously assigned)
"""
pass


class AsyncConsumerRebalanceListener(metaclass=abc.ABCMeta):
"""
Async variant of :class:`ConsumerRebalanceListener`.

Implement this when your rebalance hooks need to perform IO that would
otherwise block the consumer's event loop -- e.g. flushing state to a
database, calling an external service, or coordinating with other async
code. The coordinator detects coroutine functions and ``await`` s them
instead of calling inline, so other tasks on the loop (notably the
heartbeat coroutine) continue to run while your listener is suspended.

Same lifecycle and ordering guarantees as the sync listener: all
consumers in the group invoke ``on_partitions_revoked`` before any
invokes ``on_partitions_assigned``. Both methods must be defined as
``async def``; otherwise use :class:`ConsumerRebalanceListener`.
"""
@abc.abstractmethod
async def on_partitions_revoked(self, revoked):
"""Async-callback for the start of a rebalance operation.

See :meth:`ConsumerRebalanceListener.on_partitions_revoked` for
semantics. The coordinator awaits this method, so non-blocking IO
via ``await`` keeps the heartbeat loop responsive during the call.

Arguments:
revoked (set of TopicPartition): the partitions that were
assigned to the consumer on the last rebalance.
"""
pass

@abc.abstractmethod
async def on_partitions_assigned(self, assigned):
"""Async-callback for the completion of a partition re-assignment.

See :meth:`ConsumerRebalanceListener.on_partitions_assigned` for
semantics.

Arguments:
assigned (set of TopicPartition): the partitions assigned to
the consumer (may include partitions that were previously
assigned).
"""
pass
Loading
Loading