diff --git a/kafka/admin/client.py b/kafka/admin/client.py index db5b0551a..a536389fc 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -217,6 +217,8 @@ def __init__(self, **configs): metric_group_prefix='admin', **self.config ) + # Goal: migrate all self._client calls -> self._manager (skipping compat layer) + self._manager = self._client._manager # Get auto-discovered version from client if necessary self.config['api_version'] = self._client.get_broker_version(timeout_ms=self.config['api_version_auto_timeout_ms']) @@ -607,33 +609,18 @@ def _process_acl_operations(self, obj): obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations']))) return obj - def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): - """ - topics == None means "get all topics" - """ - version = self._client.api_version(MetadataRequest, max_version=8) - if version <= 3: - if auto_topic_creation: - raise IncompatibleBrokerVersion( - "auto_topic_creation requires MetadataRequest >= v4, which" - " is not supported by Kafka {}" - .format(self.config['api_version'])) - - request = MetadataRequest[version](topics=topics) - elif version <= 7: - request = MetadataRequest[version]( - topics=topics, - allow_auto_topic_creation=auto_topic_creation - ) - else: - request = MetadataRequest[version]( - topics=topics, - allow_auto_topic_creation=auto_topic_creation, - include_cluster_authorized_operations=True, - include_topic_authorized_operations=True, - ) - - metadata = self.send_request(request).to_dict() + async def _get_cluster_metadata(self, topics): + """topics = [] for no topics, None for all.""" + request = MetadataRequest( + topics=[ + MetadataRequest.MetadataRequestTopic(name=topic) + for topic in topics] if topics is not None else None, + allow_auto_topic_creation=False, + include_cluster_authorized_operations=True, + include_topic_authorized_operations=True, + ) + response = await self._manager.send(request) + metadata = response.to_dict() self._process_acl_operations(metadata) for topic in metadata['topics']: self._process_acl_operations(topic) @@ -645,7 +632,7 @@ def list_topics(self): Returns: A list of topic name strings. """ - metadata = self._get_cluster_metadata(topics=None) + metadata = self._manager.run(self._get_cluster_metadata, None) # None => request all topics return [t['name'] for t in metadata['topics']] def describe_topics(self, topics=None): @@ -658,7 +645,7 @@ def describe_topics(self, topics=None): Returns: A list of dicts describing each topic (including partition info). """ - metadata = self._get_cluster_metadata(topics=topics) + metadata = self._manager.run(self._get_cluster_metadata, topics) return metadata['topics'] def describe_cluster(self): @@ -668,7 +655,7 @@ def describe_cluster(self): Returns: A dict with cluster-wide metadata, excluding topic details. """ - metadata = self._get_cluster_metadata() + metadata = self._manager.run(self._get_cluster_metadata, []) # [] => no topics metadata.pop('topics') # We have 'describe_topics' for this return metadata @@ -1174,7 +1161,7 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None): partitions = set(partitions) topics = set(tp.topic for tp in partitions) - metadata = self._get_cluster_metadata(topics=topics) + metadata = self._manager.run(self._get_cluster_metadata, topics) leader2partitions = defaultdict(list) valid_partitions = set()