Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 160 additions & 22 deletions kafka/admin/_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import logging
from typing import TYPE_CHECKING

from kafka.errors import IncompatibleBrokerVersion
import kafka.errors as Errors
from kafka.protocol.admin import (
AlterConfigsRequest,
DescribeConfigsRequest,
Expand All @@ -31,6 +31,8 @@ class ConfigAdminMixin:
def _convert_config_resource(config_resource, key_only=True):
if key_only:
values = list(config_resource.configs.keys()) if isinstance(config_resource.configs, dict) else config_resource.configs
elif not config_resource.configs:
values = []
else:
assert isinstance(config_resource.configs, dict)
values = list(config_resource.configs.items())
Expand All @@ -50,7 +52,12 @@ def _group_config_resources(self, config_resources, key_only=True):
other_resources.append(self._convert_config_resource(config_resource, key_only=key_only))
return broker_resources, other_resources

async def _async_describe_configs(self, config_resources, include_synonyms=False):
async def _async_describe_configs(self, config_resources, include_synonyms=False, config_filter='modified', flat=False):
if isinstance(config_filter, str):
try:
config_filter = ConfigFilterType[config_filter.upper()]
except KeyError:
raise ValueError(f'{config_filter} is not a valid ConfigFilterType')
min_version = 1 if include_synonyms else 0
broker_resources, other_resources = self._group_config_resources(config_resources, key_only=True)
responses = []
Expand All @@ -70,23 +77,38 @@ async def _async_describe_configs(self, config_resources, include_synonyms=False
ret = defaultdict(dict)
for response in responses:
for result in response.results:
result_type = ConfigResourceType(result.resource_type).name.lower()
ret[result_type][result.resource_name] = {}
resource_type = ConfigResourceType(result.resource_type)
resource_configs = {}
for config in result.configs:
config = config.to_dict()
name = config.pop('name')
if config_filter == ConfigFilterType.DYNAMIC and config['read_only']:
continue
if 'config_source' in config:
config['config_source'] = ConfigSourceType(config['config_source']).name
config_source = ConfigSourceType(config['config_source'])
elif config['read_only'] and resource_type is ConfigResourceType.BROKER:
config_source = ConfigSourceType.STATIC_BROKER_CONFIG
elif config['is_default']:
config_source = ConfigSourceType.DEFAULT_CONFIG
else:
config_source = ConfigSourceType.dynamic_for_resource_type(resource_type)
if config_filter.should_skip(config_source):
continue
config['config_source'] = config_source.name
if 'synonyms' in config:
for synonym in config['synonyms']:
synonym['source'] = ConfigSourceType(synonym['source']).name

if 'config_type' in config:
config['config_type'] = ConfigType(config['config_type']).name
ret[result_type][result.resource_name][name] = config
return dict(ret)
resource_configs[name] = config
ret[resource_type.name.lower()][result.resource_name] = resource_configs
if flat:
return [ret[resource.resource_type.name.lower()][resource.name]
for resource in config_resources]
else:
return dict(ret)

def describe_configs(self, config_resources, include_synonyms=False):
def describe_configs(self, config_resources, include_synonyms=False, config_filter='modified'):
"""Fetch configuration parameters for one or more Kafka resources.

Arguments:
Expand All @@ -98,37 +120,133 @@ def describe_configs(self, config_resources, include_synonyms=False):
Keyword Arguments:
include_synonyms (bool, optional): If True, return synonyms in response. Not
supported by all versions. Default: False.
config_filter (ConfigFilterType or str): Modified returns only keys that have
non-default values; Dynamic returns all keys that can be modified with
alter_configs; All returns all available keys. Default: Modified.

Returns:
List of DescribeConfigsResponses.
dict of {resource_type (str): {resource_name (str): {config_key: {config data}}}}
"""
return self._manager.run(self._async_describe_configs, config_resources, include_synonyms)

async def _async_alter_configs(self, config_resources, validate_only=False):
return self._manager.run(self._async_describe_configs, config_resources,
include_synonyms, config_filter)

async def _get_missing_dynamic_configs(self, config_resources):
resource_lookups = [ConfigResource(resource.resource_type, resource.name) for resource in config_resources]
dynamic_configs = await self._async_describe_configs(resource_lookups, config_filter='modified', flat=True)
missing_resource_configs = []
for resource, describe in zip(config_resources, dynamic_configs):
missing = {}
for config_key in describe:
if config_key not in resource.configs:
config_value = describe[config_key]['value']
if config_value is None:
continue
missing[config_key] = config_value
missing_resource_configs.append(missing)
return missing_resource_configs

async def _add_missing_dynamic_configs(self, config_resources):
# Add missing dynamic config values to resource list to avoid accidental resets
missing_resource_configs = await self._get_missing_dynamic_configs(config_resources)
for resource, missing in zip(config_resources, missing_resource_configs):
if not isinstance(missing, dict):
raise TypeError(f'missing configs: expected dict, found {type(missing)}')
resource.configs.update(missing)

async def _validate_dynamic_configs(self, config_resources):
resource_lookups = [ConfigResource(resource.resource_type, resource.name) for resource in config_resources]
dynamic_configs = await self._async_describe_configs(resource_lookups, config_filter='dynamic', flat=True)
for resource, describe in zip(config_resources, dynamic_configs):
unknown = set(resource.configs or []) - set(describe)
if unknown:
raise ValueError(f'Unrecognized configs: {unknown}')

async def _async_alter_configs(self, config_resources, validate_only=False, raise_on_unknown=True):
# Broker Version < (2, 3): use alter configs
# Broker Version >= (2, 3): use incremental alter configs
if raise_on_unknown:
await self._validate_dynamic_configs(config_resources)
await self._add_missing_dynamic_configs(config_resources)
return await self._send_alter_configs_requests(config_resources, validate_only=validate_only)

async def _send_alter_configs_requests(self, config_resources, validate_only=False):
broker_resources, other_resources = self._group_config_resources(config_resources, key_only=False)
responses = []
for broker_id, resources in broker_resources.items():
request = AlterConfigsRequest(
resources=resources,
validate_only=validate_only)
responses.append(await self._manager.send(request, node_id=broker_id))
response = await self._manager.send(request, node_id=broker_id)
responses.extend(response.responses)
if other_resources:
request = AlterConfigsRequest(
resources=other_resources,
validate_only=validate_only)
responses.append(await self._manager.send(request))
return responses
response = await self._manager.send(request)
responses.extend(response.responses)
ret = defaultdict(dict)
for response in responses:
if response.error_code == 0:
result = 'OK'
else:
result = str(Errors.for_code(response.error_code)(response.error_message))
result_type = ConfigResourceType(response.resource_type).name.lower()
ret[result_type][response.resource_name] = result
return dict(ret)

def alter_configs(self, config_resources, validate_only=False):
def alter_configs(self, config_resources, validate_only=False, raise_on_unknown=True):
"""Alter configuration parameters of one or more Kafka resources.

Arguments:
config_resources: A list of ConfigResource objects.
validate_only (bool, optional): If True, changes are sent to broker for
validation only. Changes will not be applied. Default: False
raise_on_unknown (bool, optional): If True, raises ValueError if any
config key is not recognized as a dynamic config for the resource.

Returns:
Appropriate version of AlterConfigsResponse class.
dict of {resource_type (str): {resource_name (str): Error/Result}}
"""
return self._manager.run(self._async_alter_configs, config_resources, validate_only)
return self._manager.run(self._async_alter_configs, config_resources, validate_only, raise_on_unknown)

async def _async_reset_configs(self, config_resources, validate_only=False, raise_on_unknown=True):
if raise_on_unknown:
await self._validate_dynamic_configs(config_resources)
# if no keys provided, submit as-is -- full reset
# if keys are provided, replace with missing -- partial reset
partial_resets = [resource for resource in config_resources if resource.configs]
missing_resource_configs = await self._get_missing_dynamic_configs(partial_resets)
for resource, missing in zip(partial_resets, missing_resource_configs):
resource.configs = missing
return await self._send_alter_configs_requests(config_resources, validate_only=validate_only)

def reset_configs(self, config_resources, validate_only=False, raise_on_unknown=True):
"""Reset configuration parameters of one or more Kafka resources.

Arguments:
config_resources: A list of ConfigResource objects.

Returns:
dict of {resource_type (str): {resource_name (str): Error/Result}}
"""
return self._manager.run(self._async_reset_configs, config_resources, validate_only, raise_on_unknown)


class ConfigFilterType(IntEnum):
ALL = 0
DYNAMIC = 1
MODIFIED = 2
DEFAULT = 3
STATIC = 4

def should_skip(self, config_source):
if self is ConfigFilterType.MODIFIED:
return not config_source.is_modified()
elif self is ConfigFilterType.DEFAULT:
return config_source.is_modified()
elif self is ConfigFilterType.STATIC:
return config_source is not ConfigSourceType.STATIC_BROKER_CONFIG
return False


class ConfigResourceType(IntEnum):
Expand Down Expand Up @@ -177,11 +295,31 @@ class ConfigType(IntEnum):

class ConfigSourceType(IntEnum):
UNKNOWN = 0
TOPIC_CONFIG = 1
DYNAMIC_TOPIC_CONFIG = 1
DYNAMIC_BROKER_CONFIG = 2
DYNAMIC_DEFAULT_BROKER_CONFIG = 3
STATIC_BROKER_CONFIG = 4
DEFAULT_CONFIG = 5
DYNAMIC_BROKER_LOGGER_CONFIG = 6
CLIENT_METRICS_CONFIG = 7
GROUP_CONFIG = 8
DYNAMIC_CLIENT_METRICS_CONFIG = 7
DYNAMIC_GROUP_CONFIG = 8

def is_modified(self):
return self.value not in (3, 4, 5)

@classmethod
def dynamic_for_resource_type(cls, resource_type):
if resource_type is ConfigResourceType.UNKNOWN:
return ConfigSourceType.UNKNOWN
elif resource_type is ConfigResourceType.TOPIC:
return ConfigSourceType.DYNAMIC_TOPIC_CONFIG
elif resource_type is ConfigResourceType.BROKER:
return ConfigSourceType.DYNAMIC_BROKER_CONFIG
elif resource_type is ConfigResourceType.BROKER_LOGGER:
return ConfigSourceType.DYNAMIC_BROKER_LOGGER_CONFIG
elif resource_type is ConfigResourceType.CLIENT_METRICS:
return ConfigSourceType.DYNAMIC_CLIENT_METRICS_CONFIG
elif resource_type is ConfigResourceType.GROUP:
return ConfigSourceType.DYNAMIC_GROUP_CONFIG
else:
raise RuntimeError(f'Unrecognized resource type {resource_type}')
1 change: 1 addition & 0 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def run_cli(args=None):
# --trace ?

# [configs]
# ListConfigResources
# IncrementalAlterConfigs (not supported yet)

# [client-quotas]
Expand Down
3 changes: 2 additions & 1 deletion kafka/cli/admin/configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .alter import AlterConfigs
from .describe import DescribeConfigs
from .reset import ResetConfigs


class ConfigsSubCommand:
Expand All @@ -10,6 +11,6 @@ class ConfigsSubCommand:
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('configs', help='Manage Kafka Configuration')
commands = parser.add_subparsers()
for cmd in [DescribeConfigs, AlterConfigs]:
for cmd in [DescribeConfigs, AlterConfigs, ResetConfigs]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
16 changes: 14 additions & 2 deletions kafka/cli/admin/configs/alter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,31 @@ def add_subparser(cls, subparsers):
parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[])
parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[])
parser.add_argument('-g', '--group', type=str, action='append', dest='groups', default=[])
parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=None, help='key=value to alter')
parser.add_argument('-c', '--config', type=str, action='append', dest='configs', required=True, help='key=value to alter')
parser.add_argument('-v', '--validate-only', action='store_true', default=False)
parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True)
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
configs = dict(config.split('=') for config in args.configs)
resources = []
for topic in args.topics:
if resources:
raise ValueError('Only one resource type per request')
resources.append(ConfigResource('TOPIC', topic, configs))
for broker in args.brokers:
if resources:
raise ValueError('Only one resource type per request')
resources.append(ConfigResource('BROKER', broker, configs))
for broker in args.broker_loggers:
if resources:
raise ValueError('Only one resource type per request')
resources.append(ConfigResource('BROKER_LOGGER', broker, configs))
for group in args.groups:
if resources:
raise ValueError('Only one resource type per request')
resources.append(ConfigResource('GROUP', group, configs))
return client.alter_configs(resources)
return client.alter_configs(resources,
validate_only=args.validate_only,
raise_on_unknown=args.raise_on_unknown)
16 changes: 15 additions & 1 deletion kafka/cli/admin/configs/describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ def add_subparser(cls, subparsers):
parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[])
parser.add_argument('-g', '--group', type=str, action='append', dest='groups', default=[])
parser.add_argument('-k', '--key', type=str, action='append', dest='keys', default=None)
parser.add_argument('--dynamic', action='store_true', default=False)
parser.add_argument('--modified', action='store_true', default=False)
parser.add_argument('--static', action='store_true', default=False)
parser.add_argument('--default', action='store_true', default=False)
parser.set_defaults(command=cls.command)

