Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from kafka.admin.client import KafkaAdminClient
from kafka.admin._acls import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
ResourceType, ACLPermissionType, ACLResourcePatternType)
from kafka.admin._acls import (
ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
ResourceType, ACLPermissionType, ACLResourcePatternType)
from kafka.admin._configs import (
ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType)
AlterConfigOp, ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType)
from kafka.admin._groups import MemberToRemove
from kafka.admin._partitions import NewPartitions, OffsetSpec
from kafka.admin._topics import NewTopic
Expand All @@ -13,7 +14,7 @@
'KafkaAdminClient',
'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType',
'ResourceType', 'ResourcePattern', 'ResourcePatternFilter',
'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType',
'AlterConfigOp', 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType',
'MemberToRemove', 'OffsetSpec', # NewTopic + NewPartitions are deprecated and not included in __all__
'ScramMechanism', 'UserScramCredentialDeletion', 'UserScramCredentialUpsertion',
]
253 changes: 183 additions & 70 deletions kafka/admin/_configs.py

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,6 @@ def run_cli(args=None):
# --dry-run support
# --trace ?

# [configs]
# IncrementalAlterConfigs (not supported yet)

# [client-quotas]
# describe (DescribeClientQuotas - not supported yet)
# alter (AlterClientQuotas - not supported yet)
Expand Down
21 changes: 20 additions & 1 deletion kafka/cli/admin/configs/alter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import re

from kafka.admin import AlterConfigOp
from .common import add_resource_arguments, parse_resources


Expand All @@ -10,15 +13,31 @@ def add_subparser(cls, subparsers):
parser.add_argument('-c', '--config', type=str, action='append', dest='configs', required=True, help='key=value to alter')
parser.add_argument('-v', '--validate-only', action='store_true', default=False)
parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True)
incremental = parser.add_mutually_exclusive_group()
incremental.add_argument('--force-incremental', action='store_true', dest='incremental', default=None)
incremental.add_argument('--force-alter', action='store_false', dest='incremental', default=None)
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
try:
configs = dict(config.split('=') for config in args.configs)
regex = r'^(set|del|add|sub)\((.*)\)$'
ops = {
'set': AlterConfigOp.SET,
'del': AlterConfigOp.DELETE,
'add': AlterConfigOp.APPEND,
'sub': AlterConfigOp.SUBTRACT,
}
for key in configs:
match = re.match(regex, configs[key])
if match:
op_str, val = match.groups()
configs[key] = (ops[op_str], val)
except ValueError:
raise ValueError(f'Unable to parse configs! {args.configs}')
resources = parse_resources(args, configs=configs)
return client.alter_configs(resources,
validate_only=args.validate_only,
raise_on_unknown=args.raise_on_unknown)
raise_on_unknown=args.raise_on_unknown,
incremental=args.incremental)
4 changes: 4 additions & 0 deletions kafka/protocol/admin/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ class AlterConfigsResponse(ApiMessage): pass
class DescribeConfigsRequest(ApiMessage): pass
class DescribeConfigsResponse(ApiMessage): pass

class IncrementalAlterConfigsRequest(ApiMessage): pass
class IncrementalAlterConfigsResponse(ApiMessage): pass

class ListConfigResourcesRequest(ApiMessage): pass
class ListConfigResourcesResponse(ApiMessage): pass


__all__ = [
'AlterConfigsRequest', 'AlterConfigsResponse',
'DescribeConfigsRequest', 'DescribeConfigsResponse',
'IncrementalAlterConfigsRequest', 'IncrementalAlterConfigsResponse',
'ListConfigResourcesRequest', 'ListConfigResourcesResponse',
]
111 changes: 110 additions & 1 deletion kafka/protocol/admin/configs.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ from typing import Any, Self
from kafka.protocol.api_message import ApiMessage
from kafka.protocol.data_container import DataContainer

__all__ = ['AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeConfigsRequest', 'DescribeConfigsResponse', 'ListConfigResourcesRequest', 'ListConfigResourcesResponse']
__all__ = ['AlterConfigsRequest', 'AlterConfigsResponse', 'DescribeConfigsRequest', 'DescribeConfigsResponse', 'IncrementalAlterConfigsRequest', 'IncrementalAlterConfigsResponse', 'ListConfigResourcesRequest', 'ListConfigResourcesResponse']

