Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion kafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from kafka.admin.client import KafkaAdminClient
from kafka.admin._acls import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
ResourceType, ACLPermissionType, ACLResourcePatternType)
from kafka.admin._configs import ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType
from kafka.admin._configs import (
ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType)
from kafka.admin._topics import NewTopic, NewPartitions
from kafka.admin._users import (
ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion)

__all__ = [
'KafkaAdminClient',
'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType',
'ResourceType', 'ResourcePattern', 'ResourcePatternFilter',
'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType',
'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion',
]
137 changes: 41 additions & 96 deletions kafka/admin/_acls.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
class ACLAdminMixin:
"""Mixin providing ACL management methods for KafkaAdminClient."""
_manager: KafkaConnectionManager
_client: object
config: dict

# ACL Helper for Metadata / DescribeGroups
Expand All @@ -45,34 +44,18 @@ def describe_acls(self, acl_filter):
Returns:
tuple of a list of matching ACL objects and a KafkaError (NoError if successful)
"""

version = self._client.api_version(DescribeAclsRequest, max_version=1)
if version == 0:
request = DescribeAclsRequest[version](
resource_type_filter=acl_filter.resource_pattern.resource_type,
resource_name_filter=acl_filter.resource_pattern.resource_name,
principal_filter=acl_filter.principal,
host_filter=acl_filter.host,
operation=acl_filter.operation,
permission_type=acl_filter.permission_type
)
elif version <= 1:
request = DescribeAclsRequest[version](
resource_type_filter=acl_filter.resource_pattern.resource_type,
resource_name_filter=acl_filter.resource_pattern.resource_name,
pattern_type_filter=acl_filter.resource_pattern.pattern_type,
principal_filter=acl_filter.principal,
host_filter=acl_filter.host,
operation=acl_filter.operation,
permission_type=acl_filter.permission_type
)
min_version = 3 if acl_filter.resource_pattern.resource_type == ResourceType.USER else 0
request = DescribeAclsRequest(
min_version=min_version,
resource_type_filter=acl_filter.resource_pattern.resource_type,
resource_name_filter=acl_filter.resource_pattern.resource_name,
pattern_type_filter=acl_filter.resource_pattern.pattern_type,
principal_filter=acl_filter.principal,
host_filter=acl_filter.host,
operation=acl_filter.operation,
permission_type=acl_filter.permission_type
)
response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))

return self._convert_describe_acls_response_to_acls(response)

@staticmethod
Expand All @@ -88,9 +71,7 @@ def _convert_describe_acls_response_to_acls(describe_response):
"""
error_type = Errors.for_code(describe_response.error_code)
if error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format("DescribeAclsRequest", describe_response))
raise error_type(describe_response.error_message)
acl_list = []
for resource in describe_response.resources:
for acl in resource.acls:
Expand All @@ -107,28 +88,17 @@ def _convert_describe_acls_response_to_acls(describe_response):
return acl_list, Errors.NoError

@staticmethod
def _convert_create_acls_resource_request_v0(acl):
"""Convert an ACL object into the CreateAclsRequest v0 format."""
return (
acl.resource_pattern.resource_type,
acl.resource_pattern.resource_name,
acl.principal,
acl.host,
acl.operation,
acl.permission_type
)

