diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index 9e427affe..405bb482b 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -3,7 +3,8 @@ ResourceType, ACLPermissionType, ACLResourcePatternType) from kafka.admin._configs import ( ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType) -from kafka.admin._topics import NewTopic, NewPartitions +from kafka.admin._topics import NewTopic +from kafka.admin._partitions import NewPartitions from kafka.admin._users import ( ScramMechanism, UserScramCredentialDeletion, UserScramCredentialUpsertion) diff --git a/kafka/admin/_partitions.py b/kafka/admin/_partitions.py new file mode 100644 index 000000000..37fcc470d --- /dev/null +++ b/kafka/admin/_partitions.py @@ -0,0 +1,439 @@ +"""Partition management mixin for KafkaAdminClient. + +Also defines NewPartitions data class. +""" + +from __future__ import annotations + +import logging +from collections import defaultdict +from typing import TYPE_CHECKING + +import kafka.errors as Errors +from kafka.errors import UnknownTopicOrPartitionError +from kafka.protocol.admin import ( + AlterPartitionReassignmentsRequest, + CreatePartitionsRequest, + DeleteRecordsRequest, + DescribeTopicPartitionsRequest, + ElectLeadersRequest, + ElectionType, + ListPartitionReassignmentsRequest, +) +from kafka.structs import TopicPartition + +if TYPE_CHECKING: + from kafka.net.manager import KafkaConnectionManager + +log = logging.getLogger(__name__) + + +class PartitionAdminMixin: + """Mixin providing partition and record management methods.""" + _manager: KafkaConnectionManager + config: dict + + @staticmethod + def _process_create_partitions_input(topic_partitions): + _Topic = CreatePartitionsRequest.CreatePartitionsTopic + _Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment + topics = [] + for topic, count in topic_partitions.items(): + if isinstance(count, int): + topics.append(_Topic(name=topic, count=count)) + elif 
isinstance(count, dict): + topics.append( + _Topic( + name=topic, + count=count['count'], + assignments=[_Assignment(broker_ids=broker_ids) + for broker_ids in count['assignments']])) + else: + topics.append( + _Topic( + name=topic, + count=count.total_count, + assignments=[_Assignment(broker_ids=broker_ids) + for broker_ids in count.new_assignments])) + return topics + + def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): + """Create additional partitions for an existing topic. + + Arguments: + topic_partitions: A dict of topic name strings to total partition count (int), + or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} + if manual assignment is desired. + dict of {topic_name: NewPartitions} is deprecated. + + Keyword Arguments: + timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be + created before the broker returns. + validate_only (bool, optional): If True, don't actually create new partitions. + Default: False + raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. + + Returns: + Appropriate version of CreatePartitionsResponse class. 
+ """ + timeout_ms = self._validate_timeout(timeout_ms) + request = CreatePartitionsRequest( + topics=self._process_create_partitions_input(topic_partitions), + timeout_ms=timeout_ms, + validate_only=validate_only) + + def response_errors(r): + for result in r.results: + yield Errors.for_code(result.error_code) + return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) + + async def _async_get_leader_for_partitions(self, partitions): + """Finds ID of the leader node for every given topic partition.""" + partitions = set(partitions) + topics = set(tp.topic for tp in partitions) + + metadata = await self._get_cluster_metadata(topics) + + leader2partitions = defaultdict(list) + valid_partitions = set() + for topic in metadata.get("topics", ()): + for partition in topic.get("partitions", ()): + t2p = TopicPartition(topic=topic["name"], partition=partition["partition_index"]) + if t2p in partitions: + leader2partitions[partition["leader_id"]].append(t2p) + valid_partitions.add(t2p) + + if len(partitions) != len(valid_partitions): + unknown = set(partitions) - valid_partitions + raise UnknownTopicOrPartitionError( + "The following partitions are not known: %s" + % ", ".join(str(x) for x in unknown) + ) + + return leader2partitions + + async def _async_delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): + timeout_ms = self._validate_timeout(timeout_ms) + if partition_leader_id is None: + leader2partitions = await self._async_get_leader_for_partitions(set(records_to_delete)) + else: + leader2partitions = {partition_leader_id: set(records_to_delete)} + + responses = [] + for leader, partitions in leader2partitions.items(): + topic2partitions = defaultdict(list) + for partition in partitions: + topic2partitions[partition.topic].append(partition) + + request = DeleteRecordsRequest( + topics=[ + (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions]) + for topic, partitions in 
topic2partitions.items() + ], + timeout_ms=timeout_ms + ) + response = await self._manager.send(request, node_id=leader) + responses.append(response.to_dict()) + + partition2result = {} + partition2error = {} + for response in responses: + for topic in response["topics"]: + for partition in topic["partitions"]: + tp = TopicPartition(topic["name"], partition["partition_index"]) + partition2result[tp] = partition + if partition["error_code"] != 0: + partition2error[tp] = partition["error_code"] + + if partition2error: + if len(partition2error) == 1: + key, error = next(iter(partition2error.items())) + raise Errors.for_code(error)( + "Error deleting records from topic %s partition %s" % (key.topic, key.partition) + ) + else: + raise Errors.BrokerResponseError( + "The following errors occured when trying to delete records: " + + ", ".join( + "%s(partition=%d): %s" % + (partition.topic, partition.partition, Errors.for_code(error).__name__) + for partition, error in partition2error.items() + ) + ) + + return partition2result + + def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): + """Delete records whose offset is smaller than the given offset of the corresponding partition. + + Arguments: + records_to_delete ({TopicPartition: int}): The earliest available offsets for the + given partitions. + + Keyword Arguments: + timeout_ms (numeric, optional): Timeout in milliseconds. + partition_leader_id (node_id / int, optional): If specified, all deletion requests + will be sent to this node. 
+ + Returns: + dict {topicPartition -> metadata} + """ + return self._manager.run(self._async_delete_records, records_to_delete, timeout_ms, partition_leader_id) + + def _get_all_topic_partitions(self, topics=None): + return [ + ( + topic['name'], + [p['partition_index'] for p in topic['partitions']] + ) + for topic in self.describe_topics(topics) + ] + + def _get_topic_partitions(self, topic_partitions): + if isinstance(topic_partitions, dict): + return topic_partitions.items() + else: + return self._get_all_topic_partitions(topic_partitions) + + def elect_leaders(self, election_type, topic_partitions=None, timeout_ms=None, raise_errors=True): + """Trigger leader election for the specified topic partitions. + + Arguments: + election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean + + Keyword Arguments: + topic_partitions (dict, list, optional): + Either: dict of {topic_name: [partition ids]}. + Or: list of [topic_name], and election will run on all partitions for topic. + Or: None, and election runs against all topics / all partitions. + Default: None + timeout_ms (num, optional): Milliseconds to wait for the leader election process. + raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. + + Returns: + Appropriate version of ElectLeadersResponse class. 
+ """ + timeout_ms = self._validate_timeout(timeout_ms) + request = ElectLeadersRequest( + election_type=ElectionType(election_type), + topic_partitions=self._get_topic_partitions(topic_partitions), + timeout_ms=timeout_ms, + ) + def response_errors(r): + if r.API_VERSION >= 1: + yield Errors.for_code(r.error_code) + for result in r.replica_election_results: + for partition in result.partition_result: + yield Errors.for_code(partition.error_code) + ignore_errors = (Errors.ElectionNotNeededError,) + return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) + + @staticmethod + def _process_alter_partition_reassignments_input(reassignments): + _Topic = AlterPartitionReassignmentsRequest.ReassignableTopic + _Partition = _Topic.ReassignablePartition + topic2partitions = defaultdict(list) + for tp, replicas in reassignments.items(): + topic2partitions[tp.topic].append(_Partition( + partition_index=tp.partition, + replicas=list(replicas) if replicas is not None else None, + )) + return [_Topic(name=topic, partitions=parts) for topic, parts in topic2partitions.items()] + + def alter_partition_reassignments(self, reassignments, timeout_ms=None, raise_errors=True): + """Alter the replica sets for the given partitions. + + Arguments: + reassignments (dict): A dict mapping + :class:`~kafka.TopicPartition` to a list of broker IDs for + the new replica set, or ``None`` to cancel a pending + reassignment for that partition. + + Keyword Arguments: + timeout_ms (numeric, optional): The time in ms to wait for the + request to complete. + raise_errors (bool, optional): Whether to raise errors as + exceptions. Default True. + + Returns: + Decoded AlterPartitionReassignmentsResponse (as a dict). 
+ """ + timeout_ms = self._validate_timeout(timeout_ms) + + def response_errors(r): + yield Errors.for_code(r.error_code) + for topic in r.responses: + for partition in topic.partitions: + yield Errors.for_code(partition.error_code) + + request = AlterPartitionReassignmentsRequest( + timeout_ms=timeout_ms, + topics=self._process_alter_partition_reassignments_input(reassignments), + ) + response = self._manager.run( + self._send_request_to_controller, request, response_errors, raise_errors) + return response.to_dict() + + async def _async_list_partition_reassignments(self, topic_partitions=None, timeout_ms=None): + timeout_ms = self._validate_timeout(timeout_ms) + + if topic_partitions is None: + topics_field = None + else: + _Topic = ListPartitionReassignmentsRequest.ListPartitionReassignmentsTopics + if isinstance(topic_partitions, dict): + topics_field = [ + _Topic(name=topic, partition_indexes=list(partitions)) + for topic, partitions in topic_partitions.items() + ] + else: + topic2partitions = defaultdict(list) + for tp in topic_partitions: + topic2partitions[tp.topic].append(tp.partition) + topics_field = [ + _Topic(name=topic, partition_indexes=partitions) + for topic, partitions in topic2partitions.items() + ] + + request = ListPartitionReassignmentsRequest( + timeout_ms=timeout_ms, + topics=topics_field, + ) + response = await self._manager.send(request) + + top_level_error = Errors.for_code(response.error_code) + if top_level_error is not Errors.NoError: + raise top_level_error( + "ListPartitionReassignmentsRequest failed: %s" % response.error_message) + + ret = {} + for topic in response.topics: + for partition in topic.partitions: + ret[TopicPartition(topic.name, partition.partition_index)] = { + 'replicas': list(partition.replicas), + 'adding_replicas': list(partition.adding_replicas), + 'removing_replicas': list(partition.removing_replicas), + } + return ret + + def list_partition_reassignments(self, topic_partitions=None, timeout_ms=None): + """List 
the current ongoing partition reassignments. + + Arguments: + topic_partitions (dict, list, optional): + Either: a dict of ``{topic_name: [partition_ids]}``, + or a list of :class:`~kafka.TopicPartition`, + or ``None`` to list ongoing reassignments for all partitions. + Default: None. + + Keyword Arguments: + timeout_ms (numeric, optional): The time in ms to wait for the + request to complete. + + Returns: + dict: A dict mapping :class:`~kafka.TopicPartition` to a dict + with keys ``'replicas'``, ``'adding_replicas'``, and + ``'removing_replicas'`` (each a list of broker IDs). + """ + return self._manager.run( + self._async_list_partition_reassignments, topic_partitions, timeout_ms) + + async def _async_describe_topic_partitions(self, topics, response_partition_limit, cursor): + _Topic = DescribeTopicPartitionsRequest.TopicRequest + _Cursor = DescribeTopicPartitionsRequest.Cursor + + if cursor is not None: + cursor_field = _Cursor( + topic_name=cursor['topic_name'], + partition_index=cursor['partition_index'], + ) + else: + cursor_field = None + + request = DescribeTopicPartitionsRequest( + topics=[_Topic(name=t) for t in topics], + response_partition_limit=response_partition_limit, + cursor=cursor_field, + ) + response = await self._manager.send(request) + + result = [] + for topic in response.topics: + topic_dict = { + 'error_code': topic.error_code, + 'name': topic.name, + 'topic_id': topic.topic_id, + 'is_internal': topic.is_internal, + 'partitions': [ + { + 'error_code': p.error_code, + 'partition_index': p.partition_index, + 'leader_id': p.leader_id, + 'leader_epoch': p.leader_epoch, + 'replica_nodes': list(p.replica_nodes), + 'isr_nodes': list(p.isr_nodes), + 'eligible_leader_replicas': list(p.eligible_leader_replicas) if p.eligible_leader_replicas else None, + 'last_known_elr': list(p.last_known_elr) if p.last_known_elr else None, + 'offline_replicas': list(p.offline_replicas), + } + for p in topic.partitions + ], + 'topic_authorized_operations': 
topic.topic_authorized_operations, + } + result.append(topic_dict) + + next_cursor = None + if response.next_cursor is not None: + next_cursor = { + 'topic_name': response.next_cursor.topic_name, + 'partition_index': response.next_cursor.partition_index, + } + + return {'topics': result, 'next_cursor': next_cursor} + + def describe_topic_partitions(self, topics, response_partition_limit=2000, cursor=None): + """Describe topics with fine-grained partition-level control (KIP-966). + + Unlike :meth:`describe_topics`, this uses the DescribeTopicPartitions + API (apiKey 75, broker 3.7+) which supports pagination via a cursor + and partition-level ELR (Eligible Leader Replicas) information. + + Arguments: + topics ([str]): A list of topic names. + + Keyword Arguments: + response_partition_limit (int, optional): Maximum number of + partitions to include in the response. Default: 2000. + cursor (dict, optional): Dict with ``'topic_name'`` and + ``'partition_index'`` keys to start pagination from. Default: + None. + + Returns: + dict: ``{'topics': [...], 'next_cursor': None | {...}}``. + ``topics`` is a list of dicts (one per topic) with keys + ``error_code``, ``name``, ``topic_id``, ``is_internal``, + ``partitions``, and ``topic_authorized_operations``. + ``next_cursor`` is None if pagination is complete, otherwise a + dict with the next page's ``topic_name`` and ``partition_index``. + """ + return self._manager.run( + self._async_describe_topic_partitions, topics, response_partition_limit, cursor) + + +class NewPartitions: + """DEPRECATED: A class for new partition creation on existing topics. + + Note that the length of new_assignments, if specified, must be the + difference between the new total number of partitions and the existing + number of partitions. + + Arguments: + total_count (int): the total number of partitions that should exist + on the topic + new_assignments ([[int]]): an array of arrays of replica assignments + for new partitions. 
If not set, broker assigns replicas per an + internal algorithm. + """ + def __init__(self, total_count, new_assignments=None): + self.total_count = total_count + self.new_assignments = new_assignments diff --git a/kafka/admin/_records.py b/kafka/admin/_records.py deleted file mode 100644 index cc2df2954..000000000 --- a/kafka/admin/_records.py +++ /dev/null @@ -1,165 +0,0 @@ -"""Record deletion and cluster operation mixin for KafkaAdminClient.""" - -from __future__ import annotations - -import logging -from collections import defaultdict -from typing import TYPE_CHECKING - -import kafka.errors as Errors -from kafka.errors import UnknownTopicOrPartitionError -from kafka.protocol.admin import DeleteRecordsRequest, ElectLeadersRequest, ElectionType -from kafka.structs import TopicPartition - -if TYPE_CHECKING: - from kafka.net.manager import KafkaConnectionManager - -log = logging.getLogger(__name__) - - -class RecordAdminMixin: - """Mixin providing record deletion and cluster operation methods.""" - _manager: KafkaConnectionManager - config: dict - - async def _async_get_leader_for_partitions(self, partitions): - """Finds ID of the leader node for every given topic partition.""" - partitions = set(partitions) - topics = set(tp.topic for tp in partitions) - - metadata = await self._get_cluster_metadata(topics) - - leader2partitions = defaultdict(list) - valid_partitions = set() - for topic in metadata.get("topics", ()): - for partition in topic.get("partitions", ()): - t2p = TopicPartition(topic=topic["name"], partition=partition["partition_index"]) - if t2p in partitions: - leader2partitions[partition["leader_id"]].append(t2p) - valid_partitions.add(t2p) - - if len(partitions) != len(valid_partitions): - unknown = set(partitions) - valid_partitions - raise UnknownTopicOrPartitionError( - "The following partitions are not known: %s" - % ", ".join(str(x) for x in unknown) - ) - - return leader2partitions - - async def _async_delete_records(self, records_to_delete, 
timeout_ms=None, partition_leader_id=None): - timeout_ms = self._validate_timeout(timeout_ms) - if partition_leader_id is None: - leader2partitions = await self._async_get_leader_for_partitions(set(records_to_delete)) - else: - leader2partitions = {partition_leader_id: set(records_to_delete)} - - responses = [] - for leader, partitions in leader2partitions.items(): - topic2partitions = defaultdict(list) - for partition in partitions: - topic2partitions[partition.topic].append(partition) - - request = DeleteRecordsRequest( - topics=[ - (topic, [(tp.partition, records_to_delete[tp]) for tp in partitions]) - for topic, partitions in topic2partitions.items() - ], - timeout_ms=timeout_ms - ) - response = await self._manager.send(request, node_id=leader) - responses.append(response.to_dict()) - - partition2result = {} - partition2error = {} - for response in responses: - for topic in response["topics"]: - for partition in topic["partitions"]: - tp = TopicPartition(topic["name"], partition["partition_index"]) - partition2result[tp] = partition - if partition["error_code"] != 0: - partition2error[tp] = partition["error_code"] - - if partition2error: - if len(partition2error) == 1: - key, error = next(iter(partition2error.items())) - raise Errors.for_code(error)( - "Error deleting records from topic %s partition %s" % (key.topic, key.partition) - ) - else: - raise Errors.BrokerResponseError( - "The following errors occured when trying to delete records: " + - ", ".join( - "%s(partition=%d): %s" % - (partition.topic, partition.partition, Errors.for_code(error).__name__) - for partition, error in partition2error.items() - ) - ) - - return partition2result - - def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): - """Delete records whose offset is smaller than the given offset of the corresponding partition. - - Arguments: - records_to_delete ({TopicPartition: int}): The earliest available offsets for the - given partitions. 
- - Keyword Arguments: - timeout_ms (numeric, optional): Timeout in milliseconds. - partition_leader_id (node_id / int, optional): If specified, all deletion requests - will be sent to this node. - - Returns: - dict {topicPartition -> metadata} - """ - return self._manager.run(self._async_delete_records, records_to_delete, timeout_ms, partition_leader_id) - - def _get_all_topic_partitions(self, topics=None): - return [ - ( - topic['name'], - [p['partition_index'] for p in topic['partitions']] - ) - for topic in self.describe_topics(topics) - ] - - def _get_topic_partitions(self, topic_partitions): - if isinstance(topic_partitions, dict): - return topic_partitions.items() - else: - return self._get_all_topic_partitions(topic_partitions) - - def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None, raise_errors=True): - """Trigger leader election for the specified topic partitions. - - Arguments: - election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean - - Keyword Arguments: - topic_partitions (dict, list, optional): - Either: dict of {topic_name: [partition ids]}. - Or: list of [topic_name], and election will run on all partitions for topic. - Or: None, and election runs against all topics / all partitions. - Default: None - timeout_ms (num, optional): Milliseconds to wait for the leader election process. - raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. - - Returns: - Appropriate version of ElectLeadersResponse class. 
- """ - timeout_ms = self._validate_timeout(timeout_ms) - request = ElectLeadersRequest( - election_type=ElectionType(election_type), - topic_partitions=self._get_topic_partitions(topic_partitions), - timeout_ms=timeout_ms, - ) - def response_errors(r): - if r.API_VERSION >= 1: - yield Errors.for_code(r.error_code) - for result in r.replica_election_results: - for partition in result.partition_result: - yield Errors.for_code(partition.error_code) - ignore_errors = (Errors.ElectionNotNeededError,) - return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) - diff --git a/kafka/admin/_topics.py b/kafka/admin/_topics.py index 1670d45d6..7c26e3690 100644 --- a/kafka/admin/_topics.py +++ b/kafka/admin/_topics.py @@ -1,6 +1,6 @@ """Topic management mixin for KafkaAdminClient. -Also defines NewTopic and NewPartitions data classes. +Also defines NewTopic data class. """ from __future__ import annotations @@ -244,60 +244,6 @@ def response_errors(r): result['topics'] = result.pop('responses') return result - @staticmethod - def _process_create_partitions_input(topic_partitions): - _Topic = CreatePartitionsRequest.CreatePartitionsTopic - _Assignment = CreatePartitionsRequest.CreatePartitionsTopic.CreatePartitionsAssignment - topics = [] - for topic, count in topic_partitions.items(): - if isinstance(count, int): - topics.append(_Topic(name=topic, count=count)) - elif isinstance(count, dict): - topics.append( - _Topic( - name=topic, - count=count['count'], - assignments=[_Assignment(broker_ids=broker_ids) - for broker_ids in count['assignments']])) - else: - topics.append( - _Topic( - name=topic, - count=count.total_count, - assignments=[_Assignment(broker_ids=broker_ids) - for broker_ids in count.new_assignments])) - return topics - - def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): - """Create additional partitions for an existing topic. 
- - Arguments: - topic_partitions: A dict of topic name strings to total partition count (int), - or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} - if manual assignment is desired. - dict of {topic_name: NewPartition} is deprecated. - - Keyword Arguments: - timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be - created before the broker returns. - validate_only (bool, optional): If True, don't actually create new partitions. - Default: False - raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. - - Returns: - Appropriate version of CreatePartitionsResponse class. - """ - timeout_ms = self._validate_timeout(timeout_ms) - request = CreatePartitionsRequest( - topics=self._process_create_partitions_input(topic_partitions), - timeout_ms=timeout_ms, - validate_only=validate_only) - - def response_errors(r): - for result in r.results: - yield Errors.for_code(result.error_code) - return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) - class NewTopic: """DEPRECATED: A class for new topic creation. @@ -320,22 +266,3 @@ def __init__(self, name, num_partitions=-1, replication_factor=-1, self.replication_factor = replication_factor self.replica_assignments = replica_assignments or {} self.topic_configs = topic_configs or {} - - -class NewPartitions: - """DEPRECATED: A class for new partition creation on existing topics. - - Note that the length of new_assignments, if specified, must be the - difference between the new total number of partitions and the existing - number of partitions. - - Arguments: - total_count (int): the total number of partitions that should exist - on the topic - new_assignments ([[int]]): an array of arrays of replica assignments - for new partitions. If not set, broker assigns replicas per an - internal algorithm. 
- """ - def __init__(self, total_count, new_assignments=None): - self.total_count = total_count - self.new_assignments = new_assignments diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 18c315fe7..fc52a27d0 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -17,7 +17,7 @@ from kafka.admin._cluster import ClusterAdminMixin from kafka.admin._configs import ConfigAdminMixin from kafka.admin._groups import GroupAdminMixin -from kafka.admin._records import RecordAdminMixin +from kafka.admin._partitions import PartitionAdminMixin from kafka.admin._topics import TopicAdminMixin from kafka.admin._users import UserAdminMixin @@ -29,7 +29,7 @@ class KafkaAdminClient( ClusterAdminMixin, ConfigAdminMixin, GroupAdminMixin, - RecordAdminMixin, + PartitionAdminMixin, TopicAdminMixin, UserAdminMixin, ): diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index c013369a8..98c27a690 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -8,6 +8,7 @@ from .cluster import ClusterSubCommand from .configs import ConfigsSubCommand from .consumer_groups import ConsumerGroupsSubCommand +from .partitions import PartitionsSubCommand from .topics import TopicsSubCommand from .users import UsersSubCommand from ..common import add_common_cli_args @@ -49,7 +50,8 @@ def run_cli(args=None): parser = main_parser() subparsers = parser.add_subparsers(help='subcommands') for cmd in [ACLsSubCommand, ClusterSubCommand, ConfigsSubCommand, - TopicsSubCommand, ConsumerGroupsSubCommand, UsersSubCommand]: + TopicsSubCommand, PartitionsSubCommand, + ConsumerGroupsSubCommand, UsersSubCommand]: cmd.add_subparser(subparsers) config = parser.parse_args(args) @@ -100,7 +102,6 @@ def run_cli(args=None): # Commands TODO: # [configs] - # alter # IncrementalAlterConfigs (not supported yet) # [consumer-groups] @@ -123,13 +124,8 @@ def run_cli(args=None): # abort (not supported yet) # [topics] - # describe-partitions (DescribeTopicPartitions - not 
# NOTE(review): reconstructed from a whitespace-mangled patch. Each section
# below corresponds to one new file under kafka/cli/admin/partitions/.
# (ElectLeaders and ListPartitionReassignments continue past this span and
# are intentionally not reconstructed here.)

# --- kafka/cli/admin/partitions/__init__.py ---
import sys

from .alter_reassignments import AlterPartitionReassignments
from .create import CreatePartitions
from .delete_records import DeleteRecords
from .describe import DescribeTopicPartitions
from .elect_leaders import ElectLeaders
from .list_reassignments import ListPartitionReassignments


class PartitionsSubCommand:
    """Top-level 'partitions' CLI subcommand; dispatches to the commands below."""

    @classmethod
    def add_subparser(cls, subparsers):
        parser = subparsers.add_parser('partitions', help='Manage Kafka Partitions')
        commands = parser.add_subparsers()
        for cmd in [
            CreatePartitions,
            DeleteRecords,
            ElectLeaders,
            AlterPartitionReassignments,
            ListPartitionReassignments,
            DescribeTopicPartitions,
        ]:
            cmd.add_subparser(commands)
        # With no subcommand selected, print help and exit with status 2.
        parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))


# --- kafka/cli/admin/partitions/alter_reassignments.py ---
from kafka.structs import TopicPartition


class AlterPartitionReassignments:
    """CLI command: alter replica assignments for partitions."""

    @classmethod
    def add_subparser(cls, subparsers):
        parser = subparsers.add_parser(
            'alter-reassignments',
            help='Alter replica assignments for partitions')
        parser.add_argument(
            '-r', '--reassign', type=str, action='append',
            dest='reassignments', default=[], required=True,
            help='TOPIC:PARTITION=BROKER_ID[,BROKER_ID...] to set a new '
                 'replica set, or TOPIC:PARTITION=cancel to cancel an '
                 'in-progress reassignment for that partition. Repeatable.')
        parser.add_argument(
            '--timeout-ms', type=int, default=None,
            help='Request timeout in milliseconds')
        parser.add_argument(
            '--no-raise-errors', dest='raise_errors', action='store_false',
            help='Do not raise on partition-level errors; return the response instead')
        parser.set_defaults(command=cls.command, raise_errors=True)

    @classmethod
    def command(cls, client, args):
        reassignments = {}
        for spec in args.reassignments:
            # rsplit so topic names containing ':' or '=' still parse.
            tp_str, replicas_str = spec.rsplit('=', 1)
            topic, partition = tp_str.rsplit(':', 1)
            tp = TopicPartition(topic, int(partition))
            if replicas_str.lower() == 'cancel':
                reassignments[tp] = None
            else:
                reassignments[tp] = [int(b) for b in replicas_str.split(',') if b]
        return client.alter_partition_reassignments(
            reassignments,
            timeout_ms=args.timeout_ms,
            raise_errors=args.raise_errors)


# --- kafka/cli/admin/partitions/create.py ---
class CreatePartitions:
    """CLI command: create additional partitions for existing topics."""

    @classmethod
    def add_subparser(cls, subparsers):
        parser = subparsers.add_parser(
            'create',
            help='Create additional partitions for existing topics')
        parser.add_argument(
            '-p', '--topic-partitions', type=str, action='append',
            dest='topic_partitions', default=[], required=True,
            help='TOPIC:TOTAL_PARTITION_COUNT pair (repeatable)')
        parser.add_argument(
            '--timeout-ms', type=int, default=None,
            help='Request timeout in milliseconds')
        parser.add_argument(
            '--validate-only', action='store_true',
            help='Validate the request without actually creating partitions')
        parser.set_defaults(command=cls.command)

    @classmethod
    def command(cls, client, args):
        topic_partitions = {}
        for spec in args.topic_partitions:
            topic, count = spec.rsplit(':', 1)
            topic_partitions[topic] = int(count)
        return client.create_partitions(
            topic_partitions,
            timeout_ms=args.timeout_ms,
            validate_only=args.validate_only)


# --- kafka/cli/admin/partitions/delete_records.py ---
from kafka.structs import TopicPartition


class DeleteRecords:
    """CLI command: delete records from partitions up to a given offset."""

    @classmethod
    def add_subparser(cls, subparsers):
        parser = subparsers.add_parser(
            'delete-records',
            help='Delete records from partitions up to a given offset')
        parser.add_argument(
            '-r', '--record', type=str, action='append',
            dest='records', default=[], required=True,
            help='TOPIC:PARTITION:OFFSET triple (repeatable). '
                 'Use -1 as OFFSET to delete up to the current high-water mark.')
        parser.add_argument(
            '--timeout-ms', type=int, default=None,
            help='Request timeout in milliseconds')
        parser.add_argument(
            '--partition-leader-id', type=int, default=None,
            help='Send all delete requests to this broker id, skipping metadata lookup')
        parser.set_defaults(command=cls.command)

    @classmethod
    def command(cls, client, args):
        records_to_delete = {}
        for spec in args.records:
            topic, partition, offset = spec.rsplit(':', 2)
            records_to_delete[TopicPartition(topic, int(partition))] = int(offset)
        return client.delete_records(
            records_to_delete,
            timeout_ms=args.timeout_ms,
            partition_leader_id=args.partition_leader_id)


# --- kafka/cli/admin/partitions/describe.py ---
class DescribeTopicPartitions:
    """CLI command: describe topic partitions with pagination."""
    # NOTE(review): the minimum broker version stated in the help below (3.9)
    # differs from the admin API docstring (3.7+) — confirm which is intended.

    @classmethod
    def add_subparser(cls, subparsers):
        parser = subparsers.add_parser(
            'describe',
            help='Describe topic partitions with pagination (KIP-966, broker >=3.9)')
        parser.add_argument(
            '-t', '--topic', type=str, action='append',
            dest='topics', default=[], required=True,
            help='Topic to describe (repeatable)')
        parser.add_argument(
            '--response-partition-limit', type=int, default=2000,
            help='Maximum number of partitions to include in the response '
                 '(default: 2000)')
        parser.add_argument(
            '--cursor-topic', type=str, default=None,
            help='Topic name to start pagination from')
        parser.add_argument(
            '--cursor-partition', type=int, default=None,
            help='Partition index to start pagination from')
        parser.set_defaults(command=cls.command)

    @classmethod
    def command(cls, client, args):
        cursor = None
        if args.cursor_topic is not None or args.cursor_partition is not None:
            # Both halves of the cursor are required for pagination.
            if args.cursor_topic is None or args.cursor_partition is None:
                raise ValueError(
                    '--cursor-topic and --cursor-partition must be used together')
            cursor = {
                'topic_name': args.cursor_topic,
                'partition_index': args.cursor_partition,
            }
        return client.describe_topic_partitions(
            args.topics,
            response_partition_limit=args.response_partition_limit,
            cursor=cursor)
Omit to elect leaders for ' + 'all partitions of all topics.') + parser.add_argument( + '-t', '--topic', type=str, action='append', + dest='topics', default=[], + help='Elect leaders for all partitions of TOPIC (repeatable). ' + 'Mutually exclusive with --topic-partition.') + parser.add_argument( + '--timeout-ms', type=int, default=None, + help='Request timeout in milliseconds') + parser.add_argument( + '--no-raise-errors', dest='raise_errors', action='store_false', + help='Do not raise on partition-level errors; return the response instead') + parser.set_defaults(command=cls.command, raise_errors=True) + + @classmethod + def command(cls, client, args): + if args.topic_partitions and args.topics: + raise ValueError( + '--topic-partition and --topic are mutually exclusive') + if args.topic_partitions: + topic2partitions = defaultdict(list) + for spec in args.topic_partitions: + topic, partition = spec.rsplit(':', 1) + topic2partitions[topic].append(int(partition)) + topic_partitions = dict(topic2partitions) + elif args.topics: + topic_partitions = args.topics + else: + topic_partitions = None + return client.elect_leaders( + _ELECTION_TYPES[args.election_type], + topic_partitions=topic_partitions, + timeout_ms=args.timeout_ms, + raise_errors=args.raise_errors) diff --git a/kafka/cli/admin/partitions/list_reassignments.py b/kafka/cli/admin/partitions/list_reassignments.py new file mode 100644 index 000000000..6ef97187d --- /dev/null +++ b/kafka/cli/admin/partitions/list_reassignments.py @@ -0,0 +1,37 @@ +from collections import defaultdict + +from kafka.structs import TopicPartition + + +class ListPartitionReassignments: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser( + 'list-reassignments', + help='List the current ongoing partition reassignments') + parser.add_argument( + '-p', '--topic-partition', type=str, action='append', + dest='topic_partitions', default=[], + help='TOPIC:PARTITION pair (repeatable). 
Omit to list ' + 'reassignments for all partitions.') + parser.add_argument( + '--timeout-ms', type=int, default=None, + help='Request timeout in milliseconds') + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + if args.topic_partitions: + topic2partitions = defaultdict(list) + for spec in args.topic_partitions: + topic, partition = spec.rsplit(':', 1) + topic2partitions[topic].append(int(partition)) + topic_partitions = dict(topic2partitions) + else: + topic_partitions = None + result = client.list_partition_reassignments( + topic_partitions, timeout_ms=args.timeout_ms) + # TopicPartition keys don't render cleanly in --format json; normalize + # to "topic:partition" strings for CLI output. + return {'%s:%d' % (tp.topic, tp.partition): v for tp, v in result.items()} diff --git a/kafka/protocol/admin/topics.py b/kafka/protocol/admin/topics.py index 83afe6e8e..e5fb9d487 100644 --- a/kafka/protocol/admin/topics.py +++ b/kafka/protocol/admin/topics.py @@ -20,9 +20,6 @@ class DeleteTopicsResponse(ApiMessage): pass class CreatePartitionsRequest(ApiMessage): pass class CreatePartitionsResponse(ApiMessage): pass -class AlterPartitionRequest(ApiMessage): pass -class AlterPartitionResponse(ApiMessage): pass - class AlterPartitionReassignmentsRequest(ApiMessage): pass class AlterPartitionReassignmentsResponse(ApiMessage): pass @@ -48,7 +45,6 @@ class ElectionType(IntEnum): 'CreateTopicsRequest', 'CreateTopicsResponse', 'DeleteTopicsRequest', 'DeleteTopicsResponse', 'CreatePartitionsRequest', 'CreatePartitionsResponse', - 'AlterPartitionRequest', 'AlterPartitionResponse', 'AlterPartitionReassignmentsRequest', 'AlterPartitionReassignmentsResponse', 'ListPartitionReassignmentsRequest', 'ListPartitionReassignmentsResponse', 'DescribeTopicPartitionsRequest', 'DescribeTopicPartitionsResponse', diff --git a/kafka/protocol/admin/topics.pyi b/kafka/protocol/admin/topics.pyi index ceda4b494..044317c9e 100644 --- 
a/kafka/protocol/admin/topics.pyi +++ b/kafka/protocol/admin/topics.pyi @@ -6,7 +6,7 @@ from enum import IntEnum from kafka.protocol.api_message import ApiMessage from kafka.protocol.data_container import DataContainer -__all__ = ['CreateTopicsRequest', 'CreateTopicsResponse', 'DeleteTopicsRequest', 'DeleteTopicsResponse', 'CreatePartitionsRequest', 'CreatePartitionsResponse', 'AlterPartitionRequest', 'AlterPartitionResponse', 'AlterPartitionReassignmentsRequest', 'AlterPartitionReassignmentsResponse', 'ListPartitionReassignmentsRequest', 'ListPartitionReassignmentsResponse', 'DescribeTopicPartitionsRequest', 'DescribeTopicPartitionsResponse', 'DeleteRecordsRequest', 'DeleteRecordsResponse', 'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType'] +__all__ = ['CreateTopicsRequest', 'CreateTopicsResponse', 'DeleteTopicsRequest', 'DeleteTopicsResponse', 'CreatePartitionsRequest', 'CreatePartitionsResponse', 'AlterPartitionReassignmentsRequest', 'AlterPartitionReassignmentsResponse', 'ListPartitionReassignmentsRequest', 'ListPartitionReassignmentsResponse', 'DescribeTopicPartitionsRequest', 'DescribeTopicPartitionsResponse', 'DeleteRecordsRequest', 'DeleteRecordsResponse', 'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType'] class CreateTopicsRequest(ApiMessage): class CreatableTopic(DataContainer): @@ -362,159 +362,6 @@ class CreatePartitionsResponse(ApiMessage): def expect_response(self) -> bool: ... def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... -class AlterPartitionRequest(ApiMessage): - class TopicData(DataContainer): - class PartitionData(DataContainer): - class BrokerState(DataContainer): - broker_id: int - broker_epoch: int - def __init__( - self, - *args: Any, - broker_id: int = ..., - broker_epoch: int = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... 
- - partition_index: int - leader_epoch: int - new_isr: list[int] - new_isr_with_epochs: list[BrokerState] - leader_recovery_state: int - partition_epoch: int - def __init__( - self, - *args: Any, - partition_index: int = ..., - leader_epoch: int = ..., - new_isr: list[int] = ..., - new_isr_with_epochs: list[BrokerState] = ..., - leader_recovery_state: int = ..., - partition_epoch: int = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - topic_id: uuid.UUID - partitions: list[PartitionData] - def __init__( - self, - *args: Any, - topic_id: uuid.UUID = ..., - partitions: list[PartitionData] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - broker_id: int - broker_epoch: int - topics: list[TopicData] - def __init__( - self, - *args: Any, - broker_id: int = ..., - broker_epoch: int = ..., - topics: list[TopicData] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - name: str - type: str - API_KEY: int - API_VERSION: int - valid_versions: tuple[int, int] - min_version: int - max_version: int - @property - def header(self) -> Any: ... - @classmethod - def is_request(cls) -> bool: ... - def expect_response(self) -> bool: ... - def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... 
- -class AlterPartitionResponse(ApiMessage): - class TopicData(DataContainer): - class PartitionData(DataContainer): - partition_index: int - error_code: int - leader_id: int - leader_epoch: int - isr: list[int] - leader_recovery_state: int - partition_epoch: int - def __init__( - self, - *args: Any, - partition_index: int = ..., - error_code: int = ..., - leader_id: int = ..., - leader_epoch: int = ..., - isr: list[int] = ..., - leader_recovery_state: int = ..., - partition_epoch: int = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - topic_id: uuid.UUID - partitions: list[PartitionData] - def __init__( - self, - *args: Any, - topic_id: uuid.UUID = ..., - partitions: list[PartitionData] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - - throttle_time_ms: int - error_code: int - topics: list[TopicData] - def __init__( - self, - *args: Any, - throttle_time_ms: int = ..., - error_code: int = ..., - topics: list[TopicData] = ..., - version: int | None = None, - **kwargs: Any, - ) -> None: ... - @property - def version(self) -> int | None: ... - def to_dict(self, meta: bool = False, json: bool = True) -> dict: ... - name: str - type: str - API_KEY: int - API_VERSION: int - valid_versions: tuple[int, int] - min_version: int - max_version: int - @property - def header(self) -> Any: ... - @classmethod - def is_request(cls) -> bool: ... - def expect_response(self) -> bool: ... - def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ... 
- class AlterPartitionReassignmentsRequest(ApiMessage): class ReassignableTopic(DataContainer): class ReassignablePartition(DataContainer): diff --git a/kafka/protocol/schemas/resources/AlterPartitionRequest.json b/kafka/protocol/schemas/resources/AlterPartitionRequest.json deleted file mode 100644 index fa8d318e5..000000000 --- a/kafka/protocol/schemas/resources/AlterPartitionRequest.json +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implie -// See the License for the specific language governing permissions and -// limitations under the License. - -{ - "apiKey": 56, - "type": "request", - "listeners": ["controller"], - "name": "AlterPartitionRequest", - // Versions 0-1 were removed in Apache Kafka 4.0, version 2 is the new baseline. - - // Version 1 adds LeaderRecoveryState field (KIP-704). - // Version 2 adds TopicId field to replace TopicName field (KIP-841). - // - // Version 3 adds the NewIsrEpochs field and deprecates the NewIsr field (KIP-903). - "validVersions": "2-3", - "flexibleVersions": "0+", - "fields": [ - { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", - "about": "The ID of the requesting broker." }, - { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1", - "about": "The epoch of the requesting broker." 
}, - { "name": "Topics", "type": "[]TopicData", "versions": "0+", - "about": "The topics to alter ISRs for.", "fields": [ - { "name": "TopicId", "type": "uuid", "versions": "2+", "ignorable": true, - "about": "The ID of the topic to alter ISRs for." }, - { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", - "about": "The partitions to alter ISRs for.", "fields": [ - { "name": "PartitionIndex", "type": "int32", "versions": "0+", - "about": "The partition index." }, - { "name": "LeaderEpoch", "type": "int32", "versions": "0+", - "about": "The leader epoch of this partition." }, - { "name": "NewIsr", "type": "[]int32", "versions": "0-2", "entityType": "brokerId", - "about": "The ISR for this partition. Deprecated since version 3." }, - { "name": "NewIsrWithEpochs", "type": "[]BrokerState", "versions": "3+", - "about": "The ISR for this partition.", "fields": [ - { "name": "BrokerId", "type": "int32", "versions": "3+", "entityType": "brokerId", - "about": "The ID of the broker." }, - { "name": "BrokerEpoch", "type": "int64", "versions": "3+", "default": "-1", - "about": "The epoch of the broker. It will be -1 if the epoch check is not supported." } - ]}, - { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0", - "about": "1 if the partition is recovering from an unclean leader election; 0 otherwise." }, - { "name": "PartitionEpoch", "type": "int32", "versions": "0+", - "about": "The expected epoch of the partition which is being updated." } - ]} - ]} - ] -} diff --git a/kafka/protocol/schemas/resources/AlterPartitionResponse.json b/kafka/protocol/schemas/resources/AlterPartitionResponse.json deleted file mode 100644 index 94daf85cb..000000000 --- a/kafka/protocol/schemas/resources/AlterPartitionResponse.json +++ /dev/null @@ -1,57 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. 
See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -{ - "apiKey": 56, - "type": "response", - "name": "AlterPartitionResponse", - // Versions 0-1 were removed in Apache Kafka 4.0, version 2 is the new baseline. - - // Version 1 adds LeaderRecoveryState field (KIP-704). - // Version 2 adds TopicId field to replace TopicName field, can return the following new errors: - // INELIGIBLE_REPLICA, NEW_LEADER_ELECTED and UNKNOWN_TOPIC_ID (KIP-841). - // - // Version 3 is the same as version 2 (KIP-903). - "validVersions": "2-3", - "flexibleVersions": "0+", - "fields": [ - { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", - "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, - { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The top level response error code." }, - { "name": "Topics", "type": "[]TopicData", "versions": "0+", - "about": "The responses for each topic.", "fields": [ - { "name": "TopicId", "type": "uuid", "versions": "2+", "ignorable": true, - "about": "The ID of the topic." 
}, - { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", - "about": "The responses for each partition.", "fields": [ - { "name": "PartitionIndex", "type": "int32", "versions": "0+", - "about": "The partition index." }, - { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The partition level error code." }, - { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", - "about": "The broker ID of the leader." }, - { "name": "LeaderEpoch", "type": "int32", "versions": "0+", - "about": "The leader epoch." }, - { "name": "Isr", "type": "[]int32", "versions": "0+", "entityType": "brokerId", - "about": "The in-sync replica IDs." }, - { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0", "ignorable": true, - "about": "1 if the partition is recovering from an unclean leader election; 0 otherwise." }, - { "name": "PartitionEpoch", "type": "int32", "versions": "0+", - "about": "The current epoch for the partition for KRaft controllers." 
} - ]} - ]} - ] -} diff --git a/test/admin/test_admin_partitions.py b/test/admin/test_admin_partitions.py new file mode 100644 index 000000000..f4bb64738 --- /dev/null +++ b/test/admin/test_admin_partitions.py @@ -0,0 +1,435 @@ +import uuid + +import pytest + +from kafka.admin import KafkaAdminClient +from kafka.protocol.admin import ( + AlterPartitionReassignmentsRequest, AlterPartitionReassignmentsResponse, + ListPartitionReassignmentsRequest, ListPartitionReassignmentsResponse, + DescribeTopicPartitionsRequest, DescribeTopicPartitionsResponse, +) +from kafka.protocol.metadata import MetadataResponse +from kafka.structs import TopicPartition + +from test.mock_broker import MockBroker + + +def _make_admin(broker): + return KafkaAdminClient( + kafka_client=broker.client_factory(), + bootstrap_servers='%s:%d' % (broker.host, broker.port), + api_version=broker.broker_version, + request_timeout_ms=5000, + ) + + +# --------------------------------------------------------------------------- +# alter_partition_reassignments +# --------------------------------------------------------------------------- + + +class TestAlterPartitionReassignmentsMockBroker: + + def test_success_returns_dict(self): + broker = MockBroker() + Topic = AlterPartitionReassignmentsResponse.ReassignableTopicResponse + Partition = Topic.ReassignablePartitionResponse + broker.respond( + AlterPartitionReassignmentsRequest, + AlterPartitionReassignmentsResponse( + throttle_time_ms=0, + error_code=0, + error_message=None, + responses=[ + Topic( + name='topic-a', + partitions=[ + Partition(partition_index=0, error_code=0, error_message=None), + ], + ), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.alter_partition_reassignments({ + TopicPartition('topic-a', 0): [1, 2, 3], + }) + finally: + admin.close() + + assert result['error_code'] == 0 + assert result['responses'][0]['name'] == 'topic-a' + assert result['responses'][0]['partitions'][0]['error_code'] == 0 + + def 
test_cancel_reassignment_sends_null_replicas(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + decoded = AlterPartitionReassignmentsRequest.decode( + request_bytes, version=api_version, header=True) + captured['request'] = decoded + return AlterPartitionReassignmentsResponse( + throttle_time_ms=0, error_code=0, error_message=None, responses=[]) + + broker.respond_fn(AlterPartitionReassignmentsRequest, handler) + + admin = _make_admin(broker) + try: + admin.alter_partition_reassignments({ + TopicPartition('topic-a', 0): None, # cancel + TopicPartition('topic-a', 1): [4, 5], + }) + finally: + admin.close() + + req = captured['request'] + assert len(req.topics) == 1 + assert req.topics[0].name == 'topic-a' + by_index = {p.partition_index: p for p in req.topics[0].partitions} + assert by_index[0].replicas is None + assert list(by_index[1].replicas) == [4, 5] + + def test_partition_level_error_raises(self): + broker = MockBroker() + Topic = AlterPartitionReassignmentsResponse.ReassignableTopicResponse + Partition = Topic.ReassignablePartitionResponse + broker.respond( + AlterPartitionReassignmentsRequest, + AlterPartitionReassignmentsResponse( + throttle_time_ms=0, + error_code=0, + error_message=None, + responses=[ + Topic( + name='topic-a', + partitions=[ + Partition(partition_index=0, error_code=37, # InvalidPartitionsError + error_message='bad partition'), + ], + ), + ], + ), + ) + + admin = _make_admin(broker) + try: + with pytest.raises(Exception): + admin.alter_partition_reassignments({ + TopicPartition('topic-a', 0): [1, 2, 3], + }) + finally: + admin.close() + + def test_partition_error_suppressed_with_raise_errors_false(self): + broker = MockBroker() + Topic = AlterPartitionReassignmentsResponse.ReassignableTopicResponse + Partition = Topic.ReassignablePartitionResponse + broker.respond( + AlterPartitionReassignmentsRequest, + AlterPartitionReassignmentsResponse( + throttle_time_ms=0, + 
error_code=0, + error_message=None, + responses=[ + Topic( + name='topic-a', + partitions=[ + Partition(partition_index=0, error_code=37, error_message='bad'), + ], + ), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.alter_partition_reassignments( + {TopicPartition('topic-a', 0): [1, 2, 3]}, + raise_errors=False, + ) + finally: + admin.close() + + assert result['responses'][0]['partitions'][0]['error_code'] == 37 + + +# --------------------------------------------------------------------------- +# list_partition_reassignments +# --------------------------------------------------------------------------- + + +class TestListPartitionReassignmentsMockBroker: + + def test_returns_tp_to_reassignment_dict(self): + broker = MockBroker() + Topic = ListPartitionReassignmentsResponse.OngoingTopicReassignment + Partition = Topic.OngoingPartitionReassignment + broker.respond( + ListPartitionReassignmentsRequest, + ListPartitionReassignmentsResponse( + throttle_time_ms=0, + error_code=0, + error_message=None, + topics=[ + Topic( + name='topic-a', + partitions=[ + Partition( + partition_index=0, + replicas=[1, 2, 3], + adding_replicas=[4], + removing_replicas=[1], + ), + Partition( + partition_index=1, + replicas=[2, 3, 4], + adding_replicas=[], + removing_replicas=[], + ), + ], + ), + ], + ), + ) + + admin = _make_admin(broker) + try: + result = admin.list_partition_reassignments() + finally: + admin.close() + + assert result == { + TopicPartition('topic-a', 0): { + 'replicas': [1, 2, 3], + 'adding_replicas': [4], + 'removing_replicas': [1], + }, + TopicPartition('topic-a', 1): { + 'replicas': [2, 3, 4], + 'adding_replicas': [], + 'removing_replicas': [], + }, + } + + def test_none_topic_partitions_sends_null_topics(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + decoded = ListPartitionReassignmentsRequest.decode( + request_bytes, version=api_version, header=True) + captured['request'] 
= decoded + return ListPartitionReassignmentsResponse( + throttle_time_ms=0, error_code=0, error_message=None, topics=[]) + + broker.respond_fn(ListPartitionReassignmentsRequest, handler) + + admin = _make_admin(broker) + try: + admin.list_partition_reassignments() + finally: + admin.close() + + assert captured['request'].topics is None + + def test_dict_input_encodes_topic_partitions(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + decoded = ListPartitionReassignmentsRequest.decode( + request_bytes, version=api_version, header=True) + captured['request'] = decoded + return ListPartitionReassignmentsResponse( + throttle_time_ms=0, error_code=0, error_message=None, topics=[]) + + broker.respond_fn(ListPartitionReassignmentsRequest, handler) + + admin = _make_admin(broker) + try: + admin.list_partition_reassignments({'topic-a': [0, 1], 'topic-b': [2]}) + finally: + admin.close() + + topics = {t.name: list(t.partition_indexes) for t in captured['request'].topics} + assert topics == {'topic-a': [0, 1], 'topic-b': [2]} + + def test_tp_list_input_groups_by_topic(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + decoded = ListPartitionReassignmentsRequest.decode( + request_bytes, version=api_version, header=True) + captured['request'] = decoded + return ListPartitionReassignmentsResponse( + throttle_time_ms=0, error_code=0, error_message=None, topics=[]) + + broker.respond_fn(ListPartitionReassignmentsRequest, handler) + + admin = _make_admin(broker) + try: + admin.list_partition_reassignments([ + TopicPartition('topic-a', 0), + TopicPartition('topic-a', 1), + TopicPartition('topic-b', 5), + ]) + finally: + admin.close() + + topics = {t.name: sorted(t.partition_indexes) for t in captured['request'].topics} + assert topics == {'topic-a': [0, 1], 'topic-b': [5]} + + def test_top_level_error_raises(self): + broker = MockBroker() + 
broker.respond( + ListPartitionReassignmentsRequest, + ListPartitionReassignmentsResponse( + throttle_time_ms=0, + error_code=41, # NotControllerError + error_message='not controller', + topics=[], + ), + ) + + admin = _make_admin(broker) + try: + with pytest.raises(Exception) as exc_info: + admin.list_partition_reassignments() + assert 'not controller' in str(exc_info.value) + finally: + admin.close() + + +# --------------------------------------------------------------------------- +# describe_topic_partitions +# --------------------------------------------------------------------------- + + +class TestDescribeTopicPartitionsMockBroker: + + def test_returns_topic_partition_details(self): + broker = MockBroker() + topic_id = uuid.uuid4() + Topic = DescribeTopicPartitionsResponse.DescribeTopicPartitionsResponseTopic + Partition = Topic.DescribeTopicPartitionsResponsePartition + broker.respond( + DescribeTopicPartitionsRequest, + DescribeTopicPartitionsResponse( + throttle_time_ms=0, + topics=[ + Topic( + error_code=0, + name='topic-a', + topic_id=topic_id, + is_internal=False, + partitions=[ + Partition( + error_code=0, + partition_index=0, + leader_id=1, + leader_epoch=5, + replica_nodes=[1, 2, 3], + isr_nodes=[1, 2], + eligible_leader_replicas=[3], + last_known_elr=[2], + offline_replicas=[], + ), + ], + topic_authorized_operations=-2147483648, + ), + ], + next_cursor=None, + ), + ) + + admin = _make_admin(broker) + try: + result = admin.describe_topic_partitions(['topic-a']) + finally: + admin.close() + + assert result['next_cursor'] is None + assert len(result['topics']) == 1 + t = result['topics'][0] + assert t['name'] == 'topic-a' + assert t['topic_id'] == topic_id + assert t['is_internal'] is False + assert t['partitions'][0]['partition_index'] == 0 + assert t['partitions'][0]['leader_id'] == 1 + assert t['partitions'][0]['eligible_leader_replicas'] == [3] + assert t['partitions'][0]['last_known_elr'] == [2] + + def test_pagination_cursor_returned(self): + 
broker = MockBroker() + Cursor = DescribeTopicPartitionsResponse.Cursor + broker.respond( + DescribeTopicPartitionsRequest, + DescribeTopicPartitionsResponse( + throttle_time_ms=0, + topics=[], + next_cursor=Cursor(topic_name='topic-next', partition_index=5), + ), + ) + + admin = _make_admin(broker) + try: + result = admin.describe_topic_partitions(['topic-a']) + finally: + admin.close() + + assert result['next_cursor'] == {'topic_name': 'topic-next', 'partition_index': 5} + + def test_request_encodes_topics_and_limit(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + decoded = DescribeTopicPartitionsRequest.decode( + request_bytes, version=api_version, header=True) + captured['request'] = decoded + return DescribeTopicPartitionsResponse( + throttle_time_ms=0, topics=[], next_cursor=None) + + broker.respond_fn(DescribeTopicPartitionsRequest, handler) + + admin = _make_admin(broker) + try: + admin.describe_topic_partitions( + ['topic-a', 'topic-b'], response_partition_limit=100) + finally: + admin.close() + + req = captured['request'] + assert [t.name for t in req.topics] == ['topic-a', 'topic-b'] + assert req.response_partition_limit == 100 + assert req.cursor is None + + def test_request_encodes_cursor(self): + broker = MockBroker() + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + decoded = DescribeTopicPartitionsRequest.decode( + request_bytes, version=api_version, header=True) + captured['request'] = decoded + return DescribeTopicPartitionsResponse( + throttle_time_ms=0, topics=[], next_cursor=None) + + broker.respond_fn(DescribeTopicPartitionsRequest, handler) + + admin = _make_admin(broker) + try: + admin.describe_topic_partitions( + ['topic-a'], + cursor={'topic_name': 'topic-a', 'partition_index': 3}) + finally: + admin.close() + + cursor = captured['request'].cursor + assert cursor is not None + assert cursor.topic_name == 'topic-a' + assert 
cursor.partition_index == 3 diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 68b8c1ef1..0de5baa7c 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -460,14 +460,14 @@ def test_create_partitions(kafka_admin_client, topic): @pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2") -def test_perform_leader_election(kafka_admin_client, topic): +def test_elect_leaders(kafka_admin_client, topic): topic_metadata = kafka_admin_client.describe_topics([topic])[0] assert topic_metadata['name'] == topic partitions = list(map(lambda p: p['partition_index'], topic_metadata['partitions'])) election_type = 0 # Preferred topic_partitions = {topic: partitions} # When Leader Election is not needed (cluster is stable), error 84 is returned - response = kafka_admin_client.perform_leader_election(election_type, topic_partitions) + response = kafka_admin_client.elect_leaders(election_type, topic_partitions) assert len(response.replica_election_results) == 1 result = response.replica_election_results[0] assert result[0] == topic @@ -490,3 +490,65 @@ def test_describe_log_dirs(kafka_admin_client): for log_dir in broker_map[broker.node_id]['log_dirs']: assert 'log_dir' in log_dir assert log_dir['error_code'] == 0 + + +@pytest.mark.skipif(env_kafka_version() < (2, 4), reason="AlterPartitionReassignments requires broker >=2.4") +def test_alter_partition_reassignments(kafka_admin_client, topic): + topic_metadata = kafka_admin_client.describe_topics([topic])[0] + brokers = [b.node_id for b in kafka_admin_client._manager.cluster.brokers()] + # Single-broker cluster: only valid reassignment target is [broker] + tp = TopicPartition(topic, 0) + + result = kafka_admin_client.alter_partition_reassignments({tp: brokers}) + assert result['error_code'] == 0 + assert len(result['responses']) == 1 + assert result['responses'][0]['name'] == topic + + 
# NOTE(review): mangled unified-diff fragment rendered as the post-patch
# Python it encodes; diff file headers are preserved as comments.

@pytest.mark.skipif(env_kafka_version() < (2, 4), reason="ListPartitionReassignments requires broker >=2.4")
def test_list_partition_reassignments(kafka_admin_client, topic):
    """Listing reassignments on an idle cluster yields (empty) dict results."""
    # No reassignments in progress on a freshly-created topic
    result = kafka_admin_client.list_partition_reassignments()
    assert isinstance(result, dict)

    # Scoped lookup for specific partitions also returns an (empty) dict
    tp = TopicPartition(topic, 0)
    result = kafka_admin_client.list_partition_reassignments([tp])
    assert isinstance(result, dict)
    for key, value in result.items():
        assert isinstance(key, TopicPartition)
        assert set(value.keys()) == {'replicas', 'adding_replicas', 'removing_replicas'}


@pytest.mark.skipif(env_kafka_version() < (4, 0), reason="DescribeTopicPartitions requires broker >=4.0 (KRaft)")
def test_describe_topic_partitions(kafka_admin_client, topic):
    """DescribeTopicPartitions reports every partition of the fixture topic."""
    result = kafka_admin_client.describe_topic_partitions([topic])
    assert 'topics' in result
    assert 'next_cursor' in result
    assert len(result['topics']) == 1
    described = result['topics'][0]
    assert described['name'] == topic
    assert described['error_code'] == 0
    # topic fixture creates 4 partitions
    assert len(described['partitions']) == 4
    for partition in described['partitions']:
        assert partition['error_code'] == 0
        assert partition['partition_index'] in {0, 1, 2, 3}
        assert partition['leader_id'] >= 0
        assert len(partition['replica_nodes']) >= 1


@pytest.mark.skipif(env_kafka_version() < (4, 0), reason="DescribeTopicPartitions requires broker >=4.0 (KRaft)")
def test_describe_topic_partitions_pagination(kafka_admin_client, topic):
    """A cursor returned by a limited request round-trips into the next page."""
    # Request only 1 partition per page; we should get a cursor back.
    result = kafka_admin_client.describe_topic_partitions(
        [topic], response_partition_limit=1)
    # Small clusters may still satisfy the request without a cursor if the
    # broker ignores the partition limit — tolerate either outcome but verify
    # the cursor round-trips when present.
    cursor = result['next_cursor']
    if cursor is not None:
        assert cursor['topic_name'] == topic
        next_result = kafka_admin_client.describe_topic_partitions(
            [topic], response_partition_limit=10, cursor=cursor)
        assert next_result['topics']


# --- diff hunk: test/protocol/admin/test_protocol_admin_partitions.py (post-image) ---

import pytest

from kafka.protocol.admin import (
    AlterPartitionReassignmentsRequest, AlterPartitionReassignmentsResponse,
    ListPartitionReassignmentsRequest, ListPartitionReassignmentsResponse,
    DescribeTopicPartitionsRequest, DescribeTopicPartitionsResponse,
)


def _versions(cls):
    # NOTE(review): the lo/hi derivation is context elided from this hunk —
    # presumably the min/max supported protocol versions of cls; confirm upstream.
    return range(lo, hi + 1)


@pytest.mark.parametrize("version", _versions(AlterPartitionReassignmentsRequest))
def test_alter_partition_reassignments_request_roundtrip(version):
    """encode/decode of AlterPartitionReassignmentsRequest is lossless."""
    Topic = AlterPartitionReassignmentsRequest.ReassignableTopic
    Partition = Topic.ReassignablePartition
    request = AlterPartitionReassignmentsRequest(
        timeout_ms=30000,
        topics=[
            Topic(
                name='topic-a',
                partitions=[
                    Partition(partition_index=0, replicas=[1, 2, 3]),
                    # replicas=None encodes a "cancel reassignment" entry
                    Partition(partition_index=1, replicas=None),
                ],
            ),
        ],
    )
    wire = request.encode(version=version)
    assert AlterPartitionReassignmentsRequest.decode(wire, version=version) == request


@pytest.mark.parametrize("version", _versions(AlterPartitionReassignmentsResponse))
def test_alter_partition_reassignments_response_roundtrip(version):
    """encode/decode of AlterPartitionReassignmentsResponse is lossless."""
    Topic = AlterPartitionReassignmentsResponse.ReassignableTopicResponse
    Partition = Topic.ReassignablePartitionResponse
    response = AlterPartitionReassignmentsResponse(
        throttle_time_ms=5,
        error_code=0,
        error_message=None,
        responses=[
            Topic(
                name='topic-a',
                partitions=[
                    Partition(partition_index=0, error_code=0, error_message=None),
                    Partition(partition_index=1, error_code=37, error_message='invalid'),
                ],
            ),
        ],
    )
    wire = response.encode(version=version)
    assert AlterPartitionReassignmentsResponse.decode(wire, version=version) == response


@pytest.mark.parametrize("version", _versions(ListPartitionReassignmentsRequest))
def test_list_partition_reassignments_request_roundtrip(version):
    """encode/decode of ListPartitionReassignmentsRequest is lossless."""
    Topic = ListPartitionReassignmentsRequest.ListPartitionReassignmentsTopics
    request = ListPartitionReassignmentsRequest(
        timeout_ms=30000,
        topics=[
            Topic(name='topic-a', partition_indexes=[0, 1]),
            Topic(name='topic-b', partition_indexes=[2]),
        ],
    )
    wire = request.encode(version=version)
    assert ListPartitionReassignmentsRequest.decode(wire, version=version) == request


@pytest.mark.parametrize("version", _versions(ListPartitionReassignmentsRequest))
def test_list_partition_reassignments_request_null_topics(version):
    """topics=None (list ALL ongoing reassignments) survives a roundtrip."""
    request = ListPartitionReassignmentsRequest(timeout_ms=30000, topics=None)
    wire = request.encode(version=version)
    assert ListPartitionReassignmentsRequest.decode(wire, version=version) == request


@pytest.mark.parametrize("version", _versions(ListPartitionReassignmentsResponse))
def test_list_partition_reassignments_response_roundtrip(version):
    """encode/decode of ListPartitionReassignmentsResponse is lossless."""
    Topic = ListPartitionReassignmentsResponse.OngoingTopicReassignment
    Partition = Topic.OngoingPartitionReassignment
    response = ListPartitionReassignmentsResponse(
        throttle_time_ms=5,
        error_code=0,
        error_message=None,
        topics=[
            Topic(
                name='topic-a',
                partitions=[
                    Partition(
                        partition_index=0,
                        replicas=[1, 2, 3],
                        adding_replicas=[4],
                        removing_replicas=[2],
                    ),
                ],
            ),
        ],
    )
    wire = response.encode(version=version)
    assert ListPartitionReassignmentsResponse.decode(wire, version=version) == response


def test_describe_topic_partitions_raw_bytes():
    # bytes encoding from java client
    data = bytes.fromhex('00 4b 00 00 00 00 00 7b 00 09 6d 79 2d 63 6c 69 65 6e 74 00 02 04 66 6f 6f 00 00 00 07 d0 ff 00')
    # NOTE(review): this test's body continues past the end of the visible
    # chunk; only the head shown above is reproduced here.