diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index c67fb9e6a..f120bfe63 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -1,9 +1,8 @@ -from kafka.admin.config_resource import ConfigResource, ConfigResourceType +from kafka.admin._configs import ConfigResource, ConfigResourceType from kafka.admin.client import KafkaAdminClient -from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, - ResourceType, ACLPermissionType, ACLResourcePatternType) -from kafka.admin.new_topic import NewTopic -from kafka.admin.new_partitions import NewPartitions +from kafka.admin._acls import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, + ResourceType, ACLPermissionType, ACLResourcePatternType) +from kafka.admin._topics import NewTopic, NewPartitions __all__ = [ 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions', 'ACL', 'ACLFilter', diff --git a/kafka/admin/_acls.py b/kafka/admin/_acls.py new file mode 100644 index 000000000..07ddfa61a --- /dev/null +++ b/kafka/admin/_acls.py @@ -0,0 +1,404 @@ +"""ACL management mixin for KafkaAdminClient. + +Also defines ACL data types: ResourceType, ACLOperation, ACLPermissionType, +ACLResourcePatternType, ACLFilter, ACL, ResourcePatternFilter, ResourcePattern. +""" + +from __future__ import annotations + +import logging +from enum import IntEnum +from typing import TYPE_CHECKING + +import kafka.errors as Errors +from kafka.errors import IllegalArgumentError +from kafka.protocol.admin import CreateAclsRequest, DeleteAclsRequest, DescribeAclsRequest + +if TYPE_CHECKING: + from kafka.net.manager import KafkaConnectionManager + +log = logging.getLogger(__name__) + + +class ACLAdminMixin: + """Mixin providing ACL management methods for KafkaAdminClient.""" + _manager: KafkaConnectionManager + _client: object + config: dict + + def describe_acls(self, acl_filter): + """Describe a set of ACLs + + Used to return a set of ACLs matching the supplied ACLFilter. + The cluster must be configured with an authorizer for this to work, or + you will get a SecurityDisabledError + + Arguments: + acl_filter: an ACLFilter object + + Returns: + tuple of a list of matching ACL objects and a KafkaError (NoError if successful) + """ + + version = self._client.api_version(DescribeAclsRequest, max_version=1) + if version == 0: + request = DescribeAclsRequest[version]( + resource_type_filter=acl_filter.resource_pattern.resource_type, + resource_name_filter=acl_filter.resource_pattern.resource_name, + principal_filter=acl_filter.principal, + host_filter=acl_filter.host, + operation=acl_filter.operation, + permission_type=acl_filter.permission_type + ) + elif version <= 1: + request = DescribeAclsRequest[version]( + resource_type_filter=acl_filter.resource_pattern.resource_type, + resource_name_filter=acl_filter.resource_pattern.resource_name, + pattern_type_filter=acl_filter.resource_pattern.pattern_type, + principal_filter=acl_filter.principal, + host_filter=acl_filter.host, + operation=acl_filter.operation, + permission_type=acl_filter.permission_type + ) + response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + raise error_type( + "Request '{}' failed with response '{}'." 
+                .format(request, response))
+
+        return self._convert_describe_acls_response_to_acls(response)
+
+    @staticmethod
+    def _convert_describe_acls_response_to_acls(describe_response):
+        """Convert a DescribeAclsResponse into a list of ACL objects and a KafkaError.
+
+        Arguments:
+            describe_response: The response object from the DescribeAclsRequest.
+
+        Returns:
+            A tuple of (list_of_acl_objects, error) where error is an instance
+            of KafkaError (NoError if successful).
+        """
+        error_type = Errors.for_code(describe_response.error_code)
+        if error_type is not Errors.NoError:
+            raise error_type(
+                "Request '{}' failed with response '{}'."
+                .format("DescribeAclsRequest", describe_response))
+        acl_list = []
+        for resource in describe_response.resources:
+            for acl in resource.acls:
+                acl_list.append(
+                    ACL(
+                        principal=acl.principal,
+                        host=acl.host,
+                        operation=ACLOperation(acl.operation),
+                        permission_type=ACLPermissionType(acl.permission_type),
+                        resource_pattern=ResourcePattern(
+                            resource_type=ResourceType(resource.resource_type),
+                            resource_name=resource.resource_name,
+                            pattern_type=ACLResourcePatternType(resource.pattern_type))))
+        return acl_list, Errors.NoError
+
+    @staticmethod
+    def _convert_create_acls_resource_request_v0(acl):
+        """Convert an ACL object into the CreateAclsRequest v0 format."""
+        return (
+            acl.resource_pattern.resource_type,
+            acl.resource_pattern.resource_name,
+            acl.principal,
+            acl.host,
+            acl.operation,
+            acl.permission_type
+        )
+
+    @staticmethod
+    def _convert_create_acls_resource_request_v1(acl):
+        """Convert an ACL object into the CreateAclsRequest v1 format."""
+        return (
+            acl.resource_pattern.resource_type,
+            acl.resource_pattern.resource_name,
+            acl.resource_pattern.pattern_type,
+            acl.principal,
+            acl.host,
+            acl.operation,
+            acl.permission_type
+        )
+
+    @staticmethod
+    def _convert_create_acls_response_to_acls(acls, create_response):
+        """Parse a CreateAclsResponse, returning a dict of successes and failures."""
+        results = {'succeeded': [], 'failed': []}
+        for i, result in enumerate(create_response.results):
+            acl = acls[i]
+            if result.error_code == 0:
+                results['succeeded'].append(acl)
+            else:
+                results['failed'].append((acl, Errors.for_code(result.error_code)))
+        return results
+
+    def create_acls(self, acls):
+        """Create a list of ACLs
+
+        This endpoint only accepts a list of concrete ACL objects, no ACLFilters.
+        Raises IllegalArgumentError if any element is not an ACL object; broker-side
+        failures are reported per ACL in the returned dict.
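+
+        Example (illustrative; assumes a connected KafkaAdminClient ``admin``
+        and a hypothetical principal/topic)::
+
+            acl = ACL(
+                principal="User:alice",
+                host="*",
+                operation=ACLOperation.READ,
+                permission_type=ACLPermissionType.ALLOW,
+                resource_pattern=ResourcePattern(ResourceType.TOPIC, "my-topic"))
+            result = admin.create_acls([acl])
+            assert acl in result['succeeded']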
+ + Arguments: + acls: a list of ACL objects + + Returns: + dict of successes and failures + """ + + for acl in acls: + if not isinstance(acl, ACL): + raise IllegalArgumentError("acls must contain ACL objects") + + version = self._client.api_version(CreateAclsRequest, max_version=1) + if version == 0: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls] + ) + elif version <= 1: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls] + ) + response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 + return self._convert_create_acls_response_to_acls(acls, response) + + @staticmethod + def _convert_delete_acls_resource_request_v0(acl): + """Convert an ACLFilter object into the DeleteAclsRequest v0 format.""" + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_delete_acls_resource_request_v1(acl): + """Convert an ACLFilter object into the DeleteAclsRequest v1 format.""" + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.resource_pattern.pattern_type, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response): + """Parse a DeleteAclsResponse, returning a list of (filter, matched ACLs, error) tuples.""" + results = [] + for i, result in enumerate(delete_response.filter_results): + acl_filter = acl_filters[i] + error_type = Errors.for_code(result.error_code) + matching_acls = [] + for acl in result.matching_acls: + error = Errors.for_code(acl.error_code) + matching_acls.append( + ACL( + principal=acl.principal, + host=acl.host, + operation=ACLOperation(acl.operation), + permission_type=ACLPermissionType(acl.permission_type), + resource_pattern=ResourcePattern( + resource_type=ResourceType(acl.resource_type), + resource_name=acl.resource_name, + pattern_type=ACLResourcePatternType(acl.pattern_type)))) + results.append((acl_filter, matching_acls, error_type)) + return results + + def delete_acls(self, acl_filters): + """Delete a set of ACLs + + Deletes all ACLs matching the list of input ACLFilter + + Arguments: + acl_filters: a list of ACLFilter + + Returns: + a list of 3-tuples corresponding to the list of input filters. + The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance) + """ + + for acl in acl_filters: + if not isinstance(acl, ACLFilter): + raise IllegalArgumentError("acl_filters must contain ACLFilter type objects") + + version = self._client.api_version(DeleteAclsRequest, max_version=1) + + if version == 0: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v0(acl) for acl in acl_filters] + ) + elif version <= 1: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters] + ) + response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 + return self._convert_delete_acls_response_to_matching_acls(acl_filters, response) + + +# --------------------------------------------------------------------------- +# ACL data types +# --------------------------------------------------------------------------- + + +class ResourceType(IntEnum): + """Type of kafka resource to set ACL for. 
+
+    The ANY value is only valid in a filter context.
+    """
+    UNKNOWN = 0
+    ANY = 1
+    TOPIC = 2
+    GROUP = 3
+    CLUSTER = 4
+    TRANSACTIONAL_ID = 5
+    DELEGATION_TOKEN = 6
+
+
+class ACLOperation(IntEnum):
+    """Type of operation.
+
+    The ANY value is only valid in a filter context.
+    """
+    UNKNOWN = 0
+    ANY = 1
+    ALL = 2
+    READ = 3
+    WRITE = 4
+    CREATE = 5
+    DELETE = 6
+    ALTER = 7
+    DESCRIBE = 8
+    CLUSTER_ACTION = 9
+    DESCRIBE_CONFIGS = 10
+    ALTER_CONFIGS = 11
+    IDEMPOTENT_WRITE = 12
+    CREATE_TOKENS = 13
+    DESCRIBE_TOKENS = 14
+
+
+class ACLPermissionType(IntEnum):
+    """An enumerated type of permissions.
+
+    The ANY value is only valid in a filter context.
+    """
+    UNKNOWN = 0
+    ANY = 1
+    DENY = 2
+    ALLOW = 3
+
+
+class ACLResourcePatternType(IntEnum):
+    """An enumerated type of resource patterns.
+
+    More details on the pattern types and how they work
+    can be found in KIP-290 (Support for prefixed ACLs).
+    """
+    UNKNOWN = 0
+    ANY = 1
+    MATCH = 2
+    LITERAL = 3
+    PREFIXED = 4
+
+
+class ResourcePatternFilter:
+    def __init__(self, resource_type, resource_name, pattern_type):
+        self.resource_type = resource_type
+        self.resource_name = resource_name
+        self.pattern_type = pattern_type
+        self.validate()
+
+    def validate(self):
+        if not isinstance(self.resource_type, ResourceType):
+            raise IllegalArgumentError("resource_type must be a ResourceType object")
+        if not isinstance(self.pattern_type, ACLResourcePatternType):
+            raise IllegalArgumentError("pattern_type must be an ACLResourcePatternType object")
+
+    def __repr__(self):
+        return "<ResourcePatternFilter(resource_type={}, resource_name={}, pattern_type={})>".format(
+            self.resource_type.name, self.resource_name, self.pattern_type.name)
+
+    def __eq__(self, other):
+        return all((
+            self.resource_type == other.resource_type,
+            self.resource_name == other.resource_name,
+            self.pattern_type == other.pattern_type,
+        ))
+
+    def __hash__(self):
+        return hash((self.resource_type, self.resource_name, self.pattern_type))
+
+
+class ResourcePattern(ResourcePatternFilter):
+    """A resource pattern to apply the ACL to."""
+    def __init__(self, resource_type, resource_name, pattern_type=ACLResourcePatternType.LITERAL):
+        super().__init__(resource_type, resource_name, pattern_type)
+        self.validate()
+
+    def validate(self):
+        if self.resource_type == ResourceType.ANY:
+            raise IllegalArgumentError("resource_type cannot be ANY")
+        if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]:
+            raise IllegalArgumentError(
+                "pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name))
+
+
+class ACLFilter:
+    """Represents a filter to use with describing and deleting ACLs."""
+    def __init__(self, principal, host, operation, permission_type, resource_pattern):
+        self.principal = principal
+        self.host = host
+        self.operation = operation
+        self.permission_type = permission_type
+        self.resource_pattern = resource_pattern
+        self.validate()
+
+    def validate(self):
+        if not isinstance(self.operation, ACLOperation):
+            raise IllegalArgumentError("operation must be an ACLOperation object")
+        if not isinstance(self.permission_type, ACLPermissionType):
+            raise IllegalArgumentError("permission_type must be an ACLPermissionType object")
+        if not isinstance(self.resource_pattern, ResourcePatternFilter):
+            raise IllegalArgumentError("resource_pattern must be a ResourcePatternFilter object")
+
+    def __repr__(self):
+        return "<ACL principal={principal}, resource={resource}, operation={operation}, type={type}, host={host}>".format(
+            principal=self.principal, host=self.host, operation=self.operation.name,
+            type=self.permission_type.name, resource=self.resource_pattern)
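+
+    # Illustrative only (hypothetical ``admin`` client): a filter matching
+    # every ACL on any topic, usable with describe_acls()/delete_acls():
+    #
+    #     match_all_topic_acls = ACLFilter(
+    #         principal=None, host=None,
+    #         operation=ACLOperation.ANY,
+    #         permission_type=ACLPermissionType.ANY,
+    #         resource_pattern=ResourcePatternFilter(
+    #             ResourceType.TOPIC, None, ACLResourcePatternType.ANY))
+    #     acls, error = admin.describe_acls(match_all_topic_acls)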
+ + def __eq__(self, other): + return all(( + self.principal == other.principal, self.host == other.host, + self.operation == other.operation, self.permission_type == other.permission_type, + self.resource_pattern == other.resource_pattern)) + + def __hash__(self): + return hash((self.principal, self.host, self.operation, self.permission_type, self.resource_pattern)) + + +class ACL(ACLFilter): + """Represents a concrete ACL for a specific ResourcePattern.""" + def __init__(self, principal, host, operation, permission_type, resource_pattern): + super().__init__(principal, host, operation, permission_type, resource_pattern) + self.validate() + + def validate(self): + if self.operation == ACLOperation.ANY: + raise IllegalArgumentError("operation cannot be ANY") + if self.permission_type == ACLPermissionType.ANY: + raise IllegalArgumentError("permission_type cannot be ANY") + if not isinstance(self.resource_pattern, ResourcePattern): + raise IllegalArgumentError("resource_pattern must be a ResourcePattern object") + + +def valid_acl_operations(int_vals): + return set([ACLOperation(v) for v in int_vals if v not in (0, 1, 2)]) diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py new file mode 100644 index 000000000..2e2c276b0 --- /dev/null +++ b/kafka/admin/_configs.py @@ -0,0 +1,152 @@ +"""Configuration management mixin for KafkaAdminClient. + +Also defines ConfigResource and ConfigResourceType data classes. +""" + +from __future__ import annotations + +import logging +from enum import IntEnum +from typing import TYPE_CHECKING + +from kafka.errors import IncompatibleBrokerVersion +from kafka.protocol.admin import AlterConfigsRequest, DescribeConfigsRequest + +if TYPE_CHECKING: + from kafka.net.manager import KafkaConnectionManager + +log = logging.getLogger(__name__) + + +class ConfigAdminMixin: + """Mixin providing configuration management methods for KafkaAdminClient.""" + _manager: KafkaConnectionManager + _client: object + config: dict + + @staticmethod + def _convert_describe_config_resource_request(config_resource): + return ( + config_resource.resource_type, + config_resource.name, + [config_key for config_key, config_value in config_resource.configs.items()] if config_resource.configs else None + ) + + async def _async_describe_configs(self, config_resources, include_synonyms=False): + broker_resources = [] + topic_resources = [] + + for config_resource in config_resources: + if config_resource.resource_type == ConfigResourceType.BROKER: + broker_resources.append(self._convert_describe_config_resource_request(config_resource)) + else: + topic_resources.append(self._convert_describe_config_resource_request(config_resource)) + + version = self._client.api_version(DescribeConfigsRequest, max_version=2) + if include_synonyms and version == 0: + raise IncompatibleBrokerVersion( + "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." 
+                .format(self._manager.broker_version))
+
+        results = []
+        for broker_resource in broker_resources:
+            try:
+                broker_id = int(broker_resource[1])
+            except ValueError:
+                raise ValueError("Broker resource names must be an integer or a string representation of an integer")
+            if version == 0:
+                request = DescribeConfigsRequest[version](resources=[broker_resource])
+            else:
+                request = DescribeConfigsRequest[version](
+                    resources=[broker_resource],
+                    include_synonyms=include_synonyms)
+            results.append(await self._manager.send(request, node_id=broker_id))
+
+        if topic_resources:
+            if version == 0:
+                request = DescribeConfigsRequest[version](resources=topic_resources)
+            else:
+                request = DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
+            results.append(await self._manager.send(request))
+
+        return results
+
+    def describe_configs(self, config_resources, include_synonyms=False):
+        """Fetch configuration parameters for one or more Kafka resources.
+
+        Arguments:
+            config_resources: A list of ConfigResource objects.
+                Any keys in ConfigResource.configs dict will be used to filter the
+                result. Setting the configs dict to None will get all values. An
+                empty dict will get zero values (as per Kafka protocol).
+
+        Keyword Arguments:
+            include_synonyms (bool, optional): If True, return synonyms in response. Not
+                supported by all versions. Default: False.
+
+        Returns:
+            List of DescribeConfigsResponses.
+        """
+        return self._manager.run(self._async_describe_configs, config_resources, include_synonyms)
+
+    @staticmethod
+    def _convert_alter_config_resource_request(config_resource):
+        return (
+            config_resource.resource_type,
+            config_resource.name,
+            [
+                (config_key, config_value) for config_key, config_value in config_resource.configs.items()
+            ]
+        )
+
+    async def _async_alter_configs(self, config_resources):
+        version = self._client.api_version(AlterConfigsRequest, max_version=1)
+        request = AlterConfigsRequest[version](
+            resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
+        )
+        # TODO: BROKER resources should be sent to the specific broker
+        return await self._manager.send(request)
+
+    def alter_configs(self, config_resources):
+        """Alter configuration parameters of one or more Kafka resources.
+
+        Warning:
+            This is currently broken for BROKER resources because those must be
+            sent to that specific broker, whereas this implementation always
+            sends to the least-loaded node.
+
+        Arguments:
+            config_resources: A list of ConfigResource objects.
+
+        Returns:
+            Appropriate version of AlterConfigsResponse class.
+        """
+        return self._manager.run(self._async_alter_configs, config_resources)
+
+
+class ConfigResourceType(IntEnum):
+    """An enumerated type of config resources"""
+    BROKER = 4
+    TOPIC = 2
+
+
+class ConfigResource:
+    """A class for specifying config resources.
+
+    Arguments:
+        resource_type (ConfigResourceType): the type of kafka resource
+        name (string): The name of the kafka resource
+        configs ({key : value}): A map of config keys to values.
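+
+    Example (hypothetical topic name and retention value)::
+
+        ConfigResource(ConfigResourceType.TOPIC, "my-topic",
+                       configs={"retention.ms": "3600000"})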
+ """ + def __init__(self, resource_type, name, configs=None): + if not isinstance(resource_type, ConfigResourceType): + resource_type = ConfigResourceType[str(resource_type).upper()] # pylint: disable-msg=unsubscriptable-object + self.resource_type = resource_type + self.name = name + self.configs = configs + + def __str__(self): + return "ConfigResource %s=%s" % (self.resource_type, self.name) + + def __repr__(self): + return "ConfigResource(%s, %s, %s)" % (self.resource_type, self.name, self.configs) diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py new file mode 100644 index 000000000..28ad6c54e --- /dev/null +++ b/kafka/admin/_groups.py @@ -0,0 +1,324 @@ +"""Consumer group management mixin for KafkaAdminClient.""" + +from __future__ import annotations + +import itertools +import logging +from collections import defaultdict +from typing import TYPE_CHECKING + +import kafka.errors as Errors +from kafka.admin._acls import valid_acl_operations +from kafka.protocol.admin import DeleteGroupsRequest, DescribeGroupsRequest, ListGroupsRequest +from kafka.protocol.consumer import OffsetFetchRequest +from kafka.protocol.consumer.metadata import ( + ConsumerProtocolAssignment, ConsumerProtocolSubscription, ConsumerProtocolType, +) +from kafka.structs import GroupInformation, MemberInformation, OffsetAndMetadata, TopicPartition + +if TYPE_CHECKING: + from kafka.net.manager import KafkaConnectionManager + +log = logging.getLogger(__name__) + + +class GroupAdminMixin: + """Mixin providing consumer group management methods for KafkaAdminClient.""" + _manager: KafkaConnectionManager + _client: object + _coordinator_cache: dict + config: dict + + # -- Describe consumer groups ---------------------------------------------- + + def _describe_consumer_groups_request(self, group_id): + version = self._client.api_version(DescribeGroupsRequest, max_version=3) + if version <= 2: + request = DescribeGroupsRequest[version](groups=(group_id,)) + else: + request = DescribeGroupsRequest[version]( + groups=(group_id,), + include_authorized_operations=True + ) + return request + + def _describe_consumer_groups_process_response(self, response): + """Process a DescribeGroupsResponse into a group description.""" + if response.API_VERSION > 3: + raise NotImplementedError( + "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient." 
+ .format(response.API_VERSION)) + + assert len(response.groups) == 1 + for response_name, response_field in response.fields.items(): + if response_name == 'groups': + described_group = getattr(response, response_name)[0] + described_group_information_list = [] + protocol_type_is_consumer = False + for group_information_name, group_information_field in response_field.fields.items(): + if not group_information_field.for_version_q(response.API_VERSION): + continue + described_group_information = getattr(described_group, group_information_name) + if group_information_name == 'protocol_type': + protocol_type = described_group_information + protocol_type_is_consumer = (protocol_type == ConsumerProtocolType or not protocol_type) + if group_information_name == 'members': + member_information_list = [] + for member in described_group_information: + member_information = [] + for attr_name, attr_field in group_information_field.fields.items(): + if not attr_field.for_version_q(response.API_VERSION): + continue + attr_val = getattr(member, attr_name) + if protocol_type_is_consumer: + if attr_name == 'member_metadata' and attr_val: + member_information.append(ConsumerProtocolSubscription.decode(attr_val)) + elif attr_name == 'member_assignment' and attr_val: + member_information.append(ConsumerProtocolAssignment.decode(attr_val)) + else: + member_information.append(attr_val) + member_info_tuple = MemberInformation._make(member_information) + member_information_list.append(member_info_tuple) + described_group_information_list.append(member_information_list) + else: + described_group_information_list.append(described_group_information) + if response.API_VERSION >= 3: + described_group_information_list[-1] = list(map(lambda acl: acl.name, valid_acl_operations(described_group_information_list[-1]))) + else: + described_group_information_list.append([]) + group_description = GroupInformation._make(described_group_information_list) + error_code = group_description.error_code + error_type = Errors.for_code(error_code) + if error_type is not Errors.NoError: + raise error_type( + "DescribeGroupsResponse failed with response '{}'." + .format(response)) + return group_description + assert False, "DescribeGroupsResponse parsing failed" + + async def _async_describe_consumer_groups(self, group_ids, group_coordinator_id=None): + results = [] + for group_id in group_ids: + coordinator_id = group_coordinator_id or await self._find_coordinator_id(group_id) + request = self._describe_consumer_groups_request(group_id) + response = await self._manager.send(request, node_id=coordinator_id) + results.append(self._describe_consumer_groups_process_response(response)) + return results + + def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): + """Describe a set of consumer groups. + + Any errors are immediately raised. + + Arguments: + group_ids: A list of consumer group IDs. These are typically the + group names as strings. + + Keyword Arguments: + group_coordinator_id (int, optional): The node_id of the groups' coordinator + broker. If set to None, it will query the cluster for each group to + find that group's coordinator. Explicitly specifying this can be + useful for avoiding extra network round trips if you already know + the group coordinator. This is only useful when all the group_ids + have the same coordinator, otherwise it will error. Default: None. + + Returns: + A list of group descriptions. 
Each description is a GroupInformation
+            namedtuple. For consumer-protocol groups, member metadata and
+            assignments are decoded into ConsumerProtocolSubscription and
+            ConsumerProtocolAssignment objects.
+        """
+        return self._manager.run(self._async_describe_consumer_groups, group_ids, group_coordinator_id)
+
+    # -- List consumer groups --------------------------------------------------
+
+    def _list_consumer_groups_request(self):
+        version = self._client.api_version(ListGroupsRequest, max_version=2)
+        return ListGroupsRequest[version]()
+
+    def _list_consumer_groups_process_response(self, response):
+        """Process a ListGroupsResponse into a list of groups."""
+        if response.API_VERSION <= 2:
+            error_type = Errors.for_code(response.error_code)
+            if error_type is not Errors.NoError:
+                raise error_type(
+                    "ListGroupsRequest failed with response '{}'."
+                    .format(response))
+        else:
+            raise NotImplementedError(
+                "Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient."
+                .format(response.API_VERSION))
+        return [(group.group_id, group.protocol_type) for group in response.groups]
+
+    async def _async_list_consumer_groups(self, broker_ids=None):
+        if broker_ids is None:
+            broker_ids = [broker.node_id for broker in self._manager.cluster.brokers()]
+        consumer_groups = set()
+        for broker_id in broker_ids:
+            request = self._list_consumer_groups_request()
+            response = await self._manager.send(request, node_id=broker_id)
+            consumer_groups.update(self._list_consumer_groups_process_response(response))
+        return list(consumer_groups)
+
+    def list_consumer_groups(self, broker_ids=None):
+        """List all consumer groups known to the cluster.
+
+        This returns a list of Consumer Group tuples. The tuples are
+        composed of the consumer group name and the consumer group protocol
+        type.
+
+        Only consumer groups that store their offsets in Kafka are returned.
+        The protocol type will be an empty string for groups created using
+        Kafka < 0.9 APIs because, although they store their offsets in Kafka,
+        they don't use Kafka for group coordination. For groups created using
+        Kafka >= 0.9, the protocol type will typically be "consumer".
+
+        As soon as any error is encountered, it is immediately raised.
+
+        Keyword Arguments:
+            broker_ids ([int], optional): A list of broker node_ids to query for consumer
+                groups. If set to None, will query all brokers in the cluster.
+                Explicitly specifying broker(s) can be useful for determining which
+                consumer groups are coordinated by those broker(s). Default: None
+
+        Returns:
+            list: List of tuples of Consumer Groups.
+        """
+        return self._manager.run(self._async_list_consumer_groups, broker_ids)
+
+    # -- List consumer group offsets -------------------------------------------
+
+    def _list_consumer_group_offsets_request(self, group_id, partitions=None):
+        version = self._client.api_version(OffsetFetchRequest, max_version=5)
+        if partitions is None:
+            if version <= 1:
+                raise ValueError(
+                    """OffsetFetchRequest_v{} requires specifying the
+                    partitions for which to fetch offsets. Omitting the
+                    partitions is only supported on brokers >= 0.10.2.
+ For details, see KIP-88.""".format(version)) + topics_partitions = None + else: + topics_partitions_dict = defaultdict(set) + for topic, partition in partitions: + topics_partitions_dict[topic].add(partition) + topics_partitions = list(topics_partitions_dict.items()) + return OffsetFetchRequest[version](group_id, topics_partitions) + + def _list_consumer_group_offsets_process_response(self, response): + """Process an OffsetFetchResponse.""" + if response.API_VERSION <= 5: + if response.API_VERSION > 1: + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + raise error_type( + "OffsetFetchResponse failed with response '{}'." + .format(response)) + offsets = {} + for topic, partitions in response.topics: + for partition_data in partitions: + if response.API_VERSION <= 4: + partition, offset, metadata, error_code = partition_data + leader_epoch = -1 + else: + partition, offset, leader_epoch, metadata, error_code = partition_data + error_type = Errors.for_code(error_code) + if error_type is not Errors.NoError: + raise error_type( + "Unable to fetch consumer group offsets for topic {}, partition {}" + .format(topic, partition)) + offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata, leader_epoch) + else: + raise NotImplementedError( + "Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient." + .format(response.API_VERSION)) + return offsets + + async def _async_list_consumer_group_offsets(self, group_id, group_coordinator_id=None, partitions=None): + if group_coordinator_id is None: + group_coordinator_id = await self._find_coordinator_id(group_id) + request = self._list_consumer_group_offsets_request(group_id, partitions) + response = await self._manager.send(request, node_id=group_coordinator_id) + return self._list_consumer_group_offsets_process_response(response) + + def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, + partitions=None): + """Fetch Consumer Offsets for a single consumer group. + + Note: + This does not verify that the group_id or partitions actually exist + in the cluster. + + As soon as any error is encountered, it is immediately raised. + + Arguments: + group_id (str): The consumer group id name for which to fetch offsets. + + Keyword Arguments: + group_coordinator_id (int, optional): The node_id of the group's coordinator + broker. If set to None, will query the cluster to find the group + coordinator. Default: None. + partitions: A list of TopicPartitions for which to fetch + offsets. On brokers >= 0.10.2, this can be set to None to fetch all + known offsets for the consumer group. Default: None. + + Returns: + dictionary: A dictionary with TopicPartition keys and + OffsetAndMetadata values. + """ + return self._manager.run(self._async_list_consumer_group_offsets, group_id, group_coordinator_id, partitions) + + # -- Delete consumer groups ------------------------------------------------ + + def _delete_consumer_groups_request(self, group_ids): + version = self._client.api_version(DeleteGroupsRequest, max_version=1) + return DeleteGroupsRequest[version](group_ids) + + def _convert_delete_groups_response(self, response): + """Parse a DeleteGroupsResponse.""" + if response.API_VERSION <= 1: + results = [] + for group_id, error_code in response.results: + results.append((group_id, Errors.for_code(error_code))) + return results + else: + raise NotImplementedError( + "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient." 
+ .format(response.API_VERSION)) + + async def _async_delete_consumer_groups(self, group_ids, group_coordinator_id=None): + coordinators_groups = defaultdict(list) + if group_coordinator_id is not None: + coordinators_groups[group_coordinator_id] = group_ids + else: + for group_id in group_ids: + coordinator_id = await self._find_coordinator_id(group_id) + coordinators_groups[coordinator_id].append(group_id) + + results = [] + for coordinator_id, coordinator_group_ids in coordinators_groups.items(): + request = self._delete_consumer_groups_request(coordinator_group_ids) + response = await self._manager.send(request, node_id=coordinator_id) + results.extend(self._convert_delete_groups_response(response)) + return results + + def delete_consumer_groups(self, group_ids, group_coordinator_id=None): + """Delete Consumer Group Offsets for given consumer groups. + + Note: + This does not verify that the group ids actually exist and + group_coordinator_id is the correct coordinator for all these groups. + + The result needs checking for potential errors. + + Arguments: + group_ids ([str]): The consumer group ids of the groups which are to be deleted. + + Keyword Arguments: + group_coordinator_id (int, optional): The node_id of the broker which is + the coordinator for all the groups. Default: None. + + Returns: + A list of tuples (group_id, KafkaError) + """ + return self._manager.run(self._async_delete_consumer_groups, group_ids, group_coordinator_id) diff --git a/kafka/admin/_metadata.py b/kafka/admin/_metadata.py new file mode 100644 index 000000000..dfbcaab5e --- /dev/null +++ b/kafka/admin/_metadata.py @@ -0,0 +1,74 @@ +"""Cluster metadata mixin for KafkaAdminClient.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from kafka.admin._acls import valid_acl_operations +from kafka.protocol.metadata import MetadataRequest + +if TYPE_CHECKING: + from kafka.net.manager import KafkaConnectionManager + +log = logging.getLogger(__name__) + + +class MetadataAdminMixin: + """Mixin providing cluster metadata methods for KafkaAdminClient.""" + _manager: KafkaConnectionManager + + def _process_acl_operations(self, obj): + if obj.get('authorized_operations', None) is not None: + obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations']))) + return obj + + async def _get_cluster_metadata(self, topics): + """topics = [] for no topics, None for all.""" + request = MetadataRequest( + topics=[ + MetadataRequest.MetadataRequestTopic(name=topic) + for topic in topics] if topics is not None else None, + allow_auto_topic_creation=False, + include_cluster_authorized_operations=True, + include_topic_authorized_operations=True, + ) + response = await self._manager.send(request) + metadata = response.to_dict() + self._process_acl_operations(metadata) + for topic in metadata['topics']: + self._process_acl_operations(topic) + return metadata + + def list_topics(self): + """Retrieve a list of all topic names in the cluster. + + Returns: + A list of topic name strings. + """ + metadata = self._manager.run(self._get_cluster_metadata, None) + return [t['name'] for t in metadata['topics']] + + def describe_topics(self, topics=None): + """Fetch metadata for the specified topics or all topics if None. + + Keyword Arguments: + topics ([str], optional) A list of topic names. If None, metadata for all + topics is retrieved. + + Returns: + A list of dicts describing each topic (including partition info). 
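+
+            Illustrative shape only (keys as consumed elsewhere in this
+            module; the dicts come from the MetadataResponse's to_dict())::
+
+                [{'name': 'my-topic',
+                  'error_code': 0,
+                  'partitions': [{'partition_index': 0, 'leader_id': 1, ...}],
+                  ...}]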
+ """ + metadata = self._manager.run(self._get_cluster_metadata, topics) + return metadata['topics'] + + def describe_cluster(self): + """Fetch cluster-wide metadata such as the list of brokers, the controller ID, + and the cluster ID. + + Returns: + A dict with cluster-wide metadata, excluding topic details. + """ + metadata = self._manager.run(self._get_cluster_metadata, []) + metadata.pop('topics') + return metadata diff --git a/kafka/admin/_records.py b/kafka/admin/_records.py new file mode 100644 index 000000000..feb8ee410 --- /dev/null +++ b/kafka/admin/_records.py @@ -0,0 +1,186 @@ +"""Record deletion and cluster operation mixin for KafkaAdminClient.""" + +from __future__ import annotations + +import logging +from collections import defaultdict +from typing import TYPE_CHECKING + +import kafka.errors as Errors +from kafka.errors import UnknownTopicOrPartitionError +from kafka.protocol.admin import DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType +from kafka.structs import TopicPartition + +if TYPE_CHECKING: + from kafka.net.manager import KafkaConnectionManager + +log = logging.getLogger(__name__) + + +class RecordAdminMixin: + """Mixin providing record deletion and cluster operation methods.""" + _manager: KafkaConnectionManager + _client: object + config: dict + + async def _async_get_leader_for_partitions(self, partitions): + """Finds ID of the leader node for every given topic partition.""" + partitions = set(partitions) + topics = set(tp.topic for tp in partitions) + + metadata = await self._get_cluster_metadata(topics) + + leader2partitions = defaultdict(list) + valid_partitions = set() + for topic in metadata.get("topics", ()): + for partition in topic.get("partitions", ()): + t2p = TopicPartition(topic=topic["name"], partition=partition["partition_index"]) + if t2p in partitions: + leader2partitions[partition["leader_id"]].append(t2p) + valid_partitions.add(t2p) + + if len(partitions) != len(valid_partitions): + unknown = set(partitions) - valid_partitions + raise UnknownTopicOrPartitionError( + "The following partitions are not known: %s" + % ", ".join(str(x) for x in unknown) + ) + + return leader2partitions + + async def _async_delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): + timeout_ms = self._validate_timeout(timeout_ms) + version = self._client.api_version(DeleteRecordsRequest, max_version=0) + + if partition_leader_id is None: + leader2partitions = await self._async_get_leader_for_partitions(set(records_to_delete)) + else: + leader2partitions = {partition_leader_id: set(records_to_delete)} + + responses = [] + for leader, partitions in leader2partitions.items(): + topic2partitions = defaultdict(list) + for partition in partitions: + topic2partitions[partition.topic].append(partition) + + request = DeleteRecordsRequest[version]( + topics=[ + (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions]) + for topic, partitions in topic2partitions.items() + ], + timeout_ms=timeout_ms + ) + response = await self._manager.send(request, node_id=leader) + responses.append(response.to_dict()) + + partition2result = {} + partition2error = {} + for response in responses: + for topic in response["topics"]: + for partition in topic["partitions"]: + tp = TopicPartition(topic["name"], partition["partition_index"]) + partition2result[tp] = partition + if partition["error_code"] != 0: + partition2error[tp] = partition["error_code"] + + if partition2error: + if len(partition2error) == 1: + key, error = 
next(iter(partition2error.items()))
+                raise Errors.for_code(error)(
+                    "Error deleting records from topic %s partition %s" % (key.topic, key.partition)
+                )
+            else:
+                raise Errors.BrokerResponseError(
+                    "The following errors occurred when trying to delete records: "
+                    + ", ".join(
+                        "%s(partition=%d): %s" %
+                        (partition.topic, partition.partition, Errors.for_code(error).__name__)
+                        for partition, error in partition2error.items()
+                    )
+                )
+
+        return partition2result
+
+    def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
+        """Delete records whose offset is smaller than the given offset of the corresponding partition.
+
+        Arguments:
+            records_to_delete ({TopicPartition: int}): The earliest available offsets for the
+                given partitions.
+
+        Keyword Arguments:
+            timeout_ms (numeric, optional): Timeout in milliseconds.
+            partition_leader_id (node_id / int, optional): If specified, all deletion requests
+                will be sent to this node.
+
+        Returns:
+            dict {TopicPartition -> metadata}
+        """
+        return self._manager.run(self._async_delete_records, records_to_delete, timeout_ms, partition_leader_id)
+
+    @staticmethod
+    def _convert_topic_partitions(topic_partitions):
+        return [
+            (
+                topic,
+                partitions
+            )
+            for topic, partitions in topic_partitions.items()
+        ]
+
+    def _get_all_topic_partitions(self):
+        return [
+            (
+                topic['name'],
+                [p['partition_index'] for p in topic['partitions']]
+            )
+            for topic in self.describe_topics()
+        ]
+
+    def _get_topic_partitions(self, topic_partitions):
+        if topic_partitions is None:
+            return self._get_all_topic_partitions()
+        return self._convert_topic_partitions(topic_partitions)
+
+    def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None, raise_errors=True):
+        """Trigger leader election for the specified topic partitions.
+
+        Arguments:
+            election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean
+
+        Keyword Arguments:
+            topic_partitions (dict): A map of topic name strings to partition ids list.
+                By default, will run on all topic partitions
+            timeout_ms (numeric, optional): Milliseconds to wait for the leader election process.
+            raise_errors (bool, optional): Whether to raise errors as exceptions. Default True.
+
+        Returns:
+            Appropriate version of ElectLeadersResponse class.
+        """
+        timeout_ms = self._validate_timeout(timeout_ms)
+        request = ElectLeadersRequest(
+            election_type=ElectionType(election_type),
+            topic_partitions=self._get_topic_partitions(topic_partitions),
+            timeout_ms=timeout_ms,
+            max_version=1,
+        )
+        def response_errors(r):
+            if r.API_VERSION >= 1:
+                yield Errors.for_code(r.error_code)
+            for result in r.replica_election_results:
+                for partition in result[1]:
+                    yield Errors.for_code(partition[1])
+        ignore_errors = (Errors.ElectionNotNeededError,)
+        return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors)
+
+    async def _async_describe_log_dirs(self):
+        version = self._client.api_version(DescribeLogDirsRequest, max_version=0)
+        return await self._manager.send(DescribeLogDirsRequest[version]())
+
+    def describe_log_dirs(self):
+        """Send a DescribeLogDirsRequest to a broker.
+
+        Returns:
+            DescribeLogDirsResponse object
+        """
+        return self._manager.run(self._async_describe_log_dirs)
diff --git a/kafka/admin/_topics.py b/kafka/admin/_topics.py
new file mode 100644
index 000000000..c448efb00
--- /dev/null
+++ b/kafka/admin/_topics.py
@@ -0,0 +1,269 @@
+"""Topic management mixin for KafkaAdminClient.
+ +Also defines NewTopic and NewPartitions data classes. +""" + +from __future__ import annotations + +import logging +import time +from typing import TYPE_CHECKING + +import kafka.errors as Errors +from kafka.errors import IncompatibleBrokerVersion +from kafka.protocol.admin import CreateTopicsRequest, DeleteTopicsRequest, CreatePartitionsRequest + +if TYPE_CHECKING: + from kafka.net.manager import KafkaConnectionManager + +log = logging.getLogger(__name__) + + +class TopicAdminMixin: + """Mixin providing topic management methods for KafkaAdminClient.""" + _manager: KafkaConnectionManager + _client: object + config: dict + + @staticmethod + def _convert_new_topic_request(new_topic): + return ( + new_topic.name, + new_topic.num_partitions, + new_topic.replication_factor, + [ + (partition_id, replicas) for partition_id, replicas in new_topic.replica_assignments.items() + ], + [ + (config_key, config_value) for config_key, config_value in new_topic.topic_configs.items() + ] + ) + + def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True, + wait_for_metadata=False): + """Create new topics in the cluster. + + Arguments: + new_topics: A list of NewTopic objects. + + Keyword Arguments: + timeout_ms (numeric, optional): Milliseconds to wait for new topics to be created + before the broker returns. + validate_only (bool, optional): If True, don't actually create new topics. + Not supported by all versions. Default: False + raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. + wait_for_metadata (bool, optional): If True, block until each new topic is visible + in broker metadata with a leader assigned for every partition. Default: False + + Returns: CreateTopicResponse + """ + if validate_only and wait_for_metadata: + raise ValueError('validate_only and wait_for_metadata are mutually exclusive') + timeout_ms = self._validate_timeout(timeout_ms) + if validate_only and self._manager.broker_version < (0, 10, 2): + raise IncompatibleBrokerVersion( + "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." + .format(self._manager.broker_version)) + + request = CreateTopicsRequest( + topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], + timeout_ms=timeout_ms, + validate_only=validate_only, + max_version=3, + ) + def response_errors(r): + for topic in r.topics: + yield Errors.for_code(topic.error_code) + response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) + if wait_for_metadata: + self.wait_for_topics([new_topic.name for new_topic in new_topics]) + return response + + def wait_for_topics(self, topic_names, timeout_ms=10000): + """Block until each of the given topics is ready to use. + + CreateTopicsResponse only confirms that the broker accepted the create + request; propagating the new topics into the broker's metadata cache -- + and electing a leader for every partition -- can lag behind, especially + on KRaft clusters. This method polls :meth:`describe_topics` at a fixed + interval until every requested topic both: + + - is returned with ``error_code == 0``, and + - has ``error_code == 0`` and a leader assigned (``leader_id >= 0``) + for every partition. + + Arguments: + topic_names ([str]): Topic names to wait for. + + Keyword Arguments: + timeout_ms (numeric, optional): Maximum milliseconds to wait. + Default: 10000. + + Raises: + KafkaTimeoutError: if any topic is still not ready when the + deadline expires. 
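+
+        Example (hypothetical topic name)::
+
+            admin.create_topics([NewTopic('events', num_partitions=3,
+                                          replication_factor=1)])
+            admin.wait_for_topics(['events'], timeout_ms=30000)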
+ """ + if not topic_names: + return + topic_names = list(topic_names) + deadline = time.monotonic() + (timeout_ms / 1000.0) + pending = {name: 'not yet queried' for name in topic_names} + while True: + try: + topics = self.describe_topics(topics=topic_names) + except Exception as exc: + log.debug('describe_topics failed while waiting for topic visibility: %s', exc) + topics = [] + by_name = {t.get('name'): t for t in topics} + pending = {} + for name in topic_names: + reason = self._topic_not_ready_reason(by_name.get(name)) + if reason is not None: + pending[name] = reason + if not pending: + return + if time.monotonic() >= deadline: + raise Errors.KafkaTimeoutError( + 'Topics not ready after %sms: %s' % (timeout_ms, pending)) + time.sleep(0.1) + + @staticmethod + def _topic_not_ready_reason(topic_info): + """Return a string reason if ``topic_info`` isn't ready, else None.""" + if topic_info is None: + return 'missing from metadata response' + error_code = topic_info.get('error_code', 0) + if error_code != 0: + return Errors.for_code(error_code).__name__ + partitions = topic_info.get('partitions') or [] + if not partitions: + return 'no partitions reported' + bad = [] + for p in partitions: + p_err = p.get('error_code', 0) + idx = p.get('partition_index') + if p_err != 0: + bad.append('p%s=%s' % (idx, Errors.for_code(p_err).__name__)) + continue + if p.get('leader_id', -1) < 0: + bad.append('p%s=no leader' % idx) + if bad: + return ','.join(bad) + return None + + def delete_topics(self, topics, timeout_ms=None, raise_errors=True): + """Delete topics from the cluster. + + Arguments: + topics ([str]): A list of topic name strings. + + Keyword Arguments: + timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted + before the broker returns. + raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. + + Returns: + Appropriate version of DeleteTopicsResponse class. + """ + timeout_ms = self._validate_timeout(timeout_ms) + request = DeleteTopicsRequest( + topic_names=topics, timeout_ms=timeout_ms, + max_version=5, + ) + def response_errors(r): + for response in r.responses: + yield Errors.for_code(response.error_code) + return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) + + def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): + """Create additional partitions for an existing topic. + + Arguments: + topic_partitions: A dict of topic name strings to total partition count (int), + or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} + if manual assignment is desired. + dict of {topic_name: NewPartition} is deprecated. + + Keyword Arguments: + timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be + created before the broker returns. + validate_only (bool, optional): If True, don't actually create new partitions. + Default: False + raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. + + Returns: + Appropriate version of CreatePartitionsResponse class. 
+ """ + timeout_ms = self._validate_timeout(timeout_ms) + _Topic = CreatePartitionsRequest.CreatePartitionsTopic + _Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment + topics = [] + for topic, count in topic_partitions.items(): + if isinstance(count, int): + topics.append(_Topic(name=topic, count=count)) + elif isinstance(count, dict): + topics.append( + _Topic( + name=topic, + count=count['count'], + assignments=[_Assignment(broker_ids=broker_ids) + for broker_ids in count['assignments']])) + + else: + topics.append( + _Topic( + name=topic, + count=count.total_count, + assignments=[_Assignment(broker_ids=broker_ids) + for broker_ids in count.new_assignments])) + request = CreatePartitionsRequest( + topics=topics, + timeout_ms=timeout_ms, + validate_only=validate_only) + + def response_errors(r): + for result in r.results: + yield Errors.for_code(result.error_code) + return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) + + +class NewTopic: + """A class for new topic creation. + + Arguments: + name (string): name of the topic + num_partitions (int): number of partitions, or -1 if + replica_assignment has been specified + replication_factor (int): replication factor, or -1 if + replica assignment is specified + replica_assignments (dict of int: [int]): A mapping containing + partition id and replicas to assign to it. + topic_configs (dict of str: str): A mapping of config key + and value for the topic. + """ + def __init__(self, name, num_partitions=-1, replication_factor=-1, + replica_assignments=None, topic_configs=None): + self.name = name + self.num_partitions = num_partitions + self.replication_factor = replication_factor + self.replica_assignments = replica_assignments or {} + self.topic_configs = topic_configs or {} + + +class NewPartitions: + """A class for new partition creation on existing topics. + + Note that the length of new_assignments, if specified, must be the + difference between the new total number of partitions and the existing + number of partitions. + + Arguments: + total_count (int): the total number of partitions that should exist + on the topic + new_assignments ([[int]]): an array of arrays of replica assignments + for new partitions. If not set, broker assigns replicas per an + internal algorithm. 
+ """ + def __init__(self, total_count, new_assignments=None): + self.total_count = total_count + self.new_assignments = new_assignments diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py deleted file mode 100644 index 7cd505055..000000000 --- a/kafka/admin/acl_resource.py +++ /dev/null @@ -1,247 +0,0 @@ -from enum import IntEnum - -from kafka.errors import IllegalArgumentError - - -class ResourceType(IntEnum): - """Type of kafka resource to set ACL for - - The ANY value is only valid in a filter context - """ - - UNKNOWN = 0, - ANY = 1, - CLUSTER = 4, - DELEGATION_TOKEN = 6, - GROUP = 3, - TOPIC = 2, - TRANSACTIONAL_ID = 5 - - -class ACLOperation(IntEnum): - """Type of operation - - The ANY value is only valid in a filter context - """ - - UNKNOWN = 0, - ANY = 1, - ALL = 2, - READ = 3, - WRITE = 4, - CREATE = 5, - DELETE = 6, - ALTER = 7, - DESCRIBE = 8, - CLUSTER_ACTION = 9, - DESCRIBE_CONFIGS = 10, - ALTER_CONFIGS = 11, - IDEMPOTENT_WRITE = 12, - CREATE_TOKENS = 13, - DESCRIBE_TOKENS = 14 - - -class ACLPermissionType(IntEnum): - """An enumerated type of permissions - - The ANY value is only valid in a filter context - """ - - UNKNOWN = 0, - ANY = 1, - DENY = 2, - ALLOW = 3 - - -class ACLResourcePatternType(IntEnum): - """An enumerated type of resource patterns - - More details on the pattern types and how they work - can be found in KIP-290 (Support for prefixed ACLs) - https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs - """ - - UNKNOWN = 0, - ANY = 1, - MATCH = 2, - LITERAL = 3, - PREFIXED = 4 - - -class ACLFilter: - """Represents a filter to use with describing and deleting ACLs - - The difference between this class and the ACL class is mainly that - we allow using ANY with the operation, permission, and resource type objects - to fetch ALCs matching any of the properties. 
-
-    To make a filter matching any principal, set principal to None
-    """
-
-    def __init__(
-        self,
-        principal,
-        host,
-        operation,
-        permission_type,
-        resource_pattern
-    ):
-        self.principal = principal
-        self.host = host
-        self.operation = operation
-        self.permission_type = permission_type
-        self.resource_pattern = resource_pattern
-
-        self.validate()
-
-    def validate(self):
-        if not isinstance(self.operation, ACLOperation):
-            raise IllegalArgumentError("operation must be an ACLOperation object, and cannot be ANY")
-        if not isinstance(self.permission_type, ACLPermissionType):
-            raise IllegalArgumentError("permission_type must be an ACLPermissionType object, and cannot be ANY")
-        if not isinstance(self.resource_pattern, ResourcePatternFilter):
-            raise IllegalArgumentError("resource_pattern must be a ResourcePatternFilter object")
-
-    def __repr__(self):
-        return "<ACL principal={principal}, resource={resource}, operation={operation}, type={type}, host={host}>".format(
-            principal=self.principal,
-            host=self.host,
-            operation=self.operation.name,
-            type=self.permission_type.name,
-            resource=self.resource_pattern
-        )
-
-    def __eq__(self, other):
-        return all((
-            self.principal == other.principal,
-            self.host == other.host,
-            self.operation == other.operation,
-            self.permission_type == other.permission_type,
-            self.resource_pattern == other.resource_pattern
-        ))
-
-    def __hash__(self):
-        return hash((
-            self.principal,
-            self.host,
-            self.operation,
-            self.permission_type,
-            self.resource_pattern,
-        ))
-
-
-class ACL(ACLFilter):
-    """Represents a concrete ACL for a specific ResourcePattern
-
-    In kafka an ACL is a 4-tuple of (principal, host, operation, permission_type)
-    that limits who can do what on a specific resource (or since KIP-290 a resource pattern)
-
-    Terminology:
-        Principal -> This is the identifier for the user. Depending on the authorization method used (SSL, SASL etc)
-            the principal will look different. See http://kafka.apache.org/documentation/#security_authz for details.
-            The principal must be on the format "User:<name>" or kafka will treat it as invalid. It's possible to use
-            other principal types than "User" if using a custom authorizer for the cluster.
-        Host -> This must currently be an IP address. It cannot be a range, and it cannot be a domain name.
-            It can be set to "*", which is special cased in kafka to mean "any host"
-        Operation -> Which client operation this ACL refers to. Has different meaning depending
-            on the resource type the ACL refers to. See https://docs.confluent.io/current/kafka/authorization.html#acl-format
-            for a list of which combinations of resource/operation that unlocks which kafka APIs
-        Permission Type: Whether this ACL is allowing or denying access
-        Resource Pattern -> This is a representation of the resource or resource pattern that the ACL
-            refers to. See the ResourcePattern class for details.
- - """ - - def __init__( - self, - principal, - host, - operation, - permission_type, - resource_pattern - ): - super().__init__(principal, host, operation, permission_type, resource_pattern) - self.validate() - - def validate(self): - if self.operation == ACLOperation.ANY: - raise IllegalArgumentError("operation cannot be ANY") - if self.permission_type == ACLPermissionType.ANY: - raise IllegalArgumentError("permission_type cannot be ANY") - if not isinstance(self.resource_pattern, ResourcePattern): - raise IllegalArgumentError("resource_pattern must be a ResourcePattern object") - - -class ResourcePatternFilter: - def __init__( - self, - resource_type, - resource_name, - pattern_type - ): - self.resource_type = resource_type - self.resource_name = resource_name - self.pattern_type = pattern_type - - self.validate() - - def validate(self): - if not isinstance(self.resource_type, ResourceType): - raise IllegalArgumentError("resource_type must be a ResourceType object") - if not isinstance(self.pattern_type, ACLResourcePatternType): - raise IllegalArgumentError("pattern_type must be an ACLResourcePatternType object") - - def __repr__(self): - return "".format( - self.resource_type.name, - self.resource_name, - self.pattern_type.name - ) - - def __eq__(self, other): - return all(( - self.resource_type == other.resource_type, - self.resource_name == other.resource_name, - self.pattern_type == other.pattern_type, - )) - - def __hash__(self): - return hash(( - self.resource_type, - self.resource_name, - self.pattern_type - )) - - -class ResourcePattern(ResourcePatternFilter): - """A resource pattern to apply the ACL to - - Resource patterns are used to be able to specify which resources an ACL - describes in a more flexible way than just pointing to a literal topic name for example. - Since KIP-290 (kafka 2.0) it's possible to set an ACL for a prefixed resource name, which - can cut down considerably on the number of ACLs needed when the number of topics and - consumer groups start to grow. - The default pattern_type is LITERAL, and it describes a specific resource. This is also how - ACLs worked before the introduction of prefixed ACLs - """ - - def __init__( - self, - resource_type, - resource_name, - pattern_type=ACLResourcePatternType.LITERAL - ): - super().__init__(resource_type, resource_name, pattern_type) - self.validate() - - def validate(self): - if self.resource_type == ResourceType.ANY: - raise IllegalArgumentError("resource_type cannot be ANY") - if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]: - raise IllegalArgumentError( - "pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name) - ) - - -def valid_acl_operations(int_vals): - return set([ACLOperation(v) for v in int_vals if v not in (0, 1, 2)]) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 329db3f26..a91fc54e8 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -1,40 +1,36 @@ -from collections import defaultdict +"""KafkaAdminClient — high-level Kafka cluster administration.""" + import copy -import itertools import logging import selectors import socket import time -import uuid - -from . 
-from . import ConfigResourceType
-from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
-    ACLResourcePatternType, valid_acl_operations
-from kafka.net.compat import KafkaNetClient
-from kafka.protocol.consumer.metadata import (
-    ConsumerProtocolSubscription, ConsumerProtocolAssignment, ConsumerProtocolType,
-)
 import kafka.errors as Errors
-from kafka.errors import (
-    IncompatibleBrokerVersion, KafkaConfigurationError, UnknownTopicOrPartitionError,
-    UnrecognizedBrokerVersion, IllegalArgumentError)
-from kafka.future import Future
+from kafka.errors import KafkaConfigurationError, UnrecognizedBrokerVersion
 from kafka.metrics import MetricConfig, Metrics
-from kafka.protocol.admin import (
-    CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
-    ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
-    DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType)
-from kafka.protocol.consumer import OffsetFetchRequest
+from kafka.net.compat import KafkaNetClient
 from kafka.protocol.metadata import MetadataRequest, FindCoordinatorRequest
-from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
 from kafka.version import __version__
+from kafka.admin._acls import ACLAdminMixin
+from kafka.admin._configs import ConfigAdminMixin
+from kafka.admin._groups import GroupAdminMixin
+from kafka.admin._metadata import MetadataAdminMixin
+from kafka.admin._records import RecordAdminMixin
+from kafka.admin._topics import TopicAdminMixin
 
 log = logging.getLogger(__name__)
 
 
-class KafkaAdminClient:
+class KafkaAdminClient(
+    TopicAdminMixin,
+    MetadataAdminMixin,
+    ACLAdminMixin,
+    ConfigAdminMixin,
+    GroupAdminMixin,
+    RecordAdminMixin,
+):
     """A class for administering the Kafka cluster.
 
     Warning:
@@ -240,16 +236,11 @@ def close(self):
         log.debug("KafkaAdminClient is now closed.")
 
     def _validate_timeout(self, timeout_ms):
-        """Validate the timeout is set or use the configuration default.
-
-        Arguments:
-            timeout_ms: The timeout provided by api call, in milliseconds.
-
-        Returns:
-            The timeout to use for the operation.
-        """
+        """Validate the timeout is set or use the configuration default."""
         return timeout_ms or self.config['request_timeout_ms']
 
+    # -- Routing primitives (used by mixins) ----------------------------------
+
     async def _refresh_controller_id(self, timeout_ms=30000):
         """Determine the Kafka cluster controller."""
         if self._manager.broker_version < (0, 10):
@@ -274,15 +265,6 @@ async def _find_coordinator_id(self, group_id):
         Results are cached; subsequent calls for the same group_id return
         the cached value without a network round-trip.
-
-        Arguments:
-            group_id (str): The consumer group ID.
-
-        Returns:
-            int: The node_id of the group's coordinator broker.
-
-        Raises:
-            Errors from the FindCoordinatorResponse (e.g., CoordinatorNotAvailableError).
         """
         cached = self._coordinator_cache.get(group_id)
         if cached is not None:
@@ -300,20 +282,8 @@ async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (),
                                           raise_errors=True, ignore_errors=()):
         """Send a Kafka protocol message to the cluster controller.
 
-        Will block until the message result is received.
-
-        Arguments:
-            request: The message to send.
- - Keyword Arguments: - get_errors_fn (func): Function to process response and return an iterable of Error types. - ignore_errors (tuple): Any non-zero error codes that should be ignored. Not used if raise_errors=False. - raise_errors (bool): Whether to raise unhandled errors (True, default) or return response with errors (False). - - Returns: - The Kafka protocol response for the message. + Retries once on NotControllerError after refreshing the controller id. """ - # retry in case our controller_id is out of date if self._controller_id is None or self._controller_id == -1: self._controller_id = await self._refresh_controller_id() @@ -334,1279 +304,3 @@ async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), "Request '{}' failed with response '{}'." .format(request, response)) return response - - def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True, - wait_for_metadata=False): - """Create new topics in the cluster. - - Arguments: - new_topics: A list of topic names, - or a dict of {topic_name: {num_partitions:, replication_factor:, - assignments: {partition: [broker_ids]}, - configs: {key: value}}} - List of NewTopic objects is deprecated. - - Keyword Arguments: - timeout_ms (numeric, optional): Milliseconds to wait for new topics to be created - before the broker returns. - validate_only (bool, optional): If True, don't actually create new topics. - Not supported by all versions. Default: False - raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. - wait_for_metadata (bool, optional): If True, after the broker successfully - accepts the create request, block until each new topic is visible in - broker metadata with a leader assigned for every partition. Useful on - KRaft clusters, where CreateTopicsResponse returning NoError does not - guarantee that a subsequent MetadataRequest will see the topic. Has no - effect when ``validate_only`` is True. Uses a fixed 10-second timeout; - call :meth:`wait_for_topics` directly for finer control. - Mutually exclusive with validate_only. Default: False - - Returns: CreateTopicResponse - """ - if validate_only and wait_for_metadata: - raise ValueError('validate_only and wait_for_metadata are mutually exclusive') - timeout_ms = self._validate_timeout(timeout_ms) - if validate_only and self._manager.broker_version < (0, 10, 2): - raise IncompatibleBrokerVersion( - "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." 
- .format(self._manager.broker_version)) - - _Topic = CreateTopicsRequest.CreatableTopic - _Assignment = _Topic.CreatableReplicaAssignment - _Config = _Topic.CreatableTopicConfig - topics = [] - if isinstance(new_topics, dict): - # {topic_name: {num_partitions:, replication_factor:, assignments: {partition: [broker_ids]}, configs: {key: value}} - for topic, data in new_topics.items(): - configs = data.get('configs', {}) - topics.append(_Topic( - name=topic, - num_partitions=data.get('num_partitions', -1), - replication_factor=data.get('replication_factor', -1), - assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas) - for partition_id, replicas in data.get('assignments', {}).items()], - configs=[_Config(name=config_key, value=config_value) - for config_key, config_value in data.get('configs', {}).items()] - )) - elif all(isinstance(v, str) for v in new_topics): - for new_topic in new_topics: - topics.append(_Topic(name=new_topic)) - else: - from .new_topic import NewTopic - if all(isinstance(v, NewTopic) for v in new_topics): - for new_topic in new_topics: - topics.append(_Topic( - name=new_topic.name, - num_partitions=new_topic.num_partitions, - replication_factor=new_topic.replication_factor, - assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas) - for partition_id, replicas in new_topic.replica_assignments.items()], - configs=[_Config(name=config_key, value=config_value) - for config_key, config_value in new_topic.topic_configs.items()] - )) - if not topics: - raise ValueError(f"No valid topics found in new_topics: {new_topics}") - - request = CreateTopicsRequest( - topics=topics, - timeout_ms=timeout_ms, - validate_only=validate_only) - def response_errors(r): - for topic in r.topics: - yield Errors.for_code(topic.error_code) - response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) - if wait_for_metadata: # implies not validate_only - self.wait_for_topics([new_topic.name for new_topic in new_topics]) - return response - - def wait_for_topics(self, topic_names, timeout_ms=10000): - """Block until each of the given topics is ready to use. - - CreateTopicsResponse only confirms that the broker accepted the create - request; propagating the new topics into the broker's metadata cache -- - and electing a leader for every partition -- can lag behind, especially - on KRaft clusters. This method polls :meth:`describe_topics` at a fixed - interval until every requested topic both: - - - is returned with ``error_code == 0`` (topic exists and is - visible in metadata), and - - has ``error_code == 0`` and a leader assigned (``leader_id >= 0``) - for every partition. - - Useful after :meth:`create_topics` (including implicit creation via - ``allow_auto_topic_creation``) or after a delete+recreate sequence. - - Arguments: - topic_names ([str]): Topic names to wait for. - - Keyword Arguments: - timeout_ms (numeric, optional): Maximum milliseconds to wait. - Default: 10000. - - Raises: - KafkaTimeoutError: if any topic is still not ready when the - deadline expires. The exception message includes the - per-topic state from the final ``describe_topics`` call. 
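
A minimal usage sketch of the create-then-wait pattern this docstring describes, assuming `admin` is an already-connected KafkaAdminClient and the topic name is illustrative:

    # Create a topic, then block until broker metadata reports it with a
    # leader for every partition -- this matters most on KRaft clusters.
    admin.create_topics({"events": {"num_partitions": 3, "replication_factor": 1}})
    admin.wait_for_topics(["events"], timeout_ms=10000)
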
- """ - if not topic_names: - return - topic_names = list(topic_names) - deadline = time.monotonic() + (timeout_ms / 1000.0) - pending = {name: 'not yet queried' for name in topic_names} - while True: - try: - topics = self.describe_topics(topics=topic_names) - except Exception as exc: - log.debug('describe_topics failed while waiting for topic visibility: %s', exc) - topics = [] - by_name = {t.get('name'): t for t in topics} - pending = {} - for name in topic_names: - reason = self._topic_not_ready_reason(by_name.get(name)) - if reason is not None: - pending[name] = reason - if not pending: - return - if time.monotonic() >= deadline: - raise Errors.KafkaTimeoutError( - 'Topics not ready after %sms: %s' % (timeout_ms, pending)) - time.sleep(0.1) - - @staticmethod - def _topic_not_ready_reason(topic_info): - """Return a string reason if ``topic_info`` isn't ready, else None.""" - if topic_info is None: - return 'missing from metadata response' - error_code = topic_info.get('error_code', 0) - if error_code != 0: - return Errors.for_code(error_code).__name__ - partitions = topic_info.get('partitions') or [] - if not partitions: - return 'no partitions reported' - bad = [] - for p in partitions: - p_err = p.get('error_code', 0) - idx = p.get('partition_index') - if p_err != 0: - bad.append('p%s=%s' % (idx, Errors.for_code(p_err).__name__)) - continue - if p.get('leader_id', -1) < 0: - bad.append('p%s=no leader' % idx) - if bad: - return ','.join(bad) - return None - - def delete_topics(self, topics, timeout_ms=None, raise_errors=True): - """Delete topics from the cluster. - - Arguments: - topics ([str]): A list of topic name strings or uuid.UUID ids. - - Keyword Arguments: - timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted - before the broker returns. - raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. - - Returns: - Appropriate version of DeleteTopicsResponse class. - """ - timeout_ms = self._validate_timeout(timeout_ms) - _Topic = DeleteTopicsRequest.DeleteTopicState - request = DeleteTopicsRequest( - topics=[_Topic(topic_id=topic) if isinstance(topic, uuid.UUID) else _Topic(name=topic) - for topic in topics], - timeout_ms=timeout_ms) - - def response_errors(r): - for response in r.responses: - yield Errors.for_code(response.error_code) - return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) - - def _process_acl_operations(self, obj): - if obj.get('authorized_operations', None) is not None: - obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations']))) - return obj - - async def _get_cluster_metadata(self, topics): - """topics = [] for no topics, None for all.""" - request = MetadataRequest( - topics=[ - MetadataRequest.MetadataRequestTopic(name=topic) - for topic in topics] if topics is not None else None, - allow_auto_topic_creation=False, - include_cluster_authorized_operations=True, - include_topic_authorized_operations=True, - ) - response = await self._manager.send(request) - metadata = response.to_dict() - self._process_acl_operations(metadata) - for topic in metadata['topics']: - self._process_acl_operations(topic) - return metadata - - def list_topics(self): - """Retrieve a list of all topic names in the cluster. - - Returns: - A list of topic name strings. 
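
For orientation, the metadata helpers here (`list_topics`, plus `describe_topics` and `describe_cluster` just below) compose as in this sketch; `admin` is assumed to be a connected KafkaAdminClient:

    names = admin.list_topics()             # plain topic name strings
    details = admin.describe_topics(names)  # per-topic dicts, incl. partition info
    cluster = admin.describe_cluster()      # brokers, controller id, cluster id
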
- """ - metadata = self._manager.run(self._get_cluster_metadata, None) # None => request all topics - return [t['name'] for t in metadata['topics']] - - def describe_topics(self, topics=None): - """Fetch metadata for the specified topics or all topics if None. - - Keyword Arguments: - topics ([str], optional) A list of topic names. If None, metadata for all - topics is retrieved. - - Returns: - A list of dicts describing each topic (including partition info). - """ - metadata = self._manager.run(self._get_cluster_metadata, topics) - return metadata['topics'] - - def describe_cluster(self): - """Fetch cluster-wide metadata such as the list of brokers, the controller ID, - and the cluster ID. - - Returns: - A dict with cluster-wide metadata, excluding topic details. - """ - metadata = self._manager.run(self._get_cluster_metadata, []) # [] => no topics - metadata.pop('topics') # We have 'describe_topics' for this - return metadata - - @staticmethod - def _convert_describe_acls_response_to_acls(describe_response): - """Convert a DescribeAclsResponse into a list of ACL objects and a KafkaError. - - Arguments: - describe_response: The response object from the DescribeAclsRequest. - - Returns: - A tuple of (list_of_acl_objects, error) where error is an instance - of KafkaError (NoError if successful). - """ - version = describe_response.API_VERSION - - error = Errors.for_code(describe_response.error_code) - acl_list = [] - for resources in describe_response.resources: - if version == 0: - resource_type, resource_name, acls = resources - resource_pattern_type = ACLResourcePatternType.LITERAL.value - elif version <= 1: - resource_type, resource_name, resource_pattern_type, acls = resources - else: - raise NotImplementedError( - "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." - .format(version) - ) - for acl in acls: - principal, host, operation, permission_type = acl - conv_acl = ACL( - principal=principal, - host=host, - operation=ACLOperation(operation), - permission_type=ACLPermissionType(permission_type), - resource_pattern=ResourcePattern( - ResourceType(resource_type), - resource_name, - ACLResourcePatternType(resource_pattern_type) - ) - ) - acl_list.append(conv_acl) - - return (acl_list, error,) - - def describe_acls(self, acl_filter): - """Describe a set of ACLs - - Used to return a set of ACLs matching the supplied ACLFilter. 
- The cluster must be configured with an authorizer for this to work, or - you will get a SecurityDisabledError - - Arguments: - acl_filter: an ACLFilter object - - Returns: - tuple of a list of matching ACL objects and a KafkaError (NoError if successful) - """ - - version = self._client.api_version(DescribeAclsRequest, max_version=1) - if version == 0: - request = DescribeAclsRequest[version]( - resource_type_filter=acl_filter.resource_pattern.resource_type, - resource_name_filter=acl_filter.resource_pattern.resource_name, - principal_filter=acl_filter.principal, - host_filter=acl_filter.host, - operation=acl_filter.operation, - permission_type=acl_filter.permission_type - ) - elif version <= 1: - request = DescribeAclsRequest[version]( - resource_type_filter=acl_filter.resource_pattern.resource_type, - resource_name_filter=acl_filter.resource_pattern.resource_name, - pattern_type_filter=acl_filter.resource_pattern.pattern_type, - principal_filter=acl_filter.principal, - host_filter=acl_filter.host, - operation=acl_filter.operation, - permission_type=acl_filter.permission_type - ) - response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: - # optionally we could retry if error_type.retriable - raise error_type( - "Request '{}' failed with response '{}'." - .format(request, response)) - - return self._convert_describe_acls_response_to_acls(response) - - @staticmethod - def _convert_create_acls_resource_request_v0(acl): - """Convert an ACL object into the CreateAclsRequest v0 format. - - Arguments: - acl: An ACL object with resource pattern and permissions. - - Returns: - A tuple: (resource_type, resource_name, principal, host, operation, permission_type). - """ - - return ( - acl.resource_pattern.resource_type, - acl.resource_pattern.resource_name, - acl.principal, - acl.host, - acl.operation, - acl.permission_type - ) - - @staticmethod - def _convert_create_acls_resource_request_v1(acl): - """Convert an ACL object into the CreateAclsRequest v1 format. - - Arguments: - acl: An ACL object with resource pattern and permissions. - - Returns: - A tuple: (resource_type, resource_name, pattern_type, principal, host, operation, permission_type). - """ - return ( - acl.resource_pattern.resource_type, - acl.resource_pattern.resource_name, - acl.resource_pattern.pattern_type, - acl.principal, - acl.host, - acl.operation, - acl.permission_type - ) - - @staticmethod - def _convert_create_acls_response_to_acls(acls, create_response): - """Parse CreateAclsResponse and correlate success/failure with original ACL objects. - - Arguments: - acls: A list of ACL objects that were requested for creation. - create_response: The broker's CreateAclsResponse object. - - Returns: - A dict with: - { - 'succeeded': [list of ACL objects successfully created], - 'failed': [(acl_object, KafkaError), ...] - } - """ - version = create_response.API_VERSION - - creations_error = [] - creations_success = [] - for i, creations in enumerate(create_response.results): - if version <= 1: - error_code, error_message = creations - acl = acls[i] - error = Errors.for_code(error_code) - else: - raise NotImplementedError( - "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." 
- .format(version) - ) - - if error is Errors.NoError: - creations_success.append(acl) - else: - creations_error.append((acl, error,)) - - return {"succeeded": creations_success, "failed": creations_error} - - def create_acls(self, acls): - """Create a list of ACLs - - This endpoint only accepts a list of concrete ACL objects, no ACLFilters. - Throws TopicAlreadyExistsError if topic is already present. - - Arguments: - acls: a list of ACL objects - - Returns: - dict of successes and failures - """ - - for acl in acls: - if not isinstance(acl, ACL): - raise IllegalArgumentError("acls must contain ACL objects") - - version = self._client.api_version(CreateAclsRequest, max_version=1) - if version == 0: - request = CreateAclsRequest[version]( - creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls] - ) - elif version <= 1: - request = CreateAclsRequest[version]( - creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls] - ) - response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 - return self._convert_create_acls_response_to_acls(acls, response) - - @staticmethod - def _convert_delete_acls_resource_request_v0(acl): - """Convert an ACLFilter object into the DeleteAclsRequest v0 format. - - Arguments: - acl: An ACLFilter object identifying the ACLs to be deleted. - - Returns: - A tuple: (resource_type, resource_name, principal, host, operation, permission_type). - """ - return ( - acl.resource_pattern.resource_type, - acl.resource_pattern.resource_name, - acl.principal, - acl.host, - acl.operation, - acl.permission_type - ) - - @staticmethod - def _convert_delete_acls_resource_request_v1(acl): - """Convert an ACLFilter object into the DeleteAclsRequest v1 format. - - Arguments: - acl: An ACLFilter object identifying the ACLs to be deleted. - - Returns: - A tuple: (resource_type, resource_name, pattern_type, principal, host, operation, permission_type). - """ - return ( - acl.resource_pattern.resource_type, - acl.resource_pattern.resource_name, - acl.resource_pattern.pattern_type, - acl.principal, - acl.host, - acl.operation, - acl.permission_type - ) - - @staticmethod - def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response): - """Parse the DeleteAclsResponse and map the results back to each input ACLFilter. - - Arguments: - acl_filters: A list of ACLFilter objects that were provided in the request. - delete_response: The response from the DeleteAclsRequest. - - Returns: - A list of tuples of the form: - (acl_filter, [(matching_acl, KafkaError), ...], filter_level_error). - """ - version = delete_response.API_VERSION - filter_result_list = [] - for i, filter_responses in enumerate(delete_response.filter_results): - filter_error_code, filter_error_message, matching_acls = filter_responses - filter_error = Errors.for_code(filter_error_code) - acl_result_list = [] - for acl in matching_acls: - if version == 0: - error_code, error_message, resource_type, resource_name, principal, host, operation, permission_type = acl - resource_pattern_type = ACLResourcePatternType.LITERAL.value - elif version == 1: - error_code, error_message, resource_type, resource_name, resource_pattern_type, principal, host, operation, permission_type = acl - else: - raise NotImplementedError( - "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." 
- .format(version) - ) - acl_error = Errors.for_code(error_code) - conv_acl = ACL( - principal=principal, - host=host, - operation=ACLOperation(operation), - permission_type=ACLPermissionType(permission_type), - resource_pattern=ResourcePattern( - ResourceType(resource_type), - resource_name, - ACLResourcePatternType(resource_pattern_type) - ) - ) - acl_result_list.append((conv_acl, acl_error,)) - filter_result_list.append((acl_filters[i], acl_result_list, filter_error,)) - return filter_result_list - - def delete_acls(self, acl_filters): - """Delete a set of ACLs - - Deletes all ACLs matching the list of input ACLFilter - - Arguments: - acl_filters: a list of ACLFilter - - Returns: - a list of 3-tuples corresponding to the list of input filters. - The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance) - """ - - for acl in acl_filters: - if not isinstance(acl, ACLFilter): - raise IllegalArgumentError("acl_filters must contain ACLFilter type objects") - - version = self._client.api_version(DeleteAclsRequest, max_version=1) - - if version == 0: - request = DeleteAclsRequest[version]( - filters=[self._convert_delete_acls_resource_request_v0(acl) for acl in acl_filters] - ) - elif version <= 1: - request = DeleteAclsRequest[version]( - filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters] - ) - response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 - return self._convert_delete_acls_response_to_matching_acls(acl_filters, response) - - @staticmethod - def _convert_describe_config_resource_request(config_resource): - """Convert a ConfigResource into the format required by DescribeConfigsRequest. - - Arguments: - config_resource: A ConfigResource with resource_type, name, and optional config keys. - - Returns: - A tuple: (resource_type, resource_name, [list_of_config_keys] or None). - """ - return ( - config_resource.resource_type, - config_resource.name, - [ - config_key for config_key, config_value in config_resource.configs.items() - ] if config_resource.configs else None - ) - - async def _async_describe_configs(self, config_resources, include_synonyms=False): - # Break up requests by type - a broker config request must be sent to the specific broker. - # All other (currently just topic resources) can be sent to any broker. - broker_resources = [] - topic_resources = [] - - for config_resource in config_resources: - if config_resource.resource_type == ConfigResourceType.BROKER: - broker_resources.append(self._convert_describe_config_resource_request(config_resource)) - else: - topic_resources.append(self._convert_describe_config_resource_request(config_resource)) - - version = self._client.api_version(DescribeConfigsRequest, max_version=2) - if include_synonyms and version == 0: - raise IncompatibleBrokerVersion( - "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." 
- .format(self._manager.broker_version)) - - results = [] - for broker_resource in broker_resources: - try: - broker_id = int(broker_resource[1]) - except ValueError: - raise ValueError("Broker resource names must be an integer or a string represented integer") - if version == 0: - request = DescribeConfigsRequest[version](resources=[broker_resource]) - else: - request = DescribeConfigsRequest[version]( - resources=[broker_resource], - include_synonyms=include_synonyms) - results.append(await self._manager.send(request, node_id=broker_id)) - - if topic_resources: - if version == 0: - request = DescribeConfigsRequest[version](resources=topic_resources) - else: - request = DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms) - results.append(await self._manager.send(request)) - - return results - - def describe_configs(self, config_resources, include_synonyms=False): - """Fetch configuration parameters for one or more Kafka resources. - - Arguments: - config_resources: An list of ConfigResource objects. - Any keys in ConfigResource.configs dict will be used to filter the - result. Setting the configs dict to None will get all values. An - empty dict will get zero values (as per Kafka protocol). - - Keyword Arguments: - include_synonyms (bool, optional): If True, return synonyms in response. Not - supported by all versions. Default: False. - - Returns: - List of DescribeConfigsResponses. - """ - return self._manager.run(self._async_describe_configs, config_resources, include_synonyms) - - @staticmethod - def _convert_alter_config_resource_request(config_resource): - """Convert a ConfigResource into the format required by AlterConfigsRequest. - - Arguments: - config_resource: A ConfigResource with resource_type, name, and config (key, value) pairs. - - Returns: - A tuple: (resource_type, resource_name, [(config_key, config_value), ...]). - """ - return ( - config_resource.resource_type, - config_resource.name, - [ - (config_key, config_value) for config_key, config_value in config_resource.configs.items() - ] - ) - - async def _async_alter_configs(self, config_resources): - version = self._client.api_version(AlterConfigsRequest, max_version=1) - request = AlterConfigsRequest[version]( - resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] - ) - # TODO the Java client has the note: - # // We must make a separate AlterConfigs request for every BROKER resource we want to alter - # // and send the request to that specific broker. Other resources are grouped together into - # // a single request that may be sent to any broker. - # - # So this is currently broken as it always sends to the least_loaded_node() - return await self._manager.send(request) - - def alter_configs(self, config_resources): - """Alter configuration parameters of one or more Kafka resources. - - Warning: - This is currently broken for BROKER resources because those must be - sent to that specific broker, versus this always picks the - least-loaded node. See the comment in the source code for details. - We would happily accept a PR fixing this. - - Arguments: - config_resources: A list of ConfigResource objects. - - Returns: - Appropriate version of AlterConfigsResponse class. 
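
A sketch of the describe/alter round-trip documented above, using a TOPIC resource to stay clear of the BROKER routing caveat in the warning; the topic name and config value are illustrative assumptions:

    from kafka.admin import ConfigResource, ConfigResourceType

    resource = ConfigResource(ConfigResourceType.TOPIC, "events")
    admin.describe_configs([resource])   # fetch current values
    # Override one topic config; "retention.ms" is an example Kafka topic config.
    admin.alter_configs([ConfigResource(ConfigResourceType.TOPIC, "events",
                                        configs={"retention.ms": "86400000"})])
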
- """ - return self._manager.run(self._async_alter_configs, config_resources) - - # alter replica logs dir protocol not yet implemented - # Note: have to lookup the broker with the replica assignment and send the request to that broker - - # describe log dirs protocol not yet implemented - # Note: have to lookup the broker with the replica assignment and send the request to that broker - - def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): - """Create additional partitions for an existing topic. - - Arguments: - topic_partitions: A dict of topic name strings to total partition count (int), - or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} - if manual assignment is desired. - dict of {topic_name: NewPartition} is deprecated. - - Keyword Arguments: - timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be - created before the broker returns. - validate_only (bool, optional): If True, don't actually create new partitions. - Default: False - raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. - - Returns: - Appropriate version of CreatePartitionsResponse class. - """ - timeout_ms = self._validate_timeout(timeout_ms) - _Topic = CreatePartitionsRequest.CreatePartitionsTopic - _Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment - topics = [] - for topic, count in topic_partitions.items(): - if isinstance(count, int): - topics.append(_Topic(name=topic, count=count)) - elif isinstance(count, dict): - topics.append( - _Topic( - name=topic, - count=count['count'], - assignments=[_Assignment(broker_ids=broker_ids) - for broker_ids in count['assignments']])) - - else: - topics.append( - _Topic( - name=topic, - count=count.total_count, - assignments=[_Assignment(broker_ids=broker_ids) - for broker_ids in count.new_assignments])) - request = CreatePartitionsRequest( - topics=topics, - timeout_ms=timeout_ms, - validate_only=validate_only) - - def response_errors(r): - for result in r.results: - yield Errors.for_code(result.error_code) - return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) - - async def _async_get_leader_for_partitions(self, partitions): - """Finds ID of the leader node for every given topic partition.""" - partitions = set(partitions) - topics = set(tp.topic for tp in partitions) - - metadata = await self._get_cluster_metadata(topics) - - leader2partitions = defaultdict(list) - valid_partitions = set() - for topic in metadata.get("topics", ()): - for partition in topic.get("partitions", ()): - t2p = TopicPartition(topic=topic["name"], partition=partition["partition_index"]) - if t2p in partitions: - leader2partitions[partition["leader_id"]].append(t2p) - valid_partitions.add(t2p) - - if len(partitions) != len(valid_partitions): - unknown = set(partitions) - valid_partitions - raise UnknownTopicOrPartitionError( - "The following partitions are not known: %s" - % ", ".join(str(x) for x in unknown) - ) - - return leader2partitions - - async def _async_delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): - timeout_ms = self._validate_timeout(timeout_ms) - version = self._client.api_version(DeleteRecordsRequest, max_version=0) - - if partition_leader_id is None: - leader2partitions = await self._async_get_leader_for_partitions(set(records_to_delete)) - else: - leader2partitions = {partition_leader_id: set(records_to_delete)} - - responses = [] - for leader, 
partitions in leader2partitions.items(): - topic2partitions = defaultdict(list) - for partition in partitions: - topic2partitions[partition.topic].append(partition) - - request = DeleteRecordsRequest[version]( - topics=[ - (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions]) - for topic, partitions in topic2partitions.items() - ], - timeout_ms=timeout_ms - ) - response = await self._manager.send(request, node_id=leader) - responses.append(response.to_dict()) - - partition2result = {} - partition2error = {} - for response in responses: - for topic in response["topics"]: - for partition in topic["partitions"]: - tp = TopicPartition(topic["name"], partition["partition_index"]) - partition2result[tp] = partition - if partition["error_code"] != 0: - partition2error[tp] = partition["error_code"] - - if partition2error: - if len(partition2error) == 1: - key, error = next(iter(partition2error.items())) - raise Errors.for_code(error)( - "Error deleting records from topic %s partition %s" % (key.topic, key.partition) - ) - else: - raise Errors.BrokerResponseError( - "The following errors occured when trying to delete records: " + - ", ".join( - "%s(partition=%d): %s" % - (partition.topic, partition.partition, Errors.for_code(error).__name__) - for partition, error in partition2error.items() - ) - ) - - return partition2result - - def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): - """Delete records whose offset is smaller than the given offset of the corresponding partition. - - Arguments: - records_to_delete ({TopicPartition: int}): The earliest available offsets for the - given partitions. - - Keyword Arguments: - timeout_ms (numeric, optional): Timeout in milliseconds, if None (default), will be read from - config. - partition_leader_id (node_id / int, optional): If specified, all deletion requests will be sent to - this node. No check is performed verifying that this is indeed the leader for all - listed partitions: use with caution. - - Returns: - dict {topicPartition -> metadata}, where metadata is returned by the broker. - See DeleteRecordsResponse for possible fields. error_code for all partitions is - guaranteed to be zero, otherwise an exception is raised. - """ - return self._manager.run(self._async_delete_records, records_to_delete, timeout_ms, partition_leader_id) - - # create delegation token protocol not yet implemented - # Note: send the request to the least_loaded_node() - - # renew delegation token protocol not yet implemented - # Note: send the request to the least_loaded_node() - - # expire delegation_token protocol not yet implemented - # Note: send the request to the least_loaded_node() - - # describe delegation_token protocol not yet implemented - # Note: send the request to the least_loaded_node() - - def _describe_consumer_groups_request(self, group_id): - """Send a DescribeGroupsRequest to the group's coordinator. - - Arguments: - group_id: The group name as a string - - Returns: - DescribeGroupsRequest object - """ - version = self._client.api_version(DescribeGroupsRequest, max_version=3) - if version <= 2: - # Note: KAFKA-6788 A potential optimization is to group the - # request per coordinator and send one request with a list of - # all consumer groups. Java still hasn't implemented this - # because the error checking is hard to get right when some - # groups error and others don't. 
- request = DescribeGroupsRequest[version](groups=(group_id,)) - else: - request = DescribeGroupsRequest[version]( - groups=(group_id,), - include_authorized_operations=True - ) - return request - - def _describe_consumer_groups_process_response(self, response): - """Process a DescribeGroupsResponse into a group description.""" - if response.API_VERSION > 3: - raise NotImplementedError( - "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient." - .format(response.API_VERSION)) - - assert len(response.groups) == 1 - for response_name, response_field in response.fields.items(): - if response_name == 'groups': - described_group = getattr(response, response_name)[0] - described_group_information_list = [] - protocol_type_is_consumer = False - for group_information_name, group_information_field in response_field.fields.items(): - if not group_information_field.for_version_q(response.API_VERSION): - continue - described_group_information = getattr(described_group, group_information_name) - if group_information_name == 'protocol_type': - protocol_type = described_group_information - protocol_type_is_consumer = (protocol_type == ConsumerProtocolType or not protocol_type) - if group_information_name == 'members': - member_information_list = [] - for member in described_group_information: - member_information = [] - for attr_name, attr_field in group_information_field.fields.items(): - if not attr_field.for_version_q(response.API_VERSION): - continue - attr_val = getattr(member, attr_name) - if protocol_type_is_consumer: - if attr_name == 'member_metadata' and attr_val: - member_information.append(ConsumerProtocolSubscription.decode(attr_val)) - elif attr_name == 'member_assignment' and attr_val: - member_information.append(ConsumerProtocolAssignment.decode(attr_val)) - else: - member_information.append(attr_val) - member_info_tuple = MemberInformation._make(member_information) - member_information_list.append(member_info_tuple) - described_group_information_list.append(member_information_list) - else: - described_group_information_list.append(described_group_information) - # Version 3 of the DescribeGroups API introduced the "authorized_operations" field. - if response.API_VERSION >= 3: - described_group_information_list[-1] = list(map(lambda acl: acl.name, valid_acl_operations(described_group_information_list[-1]))) - else: - # TODO: Fix GroupInformation defaults - described_group_information_list.append([]) - group_description = GroupInformation._make(described_group_information_list) - error_code = group_description.error_code - error_type = Errors.for_code(error_code) - # Java has the note: KAFKA-6789, we can retry based on the error code - if error_type is not Errors.NoError: - raise error_type( - "DescribeGroupsResponse failed with response '{}'." - .format(response)) - return group_description - assert False, "DescribeGroupsResponse parsing failed" - - async def _async_describe_consumer_groups(self, group_ids, group_coordinator_id=None): - results = [] - for group_id in group_ids: - coordinator_id = group_coordinator_id or await self._find_coordinator_id(group_id) - request = self._describe_consumer_groups_request(group_id) - response = await self._manager.send(request, node_id=coordinator_id) - results.append(self._describe_consumer_groups_process_response(response)) - return results - - def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): - """Describe a set of consumer groups. 
- - Any errors are immediately raised. - - Arguments: - group_ids: A list of consumer group IDs. These are typically the - group names as strings. - - Keyword Arguments: - group_coordinator_id (int, optional): The node_id of the groups' coordinator - broker. If set to None, it will query the cluster for each group to - find that group's coordinator. Explicitly specifying this can be - useful for avoiding extra network round trips if you already know - the group coordinator. This is only useful when all the group_ids - have the same coordinator, otherwise it will error. Default: None. - - Returns: - A list of group descriptions. For now the group descriptions - are the raw results from the DescribeGroupsResponse. Long-term, we - plan to change this to return namedtuples as well as decoding the - partition assignments. - """ - return self._manager.run(self._async_describe_consumer_groups, group_ids, group_coordinator_id) - - def _list_consumer_groups_request(self): - """Send a ListGroupsRequest to a broker. - - Returns: - ListGroupsRequest object - """ - version = self._client.api_version(ListGroupsRequest, max_version=2) - return ListGroupsRequest[version]() - - def _list_consumer_groups_process_response(self, response): - """Process a ListGroupsResponse into a list of groups.""" - if response.API_VERSION <= 2: - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: - raise error_type( - "ListGroupsRequest failed with response '{}'." - .format(response)) - else: - raise NotImplementedError( - "Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient." - .format(response.API_VERSION)) - return [(group.group_id, group.protocol_type) for group in response.groups] - - async def _async_list_consumer_groups(self, broker_ids=None): - if broker_ids is None: - broker_ids = [broker.node_id for broker in self._manager.cluster.brokers()] - # Use a set to prevent duplicates if a group coordinator migrates - # between brokers mid-query. - consumer_groups = set() - for broker_id in broker_ids: - request = self._list_consumer_groups_request() - response = await self._manager.send(request, node_id=broker_id) - consumer_groups.update(self._list_consumer_groups_process_response(response)) - return list(consumer_groups) - - def list_consumer_groups(self, broker_ids=None): - """List all consumer groups known to the cluster. - - This returns a list of Consumer Group tuples. The tuples are - composed of the consumer group name and the consumer group protocol - type. - - Only consumer groups that store their offsets in Kafka are returned. - The protocol type will be an empty string for groups created using - Kafka < 0.9 APIs because, although they store their offsets in Kafka, - they don't use Kafka for group coordination. For groups created using - Kafka >= 0.9, the protocol type will typically be "consumer". - - As soon as any error is encountered, it is immediately raised. - - Keyword Arguments: - broker_ids ([int], optional): A list of broker node_ids to query for consumer - groups. If set to None, will query all brokers in the cluster. - Explicitly specifying broker(s) can be useful for determining which - consumer groups are coordinated by those broker(s). Default: None - - Returns: - list: List of tuples of Consumer Groups. - - Raises: - CoordinatorNotAvailableError: The coordinator is not - available, so cannot process requests. - CoordinatorLoadInProgressError: The coordinator is loading and - hence can't process requests. 
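
As a sketch of consuming the return value described above (each entry is a (group_id, protocol_type) tuple; `admin` is assumed to be a connected KafkaAdminClient):

    for group_id, protocol_type in admin.list_consumer_groups():
        if protocol_type == "consumer":   # groups created with Kafka >= 0.9 APIs
            print(group_id)
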
- """ - return self._manager.run(self._async_list_consumer_groups, broker_ids) - - def _list_consumer_group_offsets_request(self, group_id, partitions=None): - """Send an OffsetFetchRequest to a broker. - - Arguments: - group_id (str): The consumer group id name for which to fetch offsets. - - Keyword Arguments: - partitions: A list of TopicPartitions for which to fetch - offsets. On brokers >= 0.10.2, this can be set to None to fetch all - known offsets for the consumer group. Default: None. - - Returns: - OffsetFetchRequest object - """ - version = self._client.api_version(OffsetFetchRequest, max_version=5) - if partitions is None: - if version <= 1: - raise ValueError( - """OffsetFetchRequest_v{} requires specifying the - partitions for which to fetch offsets. Omitting the - partitions is only supported on brokers >= 0.10.2. - For details, see KIP-88.""".format(version)) - topics_partitions = None - else: - # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])] - topics_partitions_dict = defaultdict(set) - for topic, partition in partitions: - topics_partitions_dict[topic].add(partition) - topics_partitions = list(topics_partitions_dict.items()) - return OffsetFetchRequest[version](group_id, topics_partitions) - - def _list_consumer_group_offsets_process_response(self, response): - """Process an OffsetFetchResponse. - - Arguments: - response: an OffsetFetchResponse. - - Returns: - A dictionary composed of TopicPartition keys and - OffsetAndMetadata values. - """ - if response.API_VERSION <= 5: - - # OffsetFetchResponse_v1 lacks a top-level error_code - if response.API_VERSION > 1: - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: - # optionally we could retry if error_type.retriable - raise error_type( - "OffsetFetchResponse failed with response '{}'." - .format(response)) - - # transform response into a dictionary with TopicPartition keys and - # OffsetAndMetadata values--this is what the Java AdminClient returns - offsets = {} - for topic, partitions in response.topics: - for partition_data in partitions: - if response.API_VERSION <= 4: - partition, offset, metadata, error_code = partition_data - leader_epoch = -1 - else: - partition, offset, leader_epoch, metadata, error_code = partition_data - error_type = Errors.for_code(error_code) - if error_type is not Errors.NoError: - raise error_type( - "Unable to fetch consumer group offsets for topic {}, partition {}" - .format(topic, partition)) - offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata, leader_epoch) - else: - raise NotImplementedError( - "Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient." - .format(response.API_VERSION)) - return offsets - - async def _async_list_consumer_group_offsets(self, group_id, group_coordinator_id=None, partitions=None): - if group_coordinator_id is None: - group_coordinator_id = await self._find_coordinator_id(group_id) - request = self._list_consumer_group_offsets_request(group_id, partitions) - response = await self._manager.send(request, node_id=group_coordinator_id) - return self._list_consumer_group_offsets_process_response(response) - - def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, - partitions=None): - """Fetch Consumer Offsets for a single consumer group. - - Note: - This does not verify that the group_id or partitions actually exist - in the cluster. - - As soon as any error is encountered, it is immediately raised. 
- - Arguments: - group_id (str): The consumer group id name for which to fetch offsets. - - Keyword Arguments: - group_coordinator_id (int, optional): The node_id of the group's coordinator - broker. If set to None, will query the cluster to find the group - coordinator. Explicitly specifying this can be useful to prevent - that extra network round trip if you already know the group - coordinator. Default: None. - partitions: A list of TopicPartitions for which to fetch - offsets. On brokers >= 0.10.2, this can be set to None to fetch all - known offsets for the consumer group. Default: None. - - Returns: - dictionary: A dictionary with TopicPartition keys and - OffsetAndMetadata values. Partitions that are not specified and for - which the group_id does not have a recorded offset are omitted. An - offset value of `-1` indicates the group_id has no offset for that - TopicPartition. A `-1` can only happen for partitions that are - explicitly specified. - """ - return self._manager.run(self._async_list_consumer_group_offsets, group_id, group_coordinator_id, partitions) - - async def _async_delete_consumer_groups(self, group_ids, group_coordinator_id=None): - coordinators_groups = defaultdict(list) - if group_coordinator_id is not None: - coordinators_groups[group_coordinator_id] = group_ids - else: - for group_id in group_ids: - coordinator_id = await self._find_coordinator_id(group_id) - coordinators_groups[coordinator_id].append(group_id) - - results = [] - for coordinator_id, coordinator_group_ids in coordinators_groups.items(): - request = self._delete_consumer_groups_request(coordinator_group_ids) - response = await self._manager.send(request, node_id=coordinator_id) - results.extend(self._convert_delete_groups_response(response)) - return results - - def delete_consumer_groups(self, group_ids, group_coordinator_id=None): - """Delete Consumer Group Offsets for given consumer groups. - - Note: - This does not verify that the group ids actually exist and - group_coordinator_id is the correct coordinator for all these groups. - - The result needs checking for potential errors. - - Arguments: - group_ids ([str]): The consumer group ids of the groups which are to be deleted. - - Keyword Arguments: - group_coordinator_id (int, optional): The node_id of the broker which is - the coordinator for all the groups. Use only if all groups are coordinated - by the same broker. If set to None, will query the cluster to find the coordinator - for every single group. Explicitly specifying this can be useful to prevent - that extra network round trips if you already know the group coordinator. - Default: None. - - Returns: - A list of tuples (group_id, KafkaError) - """ - return self._manager.run(self._async_delete_consumer_groups, group_ids, group_coordinator_id) - - def _convert_delete_groups_response(self, response): - """Parse the DeleteGroupsResponse, mapping group IDs to their respective errors. - - Arguments: - response: A DeleteGroupsResponse object from the broker. - - Returns: - A list of (group_id, KafkaError) for each deleted group. - """ - if response.API_VERSION <= 1: - results = [] - for group_id, error_code in response.results: - results.append((group_id, Errors.for_code(error_code))) - return results - else: - raise NotImplementedError( - "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient." - .format(response.API_VERSION)) - - def _delete_consumer_groups_request(self, group_ids): - """Build a DeleteGroupsRequest to send to a broker (the group coordinator). 
- - Arguments: - group_ids ([str]): A list of consumer group IDs to be deleted. - - Returns: - A DeleteGroupsRequest object. - """ - version = self._client.api_version(DeleteGroupsRequest, max_version=1) - return DeleteGroupsRequest[version](group_ids) - - @staticmethod - def _convert_topic_partitions(topic_partitions): - return [ - ( - topic, - partition_ids - ) - for topic, partition_ids in topic_partitions.items() - ] - - def _get_all_topic_partitions(self): - return [ - ( - topic, - [partition_info.partition for partition_info in self._client.cluster._partitions[topic].values()] - ) - for topic in self._client.cluster.topics() - ] - - def _get_topic_partitions(self, topic_partitions): - if topic_partitions is None: - return self._get_all_topic_partitions() - return self._convert_topic_partitions(topic_partitions) - - def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None, raise_errors=True): - """Perform leader election on the topic partitions. - - Arguments: - election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean - - Keyword Arguments: - topic_partitions (dict): A map of topic name strings to partition ids list. - By default, will run on all topic partitions - timeout_ms (num, optional): Milliseconds to wait for the leader election process to complete - before the broker returns. - raise_errors (bool, optional): True/False whether to raise errors as exceptions. Default True. - - Returns: - Appropriate version of ElectLeadersResponse class. - """ - timeout_ms = self._validate_timeout(timeout_ms) - request = ElectLeadersRequest( - election_type=ElectionType(election_type), - topic_partitions=self._get_topic_partitions(topic_partitions), - timeout_ms=timeout_ms) - def response_errors(r): - if r.API_VERSION >= 1: - yield Errors.for_code(r.error_code) - for result in r.replica_election_results: - for partition in result.partition_result: - yield Errors.for_code(partition.error_code) - ignore_errors = (Errors.ElectionNotNeededError,) - return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) - - async def _async_describe_log_dirs(self): - version = self._client.api_version(DescribeLogDirsRequest, max_version=0) - return await self._manager.send(DescribeLogDirsRequest[version]()) - - def describe_log_dirs(self): - """Send a DescribeLogDirsRequest request to a broker. - - Returns: - DescribeLogDirsResponse object - """ - return self._manager.run(self._async_describe_log_dirs) diff --git a/kafka/admin/config_resource.py b/kafka/admin/config_resource.py deleted file mode 100644 index eb61dcd50..000000000 --- a/kafka/admin/config_resource.py +++ /dev/null @@ -1,35 +0,0 @@ -from enum import IntEnum - - -class ConfigResourceType(IntEnum): - """An enumerated type of config resources""" - - BROKER = 4, - TOPIC = 2 - - -class ConfigResource: - """A class for specifying config resources. - Arguments: - resource_type (ConfigResourceType): the type of kafka resource - name (string): The name of the kafka resource - configs ({key : value}): A maps of config keys to values. 
- """ - - def __init__( - self, - resource_type, - name, - configs=None - ): - if not isinstance(resource_type, (ConfigResourceType)): - resource_type = ConfigResourceType[str(resource_type).upper()] # pylint: disable-msg=unsubscriptable-object - self.resource_type = resource_type - self.name = name - self.configs = configs - - def __str__(self): - return "ConfigResource %s=%s" % (self.resource_type, self.name) - - def __repr__(self): - return "ConfigResource(%s, %s, %s)" % (self.resource_type, self.name, self.configs) diff --git a/kafka/admin/new_partitions.py b/kafka/admin/new_partitions.py deleted file mode 100644 index 613fb861e..000000000 --- a/kafka/admin/new_partitions.py +++ /dev/null @@ -1,16 +0,0 @@ -class NewPartitions: - """A class for new partition creation on existing topics. Note that the length of new_assignments, if specified, - must be the difference between the new total number of partitions and the existing number of partitions. - Arguments: - total_count (int): the total number of partitions that should exist on the topic - new_assignments ([[int]]): an array of arrays of replica assignments for new partitions. - If not set, broker assigns replicas per an internal algorithm. - """ - - def __init__( - self, - total_count, - new_assignments=None - ): - self.total_count = total_count - self.new_assignments = new_assignments diff --git a/kafka/admin/new_topic.py b/kafka/admin/new_topic.py deleted file mode 100644 index 2c53d87bb..000000000 --- a/kafka/admin/new_topic.py +++ /dev/null @@ -1,26 +0,0 @@ -class NewTopic: - """ A class for new topic creation - Arguments: - name (string): name of the topic - num_partitions (int): number of partitions - or -1 if replica_assignment has been specified - replication_factor (int): replication factor or -1 if - replica assignment is specified - replica_assignment (dict of int: [int]): A mapping containing - partition id and replicas to assign to it. - topic_configs (dict of str: str): A mapping of config key - and value for the topic. - """ - def __init__( - self, - name, - num_partitions=-1, - replication_factor=-1, - replica_assignments=None, - topic_configs=None, - ): - self.name = name - self.num_partitions = num_partitions - self.replication_factor = replication_factor - self.replica_assignments = replica_assignments or {} - self.topic_configs = topic_configs or {} diff --git a/kafka/cli/admin/configs/describe.py b/kafka/cli/admin/configs/describe.py index 0f32a744f..be5439cb9 100644 --- a/kafka/cli/admin/configs/describe.py +++ b/kafka/cli/admin/configs/describe.py @@ -1,4 +1,4 @@ -from kafka.admin.config_resource import ConfigResource +from kafka.admin import ConfigResource class DescribeConfigs: diff --git a/test/admin/test_acl_comparisons.py b/test/admin/test_acl_comparisons.py index 291bf0e2f..77ea6e394 100644 --- a/test/admin/test_acl_comparisons.py +++ b/test/admin/test_acl_comparisons.py @@ -1,9 +1,7 @@ -from kafka.admin.acl_resource import ACL -from kafka.admin.acl_resource import ACLOperation -from kafka.admin.acl_resource import ACLPermissionType -from kafka.admin.acl_resource import ResourcePattern -from kafka.admin.acl_resource import ResourceType -from kafka.admin.acl_resource import ACLResourcePatternType +from kafka.admin import ( + ACL, ACLOperation, ACLPermissionType, ResourcePattern, + ResourceType, ACLResourcePatternType, +) def test_different_acls_are_different():