From 909f32a3ec00860bc8256b6e29edf81f562918f5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 12 Apr 2026 12:46:35 -0700 Subject: [PATCH 1/5] Admin: Use connection-specific version for coordinator api requests --- kafka/admin/client.py | 90 +++++++++++----------- test/integration/test_admin_integration.py | 8 +- 2 files changed, 51 insertions(+), 47 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index ece6bf7c9..1ee04c762 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -450,30 +450,21 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ """ if validate_only and wait_for_metadata: raise ValueError('validate_only and wait_for_metadata are mutually exclusive') - version = self._client.api_version(CreateTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) - if version == 0: - if validate_only: - raise IncompatibleBrokerVersion( - "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." - .format(self._manager.broker_version)) - request = CreateTopicsRequest[version]( - topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], - timeout_ms=timeout_ms - ) - elif version <= 3: - request = CreateTopicsRequest[version]( - topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], - timeout_ms=timeout_ms, - validate_only=validate_only - ) - else: - raise RuntimeError('Version check error: %s' % version) - # TODO convert structs to a more pythonic interface - def get_response_errors(r): + if validate_only and self._manager.broker_version < (0, 10, 2): + raise IncompatibleBrokerVersion( + "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." + .format(self._manager.broker_version)) + + request = CreateTopicsRequest( + topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], + timeout_ms=timeout_ms, + validate_only=validate_only + ) + def response_errors(r): for topic in r.topics: - yield Errors.for_code(topic[1]) - response = self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors) + yield Errors.for_code(topic.error_code) + response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) if wait_for_metadata: # implies not validate_only self.wait_for_topics([new_topic.name for new_topic in new_topics]) return response @@ -569,13 +560,16 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): Returns: Appropriate version of DeleteTopicsResponse class. """ - version = self._client.api_version(DeleteTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) - request = DeleteTopicsRequest[version](topic_names=topics, timeout_ms=timeout_ms) - def get_response_errors(r): + request = DeleteTopicsRequest( + topic_names=topics, # v0-v5 + topics=[DeleteTopicsRequest.DeleteTopicState(name=topic) for topic in topics], # v6+ + timeout_ms=timeout_ms, + ) + def response_errors(r): for response in r.responses: - yield Errors.for_code(response[1]) - return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors) + yield Errors.for_code(response.error_code) + return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) def _process_acl_operations(self, obj): if obj.get('authorized_operations', None) is not None: @@ -1090,7 +1084,8 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal """Create additional partitions for an existing topic. Arguments: - topic_partitions: A map of topic name strings to NewPartition objects. + topic_partitions: A map of topic name strings to total partition count (int), + or if manual assignment is desired, a NewPartition object with assignments. Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be @@ -1102,17 +1097,28 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal Returns: Appropriate version of CreatePartitionsResponse class. """ - version = self._client.api_version(CreatePartitionsRequest, max_version=1) timeout_ms = self._validate_timeout(timeout_ms) - request = CreatePartitionsRequest[version]( - topics=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], + _Topic = CreatePartitionsRequest.CreatePartitionsTopic + _Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment + topics = [] + for topic, count in topic_partitions.items(): + if isinstance(count, int): + topics.append(_Topic(name=topic, count=count)) + else: + topics.append( + _Topic( + name=topic, + count=count.total_count, + assignments=[_Assignment(broker_ids=broker_ids) + for broker_ids in count.new_assignments])) + request = CreatePartitionsRequest( + topics=topics, timeout_ms=timeout_ms, - validate_only=validate_only - ) - def get_response_errors(r): + validate_only=validate_only) + def response_errors(r): for result in r.results: - yield Errors.for_code(result[1]) - return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors) + yield Errors.for_code(result.error_code) + return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) def _get_leader_for_partitions(self, partitions, timeout_ms=None): """Finds ID of the leader node for every given topic partition. @@ -1644,22 +1650,20 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_ Returns: Appropriate version of ElectLeadersResponse class. """ - version = self._client.api_version(ElectLeadersRequest, max_version=1) timeout_ms = self._validate_timeout(timeout_ms) - request = ElectLeadersRequest[version]( + request = ElectLeadersRequest( election_type=ElectionType(election_type), topic_partitions=self._get_topic_partitions(topic_partitions), timeout_ms=timeout_ms, ) - # TODO convert structs to a more pythonic interface - def get_response_errors(r): + def response_errors(r): if r.API_VERSION >= 1: yield Errors.for_code(r.error_code) for result in r.replica_election_results: - for partition in result[1]: - yield Errors.for_code(partition[1]) + for partition in result.partition_result: + yield Errors.for_code(partition.error_code) ignore_errors = (Errors.ElectionNotNeededError,) - return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors, ignore_errors) + return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) def describe_log_dirs(self): """Send a DescribeLogDirsRequest request to a broker. diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 3cddeb761..4fcd8f4ea 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -388,12 +388,12 @@ def test_delete_records_with_errors(kafka_admin_client, topic, send_messages): def test_create_delete_topics(kafka_admin_client): topic_name = random_string(4) response = kafka_admin_client.create_topics([NewTopic(topic_name, 1, 1)]) - assert response.topics[0][0] == topic_name - assert response.topics[0][1] == 0 # NoError + assert response.topics[0].name == topic_name + assert response.topics[0].error_code == 0 # NoError response = kafka_admin_client.delete_topics([topic_name]) - assert response.responses[0][0] == topic_name - assert response.responses[0][1] == 0 # NoError + assert response.responses[0].name == topic_name + assert response.responses[0].error_code == 0 # NoError @pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2") From 2b4318aa4da149c4fd692c16daa4b88accf155bb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 12 Apr 2026 16:20:31 -0700 Subject: [PATCH 2/5] Admin: deprecate NewTopics/NewPartitions (use simple dicts); delete by topic-id --- kafka/admin/client.py | 122 ++++++++++++++++--------------- kafka/cli/admin/topics/create.py | 5 +- kafka/cli/admin/topics/delete.py | 16 +++- kafka/protocol/admin/topics.py | 8 +- 4 files changed, 86 insertions(+), 65 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 1ee04c762..12eee44bf 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -5,6 +5,7 @@ import selectors import socket import time +import uuid from . import ConfigResourceType @@ -398,38 +399,16 @@ async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), .format(request, response)) return response - @staticmethod - def _convert_new_topic_request(new_topic): - """ - Build the tuple required by CreateTopicsRequest from a NewTopic object. - - Arguments: - new_topic: A NewTopic instance containing name, partition count, replication factor, - replica assignments, and config entries. - - Returns: - A tuple in the form: - (topic_name, num_partitions, replication_factor, [(partition_id, [replicas])...], - [(config_key, config_value)...]) - """ - return ( - new_topic.name, - new_topic.num_partitions, - new_topic.replication_factor, - [ - (partition_id, replicas) for partition_id, replicas in new_topic.replica_assignments.items() - ], - [ - (config_key, config_value) for config_key, config_value in new_topic.topic_configs.items() - ] - ) - def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True, wait_for_metadata=False): """Create new topics in the cluster. Arguments: - new_topics: A list of NewTopic objects. + new_topics: A list of topic names, + or a dict of {topic_name: {num_partitions:, replication_factor:, + assignments: {partition: [broker_ids]}, + configs: {key: value}}} + List of NewTopic objects is deprecated. Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for new topics to be created @@ -456,11 +435,46 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." .format(self._manager.broker_version)) + _Topic = CreateTopicsRequest.CreatableTopic + _Assignment = _Topic.CreatableReplicaAssignment + _Config = _Topic.CreatableTopicConfig + topics = [] + if isinstance(new_topics, dict): + # {topic_name: {num_partitions:, replication_factor:, assignments: {partition: [broker_ids]}, configs: {key: value}} + for topic, data in new_topics.items(): + configs = data.get('configs', {}) + topics.append(_Topic( + name=topic, + num_partitions=data.get('num_partitions', -1), + replication_factor=data.get('replication_factor', -1), + assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas) + for partition_id, replicas in data.get('assignments', {}).items()], + configs=[_Config(name=config_key, value=config_value) + for config_key, config_value in data.get('configs', {}).items()] + )) + elif all(isinstance(v, str) for v in new_topics): + for new_topic in new_topics: + topics.append(_Topic(name=new_topic)) + else: + from .new_topic import NewTopic + if all(isinstance(v, NewTopic) for v in new_topics): + for new_topic in new_topics: + topics.append(_Topic( + name=new_topic.name, + num_partitions=new_topic.num_partitions, + replication_factor=new_topic.replication_factor, + assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas) + for partition_id, replicas in new_topic.replica_assignments.items()], + configs=[_Config(name=config_key, value=config_value) + for config_key, config_value in new_topic.topic_configs.items()] + )) + if not topics: + raise ValueError(f"No valid topics found in new_topics: {new_topics}") + request = CreateTopicsRequest( - topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], + topics=topics, timeout_ms=timeout_ms, - validate_only=validate_only - ) + validate_only=validate_only) def response_errors(r): for topic in r.topics: yield Errors.for_code(topic.error_code) @@ -550,7 +564,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): """Delete topics from the cluster. Arguments: - topics ([str]): A list of topic name strings. + topics ([str]): A list of topic name strings or uuid.UUID ids. Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted @@ -561,11 +575,12 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): Appropriate version of DeleteTopicsResponse class. """ timeout_ms = self._validate_timeout(timeout_ms) + _Topic = DeleteTopicsRequest.DeleteTopicState request = DeleteTopicsRequest( - topic_names=topics, # v0-v5 - topics=[DeleteTopicsRequest.DeleteTopicState(name=topic) for topic in topics], # v6+ - timeout_ms=timeout_ms, - ) + topics=[_Topic(topic_id=topic) if isinstance(topic, uuid.UUID) else _Topic(name=topic) + for topic in topics], + timeout_ms=timeout_ms) + def response_errors(r): for response in r.responses: yield Errors.for_code(response.error_code) @@ -1061,31 +1076,14 @@ def alter_configs(self, config_resources): # describe log dirs protocol not yet implemented # Note: have to lookup the broker with the replica assignment and send the request to that broker - @staticmethod - def _convert_create_partitions_request(topic_name, new_partitions): - """Convert a NewPartitions object into the tuple format for CreatePartitionsRequest. - - Arguments: - topic_name: The name of the existing topic. - new_partitions: A NewPartitions instance with total_count and new_assignments. - - Returns: - A tuple: (topic_name, (total_count, [list_of_assignments])). - """ - return ( - topic_name, - ( - new_partitions.total_count, - new_partitions.new_assignments - ) - ) - def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): """Create additional partitions for an existing topic. Arguments: - topic_partitions: A map of topic name strings to total partition count (int), - or if manual assignment is desired, a NewPartition object with assignments. + topic_partitions: A dict of topic name strings to total partition count (int), + or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} + if manual assignment is desired. + dict of {topic_name: NewPartition} is deprecated. Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be @@ -1104,6 +1102,14 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal for topic, count in topic_partitions.items(): if isinstance(count, int): topics.append(_Topic(name=topic, count=count)) + elif isinstance(count, dict): + topics.append( + _Topic( + name=topic, + count=count['count'], + assignments=[_Assignment(broker_ids=broker_ids) + for broker_ids in count['assignments']])) + else: topics.append( _Topic( @@ -1115,6 +1121,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal topics=topics, timeout_ms=timeout_ms, validate_only=validate_only) + def response_errors(r): for result in r.results: yield Errors.for_code(result.error_code) @@ -1654,8 +1661,7 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_ request = ElectLeadersRequest( election_type=ElectionType(election_type), topic_partitions=self._get_topic_partitions(topic_partitions), - timeout_ms=timeout_ms, - ) + timeout_ms=timeout_ms) def response_errors(r): if r.API_VERSION >= 1: yield Errors.for_code(r.error_code) diff --git a/kafka/cli/admin/topics/create.py b/kafka/cli/admin/topics/create.py index 0844b411b..7112e4e7f 100644 --- a/kafka/cli/admin/topics/create.py +++ b/kafka/cli/admin/topics/create.py @@ -1,6 +1,3 @@ -from kafka.admin.new_topic import NewTopic - - class CreateTopic: @classmethod @@ -13,4 +10,4 @@ def add_subparser(cls, subparsers): @classmethod def command(cls, client, args): - return client.create_topics([NewTopic(args.topic, args.num_partitions, args.replication_factor)]) + return client.create_topics({args.topic: {'num_partitions': args.num_partitions, 'replication_factor': args.replication_factor}}) diff --git a/kafka/cli/admin/topics/delete.py b/kafka/cli/admin/topics/delete.py index 70a1e749b..4193b65c0 100644 --- a/kafka/cli/admin/topics/delete.py +++ b/kafka/cli/admin/topics/delete.py @@ -1,7 +1,19 @@ +import uuid + + class DeleteTopic: @classmethod def add_subparser(cls, subparsers): parser = subparsers.add_parser('delete', help='Delete Kafka Topic') - parser.add_argument('-t', '--topic', type=str, required=True) - parser.set_defaults(command=lambda cli, args: cli.delete_topics([args.topic])) + parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='topic name') + parser.add_argument('--id', type=str, action='append', dest='topic_ids', help='topic UUID') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + if not args.topics and not args.topic_ids: + raise ValueError('At least one topic or topic_id is required!') + topic_ids = [uuid.UUID(topic_id) for topic_id in args.topic_ids] + topic_names = args.topics + return client.delete_topics(topic_names + topic_ids) diff --git a/kafka/protocol/admin/topics.py b/kafka/protocol/admin/topics.py index aa3adeee9..cccef5203 100644 --- a/kafka/protocol/admin/topics.py +++ b/kafka/protocol/admin/topics.py @@ -4,7 +4,13 @@ class CreateTopicsRequest(ApiMessage): pass class CreateTopicsResponse(ApiMessage): pass -class DeleteTopicsRequest(ApiMessage): pass +class DeleteTopicsRequest(ApiMessage): + def encode(self, version=None, header=False, framed=False): + # convert topics => topic_names + if self.topics and not self.topic_names: + self.topic_names = [topic.name for topic in self.topics] + return super().encode(version=version, header=header, framed=framed) + class DeleteTopicsResponse(ApiMessage): pass class CreatePartitionsRequest(ApiMessage): pass From edee7bf0a8dc1814b0c3f97e4591fb538dd907fe Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 12 Apr 2026 16:22:12 -0700 Subject: [PATCH 3/5] Test create_partitions --- test/integration/test_admin_integration.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 4fcd8f4ea..156fa70da 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -8,7 +8,7 @@ ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType, - NewTopic, + NewPartitions, NewTopic, ) from kafka.errors import ( BrokerResponseError, NoError, CoordinatorNotAvailableError, @@ -396,6 +396,26 @@ def test_create_delete_topics(kafka_admin_client): assert response.responses[0].error_code == 0 # NoError +@pytest.mark.skipif(env_kafka_version() < (1, 0), reason="CreatePartitions requires broker >=1.0") +def test_create_partitions(kafka_admin_client, topic): + # topic fixture creates with 4 partitions by default + topic_metadata = kafka_admin_client.describe_topics([topic]) + assert len(topic_metadata) == 1 + original_count = len(topic_metadata[0]['partitions']) + assert original_count == 4 + + # Increase to 6 partitions + new_total = 6 + response = kafka_admin_client.create_partitions({topic: NewPartitions(new_total, [[0], [0]])}) + for result in response.results: + assert result[0] == topic + assert result[1] == 0 # NoError + + # Verify the new partition count + topic_metadata = kafka_admin_client.describe_topics([topic]) + assert len(topic_metadata[0]['partitions']) == new_total + + @pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2") def test_perform_leader_election(kafka_admin_client, topic): topic_metadata = kafka_admin_client.describe_topics([topic])[0] From d6e4b4a848106201a067882d47d7d100405e4a8c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 12 Apr 2026 16:26:54 -0700 Subject: [PATCH 4/5] pylint --- kafka/protocol/admin/topics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/protocol/admin/topics.py b/kafka/protocol/admin/topics.py index cccef5203..839e1c40b 100644 --- a/kafka/protocol/admin/topics.py +++ b/kafka/protocol/admin/topics.py @@ -7,7 +7,7 @@ class CreateTopicsResponse(ApiMessage): pass class DeleteTopicsRequest(ApiMessage): def encode(self, version=None, header=False, framed=False): # convert topics => topic_names - if self.topics and not self.topic_names: + if self.topics and not self.topic_names: # pylint: disable=E0203 self.topic_names = [topic.name for topic in self.topics] return super().encode(version=version, header=header, framed=framed) From 33691829a0c8a526555bcad82129eff150952746 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 12 Apr 2026 16:36:53 -0700 Subject: [PATCH 5/5] Only convert topics->topic_names for v0-v5 --- kafka/protocol/admin/topics.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka/protocol/admin/topics.py b/kafka/protocol/admin/topics.py index 839e1c40b..d399921af 100644 --- a/kafka/protocol/admin/topics.py +++ b/kafka/protocol/admin/topics.py @@ -6,9 +6,11 @@ class CreateTopicsResponse(ApiMessage): pass class DeleteTopicsRequest(ApiMessage): def encode(self, version=None, header=False, framed=False): - # convert topics => topic_names - if self.topics and not self.topic_names: # pylint: disable=E0203 - self.topic_names = [topic.name for topic in self.topics] + # convert topics => topic_names for v0-v5 + version = self.API_VERSION if version is None else version + if version is not None and version <= 5: + if self.topics and not self.topic_names: # pylint: disable=E0203 + self.topic_names = [topic.name for topic in self.topics] return super().encode(version=version, header=header, framed=framed) class DeleteTopicsResponse(ApiMessage): pass