Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from kafka.admin._configs import ConfigResource, ConfigResourceType
from kafka.admin.client import KafkaAdminClient
from kafka.admin._acls import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
ResourceType, ACLPermissionType, ACLResourcePatternType)
from kafka.admin._configs import ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType
from kafka.admin._topics import NewTopic, NewPartitions

__all__ = [
'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions', 'ACL', 'ACLFilter',
'ResourcePattern', 'ResourcePatternFilter', 'ACLOperation', 'ResourceType', 'ACLPermissionType',
'ACLResourcePatternType'
'KafkaAdminClient',
'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType',
'ResourceType', 'ResourcePattern', 'ResourcePatternFilter',
'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType',
]
1 change: 1 addition & 0 deletions kafka/admin/_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def describe_cluster(self):
"""
metadata = self._manager.run(self._get_cluster_metadata, [])
metadata.pop('topics')
metadata.pop('throttle_time_ms', None)
return metadata

async def _async_describe_log_dirs(self, topic_partitions=(), brokers=None):
Expand Down
107 changes: 73 additions & 34 deletions kafka/admin/_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

from __future__ import annotations

import logging
from collections import defaultdict
from enum import IntEnum
import logging
from typing import TYPE_CHECKING

from kafka.errors import IncompatibleBrokerVersion
Expand All @@ -29,47 +30,57 @@ def _convert_describe_config_resource_request(config_resource):
return (
config_resource.resource_type,
config_resource.name,
[config_key for config_key, config_value in config_resource.configs.items()] if config_resource.configs else None
list(config_resource.configs.keys()) if isinstance(config_resource.configs, dict) else config_resource.configs
)

async def _async_describe_configs(self, config_resources, include_synonyms=False):
broker_resources = []
topic_resources = []
broker_resources = defaultdict(list)
other_resources = []

for config_resource in config_resources:
if config_resource.resource_type == ConfigResourceType.BROKER:
broker_resources.append(self._convert_describe_config_resource_request(config_resource))
if config_resource.resource_type in (ConfigResourceType.BROKER, ConfigResourceType.BROKER_LOGGER):
try:
broker_id = int(config_resource.name)
except ValueError:
raise ValueError("Broker resource names must be an integer or a string represented integer")
broker_resources[broker_id].append(self._convert_describe_config_resource_request(config_resource))
else:
topic_resources.append(self._convert_describe_config_resource_request(config_resource))
other_resources.append(self._convert_describe_config_resource_request(config_resource))

version = self._client.api_version(DescribeConfigsRequest, max_version=2)
if include_synonyms and version == 0:
raise IncompatibleBrokerVersion(
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
.format(self._manager.broker_version))

results = []
for broker_resource in broker_resources:
try:
broker_id = int(broker_resource[1])
except ValueError:
raise ValueError("Broker resource names must be an integer or a string represented integer")
if version == 0:
request = DescribeConfigsRequest[version](resources=[broker_resource])
else:
request = DescribeConfigsRequest[version](
resources=[broker_resource],
include_synonyms=include_synonyms)
results.append(await self._manager.send(request, node_id=broker_id))

if topic_resources:
if version == 0:
request = DescribeConfigsRequest[version](resources=topic_resources)
else:
request = DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
results.append(await self._manager.send(request))

return results
responses = []
for broker_id, resources in broker_resources.items():
request = DescribeConfigsRequest(
resources=resources,
include_synonyms=include_synonyms)
responses.append(await self._manager.send(request, node_id=broker_id))
if other_resources:
request = DescribeConfigsRequest(resources=other_resources, include_synonyms=include_synonyms)
responses.append(await self._manager.send(request))

ret = defaultdict(dict)
for response in responses:
for result in response.results:
result_type = ConfigResourceType(result.resource_type).name.lower()
ret[result_type][result.resource_name] = {}
for config in result.configs:
config = config.to_dict()
name = config.pop('name')
if 'config_source' in config:
config['config_source'] = ConfigSourceType(config['config_source']).name
if 'synonyms' in config:
for synonym in config['synonyms']:
synonym['source'] = ConfigSourceType(synonym['source']).name

if 'config_type' in config:
config['config_type'] = ConfigType(config['config_type']).name
ret[result_type][result.resource_name][name] = config
return dict(ret)

def describe_configs(self, config_resources, include_synonyms=False):
"""Fetch configuration parameters for one or more Kafka resources.
Expand Down Expand Up @@ -125,9 +136,12 @@ def alter_configs(self, config_resources):


class ConfigResourceType(IntEnum):
"""An enumerated type of config resources"""
BROKER = 4
UNKNOWN = 0
TOPIC = 2
BROKER = 4
BROKER_LOGGER = 8
CLIENT_METRICS = 16
GROUP = 32


class ConfigResource:
Expand All @@ -136,7 +150,7 @@ class ConfigResource:
Arguments:
resource_type (ConfigResourceType): the type of kafka resource
name (string): The name of the kafka resource
configs ({key : value}): A maps of config keys to values.
configs ([key] or {key : value}): config keys (values required to alter)
"""
def __init__(self, resource_type, name, configs=None):
if not isinstance(resource_type, ConfigResourceType):
Expand All @@ -146,7 +160,32 @@ def __init__(self, resource_type, name, configs=None):
self.configs = configs

def __str__(self):
return "ConfigResource %s=%s" % (self.resource_type, self.name)
return f"ConfigResource {self.name}={self.resource_type}"

def __repr__(self):
return "ConfigResource(%s, %s, %s)" % (self.resource_type, self.name, self.configs)
return f"ConfigResource({self.resource_type}, {self.name}, {self.configs})"


class ConfigType(IntEnum):
    """Data type of a config value, as reported in the ``config_type`` field of
    a DescribeConfigs response entry (decoded to its name in describe_configs output).
    """
    UNKNOWN = 0
    BOOLEAN = 1
    STRING = 2
    INT = 3
    SHORT = 4
    LONG = 5
    DOUBLE = 6
    LIST = 7
    CLASS = 8
    PASSWORD = 9


class ConfigSourceType(IntEnum):
    """Origin of a config entry, as reported in the ``config_source`` field (and
    each synonym's ``source`` field) of a DescribeConfigs response entry; decoded
    to its name in describe_configs output.
    """
    UNKNOWN = 0
    TOPIC_CONFIG = 1
    DYNAMIC_BROKER_CONFIG = 2
    DYNAMIC_DEFAULT_BROKER_CONFIG = 3
    STATIC_BROKER_CONFIG = 4
    DEFAULT_CONFIG = 5
    DYNAMIC_BROKER_LOGGER_CONFIG = 6
    CLIENT_METRICS_CONFIG = 7
    GROUP_CONFIG = 8
134 changes: 39 additions & 95 deletions kafka/admin/_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from kafka.protocol.consumer.metadata import (
ConsumerProtocolAssignment, ConsumerProtocolSubscription, ConsumerProtocolType,
)
from kafka.structs import GroupInformation, MemberInformation, OffsetAndMetadata, TopicPartition
from kafka.structs import OffsetAndMetadata, TopicPartition

if TYPE_CHECKING:
from kafka.net.manager import KafkaConnectionManager
Expand Down Expand Up @@ -48,53 +48,13 @@ def _describe_consumer_groups_process_response(self, response):
raise NotImplementedError(
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
.format(response.API_VERSION))

assert len(response.groups) == 1
for response_name, response_field in response.fields.items():
if response_name == 'groups':
described_group = getattr(response, response_name)[0]
described_group_information_list = []
protocol_type_is_consumer = False
for group_information_name, group_information_field in response_field.fields.items():
if not group_information_field.for_version_q(response.API_VERSION):
continue
described_group_information = getattr(described_group, group_information_name)
if group_information_name == 'protocol_type':
protocol_type = described_group_information
protocol_type_is_consumer = (protocol_type == ConsumerProtocolType or not protocol_type)
if group_information_name == 'members':
member_information_list = []
for member in described_group_information:
member_information = []
for attr_name, attr_field in group_information_field.fields.items():
if not attr_field.for_version_q(response.API_VERSION):
continue
attr_val = getattr(member, attr_name)
if protocol_type_is_consumer:
if attr_name == 'member_metadata' and attr_val:
member_information.append(ConsumerProtocolSubscription.decode(attr_val))
elif attr_name == 'member_assignment' and attr_val:
member_information.append(ConsumerProtocolAssignment.decode(attr_val))
else:
member_information.append(attr_val)
member_info_tuple = MemberInformation._make(member_information)
member_information_list.append(member_info_tuple)
described_group_information_list.append(member_information_list)
else:
described_group_information_list.append(described_group_information)
if response.API_VERSION >= 3:
described_group_information_list[-1] = list(map(lambda acl: acl.name, valid_acl_operations(described_group_information_list[-1])))
else:
described_group_information_list.append([])
group_description = GroupInformation._make(described_group_information_list)
error_code = group_description.error_code
error_type = Errors.for_code(error_code)
if error_type is not Errors.NoError:
raise error_type(
"DescribeGroupsResponse failed with response '{}'."
.format(response))
return group_description
assert False, "DescribeGroupsResponse parsing failed"
for group in response.groups:
for member in group.members:
member.member_metadata = ConsumerProtocolSubscription.decode(member.member_metadata)
member.member_assignment = ConsumerProtocolAssignment.decode(member.member_assignment)
# Return dict (key, val) tuples
return [(group.group_id, self._process_acl_operations(group.to_dict())) for group in response.groups]

async def _async_describe_consumer_groups(self, group_ids, group_coordinator_id=None):
results = []
Expand All @@ -103,7 +63,8 @@ async def _async_describe_consumer_groups(self, group_ids, group_coordinator_id=
request = self._describe_consumer_groups_request(group_id)
response = await self._manager.send(request, node_id=coordinator_id)
results.append(self._describe_consumer_groups_process_response(response))
return results
# Combine key/vals from multiple requests into single dict
return dict(itertools.chain(*results))

def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
"""Describe a set of consumer groups.
Expand All @@ -123,10 +84,10 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include
have the same coordinator, otherwise it will error. Default: None.

Returns:
A list of group descriptions. For now the group descriptions
are the raw results from the DescribeGroupsResponse. Long-term, we
plan to change this to return namedtuples as well as decoding the
partition assignments.
A dict of {group_id: {key: val}}. key/vals are simple to_dict translations
of the raw results from DescribeGroupsResponse (with inline decoding
of ConsumerSubscription and ConsumerAssignment metadata, and conversion
of acl set ints to semantic enums).
"""
return self._manager.run(self._async_describe_consumer_groups, group_ids, group_coordinator_id)

Expand All @@ -144,17 +105,17 @@ def _list_consumer_groups_process_response(self, response):
raise error_type(
"ListGroupsRequest failed with response '{}'."
.format(response))
return [(group.group_id, group.protocol_type) for group in response.groups]
return [group.to_dict() for group in response.groups]

async def _async_list_consumer_groups(self, broker_ids=None):
if broker_ids is None:
broker_ids = [broker.node_id for broker in self._manager.cluster.brokers()]
consumer_groups = set()
consumer_groups = []
for broker_id in broker_ids:
request = self._list_consumer_groups_request()
response = await self._manager.send(request, node_id=broker_id)
consumer_groups.update(self._list_consumer_groups_process_response(response))
return list(consumer_groups)
consumer_groups.extend(self._list_consumer_groups_process_response(response))
return consumer_groups

def list_consumer_groups(self, broker_ids=None):
"""List all consumer groups known to the cluster.
Expand All @@ -178,7 +139,7 @@ def list_consumer_groups(self, broker_ids=None):
consumer groups are coordinated by those broker(s). Default: None

Returns:
list: List of tuples of Consumer Groups.
List of group data dicts, with key/vals from ListGroupsRequest
"""
return self._manager.run(self._async_list_consumer_groups, broker_ids)

Expand All @@ -203,32 +164,20 @@ def _list_consumer_group_offsets_request(self, group_id, partitions=None):

def _list_consumer_group_offsets_process_response(self, response):
"""Process an OffsetFetchResponse."""
if response.API_VERSION <= 6:
if response.API_VERSION > 1:
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
raise error_type(
"OffsetFetchResponse failed with response '{}'."
.format(response))
offsets = {}
for topic, partitions in response.topics:
for partition_data in partitions:
if response.API_VERSION <= 4:
partition, offset, metadata, error_code = partition_data
leader_epoch = -1
else:
partition, offset, leader_epoch, metadata, error_code = partition_data
error_type = Errors.for_code(error_code)
if error_type is not Errors.NoError:
raise error_type(
"Unable to fetch consumer group offsets for topic {}, partition {}"
.format(topic, partition))
offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata, leader_epoch)
else:
raise NotImplementedError(
"Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient."
.format(response.API_VERSION))
return offsets
if response.API_VERSION > 1:
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
raise error_type(
"OffsetFetchResponse failed with response '{}'."
.format(response))
def _partitions_to_dict(partitions):
d = {}
for p in partitions:
d[p.partition_index] = p.to_dict()
d[p.partition_index].pop('partition_index')
return d
return {topic.name: _partitions_to_dict(topic.partitions)
for topic in response.topics}

async def _async_list_consumer_group_offsets(self, group_id, group_coordinator_id=None, partitions=None):
if group_coordinator_id is None:
Expand Down Expand Up @@ -259,8 +208,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
known offsets for the consumer group. Default: None.

Returns:
dictionary: A dictionary with TopicPartition keys and
OffsetAndMetadata values.
dict: {topic: {partition: {key: val}}} built from the OffsetFetchResponse
"""
return self._manager.run(self._async_list_consumer_group_offsets, group_id, group_coordinator_id, partitions)

Expand All @@ -271,15 +219,11 @@ def _delete_consumer_groups_request(self, group_ids):

def _convert_delete_groups_response(self, response):
"""Parse a DeleteGroupsResponse."""
if response.API_VERSION <= 2:
results = []
for group_id, error_code in response.results:
results.append((group_id, Errors.for_code(error_code)))
return results
else:
raise NotImplementedError(
"Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient."
.format(response.API_VERSION))
results = []
for group_id, error_code in response.results:
res = 'OK' if error_code == 0 else Errors.for_code(error_code).__name__
results.append((group_id, res))
return results

async def _async_delete_consumer_groups(self, group_ids, group_coordinator_id=None):
coordinators_groups = defaultdict(list)
Expand All @@ -295,7 +239,7 @@ async def _async_delete_consumer_groups(self, group_ids, group_coordinator_id=No
request = self._delete_consumer_groups_request(coordinator_group_ids)
response = await self._manager.send(request, node_id=coordinator_id)
results.extend(self._convert_delete_groups_response(response))
return results
return dict(results)

def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
"""Delete Consumer Group Offsets for given consumer groups.
Expand Down
Loading
Loading