diff --git a/kafka/admin/client.py b/kafka/admin/client.py index ece6bf7c9..12eee44bf 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -5,6 +5,7 @@ import selectors import socket import time +import uuid from . import ConfigResourceType @@ -398,38 +399,16 @@ async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), .format(request, response)) return response - @staticmethod - def _convert_new_topic_request(new_topic): - """ - Build the tuple required by CreateTopicsRequest from a NewTopic object. - - Arguments: - new_topic: A NewTopic instance containing name, partition count, replication factor, - replica assignments, and config entries. - - Returns: - A tuple in the form: - (topic_name, num_partitions, replication_factor, [(partition_id, [replicas])...], - [(config_key, config_value)...]) - """ - return ( - new_topic.name, - new_topic.num_partitions, - new_topic.replication_factor, - [ - (partition_id, replicas) for partition_id, replicas in new_topic.replica_assignments.items() - ], - [ - (config_key, config_value) for config_key, config_value in new_topic.topic_configs.items() - ] - ) - def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True, wait_for_metadata=False): """Create new topics in the cluster. Arguments: - new_topics: A list of NewTopic objects. + new_topics: A list of topic names, + or a dict of {topic_name: {num_partitions:, replication_factor:, + assignments: {partition: [broker_ids]}, + configs: {key: value}}} + List of NewTopic objects is deprecated. 
Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for new topics to be created @@ -450,30 +429,56 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ """ if validate_only and wait_for_metadata: raise ValueError('validate_only and wait_for_metadata are mutually exclusive') - version = self._client.api_version(CreateTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) - if version == 0: - if validate_only: - raise IncompatibleBrokerVersion( - "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." - .format(self._manager.broker_version)) - request = CreateTopicsRequest[version]( - topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], - timeout_ms=timeout_ms - ) - elif version <= 3: - request = CreateTopicsRequest[version]( - topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], - timeout_ms=timeout_ms, - validate_only=validate_only - ) + if validate_only and self._manager.broker_version < (0, 10, 2): + raise IncompatibleBrokerVersion( + "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." 
+ .format(self._manager.broker_version)) + + _Topic = CreateTopicsRequest.CreatableTopic + _Assignment = _Topic.CreatableReplicaAssignment + _Config = _Topic.CreatableTopicConfig + topics = [] + if isinstance(new_topics, dict): + # {topic_name: {num_partitions:, replication_factor:, assignments: {partition: [broker_ids]}, configs: {key: value}} + for topic, data in new_topics.items(): + configs = data.get('configs', {}) + topics.append(_Topic( + name=topic, + num_partitions=data.get('num_partitions', -1), + replication_factor=data.get('replication_factor', -1), + assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas) + for partition_id, replicas in data.get('assignments', {}).items()], + configs=[_Config(name=config_key, value=config_value) + for config_key, config_value in configs.items()] + )) + elif all(isinstance(v, str) for v in new_topics): + for new_topic in new_topics: + topics.append(_Topic(name=new_topic)) else: - raise RuntimeError('Version check error: %s' % version) - # TODO convert structs to a more pythonic interface - def get_response_errors(r): + from .new_topic import NewTopic + if all(isinstance(v, NewTopic) for v in new_topics): + for new_topic in new_topics: + topics.append(_Topic( + name=new_topic.name, + num_partitions=new_topic.num_partitions, + replication_factor=new_topic.replication_factor, + assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas) + for partition_id, replicas in new_topic.replica_assignments.items()], + configs=[_Config(name=config_key, value=config_value) + for config_key, config_value in new_topic.topic_configs.items()] + )) + if not topics: + raise ValueError(f"No valid topics found in new_topics: {new_topics}") + + request = CreateTopicsRequest( + topics=topics, + timeout_ms=timeout_ms, + validate_only=validate_only) + def response_errors(r): for topic in r.topics: - yield Errors.for_code(topic[1]) - response = self._manager.run(self._send_request_to_controller, 
request, get_response_errors, raise_errors) + yield Errors.for_code(topic.error_code) + response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) if wait_for_metadata: # implies not validate_only self.wait_for_topics([topic.name for topic in topics]) return response @@ -559,7 +564,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): """Delete topics from the cluster. Arguments: - topics ([str]): A list of topic name strings. + topics ([str]): A list of topic name strings or uuid.UUID ids. Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted @@ -569,13 +574,17 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): Returns: Appropriate version of DeleteTopicsResponse class. """ - version = self._client.api_version(DeleteTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) - request = DeleteTopicsRequest[version](topic_names=topics, timeout_ms=timeout_ms) - def get_response_errors(r): + _Topic = DeleteTopicsRequest.DeleteTopicState + request = DeleteTopicsRequest( + topics=[_Topic(topic_id=topic) if isinstance(topic, uuid.UUID) else _Topic(name=topic) + for topic in topics], + timeout_ms=timeout_ms) + + def response_errors(r): for response in r.responses: - yield Errors.for_code(response[1]) - return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors) + yield Errors.for_code(response.error_code) + return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) def _process_acl_operations(self, obj): if obj.get('authorized_operations', None) is not None: @@ -1067,30 +1076,14 @@ def alter_configs(self, config_resources): # describe log dirs protocol not yet implemented # Note: have to lookup the broker with the replica assignment and send the request to that broker - @staticmethod - def 
_convert_create_partitions_request(topic_name, new_partitions): - """Convert a NewPartitions object into the tuple format for CreatePartitionsRequest. - - Arguments: - topic_name: The name of the existing topic. - new_partitions: A NewPartitions instance with total_count and new_assignments. - - Returns: - A tuple: (topic_name, (total_count, [list_of_assignments])). - """ - return ( - topic_name, - ( - new_partitions.total_count, - new_partitions.new_assignments - ) - ) - def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): """Create additional partitions for an existing topic. Arguments: - topic_partitions: A map of topic name strings to NewPartition objects. + topic_partitions: A dict of topic name strings to total partition count (int), + or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} + if manual assignment is desired. + dict of {topic_name: NewPartition} is deprecated. Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be @@ -1102,17 +1095,37 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal Returns: Appropriate version of CreatePartitionsResponse class. 
""" - version = self._client.api_version(CreatePartitionsRequest, max_version=1) timeout_ms = self._validate_timeout(timeout_ms) - request = CreatePartitionsRequest[version]( - topics=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], + _Topic = CreatePartitionsRequest.CreatePartitionsTopic + _Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment + topics = [] + for topic, count in topic_partitions.items(): + if isinstance(count, int): + topics.append(_Topic(name=topic, count=count)) + elif isinstance(count, dict): + topics.append( + _Topic( + name=topic, + count=count['count'], + assignments=[_Assignment(broker_ids=broker_ids) + for broker_ids in count['assignments']])) + + else: + topics.append( + _Topic( + name=topic, + count=count.total_count, + assignments=[_Assignment(broker_ids=broker_ids) + for broker_ids in count.new_assignments])) + request = CreatePartitionsRequest( + topics=topics, timeout_ms=timeout_ms, - validate_only=validate_only - ) - def get_response_errors(r): + validate_only=validate_only) + + def response_errors(r): for result in r.results: - yield Errors.for_code(result[1]) - return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors) + yield Errors.for_code(result.error_code) + return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) def _get_leader_for_partitions(self, partitions, timeout_ms=None): """Finds ID of the leader node for every given topic partition. @@ -1644,22 +1657,19 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_ Returns: Appropriate version of ElectLeadersResponse class. 
""" - version = self._client.api_version(ElectLeadersRequest, max_version=1) timeout_ms = self._validate_timeout(timeout_ms) - request = ElectLeadersRequest[version]( + request = ElectLeadersRequest( election_type=ElectionType(election_type), topic_partitions=self._get_topic_partitions(topic_partitions), - timeout_ms=timeout_ms, - ) - # TODO convert structs to a more pythonic interface - def get_response_errors(r): + timeout_ms=timeout_ms) + def response_errors(r): if r.API_VERSION >= 1: yield Errors.for_code(r.error_code) for result in r.replica_election_results: - for partition in result[1]: - yield Errors.for_code(partition[1]) + for partition in result.partition_result: + yield Errors.for_code(partition.error_code) ignore_errors = (Errors.ElectionNotNeededError,) - return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors, ignore_errors) + return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) def describe_log_dirs(self): """Send a DescribeLogDirsRequest request to a broker. 
diff --git a/kafka/cli/admin/topics/create.py b/kafka/cli/admin/topics/create.py index 0844b411b..7112e4e7f 100644 --- a/kafka/cli/admin/topics/create.py +++ b/kafka/cli/admin/topics/create.py @@ -1,6 +1,3 @@ -from kafka.admin.new_topic import NewTopic - - class CreateTopic: @classmethod @@ -13,4 +10,4 @@ def add_subparser(cls, subparsers): @classmethod def command(cls, client, args): - return client.create_topics([NewTopic(args.topic, args.num_partitions, args.replication_factor)]) + return client.create_topics({args.topic: {'num_partitions': args.num_partitions, 'replication_factor': args.replication_factor}}) diff --git a/kafka/cli/admin/topics/delete.py b/kafka/cli/admin/topics/delete.py index 70a1e749b..4193b65c0 100644 --- a/kafka/cli/admin/topics/delete.py +++ b/kafka/cli/admin/topics/delete.py @@ -1,7 +1,19 @@ +import uuid + + class DeleteTopic: @classmethod def add_subparser(cls, subparsers): parser = subparsers.add_parser('delete', help='Delete Kafka Topic') - parser.add_argument('-t', '--topic', type=str, required=True) - parser.set_defaults(command=lambda cli, args: cli.delete_topics([args.topic])) + parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='topic name') + parser.add_argument('--id', type=str, action='append', dest='topic_ids', help='topic UUID') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + if not args.topics and not args.topic_ids: + raise ValueError('At least one topic or topic_id is required!') + topic_ids = [uuid.UUID(topic_id) for topic_id in args.topic_ids or []] + topic_names = args.topics or [] + return client.delete_topics(topic_names + topic_ids) diff --git a/kafka/protocol/admin/topics.py b/kafka/protocol/admin/topics.py index aa3adeee9..d399921af 100644 --- a/kafka/protocol/admin/topics.py +++ b/kafka/protocol/admin/topics.py @@ -4,7 +4,15 @@ class CreateTopicsRequest(ApiMessage): pass class CreateTopicsResponse(ApiMessage): pass -class 
DeleteTopicsRequest(ApiMessage): pass +class DeleteTopicsRequest(ApiMessage): + def encode(self, version=None, header=False, framed=False): + # convert topics => topic_names for v0-v5 + version = self.API_VERSION if version is None else version + if version is not None and version <= 5: + if self.topics and not self.topic_names: # pylint: disable=E0203 + self.topic_names = [topic.name for topic in self.topics] + return super().encode(version=version, header=header, framed=framed) + class DeleteTopicsResponse(ApiMessage): pass class CreatePartitionsRequest(ApiMessage): pass diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 3cddeb761..156fa70da 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -8,7 +8,7 @@ ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType, - NewTopic, + NewPartitions, NewTopic, ) from kafka.errors import ( BrokerResponseError, NoError, CoordinatorNotAvailableError, @@ -388,12 +388,32 @@ def test_delete_records_with_errors(kafka_admin_client, topic, send_messages): def test_create_delete_topics(kafka_admin_client): topic_name = random_string(4) response = kafka_admin_client.create_topics([NewTopic(topic_name, 1, 1)]) - assert response.topics[0][0] == topic_name - assert response.topics[0][1] == 0 # NoError + assert response.topics[0].name == topic_name + assert response.topics[0].error_code == 0 # NoError response = kafka_admin_client.delete_topics([topic_name]) - assert response.responses[0][0] == topic_name - assert response.responses[0][1] == 0 # NoError + assert response.responses[0].name == topic_name + assert response.responses[0].error_code == 0 # NoError + + +@pytest.mark.skipif(env_kafka_version() < (1, 0), reason="CreatePartitions requires broker >=1.0") +def test_create_partitions(kafka_admin_client, topic): + # topic fixture creates with 4 partitions by default + 
topic_metadata = kafka_admin_client.describe_topics([topic]) + assert len(topic_metadata) == 1 + original_count = len(topic_metadata[0]['partitions']) + assert original_count == 4 + + # Increase to 6 partitions + new_total = 6 + response = kafka_admin_client.create_partitions({topic: NewPartitions(new_total, [[0], [0]])}) + for result in response.results: + assert result.name == topic + assert result.error_code == 0 # NoError + + # Verify the new partition count + topic_metadata = kafka_admin_client.describe_topics([topic]) + assert len(topic_metadata[0]['partitions']) == new_total @pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2")