diff --git a/kafka/__init__.py b/kafka/__init__.py index e394188f8..d5cc26de5 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -2,7 +2,7 @@ from kafka.version import __version__ __author__ = 'Dana Powers' __license__ = 'Apache License 2.0' -__copyright__ = 'Copyright 2025 Dana Powers, David Arthur, and Contributors' +__copyright__ = 'Copyright 2026 Dana Powers, David Arthur, and Contributors' # Set default logging handler to avoid "No handler found" warnings. import logging @@ -11,16 +11,16 @@ from kafka.admin import KafkaAdminClient -from kafka.client_async import KafkaClient from kafka.consumer import KafkaConsumer from kafka.consumer.subscription_state import ConsumerRebalanceListener from kafka.producer import KafkaProducer -from kafka.conn import BrokerConnection from kafka.serializer import Serializer, Deserializer from kafka.structs import TopicPartition, OffsetAndMetadata +from kafka.protocol.consumer import IsolationLevel, OffsetSpec __all__ = [ - 'BrokerConnection', 'ConsumerRebalanceListener', 'KafkaAdminClient', - 'KafkaClient', 'KafkaConsumer', 'KafkaProducer', + 'KafkaAdminClient', 'KafkaConsumer', 'KafkaProducer', + 'ConsumerRebalanceListener', 'Serializer', 'Deserializer', + 'TopicPartition', 'OffsetAndMetadata', 'IsolationLevel', 'OffsetSpec', ] diff --git a/kafka/structs.py b/kafka/structs.py index 5093120e0..9fae24f3b 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -3,17 +3,19 @@ from collections import namedtuple -"""A topic and partition tuple +TopicPartition = namedtuple("TopicPartition", + ["topic", "partition"]) +TopicPartition.__doc__ = """A topic and partition tuple Keyword Arguments: topic (str): A topic name partition (int): A partition id """ -TopicPartition = namedtuple("TopicPartition", - ["topic", "partition"]) -"""The Kafka offset commit API +OffsetAndMetadata = namedtuple("OffsetAndMetadata", + ["offset", "metadata", "leader_epoch"], defaults=[None, '', -1]) +OffsetAndMetadata.__doc__ = """Container for committed group offset data. The Kafka offset commit API allows users to provide additional metadata (in the form of a string) when an offset is committed. This can be useful @@ -25,26 +27,14 @@ metadata (str): Non-null metadata leader_epoch (int): The last known epoch from the leader / broker """ -OffsetAndMetadata = namedtuple("OffsetAndMetadata", - ["offset", "metadata", "leader_epoch"]) -"""An offset and timestamp tuple +OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", + ["offset", "timestamp", "leader_epoch"]) +OffsetAndTimestamp.__doc__ = """An offset and timestamp tuple Keyword Arguments: offset (int): An offset timestamp (int): The timestamp associated to the offset leader_epoch (int): The last known epoch from the leader / broker """ -OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", - ["offset", "timestamp", "leader_epoch"]) - -"""Define retry policy for async producer - -Keyword Arguments: - Limit (int): Number of retries. limit >= 0, 0 means no retries - backoff_ms (int): Milliseconds to backoff. - retry_on_timeouts: -""" -RetryOptions = namedtuple("RetryOptions", - ["limit", "backoff_ms", "retry_on_timeouts"]) diff --git a/test/conftest.py b/test/conftest.py index c981ee2d0..946078955 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -42,7 +42,7 @@ def _set_conn_state(state): @pytest.fixture def client(conn, mocker): - from kafka import KafkaClient + from kafka.client_async import KafkaClient cli = KafkaClient(api_version=(0, 9)) mocker.patch.object(cli, '_init_connect', return_value=True) diff --git a/test/integration/conftest.py b/test/integration/conftest.py index 03e37553a..fd7da01ce 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -4,7 +4,8 @@ import pytest -from kafka import KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer +from kafka.client_async import KafkaClient from kafka.util import TOPIC_LEGAL_CHARS, TOPIC_MAX_LENGTH, ensure_valid_topic_name from test.testutil import env_kafka_version, random_string from test.integration.fixtures import KafkaFixture, ZookeeperFixture, create_topics, client_params diff --git a/test/integration/fixtures.py b/test/integration/fixtures.py index 4bb35ca8f..4894bbcb4 100644 --- a/test/integration/fixtures.py +++ b/test/integration/fixtures.py @@ -718,7 +718,7 @@ def get_api_versions(): k = KafkaFixture.instance(0) zk = k.zookeeper - from kafka import KafkaClient + from kafka.client_async import KafkaClient client = KafkaClient(bootstrap_servers='localhost:{}'.format(k.port)) client.check_version() diff --git a/test/integration/test_sasl_integration.py b/test/integration/test_sasl_integration.py index a3418b1ec..c74152a2c 100644 --- a/test/integration/test_sasl_integration.py +++ b/test/integration/test_sasl_integration.py @@ -5,8 +5,9 @@ import pytest -from kafka import KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer from kafka.admin import NewTopic +from kafka.client_async import KafkaClient from kafka.protocol.metadata import MetadataRequest from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore from test.integration.fixtures import client_params, create_topics diff --git a/test/test_package.py b/test/test_package.py index aa42c9cec..b513ad0d1 100644 --- a/test/test_package.py +++ b/test/test_package.py @@ -6,17 +6,17 @@ def test_top_level_namespace(self): assert kafka1.codec.__name__ == "kafka.codec" def test_submodule_namespace(self): - import kafka.client_async as client1 - assert client1.__name__ == "kafka.client_async" + import kafka.net as net1 + assert net1.__name__ == "kafka.net" - from kafka import client_async as client2 - assert client2.__name__ == "kafka.client_async" + from kafka import net as net2 + assert net2.__name__ == "kafka.net" - from kafka.client_async import KafkaClient as KafkaClient1 - assert KafkaClient1.__name__ == "KafkaClient" + from kafka import KafkaConsumer as KafkaConsumer1 + assert KafkaConsumer1.__name__ == "KafkaConsumer" - from kafka import KafkaClient as KafkaClient2 - assert KafkaClient2.__name__ == "KafkaClient" + from kafka import KafkaConsumer as KafkaConsumer2 + assert KafkaConsumer2.__name__ == "KafkaConsumer" from kafka.codec import gzip_encode as gzip_encode1 assert gzip_encode1.__name__ == "gzip_encode"