From 9b0f4234e532b44ba1ea91ce093aa9bd3220086e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Apr 2026 10:38:02 -0700 Subject: [PATCH 1/6] Fixup mixin merge issues; test create_topics w/ topic strings and dicts --- kafka/admin/_topics.py | 121 +++++++++++++-------- test/integration/test_admin_integration.py | 18 +++ 2 files changed, 96 insertions(+), 43 deletions(-) diff --git a/kafka/admin/_topics.py b/kafka/admin/_topics.py index c448efb00..f9fee1110 100644 --- a/kafka/admin/_topics.py +++ b/kafka/admin/_topics.py @@ -8,6 +8,7 @@ import logging import time from typing import TYPE_CHECKING +import uuid import kafka.errors as Errors from kafka.errors import IncompatibleBrokerVersion @@ -26,25 +27,55 @@ class TopicAdminMixin: config: dict @staticmethod - def _convert_new_topic_request(new_topic): - return ( - new_topic.name, - new_topic.num_partitions, - new_topic.replication_factor, - [ - (partition_id, replicas) for partition_id, replicas in new_topic.replica_assignments.items() - ], - [ - (config_key, config_value) for config_key, config_value in new_topic.topic_configs.items() - ] - ) + def _process_create_topics_input(new_topics): + _Topic = CreateTopicsRequest.CreatableTopic + _Assignment = _Topic.CreatableReplicaAssignment + _Config = _Topic.CreatableTopicConfig + topics = [] + if isinstance(new_topics, dict): + # {topic_name: {num_partitions:, replication_factor:, assignments: {partition: [broker_ids]}, configs: {key: value}} + for topic, data in new_topics.items(): + configs = data.get('configs', {}) + topics.append(_Topic( + name=topic, + num_partitions=data.get('num_partitions', -1), + replication_factor=data.get('replication_factor', -1), + assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas) + for partition_id, replicas in data.get('assignments', {}).items()], + configs=[_Config(name=config_key, value=config_value) + for config_key, config_value in data.get('configs', {}).items()] + )) + elif all(isinstance(v, str) for v 
in new_topics): + for new_topic in new_topics: + topics.append(_Topic(name=new_topic, num_partitions=-1, replication_factor=-1)) + else: + if all(isinstance(v, NewTopic) for v in new_topics): + for new_topic in new_topics: + topics.append(_Topic( + name=new_topic.name, + num_partitions=new_topic.num_partitions, + replication_factor=new_topic.replication_factor, + assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas) + for partition_id, replicas in new_topic.replica_assignments.items()], + configs=[_Config(name=config_key, value=config_value) + for config_key, config_value in new_topic.topic_configs.items()] + )) + if not topics: + raise ValueError(f"No valid topics found in new_topics: {new_topics}") + return topics def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True, wait_for_metadata=False): """Create new topics in the cluster. Arguments: - new_topics: A list of NewTopic objects. + new_topics: A list of topic names, + or a dict of {topic_name: {num_partitions: int (default -1), + replication_factor: int (default -1), + assignments: {partition: [broker_ids]}, + configs: {key: value}}} + All dict keys are optional. + List of NewTopic objects is deprecated. 
Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for new topics to be created @@ -66,7 +97,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ .format(self._manager.broker_version)) request = CreateTopicsRequest( - topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], + topics=self._process_create_topics_input(new_topics), timeout_ms=timeout_ms, validate_only=validate_only, max_version=3, @@ -76,7 +107,7 @@ def response_errors(r): yield Errors.for_code(topic.error_code) response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) if wait_for_metadata: - self.wait_for_topics([new_topic.name for new_topic in new_topics]) + self.wait_for_topics([new_topic.name for new_topic in request.topics]) return response def wait_for_topics(self, topic_names, timeout_ms=10000): @@ -155,7 +186,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): """Delete topics from the cluster. Arguments: - topics ([str]): A list of topic name strings. + topics ([str]): A list of topic name strings or uuid.UUID ids. Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted @@ -166,35 +197,18 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): Appropriate version of DeleteTopicsResponse class. 
""" timeout_ms = self._validate_timeout(timeout_ms) + _Topic = DeleteTopicsRequest.DeleteTopicState request = DeleteTopicsRequest( - topic_names=topics, timeout_ms=timeout_ms, - max_version=5, - ) + topics=[_Topic(topic_id=topic) if isinstance(topic, uuid.UUID) else _Topic(name=topic) + for topic in topics], + timeout_ms=timeout_ms) def response_errors(r): for response in r.responses: yield Errors.for_code(response.error_code) return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) - def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): - """Create additional partitions for an existing topic. - - Arguments: - topic_partitions: A dict of topic name strings to total partition count (int), - or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} - if manual assignment is desired. - dict of {topic_name: NewPartition} is deprecated. - - Keyword Arguments: - timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be - created before the broker returns. - validate_only (bool, optional): If True, don't actually create new partitions. - Default: False - raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. - - Returns: - Appropriate version of CreatePartitionsResponse class. 
- """ - timeout_ms = self._validate_timeout(timeout_ms) + @staticmethod + def _process_create_partitions_input(topic_partitions): _Topic = CreatePartitionsRequest.CreatePartitionsTopic _Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment topics = [] @@ -208,7 +222,6 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal count=count['count'], assignments=[_Assignment(broker_ids=broker_ids) for broker_ids in count['assignments']])) - else: topics.append( _Topic( @@ -216,8 +229,30 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal count=count.total_count, assignments=[_Assignment(broker_ids=broker_ids) for broker_ids in count.new_assignments])) + return topics + + def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): + """Create additional partitions for an existing topic. + + Arguments: + topic_partitions: A dict of topic name strings to total partition count (int), + or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} + if manual assignment is desired. + dict of {topic_name: NewPartition} is deprecated. + + Keyword Arguments: + timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be + created before the broker returns. + validate_only (bool, optional): If True, don't actually create new partitions. + Default: False + raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. + + Returns: + Appropriate version of CreatePartitionsResponse class. + """ + timeout_ms = self._validate_timeout(timeout_ms) request = CreatePartitionsRequest( - topics=topics, + topics=self._process_create_partitions_input(topic_partitions), timeout_ms=timeout_ms, validate_only=validate_only) @@ -228,7 +263,7 @@ def response_errors(r): class NewTopic: - """A class for new topic creation. + """DEPRECATED: A class for new topic creation. 
Arguments: name (string): name of the topic @@ -251,7 +286,7 @@ def __init__(self, name, num_partitions=-1, replication_factor=-1, class NewPartitions: - """A class for new partition creation on existing topics. + """DEPRECATED: A class for new partition creation on existing topics. Note that the length of new_assignments, if specified, must be the difference between the new total number of partitions and the existing diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 769be843b..91a4a802e 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -396,6 +396,24 @@ def test_create_delete_topics(kafka_admin_client): assert response.responses[0].name == topic_name assert response.responses[0].error_code == 0 # NoError + topic_name = random_string(4) + response = kafka_admin_client.create_topics([topic_name]) + assert response.topics[0].name == topic_name + assert response.topics[0].error_code == 0 # NoError + + response = kafka_admin_client.delete_topics([topic_name]) + assert response.responses[0].name == topic_name + assert response.responses[0].error_code == 0 # NoError + + topic_name = random_string(4) + response = kafka_admin_client.create_topics({topic_name: {'num_partitions': 1, 'replication_factor': 1}}) + assert response.topics[0].name == topic_name + assert response.topics[0].error_code == 0 # NoError + + response = kafka_admin_client.delete_topics([topic_name]) + assert response.responses[0].name == topic_name + assert response.responses[0].error_code == 0 # NoError + @pytest.mark.skipif(env_kafka_version() < (1, 0), reason="CreatePartitions requires broker >=1.0") def test_create_partitions(kafka_admin_client, topic): From adfdd1a4b1d76b4c9b09f7f84eaf7b566d5688e8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Apr 2026 10:42:22 -0700 Subject: [PATCH 2/6] Admin: _acls mixin owns _process_acl_operations --- kafka/admin/_acls.py | 6 ++++++ 
kafka/admin/_metadata.py | 6 ------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kafka/admin/_acls.py b/kafka/admin/_acls.py index 07ddfa61a..85f7be4c9 100644 --- a/kafka/admin/_acls.py +++ b/kafka/admin/_acls.py @@ -26,6 +26,12 @@ class ACLAdminMixin: _client: object config: dict + # ACL Helper for Metadata / DescribeGroups + def _process_acl_operations(self, obj): + if obj.get('authorized_operations', None) is not None: + obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations']))) + return obj + def describe_acls(self, acl_filter): """Describe a set of ACLs diff --git a/kafka/admin/_metadata.py b/kafka/admin/_metadata.py index dfbcaab5e..20f7fac1c 100644 --- a/kafka/admin/_metadata.py +++ b/kafka/admin/_metadata.py @@ -5,7 +5,6 @@ import logging from typing import TYPE_CHECKING -from kafka.admin._acls import valid_acl_operations from kafka.protocol.metadata import MetadataRequest if TYPE_CHECKING: @@ -18,11 +17,6 @@ class MetadataAdminMixin: """Mixin providing cluster metadata methods for KafkaAdminClient.""" _manager: KafkaConnectionManager - def _process_acl_operations(self, obj): - if obj.get('authorized_operations', None) is not None: - obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations']))) - return obj - async def _get_cluster_metadata(self, topics): """topics = [] for no topics, None for all.""" request = MetadataRequest( From 258f5665837e2d0d470c2c055d33de04460daa0e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Apr 2026 10:44:22 -0700 Subject: [PATCH 3/6] Admin: drop ElectLeadersRequest max_version; use attrs for response_error decode --- kafka/admin/_records.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka/admin/_records.py b/kafka/admin/_records.py index feb8ee410..0e15bd0fa 100644 --- a/kafka/admin/_records.py +++ b/kafka/admin/_records.py @@ -162,14 +162,13 @@ def 
perform_leader_election(self, election_type, topic_partitions=None, timeout_ election_type=ElectionType(election_type), topic_partitions=self._get_topic_partitions(topic_partitions), timeout_ms=timeout_ms, - max_version=1, ) def response_errors(r): if r.API_VERSION >= 1: yield Errors.for_code(r.error_code) for result in r.replica_election_results: - for partition in result[1]: - yield Errors.for_code(partition[1]) + for partition in result.partition_result: + yield Errors.for_code(partition.error_code) ignore_errors = (Errors.ElectionNotNeededError,) return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) From af4c27840c85eac01dc74efbe1143c55a60f3a44 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Apr 2026 11:51:33 -0700 Subject: [PATCH 4/6] Admin: move list_topics/describe_topics _metadata -> _topics mixin --- kafka/admin/_metadata.py | 22 ---------------------- kafka/admin/_topics.py | 22 ++++++++++++++++++++++ 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/kafka/admin/_metadata.py b/kafka/admin/_metadata.py index 20f7fac1c..0a64c3dfa 100644 --- a/kafka/admin/_metadata.py +++ b/kafka/admin/_metadata.py @@ -34,28 +34,6 @@ async def _get_cluster_metadata(self, topics): self._process_acl_operations(topic) return metadata - def list_topics(self): - """Retrieve a list of all topic names in the cluster. - - Returns: - A list of topic name strings. - """ - metadata = self._manager.run(self._get_cluster_metadata, None) - return [t['name'] for t in metadata['topics']] - - def describe_topics(self, topics=None): - """Fetch metadata for the specified topics or all topics if None. - - Keyword Arguments: - topics ([str], optional) A list of topic names. If None, metadata for all - topics is retrieved. - - Returns: - A list of dicts describing each topic (including partition info). 
- """ - metadata = self._manager.run(self._get_cluster_metadata, topics) - return metadata['topics'] - def describe_cluster(self): """Fetch cluster-wide metadata such as the list of brokers, the controller ID, and the cluster ID. diff --git a/kafka/admin/_topics.py b/kafka/admin/_topics.py index f9fee1110..220d70e5f 100644 --- a/kafka/admin/_topics.py +++ b/kafka/admin/_topics.py @@ -26,6 +26,28 @@ class TopicAdminMixin: _client: object config: dict + def list_topics(self): + """Retrieve a list of all topic names in the cluster. + + Returns: + A list of topic name strings. + """ + metadata = self._manager.run(self._get_cluster_metadata, None) + return [t['name'] for t in metadata['topics']] + + def describe_topics(self, topics=None): + """Fetch metadata for the specified topics or all topics if None. + + Keyword Arguments: + topics ([str], optional): A list of topic names. If None, metadata for all + topics is retrieved. + + Returns: + A list of dicts describing each topic (including partition info). 
+ """ + metadata = self._manager.run(self._get_cluster_metadata, topics) + return metadata['topics'] + @staticmethod def _process_create_topics_input(new_topics): _Topic = CreateTopicsRequest.CreatableTopic From aac5f23981c20c09e174e9a15727d057348b43d2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Apr 2026 12:02:24 -0700 Subject: [PATCH 5/6] Admin: _groups requests defer version selection to api_versions --- kafka/admin/_groups.py | 47 ++++++++++------------ test/integration/test_admin_integration.py | 38 ++++++++--------- 2 files changed, 40 insertions(+), 45 deletions(-) diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index 28ad6c54e..3d7b55282 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -133,21 +133,17 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include # -- List consumer groups -------------------------------------------------- def _list_consumer_groups_request(self): - version = self._client.api_version(ListGroupsRequest, max_version=2) - return ListGroupsRequest[version]() + # TODO: KIP-518: StatesFilter + # TODO: KIP-848: TypesFilter + return ListGroupsRequest() def _list_consumer_groups_process_response(self, response): """Process a ListGroupsResponse into a list of groups.""" - if response.API_VERSION <= 2: - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: - raise error_type( - "ListGroupsRequest failed with response '{}'." - .format(response)) - else: - raise NotImplementedError( - "Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient." - .format(response.API_VERSION)) + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + raise error_type( + "ListGroupsRequest failed with response '{}'." 
+ .format(response)) return [(group.group_id, group.protocol_type) for group in response.groups] async def _async_list_consumer_groups(self, broker_ids=None): @@ -189,25 +185,25 @@ def list_consumer_groups(self, broker_ids=None): # -- List consumer group offsets ------------------------------------------- def _list_consumer_group_offsets_request(self, group_id, partitions=None): - version = self._client.api_version(OffsetFetchRequest, max_version=5) + _Topic = OffsetFetchRequest.OffsetFetchRequestTopic if partitions is None: - if version <= 1: - raise ValueError( - """OffsetFetchRequest_v{} requires specifying the - partitions for which to fetch offsets. Omitting the - partitions is only supported on brokers >= 0.10.2. - For details, see KIP-88.""".format(version)) - topics_partitions = None + min_version = 1 + topics = None else: + min_version = 0 topics_partitions_dict = defaultdict(set) for topic, partition in partitions: topics_partitions_dict[topic].add(partition) - topics_partitions = list(topics_partitions_dict.items()) - return OffsetFetchRequest[version](group_id, topics_partitions) + topics = [ + _Topic(name=name, partition_indexes=list(partitions)) + for name, partitions in topics_partitions_dict.items() + ] + return OffsetFetchRequest(group_id=group_id, topics=topics, + min_version=min_version, max_version=6) def _list_consumer_group_offsets_process_response(self, response): """Process an OffsetFetchResponse.""" - if response.API_VERSION <= 5: + if response.API_VERSION <= 6: if response.API_VERSION > 1: error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: @@ -271,12 +267,11 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, # -- Delete consumer groups ------------------------------------------------ def _delete_consumer_groups_request(self, group_ids): - version = self._client.api_version(DeleteGroupsRequest, max_version=1) - return DeleteGroupsRequest[version](group_ids) + return 
DeleteGroupsRequest(groups_names=group_ids) def _convert_delete_groups_response(self, response): """Parse a DeleteGroupsResponse.""" - if response.API_VERSION <= 1: + if response.API_VERSION <= 2: results = [] for group_id, error_code in response.results: results.append((group_id, Errors.for_code(error_code))) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 91a4a802e..dbace78e0 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -247,7 +247,7 @@ def consumer_thread(i, group_id): @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") -def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages): +def test_delete_consumer_groups(kafka_admin_client, kafka_consumer_factory, send_messages): random_group_id = 'test-group-' + random_string(6) group1 = random_group_id + "_1" group2 = random_group_id + "_2" @@ -266,10 +266,10 @@ def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_ next(consumer3) consumer3.close() - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert group1 in consumergroups - assert group2 in consumergroups - assert group3 in consumergroups + groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 in groups + assert group2 in groups + assert group3 in groups delete_results = { group_id: error @@ -279,14 +279,14 @@ def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_ assert delete_results[group2] == NoError assert group3 not in delete_results - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert group1 not in consumergroups - assert group2 not in consumergroups - assert group3 in consumergroups + groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + 
assert group1 not in groups + assert group2 not in groups + assert group3 in groups @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") -def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): +def test_delete_consumer_groups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): random_group_id = 'test-group-' + random_string(6) group1 = random_group_id + "_1" group2 = random_group_id + "_2" @@ -300,24 +300,23 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa consumer2 = kafka_consumer_factory(group_id=group2) next(consumer2) - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert group1 in consumergroups - assert group2 in consumergroups - assert group3 not in consumergroups + groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 in groups + assert group2 in groups + assert group3 not in groups delete_results = { group_id: error for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2, group3]) } - assert delete_results[group1] == NoError assert delete_results[group2] == NonEmptyGroupError assert delete_results[group3] == GroupIdNotFoundError - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert group1 not in consumergroups - assert group2 in consumergroups - assert group3 not in consumergroups + groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 not in groups + assert group2 in groups + assert group3 not in groups @pytest.fixture(name="topic2") def _topic2(kafka_broker, request): @@ -326,6 +325,7 @@ def _topic2(kafka_broker, request): create_topics(kafka_broker, [topic_name]) return topic_name + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0") def 
test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic, topic2): t0p0 = TopicPartition(topic, 0) From b438d175e839d20022a037386b20ecf206c80f74 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Apr 2026 13:26:23 -0700 Subject: [PATCH 6/6] CreateTopics requires explicit num_partitions/replication_factor on < 2.4 --- kafka/admin/_topics.py | 13 +++++++++-- test/integration/test_admin_integration.py | 27 ++++++++++++++-------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/kafka/admin/_topics.py b/kafka/admin/_topics.py index 220d70e5f..d09b006f3 100644 --- a/kafka/admin/_topics.py +++ b/kafka/admin/_topics.py @@ -96,8 +96,10 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ replication_factor: int (default -1), assignments: {partition: [broker_ids]}, configs: {key: value}}} - All dict keys are optional. + All keys are optional. List of NewTopic objects is deprecated. + Note: for brokers < 2.4, num_partitions and replication_factor + are required and must be provided via dict or [NewTopic]. Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for new topics to be created @@ -118,8 +120,15 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." 
.format(self._manager.broker_version)) + topics = self._process_create_topics_input(new_topics) + if self._manager.broker_version < (2, 4): + if any(topic.num_partitions == -1 or topic.replication_factor == -1 for topic in topics): + raise IncompatibleBrokerVersion( + "Broker version {} requires explicit num_partitions and replication_factor" + .format(self._manager.broker_version)) + request = CreateTopicsRequest( - topics=self._process_create_topics_input(new_topics), + topics=topics, timeout_ms=timeout_ms, validate_only=validate_only, max_version=3, diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index dbace78e0..10a81a31a 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -14,7 +14,7 @@ BrokerResponseError, NoError, CoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError, ElectionNotNeededError, - KafkaTimeoutError + KafkaTimeoutError, IncompatibleBrokerVersion ) from kafka.structs import TopicPartition from test.testutil import env_kafka_version, random_string @@ -397,7 +397,7 @@ def test_create_delete_topics(kafka_admin_client): assert response.responses[0].error_code == 0 # NoError topic_name = random_string(4) - response = kafka_admin_client.create_topics([topic_name]) + response = kafka_admin_client.create_topics({topic_name: {'num_partitions': 1, 'replication_factor': 1}}) assert response.topics[0].name == topic_name assert response.topics[0].error_code == 0 # NoError @@ -405,14 +405,23 @@ def test_create_delete_topics(kafka_admin_client): assert response.responses[0].name == topic_name assert response.responses[0].error_code == 0 # NoError - topic_name = random_string(4) - response = kafka_admin_client.create_topics({topic_name: {'num_partitions': 1, 'replication_factor': 1}}) - assert response.topics[0].name == topic_name - assert response.topics[0].error_code == 0 # NoError + # 
Create topics requires explicit num_partitions/replication_factor on < 2.4 + if env_kafka_version() < (2, 4): + with pytest.raises(IncompatibleBrokerVersion): + kafka_admin_client.create_topics([topic_name]) - response = kafka_admin_client.delete_topics([topic_name]) - assert response.responses[0].name == topic_name - assert response.responses[0].error_code == 0 # NoError + with pytest.raises(IncompatibleBrokerVersion): + kafka_admin_client.create_topics({topic_name: {'num_partitions': 2}}) + + else: + topic_name = random_string(4) + response = kafka_admin_client.create_topics([topic_name]) + assert response.topics[0].name == topic_name + assert response.topics[0].error_code == 0 # NoError + + response = kafka_admin_client.delete_topics([topic_name]) + assert response.responses[0].name == topic_name + assert response.responses[0].error_code == 0 # NoError @pytest.mark.skipif(env_kafka_version() < (1, 0), reason="CreatePartitions requires broker >=1.0")