From be14a80fb93897cddc2a0845116e3ad60370d353 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Apr 2026 17:30:11 -0700 Subject: [PATCH 1/3] Admin: MetadataAdminMixin -> ClusterAdminMixin --- kafka/admin/{_metadata.py => _cluster.py} | 4 ++-- kafka/admin/client.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) rename kafka/admin/{_metadata.py => _cluster.py} (93%) diff --git a/kafka/admin/_metadata.py b/kafka/admin/_cluster.py similarity index 93% rename from kafka/admin/_metadata.py rename to kafka/admin/_cluster.py index 0a64c3dfa..b5648398e 100644 --- a/kafka/admin/_metadata.py +++ b/kafka/admin/_cluster.py @@ -13,8 +13,8 @@ log = logging.getLogger(__name__) -class MetadataAdminMixin: - """Mixin providing cluster metadata methods for KafkaAdminClient.""" +class ClusterAdminMixin: + """Mixin providing cluster management methods for KafkaAdminClient.""" _manager: KafkaConnectionManager async def _get_cluster_metadata(self, topics): diff --git a/kafka/admin/client.py b/kafka/admin/client.py index a91fc54e8..642b6ef5f 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -14,9 +14,9 @@ from kafka.version import __version__ from kafka.admin._acls import ACLAdminMixin +from kafka.admin._cluster import ClusterAdminMixin from kafka.admin._configs import ConfigAdminMixin from kafka.admin._groups import GroupAdminMixin -from kafka.admin._metadata import MetadataAdminMixin from kafka.admin._records import RecordAdminMixin from kafka.admin._topics import TopicAdminMixin @@ -25,7 +25,7 @@ class KafkaAdminClient( TopicAdminMixin, - MetadataAdminMixin, + ClusterAdminMixin, ACLAdminMixin, ConfigAdminMixin, GroupAdminMixin, From d52340af3a1fa2d9f3bb16be56ca6e8c3fc5b58f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Apr 2026 17:38:52 -0700 Subject: [PATCH 2/3] Admin: move DescribeLogDirs -> _cluster / cluster cli subgroup --- kafka/admin/_cluster.py | 28 +++++++++++++++ kafka/admin/_records.py | 27 -------------- kafka/cli/admin/__init__.py | 36 ++++--------------- kafka/cli/admin/cluster/__init__.py | 3 +- .../describe.py => cluster/log_dirs.py} | 4 +-- kafka/cli/admin/log_dirs/__init__.py | 14 -------- 6 files changed, 38 insertions(+), 74 deletions(-) rename kafka/cli/admin/{log_dirs/describe.py => cluster/log_dirs.py} (69%) delete mode 100644 kafka/cli/admin/log_dirs/__init__.py diff --git a/kafka/admin/_cluster.py b/kafka/admin/_cluster.py index b5648398e..42c7311f0 100644 --- a/kafka/admin/_cluster.py +++ b/kafka/admin/_cluster.py @@ -44,3 +44,31 @@ def describe_cluster(self): metadata = self._manager.run(self._get_cluster_metadata, []) metadata.pop('topics') return metadata + + async def _async_describe_log_dirs(self, topic_partitions=(), brokers=None): + request = DescribeLogDirsRequest(topics=topic_partitions) + responses = [] + if brokers is None: + brokers = [broker.node_id for broker in self._manager.cluster.brokers()] + for node_id in brokers: + response = await self._manager.send(request, node_id=node_id) + responses.append({"broker": node_id, "log_dirs": [result.to_dict() for result in response.results]}) + return responses + + def describe_log_dirs(self, topic_partitions=None, brokers=None): + """Fetch broker log directory and topic/partition stats + + Keyword Arguments: + topic_partitions (dict, list, optional): + Either: dict of {topic_name: [partition ids]}. + Or: list of [topic_name], to query all partitions for topic. + Or: None, to query all topics / all partitions. + Default: None + brokers (list, optional): List of [node_id] for brokers to query. + If None, query is sent to all brokers. Default: None + + Returns: + list of dicts, containing per-broker log-dir data + """ + topic_partitions = self._get_topic_partitions(topic_partitions) + return self._manager.run(self._async_describe_log_dirs, topic_partitions, brokers) diff --git a/kafka/admin/_records.py b/kafka/admin/_records.py index c6e3b621e..38ba3a46a 100644 --- a/kafka/admin/_records.py +++ b/kafka/admin/_records.py @@ -166,30 +166,3 @@ def response_errors(r): ignore_errors = (Errors.ElectionNotNeededError,) return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) - async def _async_describe_log_dirs(self, topic_partitions=(), brokers=None): - request = DescribeLogDirsRequest(topics=topic_partitions) - responses = [] - if brokers is None: - brokers = [broker.node_id for broker in self._manager.cluster.brokers()] - for node_id in brokers: - response = await self._manager.send(request, node_id=node_id) - responses.append({"broker": node_id, "log_dirs": [result.to_dict() for result in response.results]}) - return responses - - def describe_log_dirs(self, topic_partitions=None, brokers=None): - """Send a DescribeLogDirsRequest request to a broker. - - Keyword Arguments: - topic_partitions (dict, list, optional): - Either: dict of {topic_name: [partition ids]}. - Or: list of [topic_name], to query all partitions for topic. - Or: None, to query all topics / all partitions. - Default: None - brokers (list, optional): List of [node_id] for brokers to query. - If None, query is sent to all brokers. Default: None - - Returns: - list of dicts, containing per-broker log-dir data - """ - topic_partitions = self._get_topic_partitions(topic_partitions) - return self._manager.run(self._async_describe_log_dirs, topic_partitions, brokers) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index d7e4e45c0..90acb22d5 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -7,7 +7,6 @@ from .cluster import ClusterSubCommand from .configs import ConfigsSubCommand from .consumer_groups import ConsumerGroupsSubCommand -from .log_dirs import LogDirsSubCommand from .topics import TopicsSubCommand def main_parser(): @@ -60,7 +59,7 @@ def build_kwargs(props): def run_cli(args=None): parser = main_parser() subparsers = parser.add_subparsers(help='subcommands') - for cmd in [ClusterSubCommand, ConfigsSubCommand, LogDirsSubCommand, + for cmd in [ClusterSubCommand, ConfigsSubCommand, TopicsSubCommand, ConsumerGroupsSubCommand]: cmd.add_subparser(subparsers) config = parser.parse_args(args) @@ -105,39 +104,14 @@ def run_cli(args=None): # Commands TODO: - # [acls] - # describe - # create - # delete - # [configs] - # alter # IncrementalAlterConfigs (not supported yet) - # [partitions] - # create - # alter-reassignments (AlterPartitionReassignments - not supported yet) - # list-reassignments (ListPartitionReassignments - not supported yet) - - # [records] - # delete - # [consumer-groups] # remove-members (not supported yet) # delete-offsets (not supported yet) # alter-offsets (not supported yet) - # [offsets] - # list (not supported yet) - # delete (OffsetDelete - not supported yet) - - # leader-election - # perform_leader_election - - # [log-dirs] - # describe (currently broken) - # alter (AlterReplicaLogDirs - not supported yet) - # [client-quotas] # describe (DescribeClientQuotas - not supported yet) # alter (AlterClientQuotas - not supported yet) @@ -154,12 +128,14 @@ def run_cli(args=None): # [topics] # describe-partitions (DescribeTopicPartitions - not supported yet) + # list-offsets (not supported yet) + # delete-offsets (OffsetDelete - not supported yet) + # alter-reassignments (AlterPartitionReassignments - not supported yet) + # list-reassignments (ListPartitionReassignments - not supported yet) # [cluster] # describe-features (DescribeFeatures - not supported yet) # update-features (UpdateFeatures - not supported yet) # version # api-versions - - - + # alter-log-dirs (AlterReplicaLogDirs - not supported yet) diff --git a/kafka/cli/admin/cluster/__init__.py b/kafka/cli/admin/cluster/__init__.py index 7876c0afa..5baec8b44 100644 --- a/kafka/cli/admin/cluster/__init__.py +++ b/kafka/cli/admin/cluster/__init__.py @@ -1,6 +1,7 @@ import sys from .describe import DescribeCluster +from .log_dirs import DescribeLogDirs class ClusterSubCommand: @@ -9,6 +10,6 @@ class ClusterSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster') commands = parser.add_subparsers() - for cmd in [DescribeCluster]: + for cmd in [DescribeCluster, DescribeLogDirs]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/log_dirs/describe.py b/kafka/cli/admin/cluster/log_dirs.py similarity index 69% rename from kafka/cli/admin/log_dirs/describe.py rename to kafka/cli/admin/cluster/log_dirs.py index d23df5673..f533bb11a 100644 --- a/kafka/cli/admin/log_dirs/describe.py +++ b/kafka/cli/admin/cluster/log_dirs.py @@ -2,7 +2,7 @@ class DescribeLogDirs: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe', help='Get topic log directories for brokers') + parser = subparsers.add_parser('log-dirs', help='Get topic log directories and stats') parser.add_argument('-b', '--broker', type=int, action='append', dest='brokers', help='Query specific broker(s)') - parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='Get additional data about specific topic(s)') + parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='Get data about specific topic(s)') parser.set_defaults(command=lambda cli, args: cli.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers)) diff --git a/kafka/cli/admin/log_dirs/__init__.py b/kafka/cli/admin/log_dirs/__init__.py deleted file mode 100644 index 46f5a254f..000000000 --- a/kafka/cli/admin/log_dirs/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -import sys - -from .describe import DescribeLogDirs - - -class LogDirsSubCommand: - - @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('log-dirs', help='Manage Kafka Topic/Partition Log Directories') - commands = parser.add_subparsers() - for cmd in [DescribeLogDirs]: - cmd.add_subparser(commands) - parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) From 9a10d7e75fa13da767a8ede6ee1aa39cfbd4bb99 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Apr 2026 18:47:02 -0700 Subject: [PATCH 3/3] import and TODOs --- kafka/admin/_cluster.py | 1 + kafka/admin/_records.py | 2 +- kafka/cli/admin/__init__.py | 8 ++++++++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/kafka/admin/_cluster.py b/kafka/admin/_cluster.py index 42c7311f0..cdd1b5e47 100644 --- a/kafka/admin/_cluster.py +++ b/kafka/admin/_cluster.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.admin import DescribeLogDirsRequest if TYPE_CHECKING: from kafka.net.manager import KafkaConnectionManager diff --git a/kafka/admin/_records.py b/kafka/admin/_records.py index 38ba3a46a..aa22abc88 100644 --- a/kafka/admin/_records.py +++ b/kafka/admin/_records.py @@ -8,7 +8,7 @@ import kafka.errors as Errors from kafka.errors import UnknownTopicOrPartitionError -from kafka.protocol.admin import DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType +from kafka.protocol.admin import DeleteRecordsRequest, ElectLeadersRequest, ElectionType from kafka.structs import TopicPartition if TYPE_CHECKING: diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 90acb22d5..de24d5d8b 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -104,7 +104,13 @@ def run_cli(args=None): # Commands TODO: + # [acls] + # describe + # create + # delete + # [configs] + # alter # IncrementalAlterConfigs (not supported yet) # [consumer-groups] @@ -132,6 +138,8 @@ def run_cli(args=None): # delete-offsets (OffsetDelete - not supported yet) # alter-reassignments (AlterPartitionReassignments - not supported yet) # list-reassignments (ListPartitionReassignments - not supported yet) + # create-partitions + # elect-leaders # [cluster] # describe-features (DescribeFeatures - not supported yet)