diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index 2977e882a..d4e22a40b 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -14,6 +14,7 @@ from kafka.protocol.admin import ( AlterConfigsRequest, DescribeConfigsRequest, + ListConfigResourcesRequest, ) if TYPE_CHECKING: @@ -130,6 +131,49 @@ def describe_configs(self, config_resources, include_synonyms=False, config_filt return self._manager.run(self._async_describe_configs, config_resources, include_synonyms, config_filter) + @staticmethod + def _list_config_resources_process_response(response): + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + raise error_type( + "ListConfigResourcesRequest failed with response '{}'.".format(response)) + ret = defaultdict(list) + for resource in response.config_resources: + resource_type = ConfigResourceType(resource.resource_type) + ret[resource_type.name.lower()].append(resource.resource_name) + return dict(ret) + + async def _async_list_config_resources(self, resource_types=None): + wire_types = [] + for rt in resource_types or []: + if not isinstance(rt, ConfigResourceType): + try: + rt = ConfigResourceType[str(rt).upper().replace('-', '_')] + except KeyError: + raise ValueError(f'Unrecognized ConfigResourceType: {rt}') + wire_types.append(rt.value) + request = ListConfigResourcesRequest(resource_types=wire_types) + response = await self._manager.send(request) + return self._list_config_resources_process_response(response) + + def list_config_resources(self, resource_types=None): + """List config resources known to the cluster. + + Useful for discovering resource types that have no separate enumeration + API (e.g. ``CLIENT_METRICS``, ``GROUP``). For ``TOPIC`` and ``BROKER`` + the data is also available via ``Metadata`` / cluster descriptions. + + Keyword Arguments: + resource_types (list, optional): Filter by resource type. Each entry + may be a :class:`ConfigResourceType` or its name (e.g. ``'TOPIC'``). + If None or empty, the broker returns all supported types. + Requires broker >= 4.1 for anything other than ``CLIENT_METRICS``. + + Returns: + dict of {resource_type (str): [resource_name (str)]} + """ + return self._manager.run(self._async_list_config_resources, resource_types) + async def _get_missing_dynamic_configs(self, config_resources): resource_lookups = [ConfigResource(resource.resource_type, resource.name) for resource in config_resources] dynamic_configs = await self._async_describe_configs(resource_lookups, config_filter='modified', flat=True) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 3ee8d42fc..5748e33a0 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -113,7 +113,6 @@ def run_cli(args=None): # --trace ? # [configs] - # ListConfigResources # IncrementalAlterConfigs (not supported yet) # [client-quotas] diff --git a/kafka/cli/admin/configs/__init__.py b/kafka/cli/admin/configs/__init__.py index 3c2e182f1..d8639f57e 100644 --- a/kafka/cli/admin/configs/__init__.py +++ b/kafka/cli/admin/configs/__init__.py @@ -2,6 +2,7 @@ from .alter import AlterConfigs from .describe import DescribeConfigs +from .list import ListConfigResources from .reset import ResetConfigs @@ -11,6 +12,6 @@ class ConfigsSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('configs', help='Manage Kafka Configuration') commands = parser.add_subparsers() - for cmd in [DescribeConfigs, AlterConfigs, ResetConfigs]: + for cmd in [DescribeConfigs, AlterConfigs, ListConfigResources, ResetConfigs]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/configs/alter.py b/kafka/cli/admin/configs/alter.py index db8d9df40..efa2e0e29 100644 --- a/kafka/cli/admin/configs/alter.py +++ b/kafka/cli/admin/configs/alter.py @@ -1,4 +1,4 @@ -from kafka.admin import ConfigResource +from .common import add_resource_arguments, parse_resources class AlterConfigs: @@ -6,10 +6,7 @@ class AlterConfigs: @classmethod def add_subparser(cls, subparsers): parser = subparsers.add_parser('alter', help='Alter Kafka Configs') - parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[]) - parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[]) - parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[]) - parser.add_argument('-g', '--group', type=str, action='append', dest='groups', default=[]) + add_resource_arguments(parser) parser.add_argument('-c', '--config', type=str, action='append', dest='configs', required=True, help='key=value to alter') parser.add_argument('-v', '--validate-only', action='store_true', default=False) parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True) @@ -17,24 +14,11 @@ def add_subparser(cls, subparsers): @classmethod def command(cls, client, args): - configs = dict(config.split('=') for config in args.configs) - resources = [] - for topic in args.topics: - if resources: - raise ValueError('Only one resource type per request') - resources.append(ConfigResource('TOPIC', topic, configs)) - for broker in args.brokers: - if resources: - raise ValueError('Only one resource type per request') - resources.append(ConfigResource('BROKER', broker, configs)) - for broker in args.broker_loggers: - if resources: - raise ValueError('Only one resource type per request') - resources.append(ConfigResource('BROKER_LOGGER', broker, configs)) - for group in args.groups: - if resources: - raise ValueError('Only one resource type per request') - resources.append(ConfigResource('GROUP', group, configs)) + try: + configs = dict(config.split('=') for config in args.configs) + except ValueError: + raise ValueError(f'Unable to parse configs! {args.configs}') + resources = parse_resources(args, configs=configs) return client.alter_configs(resources, validate_only=args.validate_only, raise_on_unknown=args.raise_on_unknown) diff --git a/kafka/cli/admin/configs/common.py b/kafka/cli/admin/configs/common.py new file mode 100644 index 000000000..e8957ea46 --- /dev/null +++ b/kafka/cli/admin/configs/common.py @@ -0,0 +1,17 @@ +from kafka.admin import ConfigResource + + +def add_resource_arguments(parser): + """Add arguments for specifying ConfigResource""" + parser.add_argument( + '-r', '--resource-type', type=str, required=True, + help='Type of resource to describe: topic, broker, broker_logger, ' + 'client_metrics, group.') + parser.add_argument( + '-n', '--resource-name', type=str, action='append', dest='resource_names', required=True, + help='Name of resource(s) to describe. May be repeated.') + + +def parse_resources(args, configs=None): + return [ConfigResource(args.resource_type.upper().replace('-', '_'), resource_name, configs) + for resource_name in args.resource_names] diff --git a/kafka/cli/admin/configs/describe.py b/kafka/cli/admin/configs/describe.py index bbd970732..16d2a73d4 100644 --- a/kafka/cli/admin/configs/describe.py +++ b/kafka/cli/admin/configs/describe.py @@ -1,4 +1,4 @@ -from kafka.admin import ConfigResource +from .common import add_resource_arguments, parse_resources class DescribeConfigs: @@ -6,11 +6,8 @@ class DescribeConfigs: @classmethod def add_subparser(cls, subparsers): parser = subparsers.add_parser('describe', help='Describe Kafka Configs') - parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[]) - parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[]) - parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[]) - parser.add_argument('-g', '--group', type=str, action='append', dest='groups', default=[]) - parser.add_argument('-k', '--key', type=str, action='append', dest='keys', default=None) + add_resource_arguments(parser) + parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=None) parser.add_argument('--dynamic', action='store_true', default=False) parser.add_argument('--modified', action='store_true', default=False) parser.add_argument('--static', action='store_true', default=False) @@ -19,16 +16,7 @@ def add_subparser(cls, subparsers): @classmethod def command(cls, client, args): - resources = [] - for topic in args.topics: - resources.append(ConfigResource('TOPIC', topic, args.keys)) - for broker in args.brokers: - resources.append(ConfigResource('BROKER', broker, args.keys)) - for broker in args.broker_loggers: - resources.append(ConfigResource('BROKER_LOGGER', broker, args.keys)) - for group in args.groups: - resources.append(ConfigResource('GROUP', group, args.keys)) - + resources = parse_resources(args, configs=args.configs) if args.modified: config_filter = 'modified' elif args.dynamic: diff --git a/kafka/cli/admin/configs/list.py b/kafka/cli/admin/configs/list.py new file mode 100644 index 000000000..cff2c8159 --- /dev/null +++ b/kafka/cli/admin/configs/list.py @@ -0,0 +1,19 @@ +class ListConfigResources: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser( + 'list', + help='List config resources known to the cluster (requires broker >= 4.1 ' + 'for non client_metrics types)') + parser.add_argument( + '-r', '--resource-type', type=str, action='append', dest='resource_types', default=[], + help='Filter by resource type (repeatable): topic, broker, ' + 'broker_logger, client_metrics, group. Omit to list all ' + 'supported types.') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + return client.list_config_resources( + resource_types=args.resource_types or None) diff --git a/kafka/cli/admin/configs/reset.py b/kafka/cli/admin/configs/reset.py index 0885cf49a..f9d832121 100644 --- a/kafka/cli/admin/configs/reset.py +++ b/kafka/cli/admin/configs/reset.py @@ -1,4 +1,4 @@ -from kafka.admin import ConfigResource +from .common import add_resource_arguments, parse_resources class ResetConfigs: @@ -6,26 +6,15 @@ class ResetConfigs: @classmethod def add_subparser(cls, subparsers): parser = subparsers.add_parser('reset', help='Reset Kafka Configs') - parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[]) - parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[]) - parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[]) - parser.add_argument('-g', '--group', type=str, action='append', dest='groups', default=[]) - parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=[], help='key=value to reset') + add_resource_arguments(parser) + parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=[], help='key to reset') parser.add_argument('-v', '--validate-only', action='store_true', default=False) parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True) parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): - resources = [] - for topic in args.topics: - resources.append(ConfigResource('TOPIC', topic, args.configs)) - for broker in args.brokers: - resources.append(ConfigResource('BROKER', broker, args.configs)) - for broker in args.broker_loggers: - resources.append(ConfigResource('BROKER_LOGGER', broker, args.configs)) - for group in args.groups: - resources.append(ConfigResource('GROUP', group, args.configs)) + resources = parse_resources(args, configs=args.configs) return client.reset_configs(resources, validate_only=args.validate_only, raise_on_unknown=args.raise_on_unknown) diff --git a/kafka/protocol/admin/configs.py b/kafka/protocol/admin/configs.py index 3ec7b7206..7bbaead1d 100644 --- a/kafka/protocol/admin/configs.py +++ b/kafka/protocol/admin/configs.py @@ -7,8 +7,12 @@ class AlterConfigsResponse(ApiMessage): pass class DescribeConfigsRequest(ApiMessage): pass class DescribeConfigsResponse(ApiMessage): pass +class ListConfigResourcesRequest(ApiMessage): pass +class ListConfigResourcesResponse(ApiMessage): pass + __all__ = [ 'AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeConfigsRequest', 'DescribeConfigsResponse', + 'ListConfigResourcesRequest', 'ListConfigResourcesResponse', ] diff --git a/kafka/protocol/admin/configs.pyi b/kafka/protocol/admin/configs.pyi index c6f4bacba..3da555eef 100644 --- a/kafka/protocol/admin/configs.pyi +++ b/kafka/protocol/admin/configs.pyi @@ -5,7 +5,7 @@ from typing import Any, Self from kafka.protocol.api_message import ApiMessage from kafka.protocol.data_container import DataContainer -__all__ = ['AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeConfigsRequest', 'DescribeConfigsResponse'] +__all__ = ['AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeConfigsRequest', 'DescribeConfigsResponse', 'ListConfigResourcesRequest', 'ListConfigResourcesResponse'] class AlterConfigsRequest(ApiMessage): class AlterConfigsResource(DataContainer): @@ -255,3 +255,74 @@ class DescribeConfigsResponse(ApiMessage): def is_request(cls) -> bool: ... def expect_response(self) -> bool: ... def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class ListConfigResourcesRequest(ApiMessage): + resource_types: list[int] + def __init__( + self, + *args: Any, + resource_types: list[int] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class ListConfigResourcesResponse(ApiMessage): + class ConfigResource(DataContainer): + resource_name: str + resource_type: int + def __init__( + self, + *args: Any, + resource_name: str = ..., + resource_type: int = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + throttle_time_ms: int + error_code: int + config_resources: list[ConfigResource] + def __init__( + self, + *args: Any, + throttle_time_ms: int = ..., + error_code: int = ..., + config_resources: list[ConfigResource] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... diff --git a/kafka/protocol/schemas/resources/ListConfigResourcesRequest.json b/kafka/protocol/schemas/resources/ListConfigResourcesRequest.json new file mode 100644 index 000000000..c4b858a71 --- /dev/null +++ b/kafka/protocol/schemas/resources/ListConfigResourcesRequest.json @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 74, + "type": "request", + "listeners": ["broker"], + "name": "ListConfigResourcesRequest", + // Version 0 is used as ListClientMetricsResourcesRequest which only lists client metrics resources. + // Version 1 adds ResourceTypes field (KIP-1142). If there is no specified ResourceTypes, it should return all configuration resources. + "validVersions": "0-1", + "flexibleVersions": "0+", + "fields": [ + { "name": "ResourceTypes", "type": "[]int8", "versions": "1+", + "about": "The list of resource type. If the list is empty, it uses default supported config resource types." + } + ] +} + \ No newline at end of file diff --git a/kafka/protocol/schemas/resources/ListConfigResourcesResponse.json b/kafka/protocol/schemas/resources/ListConfigResourcesResponse.json new file mode 100644 index 000000000..8a2dbdf5a --- /dev/null +++ b/kafka/protocol/schemas/resources/ListConfigResourcesResponse.json @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 74, + "type": "response", + "name": "ListConfigResourcesResponse", + // Version 0 is used as ListClientMetricsResourcesResponse which returns all client metrics resources. + // Version 1 adds ResourceType to ConfigResources (KIP-1142). + "validVersions": "0-1", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." }, + { "name": "ConfigResources", "type": "[]ConfigResource", "versions": "0+", + "about": "Each config resource in the response.", "fields": [ + { "name": "ResourceName", "type": "string", "versions": "0+", + "about": "The resource name." }, + { "name": "ResourceType", "type": "int8", "versions": "1+", "ignorable": true, "default": 16, + "about": "The resource type." } + ]} + ] +} diff --git a/test/admin/test_admin_configs.py b/test/admin/test_admin_configs.py index e2d85fcc7..9e2850a18 100644 --- a/test/admin/test_admin_configs.py +++ b/test/admin/test_admin_configs.py @@ -1,9 +1,11 @@ import pytest from kafka.admin import ConfigResource, ConfigResourceType, KafkaAdminClient +from kafka.errors import ClusterAuthorizationFailedError from kafka.protocol.admin import ( AlterConfigsRequest, AlterConfigsResponse, DescribeConfigsRequest, DescribeConfigsResponse, + ListConfigResourcesRequest, ListConfigResourcesResponse, ) from test.mock_broker import MockBroker @@ -290,3 +292,104 @@ def test_validate_only_propagates(self, mock_broker, admin): validate_only=True) assert captured['request'].validate_only is True + + +# --------------------------------------------------------------------------- +# list_config_resources +# --------------------------------------------------------------------------- + + +def _list_config_resources_response(resources, error_code=0): + """resources: iterable of (resource_name, resource_type).""" + Resource = ListConfigResourcesResponse.ConfigResource + return ListConfigResourcesResponse( + throttle_time_ms=0, + error_code=error_code, + config_resources=[ + Resource(resource_name=name, resource_type=rtype) + for name, rtype in resources + ], + ) + + +def _capture_list_config_resources(captured, response): + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = ListConfigResourcesRequest.decode( + request_bytes, version=api_version, header=True) + captured['version'] = api_version + return response + return handler + + +class TestListConfigResourcesMockBroker: + + def test_groups_results_by_resource_type(self, mock_broker, admin): + mock_broker.respond( + ListConfigResourcesRequest, + _list_config_resources_response([ + ('topic-a', ConfigResourceType.TOPIC.value), + ('topic-b', ConfigResourceType.TOPIC.value), + ('mygroup', ConfigResourceType.GROUP.value), + ('metrics-1', ConfigResourceType.CLIENT_METRICS.value), + ]), + ) + result = admin.list_config_resources() + assert result == { + 'topic': ['topic-a', 'topic-b'], + 'group': ['mygroup'], + 'client_metrics': ['metrics-1'], + } + + def test_no_resource_types_sends_empty_filter(self, mock_broker, admin): + captured = {} + mock_broker.respond_fn( + ListConfigResourcesRequest, + _capture_list_config_resources( + captured, _list_config_resources_response([]))) + + admin.list_config_resources() + assert captured['request'].resource_types == [] + + def test_string_filter_is_normalized_and_sent_as_int8(self, mock_broker, admin): + captured = {} + mock_broker.respond_fn( + ListConfigResourcesRequest, + _capture_list_config_resources( + captured, _list_config_resources_response([]))) + + admin.list_config_resources( + resource_types=['topic', 'Client-Metrics']) + assert set(captured['request'].resource_types) == { + ConfigResourceType.TOPIC.value, + ConfigResourceType.CLIENT_METRICS.value, + } + + def test_enum_filter_accepted(self, mock_broker, admin): + captured = {} + mock_broker.respond_fn( + ListConfigResourcesRequest, + _capture_list_config_resources( + captured, _list_config_resources_response([]))) + + admin.list_config_resources( + resource_types=[ConfigResourceType.GROUP]) + assert captured['request'].resource_types == [ + ConfigResourceType.GROUP.value] + + def test_unrecognized_type_raises(self, admin): + with pytest.raises(ValueError, match='Unrecognized ConfigResourceType'): + admin.list_config_resources(resource_types=['bogus']) + + def test_error_code_raises(self, mock_broker, admin): + mock_broker.respond( + ListConfigResourcesRequest, + _list_config_resources_response( + [], error_code=ClusterAuthorizationFailedError.errno)) + with pytest.raises(ClusterAuthorizationFailedError): + admin.list_config_resources() + + def test_empty_response_returns_empty_dict(self, mock_broker, admin): + mock_broker.respond( + ListConfigResourcesRequest, + _list_config_resources_response([])) + assert admin.list_config_resources() == {} diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index d88b454cf..af54448e0 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -561,3 +561,9 @@ def test_list_partition_offsets(kafka_admin_client, topic): result = kafka_admin_client.list_partition_offsets({tp: OffsetSpec.LATEST}) assert tp in result assert isinstance(result[tp], OffsetAndTimestamp) + + +@pytest.mark.skipif(env_kafka_version() < (4, 1), reason="ListConfigResources requires broker >=4.1 (KRaft)") +def test_list_config_resources(kafka_admin_client): + result = kafka_admin_client.list_config_resources() + assert 'broker' in result