From 13a6d77cfcffdb5faf245dea5c9c966817447fa4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 21 Apr 2026 15:02:50 -0700 Subject: [PATCH 1/6] IncrementalAlterConfigs --- kafka/protocol/admin/configs.py | 4 + kafka/protocol/admin/configs.pyi | 111 +++++++++++++++++- .../IncrementalAlterConfigsRequest.json | 44 +++++++ .../IncrementalAlterConfigsResponse.json | 38 ++++++ 4 files changed, 196 insertions(+), 1 deletion(-) create mode 100644 kafka/protocol/schemas/resources/IncrementalAlterConfigsRequest.json create mode 100644 kafka/protocol/schemas/resources/IncrementalAlterConfigsResponse.json diff --git a/kafka/protocol/admin/configs.py b/kafka/protocol/admin/configs.py index 7bbaead1d..5969eb202 100644 --- a/kafka/protocol/admin/configs.py +++ b/kafka/protocol/admin/configs.py @@ -7,6 +7,9 @@ class AlterConfigsResponse(ApiMessage): pass class DescribeConfigsRequest(ApiMessage): pass class DescribeConfigsResponse(ApiMessage): pass +class IncrementalAlterConfigsRequest(ApiMessage): pass +class IncrementalAlterConfigsResponse(ApiMessage): pass + class ListConfigResourcesRequest(ApiMessage): pass class ListConfigResourcesResponse(ApiMessage): pass @@ -14,5 +17,6 @@ class ListConfigResourcesResponse(ApiMessage): pass __all__ = [ 'AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeConfigsRequest', 'DescribeConfigsResponse', + 'IncrementalAlterConfigsRequest', 'IncrementalAlterConfigsResponse', 'ListConfigResourcesRequest', 'ListConfigResourcesResponse', ] diff --git a/kafka/protocol/admin/configs.pyi b/kafka/protocol/admin/configs.pyi index 3da555eef..f4b27edc6 100644 --- a/kafka/protocol/admin/configs.pyi +++ b/kafka/protocol/admin/configs.pyi @@ -5,7 +5,7 @@ from typing import Any, Self from kafka.protocol.api_message import ApiMessage from kafka.protocol.data_container import DataContainer -__all__ = ['AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeConfigsRequest', 'DescribeConfigsResponse', 'ListConfigResourcesRequest', 'ListConfigResourcesResponse'] +__all__ = ['AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeConfigsRequest', 'DescribeConfigsResponse', 'IncrementalAlterConfigsRequest', 'IncrementalAlterConfigsResponse', 'ListConfigResourcesRequest', 'ListConfigResourcesResponse'] class AlterConfigsRequest(ApiMessage): class AlterConfigsResource(DataContainer): @@ -256,6 +256,115 @@ class DescribeConfigsResponse(ApiMessage): def expect_response(self) -> bool: ... def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... +class IncrementalAlterConfigsRequest(ApiMessage): + class AlterConfigsResource(DataContainer): + class AlterableConfig(DataContainer): + name: str + config_operation: int + value: str | None + def __init__( + self, + *args: Any, + name: str = ..., + config_operation: int = ..., + value: str | None = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + resource_type: int + resource_name: str + configs: list[AlterableConfig] + def __init__( + self, + *args: Any, + resource_type: int = ..., + resource_name: str = ..., + configs: list[AlterableConfig] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + resources: list[AlterConfigsResource] + validate_only: bool + def __init__( + self, + *args: Any, + resources: list[AlterConfigsResource] = ..., + validate_only: bool = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + +class IncrementalAlterConfigsResponse(ApiMessage): + class AlterConfigsResourceResponse(DataContainer): + error_code: int + error_message: str | None + resource_type: int + resource_name: str + def __init__( + self, + *args: Any, + error_code: int = ..., + error_message: str | None = ..., + resource_type: int = ..., + resource_name: str = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + + throttle_time_ms: int + responses: list[AlterConfigsResourceResponse] + def __init__( + self, + *args: Any, + throttle_time_ms: int = ..., + responses: list[AlterConfigsResourceResponse] = ..., + version: int | None = None, + **kwargs: Any, + ) -> None: ... + @property + def version(self) -> int | None: ... + def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... + name: str + type: str + API_KEY: int + API_VERSION: int + valid_versions: tuple[int, int] + min_version: int + max_version: int + @property + def header(self) -> Any: ... + @classmethod + def is_request(cls) -> bool: ... + def expect_response(self) -> bool: ... + def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... + class ListConfigResourcesRequest(ApiMessage): resource_types: list[int] def __init__( diff --git a/kafka/protocol/schemas/resources/IncrementalAlterConfigsRequest.json b/kafka/protocol/schemas/resources/IncrementalAlterConfigsRequest.json new file mode 100644 index 000000000..d908c2801 --- /dev/null +++ b/kafka/protocol/schemas/resources/IncrementalAlterConfigsRequest.json @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 44, + "type": "request", + "listeners": ["broker", "controller"], + "name": "IncrementalAlterConfigsRequest", + // Version 1 is the first flexible version. + "validVersions": "0-1", + "flexibleVersions": "1+", + "fields": [ + { "name": "Resources", "type": "[]AlterConfigsResource", "versions": "0+", + "about": "The incremental updates for each resource.", "fields": [ + { "name": "ResourceType", "type": "int8", "versions": "0+", "mapKey": true, + "about": "The resource type." }, + { "name": "ResourceName", "type": "string", "versions": "0+", "mapKey": true, + "about": "The resource name." }, + { "name": "Configs", "type": "[]AlterableConfig", "versions": "0+", + "about": "The configurations.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, + "about": "The configuration key name." }, + { "name": "ConfigOperation", "type": "int8", "versions": "0+", "mapKey": true, + "about": "The type (Set, Delete, Append, Subtract) of operation." }, + { "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The value to set for the configuration key."} + ]} + ]}, + { "name": "ValidateOnly", "type": "bool", "versions": "0+", + "about": "True if we should validate the request, but not change the configurations."} + ] +} diff --git a/kafka/protocol/schemas/resources/IncrementalAlterConfigsResponse.json b/kafka/protocol/schemas/resources/IncrementalAlterConfigsResponse.json new file mode 100644 index 000000000..d4dad294f --- /dev/null +++ b/kafka/protocol/schemas/resources/IncrementalAlterConfigsResponse.json @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 44, + "type": "response", + "name": "IncrementalAlterConfigsResponse", + // Version 1 is the first flexible version. + "validVersions": "0-1", + "flexibleVersions": "1+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Responses", "type": "[]AlterConfigsResourceResponse", "versions": "0+", + "about": "The responses for each resource.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The resource error code." }, + { "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+", + "about": "The resource error message, or null if there was no error." }, + { "name": "ResourceType", "type": "int8", "versions": "0+", + "about": "The resource type." }, + { "name": "ResourceName", "type": "string", "versions": "0+", + "about": "The resource name." } + ]} + ] +} From db101f91c640ecd20425f8fd021579e9983e3ec5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 09:19:09 -0700 Subject: [PATCH 2/6] Admin: support incremental alter configs --- kafka/admin/__init__.py | 9 +- kafka/admin/_configs.py | 245 +++++++++++++++++++-------- test/admin/test_admin_configs.py | 282 +++++++++++++++++++++++++++++-- 3 files changed, 453 insertions(+), 83 deletions(-) diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index 2e55f0881..425e2f3c9 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -1,8 +1,9 @@ from kafka.admin.client import KafkaAdminClient -from kafka.admin._acls import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, - ResourceType, ACLPermissionType, ACLResourcePatternType) +from kafka.admin._acls import ( + ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, + ResourceType, ACLPermissionType, ACLResourcePatternType) from kafka.admin._configs import ( - ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) + AlterConfigOp, ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) from kafka.admin._groups import MemberToRemove from kafka.admin._partitions import NewPartitions, OffsetSpec from kafka.admin._topics import NewTopic @@ -13,7 +14,7 @@ 'KafkaAdminClient', 'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType', 'ResourceType', 'ResourcePattern', 'ResourcePatternFilter', - 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType', + 'AlterConfigOp', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType', 'MemberToRemove', 'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__ 'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion', ] diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index d4e22a40b..8491b6daf 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -14,6 +14,7 @@ from kafka.protocol.admin import ( AlterConfigsRequest, DescribeConfigsRequest, + IncrementalAlterConfigsRequest, ListConfigResourcesRequest, ) @@ -28,18 +29,57 @@ class ConfigAdminMixin: _manager: KafkaConnectionManager config: dict + def _check_incremental_alter_configs_support(self): + # Broker Version >= (2, 3) has incremental alter configs + try: + self._manager.broker_version_data.api_version(IncrementalAlterConfigsRequest) + return True + except Errors.IncompatibleBrokerVersion: + return False + @staticmethod - def _convert_config_resource(config_resource, key_only=True): - if key_only: - values = list(config_resource.configs.keys()) if isinstance(config_resource.configs, dict) else config_resource.configs - elif not config_resource.configs: - values = [] - else: - assert isinstance(config_resource.configs, dict) - values = list(config_resource.configs.items()) - return (config_resource.resource_type, config_resource.name, values) + def _incremental_configs_entries(configs): + if not configs: + return [] + if not isinstance(configs, dict): + raise TypeError('alter_configs requires configs as a dict of ' + '{key: (op, value)} or {key: value} (interpreted as SET)') + entries = [] + for name, op_value in configs.items(): + if isinstance(op_value, tuple): + op, value = op_value + else: + op, value = AlterConfigOp.SET, op_value + op_code = AlterConfigOp.value_for(op) + if op_code == AlterConfigOp.DELETE.value: + value = None + entries.append((name, op_code, value)) + return entries + + @staticmethod + def _describe_configs_entries(configs): + return list(configs.keys()) if isinstance(configs, dict) else configs + + @staticmethod + def _alter_configs_entries(configs): + if not configs: + return [] + elif not isinstance(configs, dict): + raise TypeError(f'configs should be a dict of {{key: value}}, found {type(configs)}') + entries = [] + for name, op_value in configs.items(): + if isinstance(op_value, tuple): + op, value = op_value + else: + op, value = AlterConfigOp.SET, op_value + op_code = AlterConfigOp.value_for(op) + if op_code != AlterConfigOp.SET.value: + raise ValueError(f'Non-incremental AlterConfigsRequest does not support operation {op} (SET only)') + entries.append((name, value)) + return entries - def _group_config_resources(self, config_resources, key_only=True): + @staticmethod + def _group_config_resources(config_resources): broker_resources = defaultdict(list) other_resources = [] for config_resource in config_resources: @@ -48,33 +88,27 @@ def _group_config_resources(self, config_resources, key_only=True): broker_id = int(config_resource.name) except ValueError: raise ValueError("Broker resource names must be an integer or a string represented integer") - broker_resources[broker_id].append(self._convert_config_resource(config_resource, key_only=key_only)) + broker_resources[broker_id].append(config_resource) else: - other_resources.append(self._convert_config_resource(config_resource, key_only=key_only)) + other_resources.append(config_resource) return broker_resources, other_resources - async def _async_describe_configs(self, config_resources, include_synonyms=False, config_filter='modified', flat=False): + @classmethod + def _describe_configs_request(cls, config_resources, include_synonyms=False): + min_version = 1 if include_synonyms else 0 + return DescribeConfigsRequest( + resources=[(cr.resource_type, cr.name, cls._describe_configs_entries(cr.configs)) + for cr in config_resources], + include_synonyms=include_synonyms, + min_version=min_version) + + @staticmethod + def _describe_configs_process_responses(responses, config_filter='modified'): if isinstance(config_filter, str): try: config_filter = ConfigFilterType[config_filter.upper()] except KeyError: raise ValueError(f'{config_filter} is not a valid ConfigFilterType') - min_version = 1 if include_synonyms else 0 - broker_resources, other_resources = self._group_config_resources(config_resources, key_only=True) - responses = [] - for broker_id, resources in broker_resources.items(): - request = DescribeConfigsRequest( - resources=resources, - include_synonyms=include_synonyms, - min_version=min_version) - responses.append(await self._manager.send(request, node_id=broker_id)) - if other_resources: - request = DescribeConfigsRequest( - resources=other_resources, - include_synonyms=include_synonyms, - min_version=min_version) - responses.append(await self._manager.send(request)) - ret = defaultdict(dict) for response in responses: for result in response.results: @@ -103,11 +137,23 @@ async def _async_describe_configs(self, config_resources, include_synonyms=False config['config_type'] = ConfigType(config['config_type']).name resource_configs[name] = config ret[resource_type.name.lower()][result.resource_name] = resource_configs + return dict(ret) + + async def _async_describe_configs(self, config_resources, include_synonyms=False, config_filter='modified', flat=False): + broker_resources, other_resources = self._group_config_resources(config_resources) + responses = [] + for broker_id, resources in broker_resources.items(): + request = self._describe_configs_request(resources, include_synonyms) + responses.append(await self._manager.send(request, node_id=broker_id)) + if other_resources: + request = self._describe_configs_request(other_resources, include_synonyms) + responses.append(await self._manager.send(request)) + ret = self._describe_configs_process_responses(responses, config_filter) if flat: return [ret[resource.resource_type.name.lower()][resource.name] for resource in config_resources] else: - return dict(ret) + return ret def describe_configs(self, config_resources, include_synonyms=False, config_filter='modified'): """Fetch configuration parameters for one or more Kafka resources. @@ -205,29 +251,21 @@ async def _validate_dynamic_configs(self, config_resources): if unknown: raise ValueError(f'Unrecognized configs: {unknown}') - async def _async_alter_configs(self, config_resources, validate_only=False, raise_on_unknown=True): - # Broker Version < (2, 3): use alter configs - # Broker Version >= (2, 3): use incremental alter configs - if raise_on_unknown: - await self._validate_dynamic_configs(config_resources) - await self._add_missing_dynamic_configs(config_resources) - return await self._send_alter_configs_requests(config_resources, validate_only=validate_only) - - async def _send_alter_configs_requests(self, config_resources, validate_only=False): - broker_resources, other_resources = self._group_config_resources(config_resources, key_only=False) - responses = [] - for broker_id, resources in broker_resources.items(): - request = AlterConfigsRequest( - resources=resources, + @classmethod + def _alter_configs_request(cls, config_resources, validate_only=False, incremental=False): + if incremental: + return IncrementalAlterConfigsRequest( + resources=[(cr.resource_type, cr.name, cls._incremental_configs_entries(cr.configs)) + for cr in config_resources], validate_only=validate_only) - response = await self._manager.send(request, node_id=broker_id) - responses.extend(response.responses) - if other_resources: - request = AlterConfigsRequest( - resources=other_resources, + else: + return AlterConfigsRequest( + resources=[(cr.resource_type, cr.name, cls._alter_configs_entries(cr.configs)) + for cr in config_resources], validate_only=validate_only) - response = await self._manager.send(request) - responses.extend(response.responses) + + @staticmethod + def _alter_configs_process_responses(responses): ret = defaultdict(dict) for response in responses: if response.error_code == 0: @@ -238,42 +276,115 @@ async def _send_alter_configs_requests(self, config_resources, validate_only=Fal ret[result_type][response.resource_name] = result return dict(ret) - def alter_configs(self, config_resources, validate_only=False, raise_on_unknown=True): + async def _send_alter_configs_requests(self, config_resources, validate_only=False, incremental=False): + broker_resources, other_resources = self._group_config_resources(config_resources) + responses = [] + for broker_id, resources in broker_resources.items(): + request = self._alter_configs_request(resources, validate_only, incremental) + response = await self._manager.send(request, node_id=broker_id) + responses.extend(response.responses) + if other_resources: + request = self._alter_configs_request(other_resources, validate_only, incremental) + response = await self._manager.send(request) + responses.extend(response.responses) + return self._alter_configs_process_responses(responses) + + async def _async_alter_configs(self, config_resources, validate_only=False, raise_on_unknown=True, incremental=None): + if raise_on_unknown: + await self._validate_dynamic_configs(config_resources) + if incremental is None: + incremental = self._check_incremental_alter_configs_support() + if not incremental: + await self._add_missing_dynamic_configs(config_resources) + return await self._send_alter_configs_requests(config_resources, validate_only, incremental) + + def alter_configs(self, config_resources, validate_only=False, raise_on_unknown=True, incremental=None): """Alter configuration parameters of one or more Kafka resources. Arguments: - config_resources: A list of ConfigResource objects. + config_resources: A list of ConfigResource objects. Each resource's + ``configs`` must be a dict mapping config key to either + ``(op, value)`` (where ``op`` is an :class:`AlterConfigOp`, + its name, or its int value) or a bare value (interpreted as SET). + For DELETE operations the value is ignored and sent as null. + Note: if broker does not support IncrementalAlterConfigsRequest, + AlterConfigOp APPEND/SUBTRACT are only supported on 2.3+ brokers, + which support the IncrementalAlterConfigsRequest. For older brokers + the client will use AlterConfigsRequest, which requires submitting + all dynamic configs together (the client will fill in missing keys + as required, though be wary of the inherent race with this approach). validate_only (bool, optional): If True, changes are sent to broker for validation only. Changes will not be applied. Default: False raise_on_unknown (bool, optional): If True, raises ValueError if any config key is not recognized as a dynamic config for the resource. + incremental (bool, optional): Set to True/False to force use of + IncrementalAlterConfigs (True) or AlterConfigs (False). + By Default, the admin client will use IncrementalAlterConfigs + if supported by the broker, otherwise AlterConfigs. Returns: dict of {resource_type (str): {resource_name (str): Error/Result}} """ - return self._manager.run(self._async_alter_configs, config_resources, validate_only, raise_on_unknown) + return self._manager.run(self._async_alter_configs, config_resources, validate_only, raise_on_unknown, incremental) - async def _async_reset_configs(self, config_resources, validate_only=False, raise_on_unknown=True): + async def _async_reset_configs(self, config_resources, validate_only=False, raise_on_unknown=True, incremental=None): if raise_on_unknown: await self._validate_dynamic_configs(config_resources) - # if no keys provided, submit as-is -- full reset - # if keys are provided, replace with missing -- partial reset - partial_resets = [resource for resource in config_resources if resource.configs] - missing_resource_configs = await self._get_missing_dynamic_configs(partial_resets) - for resource, missing in zip(partial_resets, missing_resource_configs): - resource.configs = missing - return await self._send_alter_configs_requests(config_resources, validate_only=validate_only) - - def reset_configs(self, config_resources, validate_only=False, raise_on_unknown=True): - """Reset configuration parameters of one or more Kafka resources. + if incremental is None: + incremental = self._check_incremental_alter_configs_support() + + if not incremental: + # if no keys provided, submit as-is -- full reset + # if keys are provided, replace with missing -- partial reset + partial_resets = [resource for resource in config_resources if resource.configs] + missing_resource_configs = await self._get_missing_dynamic_configs(partial_resets) + for resource, missing in zip(partial_resets, missing_resource_configs): + resource.configs = missing + else: + config_resources = [ + ConfigResource(cr.resource_type, cr.name, + {key: (AlterConfigOp.DELETE, None) + for key in cr.configs}) + for cr in config_resources + ] + return await self._send_alter_configs_requests(config_resources, validate_only, incremental) + + def reset_configs(self, config_resources, validate_only=False, raise_on_unknown=True, incremental=None): + """Reset configuration parameters of one or more Kafka resources to defaults. + + On 2.3+ brokers, the client will submit an IncrementalAlterConfigsRequest + with op DELETE for each resource/key. On older brokers, the client will + use submit an AlterConfigsRequest and attempt to include all modified + dynamic config values for each resource except the keys marked for reset. + (AlterConfigsRequest will reset any missing config key to its default). Arguments: - config_resources: A list of ConfigResource objects. + config_resources: A list of ConfigResource objects. Each resource's + ``configs`` should be a list or dict of config keys to reset. + (if dict, the values are ignored). Returns: dict of {resource_type (str): {resource_name (str): Error/Result}} """ - return self._manager.run(self._async_reset_configs, config_resources, validate_only, raise_on_unknown) + return self._manager.run(self._async_reset_configs, config_resources, validate_only, raise_on_unknown, incremental) + + +class AlterConfigOp(IntEnum): + SET = 0 + DELETE = 1 + APPEND = 2 + SUBTRACT = 3 + + @staticmethod + def value_for(op): + if isinstance(op, AlterConfigOp): + return op.value + if isinstance(op, int): + return AlterConfigOp(op).value + try: + return AlterConfigOp[str(op).upper()].value + except KeyError: + raise ValueError(f'Unrecognized AlterConfigOp: {op}') class ConfigFilterType(IntEnum): diff --git a/test/admin/test_admin_configs.py b/test/admin/test_admin_configs.py index 9e2850a18..a52270dae 100644 --- a/test/admin/test_admin_configs.py +++ b/test/admin/test_admin_configs.py @@ -1,10 +1,12 @@ import pytest -from kafka.admin import ConfigResource, ConfigResourceType, KafkaAdminClient -from kafka.errors import ClusterAuthorizationFailedError +from kafka.admin import ( + AlterConfigOp, ConfigResource, ConfigResourceType, KafkaAdminClient) +from kafka.errors import ClusterAuthorizationFailedError, InvalidConfigurationError from kafka.protocol.admin import ( AlterConfigsRequest, AlterConfigsResponse, DescribeConfigsRequest, DescribeConfigsResponse, + IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, ListConfigResourcesRequest, ListConfigResourcesResponse, ) @@ -142,7 +144,7 @@ def test_fills_in_other_modified_keys(self, mock_broker, admin): captured = {} mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) - admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'new'})]) + admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'new'})], incremental=False) sent = _sent_configs(captured) assert sent == {'foo': 'new', 'bar': 'barval'} @@ -156,7 +158,7 @@ def test_user_value_wins_over_describe(self, mock_broker, admin): captured = {} mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) - admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'userval'})]) + admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'userval'})], incremental=False) assert _sent_configs(captured) == {'foo': 'userval'} @@ -170,7 +172,7 @@ def test_none_value_from_describe_is_skipped(self, mock_broker, admin): captured = {} mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) - admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'userval'})]) + admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'userval'})], incremental=False) sent = _sent_configs(captured) assert sent == {'foo': 'userval'} @@ -184,7 +186,8 @@ def test_raise_on_unknown_true_raises(self, mock_broker, admin): with pytest.raises(ValueError, match='Unrecognized configs'): admin.alter_configs( - [ConfigResource('TOPIC', 'topic-a', {'mystery': 'x'})]) + [ConfigResource('TOPIC', 'topic-a', {'mystery': 'x'})], + incremental=False) def test_raise_on_unknown_false_submits_anyway(self, mock_broker, admin): # only add_missing describe (validation is skipped) @@ -197,7 +200,8 @@ def test_raise_on_unknown_false_submits_anyway(self, mock_broker, admin): admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', {'mystery': 'x'})], - raise_on_unknown=False) + raise_on_unknown=False, + incremental=False) sent = _sent_configs(captured) assert sent['mystery'] == 'x' @@ -214,7 +218,8 @@ def test_validate_only_propagates(self, mock_broker, admin): admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', {'foo': 'new'})], - validate_only=True) + validate_only=True, + incremental=False) assert captured['request'].validate_only is True @@ -233,7 +238,8 @@ def test_full_reset_sends_empty_configs(self, mock_broker, admin): # configs=[] (empty iterable) opts out of validation + get_missing describe admin.reset_configs( [ConfigResource('TOPIC', 'topic-a', [])], - raise_on_unknown=False) + raise_on_unknown=False, + incremental=False) assert captured['request'].resources[0].configs == [] @@ -256,7 +262,7 @@ def test_partial_reset_excludes_user_keys_keeps_others(self, mock_broker, admin) captured = {} mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) - admin.reset_configs([ConfigResource('TOPIC', 'topic-a', ['foo'])]) + admin.reset_configs([ConfigResource('TOPIC', 'topic-a', ['foo'])], incremental=False) sent = _sent_configs(captured) assert 'foo' not in sent @@ -273,7 +279,7 @@ def test_reset_all_modified_keys_sends_empty(self, mock_broker, admin): captured = {} mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) - admin.reset_configs([ConfigResource('TOPIC', 'topic-a', ['foo', 'bar'])]) + admin.reset_configs([ConfigResource('TOPIC', 'topic-a', ['foo', 'bar'])], incremental=False) assert _sent_configs(captured) == {} @@ -289,7 +295,8 @@ def test_validate_only_propagates(self, mock_broker, admin): admin.reset_configs( [ConfigResource('TOPIC', 'topic-a', ['foo'])], - validate_only=True) + validate_only=True, + incremental=False) assert captured['request'].validate_only is True @@ -393,3 +400,254 @@ def test_empty_response_returns_empty_dict(self, mock_broker, admin): ListConfigResourcesRequest, _list_config_resources_response([])) assert admin.list_config_resources() == {} + + +# --------------------------------------------------------------------------- +# incremental_alter_configs +# --------------------------------------------------------------------------- + + +def _incremental_alter_configs_response(responses): + """responses: iterable of (resource_type, resource_name, error_code, error_message).""" + Response = IncrementalAlterConfigsResponse.AlterConfigsResourceResponse + return IncrementalAlterConfigsResponse( + throttle_time_ms=0, + responses=[ + Response( + error_code=error_code, + error_message=error_message, + resource_type=rtype, + resource_name=rname, + ) for rtype, rname, error_code, error_message in responses + ], + ) + + +def _capture_incremental_alter(captured, response=None): + def handler(api_key, api_version, correlation_id, request_bytes): + captured['request'] = IncrementalAlterConfigsRequest.decode( + request_bytes, version=api_version, header=True) + captured['version'] = api_version + return response if response is not None else _incremental_alter_configs_response([ + (_TOPIC, 'topic-a', 0, None), + ]) + return handler + + +def _sent_incremental(captured): + """Return a dict {name: (op, value)} from the captured IncrementalAlterConfigsRequest.""" + assert len(captured['request'].resources) == 1 + resource = captured['request'].resources[0] + return {c.name: (c.config_operation, c.value) for c in resource.configs} + + +class TestIncrementalAlterConfigsMockBroker: + + def test_set_op_sends_triples(self, mock_broker, admin): + # validation describe (dynamic filter) + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + + result = admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', + {'foo': (AlterConfigOp.SET, 'new')})], + incremental=True) + + sent = _sent_incremental(captured) + assert sent == {'foo': (AlterConfigOp.SET.value, 'new')} + assert result == {'topic': {'topic-a': 'OK'}} + + def test_bare_value_is_treated_as_set(self, mock_broker, admin): + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', {'foo': 'new'})], + incremental=True) + + assert _sent_incremental(captured) == {'foo': (AlterConfigOp.SET.value, 'new')} + + def test_delete_op_forces_null_value(self, mock_broker, admin): + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', + {'foo': (AlterConfigOp.DELETE, 'ignored')})], + incremental=True) + + assert _sent_incremental(captured) == {'foo': (AlterConfigOp.DELETE.value, None)} + + def test_append_and_subtract_ops(self, mock_broker, admin): + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('follower.replication.throttled.replicas', '*', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', { + 'follower.replication.throttled.replicas': + (AlterConfigOp.APPEND, '1:0'), + })], + incremental=True) + + assert _sent_incremental(captured) == { + 'follower.replication.throttled.replicas': + (AlterConfigOp.APPEND.value, '1:0'), + } + + captured = {} + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('follower.replication.throttled.replicas', '*', _SRC_DYNAMIC_TOPIC, False), + ])) + mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', { + 'follower.replication.throttled.replicas': + ('subtract', '1:0'), + })], + incremental=True) + + assert _sent_incremental(captured) == { + 'follower.replication.throttled.replicas': + (AlterConfigOp.SUBTRACT.value, '1:0'), + } + + def test_string_op_is_normalized(self, mock_broker, admin): + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', {'foo': ('set', 'v')})], + incremental=True) + + assert _sent_incremental(captured) == {'foo': (AlterConfigOp.SET.value, 'v')} + + def test_unrecognized_op_raises(self, admin): + with pytest.raises(ValueError, match='Unrecognized AlterConfigOp'): + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', {'foo': ('bogus', 'v')})], + raise_on_unknown=False, + incremental=True) + + def test_raise_on_unknown_true_raises(self, mock_broker, admin): + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), + ])) + + with pytest.raises(ValueError, match='Unrecognized configs'): + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', + {'mystery': (AlterConfigOp.SET, 'x')})], + incremental=True) + + def test_raise_on_unknown_false_skips_describe(self, mock_broker, admin): + captured = {} + mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', + {'mystery': (AlterConfigOp.SET, 'x')})], + raise_on_unknown=False, + incremental=True) + + assert _sent_incremental(captured) == { + 'mystery': (AlterConfigOp.SET.value, 'x'), + } + + def test_validate_only_propagates(self, mock_broker, admin): + captured = {} + mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', + {'foo': (AlterConfigOp.SET, 'v')})], + validate_only=True, + raise_on_unknown=False, + incremental=True) + + assert captured['request'].validate_only is True + + def test_does_not_fill_in_other_keys(self, mock_broker, admin): + """Unlike alter_configs, incremental should NOT send untouched keys.""" + mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + _TOPIC, 'topic-a', [ + ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), + ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), + ])) + captured = {} + mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + + admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', + {'foo': (AlterConfigOp.SET, 'new')})], + incremental=True) + + assert _sent_incremental(captured) == { + 'foo': (AlterConfigOp.SET.value, 'new'), + } + + def test_broker_resource_routed_by_broker_id(self, mock_broker, admin): + captured = {} + mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter( + captured, + _incremental_alter_configs_response([ + (ConfigResourceType.BROKER.value, '0', 0, None), + ]), + )) + + result = admin.alter_configs( + [ConfigResource('BROKER', '0', + {'max.connections': (AlterConfigOp.SET, '100')})], + raise_on_unknown=False, + incremental=True) + + assert len(captured['request'].resources) == 1 + assert captured['request'].resources[0].resource_type == ConfigResourceType.BROKER.value + assert captured['request'].resources[0].resource_name == '0' + assert result == {'broker': {'0': 'OK'}} + + def test_non_integer_broker_name_raises(self, admin): + with pytest.raises(ValueError, match='Broker resource names must be an integer'): + admin.alter_configs( + [ConfigResource('BROKER', 'not-an-int', + {'foo': (AlterConfigOp.SET, 'v')})], + raise_on_unknown=False, + incremental=True) + + def test_error_response_surfaces_in_result(self, mock_broker, admin): + mock_broker.respond( + IncrementalAlterConfigsRequest, + _incremental_alter_configs_response([ + (_TOPIC, 'topic-a', InvalidConfigurationError.errno, 'bad value'), + ])) + + result = admin.alter_configs( + [ConfigResource('TOPIC', 'topic-a', + {'foo': (AlterConfigOp.SET, 'v')})], + raise_on_unknown=False, + incremental=True) + + assert 'topic' in result + assert 'bad value' in result['topic']['topic-a'] From 5829789f9032d8ea287e343696be7168fad779ce Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 09:20:13 -0700 Subject: [PATCH 3/6] admin cli: --force-alter/incremental, and support AlterConfigOps --- kafka/cli/admin/__init__.py | 3 --- kafka/cli/admin/configs/alter.py | 21 ++++++++++++++++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 5748e33a0..94325a04b 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -112,9 +112,6 @@ def run_cli(args=None): # --dry-run support # --trace ? - # [configs] - # IncrementalAlterConfigs (not supported yet) - # [client-quotas] # describe (DescribeClientQuotas - not supported yet) # alter (AlterClientQuotas - not supported yet) diff --git a/kafka/cli/admin/configs/alter.py b/kafka/cli/admin/configs/alter.py index efa2e0e29..33ea7b096 100644 --- a/kafka/cli/admin/configs/alter.py +++ b/kafka/cli/admin/configs/alter.py @@ -1,3 +1,6 @@ +import re + +from kafka.admin import AlterConfigOp from .common import add_resource_arguments, parse_resources @@ -10,15 +13,31 @@ def add_subparser(cls, subparsers): parser.add_argument('-c', '--config', type=str, action='append', dest='configs', required=True, help='key=value to alter') parser.add_argument('-v', '--validate-only', action='store_true', default=False) parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True) + incremental = parser.add_mutually_exclusive_group() + incremental.add_argument('--force-incremental', action='store_true', dest='incremental', default=None) + incremental.add_argument('--force-alter', action='store_false', dest='incremental', default=None) parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): try: configs = dict(config.split('=') for config in args.configs) + regex = r'^(set|del|add|sub)\((.*)\)$' + ops = { + 'set': AlterConfigOp.SET, + 'del': AlterConfigOp.DELETE, + 'add': AlterConfigOp.APPEND, + 'sub': AlterConfigOp.SUBTRACT, + } + for key in configs: + match = re.match(regex, configs[key]) + if match: + op_str, val = match.groups() + configs[key] = (ops[op_str], val) except ValueError: raise ValueError(f'Unable to parse configs! {args.configs}') resources = parse_resources(args, configs=configs) return client.alter_configs(resources, validate_only=args.validate_only, - raise_on_unknown=args.raise_on_unknown) + raise_on_unknown=args.raise_on_unknown, + incremental=args.incremental) From aca438d45125fa1066bf3685342a85a765c02a86 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 09:36:52 -0700 Subject: [PATCH 4/6] rename get_missing_dynamic -> get_missing_modified --- kafka/admin/_configs.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index 8491b6daf..845f45379 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -220,7 +220,7 @@ def list_config_resources(self, resource_types=None): """ return self._manager.run(self._async_list_config_resources, resource_types) - async def _get_missing_dynamic_configs(self, config_resources): + async def _get_missing_modified_configs(self, config_resources): resource_lookups = [ConfigResource(resource.resource_type, resource.name) for resource in config_resources] dynamic_configs = await self._async_describe_configs(resource_lookups, config_filter='modified', flat=True) missing_resource_configs = [] @@ -236,8 +236,8 @@ async def _get_missing_dynamic_configs(self, config_resources): return missing_resource_configs async def _add_missing_dynamic_configs(self, config_resources): - # Add missing dynamic config values to resource list to avoid accidental resets - missing_resource_configs = await self._get_missing_dynamic_configs(config_resources) + # Add missing modified config values to resource list to avoid accidental resets + missing_resource_configs = await self._get_missing_modified_configs(config_resources) for resource, missing in zip(config_resources, missing_resource_configs): if not isinstance(missing, dict): raise TypeError(f'missing configs: expected dict, found {type(missing)}') @@ -337,7 +337,7 @@ async def _async_reset_configs(self, config_resources, validate_only=False, rais # if no keys provided, submit as-is -- full reset # if keys are provided, replace with missing -- partial reset partial_resets = [resource for resource in config_resources if resource.configs] - missing_resource_configs = await self._get_missing_dynamic_configs(partial_resets) + missing_resource_configs = await self._get_missing_modified_configs(partial_resets) for resource, missing in zip(partial_resets, missing_resource_configs): resource.configs = missing else: From 6f30f98b13483b12a31f3829892dd0484bac2755 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 09:37:10 -0700 Subject: [PATCH 5/6] Support full reset w/ incremental --- kafka/admin/_configs.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index 845f45379..2d119ad58 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -334,19 +334,22 @@ async def _async_reset_configs(self, config_resources, validate_only=False, rais incremental = self._check_incremental_alter_configs_support() if not incremental: - # if no keys provided, submit as-is -- full reset - # if keys are provided, replace with missing -- partial reset + # if no keys provided (full reset), submit as-is + # if keys are provided (partial reset), replace with missing partial_resets = [resource for resource in config_resources if resource.configs] missing_resource_configs = await self._get_missing_modified_configs(partial_resets) for resource, missing in zip(partial_resets, missing_resource_configs): resource.configs = missing else: - config_resources = [ - ConfigResource(cr.resource_type, cr.name, - {key: (AlterConfigOp.DELETE, None) - for key in cr.configs}) - for cr in config_resources - ] + # if no keys provided (full reset): mark all modified keys as DELETE + full_resets = [resource for resource in config_resources if not resource.configs] + missing_resource_configs = await self._get_missing_modified_configs(full_resets) + for resource, missing in zip(full_resets, missing_resource_configs): + resource.configs = missing + # Update all configs to DELETE:None + for resource in config_resources: + resource.configs = {key: (AlterConfigOp.DELETE, None) + for key in resource.configs} return await self._send_alter_configs_requests(config_resources, validate_only, incremental) def reset_configs(self, config_resources, validate_only=False, raise_on_unknown=True, incremental=None): From fb000845a5a2fb0f05afe3e99770279ab0f7e779 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 09:37:17 -0700 Subject: [PATCH 6/6] cleanup docstring --- kafka/admin/_configs.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index 2d119ad58..b4c86aab8 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -307,12 +307,11 @@ def alter_configs(self, config_resources, validate_only=False, raise_on_unknown= ``(op, value)`` (where ``op`` is an :class:`AlterConfigOp`, its name, or its int value) or a bare value (interpreted as SET). For DELETE operations the value is ignored and sent as null. - Note: if broker does not support IncrementalAlterConfigsRequest, - AlterConfigOp APPEND/SUBTRACT are only supported on 2.3+ brokers, - which support the IncrementalAlterConfigsRequest. For older brokers - the client will use AlterConfigsRequest, which requires submitting - all dynamic configs together (the client will fill in missing keys - as required, though be wary of the inherent race with this approach). + APPEND/SUBTRACT require broker >= 2.3. On older brokers only + SET is supported; non-SET ops raise ValueError. On older brokers + the client also fills in all other modified dynamic keys before + submitting, since AlterConfigsRequest resets any omitted key to + its default (be aware of the inherent race in that approach). validate_only (bool, optional): If True, changes are sent to broker for validation only. Changes will not be applied. Default: False raise_on_unknown (bool, optional): If True, raises ValueError if any