class AlterConfigsRequest(ApiMessage):
class AlterConfigsResource(DataContainer):
Expand Down Expand Up @@ -256,6 +256,115 @@ class DescribeConfigsResponse(ApiMessage):
def expect_response(self) -> bool: ...
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class IncrementalAlterConfigsRequest(ApiMessage):
class AlterConfigsResource(DataContainer):
class AlterableConfig(DataContainer):
name: str
config_operation: int
value: str | None
def __init__(
self,
*args: Any,
name: str = ...,
config_operation: int = ...,
value: str | None = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

resource_type: int
resource_name: str
configs: list[AlterableConfig]
def __init__(
self,
*args: Any,
resource_type: int = ...,
resource_name: str = ...,
configs: list[AlterableConfig] = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

resources: list[AlterConfigsResource]
validate_only: bool
def __init__(
self,
*args: Any,
resources: list[AlterConfigsResource] = ...,
validate_only: bool = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...
name: str
type: str
API_KEY: int
API_VERSION: int
valid_versions: tuple[int, int]
min_version: int
max_version: int
@property
def header(self) -> Any: ...
@classmethod
def is_request(cls) -> bool: ...
def expect_response(self) -> bool: ...
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class IncrementalAlterConfigsResponse(ApiMessage):
class AlterConfigsResourceResponse(DataContainer):
error_code: int
error_message: str | None
resource_type: int
resource_name: str
def __init__(
self,
*args: Any,
error_code: int = ...,
error_message: str | None = ...,
resource_type: int = ...,
resource_name: str = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

throttle_time_ms: int
responses: list[AlterConfigsResourceResponse]
def __init__(
self,
*args: Any,
throttle_time_ms: int = ...,
responses: list[AlterConfigsResourceResponse] = ...,
version: int | None = None,
**kwargs: Any,
) -> None: ...
@property
def version(self) -> int | None: ...
def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...
name: str
type: str
API_KEY: int
API_VERSION: int
valid_versions: tuple[int, int]
min_version: int
max_version: int
@property
def header(self) -> Any: ...
@classmethod
def is_request(cls) -> bool: ...
def expect_response(self) -> bool: ...
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class ListConfigResourcesRequest(ApiMessage):
resource_types: list[int]
def __init__(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 44,
"type": "request",
"listeners": ["broker", "controller"],
"name": "IncrementalAlterConfigsRequest",
// Version 1 is the first flexible version.
"validVersions": "0-1",
"flexibleVersions": "1+",
"fields": [
{ "name": "Resources", "type": "[]AlterConfigsResource", "versions": "0+",
"about": "The incremental updates for each resource.", "fields": [
{ "name": "ResourceType", "type": "int8", "versions": "0+", "mapKey": true,
"about": "The resource type." },
{ "name": "ResourceName", "type": "string", "versions": "0+", "mapKey": true,
"about": "The resource name." },
{ "name": "Configs", "type": "[]AlterableConfig", "versions": "0+",
"about": "The configurations.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
"about": "The configuration key name." },
{ "name": "ConfigOperation", "type": "int8", "versions": "0+", "mapKey": true,
"about": "The type (Set, Delete, Append, Subtract) of operation." },
{ "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The value to set for the configuration key."}
]}
]},
{ "name": "ValidateOnly", "type": "bool", "versions": "0+",
"about": "True if we should validate the request, but not change the configurations."}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 44,
"type": "response",
"name": "IncrementalAlterConfigsResponse",
// Version 1 is the first flexible version.
"validVersions": "0-1",
"flexibleVersions": "1+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Responses", "type": "[]AlterConfigsResourceResponse", "versions": "0+",
"about": "The responses for each resource.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The resource error code." },
{ "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+",
"about": "The resource error message, or null if there was no error." },
{ "name": "ResourceType", "type": "int8", "versions": "0+",
"about": "The resource type." },
{ "name": "ResourceName", "type": "string", "versions": "0+",
"about": "The resource name." }
]}
]
}
Loading
Loading