@classmethod
Expand All @@ -25,4 +29,14 @@ def command(cls, client, args):
for group in args.groups:
resources.append(ConfigResource('GROUP', group, args.keys))

return client.describe_configs(resources)
if args.modified:
config_filter = 'modified'
elif args.dynamic:
config_filter = 'dynamic'
elif args.static:
config_filter = 'static'
elif args.default:
config_filter = 'default'
else:
config_filter = 'all'
return client.describe_configs(resources, config_filter=config_filter)
31 changes: 31 additions & 0 deletions kafka/cli/admin/configs/reset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from kafka.admin import ConfigResource


class ResetConfigs:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('reset', help='Reset Kafka Configs')
parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[])
parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[])
parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[])
parser.add_argument('-g', '--group', type=str, action='append', dest='groups', default=[])
parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=[], help='key=value to reset')
parser.add_argument('-v', '--validate-only', action='store_true', default=False)
parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True)
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
resources = []
for topic in args.topics:
resources.append(ConfigResource('TOPIC', topic, args.configs))
for broker in args.brokers:
resources.append(ConfigResource('BROKER', broker, args.configs))
for broker in args.broker_loggers:
resources.append(ConfigResource('BROKER_LOGGER', broker, args.configs))
for group in args.groups:
resources.append(ConfigResource('GROUP', group, args.configs))
return client.reset_configs(resources,
validate_only=args.validate_only,
raise_on_unknown=args.raise_on_unknown)
Loading
Loading