@staticmethod
def _convert_create_acls_resource_request_v1(acl):
"""Convert an ACL object into the CreateAclsRequest v1 format."""
return (
acl.resource_pattern.resource_type,
acl.resource_pattern.resource_name,
acl.resource_pattern.pattern_type,
acl.principal,
acl.host,
acl.operation,
acl.permission_type
def _convert_create_acls_resource_request(acl):
"""Convert an ACL object into the CreateAclsRequest format."""
_AclCreate = CreateAclsRequest.AclCreation
return _AclCreate(
resource_type=acl.resource_pattern.resource_type,
resource_name=acl.resource_pattern.resource_name,
resource_pattern_type=acl.resource_pattern.pattern_type,
principal=acl.principal,
host=acl.host,
operation=acl.operation,
permission_type=acl.permission_type
)

@staticmethod
Expand All @@ -155,46 +125,28 @@ def create_acls(self, acls):
Returns:
dict of successes and failures
"""

for acl in acls:
if not isinstance(acl, ACL):
raise IllegalArgumentError("acls must contain ACL objects")

version = self._client.api_version(CreateAclsRequest, max_version=1)
if version == 0:
request = CreateAclsRequest[version](
creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls]
)
elif version <= 1:
request = CreateAclsRequest[version](
creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls]
)
creations = [self._convert_create_acls_resource_request(acl) for acl in acls]
min_version = 3 if any(creation.resource_type == ResourceType.USER for creation in creations) else 0
request = CreateAclsRequest(creations=creations, min_version=min_version)
response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606
return self._convert_create_acls_response_to_acls(acls, response)

@staticmethod
def _convert_delete_acls_resource_request_v0(acl):
"""Convert an ACLFilter object into the DeleteAclsRequest v0 format."""
return (
acl.resource_pattern.resource_type,
acl.resource_pattern.resource_name,
acl.principal,
acl.host,
acl.operation,
acl.permission_type
)

@staticmethod
def _convert_delete_acls_resource_request_v1(acl):
"""Convert an ACLFilter object into the DeleteAclsRequest v1 format."""
return (
acl.resource_pattern.resource_type,
acl.resource_pattern.resource_name,
acl.resource_pattern.pattern_type,
acl.principal,
acl.host,
acl.operation,
acl.permission_type
def _convert_delete_acls_resource_request(acl):
"""Convert an ACLFilter object into the DeleteAclsRequest format."""
_AclsFilter = DeleteAclsRequest.DeleteAclsFilter
return _AclsFilter(
resource_type_filter=acl.resource_pattern.resource_type,
resource_name_filter=acl.resource_pattern.resource_name,
pattern_type_filter=acl.resource_pattern.pattern_type,
principal_filter=acl.principal,
host_filter=acl.host,
operation=acl.operation,
permission_type=acl.permission_type
)

@staticmethod
Expand Down Expand Up @@ -232,21 +184,13 @@ def delete_acls(self, acl_filters):
a list of 3-tuples corresponding to the list of input filters.
The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance)
"""

for acl in acl_filters:
if not isinstance(acl, ACLFilter):
raise IllegalArgumentError("acl_filters must contain ACLFilter type objects")

version = self._client.api_version(DeleteAclsRequest, max_version=1)

if version == 0:
request = DeleteAclsRequest[version](
filters=[self._convert_delete_acls_resource_request_v0(acl) for acl in acl_filters]
)
elif version <= 1:
request = DeleteAclsRequest[version](
filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters]
)
filters = [self._convert_delete_acls_resource_request(acl) for acl in acl_filters]
min_version = 3 if any(_filter.resource_type_filter == ResourceType.USER for _filter in filters) else 0
request = DeleteAclsRequest(filters=filters, min_version=min_version)
response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606
return self._convert_delete_acls_response_to_matching_acls(acl_filters, response)

Expand All @@ -268,6 +212,7 @@ class ResourceType(IntEnum):
CLUSTER = 4
TRANSACTIONAL_ID = 5
DELEGATION_TOKEN = 6
USER = 7


class ACLOperation(IntEnum):
Expand Down
88 changes: 42 additions & 46 deletions kafka/admin/_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
from typing import TYPE_CHECKING

from kafka.errors import IncompatibleBrokerVersion
from kafka.protocol.admin import AlterConfigsRequest, DescribeConfigsRequest
from kafka.protocol.admin import (
AlterConfigsRequest,
DescribeConfigsRequest,
)

if TYPE_CHECKING:
from kafka.net.manager import KafkaConnectionManager
Expand All @@ -22,45 +25,46 @@
class ConfigAdminMixin:
"""Mixin providing configuration management methods for KafkaAdminClient."""
_manager: KafkaConnectionManager
_client: object
config: dict

@staticmethod
def _convert_describe_config_resource_request(config_resource):
return (
config_resource.resource_type,
config_resource.name,
list(config_resource.configs.keys()) if isinstance(config_resource.configs, dict) else config_resource.configs
)

async def _async_describe_configs(self, config_resources, include_synonyms=False):
def _convert_config_resource(config_resource, key_only=True):
if key_only:
values = list(config_resource.configs.keys()) if isinstance(config_resource.configs, dict) else config_resource.configs
else:
assert isinstance(config_resource.configs, dict)
values = list(config_resource.configs.items())
return (config_resource.resource_type, config_resource.name, values)

def _group_config_resources(self, config_resources, key_only=True):
broker_resources = defaultdict(list)
other_resources = []

