diff --git a/kafka/admin/_acls.py b/kafka/admin/_acls.py index 07ddfa61a..85f7be4c9 100644 --- a/kafka/admin/_acls.py +++ b/kafka/admin/_acls.py @@ -26,6 +26,12 @@ class ACLAdminMixin: _client: object config: dict + # ACL Helper for Metadata / DescribeGroups + def _process_acl_operations(self, obj): + if obj.get('authorized_operations', None) is not None: + obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations']))) + return obj + def describe_acls(self, acl_filter): """Describe a set of ACLs diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index 28ad6c54e..3d7b55282 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -133,21 +133,17 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include # -- List consumer groups -------------------------------------------------- def _list_consumer_groups_request(self): - version = self._client.api_version(ListGroupsRequest, max_version=2) - return ListGroupsRequest[version]() + # TODO: KIP-518: StatesFilter + # TODO: KIP-848: TypesFilter + return ListGroupsRequest() def _list_consumer_groups_process_response(self, response): """Process a ListGroupsResponse into a list of groups.""" - if response.API_VERSION <= 2: - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: - raise error_type( - "ListGroupsRequest failed with response '{}'." - .format(response)) - else: - raise NotImplementedError( - "Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient." - .format(response.API_VERSION)) + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + raise error_type( + "ListGroupsRequest failed with response '{}'." + .format(response)) return [(group.group_id, group.protocol_type) for group in response.groups] async def _async_list_consumer_groups(self, broker_ids=None): @@ -189,25 +185,25 @@ def list_consumer_groups(self, broker_ids=None): # -- List consumer group offsets ------------------------------------------- def _list_consumer_group_offsets_request(self, group_id, partitions=None): - version = self._client.api_version(OffsetFetchRequest, max_version=5) + _Topic = OffsetFetchRequest.OffsetFetchRequestTopic if partitions is None: - if version <= 1: - raise ValueError( - """OffsetFetchRequest_v{} requires specifying the - partitions for which to fetch offsets. Omitting the - partitions is only supported on brokers >= 0.10.2. - For details, see KIP-88.""".format(version)) - topics_partitions = None + min_version = 1 + topics = None else: + min_version = 0 topics_partitions_dict = defaultdict(set) for topic, partition in partitions: topics_partitions_dict[topic].add(partition) - topics_partitions = list(topics_partitions_dict.items()) - return OffsetFetchRequest[version](group_id, topics_partitions) + topics = [ + _Topic(name=name, partition_indexes=list(partitions)) + for name, partitions in topics_partitions_dict.items() + ] + return OffsetFetchRequest(group_id=group_id, topics=topics, + min_version=min_version, max_version=6) def _list_consumer_group_offsets_process_response(self, response): """Process an OffsetFetchResponse.""" - if response.API_VERSION <= 5: + if response.API_VERSION <= 6: if response.API_VERSION > 1: error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: @@ -271,12 +267,11 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, # -- Delete consumer groups ------------------------------------------------ def _delete_consumer_groups_request(self, group_ids): - version = self._client.api_version(DeleteGroupsRequest, max_version=1) - return DeleteGroupsRequest[version](group_ids) + return DeleteGroupsRequest(groups_names=group_ids) def _convert_delete_groups_response(self, response): """Parse a DeleteGroupsResponse.""" - if response.API_VERSION <= 1: + if response.API_VERSION <= 2: results = [] for group_id, error_code in response.results: results.append((group_id, Errors.for_code(error_code))) diff --git a/kafka/admin/_metadata.py b/kafka/admin/_metadata.py index dfbcaab5e..0a64c3dfa 100644 --- a/kafka/admin/_metadata.py +++ b/kafka/admin/_metadata.py @@ -5,7 +5,6 @@ import logging from typing import TYPE_CHECKING -from kafka.admin._acls import valid_acl_operations from kafka.protocol.metadata import MetadataRequest if TYPE_CHECKING: @@ -18,11 +17,6 @@ class MetadataAdminMixin: """Mixin providing cluster metadata methods for KafkaAdminClient.""" _manager: KafkaConnectionManager - def _process_acl_operations(self, obj): - if obj.get('authorized_operations', None) is not None: - obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations']))) - return obj - async def _get_cluster_metadata(self, topics): """topics = [] for no topics, None for all.""" request = MetadataRequest( @@ -40,28 +34,6 @@ async def _get_cluster_metadata(self, topics): self._process_acl_operations(topic) return metadata - def list_topics(self): - """Retrieve a list of all topic names in the cluster. - - Returns: - A list of topic name strings. - """ - metadata = self._manager.run(self._get_cluster_metadata, None) - return [t['name'] for t in metadata['topics']] - - def describe_topics(self, topics=None): - """Fetch metadata for the specified topics or all topics if None. - - Keyword Arguments: - topics ([str], optional) A list of topic names. If None, metadata for all - topics is retrieved. - - Returns: - A list of dicts describing each topic (including partition info). - """ - metadata = self._manager.run(self._get_cluster_metadata, topics) - return metadata['topics'] - def describe_cluster(self): """Fetch cluster-wide metadata such as the list of brokers, the controller ID, and the cluster ID. diff --git a/kafka/admin/_records.py b/kafka/admin/_records.py index feb8ee410..0e15bd0fa 100644 --- a/kafka/admin/_records.py +++ b/kafka/admin/_records.py @@ -162,14 +162,13 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_ election_type=ElectionType(election_type), topic_partitions=self._get_topic_partitions(topic_partitions), timeout_ms=timeout_ms, - max_version=1, ) def response_errors(r): if r.API_VERSION >= 1: yield Errors.for_code(r.error_code) for result in r.replica_election_results: - for partition in result[1]: - yield Errors.for_code(partition[1]) + for partition in result.partition_result: + yield Errors.for_code(partition.error_code) ignore_errors = (Errors.ElectionNotNeededError,) return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) diff --git a/kafka/admin/_topics.py b/kafka/admin/_topics.py index c448efb00..d09b006f3 100644 --- a/kafka/admin/_topics.py +++ b/kafka/admin/_topics.py @@ -8,6 +8,7 @@ import logging import time from typing import TYPE_CHECKING +import uuid import kafka.errors as Errors from kafka.errors import IncompatibleBrokerVersion @@ -25,26 +26,80 @@ class TopicAdminMixin: _client: object config: dict + def list_topics(self): + """Retrieve a list of all topic names in the cluster. + + Returns: + A list of topic name strings. + """ + metadata = self._manager.run(self._get_cluster_metadata, None) + return [t['name'] for t in metadata['topics']] + + def describe_topics(self, topics=None): + """Fetch metadata for the specified topics or all topics if None. + + Keyword Arguments: + topics ([str], optional) A list of topic names. If None, metadata for all + topics is retrieved. + + Returns: + A list of dicts describing each topic (including partition info). + """ + metadata = self._manager.run(self._get_cluster_metadata, topics) + return metadata['topics'] + @staticmethod - def _convert_new_topic_request(new_topic): - return ( - new_topic.name, - new_topic.num_partitions, - new_topic.replication_factor, - [ - (partition_id, replicas) for partition_id, replicas in new_topic.replica_assignments.items() - ], - [ - (config_key, config_value) for config_key, config_value in new_topic.topic_configs.items() - ] - ) + def _process_create_topics_input(new_topics): + _Topic = CreateTopicsRequest.CreatableTopic + _Assignment = _Topic.CreatableReplicaAssignment + _Config = _Topic.CreatableTopicConfig + topics = [] + if isinstance(new_topics, dict): + # {topic_name: {num_partitions:, replication_factor:, assignments: {partition: [broker_ids]}, configs: {key: value}} + for topic, data in new_topics.items(): + configs = data.get('configs', {}) + topics.append(_Topic( + name=topic, + num_partitions=data.get('num_partitions', -1), + replication_factor=data.get('replication_factor', -1), + assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas) + for partition_id, replicas in data.get('assignments', {}).items()], + configs=[_Config(name=config_key, value=config_value) + for config_key, config_value in data.get('configs', {}).items()] + )) + elif all(isinstance(v, str) for v in new_topics): + for new_topic in new_topics: + topics.append(_Topic(name=new_topic, num_partitions=-1, replication_factor=-1)) + else: + if all(isinstance(v, NewTopic) for v in new_topics): + for new_topic in new_topics: + topics.append(_Topic( + name=new_topic.name, + num_partitions=new_topic.num_partitions, + replication_factor=new_topic.replication_factor, + assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas) + for partition_id, replicas in new_topic.replica_assignments.items()], + configs=[_Config(name=config_key, value=config_value) + for config_key, config_value in new_topic.topic_configs.items()] + )) + if not topics: + raise ValueError(f"No valid topics found in new_topics: {new_topics}") + return topics def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True, wait_for_metadata=False): """Create new topics in the cluster. Arguments: - new_topics: A list of NewTopic objects. + new_topics: A list of topic names, + or a dict of {topic_name: {num_partitions: int (default -1), + replication_factor: int (default -1), + assignments: {partition: [broker_ids]}, + configs: {key: value}}} + All keys are optional. + List of NewTopic objects is deprecated. + Note: for brokers < 2.4, num_partitions and replication_factor + are required and must be provided via dict or [NewTopic]. Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for new topics to be created @@ -65,8 +120,15 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." .format(self._manager.broker_version)) + topics = self._process_create_topics_input(new_topics) + if self._manager.broker_version < (2, 4): + if any(topic.num_partitions == -1 or topic.replication_factor == -1 for topic in topics): + raise IncompatibleBrokerVersion( + "Broker version {} requires explicit num_partitions and replication_factor" + .format(self._manager.broker_version)) + request = CreateTopicsRequest( - topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], + topics=topics, timeout_ms=timeout_ms, validate_only=validate_only, max_version=3, @@ -76,7 +138,7 @@ def response_errors(r): yield Errors.for_code(topic.error_code) response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) if wait_for_metadata: - self.wait_for_topics([new_topic.name for new_topic in new_topics]) + self.wait_for_topics([new_topic.name for new_topic in request.topics]) return response def wait_for_topics(self, topic_names, timeout_ms=10000): @@ -155,7 +217,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): """Delete topics from the cluster. Arguments: - topics ([str]): A list of topic name strings. + topics ([str]): A list of topic name strings or uuid.UUID ids. Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted @@ -166,35 +228,18 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): Appropriate version of DeleteTopicsResponse class. """ timeout_ms = self._validate_timeout(timeout_ms) + _Topic = DeleteTopicsRequest.DeleteTopicState request = DeleteTopicsRequest( - topic_names=topics, timeout_ms=timeout_ms, - max_version=5, - ) + topics=[_Topic(topic_id=topic) if isinstance(topic, uuid.UUID) else _Topic(name=topic) + for topic in topics], + timeout_ms=timeout_ms) def response_errors(r): for response in r.responses: yield Errors.for_code(response.error_code) return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) - def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): - """Create additional partitions for an existing topic. - - Arguments: - topic_partitions: A dict of topic name strings to total partition count (int), - or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} - if manual assignment is desired. - dict of {topic_name: NewPartition} is deprecated. - - Keyword Arguments: - timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be - created before the broker returns. - validate_only (bool, optional): If True, don't actually create new partitions. - Default: False - raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. - - Returns: - Appropriate version of CreatePartitionsResponse class. - """ - timeout_ms = self._validate_timeout(timeout_ms) + @staticmethod + def _process_create_partitions_input(topic_partitions): _Topic = CreatePartitionsRequest.CreatePartitionsTopic _Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment topics = [] @@ -208,7 +253,6 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal count=count['count'], assignments=[_Assignment(broker_ids=broker_ids) for broker_ids in count['assignments']])) - else: topics.append( _Topic( @@ -216,8 +260,30 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal count=count.total_count, assignments=[_Assignment(broker_ids=broker_ids) for broker_ids in count.new_assignments])) + return topics + + def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): + """Create additional partitions for an existing topic. + + Arguments: + topic_partitions: A dict of topic name strings to total partition count (int), + or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} + if manual assignment is desired. + dict of {topic_name: NewPartition} is deprecated. + + Keyword Arguments: + timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be + created before the broker returns. + validate_only (bool, optional): If True, don't actually create new partitions. + Default: False + raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. + + Returns: + Appropriate version of CreatePartitionsResponse class. + """ + timeout_ms = self._validate_timeout(timeout_ms) request = CreatePartitionsRequest( - topics=topics, + topics=self._process_create_partitions_input(topic_partitions), timeout_ms=timeout_ms, validate_only=validate_only) @@ -228,7 +294,7 @@ def response_errors(r): class NewTopic: - """A class for new topic creation. + """DEPRECATED: A class for new topic creation. Arguments: name (string): name of the topic @@ -251,7 +317,7 @@ def __init__(self, name, num_partitions=-1, replication_factor=-1, class NewPartitions: - """A class for new partition creation on existing topics. + """DEPRECATED: A class for new partition creation on existing topics. Note that the length of new_assignments, if specified, must be the difference between the new total number of partitions and the existing diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 769be843b..10a81a31a 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -14,7 +14,7 @@ BrokerResponseError, NoError, CoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError, ElectionNotNeededError, - KafkaTimeoutError + KafkaTimeoutError, IncompatibleBrokerVersion ) from kafka.structs import TopicPartition from test.testutil import env_kafka_version, random_string @@ -247,7 +247,7 @@ def consumer_thread(i, group_id): @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") -def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages): +def test_delete_consumer_groups(kafka_admin_client, kafka_consumer_factory, send_messages): random_group_id = 'test-group-' + random_string(6) group1 = random_group_id + "_1" group2 = random_group_id + "_2" @@ -266,10 +266,10 @@ def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_ next(consumer3) consumer3.close() - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert group1 in consumergroups - assert group2 in consumergroups - assert group3 in consumergroups + groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 in groups + assert group2 in groups + assert group3 in groups delete_results = { group_id: error @@ -279,14 +279,14 @@ def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_ assert delete_results[group2] == NoError assert group3 not in delete_results - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert group1 not in consumergroups - assert group2 not in consumergroups - assert group3 in consumergroups + groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 not in groups + assert group2 not in groups + assert group3 in groups @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") -def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): +def test_delete_consumer_groups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): random_group_id = 'test-group-' + random_string(6) group1 = random_group_id + "_1" group2 = random_group_id + "_2" @@ -300,24 +300,23 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa consumer2 = kafka_consumer_factory(group_id=group2) next(consumer2) - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert group1 in consumergroups - assert group2 in consumergroups - assert group3 not in consumergroups + groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 in groups + assert group2 in groups + assert group3 not in groups delete_results = { group_id: error for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2, group3]) } - assert delete_results[group1] == NoError assert delete_results[group2] == NonEmptyGroupError assert delete_results[group3] == GroupIdNotFoundError - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert group1 not in consumergroups - assert group2 in consumergroups - assert group3 not in consumergroups + groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 not in groups + assert group2 in groups + assert group3 not in groups @pytest.fixture(name="topic2") def _topic2(kafka_broker, request): @@ -326,6 +325,7 @@ def _topic2(kafka_broker, request): create_topics(kafka_broker, [topic_name]) return topic_name + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0") def test_delete_records(kafka_admin_client, kafka_consumer_factory, send_messages, topic, topic2): t0p0 = TopicPartition(topic, 0) @@ -396,6 +396,33 @@ def test_create_delete_topics(kafka_admin_client): assert response.responses[0].name == topic_name assert response.responses[0].error_code == 0 # NoError + topic_name = random_string(4) + response = kafka_admin_client.create_topics({topic_name: {'num_partitions': 1, 'replication_factor': 1}}) + assert response.topics[0].name == topic_name + assert response.topics[0].error_code == 0 # NoError + + response = kafka_admin_client.delete_topics([topic_name]) + assert response.responses[0].name == topic_name + assert response.responses[0].error_code == 0 # NoError + + # Create topics requires explicit num_partitions/replication_factor on < 2.4 + if env_kafka_version() < (2, 4): + with pytest.raises(IncompatibleBrokerVersion): + kafka_admin_client.create_topics([topic_name]) + + with pytest.raises(IncompatibleBrokerVersion): + kafka_admin_client.create_topics({topic_name: {'num_partitions': 2}}) + + else: + topic_name = random_string(4) + response = kafka_admin_client.create_topics([topic_name]) + assert response.topics[0].name == topic_name + assert response.topics[0].error_code == 0 # NoError + + response = kafka_admin_client.delete_topics([topic_name]) + assert response.responses[0].name == topic_name + assert response.responses[0].error_code == 0 # NoError + @pytest.mark.skipif(env_kafka_version() < (1, 0), reason="CreatePartitions requires broker >=1.0") def test_create_partitions(kafka_admin_client, topic):