diff --git a/kafka/admin/_cluster.py b/kafka/admin/_cluster.py new file mode 100644 index 000000000..cdd1b5e47 --- /dev/null +++ b/kafka/admin/_cluster.py @@ -0,0 +1,75 @@ +"""Cluster metadata mixin for KafkaAdminClient.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.admin import DescribeLogDirsRequest + +if TYPE_CHECKING: + from kafka.net.manager import KafkaConnectionManager + +log = logging.getLogger(__name__) + + +class ClusterAdminMixin: + """Mixin providing cluster management methods for KafkaAdminClient.""" + _manager: KafkaConnectionManager + + async def _get_cluster_metadata(self, topics): + """topics = [] for no topics, None for all.""" + request = MetadataRequest( + topics=[ + MetadataRequest.MetadataRequestTopic(name=topic) + for topic in topics] if topics is not None else None, + allow_auto_topic_creation=False, + include_cluster_authorized_operations=True, + include_topic_authorized_operations=True, + ) + response = await self._manager.send(request) + metadata = response.to_dict() + self._process_acl_operations(metadata) + for topic in metadata['topics']: + self._process_acl_operations(topic) + return metadata + + def describe_cluster(self): + """Fetch cluster-wide metadata such as the list of brokers, the controller ID, + and the cluster ID. + + Returns: + A dict with cluster-wide metadata, excluding topic details. + """ + metadata = self._manager.run(self._get_cluster_metadata, []) + metadata.pop('topics') + return metadata + + async def _async_describe_log_dirs(self, topic_partitions=(), brokers=None): + request = DescribeLogDirsRequest(topics=topic_partitions) + responses = [] + if brokers is None: + brokers = [broker.node_id for broker in self._manager.cluster.brokers()] + for node_id in brokers: + response = await self._manager.send(request, node_id=node_id) + responses.append({"broker": node_id, "log_dirs": [result.to_dict() for result in response.results]}) + return responses + + def describe_log_dirs(self, topic_partitions=None, brokers=None): + """Fetch broker log directory and topic/partition stats + + Keyword Arguments: + topic_partitions (dict, list, optional): + Either: dict of {topic_name: [partition ids]}. + Or: list of [topic_name], to query all partitions for topic. + Or: None, to query all topics / all partitions. + Default: None + brokers (list, optional): List of [node_id] for brokers to query. + If None, query is sent to all brokers. Default: None + + Returns: + list of dicts, containing per-broker log-dir data + """ + topic_partitions = self._get_topic_partitions(topic_partitions) + return self._manager.run(self._async_describe_log_dirs, topic_partitions, brokers) diff --git a/kafka/admin/_metadata.py b/kafka/admin/_metadata.py deleted file mode 100644 index 0a64c3dfa..000000000 --- a/kafka/admin/_metadata.py +++ /dev/null @@ -1,46 +0,0 @@ -"""Cluster metadata mixin for KafkaAdminClient.""" - -from __future__ import annotations - -import logging -from typing import TYPE_CHECKING - -from kafka.protocol.metadata import MetadataRequest - -if TYPE_CHECKING: - from kafka.net.manager import KafkaConnectionManager - -log = logging.getLogger(__name__) - - -class MetadataAdminMixin: - """Mixin providing cluster metadata methods for KafkaAdminClient.""" - _manager: KafkaConnectionManager - - async def _get_cluster_metadata(self, topics): - """topics = [] for no topics, None for all.""" - request = MetadataRequest( - topics=[ - MetadataRequest.MetadataRequestTopic(name=topic) - for topic in topics] if topics is not None else None, - allow_auto_topic_creation=False, - include_cluster_authorized_operations=True, - include_topic_authorized_operations=True, - ) - response = await self._manager.send(request) - metadata = response.to_dict() - self._process_acl_operations(metadata) - for topic in metadata['topics']: - self._process_acl_operations(topic) - return metadata - - def describe_cluster(self): - """Fetch cluster-wide metadata such as the list of brokers, the controller ID, - and the cluster ID. - - Returns: - A dict with cluster-wide metadata, excluding topic details. - """ - metadata = self._manager.run(self._get_cluster_metadata, []) - metadata.pop('topics') - return metadata diff --git a/kafka/admin/_records.py b/kafka/admin/_records.py index c6e3b621e..aa22abc88 100644 --- a/kafka/admin/_records.py +++ b/kafka/admin/_records.py @@ -8,7 +8,7 @@ import kafka.errors as Errors from kafka.errors import UnknownTopicOrPartitionError -from kafka.protocol.admin import DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType +from kafka.protocol.admin import DeleteRecordsRequest, ElectLeadersRequest, ElectionType from kafka.structs import TopicPartition if TYPE_CHECKING: @@ -166,30 +166,3 @@ def response_errors(r): ignore_errors = (Errors.ElectionNotNeededError,) return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) - async def _async_describe_log_dirs(self, topic_partitions=(), brokers=None): - request = DescribeLogDirsRequest(topics=topic_partitions) - responses = [] - if brokers is None: - brokers = [broker.node_id for broker in self._manager.cluster.brokers()] - for node_id in brokers: - response = await self._manager.send(request, node_id=node_id) - responses.append({"broker": node_id, "log_dirs": [result.to_dict() for result in response.results]}) - return responses - - def describe_log_dirs(self, topic_partitions=None, brokers=None): - """Send a DescribeLogDirsRequest request to a broker. - - Keyword Arguments: - topic_partitions (dict, list, optional): - Either: dict of {topic_name: [partition ids]}. - Or: list of [topic_name], to query all partitions for topic. - Or: None, to query all topics / all partitions. - Default: None - brokers (list, optional): List of [node_id] for brokers to query. - If None, query is sent to all brokers. Default: None - - Returns: - list of dicts, containing per-broker log-dir data - """ - topic_partitions = self._get_topic_partitions(topic_partitions) - return self._manager.run(self._async_describe_log_dirs, topic_partitions, brokers) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index a91fc54e8..642b6ef5f 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -14,9 +14,9 @@ from kafka.version import __version__ from kafka.admin._acls import ACLAdminMixin +from kafka.admin._cluster import ClusterAdminMixin from kafka.admin._configs import ConfigAdminMixin from kafka.admin._groups import GroupAdminMixin -from kafka.admin._metadata import MetadataAdminMixin from kafka.admin._records import RecordAdminMixin from kafka.admin._topics import TopicAdminMixin @@ -25,7 +25,7 @@ class KafkaAdminClient( TopicAdminMixin, - MetadataAdminMixin, + ClusterAdminMixin, ACLAdminMixin, ConfigAdminMixin, GroupAdminMixin, diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index d7e4e45c0..de24d5d8b 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -7,7 +7,6 @@ from .cluster import ClusterSubCommand from .configs import ConfigsSubCommand from .consumer_groups import ConsumerGroupsSubCommand -from .log_dirs import LogDirsSubCommand from .topics import TopicsSubCommand def main_parser(): @@ -60,7 +59,7 @@ def build_kwargs(props): def run_cli(args=None): parser = main_parser() subparsers = parser.add_subparsers(help='subcommands') - for cmd in [ClusterSubCommand, ConfigsSubCommand, LogDirsSubCommand, + for cmd in [ClusterSubCommand, ConfigsSubCommand, TopicsSubCommand, ConsumerGroupsSubCommand]: cmd.add_subparser(subparsers) config = parser.parse_args(args) @@ -114,30 +113,11 @@ def run_cli(args=None): # alter # IncrementalAlterConfigs (not supported yet) - # [partitions] - # create - # alter-reassignments (AlterPartitionReassignments - not supported yet) - # list-reassignments (ListPartitionReassignments - not supported yet) - - # [records] - # delete - # [consumer-groups] # remove-members (not supported yet) # delete-offsets (not supported yet) # alter-offsets (not supported yet) - # [offsets] - # list (not supported yet) - # delete (OffsetDelete - not supported yet) - - # leader-election - # perform_leader_election - - # [log-dirs] - # describe (currently broken) - # alter (AlterReplicaLogDirs - not supported yet) - # [client-quotas] # describe (DescribeClientQuotas - not supported yet) # alter (AlterClientQuotas - not supported yet) @@ -154,12 +134,16 @@ def run_cli(args=None): # [topics] # describe-partitions (DescribeTopicPartitions - not supported yet) + # list-offsets (not supported yet) + # delete-offsets (OffsetDelete - not supported yet) + # alter-reassignments (AlterPartitionReassignments - not supported yet) + # list-reassignments (ListPartitionReassignments - not supported yet) + # create-partitions + # elect-leaders # [cluster] # describe-features (DescribeFeatures - not supported yet) # update-features (UpdateFeatures - not supported yet) # version # api-versions - - - + # alter-log-dirs (AlterReplicaLogDirs - not supported yet) diff --git a/kafka/cli/admin/cluster/__init__.py b/kafka/cli/admin/cluster/__init__.py index 7876c0afa..5baec8b44 100644 --- a/kafka/cli/admin/cluster/__init__.py +++ b/kafka/cli/admin/cluster/__init__.py @@ -1,6 +1,7 @@ import sys from .describe import DescribeCluster +from .log_dirs import DescribeLogDirs class ClusterSubCommand: @@ -9,6 +10,6 @@ class ClusterSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster') commands = parser.add_subparsers() - for cmd in [DescribeCluster]: + for cmd in [DescribeCluster, DescribeLogDirs]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/log_dirs/describe.py b/kafka/cli/admin/cluster/log_dirs.py similarity index 69% rename from kafka/cli/admin/log_dirs/describe.py rename to kafka/cli/admin/cluster/log_dirs.py index d23df5673..f533bb11a 100644 --- a/kafka/cli/admin/log_dirs/describe.py +++ b/kafka/cli/admin/cluster/log_dirs.py @@ -2,7 +2,7 @@ class DescribeLogDirs: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe', help='Get topic log directories for brokers') + parser = subparsers.add_parser('log-dirs', help='Get topic log directories and stats') parser.add_argument('-b', '--broker', type=int, action='append', dest='brokers', help='Query specific broker(s)') - parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='Get additional data about specific topic(s)') + parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='Get data about specific topic(s)') parser.set_defaults(command=lambda cli, args: cli.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers)) diff --git a/kafka/cli/admin/log_dirs/__init__.py b/kafka/cli/admin/log_dirs/__init__.py deleted file mode 100644 index 46f5a254f..000000000 --- a/kafka/cli/admin/log_dirs/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -import sys - -from .describe import DescribeLogDirs - - -class LogDirsSubCommand: - - @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('log-dirs', help='Manage Kafka Topic/Partition Log Directories') - commands = parser.add_subparsers() - for cmd in [DescribeLogDirs]: - cmd.add_subparser(commands) - parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))