for config_resource in config_resources:
if config_resource.resource_type in (ConfigResourceType.BROKER, ConfigResourceType.BROKER_LOGGER):
try:
broker_id = int(config_resource.name)
except ValueError:
raise ValueError("Broker resource names must be an integer or a string represented integer")
broker_resources[broker_id].append(self._convert_describe_config_resource_request(config_resource))
broker_resources[broker_id].append(self._convert_config_resource(config_resource, key_only=key_only))
else:
other_resources.append(self._convert_describe_config_resource_request(config_resource))

version = self._client.api_version(DescribeConfigsRequest, max_version=2)
if include_synonyms and version == 0:
raise IncompatibleBrokerVersion(
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
.format(self._manager.broker_version))
other_resources.append(self._convert_config_resource(config_resource, key_only=key_only))
return broker_resources, other_resources

async def _async_describe_configs(self, config_resources, include_synonyms=False):
min_version = 1 if include_synonyms else 0
broker_resources, other_resources = self._group_config_resources(config_resources, key_only=True)
responses = []
for broker_id, resources in broker_resources.items():
request = DescribeConfigsRequest(
resources=resources,
include_synonyms=include_synonyms)
include_synonyms=include_synonyms,
min_version=min_version)
responses.append(await self._manager.send(request, node_id=broker_id))
if other_resources:
request = DescribeConfigsRequest(resources=other_resources, include_synonyms=include_synonyms)
request = DescribeConfigsRequest(
resources=other_resources,
include_synonyms=include_synonyms,
min_version=min_version)
responses.append(await self._manager.send(request))

ret = defaultdict(dict)
Expand Down Expand Up @@ -100,39 +104,31 @@ def describe_configs(self, config_resources, include_synonyms=False):
"""
return self._manager.run(self._async_describe_configs, config_resources, include_synonyms)

@staticmethod
def _convert_alter_config_resource_request(config_resource):
return (
config_resource.resource_type,
config_resource.name,
[
(config_key, config_value) for config_key, config_value in config_resource.configs.items()
]
)

async def _async_alter_configs(self, config_resources):
version = self._client.api_version(AlterConfigsRequest, max_version=1)
request = AlterConfigsRequest[version](
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
# TODO: BROKER resources should be sent to the specific broker
return await self._manager.send(request)

def alter_configs(self, config_resources):
"""Alter configuration parameters of one or more Kafka resources.
async def _async_alter_configs(self, config_resources, validate_only=False):
broker_resources, other_resources = self._group_config_resources(config_resources, key_only=False)
responses = []
for broker_id, resources in broker_resources.items():
request = AlterConfigsRequest(
resources=resources,
validate_only=validate_only)
responses.append(await self._manager.send(request, node_id=broker_id))
if other_resources:
request = AlterConfigsRequest(
resources=other_resources,
validate_only=validate_only)
responses.append(await self._manager.send(request))
return responses

Warning:
This is currently broken for BROKER resources because those must be
sent to that specific broker, versus this always picks the
least-loaded node.
def alter_configs(self, config_resources, validate_only=False):
"""Alter configuration parameters of one or more Kafka resources.

Arguments:
config_resources: A list of ConfigResource objects.

Returns:
Appropriate version of AlterConfigsResponse class.
"""
return self._manager.run(self._async_alter_configs, config_resources)
return self._manager.run(self._async_alter_configs, config_resources, validate_only)


class ConfigResourceType(IntEnum):
Expand Down
17 changes: 4 additions & 13 deletions kafka/admin/_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,20 @@
class GroupAdminMixin:
"""Mixin providing consumer group management methods for KafkaAdminClient."""
_manager: KafkaConnectionManager
_client: object
_coordinator_cache: dict
config: dict

# -- Describe consumer groups ----------------------------------------------

def _describe_consumer_groups_request(self, group_id):
version = self._client.api_version(DescribeGroupsRequest, max_version=3)
if version <= 2:
request = DescribeGroupsRequest[version](groups=(group_id,))
else:
request = DescribeGroupsRequest[version](
groups=(group_id,),
include_authorized_operations=True
)
request = DescribeGroupsRequest(
groups=[group_id],
include_authorized_operations=True
)
return request

def _describe_consumer_groups_process_response(self, response):
"""Process a DescribeGroupsResponse into a group description."""
if response.API_VERSION > 3:
raise NotImplementedError(
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
.format(response.API_VERSION))
assert len(response.groups) == 1
for group in response.groups:
for member in group.members:
Expand Down
Loading
Loading