diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index 563ea53f1..2977e882a 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -10,7 +10,7 @@ import logging from typing import TYPE_CHECKING -from kafka.errors import IncompatibleBrokerVersion +import kafka.errors as Errors from kafka.protocol.admin import ( AlterConfigsRequest, DescribeConfigsRequest, @@ -31,6 +31,8 @@ class ConfigAdminMixin: def _convert_config_resource(config_resource, key_only=True): if key_only: values = list(config_resource.configs.keys()) if isinstance(config_resource.configs, dict) else config_resource.configs + elif not config_resource.configs: + values = [] else: assert isinstance(config_resource.configs, dict) values = list(config_resource.configs.items()) @@ -50,7 +52,12 @@ def _group_config_resources(self, config_resources, key_only=True): other_resources.append(self._convert_config_resource(config_resource, key_only=key_only)) return broker_resources, other_resources - async def _async_describe_configs(self, config_resources, include_synonyms=False): + async def _async_describe_configs(self, config_resources, include_synonyms=False, config_filter='modified', flat=False): + if isinstance(config_filter, str): + try: + config_filter = ConfigFilterType[config_filter.upper()] + except KeyError: + raise ValueError(f'{config_filter} is not a valid ConfigFilterType') min_version = 1 if include_synonyms else 0 broker_resources, other_resources = self._group_config_resources(config_resources, key_only=True) responses = [] @@ -70,23 +77,38 @@ async def _async_describe_configs(self, config_resources, include_synonyms=False ret = defaultdict(dict) for response in responses: for result in response.results: - result_type = ConfigResourceType(result.resource_type).name.lower() - ret[result_type][result.resource_name] = {} + resource_type = ConfigResourceType(result.resource_type) + resource_configs = {} for config in result.configs: config = config.to_dict() name = config.pop('name') + if config_filter == ConfigFilterType.DYNAMIC and config['read_only']: + continue if 'config_source' in config: - config['config_source'] = ConfigSourceType(config['config_source']).name + config_source = ConfigSourceType(config['config_source']) + elif config['read_only'] and resource_type is ConfigResourceType.BROKER: + config_source = ConfigSourceType.STATIC_BROKER_CONFIG + elif config['is_default']: + config_source = ConfigSourceType.DEFAULT_CONFIG + else: + config_source = ConfigSourceType.dynamic_for_resource_type(resource_type) + if config_filter.should_skip(config_source): + continue + config['config_source'] = config_source.name if 'synonyms' in config: for synonym in config['synonyms']: synonym['source'] = ConfigSourceType(synonym['source']).name - if 'config_type' in config: config['config_type'] = ConfigType(config['config_type']).name - ret[result_type][result.resource_name][name] = config - return dict(ret) + resource_configs[name] = config + ret[resource_type.name.lower()][result.resource_name] = resource_configs + if flat: + return [ret[resource.resource_type.name.lower()][resource.name] + for resource in config_resources] + else: + return dict(ret) - def describe_configs(self, config_resources, include_synonyms=False): + def describe_configs(self, config_resources, include_synonyms=False, config_filter='modified'): """Fetch configuration parameters for one or more Kafka resources. Arguments: @@ -98,37 +120,133 @@ def describe_configs(self, config_resources, include_synonyms=False): Keyword Arguments: include_synonyms (bool, optional): If True, return synonyms in response. Not supported by all versions. Default: False. + config_filter (ConfigFilterType or str): Modified returns only keys that have + non-default values; Dynamic returns all keys that can be modified with + alter_configs; All returns all available keys. Default: Modified. Returns: - List of DescribeConfigsResponses. + dict of {resource_type (str): {resource_name (str): {config_key: {config data}}}} """ - return self._manager.run(self._async_describe_configs, config_resources, include_synonyms) - - async def _async_alter_configs(self, config_resources, validate_only=False): + return self._manager.run(self._async_describe_configs, config_resources, + include_synonyms, config_filter) + + async def _get_missing_dynamic_configs(self, config_resources): + resource_lookups = [ConfigResource(resource.resource_type, resource.name) for resource in config_resources] + dynamic_configs = await self._async_describe_configs(resource_lookups, config_filter='modified', flat=True) + missing_resource_configs = [] + for resource, describe in zip(config_resources, dynamic_configs): + missing = {} + for config_key in describe: + if config_key not in resource.configs: + config_value = describe[config_key]['value'] + if config_value is None: + continue + missing[config_key] = config_value + missing_resource_configs.append(missing) + return missing_resource_configs + + async def _add_missing_dynamic_configs(self, config_resources): + # Add missing dynamic config values to resource list to avoid accidental resets + missing_resource_configs = await self._get_missing_dynamic_configs(config_resources) + for resource, missing in zip(config_resources, missing_resource_configs): + if not isinstance(missing, dict): + raise TypeError(f'missing configs: expected dict, found {type(missing)}') + resource.configs.update(missing) + + async def _validate_dynamic_configs(self, config_resources): + resource_lookups = [ConfigResource(resource.resource_type, resource.name) for resource in config_resources] + dynamic_configs = await self._async_describe_configs(resource_lookups, config_filter='dynamic', flat=True) + for resource, describe in zip(config_resources, dynamic_configs): + unknown = set(resource.configs or []) - set(describe) + if unknown: + raise ValueError(f'Unrecognized configs: {unknown}') + + async def _async_alter_configs(self, config_resources, validate_only=False, raise_on_unknown=True): + # Broker Version < (2, 3): use alter configs + # Broker Version >= (2, 3): use incremental alter configs + if raise_on_unknown: + await self._validate_dynamic_configs(config_resources) + await self._add_missing_dynamic_configs(config_resources) + return await self._send_alter_configs_requests(config_resources, validate_only=validate_only) + + async def _send_alter_configs_requests(self, config_resources, validate_only=False): broker_resources, other_resources = self._group_config_resources(config_resources, key_only=False) responses = [] for broker_id, resources in broker_resources.items(): request = AlterConfigsRequest( resources=resources, validate_only=validate_only) - responses.append(await self._manager.send(request, node_id=broker_id)) + response = await self._manager.send(request, node_id=broker_id) + responses.extend(response.responses) if other_resources: request = AlterConfigsRequest( resources=other_resources, validate_only=validate_only) - responses.append(await self._manager.send(request)) - return responses + response = await self._manager.send(request) + responses.extend(response.responses) + ret = defaultdict(dict) + for response in responses: + if response.error_code == 0: + result = 'OK' + else: + result = str(Errors.for_code(response.error_code)(response.error_message)) + result_type = ConfigResourceType(response.resource_type).name.lower() + ret[result_type][response.resource_name] = result + return dict(ret) - def alter_configs(self, config_resources, validate_only=False): + def alter_configs(self, config_resources, validate_only=False, raise_on_unknown=True): """Alter configuration parameters of one or more Kafka resources. Arguments: config_resources: A list of ConfigResource objects. + validate_only (bool, optional): If True, changes are sent to broker for + validation only. Changes will not be applied. Default: False + raise_on_unknown (bool, optional): If True, raises ValueError if any + config key is not recognized as a dynamic config for the resource. Returns: - Appropriate version of AlterConfigsResponse class. + dict of {resource_type (str): {resource_name (str): Error/Result}} """ - return self._manager.run(self._async_alter_configs, config_resources, validate_only) + return self._manager.run(self._async_alter_configs, config_resources, validate_only, raise_on_unknown) + + async def _async_reset_configs(self, config_resources, validate_only=False, raise_on_unknown=True): + if raise_on_unknown: + await self._validate_dynamic_configs(config_resources) + # if no keys provided, submit as-is -- full reset + # if keys are provided, replace with missing -- partial reset + partial_resets = [resource for resource in config_resources if resource.configs] + missing_resource_configs = await self._get_missing_dynamic_configs(partial_resets) + for resource, missing in zip(partial_resets, missing_resource_configs): + resource.configs = missing + return await self._send_alter_configs_requests(config_resources, validate_only=validate_only) + + def reset_configs(self, config_resources, validate_only=False, raise_on_unknown=True): + """Reset configuration parameters of one or more Kafka resources. + + Arguments: + config_resources: A list of ConfigResource objects. + + Returns: + dict of {resource_type (str): {resource_name (str): Error/Result}} + """ + return self._manager.run(self._async_reset_configs, config_resources, validate_only, raise_on_unknown) + + +class ConfigFilterType(IntEnum): + ALL = 0 + DYNAMIC = 1 + MODIFIED = 2 + DEFAULT = 3 + STATIC = 4 + + def should_skip(self, config_source): + if self is ConfigFilterType.MODIFIED: + return not config_source.is_modified() + elif self is ConfigFilterType.DEFAULT: + return config_source.is_modified() + elif self is ConfigFilterType.STATIC: + return config_source is not ConfigSourceType.STATIC_BROKER_CONFIG + return False class ConfigResourceType(IntEnum): @@ -177,11 +295,31 @@ class ConfigType(IntEnum): class ConfigSourceType(IntEnum): UNKNOWN = 0 - TOPIC_CONFIG = 1 + DYNAMIC_TOPIC_CONFIG = 1 DYNAMIC_BROKER_CONFIG = 2 DYNAMIC_DEFAULT_BROKER_CONFIG = 3 STATIC_BROKER_CONFIG = 4 DEFAULT_CONFIG = 5 DYNAMIC_BROKER_LOGGER_CONFIG = 6 - CLIENT_METRICS_CONFIG = 7 - GROUP_CONFIG = 8 + DYNAMIC_CLIENT_METRICS_CONFIG = 7 + DYNAMIC_GROUP_CONFIG = 8 + + def is_modified(self): + return self.value not in (3, 4, 5) + + @classmethod + def dynamic_for_resource_type(cls, resource_type): + if resource_type is ConfigResourceType.UNKNOWN: + return ConfigSourceType.UNKNOWN + elif resource_type is ConfigResourceType.TOPIC: + return ConfigSourceType.DYNAMIC_TOPIC_CONFIG + elif resource_type is ConfigResourceType.BROKER: + return ConfigSourceType.DYNAMIC_BROKER_CONFIG + elif resource_type is ConfigResourceType.BROKER_LOGGER: + return ConfigSourceType.DYNAMIC_BROKER_LOGGER_CONFIG + elif resource_type is ConfigResourceType.CLIENT_METRICS: + return ConfigSourceType.DYNAMIC_CLIENT_METRICS_CONFIG + elif resource_type is ConfigResourceType.GROUP: + return ConfigSourceType.DYNAMIC_GROUP_CONFIG + else: + raise RuntimeError(f'Unrecognized resource type {resource_type}') diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 5748e33a0..3ee8d42fc 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -113,6 +113,7 @@ def run_cli(args=None): # --trace ? # [configs] + # ListConfigResources # IncrementalAlterConfigs (not supported yet) # [client-quotas] diff --git a/kafka/cli/admin/configs/__init__.py b/kafka/cli/admin/configs/__init__.py index 4eab39f1c..3c2e182f1 100644 --- a/kafka/cli/admin/configs/__init__.py +++ b/kafka/cli/admin/configs/__init__.py @@ -2,6 +2,7 @@ from .alter import AlterConfigs from .describe import DescribeConfigs +from .reset import ResetConfigs class ConfigsSubCommand: @@ -10,6 +11,6 @@ class ConfigsSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('configs', help='Manage Kafka Configuration') commands = parser.add_subparsers() - for cmd in [DescribeConfigs, AlterConfigs]: + for cmd in [DescribeConfigs, AlterConfigs, ResetConfigs]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/configs/alter.py b/kafka/cli/admin/configs/alter.py index 7fe15601a..db8d9df40 100644 --- a/kafka/cli/admin/configs/alter.py +++ b/kafka/cli/admin/configs/alter.py @@ -10,7 +10,9 @@ def add_subparser(cls, subparsers): parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[]) parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[]) parser.add_argument('-g', '--group', type=str, action='append', dest='groups', default=[]) - parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=None, help='key=value to alter') + parser.add_argument('-c', '--config', type=str, action='append', dest='configs', required=True, help='key=value to alter') + parser.add_argument('-v', '--validate-only', action='store_true', default=False) + parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True) parser.set_defaults(command=cls.command) @classmethod @@ -18,11 +20,21 @@ def command(cls, client, args): configs = dict(config.split('=') for config in args.configs) resources = [] for topic in args.topics: + if resources: + raise ValueError('Only one resource type per request') resources.append(ConfigResource('TOPIC', topic, configs)) for broker in args.brokers: + if resources: + raise ValueError('Only one resource type per request') resources.append(ConfigResource('BROKER', broker, configs)) for broker in args.broker_loggers: + if resources: + raise ValueError('Only one resource type per request') resources.append(ConfigResource('BROKER_LOGGER', broker, configs)) for group in args.groups: + if resources: + raise ValueError('Only one resource type per request') resources.append(ConfigResource('GROUP', group, configs)) - return client.alter_configs(resources) + return client.alter_configs(resources, + validate_only=args.validate_only, + raise_on_unknown=args.raise_on_unknown) diff --git a/kafka/cli/admin/configs/describe.py b/kafka/cli/admin/configs/describe.py index af4fc172a..bbd970732 100644 --- a/kafka/cli/admin/configs/describe.py +++ b/kafka/cli/admin/configs/describe.py @@ -11,6 +11,10 @@ def add_subparser(cls, subparsers): parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[]) parser.add_argument('-g', '--group', type=str, action='append', dest='groups', default=[]) parser.add_argument('-k', '--key', type=str, action='append', dest='keys', default=None) + parser.add_argument('--dynamic', action='store_true', default=False) + parser.add_argument('--modified', action='store_true', default=False) + parser.add_argument('--static', action='store_true', default=False) + parser.add_argument('--default', action='store_true', default=False) parser.set_defaults(command=cls.command) @classmethod @@ -25,4 +29,14 @@ def command(cls, client, args): for group in args.groups: resources.append(ConfigResource('GROUP', group, args.keys)) - return client.describe_configs(resources) + if args.modified: + config_filter = 'modified' + elif args.dynamic: + config_filter = 'dynamic' + elif args.static: + config_filter = 'static' + elif args.default: + config_filter = 'default' + else: + config_filter = 'all' + return client.describe_configs(resources, config_filter=config_filter) diff --git a/kafka/cli/admin/configs/reset.py b/kafka/cli/admin/configs/reset.py new file mode 100644 index 000000000..0885cf49a --- /dev/null +++ b/kafka/cli/admin/configs/reset.py @@ -0,0 +1,31 @@ +from kafka.admin import ConfigResource + + +class ResetConfigs: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('reset', help='Reset Kafka Configs') + parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[]) + parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[]) + parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[]) + parser.add_argument('-g', '--group', type=str, action='append', dest='groups', default=[]) + parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=[], help='key=value to reset') + parser.add_argument('-v', '--validate-only', action='store_true', default=False) + parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True) + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + resources = [] + for topic in args.topics: + resources.append(ConfigResource('TOPIC', topic, args.configs)) + for broker in args.brokers: + resources.append(ConfigResource('BROKER', broker, args.configs)) + for broker in args.broker_loggers: + resources.append(ConfigResource('BROKER_LOGGER', broker, args.configs)) + for group in args.groups: + resources.append(ConfigResource('GROUP', group, args.configs)) + return client.reset_configs(resources, + validate_only=args.validate_only, + raise_on_unknown=args.raise_on_unknown) diff --git a/test/admin/test_admin_configs.py b/test/admin/test_admin_configs.py index cc661e46b..e2d85fcc7 100644 --- a/test/admin/test_admin_configs.py +++ b/test/admin/test_admin_configs.py @@ -1,6 +1,20 @@ import pytest -from kafka.admin import ConfigResource, ConfigResourceType +from kafka.admin import ConfigResource, ConfigResourceType, KafkaAdminClient +from kafka.protocol.admin import ( + AlterConfigsRequest, AlterConfigsResponse, + DescribeConfigsRequest, DescribeConfigsResponse, +) + +from test.mock_broker import MockBroker + + +# ConfigResourceType values (wire) +_TOPIC = ConfigResourceType.TOPIC.value + +# ConfigSourceType values (wire) +_SRC_DYNAMIC_TOPIC = 1 +_SRC_DEFAULT = 5 def test_config_resource(): @@ -14,3 +28,265 @@ def test_config_resource(): assert good_resource.resource_type == ConfigResourceType.TOPIC assert good_resource.name == 'baz' assert good_resource.configs == {'frob': 'nob'} + + +# --------------------------------------------------------------------------- +# MockBroker helpers +# --------------------------------------------------------------------------- + + +@pytest.fixture +def mock_broker(): + return MockBroker() + +@pytest.fixture +def admin(mock_broker): + admin = KafkaAdminClient( + kafka_client=mock_broker.client_factory(), + bootstrap_servers='%s:%d' % (mock_broker.host, mock_broker.port), + api_version=mock_broker.broker_version, + request_timeout_ms=5000, + ) + try: + yield admin + finally: + admin.close() + + +def _describe_configs_response(resource_type, resource_name, configs): + """configs: iterable of (name, value, source, read_only).""" + Result = DescribeConfigsResponse.DescribeConfigsResult + Config = Result.DescribeConfigsResourceResult + return DescribeConfigsResponse( + throttle_time_ms=0, + results=[ + Result( + error_code=0, + error_message=None, + resource_type=resource_type, + resource_name=resource_name, + configs=[ + Config( + name=name, + value=value, + read_only=read_only, + config_source=source, + is_default=(source == _SRC_DEFAULT), + is_sensitive=False, + synonyms=[], + config_type=2, # STRING + documentation='', + ) for name, value, source, read_only in configs + ], + ), + ], + ) + + +def _alter_configs_response(resource_type, resource_name, error_code=0, error_message=None): + Response = AlterConfigsResponse.AlterConfigsResourceResponse + return AlterConfigsResponse( + throttle_time_ms=0, + responses=[ + Response( + error_code=error_code, + error_message=error_message, + resource_type=resource_type, + resource_name=resource_name, + ), + ], + ) + + +def _capture_alter(captured): + """Return a respond_fn that records the decoded AlterConfigsRequest.""" + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = AlterConfigsRequest.decode( + request_bytes, version=api_version, header=True) + captured['version'] = api_version + return _alter_configs_response(_TOPIC, 'topic-a') + return handler + + +def _sent_configs(captured): + """Return the (name, value) pairs sent in the captured AlterConfigsRequest.""" + assert len(captured['request'].resources) == 1 + resource = captured['request'].resources[0] + return {c.name: c.value for c in resource.configs} + + +# --------------------------------------------------------------------------- +# alter_configs +# --------------------------------------------------------------------------- + + +class TestAlterConfigsMockBroker: + def test_fills_in_other_modified_keys(self, mock_broker, admin): + """User asks to set foo; bar is already modified; both end up on the wire.""" + # validation describe (dynamic filter) + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), + ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), + ('baz', None, _SRC_DEFAULT, False), + ])) + # add_missing describe (modified filter) — same wire response, Python filters + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), + ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), + ('baz', None, _SRC_DEFAULT, False), + ])) + captured = {} + mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + + admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'new'})]) + + sent = _sent_configs(captured) + assert sent == {'foo': 'new', 'bar': 'barval'} + + def test_user_value_wins_over_describe(self, mock_broker, admin): + for _ in range(2): # validation + add_missing + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'brokerval', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + + admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'userval'})]) + + assert _sent_configs(captured) == {'foo': 'userval'} + + def test_none_value_from_describe_is_skipped(self, mock_broker, admin): + for _ in range(2): + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'userval-placeholder', _SRC_DYNAMIC_TOPIC, False), + ('bar', None, _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + + admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'userval'})]) + + sent = _sent_configs(captured) + assert sent == {'foo': 'userval'} + assert 'bar' not in sent + + def test_raise_on_unknown_true_raises(self, mock_broker, admin): + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), + ])) + + with pytest.raises(ValueError, match='Unrecognized configs'): + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', {'mystery': 'x'})]) + + def test_raise_on_unknown_false_submits_anyway(self, mock_broker, admin): + # only add_missing describe (validation is skipped) + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', {'mystery': 'x'})], + raise_on_unknown=False) + + sent = _sent_configs(captured) + assert sent['mystery'] == 'x' + assert sent['foo'] == 'val' + + def test_validate_only_propagates(self, mock_broker, admin): + for _ in range(2): + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', {'foo': 'new'})], + validate_only=True) + + assert captured['request'].validate_only is True + + +# --------------------------------------------------------------------------- +# reset_configs +# --------------------------------------------------------------------------- + + +class TestResetConfigsMockBroker: + def test_full_reset_sends_empty_configs(self, mock_broker, admin): + """Resource with no configs => submit empty configs list (full reset).""" + captured = {} + mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + + # configs=[] (empty iterable) opts out of validation + get_missing describe + admin.reset_configs( + [ConfigResource('TOPIC', 'topic-a', [])], + raise_on_unknown=False) + + assert captured['request'].resources[0].configs == [] + + def test_partial_reset_excludes_user_keys_keeps_others(self, mock_broker, admin): + """reset_configs({foo}) => submit all OTHER modified keys so only foo resets.""" + # validation describe (dynamic filter) + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'fooval', _SRC_DYNAMIC_TOPIC, False), + ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), + ('baz', 'bazval', _SRC_DYNAMIC_TOPIC, False), + ])) + # get_missing describe (modified filter) + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'fooval', _SRC_DYNAMIC_TOPIC, False), + ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), + ('baz', 'bazval', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + + admin.reset_configs([ConfigResource('TOPIC', 'topic-a', ['foo'])]) + + sent = _sent_configs(captured) + assert 'foo' not in sent + assert sent == {'bar': 'barval', 'baz': 'bazval'} + + def test_reset_all_modified_keys_sends_empty(self, mock_broker, admin): + """If user resets every currently-modified key, the wire body is empty.""" + for _ in range(2): + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'fooval', _SRC_DYNAMIC_TOPIC, False), + ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + + admin.reset_configs([ConfigResource('TOPIC', 'topic-a', ['foo', 'bar'])]) + + assert _sent_configs(captured) == {} + + def test_validate_only_propagates(self, mock_broker, admin): + for _ in range(2): + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'fooval', _SRC_DYNAMIC_TOPIC, False), + ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + + admin.reset_configs( + [ConfigResource('TOPIC', 'topic-a', ['foo'])], + validate_only=True) + + assert captured['request'].validate_only is True diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 742208092..d88b454cf 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -102,7 +102,7 @@ def test_describe_configs_broker_resource_returns_configs(kafka_admin_client): """Tests that describe config returns configs for broker """ broker_id = kafka_admin_client._client.least_loaded_node() - configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)], config_filter='all') assert len(configs) == 1 assert len(configs['broker']) == 1 @@ -114,7 +114,7 @@ def test_describe_configs_broker_resource_returns_configs(kafka_admin_client): assert configs['broker'][str(broker_id)]['advertised.listeners']['config_type'] in ('LIST', 'STRING') if env_kafka_version() >= (4, 0): assert configs['broker'][str(broker_id)]['advertised.listeners']['read_only'] is True - elif env_kafka_version() >= (1, 0): + elif env_kafka_version() >= (1, 1): assert configs['broker'][str(broker_id)]['advertised.listeners']['read_only'] is False assert configs['broker'][str(broker_id)]['advertised.listeners']['is_sensitive'] is False if env_kafka_version() >= (1, 1): @@ -126,7 +126,7 @@ def test_describe_configs_broker_resource_returns_configs(kafka_admin_client): def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_client): """Tests that describe config returns configs for topic """ - configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)]) + configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)], config_filter='all') assert len(configs) == 1 assert len(configs['topic']) == 1 @@ -146,9 +146,11 @@ def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_cli """Tests that describe config returns configs for mixed resource types (topic + broker) """ broker_id = kafka_admin_client._client.least_loaded_node() - configs = kafka_admin_client.describe_configs([ - ConfigResource(ConfigResourceType.TOPIC, topic), - ConfigResource(ConfigResourceType.BROKER, broker_id)]) + configs = kafka_admin_client.describe_configs( + [ConfigResource(ConfigResourceType.TOPIC, topic), + ConfigResource(ConfigResourceType.BROKER, broker_id)], + config_filter='all', + ) assert len(configs) == 2 assert topic in configs['topic'] @@ -164,7 +166,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): broker_id = "str" with pytest.raises(ValueError): - kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)], config_filter='all') @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')