diff --git a/kafka/admin/_records.py b/kafka/admin/_records.py index 0e15bd0fa..c6e3b621e 100644 --- a/kafka/admin/_records.py +++ b/kafka/admin/_records.py @@ -118,29 +118,20 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id """ return self._manager.run(self._async_delete_records, records_to_delete, timeout_ms, partition_leader_id) - @staticmethod - def _convert_topic_partitions(topic_partitions): - return [ - ( - topic, - partitions - ) - for topic, partitions in topic_partitions.items() - ] - - def _get_all_topic_partitions(self): + def _get_all_topic_partitions(self, topics=None): return [ ( topic['name'], [p['partition_index'] for p in topic['partitions']] ) - for topic in self.describe_topics() + for topic in self.describe_topics(topics) ] def _get_topic_partitions(self, topic_partitions): - if topic_partitions is None: - return self._get_all_topic_partitions() - return self._convert_topic_partitions(topic_partitions) + if isinstance(topic_partitions, dict): + return topic_partitions.items() + else: + return self._get_all_topic_partitions(topic_partitions) def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None, raise_errors=True): """Trigger leader election for the specified topic partitions. @@ -149,8 +140,11 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_ election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean Keyword Arguments: - topic_partitions (dict): A map of topic name strings to partition ids list. - By default, will run on all topic partitions + topic_partitions (dict, list, optional): + Either: dict of {topic_name: [partition ids]}. + Or: list of [topic_name], and election will run on all partitions for topic. + Or: None, and election runs against all topics / all partitions. + Default: None timeout_ms (num, optional): Milliseconds to wait for the leader election process. raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. @@ -172,14 +166,30 @@ def response_errors(r): ignore_errors = (Errors.ElectionNotNeededError,) return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors) - async def _async_describe_log_dirs(self): - version = self._client.api_version(DescribeLogDirsRequest, max_version=0) - return await self._manager.send(DescribeLogDirsRequest[version]()) - - def describe_log_dirs(self): + async def _async_describe_log_dirs(self, topic_partitions=(), brokers=None): + request = DescribeLogDirsRequest(topics=topic_partitions) + responses = [] + if brokers is None: + brokers = [broker.node_id for broker in self._manager.cluster.brokers()] + for node_id in brokers: + response = await self._manager.send(request, node_id=node_id) + responses.append({"broker": node_id, "log_dirs": [result.to_dict() for result in response.results]}) + return responses + + def describe_log_dirs(self, topic_partitions=None, brokers=None): """Send a DescribeLogDirsRequest request to a broker. + Keyword Arguments: + topic_partitions (dict, list, optional): + Either: dict of {topic_name: [partition ids]}. + Or: list of [topic_name], to query all partitions for topic. + Or: None, to query all topics / all partitions. + Default: None + brokers (list, optional): List of [node_id] for brokers to query. + If None, query is sent to all brokers. Default: None + Returns: - DescribeLogDirsResponse object + list of dicts, containing per-broker log-dir data """ - return self._manager.run(self._async_describe_log_dirs) + topic_partitions = self._get_topic_partitions(topic_partitions) + return self._manager.run(self._async_describe_log_dirs, topic_partitions, brokers) diff --git a/kafka/cli/admin/log_dirs/describe.py b/kafka/cli/admin/log_dirs/describe.py index 6c3c27bea..d23df5673 100644 --- a/kafka/cli/admin/log_dirs/describe.py +++ b/kafka/cli/admin/log_dirs/describe.py @@ -3,4 +3,6 @@ class DescribeLogDirs: @classmethod def add_subparser(cls, subparsers): parser = subparsers.add_parser('describe', help='Get topic log directories for brokers') - parser.set_defaults(command=lambda cli, _args: cli.describe_log_dirs()) + parser.add_argument('-b', '--broker', type=int, action='append', dest='brokers', help='Query specific broker(s)') + parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='Get additional data about specific topic(s)') + parser.set_defaults(command=lambda cli, args: cli.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers)) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 10a81a31a..48ab5fc66 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -469,3 +469,16 @@ def test_perform_leader_election(kafka_admin_client, topic): partition_set.remove(partition[0]) assert partition[1] == ElectionNotNeededError.errno assert partition_set == set() + + +@pytest.mark.skipif(env_kafka_version() < (1, 0), reason="DescribeLogDirsRequest requires broker >= 1.0") +def test_describe_log_dirs(kafka_admin_client): + log_dirs = kafka_admin_client.describe_log_dirs() + assert log_dirs + broker_map = {result['broker']: result for result in log_dirs} + for broker in kafka_admin_client._manager.cluster.brokers(): + assert broker.node_id in broker_map + assert len(broker_map[broker.node_id]['log_dirs']) > 0 + for log_dir in broker_map[broker.node_id]['log_dirs']: + assert 'log_dir' in log_dir + assert log_dir['error_code'] == 0