From 6fd84a5174f145e59fe0e38dece883f125c2eb90 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 15 Apr 2026 17:40:32 -0700 Subject: [PATCH 01/12] Admin cli: support acls --- kafka/admin/_acls.py | 10 +---- kafka/cli/admin/__init__.py | 8 +--- kafka/cli/admin/acls/__init__.py | 16 +++++++ kafka/cli/admin/acls/common.py | 76 ++++++++++++++++++++++++++++++++ kafka/cli/admin/acls/create.py | 19 ++++++++ kafka/cli/admin/acls/delete.py | 23 ++++++++++ kafka/cli/admin/acls/describe.py | 16 +++++++ 7 files changed, 153 insertions(+), 15 deletions(-) create mode 100644 kafka/cli/admin/acls/__init__.py create mode 100644 kafka/cli/admin/acls/common.py create mode 100644 kafka/cli/admin/acls/create.py create mode 100644 kafka/cli/admin/acls/delete.py create mode 100644 kafka/cli/admin/acls/describe.py diff --git a/kafka/admin/_acls.py b/kafka/admin/_acls.py index 85f7be4c9..f312e0e70 100644 --- a/kafka/admin/_acls.py +++ b/kafka/admin/_acls.py @@ -67,12 +67,6 @@ def describe_acls(self, acl_filter): permission_type=acl_filter.permission_type ) response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: - raise error_type( - "Request '{}' failed with response '{}'." - .format(request, response)) - return self._convert_describe_acls_response_to_acls(response) @staticmethod @@ -88,9 +82,7 @@ def _convert_describe_acls_response_to_acls(describe_response): """ error_type = Errors.for_code(describe_response.error_code) if error_type is not Errors.NoError: - raise error_type( - "Request '{}' failed with response '{}'." 
- .format("DescribeAclsRequest", describe_response)) + raise error_type(describe_response.error_message) acl_list = [] for resource in describe_response.resources: for acl in resource.acls: diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index de4bdb40e..e18ff96bd 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -4,6 +4,7 @@ from pprint import pprint from kafka.admin.client import KafkaAdminClient +from .acls import ACLsSubCommand from .cluster import ClusterSubCommand from .configs import ConfigsSubCommand from .consumer_groups import ConsumerGroupsSubCommand @@ -46,7 +47,7 @@ def build_kwargs(props): def run_cli(args=None): parser = main_parser() subparsers = parser.add_subparsers(help='subcommands') - for cmd in [ClusterSubCommand, ConfigsSubCommand, + for cmd in [ACLsSubCommand, ClusterSubCommand, ConfigsSubCommand, TopicsSubCommand, ConsumerGroupsSubCommand]: cmd.add_subparser(subparsers) config = parser.parse_args(args) @@ -97,11 +98,6 @@ def run_cli(args=None): # Commands TODO: - # [acls] - # describe - # create - # delete - # [configs] # alter # IncrementalAlterConfigs (not supported yet) diff --git a/kafka/cli/admin/acls/__init__.py b/kafka/cli/admin/acls/__init__.py new file mode 100644 index 000000000..f3c3a4566 --- /dev/null +++ b/kafka/cli/admin/acls/__init__.py @@ -0,0 +1,16 @@ +import sys + +from .create import CreateACLs +from .delete import DeleteACLs +from .describe import DescribeACLs + + +class ACLsSubCommand: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('acls', help='Manage Kafka ACLs') + commands = parser.add_subparsers() + for cmd in [DescribeACLs, CreateACLs, DeleteACLs]: + cmd.add_subparser(commands) + parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/acls/common.py b/kafka/cli/admin/acls/common.py new file mode 100644 index 000000000..b94ef22c0 --- /dev/null +++ 
b/kafka/cli/admin/acls/common.py @@ -0,0 +1,76 @@ +from kafka.admin._acls import ( + ACL, ACLFilter, ACLOperation, ACLPermissionType, ACLResourcePatternType, + ResourcePattern, ResourcePatternFilter, ResourceType +) + + +def _enum_names(enum_cls): + return [e.name.lower() for e in enum_cls] + + +def add_acl_filter_args(parser): + """Add arguments for building an ACLFilter (used by describe and delete).""" + parser.add_argument('--principal', type=str, default=None, help='ACL principal (e.g. User:alice)') + parser.add_argument('--host', type=str, default=None, help='ACL host (default: match any)') + parser.add_argument('--operation', type=str, default='any', + choices=_enum_names(ACLOperation), + help='ACL operation (default: any)') + parser.add_argument('--permission-type', type=str, default='any', + choices=_enum_names(ACLPermissionType), + help='ACL permission type (default: any)') + parser.add_argument('--resource-type', type=str, default='any', + choices=_enum_names(ResourceType), + help='Resource type (default: any)') + parser.add_argument('--resource-name', type=str, default=None, help='Resource name') + parser.add_argument('--pattern-type', type=str, default='any', + choices=_enum_names(ACLResourcePatternType), + help='Resource pattern type (default: any)') + + +def add_acl_args(parser, required=False): + """Add arguments for building a concrete ACL (used by create).""" + parser.add_argument('--principal', type=str, required=required, help='ACL principal (e.g. 
User:alice)') + parser.add_argument('--host', type=str, default='*', help='ACL host (default: *)') + parser.add_argument('--operation', type=str, required=required, + choices=_enum_names(ACLOperation), + help='ACL operation') + parser.add_argument('--permission-type', type=str, default='allow', + choices=_enum_names(ACLPermissionType), + help='ACL permission type (default: allow)') + parser.add_argument('--resource-type', type=str, required=required, + choices=_enum_names(ResourceType), + help='Resource type') + parser.add_argument('--resource-name', type=str, required=required, help='Resource name') + parser.add_argument('--pattern-type', type=str, default='literal', + choices=_enum_names(ACLResourcePatternType), + help='Resource pattern type (default: literal)') + + +def acl_filter_from_args(args): + """Build an ACLFilter from parsed CLI arguments.""" + return ACLFilter( + principal=args.principal, + host=args.host, + operation=ACLOperation[args.operation.upper()], + permission_type=ACLPermissionType[args.permission_type.upper()], + resource_pattern=ResourcePatternFilter( + resource_type=ResourceType[args.resource_type.upper()], + resource_name=args.resource_name, + pattern_type=ACLResourcePatternType[args.pattern_type.upper()], + ), + ) + + +def acl_from_args(args): + """Build an ACL from parsed CLI arguments.""" + return ACL( + principal=args.principal, + host=args.host or '*', + operation=ACLOperation[args.operation.upper()], + permission_type=ACLPermissionType[args.permission_type.upper()], + resource_pattern=ResourcePattern( + resource_type=ResourceType[args.resource_type.upper()], + resource_name=args.resource_name, + pattern_type=ACLResourcePatternType[args.pattern_type.upper()], + ), + ) diff --git a/kafka/cli/admin/acls/create.py b/kafka/cli/admin/acls/create.py new file mode 100644 index 000000000..3230e66bb --- /dev/null +++ b/kafka/cli/admin/acls/create.py @@ -0,0 +1,19 @@ +from .common import add_acl_args, acl_from_args + + +class CreateACLs: + + 
@classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('create', help='Create Kafka ACLs') + add_acl_args(parser, required=True) + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + acl = acl_from_args(args) + result = client.create_acls([acl]) + return { + 'succeeded': [repr(a) for a in result['succeeded']], + 'failed': [str(e) for e in result['failed']], + } diff --git a/kafka/cli/admin/acls/delete.py b/kafka/cli/admin/acls/delete.py new file mode 100644 index 000000000..ebe580039 --- /dev/null +++ b/kafka/cli/admin/acls/delete.py @@ -0,0 +1,23 @@ +from .common import add_acl_filter_args, acl_filter_from_args + + +class DeleteACLs: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('delete', help='Delete Kafka ACLs') + add_acl_filter_args(parser) + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + acl_filter = acl_filter_from_args(args) + results = client.delete_acls([acl_filter]) + output = [] + for acl_filter, matching_acls, error in results: + output.append({ + 'filter': repr(acl_filter), + 'deleted': [repr(acl) for acl in matching_acls], + 'error': str(error), + }) + return output diff --git a/kafka/cli/admin/acls/describe.py b/kafka/cli/admin/acls/describe.py new file mode 100644 index 000000000..a3dd548c2 --- /dev/null +++ b/kafka/cli/admin/acls/describe.py @@ -0,0 +1,16 @@ +from .common import add_acl_filter_args, acl_filter_from_args + + +class DescribeACLs: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('describe', help='Describe Kafka ACLs') + add_acl_filter_args(parser) + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + acl_filter = acl_filter_from_args(args) + acls, error = client.describe_acls(acl_filter) + return [repr(acl) for acl in acls] From 63ba2aab49121a23d172e1955cfbb919dc7ca3e5 Mon Sep 17 00:00:00 2001 
From: Dana Powers Date: Thu, 16 Apr 2026 08:12:45 -0700 Subject: [PATCH 02/12] Add USER (7) to acl resource types --- kafka/admin/_acls.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/admin/_acls.py b/kafka/admin/_acls.py index f312e0e70..2019d7ef6 100644 --- a/kafka/admin/_acls.py +++ b/kafka/admin/_acls.py @@ -260,6 +260,7 @@ class ResourceType(IntEnum): CLUSTER = 4 TRANSACTIONAL_ID = 5 DELEGATION_TOKEN = 6 + USER = 7 class ACLOperation(IntEnum): From 97025f00ce103bfb41eab247aafcb8580140b066 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 15 Apr 2026 17:39:22 -0700 Subject: [PATCH 03/12] Admin: fix alter configs --- kafka/admin/_configs.py | 82 +++++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 44 deletions(-) diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index a54994694..1402f04c1 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -26,41 +26,43 @@ class ConfigAdminMixin: config: dict @staticmethod - def _convert_describe_config_resource_request(config_resource): - return ( - config_resource.resource_type, - config_resource.name, - list(config_resource.configs.keys()) if isinstance(config_resource.configs, dict) else config_resource.configs - ) - - async def _async_describe_configs(self, config_resources, include_synonyms=False): + def _convert_config_resource(config_resource, key_only=True): + if key_only: + values = list(config_resource.configs.keys()) if isinstance(config_resource.configs, dict) else config_resource.configs + else: + assert isinstance(config_resource.configs, dict) + values = list(config_resource.configs.items()) + return (config_resource.resource_type, config_resource.name, values) + + def _group_config_resources(self, config_resources, key_only=True): broker_resources = defaultdict(list) other_resources = [] - for config_resource in config_resources: if config_resource.resource_type in (ConfigResourceType.BROKER, ConfigResourceType.BROKER_LOGGER): try: broker_id = 
int(config_resource.name) except ValueError: raise ValueError("Broker resource names must be an integer or a string represented integer") - broker_resources[broker_id].append(self._convert_describe_config_resource_request(config_resource)) + broker_resources[broker_id].append(self._convert_config_resource(config_resource, key_only=key_only)) else: - other_resources.append(self._convert_describe_config_resource_request(config_resource)) - - version = self._client.api_version(DescribeConfigsRequest, max_version=2) - if include_synonyms and version == 0: - raise IncompatibleBrokerVersion( - "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." - .format(self._manager.broker_version)) + other_resources.append(self._convert_config_resource(config_resource, key_only=key_only)) + return broker_resources, other_resources + async def _async_describe_configs(self, config_resources, include_synonyms=False): + min_version = 1 if include_synonyms else 0 + broker_resources, other_resources = self._group_config_resources(config_resources, key_only=True) responses = [] for broker_id, resources in broker_resources.items(): request = DescribeConfigsRequest( resources=resources, - include_synonyms=include_synonyms) + include_synonyms=include_synonyms, + min_version=min_version) responses.append(await self._manager.send(request, node_id=broker_id)) if other_resources: - request = DescribeConfigsRequest(resources=other_resources, include_synonyms=include_synonyms) + request = DescribeConfigsRequest( + resources=other_resources, + include_synonyms=include_synonyms, + min_version=min_version) responses.append(await self._manager.send(request)) ret = defaultdict(dict) @@ -100,31 +102,23 @@ def describe_configs(self, config_resources, include_synonyms=False): """ return self._manager.run(self._async_describe_configs, config_resources, include_synonyms) - @staticmethod - def _convert_alter_config_resource_request(config_resource): - return ( - 
config_resource.resource_type, - config_resource.name, - [ - (config_key, config_value) for config_key, config_value in config_resource.configs.items() - ] - ) - - async def _async_alter_configs(self, config_resources): - version = self._client.api_version(AlterConfigsRequest, max_version=1) - request = AlterConfigsRequest[version]( - resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] - ) - # TODO: BROKER resources should be sent to the specific broker - return await self._manager.send(request) - - def alter_configs(self, config_resources): - """Alter configuration parameters of one or more Kafka resources. + async def _async_alter_configs(self, config_resources, validate_only=False): + broker_resources, other_resources = self._group_config_resources(config_resources, key_only=False) + responses = [] + for broker_id, resources in broker_resources.items(): + request = AlterConfigsRequest( + resources=resources, + validate_only=validate_only) + responses.append(await self._manager.send(request, node_id=broker_id)) + if other_resources: + request = AlterConfigsRequest( + resources=other_resources, + validate_only=validate_only) + responses.append(await self._manager.send(request)) + return responses - Warning: - This is currently broken for BROKER resources because those must be - sent to that specific broker, versus this always picks the - least-loaded node. + def alter_configs(self, config_resources, validate_only=False): + """Alter configuration parameters of one or more Kafka resources. Arguments: config_resources: A list of ConfigResource objects. @@ -132,7 +126,7 @@ def alter_configs(self, config_resources): Returns: Appropriate version of AlterConfigsResponse class. 
""" - return self._manager.run(self._async_alter_configs, config_resources) + return self._manager.run(self._async_alter_configs, config_resources, validate_only) class ConfigResourceType(IntEnum): From dab425a3bb8e7311bc5cd20d7dd0fea6ed1a3da3 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 15 Apr 2026 17:46:36 -0700 Subject: [PATCH 04/12] Admin cli: support alter configs --- kafka/cli/admin/configs/__init__.py | 3 ++- kafka/cli/admin/configs/alter.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 kafka/cli/admin/configs/alter.py diff --git a/kafka/cli/admin/configs/__init__.py b/kafka/cli/admin/configs/__init__.py index 75b555f40..4eab39f1c 100644 --- a/kafka/cli/admin/configs/__init__.py +++ b/kafka/cli/admin/configs/__init__.py @@ -1,5 +1,6 @@ import sys +from .alter import AlterConfigs from .describe import DescribeConfigs @@ -9,6 +10,6 @@ class ConfigsSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('configs', help='Manage Kafka Configuration') commands = parser.add_subparsers() - for cmd in [DescribeConfigs]: + for cmd in [DescribeConfigs, AlterConfigs]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/configs/alter.py b/kafka/cli/admin/configs/alter.py new file mode 100644 index 000000000..7fe15601a --- /dev/null +++ b/kafka/cli/admin/configs/alter.py @@ -0,0 +1,28 @@ +from kafka.admin import ConfigResource + + +class AlterConfigs: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('alter', help='Alter Kafka Configs') + parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[]) + parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[]) + parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[]) + parser.add_argument('-g', '--group', 
type=str, action='append', dest='groups', default=[]) + parser.add_argument('-c', '--config', type=str, action='append', dest='configs', required=True, help='key=value to alter') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + configs = dict(config.split('=', 1) for config in args.configs) + resources = [] + for topic in args.topics: + resources.append(ConfigResource('TOPIC', topic, configs)) + for broker in args.brokers: + resources.append(ConfigResource('BROKER', broker, configs)) + for broker in args.broker_loggers: + resources.append(ConfigResource('BROKER_LOGGER', broker, configs)) + for group in args.groups: + resources.append(ConfigResource('GROUP', group, configs)) + return client.alter_configs(resources) From 2c31ee92708ec88342aa7f4a9cd93ed264fc6c7c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 16 Apr 2026 14:20:44 -0700 Subject: [PATCH 05/12] (minor) reorg kafka/protocol/admin and test/admin --- kafka/protocol/admin/__init__.py | 5 +- kafka/protocol/admin/cluster.py | 19 ------ kafka/protocol/admin/configs.py | 14 +++++ kafka/protocol/admin/topics.py | 11 ++++ ...t_acl_comparisons.py => test_admin_acl.py} | 35 +++++++++++ test/admin/test_admin_configs.py | 16 +++++ .../{test_admin.py => test_admin_topics.py} | 63 +++---------------- 7 files changed, 89 insertions(+), 74 deletions(-) create mode 100644 kafka/protocol/admin/configs.py rename test/admin/{test_acl_comparisons.py => test_admin_acl.py} (73%) create mode 100644 test/admin/test_admin_configs.py rename test/admin/{test_admin.py => test_admin_topics.py} (74%) diff --git a/kafka/protocol/admin/__init__.py b/kafka/protocol/admin/__init__.py index 4ddcbdfba..17bb2f76b 100644 --- a/kafka/protocol/admin/__init__.py +++ b/kafka/protocol/admin/__init__.py @@ -7,10 +7,13 @@ from .cluster import * from .cluster import __all__ as cluster_all +from .configs import * +from .configs import __all__ as configs_all + from .groups import * from .groups import 
__all__ as groups_all from .topics import * from .topics import __all__ as topics_all -__all__ = acl_all + client_quotas_all + cluster_all + groups_all + topics_all +__all__ = acl_all + client_quotas_all + cluster_all + configs_all + groups_all + topics_all diff --git a/kafka/protocol/admin/cluster.py b/kafka/protocol/admin/cluster.py index 8309824ee..f9049e694 100644 --- a/kafka/protocol/admin/cluster.py +++ b/kafka/protocol/admin/cluster.py @@ -1,5 +1,3 @@ -from enum import IntEnum - from ..api_message import ApiMessage @@ -10,28 +8,11 @@ def json_patch(cls, json): json['fields'][7]['type'] = 'bitfield' return json -class DescribeConfigsRequest(ApiMessage): pass -class DescribeConfigsResponse(ApiMessage): pass - -class AlterConfigsRequest(ApiMessage): pass -class AlterConfigsResponse(ApiMessage): pass - class DescribeLogDirsRequest(ApiMessage): pass class DescribeLogDirsResponse(ApiMessage): pass -class ElectLeadersRequest(ApiMessage): pass -class ElectLeadersResponse(ApiMessage): pass - -class ElectionType(IntEnum): - """Leader election type""" - PREFERRED = 0 - UNCLEAN = 1 - __all__ = [ 'DescribeClusterRequest', 'DescribeClusterResponse', - 'DescribeConfigsRequest', 'DescribeConfigsResponse', - 'AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', - 'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType', ] diff --git a/kafka/protocol/admin/configs.py b/kafka/protocol/admin/configs.py new file mode 100644 index 000000000..3ec7b7206 --- /dev/null +++ b/kafka/protocol/admin/configs.py @@ -0,0 +1,14 @@ +from ..api_message import ApiMessage + + +class AlterConfigsRequest(ApiMessage): pass +class AlterConfigsResponse(ApiMessage): pass + +class DescribeConfigsRequest(ApiMessage): pass +class DescribeConfigsResponse(ApiMessage): pass + + +__all__ = [ + 'AlterConfigsRequest', 'AlterConfigsResponse', + 'DescribeConfigsRequest', 'DescribeConfigsResponse', +] diff --git a/kafka/protocol/admin/topics.py 
b/kafka/protocol/admin/topics.py index d399921af..cc2962c13 100644 --- a/kafka/protocol/admin/topics.py +++ b/kafka/protocol/admin/topics.py @@ -1,3 +1,5 @@ +from enum import IntEnum + from ..api_message import ApiMessage @@ -30,6 +32,14 @@ class ListPartitionReassignmentsResponse(ApiMessage): pass class DeleteRecordsRequest(ApiMessage): pass class DeleteRecordsResponse(ApiMessage): pass +class ElectLeadersRequest(ApiMessage): pass +class ElectLeadersResponse(ApiMessage): pass + +class ElectionType(IntEnum): + """Leader election type""" + PREFERRED = 0 + UNCLEAN = 1 + __all__ = [ 'CreateTopicsRequest', 'CreateTopicsResponse', @@ -39,4 +49,5 @@ class DeleteRecordsResponse(ApiMessage): pass 'AlterPartitionReassignmentsRequest', 'AlterPartitionReassignmentsResponse', 'ListPartitionReassignmentsRequest', 'ListPartitionReassignmentsResponse', 'DeleteRecordsRequest', 'DeleteRecordsResponse', + 'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType', ] diff --git a/test/admin/test_acl_comparisons.py b/test/admin/test_admin_acl.py similarity index 73% rename from test/admin/test_acl_comparisons.py rename to test/admin/test_admin_acl.py index 77ea6e394..3707bf735 100644 --- a/test/admin/test_acl_comparisons.py +++ b/test/admin/test_admin_acl.py @@ -1,7 +1,10 @@ +import pytest + from kafka.admin import ( ACL, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACLResourcePatternType, ) +from kafka.errors import IllegalArgumentError def test_different_acls_are_different(): @@ -88,3 +91,35 @@ def test_same_acls_are_same(): assert one == two assert hash(one) == hash(two) assert len(set((one, two))) == 1 + + +def test_acl_resource(): + good_acl = ACL( + "User:bar", + "*", + ACLOperation.ALL, + ACLPermissionType.ALLOW, + ResourcePattern( + ResourceType.TOPIC, + "foo", + ACLResourcePatternType.LITERAL + ) + ) + + assert(good_acl.resource_pattern.resource_type == ResourceType.TOPIC) + assert(good_acl.operation == ACLOperation.ALL) + assert(good_acl.permission_type 
== ACLPermissionType.ALLOW) + assert(good_acl.resource_pattern.pattern_type == ACLResourcePatternType.LITERAL) + + with pytest.raises(IllegalArgumentError): + ACL( + "User:bar", + "*", + ACLOperation.ANY, + ACLPermissionType.ANY, + ResourcePattern( + ResourceType.TOPIC, + "foo", + ACLResourcePatternType.LITERAL + ) + ) diff --git a/test/admin/test_admin_configs.py b/test/admin/test_admin_configs.py new file mode 100644 index 000000000..9abe90047 --- /dev/null +++ b/test/admin/test_admin_configs.py @@ -0,0 +1,16 @@ +import pytest + +from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType + + +def test_config_resource(): + with pytest.raises(KeyError): + _bad_resource = ConfigResource('something', 'foo') + good_resource = ConfigResource('broker', 'bar') + assert good_resource.resource_type == ConfigResourceType.BROKER + assert good_resource.name == 'bar' + assert good_resource.configs is None + good_resource = ConfigResource(ConfigResourceType.TOPIC, 'baz', {'frob': 'nob'}) + assert good_resource.resource_type == ConfigResourceType.TOPIC + assert good_resource.name == 'baz' + assert good_resource.configs == {'frob': 'nob'} diff --git a/test/admin/test_admin.py b/test/admin/test_admin_topics.py similarity index 74% rename from test/admin/test_admin.py rename to test/admin/test_admin_topics.py index 5f782681e..53aedc449 100644 --- a/test/admin/test_admin.py +++ b/test/admin/test_admin_topics.py @@ -1,93 +1,48 @@ import pytest -import kafka.admin -from kafka.admin.client import KafkaAdminClient -from kafka.errors import IllegalArgumentError, KafkaTimeoutError, UnknownTopicOrPartitionError - - -def test_config_resource(): - with pytest.raises(KeyError): - _bad_resource = kafka.admin.ConfigResource('something', 'foo') - good_resource = kafka.admin.ConfigResource('broker', 'bar') - assert good_resource.resource_type == kafka.admin.ConfigResourceType.BROKER - assert good_resource.name == 'bar' - assert good_resource.configs is None - good_resource = 
kafka.admin.ConfigResource(kafka.admin.ConfigResourceType.TOPIC, 'baz', {'frob': 'nob'}) - assert good_resource.resource_type == kafka.admin.ConfigResourceType.TOPIC - assert good_resource.name == 'baz' - assert good_resource.configs == {'frob': 'nob'} +from kafka.admin import KafkaAdminClient, NewTopic, NewPartitions +from kafka.errors import KafkaTimeoutError, UnknownTopicOrPartitionError def test_new_partitions(): - good_partitions = kafka.admin.NewPartitions(6) + good_partitions = NewPartitions(6) assert good_partitions.total_count == 6 assert good_partitions.new_assignments is None - good_partitions = kafka.admin.NewPartitions(7, [[1, 2, 3]]) + good_partitions = NewPartitions(7, [[1, 2, 3]]) assert good_partitions.total_count == 7 assert good_partitions.new_assignments == [[1, 2, 3]] -def test_acl_resource(): - good_acl = kafka.admin.ACL( - "User:bar", - "*", - kafka.admin.ACLOperation.ALL, - kafka.admin.ACLPermissionType.ALLOW, - kafka.admin.ResourcePattern( - kafka.admin.ResourceType.TOPIC, - "foo", - kafka.admin.ACLResourcePatternType.LITERAL - ) - ) - - assert(good_acl.resource_pattern.resource_type == kafka.admin.ResourceType.TOPIC) - assert(good_acl.operation == kafka.admin.ACLOperation.ALL) - assert(good_acl.permission_type == kafka.admin.ACLPermissionType.ALLOW) - assert(good_acl.resource_pattern.pattern_type == kafka.admin.ACLResourcePatternType.LITERAL) - - with pytest.raises(IllegalArgumentError): - kafka.admin.ACL( - "User:bar", - "*", - kafka.admin.ACLOperation.ANY, - kafka.admin.ACLPermissionType.ANY, - kafka.admin.ResourcePattern( - kafka.admin.ResourceType.TOPIC, - "foo", - kafka.admin.ACLResourcePatternType.LITERAL - ) - ) - def test_new_topic(): - good_topic = kafka.admin.NewTopic('foo') + good_topic = NewTopic('foo') assert good_topic.name == 'foo' assert good_topic.num_partitions == -1 assert good_topic.replication_factor == -1 assert good_topic.replica_assignments == {} assert good_topic.topic_configs == {} - good_topic = 
kafka.admin.NewTopic('foo', 1) + good_topic = NewTopic('foo', 1) assert good_topic.name == 'foo' assert good_topic.num_partitions == 1 assert good_topic.replication_factor == -1 assert good_topic.replica_assignments == {} assert good_topic.topic_configs == {} - good_topic = kafka.admin.NewTopic('foo', 1, 1, {1: [1, 1, 1]}) + good_topic = NewTopic('foo', 1, 1, {1: [1, 1, 1]}) assert good_topic.name == 'foo' assert good_topic.num_partitions == 1 assert good_topic.replication_factor == 1 assert good_topic.replica_assignments == {1: [1, 1, 1]} assert good_topic.topic_configs == {} - good_topic = kafka.admin.NewTopic('foo', 1, 2) + good_topic = NewTopic('foo', 1, 2) assert good_topic.name == 'foo' assert good_topic.num_partitions == 1 assert good_topic.replication_factor == 2 assert good_topic.replica_assignments == {} assert good_topic.topic_configs == {} - good_topic = kafka.admin.NewTopic('bar', -1, -1, {1: [1, 2, 3]}, {'key': 'value'}) + good_topic = NewTopic('bar', -1, -1, {1: [1, 2, 3]}, {'key': 'value'}) assert good_topic.name == 'bar' assert good_topic.num_partitions == -1 assert good_topic.replication_factor == -1 From a09f3cf55c37452223be2f3873de53db535d4d45 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 16 Apr 2026 14:48:01 -0700 Subject: [PATCH 06/12] Admin: Support AlterUserScramCredentials --- kafka/admin/__init__.py | 6 +- kafka/admin/_configs.py | 5 +- kafka/admin/_users.py | 144 +++++++++++++ kafka/admin/client.py | 6 +- kafka/cli/admin/__init__.py | 3 +- kafka/cli/admin/users/__init__.py | 14 ++ .../users/alter_user_scram_credentials.py | 36 ++++ kafka/protocol/admin/__init__.py | 6 +- kafka/protocol/admin/users.py | 10 + .../AlterUserScramCredentialsRequest.json | 45 ++++ .../AlterUserScramCredentialsResponse.json | 35 +++ test/admin/test_admin_configs.py | 2 +- test/admin/test_admin_users.py | 201 ++++++++++++++++++ test/protocol/admin/test_protocol_admin.py | 39 ++++ 14 files changed, 545 insertions(+), 7 deletions(-) create mode 100644 
kafka/admin/_users.py create mode 100644 kafka/cli/admin/users/__init__.py create mode 100644 kafka/cli/admin/users/alter_user_scram_credentials.py create mode 100644 kafka/protocol/admin/users.py create mode 100644 kafka/protocol/schemas/resources/AlterUserScramCredentialsRequest.json create mode 100644 kafka/protocol/schemas/resources/AlterUserScramCredentialsResponse.json create mode 100644 test/admin/test_admin_users.py diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index cc8d29701..9e427affe 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -1,12 +1,16 @@ from kafka.admin.client import KafkaAdminClient from kafka.admin._acls import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, ResourceType, ACLPermissionType, ACLResourcePatternType) -from kafka.admin._configs import ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType +from kafka.admin._configs import ( + ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) from kafka.admin._topics import NewTopic, NewPartitions +from kafka.admin._users import ( + ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion) __all__ = [ 'KafkaAdminClient', 'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType', 'ResourceType', 'ResourcePattern', 'ResourcePatternFilter', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType', + 'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion', ] diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index 1402f04c1..c35c98aab 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -11,7 +11,10 @@ from typing import TYPE_CHECKING from kafka.errors import IncompatibleBrokerVersion -from kafka.protocol.admin import AlterConfigsRequest, DescribeConfigsRequest +from kafka.protocol.admin import ( + AlterConfigsRequest, + DescribeConfigsRequest, +) if TYPE_CHECKING: from kafka.net.manager import 
KafkaConnectionManager diff --git a/kafka/admin/_users.py b/kafka/admin/_users.py new file mode 100644 index 000000000..1bb102590 --- /dev/null +++ b/kafka/admin/_users.py @@ -0,0 +1,144 @@ +"""User management mixin for KafkaAdminClient. + +Also defines ScramMechanism, UserCredentialDeletion, +and UserCredentialUpsertion data classes. +""" + +from __future__ import annotations + +from enum import IntEnum +import hashlib +import logging +import os +from typing import TYPE_CHECKING + +from kafka.errors import IllegalArgumentError +from kafka.protocol.admin import ( + AlterUserScramCredentialsRequest, +) + +if TYPE_CHECKING: + from kafka.net.manager import KafkaConnectionManager + +log = logging.getLogger(__name__) + + +class UserAdminMixin: + """Mixin providing user management methods for KafkaAdminClient.""" + _manager: KafkaConnectionManager + + async def _async_alter_user_scram_credentials(self, alterations): + deletions = [] + upsertions = [] + for alt in alterations: + if isinstance(alt, UserScramCredentialDeletion): + deletions.append((alt.user, int(alt.mechanism))) + elif isinstance(alt, UserScramCredentialUpsertion): + upsertions.append(( + alt.user, + int(alt.mechanism), + alt.iterations, + alt.salt, + alt.salted_password, + )) + else: + raise IllegalArgumentError( + "alterations must be UserScramCredentialDeletion or " + "UserScramCredentialUpsertion, got %s" % type(alt).__name__) + + request = AlterUserScramCredentialsRequest( + deletions=deletions, + upsertions=upsertions, + ) + response = await self._manager.send(request) + + ret = {} + for result in response.results: + ret[result.user] = result.error_message if result.error_code else None + return ret + + def alter_user_scram_credentials(self, alterations): + """Alter SCRAM credentials for one or more users. + + Arguments: + alterations: A list of UserScramCredentialDeletion and/or + UserScramCredentialUpsertion objects describing the + credentials to delete and/or insert/update. 
+ + Returns: + A dict mapping user name -> error message (or None on success). + """ + return self._manager.run(self._async_alter_user_scram_credentials, alterations) + + +class ScramMechanism(IntEnum): + UNKNOWN = 0 + SCRAM_SHA_256 = 1 + SCRAM_SHA_512 = 2 + + @property + def hash_name(self): + return { + ScramMechanism.SCRAM_SHA_256: 'sha256', + ScramMechanism.SCRAM_SHA_512: 'sha512', + }[self] + + +class UserScramCredentialDeletion: + """Specifies that a SCRAM credential should be deleted. + + Arguments: + user (str): The user name. + mechanism (ScramMechanism or int or str): The SCRAM mechanism to + delete for this user. + """ + def __init__(self, user, mechanism): + if not isinstance(mechanism, ScramMechanism): + if isinstance(mechanism, str): + mechanism = ScramMechanism[mechanism.upper().replace('-', '_')] + else: + mechanism = ScramMechanism(mechanism) + self.user = user + self.mechanism = mechanism + + def __repr__(self): + return f"UserScramCredentialDeletion({self.user}, {self.mechanism.name})" + + +class UserScramCredentialUpsertion: + """Specifies that a SCRAM credential should be inserted or updated. + + Arguments: + user (str): The user name. + mechanism (ScramMechanism or int or str): The SCRAM mechanism. + password (bytes or str): The plaintext password. The salted + password sent to the broker is derived via PBKDF2-HMAC using + the given salt and iteration count. + + Keyword Arguments: + iterations (int, optional): PBKDF2 iteration count. Default: 4096. + salt (bytes, optional): Salt to use. If omitted, a random 24-byte + salt is generated. 
+ """ + DEFAULT_ITERATIONS = 4096 + + def __init__(self, user, mechanism, password, iterations=None, salt=None): + if not isinstance(mechanism, ScramMechanism): + if isinstance(mechanism, str): + mechanism = ScramMechanism[mechanism.upper().replace('-', '_')] + else: + mechanism = ScramMechanism(mechanism) + if mechanism == ScramMechanism.UNKNOWN: + raise IllegalArgumentError("SCRAM mechanism must not be UNKNOWN") + self.user = user + self.mechanism = mechanism + self.iterations = iterations if iterations is not None else self.DEFAULT_ITERATIONS + self.salt = salt if salt is not None else os.urandom(24) + if isinstance(password, str): + password = password.encode('utf-8') + self.salted_password = hashlib.pbkdf2_hmac( + mechanism.hash_name, password, self.salt, self.iterations) + + def __repr__(self): + return (f"UserScramCredentialUpsertion({self.user}, " + f"{self.mechanism.name}, iterations={self.iterations})") diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 642b6ef5f..18c315fe7 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -19,17 +19,19 @@ from kafka.admin._groups import GroupAdminMixin from kafka.admin._records import RecordAdminMixin from kafka.admin._topics import TopicAdminMixin +from kafka.admin._users import UserAdminMixin log = logging.getLogger(__name__) class KafkaAdminClient( - TopicAdminMixin, - ClusterAdminMixin, ACLAdminMixin, + ClusterAdminMixin, ConfigAdminMixin, GroupAdminMixin, RecordAdminMixin, + TopicAdminMixin, + UserAdminMixin, ): """A class for administering the Kafka cluster. 
diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index e18ff96bd..c013369a8 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -9,6 +9,7 @@ from .configs import ConfigsSubCommand from .consumer_groups import ConsumerGroupsSubCommand from .topics import TopicsSubCommand +from .users import UsersSubCommand from ..common import add_common_cli_args def main_parser(): @@ -48,7 +49,7 @@ def run_cli(args=None): parser = main_parser() subparsers = parser.add_subparsers(help='subcommands') for cmd in [ACLsSubCommand, ClusterSubCommand, ConfigsSubCommand, - TopicsSubCommand, ConsumerGroupsSubCommand]: + TopicsSubCommand, ConsumerGroupsSubCommand, UsersSubCommand]: cmd.add_subparser(subparsers) config = parser.parse_args(args) diff --git a/kafka/cli/admin/users/__init__.py b/kafka/cli/admin/users/__init__.py new file mode 100644 index 000000000..915716aa4 --- /dev/null +++ b/kafka/cli/admin/users/__init__.py @@ -0,0 +1,14 @@ +import sys + +from .alter_user_scram_credentials import AlterUserScramCredentials + + +class UsersSubCommand: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('users', help='Manage Kafka Users') + commands = parser.add_subparsers() + for cmd in [AlterUserScramCredentials]: + cmd.add_subparser(commands) + parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/users/alter_user_scram_credentials.py b/kafka/cli/admin/users/alter_user_scram_credentials.py new file mode 100644 index 000000000..379c8c737 --- /dev/null +++ b/kafka/cli/admin/users/alter_user_scram_credentials.py @@ -0,0 +1,36 @@ +from kafka.admin import ( + ScramMechanism, + UserScramCredentialDeletion, + UserScramCredentialUpsertion, +) + + +class AlterUserScramCredentials: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser( + 'alter-scram-credentials', + help='Alter SCRAM credentials for Kafka users') + 
parser.add_argument( + '--delete', type=str, action='append', dest='deletions', default=[], + help='USER:MECHANISM pair to delete (e.g. alice:SCRAM-SHA-256)') + parser.add_argument( + '--upsert', type=str, action='append', dest='upsertions', default=[], + help='USER:MECHANISM:PASSWORD triple to insert or update') + parser.add_argument( + '--iterations', type=int, default=None, + help='PBKDF2 iteration count for upsertions (default: 4096)') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + alterations = [] + for spec in args.deletions: + user, mechanism = spec.split(':', 1) + alterations.append(UserScramCredentialDeletion(user, mechanism)) + for spec in args.upsertions: + user, mechanism, password = spec.split(':', 2) + alterations.append(UserScramCredentialUpsertion( + user, mechanism, password, iterations=args.iterations)) + return client.alter_user_scram_credentials(alterations) diff --git a/kafka/protocol/admin/__init__.py b/kafka/protocol/admin/__init__.py index 17bb2f76b..8c7bf050d 100644 --- a/kafka/protocol/admin/__init__.py +++ b/kafka/protocol/admin/__init__.py @@ -16,4 +16,8 @@ from .topics import * from .topics import __all__ as topics_all -__all__ = acl_all + client_quotas_all + cluster_all + configs_all + groups_all + topics_all +from .users import * +from .users import __all__ as users_all + + +__all__ = acl_all + client_quotas_all + cluster_all + configs_all + groups_all + topics_all + users_all diff --git a/kafka/protocol/admin/users.py b/kafka/protocol/admin/users.py new file mode 100644 index 000000000..6c2fd303f --- /dev/null +++ b/kafka/protocol/admin/users.py @@ -0,0 +1,10 @@ +from ..api_message import ApiMessage + + +class AlterUserScramCredentialsRequest(ApiMessage): pass +class AlterUserScramCredentialsResponse(ApiMessage): pass + + +__all__ = [ + 'AlterUserScramCredentialsRequest', 'AlterUserScramCredentialsResponse', +] diff --git 
a/kafka/protocol/schemas/resources/AlterUserScramCredentialsRequest.json b/kafka/protocol/schemas/resources/AlterUserScramCredentialsRequest.json new file mode 100644 index 000000000..ea687072f --- /dev/null +++ b/kafka/protocol/schemas/resources/AlterUserScramCredentialsRequest.json @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 51, + "type": "request", + "listeners": ["broker", "controller"], + "name": "AlterUserScramCredentialsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "Deletions", "type": "[]ScramCredentialDeletion", "versions": "0+", + "about": "The SCRAM credentials to remove.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", + "about": "The user name." }, + { "name": "Mechanism", "type": "int8", "versions": "0+", + "about": "The SCRAM mechanism." } + ]}, + { "name": "Upsertions", "type": "[]ScramCredentialUpsertion", "versions": "0+", + "about": "The SCRAM credentials to update/insert.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", + "about": "The user name." }, + { "name": "Mechanism", "type": "int8", "versions": "0+", + "about": "The SCRAM mechanism." 
}, + { "name": "Iterations", "type": "int32", "versions": "0+", + "about": "The number of iterations." }, + { "name": "Salt", "type": "bytes", "versions": "0+", + "about": "A random salt generated by the client." }, + { "name": "SaltedPassword", "type": "bytes", "versions": "0+", + "about": "The salted password." } + ]} + ] +} diff --git a/kafka/protocol/schemas/resources/AlterUserScramCredentialsResponse.json b/kafka/protocol/schemas/resources/AlterUserScramCredentialsResponse.json new file mode 100644 index 000000000..92b62d52a --- /dev/null +++ b/kafka/protocol/schemas/resources/AlterUserScramCredentialsResponse.json @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 51, + "type": "response", + "name": "AlterUserScramCredentialsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, + { "name": "Results", "type": "[]AlterUserScramCredentialsResult", "versions": "0+", + "about": "The results for deletions and alterations, one per affected user.", "fields": [ + { "name": "User", "type": "string", "versions": "0+", + "about": "The user name." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The error message, if any." } + ]} + ] +} diff --git a/test/admin/test_admin_configs.py b/test/admin/test_admin_configs.py index 9abe90047..cc661e46b 100644 --- a/test/admin/test_admin_configs.py +++ b/test/admin/test_admin_configs.py @@ -1,6 +1,6 @@ import pytest -from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType +from kafka.admin import ConfigResource, ConfigResourceType def test_config_resource(): diff --git a/test/admin/test_admin_users.py b/test/admin/test_admin_users.py new file mode 100644 index 000000000..184dfad6f --- /dev/null +++ b/test/admin/test_admin_users.py @@ -0,0 +1,201 @@ +import hashlib + +import pytest + +from kafka.admin import ( + KafkaAdminClient, ScramMechanism, + UserScramCredentialDeletion, UserScramCredentialUpsertion, +) +from kafka.errors import IllegalArgumentError +from kafka.protocol.admin import ( + AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, +) + +from test.mock_broker import MockBroker + + +class TestScramMechanism: + + def test_hash_name(self): + assert ScramMechanism.SCRAM_SHA_256.hash_name == 'sha256' + assert ScramMechanism.SCRAM_SHA_512.hash_name == 'sha512' + + +class TestUserScramCredentialDeletion: + + @pytest.mark.parametrize("mechanism", [ + ScramMechanism.SCRAM_SHA_256, + 1, + 'SCRAM_SHA_256', + 'scram-sha-256', + ]) + def test_mechanism_accepts_enum_int_and_str(self, mechanism): + deletion = UserScramCredentialDeletion('alice', mechanism) + assert deletion.user == 'alice' + assert deletion.mechanism is 
ScramMechanism.SCRAM_SHA_256 + + def test_invalid_mechanism_int(self): + with pytest.raises(ValueError): + UserScramCredentialDeletion('alice', 99) + + +class TestUserScramCredentialUpsertion: + + def test_default_iterations_and_random_salt(self): + up = UserScramCredentialUpsertion( + 'alice', ScramMechanism.SCRAM_SHA_256, 'password') + assert up.user == 'alice' + assert up.mechanism is ScramMechanism.SCRAM_SHA_256 + assert up.iterations == UserScramCredentialUpsertion.DEFAULT_ITERATIONS + assert isinstance(up.salt, bytes) and len(up.salt) == 24 + # salt is random -- two instances should (almost certainly) differ + other = UserScramCredentialUpsertion( + 'alice', ScramMechanism.SCRAM_SHA_256, 'password') + assert up.salt != other.salt + + def test_salted_password_matches_pbkdf2(self): + salt = b'fixed-salt-bytes' + up = UserScramCredentialUpsertion( + 'alice', ScramMechanism.SCRAM_SHA_512, + password='password', iterations=1024, salt=salt) + expected = hashlib.pbkdf2_hmac('sha512', b'password', salt, 1024) + assert up.salted_password == expected + assert up.salt == salt + assert up.iterations == 1024 + + def test_password_accepts_bytes(self): + up = UserScramCredentialUpsertion( + 'alice', ScramMechanism.SCRAM_SHA_256, + password=b'password', iterations=512, salt=b'salt') + expected = hashlib.pbkdf2_hmac('sha256', b'password', b'salt', 512) + assert up.salted_password == expected + + def test_mechanism_string_normalization(self): + up = UserScramCredentialUpsertion( + 'alice', 'SCRAM-SHA-512', password='p', iterations=1, salt=b's') + assert up.mechanism is ScramMechanism.SCRAM_SHA_512 + + def test_unknown_mechanism_rejected(self): + with pytest.raises(IllegalArgumentError): + UserScramCredentialUpsertion( + 'alice', ScramMechanism.UNKNOWN, password='p') + + +# --------------------------------------------------------------------------- +# MockBroker tests exercising the full wire round-trip +# 
--------------------------------------------------------------------------- + + +def _make_admin(broker): + return KafkaAdminClient( + kafka_client=broker.client_factory(), + bootstrap_servers='%s:%d' % (broker.host, broker.port), + api_version=broker.broker_version, + request_timeout_ms=5000, + ) + + +class TestAlterUserScramCredentialsMockBroker: + + def test_all_success_returns_none_values(self): + broker = MockBroker() + Result = AlterUserScramCredentialsResponse.AlterUserScramCredentialsResult + broker.respond( + AlterUserScramCredentialsRequest, + AlterUserScramCredentialsResponse( + throttle_time_ms=0, + results=[ + Result(user='alice', error_code=0, error_message=None), + Result(user='bob', error_code=0, error_message=None), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.alter_user_scram_credentials([ + UserScramCredentialDeletion('alice', ScramMechanism.SCRAM_SHA_256), + UserScramCredentialUpsertion( + 'bob', ScramMechanism.SCRAM_SHA_512, + password='secret', iterations=4096, salt=b'fixed-salt'), + ]) + finally: + admin.close() + + assert result == {'alice': None, 'bob': None} + + def test_partial_errors_returned_in_dict(self): + broker = MockBroker() + Result = AlterUserScramCredentialsResponse.AlterUserScramCredentialsResult + broker.respond( + AlterUserScramCredentialsRequest, + AlterUserScramCredentialsResponse( + throttle_time_ms=0, + results=[ + Result(user='alice', error_code=0, error_message=None), + Result(user='bob', error_code=58, + error_message='Unsupported SASL mechanism'), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.alter_user_scram_credentials([ + UserScramCredentialDeletion('alice', ScramMechanism.SCRAM_SHA_256), + UserScramCredentialDeletion('bob', ScramMechanism.SCRAM_SHA_512), + ]) + finally: + admin.close() + + assert result == { + 'alice': None, + 'bob': 'Unsupported SASL mechanism', + } + + def test_request_is_encoded_with_deletions_and_upsertions(self): + broker = MockBroker() + 
captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + decoded = AlterUserScramCredentialsRequest.decode( + request_bytes, version=api_version, header=True) + captured['request'] = decoded + Result = AlterUserScramCredentialsResponse.AlterUserScramCredentialsResult + return AlterUserScramCredentialsResponse( + throttle_time_ms=0, + results=[ + Result(user='alice', error_code=0, error_message=None), + Result(user='bob', error_code=0, error_message=None), + ], + ) + + broker.respond_fn(AlterUserScramCredentialsRequest, handler) + + salt = b'fixed-salt-bytes' + upsertion = UserScramCredentialUpsertion( + 'bob', ScramMechanism.SCRAM_SHA_512, + password='secret', iterations=2048, salt=salt) + + admin = _make_admin(broker) + try: + admin.alter_user_scram_credentials([ + UserScramCredentialDeletion('alice', ScramMechanism.SCRAM_SHA_256), + upsertion, + ]) + finally: + admin.close() + + request = captured['request'] + assert len(request.deletions) == 1 + assert request.deletions[0].name == 'alice' + assert request.deletions[0].mechanism == int(ScramMechanism.SCRAM_SHA_256) + + assert len(request.upsertions) == 1 + ups = request.upsertions[0] + assert ups.name == 'bob' + assert ups.mechanism == int(ScramMechanism.SCRAM_SHA_512) + assert ups.iterations == 2048 + assert ups.salt == salt + assert ups.salted_password == hashlib.pbkdf2_hmac( + 'sha512', b'secret', salt, 2048) diff --git a/test/protocol/admin/test_protocol_admin.py b/test/protocol/admin/test_protocol_admin.py index f9915f4e9..867e1ff11 100644 --- a/test/protocol/admin/test_protocol_admin.py +++ b/test/protocol/admin/test_protocol_admin.py @@ -12,6 +12,7 @@ DescribeClusterRequest, DescribeClusterResponse, DescribeConfigsRequest, DescribeConfigsResponse, AlterConfigsRequest, AlterConfigsResponse, + AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, CreateAclsRequest, CreateAclsResponse, DeleteAclsRequest, DeleteAclsResponse, DescribeAclsRequest, DescribeAclsResponse, 
@@ -394,6 +395,44 @@ def test_alter_configs_response_roundtrip(version): assert decoded == response +@pytest.mark.parametrize("version", range(AlterUserScramCredentialsRequest.min_version, AlterUserScramCredentialsRequest.max_version + 1)) +def test_alter_user_scram_credentials_request_roundtrip(version): + Deletion = AlterUserScramCredentialsRequest.ScramCredentialDeletion + Upsertion = AlterUserScramCredentialsRequest.ScramCredentialUpsertion + request = AlterUserScramCredentialsRequest( + deletions=[ + Deletion(name='alice', mechanism=1), + ], + upsertions=[ + Upsertion( + name='bob', + mechanism=2, + iterations=8192, + salt=b'\x00\x01\x02\x03', + salted_password=b'\xaa\xbb\xcc\xdd', + ), + ], + ) + encoded = request.encode(version=version) + decoded = AlterUserScramCredentialsRequest.decode(encoded, version=version) + assert decoded == request + + +@pytest.mark.parametrize("version", range(AlterUserScramCredentialsResponse.min_version, AlterUserScramCredentialsResponse.max_version + 1)) +def test_alter_user_scram_credentials_response_roundtrip(version): + Result = AlterUserScramCredentialsResponse.AlterUserScramCredentialsResult + response = AlterUserScramCredentialsResponse( + throttle_time_ms=123, + results=[ + Result(user='alice', error_code=0, error_message=None), + Result(user='bob', error_code=58, error_message='bad mechanism'), + ], + ) + encoded = response.encode(version=version) + decoded = AlterUserScramCredentialsResponse.decode(encoded, version=version) + assert decoded == response + + @pytest.mark.parametrize("version", range(CreateAclsRequest.min_version, CreateAclsRequest.max_version + 1)) def test_create_acls_request_roundtrip(version): Creation = CreateAclsRequest.AclCreation From ad6e90839f89d7a7080efdfea0c18d812c9233ef Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 16 Apr 2026 16:08:24 -0700 Subject: [PATCH 07/12] Admin: DescribeUserScramCredentials --- kafka/admin/_users.py | 50 ++++++ kafka/cli/admin/users/__init__.py | 3 +- 
.../users/describe_user_scram_credentials.py | 17 ++ kafka/protocol/admin/users.py | 4 + .../DescribeUserScramCredentialsRequest.json | 30 ++++ .../DescribeUserScramCredentialsResponse.json | 45 ++++++ test/admin/test_admin_users.py | 150 ++++++++++++++++++ test/protocol/admin/test_protocol_admin.py | 44 +++++ 8 files changed, 342 insertions(+), 1 deletion(-) create mode 100644 kafka/cli/admin/users/describe_user_scram_credentials.py create mode 100644 kafka/protocol/schemas/resources/DescribeUserScramCredentialsRequest.json create mode 100644 kafka/protocol/schemas/resources/DescribeUserScramCredentialsResponse.json diff --git a/kafka/admin/_users.py b/kafka/admin/_users.py index 1bb102590..ddcb3cb1a 100644 --- a/kafka/admin/_users.py +++ b/kafka/admin/_users.py @@ -12,9 +12,11 @@ import os from typing import TYPE_CHECKING +import kafka.errors as Errors from kafka.errors import IllegalArgumentError from kafka.protocol.admin import ( AlterUserScramCredentialsRequest, + DescribeUserScramCredentialsRequest, ) if TYPE_CHECKING: @@ -70,6 +72,54 @@ def alter_user_scram_credentials(self, alterations): """ return self._manager.run(self._async_alter_user_scram_credentials, alterations) + async def _async_describe_user_scram_credentials(self, users=None): + if users is None: + users_field = None + else: + users_field = [(user,) for user in users] + request = DescribeUserScramCredentialsRequest(users=users_field) + response = await self._manager.send(request) + + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + raise error_type( + "DescribeUserScramCredentialsRequest failed: %s" + % (response.error_message,)) + + ret = {} + for result in response.results: + if result.error_code: + ret[result.user] = { + 'error': result.error_message, + 'credential_infos': [], + } + else: + ret[result.user] = { + 'error': None, + 'credential_infos': [ + { + 'mechanism': ScramMechanism(ci.mechanism), + 'iterations': ci.iterations, + } + for ci in 
result.credential_infos + ], + } + return ret + + def describe_user_scram_credentials(self, users=None): + """Describe SCRAM credentials for one or more users. + + Arguments: + users (list of str, optional): User names to describe. If None, + describe all users with SCRAM credentials. + + Returns: + A dict mapping user name to a dict with keys + ``'error'`` (None or error message) and ``'credential_infos'`` + (list of {'mechanism': ScramMechanism, 'iterations': int}). + """ + return self._manager.run(self._async_describe_user_scram_credentials, users) + class ScramMechanism(IntEnum): UNKNOWN = 0 diff --git a/kafka/cli/admin/users/__init__.py b/kafka/cli/admin/users/__init__.py index 915716aa4..d856e777b 100644 --- a/kafka/cli/admin/users/__init__.py +++ b/kafka/cli/admin/users/__init__.py @@ -1,6 +1,7 @@ import sys from .alter_user_scram_credentials import AlterUserScramCredentials +from .describe_user_scram_credentials import DescribeUserScramCredentials class UsersSubCommand: @@ -9,6 +10,6 @@ class UsersSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('users', help='Manage Kafka Users') commands = parser.add_subparsers() - for cmd in [AlterUserScramCredentials]: + for cmd in [DescribeUserScramCredentials, AlterUserScramCredentials]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/users/describe_user_scram_credentials.py b/kafka/cli/admin/users/describe_user_scram_credentials.py new file mode 100644 index 000000000..01a8eb537 --- /dev/null +++ b/kafka/cli/admin/users/describe_user_scram_credentials.py @@ -0,0 +1,17 @@ +class DescribeUserScramCredentials: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser( + 'describe-scram-credentials', + help='Describe SCRAM credentials for Kafka users') + parser.add_argument( + '--user', type=str, action='append', dest='users', default=[], + help='User name to describe 
(repeatable). ' + 'If omitted, describes all users.') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + users = args.users if args.users else None + return client.describe_user_scram_credentials(users) diff --git a/kafka/protocol/admin/users.py b/kafka/protocol/admin/users.py index 6c2fd303f..69309fff4 100644 --- a/kafka/protocol/admin/users.py +++ b/kafka/protocol/admin/users.py @@ -4,7 +4,11 @@ class AlterUserScramCredentialsRequest(ApiMessage): pass class AlterUserScramCredentialsResponse(ApiMessage): pass +class DescribeUserScramCredentialsRequest(ApiMessage): pass +class DescribeUserScramCredentialsResponse(ApiMessage): pass + __all__ = [ 'AlterUserScramCredentialsRequest', 'AlterUserScramCredentialsResponse', + 'DescribeUserScramCredentialsRequest', 'DescribeUserScramCredentialsResponse', ] diff --git a/kafka/protocol/schemas/resources/DescribeUserScramCredentialsRequest.json b/kafka/protocol/schemas/resources/DescribeUserScramCredentialsRequest.json new file mode 100644 index 000000000..cde4b7cc8 --- /dev/null +++ b/kafka/protocol/schemas/resources/DescribeUserScramCredentialsRequest.json @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 50, + "type": "request", + "listeners": ["broker", "controller"], + "name": "DescribeUserScramCredentialsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "Users", "type": "[]UserName", "versions": "0+", "nullableVersions": "0+", + "about": "The users to describe, or null/empty to describe all users.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", + "about": "The user name." } + ]} + ] +} diff --git a/kafka/protocol/schemas/resources/DescribeUserScramCredentialsResponse.json b/kafka/protocol/schemas/resources/DescribeUserScramCredentialsResponse.json new file mode 100644 index 000000000..9e8b03528 --- /dev/null +++ b/kafka/protocol/schemas/resources/DescribeUserScramCredentialsResponse.json @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "response", + "name": "DescribeUserScramCredentialsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The message-level error code, 0 except for user authorization or infrastructure issues." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The message-level error message, if any." }, + { "name": "Results", "type": "[]DescribeUserScramCredentialsResult", "versions": "0+", + "about": "The results for descriptions, one per user.", "fields": [ + { "name": "User", "type": "string", "versions": "0+", + "about": "The user name." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The user-level error code." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The user-level error message, if any." }, + { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+", + "about": "The mechanism and related information associated with the user's SCRAM credentials.", "fields": [ + { "name": "Mechanism", "type": "int8", "versions": "0+", + "about": "The SCRAM mechanism." }, + { "name": "Iterations", "type": "int32", "versions": "0+", + "about": "The number of iterations used in the SCRAM credential." 
}]} + ]} + ] +} diff --git a/test/admin/test_admin_users.py b/test/admin/test_admin_users.py index 184dfad6f..5e47f2359 100644 --- a/test/admin/test_admin_users.py +++ b/test/admin/test_admin_users.py @@ -9,6 +9,7 @@ from kafka.errors import IllegalArgumentError from kafka.protocol.admin import ( AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, + DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse, ) from test.mock_broker import MockBroker @@ -199,3 +200,152 @@ def handler(api_key, api_version, correlation_id, request_bytes): assert ups.salt == salt assert ups.salted_password == hashlib.pbkdf2_hmac( 'sha512', b'secret', salt, 2048) + + +class TestDescribeUserScramCredentialsMockBroker: + + def test_returns_credentials_per_user(self): + broker = MockBroker() + Result = DescribeUserScramCredentialsResponse.DescribeUserScramCredentialsResult + CI = Result.CredentialInfo + broker.respond( + DescribeUserScramCredentialsRequest, + DescribeUserScramCredentialsResponse( + throttle_time_ms=0, + error_code=0, + error_message=None, + results=[ + Result(user='alice', error_code=0, error_message=None, + credential_infos=[ + CI(mechanism=1, iterations=4096), + CI(mechanism=2, iterations=8192), + ]), + Result(user='bob', error_code=0, error_message=None, + credential_infos=[CI(mechanism=2, iterations=16384)]), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.describe_user_scram_credentials(['alice', 'bob']) + finally: + admin.close() + + assert result == { + 'alice': { + 'error': None, + 'credential_infos': [ + {'mechanism': ScramMechanism.SCRAM_SHA_256, 'iterations': 4096}, + {'mechanism': ScramMechanism.SCRAM_SHA_512, 'iterations': 8192}, + ], + }, + 'bob': { + 'error': None, + 'credential_infos': [ + {'mechanism': ScramMechanism.SCRAM_SHA_512, 'iterations': 16384}, + ], + }, + } + + def test_per_user_error_reported(self): + broker = MockBroker() + Result = 
DescribeUserScramCredentialsResponse.DescribeUserScramCredentialsResult + broker.respond( + DescribeUserScramCredentialsRequest, + DescribeUserScramCredentialsResponse( + throttle_time_ms=0, + error_code=0, + error_message=None, + results=[ + Result(user='missing', error_code=68, + error_message='resource not found', + credential_infos=[]), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.describe_user_scram_credentials(['missing']) + finally: + admin.close() + + assert result == { + 'missing': { + 'error': 'resource not found', + 'credential_infos': [], + }, + } + + def test_top_level_error_raises(self): + broker = MockBroker() + broker.respond( + DescribeUserScramCredentialsRequest, + DescribeUserScramCredentialsResponse( + throttle_time_ms=0, + error_code=58, # UnsupportedSaslMechanismError + error_message='SCRAM not configured', + results=[], + ), + ) + + admin = _make_admin(broker) + try: + with pytest.raises(Exception) as exc_info: + admin.describe_user_scram_credentials(['alice']) + assert 'SCRAM not configured' in str(exc_info.value) + finally: + admin.close() + + def test_describe_all_users_sends_null(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + decoded = DescribeUserScramCredentialsRequest.decode( + request_bytes, version=api_version, header=True) + captured['request'] = decoded + return DescribeUserScramCredentialsResponse( + throttle_time_ms=0, + error_code=0, + error_message=None, + results=[], + ) + + broker.respond_fn(DescribeUserScramCredentialsRequest, handler) + + admin = _make_admin(broker) + try: + result = admin.describe_user_scram_credentials() + finally: + admin.close() + + assert result == {} + assert captured['request'].users is None + + def test_describe_specific_users_encodes_names(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + decoded = 
DescribeUserScramCredentialsRequest.decode( + request_bytes, version=api_version, header=True) + captured['request'] = decoded + return DescribeUserScramCredentialsResponse( + throttle_time_ms=0, + error_code=0, + error_message=None, + results=[], + ) + + broker.respond_fn(DescribeUserScramCredentialsRequest, handler) + + admin = _make_admin(broker) + try: + admin.describe_user_scram_credentials(['alice', 'bob']) + finally: + admin.close() + + request_users = captured['request'].users + assert [u.name for u in request_users] == ['alice', 'bob'] diff --git a/test/protocol/admin/test_protocol_admin.py b/test/protocol/admin/test_protocol_admin.py index 867e1ff11..5900ced3b 100644 --- a/test/protocol/admin/test_protocol_admin.py +++ b/test/protocol/admin/test_protocol_admin.py @@ -13,6 +13,7 @@ DescribeConfigsRequest, DescribeConfigsResponse, AlterConfigsRequest, AlterConfigsResponse, AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, + DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse, CreateAclsRequest, CreateAclsResponse, DeleteAclsRequest, DeleteAclsResponse, DescribeAclsRequest, DescribeAclsResponse, @@ -433,6 +434,49 @@ def test_alter_user_scram_credentials_response_roundtrip(version): assert decoded == response +@pytest.mark.parametrize("version", range(DescribeUserScramCredentialsRequest.min_version, DescribeUserScramCredentialsRequest.max_version + 1)) +def test_describe_user_scram_credentials_request_roundtrip(version): + User = DescribeUserScramCredentialsRequest.UserName + request = DescribeUserScramCredentialsRequest( + users=[User(name='alice'), User(name='bob')], + ) + encoded = request.encode(version=version) + decoded = DescribeUserScramCredentialsRequest.decode(encoded, version=version) + assert decoded == request + + +@pytest.mark.parametrize("version", range(DescribeUserScramCredentialsRequest.min_version, DescribeUserScramCredentialsRequest.max_version + 1)) +def 
test_describe_user_scram_credentials_request_null_users(version): + # null Users means "describe all users" + request = DescribeUserScramCredentialsRequest(users=None) + encoded = request.encode(version=version) + decoded = DescribeUserScramCredentialsRequest.decode(encoded, version=version) + assert decoded == request + + +@pytest.mark.parametrize("version", range(DescribeUserScramCredentialsResponse.min_version, DescribeUserScramCredentialsResponse.max_version + 1)) +def test_describe_user_scram_credentials_response_roundtrip(version): + Result = DescribeUserScramCredentialsResponse.DescribeUserScramCredentialsResult + CredentialInfo = Result.CredentialInfo + response = DescribeUserScramCredentialsResponse( + throttle_time_ms=7, + error_code=0, + error_message=None, + results=[ + Result(user='alice', error_code=0, error_message=None, + credential_infos=[ + CredentialInfo(mechanism=1, iterations=4096), + CredentialInfo(mechanism=2, iterations=8192), + ]), + Result(user='bob', error_code=68, error_message='resource not found', + credential_infos=[]), + ], + ) + encoded = response.encode(version=version) + decoded = DescribeUserScramCredentialsResponse.decode(encoded, version=version) + assert decoded == response + + @pytest.mark.parametrize("version", range(CreateAclsRequest.min_version, CreateAclsRequest.max_version + 1)) def test_create_acls_request_roundtrip(version): Creation = CreateAclsRequest.AclCreation From 2d25565e516eb75c1863455a25ea7b5e5f0506f4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 16 Apr 2026 16:51:31 -0700 Subject: [PATCH 08/12] acl cli fixups --- kafka/cli/admin/acls/common.py | 4 ++-- kafka/cli/admin/acls/create.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/cli/admin/acls/common.py b/kafka/cli/admin/acls/common.py index b94ef22c0..25cf357ec 100644 --- a/kafka/cli/admin/acls/common.py +++ b/kafka/cli/admin/acls/common.py @@ -1,6 +1,6 @@ from kafka.admin._acls import ( - ACLFilter, ACLOperation, 
ACLPermissionType, ACLResourcePatternType, - ResourceType, ResourcePatternFilter + ACL, ACLFilter, ACLOperation, ACLPermissionType, ACLResourcePatternType, + ResourceType, ResourcePattern, ResourcePatternFilter ) diff --git a/kafka/cli/admin/acls/create.py b/kafka/cli/admin/acls/create.py index 3230e66bb..b5c7b0ca8 100644 --- a/kafka/cli/admin/acls/create.py +++ b/kafka/cli/admin/acls/create.py @@ -11,7 +11,7 @@ def add_subparser(cls, subparsers): @classmethod def command(cls, client, args): - acls = acl_from_args(args) + acl = acl_from_args(args) result = client.create_acls([acl]) return { 'succeeded': [repr(a) for a in result['succeeded']], From d704eb0cf8509998359a250d6acf49ed372557f1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 16 Apr 2026 16:57:25 -0700 Subject: [PATCH 09/12] regen stubs --- kafka/protocol/admin/cluster.pyi | 363 +------------------------------ kafka/protocol/admin/topics.pyi | 114 +++++++++- 2 files changed, 114 insertions(+), 363 deletions(-) diff --git a/kafka/protocol/admin/cluster.pyi b/kafka/protocol/admin/cluster.pyi index e29d5bfdd..d2686cc45 100644 --- a/kafka/protocol/admin/cluster.pyi +++ b/kafka/protocol/admin/cluster.pyi @@ -2,11 +2,10 @@ import uuid from typing import Any, Self -from enum import IntEnum from kafka.protocol.api_message import ApiMessage from kafka.protocol.data_container import DataContainer -__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeConfigsRequest', 'DescribeConfigsResponse', 'AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse', 'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType'] +__all__ = ['DescribeClusterRequest', 'DescribeClusterResponse', 'DescribeLogDirsRequest', 'DescribeLogDirsResponse'] class DescribeClusterRequest(ApiMessage): include_cluster_authorized_operations: bool @@ -99,255 +98,6 @@ class DescribeClusterResponse(ApiMessage): def expect_response(self) -> bool: ... 
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... -class DescribeConfigsRequest(ApiMessage): - class DescribeConfigsResource(DataContainer): - resource_type: int - resource_name: str - configuration_keys: list[str] | None - def __init__( - self, - *args: Any, - resource_type: int = ..., - resource_name: str = ..., - configuration_keys: list[str] | None = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - resources: list[DescribeConfigsResource] - include_synonyms: bool - include_documentation: bool - def __init__( - self, - *args: Any, - resources: list[DescribeConfigsResource] = ..., - include_synonyms: bool = ..., - include_documentation: bool = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - name: str - type: str - API_KEY: int - API_VERSION: int - valid_versions: tuple[int, int] - min_version: int - max_version: int - @property - def header(self) -> Any: ... - @classmethod - def is_request(cls) -> bool: ... - def expect_response(self) -> bool: ... - def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... - -class DescribeConfigsResponse(ApiMessage): - class DescribeConfigsResult(DataContainer): - class DescribeConfigsResourceResult(DataContainer): - class DescribeConfigsSynonym(DataContainer): - name: str - value: str | None - source: int - def __init__( - self, - *args: Any, - name: str = ..., - value: str | None = ..., - source: int = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... 
- - name: str - value: str | None - read_only: bool - config_source: int - is_default: bool - is_sensitive: bool - synonyms: list[DescribeConfigsSynonym] - config_type: int - documentation: str | None - def __init__( - self, - *args: Any, - name: str = ..., - value: str | None = ..., - read_only: bool = ..., - config_source: int = ..., - is_default: bool = ..., - is_sensitive: bool = ..., - synonyms: list[DescribeConfigsSynonym] = ..., - config_type: int = ..., - documentation: str | None = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - error_code: int - error_message: str | None - resource_type: int - resource_name: str - configs: list[DescribeConfigsResourceResult] - def __init__( - self, - *args: Any, - error_code: int = ..., - error_message: str | None = ..., - resource_type: int = ..., - resource_name: str = ..., - configs: list[DescribeConfigsResourceResult] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - throttle_time_ms: int - results: list[DescribeConfigsResult] - def __init__( - self, - *args: Any, - throttle_time_ms: int = ..., - results: list[DescribeConfigsResult] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - name: str - type: str - API_KEY: int - API_VERSION: int - valid_versions: tuple[int, int] - min_version: int - max_version: int - @property - def header(self) -> Any: ... - @classmethod - def is_request(cls) -> bool: ... - def expect_response(self) -> bool: ... - def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... 
- -class AlterConfigsRequest(ApiMessage): - class AlterConfigsResource(DataContainer): - class AlterableConfig(DataContainer): - name: str - value: str | None - def __init__( - self, - *args: Any, - name: str = ..., - value: str | None = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - resource_type: int - resource_name: str - configs: list[AlterableConfig] - def __init__( - self, - *args: Any, - resource_type: int = ..., - resource_name: str = ..., - configs: list[AlterableConfig] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - resources: list[AlterConfigsResource] - validate_only: bool - def __init__( - self, - *args: Any, - resources: list[AlterConfigsResource] = ..., - validate_only: bool = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - name: str - type: str - API_KEY: int - API_VERSION: int - valid_versions: tuple[int, int] - min_version: int - max_version: int - @property - def header(self) -> Any: ... - @classmethod - def is_request(cls) -> bool: ... - def expect_response(self) -> bool: ... - def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... - -class AlterConfigsResponse(ApiMessage): - class AlterConfigsResourceResponse(DataContainer): - error_code: int - error_message: str | None - resource_type: int - resource_name: str - def __init__( - self, - *args: Any, - error_code: int = ..., - error_message: str | None = ..., - resource_type: int = ..., - resource_name: str = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... 
- @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - throttle_time_ms: int - responses: list[AlterConfigsResourceResponse] - def __init__( - self, - *args: Any, - throttle_time_ms: int = ..., - responses: list[AlterConfigsResourceResponse] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - name: str - type: str - API_KEY: int - API_VERSION: int - valid_versions: tuple[int, int] - min_version: int - max_version: int - @property - def header(self) -> Any: ... - @classmethod - def is_request(cls) -> bool: ... - def expect_response(self) -> bool: ... - def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... - class DescribeLogDirsRequest(ApiMessage): class DescribableLogDirTopic(DataContainer): topic: str @@ -473,114 +223,3 @@ class DescribeLogDirsResponse(ApiMessage): def is_request(cls) -> bool: ... def expect_response(self) -> bool: ... def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... - -class ElectLeadersRequest(ApiMessage): - class TopicPartitions(DataContainer): - topic: str - partitions: list[int] - def __init__( - self, - *args: Any, - topic: str = ..., - partitions: list[int] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - election_type: int - topic_partitions: list[TopicPartitions] | None - timeout_ms: int - def __init__( - self, - *args: Any, - election_type: int = ..., - topic_partitions: list[TopicPartitions] | None = ..., - timeout_ms: int = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... 
- def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - name: str - type: str - API_KEY: int - API_VERSION: int - valid_versions: tuple[int, int] - min_version: int - max_version: int - @property - def header(self) -> Any: ... - @classmethod - def is_request(cls) -> bool: ... - def expect_response(self) -> bool: ... - def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... - -class ElectLeadersResponse(ApiMessage): - class ReplicaElectionResult(DataContainer): - class PartitionResult(DataContainer): - partition_id: int - error_code: int - error_message: str | None - def __init__( - self, - *args: Any, - partition_id: int = ..., - error_code: int = ..., - error_message: str | None = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - topic: str - partition_result: list[PartitionResult] - def __init__( - self, - *args: Any, - topic: str = ..., - partition_result: list[PartitionResult] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - throttle_time_ms: int - error_code: int - replica_election_results: list[ReplicaElectionResult] - def __init__( - self, - *args: Any, - throttle_time_ms: int = ..., - error_code: int = ..., - replica_election_results: list[ReplicaElectionResult] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - name: str - type: str - API_KEY: int - API_VERSION: int - valid_versions: tuple[int, int] - min_version: int - max_version: int - @property - def header(self) -> Any: ... - @classmethod - def is_request(cls) -> bool: ... - def expect_response(self) -> bool: ... 
- def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... - -class ElectionType(IntEnum): - PREFERRED: int - UNCLEAN: int diff --git a/kafka/protocol/admin/topics.pyi b/kafka/protocol/admin/topics.pyi index 2775b2fc4..5d922d830 100644 --- a/kafka/protocol/admin/topics.pyi +++ b/kafka/protocol/admin/topics.pyi @@ -2,10 +2,11 @@ import uuid from typing import Any, Self +from enum import IntEnum from kafka.protocol.api_message import ApiMessage from kafka.protocol.data_container import DataContainer -__all__ = ['CreateTopicsRequest', 'CreateTopicsResponse', 'DeleteTopicsRequest', 'DeleteTopicsResponse', 'CreatePartitionsRequest', 'CreatePartitionsResponse', 'AlterPartitionRequest', 'AlterPartitionResponse', 'AlterPartitionReassignmentsRequest', 'AlterPartitionReassignmentsResponse', 'ListPartitionReassignmentsRequest', 'ListPartitionReassignmentsResponse', 'DeleteRecordsRequest', 'DeleteRecordsResponse'] +__all__ = ['CreateTopicsRequest', 'CreateTopicsResponse', 'DeleteTopicsRequest', 'DeleteTopicsResponse', 'CreatePartitionsRequest', 'CreatePartitionsResponse', 'AlterPartitionRequest', 'AlterPartitionResponse', 'AlterPartitionReassignmentsRequest', 'AlterPartitionReassignmentsResponse', 'ListPartitionReassignmentsRequest', 'ListPartitionReassignmentsResponse', 'DeleteRecordsRequest', 'DeleteRecordsResponse', 'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType'] class CreateTopicsRequest(ApiMessage): class CreatableTopic(DataContainer): @@ -862,3 +863,114 @@ class DeleteRecordsResponse(ApiMessage): def is_request(cls) -> bool: ... def expect_response(self) -> bool: ... def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... 
+ +class ElectLeadersRequest(ApiMessage): + class TopicPartitions(DataContainer): + topic: str + partitions: list[int] + def __init__( + self, + *args: Any, + topic: str = ..., + partitions: list[int] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + election_type: int + topic_partitions: list[TopicPartitions] | None + timeout_ms: int + def __init__( + self, + *args: Any, + election_type: int = ..., + topic_partitions: list[TopicPartitions] | None = ..., + timeout_ms: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class ElectLeadersResponse(ApiMessage): + class ReplicaElectionResult(DataContainer): + class PartitionResult(DataContainer): + partition_id: int + error_code: int + error_message: str | None + def __init__( + self, + *args: Any, + partition_id: int = ..., + error_code: int = ..., + error_message: str | None = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + topic: str + partition_result: list[PartitionResult] + def __init__( + self, + *args: Any, + topic: str = ..., + partition_result: list[PartitionResult] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... 
+ def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + throttle_time_ms: int + error_code: int + replica_election_results: list[ReplicaElectionResult] + def __init__( + self, + *args: Any, + throttle_time_ms: int = ..., + error_code: int = ..., + replica_election_results: list[ReplicaElectionResult] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class ElectionType(IntEnum): + PREFERRED: int + UNCLEAN: int From aca309180704c7072b0a22625c23b5e4dec0cff7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 16 Apr 2026 17:05:17 -0700 Subject: [PATCH 10/12] more stubs --- kafka/protocol/admin/configs.pyi | 257 +++++++++++++++++++++++++++++++ kafka/protocol/admin/users.pyi | 223 +++++++++++++++++++++++++++ kafka/protocol/generate_stubs.py | 12 +- 3 files changed, 487 insertions(+), 5 deletions(-) create mode 100644 kafka/protocol/admin/configs.pyi create mode 100644 kafka/protocol/admin/users.pyi diff --git a/kafka/protocol/admin/configs.pyi b/kafka/protocol/admin/configs.pyi new file mode 100644 index 000000000..c6f4bacba --- /dev/null +++ b/kafka/protocol/admin/configs.pyi @@ -0,0 +1,257 @@ +# Generated by generate_stubs.py (Python 3.14) +import uuid +from typing import Any, Self + +from kafka.protocol.api_message import ApiMessage +from kafka.protocol.data_container import DataContainer + +__all__ = ['AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeConfigsRequest', 'DescribeConfigsResponse'] + +class AlterConfigsRequest(ApiMessage): + class 
AlterConfigsResource(DataContainer): + class AlterableConfig(DataContainer): + name: str + value: str | None + def __init__( + self, + *args: Any, + name: str = ..., + value: str | None = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + resource_type: int + resource_name: str + configs: list[AlterableConfig] + def __init__( + self, + *args: Any, + resource_type: int = ..., + resource_name: str = ..., + configs: list[AlterableConfig] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + resources: list[AlterConfigsResource] + validate_only: bool + def __init__( + self, + *args: Any, + resources: list[AlterConfigsResource] = ..., + validate_only: bool = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class AlterConfigsResponse(ApiMessage): + class AlterConfigsResourceResponse(DataContainer): + error_code: int + error_message: str | None + resource_type: int + resource_name: str + def __init__( + self, + *args: Any, + error_code: int = ..., + error_message: str | None = ..., + resource_type: int = ..., + resource_name: str = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... 
+ def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + throttle_time_ms: int + responses: list[AlterConfigsResourceResponse] + def __init__( + self, + *args: Any, + throttle_time_ms: int = ..., + responses: list[AlterConfigsResourceResponse] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class DescribeConfigsRequest(ApiMessage): + class DescribeConfigsResource(DataContainer): + resource_type: int + resource_name: str + configuration_keys: list[str] | None + def __init__( + self, + *args: Any, + resource_type: int = ..., + resource_name: str = ..., + configuration_keys: list[str] | None = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + resources: list[DescribeConfigsResource] + include_synonyms: bool + include_documentation: bool + def __init__( + self, + *args: Any, + resources: list[DescribeConfigsResource] = ..., + include_synonyms: bool = ..., + include_documentation: bool = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... 
+ def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class DescribeConfigsResponse(ApiMessage): + class DescribeConfigsResult(DataContainer): + class DescribeConfigsResourceResult(DataContainer): + class DescribeConfigsSynonym(DataContainer): + name: str + value: str | None + source: int + def __init__( + self, + *args: Any, + name: str = ..., + value: str | None = ..., + source: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + name: str + value: str | None + read_only: bool + config_source: int + is_default: bool + is_sensitive: bool + synonyms: list[DescribeConfigsSynonym] + config_type: int + documentation: str | None + def __init__( + self, + *args: Any, + name: str = ..., + value: str | None = ..., + read_only: bool = ..., + config_source: int = ..., + is_default: bool = ..., + is_sensitive: bool = ..., + synonyms: list[DescribeConfigsSynonym] = ..., + config_type: int = ..., + documentation: str | None = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + error_code: int + error_message: str | None + resource_type: int + resource_name: str + configs: list[DescribeConfigsResourceResult] + def __init__( + self, + *args: Any, + error_code: int = ..., + error_message: str | None = ..., + resource_type: int = ..., + resource_name: str = ..., + configs: list[DescribeConfigsResourceResult] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... 
+ + throttle_time_ms: int + results: list[DescribeConfigsResult] + def __init__( + self, + *args: Any, + throttle_time_ms: int = ..., + results: list[DescribeConfigsResult] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... diff --git a/kafka/protocol/admin/users.pyi b/kafka/protocol/admin/users.pyi new file mode 100644 index 000000000..80345bc61 --- /dev/null +++ b/kafka/protocol/admin/users.pyi @@ -0,0 +1,223 @@ +# Generated by generate_stubs.py (Python 3.14) +import uuid +from typing import Any, Self + +from kafka.protocol.api_message import ApiMessage +from kafka.protocol.api_data import ApiData +from kafka.protocol.data_container import DataContainer + +__all__ = ['AlterUserScramCredentialsRequest', 'AlterUserScramCredentialsResponse', 'DescribeUserScramCredentialsRequest', 'DescribeUserScramCredentialsResponse'] + +class AlterUserScramCredentialsRequest(ApiMessage): + class ScramCredentialDeletion(DataContainer): + name: str + mechanism: int + def __init__( + self, + *args: Any, + name: str = ..., + mechanism: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... 
+ + class ScramCredentialUpsertion(DataContainer): + name: str + mechanism: int + iterations: int + salt: bytes | ApiData + salted_password: bytes | ApiData + def __init__( + self, + *args: Any, + name: str = ..., + mechanism: int = ..., + iterations: int = ..., + salt: bytes | ApiData = ..., + salted_password: bytes | ApiData = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + deletions: list[ScramCredentialDeletion] + upsertions: list[ScramCredentialUpsertion] + def __init__( + self, + *args: Any, + deletions: list[ScramCredentialDeletion] = ..., + upsertions: list[ScramCredentialUpsertion] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class AlterUserScramCredentialsResponse(ApiMessage): + class AlterUserScramCredentialsResult(DataContainer): + user: str + error_code: int + error_message: str | None + def __init__( + self, + *args: Any, + user: str = ..., + error_code: int = ..., + error_message: str | None = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... 
+ + throttle_time_ms: int + results: list[AlterUserScramCredentialsResult] + def __init__( + self, + *args: Any, + throttle_time_ms: int = ..., + results: list[AlterUserScramCredentialsResult] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class DescribeUserScramCredentialsRequest(ApiMessage): + class UserName(DataContainer): + name: str + def __init__( + self, + *args: Any, + name: str = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + users: list[UserName] | None + def __init__( + self, + *args: Any, + users: list[UserName] | None = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... 
+ +class DescribeUserScramCredentialsResponse(ApiMessage): + class DescribeUserScramCredentialsResult(DataContainer): + class CredentialInfo(DataContainer): + mechanism: int + iterations: int + def __init__( + self, + *args: Any, + mechanism: int = ..., + iterations: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + user: str + error_code: int + error_message: str | None + credential_infos: list[CredentialInfo] + def __init__( + self, + *args: Any, + user: str = ..., + error_code: int = ..., + error_message: str | None = ..., + credential_infos: list[CredentialInfo] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + throttle_time_ms: int + error_code: int + error_message: str | None + results: list[DescribeUserScramCredentialsResult] + def __init__( + self, + *args: Any, + throttle_time_ms: int = ..., + error_code: int = ..., + error_message: str | None = ..., + results: list[DescribeUserScramCredentialsResult] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... 
diff --git a/kafka/protocol/generate_stubs.py b/kafka/protocol/generate_stubs.py index 012b11708..38c367bcb 100644 --- a/kafka/protocol/generate_stubs.py +++ b/kafka/protocol/generate_stubs.py @@ -37,6 +37,13 @@ # Modules containing protocol classes, relative to kafka.protocol MESSAGE_MODULES = [ + 'kafka.protocol.admin.acl', + 'kafka.protocol.admin.client_quotas', + 'kafka.protocol.admin.cluster', + 'kafka.protocol.admin.configs', + 'kafka.protocol.admin.groups', + 'kafka.protocol.admin.topics', + 'kafka.protocol.admin.users', 'kafka.protocol.consumer.fetch', 'kafka.protocol.consumer.group', 'kafka.protocol.consumer.metadata', @@ -46,11 +53,6 @@ 'kafka.protocol.metadata.metadata', 'kafka.protocol.producer.produce', 'kafka.protocol.producer.transaction', - 'kafka.protocol.admin.acl', - 'kafka.protocol.admin.client_quotas', - 'kafka.protocol.admin.cluster', - 'kafka.protocol.admin.groups', - 'kafka.protocol.admin.topics', 'kafka.protocol.sasl', ] From d5de0dee5d882e8659831dfc41813b6246861e88 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 16 Apr 2026 23:41:20 -0700 Subject: [PATCH 11/12] Drop _client.api_version; use per-connection version --- kafka/admin/_acls.py | 126 +++++++++++++--------------------------- kafka/admin/_configs.py | 1 - kafka/admin/_groups.py | 13 ++--- kafka/admin/_records.py | 5 +- kafka/admin/_topics.py | 1 - 5 files changed, 44 insertions(+), 102 deletions(-) diff --git a/kafka/admin/_acls.py b/kafka/admin/_acls.py index 2019d7ef6..8576112b1 100644 --- a/kafka/admin/_acls.py +++ b/kafka/admin/_acls.py @@ -23,7 +23,6 @@ class ACLAdminMixin: """Mixin providing ACL management methods for KafkaAdminClient.""" _manager: KafkaConnectionManager - _client: object config: dict # ACL Helper for Metadata / DescribeGroups @@ -45,27 +44,17 @@ def describe_acls(self, acl_filter): Returns: tuple of a list of matching ACL objects and a KafkaError (NoError if successful) """ - - version = self._client.api_version(DescribeAclsRequest, max_version=1) - if 
version == 0: - request = DescribeAclsRequest[version]( - resource_type_filter=acl_filter.resource_pattern.resource_type, - resource_name_filter=acl_filter.resource_pattern.resource_name, - principal_filter=acl_filter.principal, - host_filter=acl_filter.host, - operation=acl_filter.operation, - permission_type=acl_filter.permission_type - ) - elif version <= 1: - request = DescribeAclsRequest[version]( - resource_type_filter=acl_filter.resource_pattern.resource_type, - resource_name_filter=acl_filter.resource_pattern.resource_name, - pattern_type_filter=acl_filter.resource_pattern.pattern_type, - principal_filter=acl_filter.principal, - host_filter=acl_filter.host, - operation=acl_filter.operation, - permission_type=acl_filter.permission_type - ) + min_version = 3 if acl_filter.resource_pattern.resource_type == ResourceType.USER else 0 + request = DescribeAclsRequest( + min_version=min_version, + resource_type_filter=acl_filter.resource_pattern.resource_type, + resource_name_filter=acl_filter.resource_pattern.resource_name, + pattern_type_filter=acl_filter.resource_pattern.pattern_type, + principal_filter=acl_filter.principal, + host_filter=acl_filter.host, + operation=acl_filter.operation, + permission_type=acl_filter.permission_type + ) response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 return self._convert_describe_acls_response_to_acls(response) @@ -99,28 +88,17 @@ def _convert_describe_acls_response_to_acls(describe_response): return acl_list, Errors.NoError @staticmethod - def _convert_create_acls_resource_request_v0(acl): - """Convert an ACL object into the CreateAclsRequest v0 format.""" - return ( - acl.resource_pattern.resource_type, - acl.resource_pattern.resource_name, - acl.principal, - acl.host, - acl.operation, - acl.permission_type - ) - - @staticmethod - def _convert_create_acls_resource_request_v1(acl): - """Convert an ACL object into the CreateAclsRequest v1 format.""" - return ( - 
acl.resource_pattern.resource_type, - acl.resource_pattern.resource_name, - acl.resource_pattern.pattern_type, - acl.principal, - acl.host, - acl.operation, - acl.permission_type + def _convert_create_acls_resource_request(acl): + """Convert an ACL object into the CreateAclsRequest format.""" + _AclCreate = CreateAclsRequest.AclCreation + return _AclCreate( + resource_type=acl.resource_pattern.resource_type, + resource_name=acl.resource_pattern.resource_name, + resource_pattern_type=acl.resource_pattern.pattern_type, + principal=acl.principal, + host=acl.host, + operation=acl.operation, + permission_type=acl.permission_type ) @staticmethod @@ -147,46 +125,28 @@ def create_acls(self, acls): Returns: dict of successes and failures """ - for acl in acls: if not isinstance(acl, ACL): raise IllegalArgumentError("acls must contain ACL objects") - version = self._client.api_version(CreateAclsRequest, max_version=1) - if version == 0: - request = CreateAclsRequest[version]( - creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls] - ) - elif version <= 1: - request = CreateAclsRequest[version]( - creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls] - ) + creations = [self._convert_create_acls_resource_request(acl) for acl in acls] + min_version = 3 if any(creation.resource_type == ResourceType.USER for creation in creations) else 0 + request = CreateAclsRequest(creations=creations, min_version=min_version) response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 return self._convert_create_acls_response_to_acls(acls, response) @staticmethod - def _convert_delete_acls_resource_request_v0(acl): - """Convert an ACLFilter object into the DeleteAclsRequest v0 format.""" - return ( - acl.resource_pattern.resource_type, - acl.resource_pattern.resource_name, - acl.principal, - acl.host, - acl.operation, - acl.permission_type - ) - - @staticmethod - def _convert_delete_acls_resource_request_v1(acl): - 
"""Convert an ACLFilter object into the DeleteAclsRequest v1 format.""" - return ( - acl.resource_pattern.resource_type, - acl.resource_pattern.resource_name, - acl.resource_pattern.pattern_type, - acl.principal, - acl.host, - acl.operation, - acl.permission_type + def _convert_delete_acls_resource_request(acl): + """Convert an ACLFilter object into the DeleteAclsRequest format.""" + _AclsFilter = DeleteAclsRequest.DeleteAclsFilter + return _AclsFilter( + resource_type_filter=acl.resource_pattern.resource_type, + resource_name_filter=acl.resource_pattern.resource_name, + pattern_type_filter=acl.resource_pattern.pattern_type, + principal_filter=acl.principal, + host_filter=acl.host, + operation=acl.operation, + permission_type=acl.permission_type ) @staticmethod @@ -224,21 +184,13 @@ def delete_acls(self, acl_filters): a list of 3-tuples corresponding to the list of input filters. The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance) """ - for acl in acl_filters: if not isinstance(acl, ACLFilter): raise IllegalArgumentError("acl_filters must contain ACLFilter type objects") - version = self._client.api_version(DeleteAclsRequest, max_version=1) - - if version == 0: - request = DeleteAclsRequest[version]( - filters=[self._convert_delete_acls_resource_request_v0(acl) for acl in acl_filters] - ) - elif version <= 1: - request = DeleteAclsRequest[version]( - filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters] - ) + filters = [self._convert_delete_acls_resource_request(acl) for acl in acl_filters] + min_version = 3 if any(_filter.resource_type_filter == ResourceType.USER for _filter in filters) else 0 + request = DeleteAclsRequest(filters=filters, min_version=min_version) response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606 return self._convert_delete_acls_response_to_matching_acls(acl_filters, response) diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index 
c35c98aab..563ea53f1 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -25,7 +25,6 @@ class ConfigAdminMixin: """Mixin providing configuration management methods for KafkaAdminClient.""" _manager: KafkaConnectionManager - _client: object config: dict @staticmethod diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index d911e88fd..aa86fcf95 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -25,21 +25,16 @@ class GroupAdminMixin: """Mixin providing consumer group management methods for KafkaAdminClient.""" _manager: KafkaConnectionManager - _client: object _coordinator_cache: dict config: dict # -- Describe consumer groups ---------------------------------------------- def _describe_consumer_groups_request(self, group_id): - version = self._client.api_version(DescribeGroupsRequest, max_version=3) - if version <= 2: - request = DescribeGroupsRequest[version](groups=(group_id,)) - else: - request = DescribeGroupsRequest[version]( - groups=(group_id,), - include_authorized_operations=True - ) + request = DescribeGroupsRequest( + groups=[group_id], + include_authorized_operations=True + ) return request def _describe_consumer_groups_process_response(self, response): diff --git a/kafka/admin/_records.py b/kafka/admin/_records.py index aa22abc88..cc2df2954 100644 --- a/kafka/admin/_records.py +++ b/kafka/admin/_records.py @@ -20,7 +20,6 @@ class RecordAdminMixin: """Mixin providing record deletion and cluster operation methods.""" _manager: KafkaConnectionManager - _client: object config: dict async def _async_get_leader_for_partitions(self, partitions): @@ -50,8 +49,6 @@ async def _async_get_leader_for_partitions(self, partitions): async def _async_delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): timeout_ms = self._validate_timeout(timeout_ms) - version = self._client.api_version(DeleteRecordsRequest, max_version=0) - if partition_leader_id is None: leader2partitions = await 
self._async_get_leader_for_partitions(set(records_to_delete)) else: @@ -63,7 +60,7 @@ async def _async_delete_records(self, records_to_delete, timeout_ms=None, partit for partition in partitions: topic2partitions[partition.topic].append(partition) - request = DeleteRecordsRequest[version]( + request = DeleteRecordsRequest( topics=[ (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions]) for topic, partitions in topic2partitions.items() diff --git a/kafka/admin/_topics.py b/kafka/admin/_topics.py index c503a20b1..1670d45d6 100644 --- a/kafka/admin/_topics.py +++ b/kafka/admin/_topics.py @@ -23,7 +23,6 @@ class TopicAdminMixin: """Mixin providing topic management methods for KafkaAdminClient.""" _manager: KafkaConnectionManager - _client: object config: dict def list_topics(self): From eb512a38d2b8b9dcc01f70cc76b63d303dd58bb8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 17 Apr 2026 07:32:07 -0700 Subject: [PATCH 12/12] fix --- kafka/admin/_groups.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index aa86fcf95..aee0dd304 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -39,10 +39,6 @@ def _describe_consumer_groups_request(self, group_id): def _describe_consumer_groups_process_response(self, response): """Process a DescribeGroupsResponse into a group description.""" - if response.API_VERSION > 3: - raise NotImplementedError( - "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient." - .format(response.API_VERSION)) assert len(response.groups) == 1 for group in response.groups: for member in group.members: