Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions kafka/admin/_acls.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ class ACLAdminMixin:
_client: object
config: dict

# ACL Helper for Metadata / DescribeGroups
def _process_acl_operations(self, obj):
if obj.get('authorized_operations', None) is not None:
obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations'])))
return obj

def describe_acls(self, acl_filter):
"""Describe a set of ACLs

Expand Down
47 changes: 21 additions & 26 deletions kafka/admin/_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,17 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include
# -- List consumer groups --------------------------------------------------

def _list_consumer_groups_request(self):
    """Build a ListGroupsRequest for fetching all consumer groups.

    Returns:
        A ListGroupsRequest instance.
    """
    # TODO: KIP-518: StatesFilter
    # TODO: KIP-848: TypesFilter
    return ListGroupsRequest()

def _list_consumer_groups_process_response(self, response):
    """Process a ListGroupsResponse into a list of groups.

    Raises the mapped Kafka error if the response carries a top-level
    error code.

    Returns:
        A list of (group_id, protocol_type) tuples, one per group.
    """
    error_type = Errors.for_code(response.error_code)
    if error_type is not Errors.NoError:
        raise error_type(
            "ListGroupsRequest failed with response '{}'."
            .format(response))
    return [(group.group_id, group.protocol_type) for group in response.groups]

async def _async_list_consumer_groups(self, broker_ids=None):
Expand Down Expand Up @@ -189,25 +185,25 @@ def list_consumer_groups(self, broker_ids=None):
# -- List consumer group offsets -------------------------------------------

def _list_consumer_group_offsets_request(self, group_id, partitions=None):
    """Build an OffsetFetchRequest for a consumer group.

    Arguments:
        group_id (str): the consumer group whose offsets to fetch.

    Keyword Arguments:
        partitions: optional iterable of (topic, partition) tuples. If None,
            offsets for all partitions committed by the group are requested,
            which requires OffsetFetchRequest >= v1 (KIP-88).

    Returns:
        An OffsetFetchRequest instance.
    """
    _Topic = OffsetFetchRequest.OffsetFetchRequestTopic
    if partitions is None:
        # Omitting topics fetches offsets for every committed partition;
        # only supported on request versions >= 1 (KIP-88).
        min_version = 1
        topics = None
    else:
        min_version = 0
        # Group the requested partitions by topic, de-duplicating partitions.
        topics_partitions_dict = defaultdict(set)
        for topic, partition in partitions:
            topics_partitions_dict[topic].add(partition)
        # Use distinct loop names so the `partitions` parameter is not shadowed.
        topics = [
            _Topic(name=topic_name, partition_indexes=list(partition_set))
            for topic_name, partition_set in topics_partitions_dict.items()
        ]
    return OffsetFetchRequest(group_id=group_id, topics=topics,
                              min_version=min_version, max_version=6)

def _list_consumer_group_offsets_process_response(self, response):
"""Process an OffsetFetchResponse."""
if response.API_VERSION <= 5:
if response.API_VERSION <= 6:
if response.API_VERSION > 1:
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
Expand Down Expand Up @@ -271,12 +267,11 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
# -- Delete consumer groups ------------------------------------------------

def _delete_consumer_groups_request(self, group_ids):
    """Build a DeleteGroupsRequest for the given consumer group ids."""
    return DeleteGroupsRequest(groups_names=group_ids)

def _convert_delete_groups_response(self, response):
"""Parse a DeleteGroupsResponse."""
if response.API_VERSION <= 1:
if response.API_VERSION <= 2:
results = []
for group_id, error_code in response.results:
results.append((group_id, Errors.for_code(error_code)))
Expand Down
28 changes: 0 additions & 28 deletions kafka/admin/_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
from typing import TYPE_CHECKING

from kafka.admin._acls import valid_acl_operations
from kafka.protocol.metadata import MetadataRequest

if TYPE_CHECKING:
Expand All @@ -18,11 +17,6 @@ class MetadataAdminMixin:
"""Mixin providing cluster metadata methods for KafkaAdminClient."""
_manager: KafkaConnectionManager

def _process_acl_operations(self, obj):
if obj.get('authorized_operations', None) is not None:
obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations'])))
return obj

async def _get_cluster_metadata(self, topics):
"""topics = [] for no topics, None for all."""
request = MetadataRequest(
Expand All @@ -40,28 +34,6 @@ async def _get_cluster_metadata(self, topics):
self._process_acl_operations(topic)
return metadata

def list_topics(self):
"""Retrieve a list of all topic names in the cluster.

Returns:
A list of topic name strings.
"""
metadata = self._manager.run(self._get_cluster_metadata, None)
return [t['name'] for t in metadata['topics']]

def describe_topics(self, topics=None):
"""Fetch metadata for the specified topics or all topics if None.

Keyword Arguments:
topics ([str], optional) A list of topic names. If None, metadata for all
topics is retrieved.

Returns:
A list of dicts describing each topic (including partition info).
"""
metadata = self._manager.run(self._get_cluster_metadata, topics)
return metadata['topics']

def describe_cluster(self):
"""Fetch cluster-wide metadata such as the list of brokers, the controller ID,
and the cluster ID.
Expand Down
5 changes: 2 additions & 3 deletions kafka/admin/_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,13 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_
election_type=ElectionType(election_type),
topic_partitions=self._get_topic_partitions(topic_partitions),
timeout_ms=timeout_ms,
max_version=1,
)
def response_errors(r):
if r.API_VERSION >= 1:
yield Errors.for_code(r.error_code)
for result in r.replica_election_results:
for partition in result[1]:
yield Errors.for_code(partition[1])
for partition in result.partition_result:
yield Errors.for_code(partition.error_code)
ignore_errors = (Errors.ElectionNotNeededError,)
return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors)

Expand Down
152 changes: 109 additions & 43 deletions kafka/admin/_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import time
from typing import TYPE_CHECKING
import uuid

import kafka.errors as Errors
from kafka.errors import IncompatibleBrokerVersion
Expand All @@ -25,26 +26,80 @@ class TopicAdminMixin:
_client: object
config: dict

def list_topics(self):
    """Retrieve a list of all topic names in the cluster.

    Returns:
        A list of topic name strings.
    """
    # Passing None asks for metadata on every topic in the cluster.
    cluster_info = self._manager.run(self._get_cluster_metadata, None)
    return [topic_info['name'] for topic_info in cluster_info['topics']]

def describe_topics(self, topics=None):
    """Fetch metadata for the specified topics or all topics if None.

    Keyword Arguments:
        topics ([str], optional) A list of topic names. If None, metadata for all
            topics is retrieved.

    Returns:
        A list of dicts describing each topic (including partition info).
    """
    cluster_info = self._manager.run(self._get_cluster_metadata, topics)
    return cluster_info['topics']

@staticmethod
def _process_create_topics_input(new_topics):
    """Normalize the accepted new_topics formats into CreatableTopic structs.

    Accepts:
        - a dict of {topic_name: {num_partitions: int, replication_factor: int,
          assignments: {partition: [broker_ids]}, configs: {key: value}}}
          (all inner keys optional),
        - a list of topic name strings,
        - a (deprecated) list of NewTopic objects.

    Returns:
        A list of CreateTopicsRequest.CreatableTopic instances.

    Raises:
        ValueError: if no valid topics could be extracted from new_topics.
    """
    _Topic = CreateTopicsRequest.CreatableTopic
    _Assignment = _Topic.CreatableReplicaAssignment
    _Config = _Topic.CreatableTopicConfig
    topics = []
    if isinstance(new_topics, dict):
        # {topic_name: {num_partitions:, replication_factor:, assignments: {partition: [broker_ids]}, configs: {key: value}}
        for topic, data in new_topics.items():
            configs = data.get('configs', {})
            topics.append(_Topic(
                name=topic,
                num_partitions=data.get('num_partitions', -1),
                replication_factor=data.get('replication_factor', -1),
                assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas)
                             for partition_id, replicas in data.get('assignments', {}).items()],
                # Reuse the `configs` local fetched above instead of re-reading `data`.
                configs=[_Config(name=config_key, value=config_value)
                         for config_key, config_value in configs.items()]
            ))
    elif all(isinstance(v, str) for v in new_topics):
        # Bare topic names: let the broker pick partitions/replication (-1 sentinel).
        for new_topic in new_topics:
            topics.append(_Topic(name=new_topic, num_partitions=-1, replication_factor=-1))
    elif all(isinstance(v, NewTopic) for v in new_topics):
        # Deprecated NewTopic-object path.
        for new_topic in new_topics:
            topics.append(_Topic(
                name=new_topic.name,
                num_partitions=new_topic.num_partitions,
                replication_factor=new_topic.replication_factor,
                assignments=[_Assignment(partition_index=partition_id, broker_ids=replicas)
                             for partition_id, replicas in new_topic.replica_assignments.items()],
                configs=[_Config(name=config_key, value=config_value)
                         for config_key, config_value in new_topic.topic_configs.items()]
            ))
    if not topics:
        raise ValueError(f"No valid topics found in new_topics: {new_topics}")
    return topics

def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True,
wait_for_metadata=False):
"""Create new topics in the cluster.

Arguments:
new_topics: A list of NewTopic objects.
new_topics: A list of topic names,
or a dict of {topic_name: {num_partitions: int (default -1),
replication_factor: int (default -1),
assignments: {partition: [broker_ids]},
configs: {key: value}}}
All keys are optional.
List of NewTopic objects is deprecated.
Note: for brokers < 2.4, num_partitions and replication_factor
are required and must be provided via dict or [NewTopic].

Keyword Arguments:
timeout_ms (numeric, optional): Milliseconds to wait for new topics to be created
Expand All @@ -65,8 +120,15 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
.format(self._manager.broker_version))

topics = self._process_create_topics_input(new_topics)
if self._manager.broker_version < (2, 4):
if any(topic.num_partitions == -1 or topic.replication_factor == -1 for topic in topics):
raise IncompatibleBrokerVersion(
"Broker version {} requires explicit num_partitions and replication_factor"
.format(self._manager.broker_version))

request = CreateTopicsRequest(
topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
topics=topics,
timeout_ms=timeout_ms,
validate_only=validate_only,
max_version=3,
Expand All @@ -76,7 +138,7 @@ def response_errors(r):
yield Errors.for_code(topic.error_code)
response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors)
if wait_for_metadata:
self.wait_for_topics([new_topic.name for new_topic in new_topics])
self.wait_for_topics([new_topic.name for new_topic in request.topics])
return response

def wait_for_topics(self, topic_names, timeout_ms=10000):
Expand Down Expand Up @@ -155,7 +217,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
"""Delete topics from the cluster.

Arguments:
topics ([str]): A list of topic name strings.
topics ([str]): A list of topic name strings or uuid.UUID ids.

Keyword Arguments:
timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted
Expand All @@ -166,35 +228,18 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
Appropriate version of DeleteTopicsResponse class.
"""
timeout_ms = self._validate_timeout(timeout_ms)
_Topic = DeleteTopicsRequest.DeleteTopicState
request = DeleteTopicsRequest(
topic_names=topics, timeout_ms=timeout_ms,
max_version=5,
)
topics=[_Topic(topic_id=topic) if isinstance(topic, uuid.UUID) else _Topic(name=topic)
for topic in topics],
timeout_ms=timeout_ms)
def response_errors(r):
for response in r.responses:
yield Errors.for_code(response.error_code)
return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors)

def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True):
"""Create additional partitions for an existing topic.

Arguments:
topic_partitions: A dict of topic name strings to total partition count (int),
or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}}
if manual assignment is desired.
dict of {topic_name: NewPartition} is deprecated.

Keyword Arguments:
timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be
created before the broker returns.
validate_only (bool, optional): If True, don't actually create new partitions.
Default: False
raise_errors (bool, optional): Whether to raise errors as exceptions. Default True.

Returns:
Appropriate version of CreatePartitionsResponse class.
"""
timeout_ms = self._validate_timeout(timeout_ms)
@staticmethod
def _process_create_partitions_input(topic_partitions):
_Topic = CreatePartitionsRequest.CreatePartitionsTopic
_Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment
topics = []
Expand All @@ -208,16 +253,37 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
count=count['count'],
assignments=[_Assignment(broker_ids=broker_ids)
for broker_ids in count['assignments']]))

else:
topics.append(
_Topic(
name=topic,
count=count.total_count,
assignments=[_Assignment(broker_ids=broker_ids)
for broker_ids in count.new_assignments]))
return topics

def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True):
"""Create additional partitions for an existing topic.

Arguments:
topic_partitions: A dict of topic name strings to total partition count (int),
or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}}
if manual assignment is desired.
dict of {topic_name: NewPartition} is deprecated.

Keyword Arguments:
timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be
created before the broker returns.
validate_only (bool, optional): If True, don't actually create new partitions.
Default: False
raise_errors (bool, optional): Whether to raise errors as exceptions. Default True.

Returns:
Appropriate version of CreatePartitionsResponse class.
"""
timeout_ms = self._validate_timeout(timeout_ms)
request = CreatePartitionsRequest(
topics=topics,
topics=self._process_create_partitions_input(topic_partitions),
timeout_ms=timeout_ms,
validate_only=validate_only)

Expand All @@ -228,7 +294,7 @@ def response_errors(r):


class NewTopic:
"""A class for new topic creation.
"""DEPRECATED: A class for new topic creation.

Arguments:
name (string): name of the topic
Expand All @@ -251,7 +317,7 @@ def __init__(self, name, num_partitions=-1, replication_factor=-1,


class NewPartitions:
"""A class for new partition creation on existing topics.
"""DEPRECATED: A class for new partition creation on existing topics.

Note that the length of new_assignments, if specified, must be the
difference between the new total number of partitions and the existing
Expand Down
Loading
Loading