Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from kafka.version import __version__
__author__ = 'Dana Powers'
__license__ = 'Apache License 2.0'
__copyright__ = 'Copyright 2025 Dana Powers, David Arthur, and Contributors'
__copyright__ = 'Copyright 2026 Dana Powers, David Arthur, and Contributors'

# Set default logging handler to avoid "No handler found" warnings.
import logging
Expand All @@ -11,16 +11,16 @@


from kafka.admin import KafkaAdminClient
from kafka.client_async import KafkaClient
from kafka.consumer import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.producer import KafkaProducer
from kafka.conn import BrokerConnection
from kafka.serializer import Serializer, Deserializer
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.protocol.consumer import IsolationLevel, OffsetSpec


__all__ = [
'BrokerConnection', 'ConsumerRebalanceListener', 'KafkaAdminClient',
'KafkaClient', 'KafkaConsumer', 'KafkaProducer',
'KafkaAdminClient', 'KafkaConsumer', 'KafkaProducer',
'ConsumerRebalanceListener', 'Serializer', 'Deserializer',
'TopicPartition', 'OffsetAndMetadata', 'IsolationLevel', 'OffsetSpec',
]
28 changes: 9 additions & 19 deletions kafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
from collections import namedtuple


"""A topic and partition tuple
TopicPartition = namedtuple("TopicPartition",
["topic", "partition"])
TopicPartition.__doc__ = """A topic and partition tuple

Keyword Arguments:
topic (str): A topic name
partition (int): A partition id
"""
TopicPartition = namedtuple("TopicPartition",
["topic", "partition"])


"""The Kafka offset commit API
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
["offset", "metadata", "leader_epoch"], defaults=[None, '', -1])
OffsetAndMetadata.__doc__ = """Container for committed group offset data.

The Kafka offset commit API allows users to provide additional metadata
(in the form of a string) when an offset is committed. This can be useful
Expand All @@ -25,26 +27,14 @@
metadata (str): Non-null metadata
leader_epoch (int): The last known epoch from the leader / broker
"""
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
["offset", "metadata", "leader_epoch"])


"""An offset and timestamp tuple
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
["offset", "timestamp", "leader_epoch"])
OffsetAndTimestamp.__doc__ = """An offset and timestamp tuple

Keyword Arguments:
offset (int): An offset
timestamp (int): The timestamp associated to the offset
leader_epoch (int): The last known epoch from the leader / broker
"""
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
["offset", "timestamp", "leader_epoch"])

"""Define retry policy for async producer

Keyword Arguments:
Limit (int): Number of retries. limit >= 0, 0 means no retries
backoff_ms (int): Milliseconds to backoff.
retry_on_timeouts:
"""
RetryOptions = namedtuple("RetryOptions",
["limit", "backoff_ms", "retry_on_timeouts"])
2 changes: 1 addition & 1 deletion test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _set_conn_state(state):

@pytest.fixture
def client(conn, mocker):
from kafka import KafkaClient
from kafka.client_async import KafkaClient

cli = KafkaClient(api_version=(0, 9))
mocker.patch.object(cli, '_init_connect', return_value=True)
Expand Down
3 changes: 2 additions & 1 deletion test/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import pytest

from kafka import KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.client_async import KafkaClient
from kafka.util import TOPIC_LEGAL_CHARS, TOPIC_MAX_LENGTH, ensure_valid_topic_name
from test.testutil import env_kafka_version, random_string
from test.integration.fixtures import KafkaFixture, ZookeeperFixture, create_topics, client_params
Expand Down
2 changes: 1 addition & 1 deletion test/integration/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ def get_api_versions():
k = KafkaFixture.instance(0)
zk = k.zookeeper

from kafka import KafkaClient
from kafka.client_async import KafkaClient
client = KafkaClient(bootstrap_servers='localhost:{}'.format(k.port))
client.check_version()

Expand Down
3 changes: 2 additions & 1 deletion test/integration/test_sasl_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

import pytest

from kafka import KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic
from kafka.client_async import KafkaClient
from kafka.protocol.metadata import MetadataRequest
from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore
from test.integration.fixtures import client_params, create_topics
Expand Down
16 changes: 8 additions & 8 deletions test/test_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ def test_top_level_namespace(self):
assert kafka1.codec.__name__ == "kafka.codec"

def test_submodule_namespace(self):
import kafka.client_async as client1
assert client1.__name__ == "kafka.client_async"
import kafka.net as net1
assert net1.__name__ == "kafka.net"

from kafka import client_async as client2
assert client2.__name__ == "kafka.client_async"
from kafka import net as net2
assert net2.__name__ == "kafka.net"

from kafka.client_async import KafkaClient as KafkaClient1
assert KafkaClient1.__name__ == "KafkaClient"
from kafka import KafkaConsumer as KafkaConsumer1
assert KafkaConsumer1.__name__ == "KafkaConsumer"

from kafka import KafkaClient as KafkaClient2
assert KafkaClient2.__name__ == "KafkaClient"
from kafka import KafkaConsumer as KafkaConsumer2
assert KafkaConsumer2.__name__ == "KafkaConsumer"

from kafka.codec import gzip_encode as gzip_encode1
assert gzip_encode1.__name__ == "gzip_encode"
Expand Down
Loading