From aa182b52f5965c3a94f9d4ca8c7aac77bab3f078 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Apr 2026 12:05:14 -0700 Subject: [PATCH 1/4] BREAKING: Change admin api responses to simple dict -- use key access not attributes --- kafka/admin/_groups.py | 134 +++++------------- kafka/admin/_topics.py | 10 +- kafka/structs.py | 6 - test/integration/test_admin_integration.py | 72 +++++----- test/integration/test_producer_integration.py | 20 ++- 5 files changed, 99 insertions(+), 143 deletions(-) diff --git a/kafka/admin/_groups.py b/kafka/admin/_groups.py index 3d7b55282..d911e88fd 100644 --- a/kafka/admin/_groups.py +++ b/kafka/admin/_groups.py @@ -14,7 +14,7 @@ from kafka.protocol.consumer.metadata import ( ConsumerProtocolAssignment, ConsumerProtocolSubscription, ConsumerProtocolType, ) -from kafka.structs import GroupInformation, MemberInformation, OffsetAndMetadata, TopicPartition +from kafka.structs import OffsetAndMetadata, TopicPartition if TYPE_CHECKING: from kafka.net.manager import KafkaConnectionManager @@ -48,53 +48,13 @@ def _describe_consumer_groups_process_response(self, response): raise NotImplementedError( "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient." .format(response.API_VERSION)) - assert len(response.groups) == 1 - for response_name, response_field in response.fields.items(): - if response_name == 'groups': - described_group = getattr(response, response_name)[0] - described_group_information_list = [] - protocol_type_is_consumer = False - for group_information_name, group_information_field in response_field.fields.items(): - if not group_information_field.for_version_q(response.API_VERSION): - continue - described_group_information = getattr(described_group, group_information_name) - if group_information_name == 'protocol_type': - protocol_type = described_group_information - protocol_type_is_consumer = (protocol_type == ConsumerProtocolType or not protocol_type) - if group_information_name == 'members': - member_information_list = [] - for member in described_group_information: - member_information = [] - for attr_name, attr_field in group_information_field.fields.items(): - if not attr_field.for_version_q(response.API_VERSION): - continue - attr_val = getattr(member, attr_name) - if protocol_type_is_consumer: - if attr_name == 'member_metadata' and attr_val: - member_information.append(ConsumerProtocolSubscription.decode(attr_val)) - elif attr_name == 'member_assignment' and attr_val: - member_information.append(ConsumerProtocolAssignment.decode(attr_val)) - else: - member_information.append(attr_val) - member_info_tuple = MemberInformation._make(member_information) - member_information_list.append(member_info_tuple) - described_group_information_list.append(member_information_list) - else: - described_group_information_list.append(described_group_information) - if response.API_VERSION >= 3: - described_group_information_list[-1] = list(map(lambda acl: acl.name, valid_acl_operations(described_group_information_list[-1]))) - else: - described_group_information_list.append([]) - group_description = GroupInformation._make(described_group_information_list) - error_code = group_description.error_code - error_type = Errors.for_code(error_code) - if error_type is not Errors.NoError: - raise error_type( - "DescribeGroupsResponse failed with response '{}'." - .format(response)) - return group_description - assert False, "DescribeGroupsResponse parsing failed" + for group in response.groups: + for member in group.members: + member.member_metadata = ConsumerProtocolSubscription.decode(member.member_metadata) + member.member_assignment = ConsumerProtocolAssignment.decode(member.member_assignment) + # Return dict (key, val) tuples + return [(group.group_id, self._process_acl_operations(group.to_dict())) for group in response.groups] async def _async_describe_consumer_groups(self, group_ids, group_coordinator_id=None): results = [] @@ -103,7 +63,8 @@ async def _async_describe_consumer_groups(self, group_ids, group_coordinator_id= request = self._describe_consumer_groups_request(group_id) response = await self._manager.send(request, node_id=coordinator_id) results.append(self._describe_consumer_groups_process_response(response)) - return results + # Combine key/vals from multiple requests into single dict + return dict(itertools.chain(*results)) def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): """Describe a set of consumer groups. @@ -123,10 +84,10 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include have the same coordinator, otherwise it will error. Default: None. Returns: - A list of group descriptions. For now the group descriptions - are the raw results from the DescribeGroupsResponse. Long-term, we - plan to change this to return namedtuples as well as decoding the - partition assignments. + A dict of {group_id: {key: val}}. key/vals are simple to_dict translations + of the raw results from DescribeGroupsResponse (with inline decoding + of ConsumerSubscription and ConsumerAssignment metadata, and conversion + of acl set ints to semantic enums). """ return self._manager.run(self._async_describe_consumer_groups, group_ids, group_coordinator_id) @@ -144,17 +105,17 @@ def _list_consumer_groups_process_response(self, response): raise error_type( "ListGroupsRequest failed with response '{}'." .format(response)) - return [(group.group_id, group.protocol_type) for group in response.groups] + return [group.to_dict() for group in response.groups] async def _async_list_consumer_groups(self, broker_ids=None): if broker_ids is None: broker_ids = [broker.node_id for broker in self._manager.cluster.brokers()] - consumer_groups = set() + consumer_groups = [] for broker_id in broker_ids: request = self._list_consumer_groups_request() response = await self._manager.send(request, node_id=broker_id) - consumer_groups.update(self._list_consumer_groups_process_response(response)) - return list(consumer_groups) + consumer_groups.extend(self._list_consumer_groups_process_response(response)) + return consumer_groups def list_consumer_groups(self, broker_ids=None): """List all consumer groups known to the cluster. @@ -178,7 +139,7 @@ def list_consumer_groups(self, broker_ids=None): consumer groups are coordinated by those broker(s). Default: None Returns: - list: List of tuples of Consumer Groups. + List of group data dicts, with key/vals from ListGroupsRequest """ return self._manager.run(self._async_list_consumer_groups, broker_ids) @@ -203,32 +164,20 @@ def _list_consumer_group_offsets_request(self, group_id, partitions=None): def _list_consumer_group_offsets_process_response(self, response): """Process an OffsetFetchResponse.""" - if response.API_VERSION <= 6: - if response.API_VERSION > 1: - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: - raise error_type( - "OffsetFetchResponse failed with response '{}'." - .format(response)) - offsets = {} - for topic, partitions in response.topics: - for partition_data in partitions: - if response.API_VERSION <= 4: - partition, offset, metadata, error_code = partition_data - leader_epoch = -1 - else: - partition, offset, leader_epoch, metadata, error_code = partition_data - error_type = Errors.for_code(error_code) - if error_type is not Errors.NoError: - raise error_type( - "Unable to fetch consumer group offsets for topic {}, partition {}" - .format(topic, partition)) - offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata, leader_epoch) - else: - raise NotImplementedError( - "Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient." - .format(response.API_VERSION)) - return offsets + if response.API_VERSION > 1: + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + raise error_type( + "OffsetFetchResponse failed with response '{}'." + .format(response)) + def _partitions_to_dict(partitions): + d = {} + for p in partitions: + d[p.partition_index] = p.to_dict() + d[p.partition_index].pop('partition_index') + return d + return {topic.name: _partitions_to_dict(topic.partitions) + for topic in response.topics} async def _async_list_consumer_group_offsets(self, group_id, group_coordinator_id=None, partitions=None): if group_coordinator_id is None: @@ -259,8 +208,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, known offsets for the consumer group. Default: None. Returns: - dictionary: A dictionary with TopicPartition keys and - OffsetAndMetadata values. + dict: {topic: [{partition data}]} key/vals from OffsetCommitResponse}]} """ return self._manager.run(self._async_list_consumer_group_offsets, group_id, group_coordinator_id, partitions) @@ -271,15 +219,11 @@ def _delete_consumer_groups_request(self, group_ids): def _convert_delete_groups_response(self, response): """Parse a DeleteGroupsResponse.""" - if response.API_VERSION <= 2: - results = [] - for group_id, error_code in response.results: - results.append((group_id, Errors.for_code(error_code))) - return results - else: - raise NotImplementedError( - "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient." - .format(response.API_VERSION)) + results = [] + for group_id, error_code in response.results: + res = 'OK' if error_code == 0 else Errors.for_code(error_code).__name__ + results.append((group_id, res)) + return results async def _async_delete_consumer_groups(self, group_ids, group_coordinator_id=None): coordinators_groups = defaultdict(list) @@ -295,7 +239,7 @@ async def _async_delete_consumer_groups(self, group_ids, group_coordinator_id=No request = self._delete_consumer_groups_request(coordinator_group_ids) response = await self._manager.send(request, node_id=coordinator_id) results.extend(self._convert_delete_groups_response(response)) - return results + return dict(results) def delete_consumer_groups(self, group_ids, group_coordinator_id=None): """Delete Consumer Group Offsets for given consumer groups. diff --git a/kafka/admin/_topics.py b/kafka/admin/_topics.py index d09b006f3..97bd0094b 100644 --- a/kafka/admin/_topics.py +++ b/kafka/admin/_topics.py @@ -110,7 +110,8 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ wait_for_metadata (bool, optional): If True, block until each new topic is visible in broker metadata with a leader assigned for every partition. Default: False - Returns: CreateTopicResponse + Returns: + dict of CreateTopicResponse key/vals """ if validate_only and wait_for_metadata: raise ValueError('validate_only and wait_for_metadata are mutually exclusive') @@ -139,7 +140,7 @@ def response_errors(r): response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) if wait_for_metadata: self.wait_for_topics([new_topic.name for new_topic in request.topics]) - return response + return response.to_dict() def wait_for_topics(self, topic_names, timeout_ms=10000): """Block until each of the given topics is ready to use. @@ -225,7 +226,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. Returns: - Appropriate version of DeleteTopicsResponse class. + dict of DeleteTopicsResponse key/vals (version-dependent) """ timeout_ms = self._validate_timeout(timeout_ms) _Topic = DeleteTopicsRequest.DeleteTopicState @@ -236,7 +237,8 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): def response_errors(r): for response in r.responses: yield Errors.for_code(response.error_code) - return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) + response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) + return response.to_dict() @staticmethod def _process_create_partitions_input(topic_partitions): diff --git a/kafka/structs.py b/kafka/structs.py index 490505239..5093120e0 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -39,12 +39,6 @@ OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", ["offset", "timestamp", "leader_epoch"]) -MemberInformation = namedtuple("MemberInformation", - ["member_id", "client_id", "client_host", "member_metadata", "member_assignment"]) - -GroupInformation = namedtuple("GroupInformation", - ["error_code", "group", "state", "protocol_type", "protocol", "members", "authorized_operations"]) - """Define retry policy for async producer Keyword Arguments: diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 48ab5fc66..49fb47b4a 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -224,16 +224,16 @@ def consumer_thread(i, group_id): output = kafka_admin_client.describe_consumer_groups(group_id_list) assert len(output) == 2 consumer_groups = set() - for consumer_group in output: - assert(consumer_group.group in group_id_list) - if consumer_group.group == group_id_list[0]: - assert(len(consumer_group.members) == 2) + for consumer_group in output.values(): + assert(consumer_group['group_id'] in group_id_list) + if consumer_group['group_id'] == group_id_list[0]: + assert(len(consumer_group['members']) == 2) else: - assert(len(consumer_group.members) == 1) - for member in consumer_group.members: - assert(member.member_metadata.topics[0] == topic) - assert(member.member_assignment.assigned_partitions[0][0] == topic) - consumer_groups.add(consumer_group.group) + assert(len(consumer_group['members']) == 1) + for member in consumer_group['members']: + assert(member['member_metadata']['topics'] == [topic]) + assert(member['member_assignment']['assigned_partitions'][0]['topic'] == topic) + consumer_groups.add(consumer_group['group_id']) assert(sorted(list(consumer_groups)) == group_id_list) finally: info('Shutting down %s consumers', num_consumers) @@ -266,20 +266,17 @@ def test_delete_consumer_groups(kafka_admin_client, kafka_consumer_factory, send next(consumer3) consumer3.close() - groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + groups = {group['group_id'] for group in kafka_admin_client.list_consumer_groups()} assert group1 in groups assert group2 in groups assert group3 in groups - delete_results = { - group_id: error - for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2]) - } - assert delete_results[group1] == NoError - assert delete_results[group2] == NoError + delete_results = kafka_admin_client.delete_consumer_groups([group1, group2]) + assert delete_results[group1] == 'OK' + assert delete_results[group2] == 'OK' assert group3 not in delete_results - groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + groups = {group['group_id'] for group in kafka_admin_client.list_consumer_groups()} assert group1 not in groups assert group2 not in groups assert group3 in groups @@ -300,24 +297,22 @@ def test_delete_consumer_groups_with_errors(kafka_admin_client, kafka_consumer_f consumer2 = kafka_consumer_factory(group_id=group2) next(consumer2) - groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + groups = {group['group_id'] for group in kafka_admin_client.list_consumer_groups()} assert group1 in groups assert group2 in groups assert group3 not in groups - delete_results = { - group_id: error - for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2, group3]) - } - assert delete_results[group1] == NoError - assert delete_results[group2] == NonEmptyGroupError - assert delete_results[group3] == GroupIdNotFoundError + delete_results = kafka_admin_client.delete_consumer_groups([group1, group2, group3]) + assert delete_results[group1] == 'OK' + assert delete_results[group2] == 'NonEmptyGroupError' + assert delete_results[group3] == 'GroupIdNotFoundError' - groups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + groups = {group['group_id'] for group in kafka_admin_client.list_consumer_groups()} assert group1 not in groups assert group2 in groups assert group3 not in groups + @pytest.fixture(name="topic2") def _topic2(kafka_broker, request): """Same as `topic` fixture, but a different name if you need to topics.""" @@ -389,21 +384,30 @@ def test_delete_records_with_errors(kafka_admin_client, topic, send_messages): def test_create_delete_topics(kafka_admin_client): topic_name = random_string(4) response = kafka_admin_client.create_topics([NewTopic(topic_name, 1, 1)]) - assert response.topics[0].name == topic_name - assert response.topics[0].error_code == 0 # NoError + assert response['topics'][0]['name'] == topic_name + assert response['topics'][0]['error_code'] == 0 # NoError + + response = kafka_admin_client.delete_topics([topic_name]) + assert response['responses'][0]['name'] == topic_name + assert response['responses'][0]['error_code'] == 0 # NoError + + topic_name = random_string(4) + response = kafka_admin_client.create_topics([topic_name]) + assert response['topics'][0]['name'] == topic_name + assert response['topics'][0]['error_code'] == 0 # NoError response = kafka_admin_client.delete_topics([topic_name]) - assert response.responses[0].name == topic_name - assert response.responses[0].error_code == 0 # NoError + assert response['responses'][0]['name'] == topic_name + assert response['responses'][0]['error_code'] == 0 # NoError topic_name = random_string(4) response = kafka_admin_client.create_topics({topic_name: {'num_partitions': 1, 'replication_factor': 1}}) - assert response.topics[0].name == topic_name - assert response.topics[0].error_code == 0 # NoError + assert response['topics'][0]['name'] == topic_name + assert response['topics'][0]['error_code'] == 0 # NoError response = kafka_admin_client.delete_topics([topic_name]) - assert response.responses[0].name == topic_name - assert response.responses[0].error_code == 0 # NoError + assert response['responses'][0]['name'] == topic_name + assert response['responses'][0]['error_code'] == 0 # NoError # Create topics requires explicit num_partitions/replication_factor on < 2.4 if env_kafka_version() < (2, 4): diff --git a/test/integration/test_producer_integration.py b/test/integration/test_producer_integration.py index 5133d086a..47fe09975 100644 --- a/test/integration/test_producer_integration.py +++ b/test/integration/test_producer_integration.py @@ -215,17 +215,29 @@ def test_transactional_producer_offsets(kafka_producer_factory, kafka_admin_clie leader_epoch = 0 else: leader_epoch = -1 - offsets = {TopicPartition('transactional_test_topic', 0): OffsetAndMetadata(0, 'metadata', leader_epoch)} + topic = 'transactional_test_topic' + offsets = {TopicPartition(topic, 0): OffsetAndMetadata(0, 'metadata', leader_epoch)} producer = kafka_producer_factory(transactional_id='testing') producer.init_transactions() producer.begin_transaction() - producer.send('transactional_test_topic', partition=0, value=b'msg1').get() + producer.send(topic, partition=0, value=b'msg1').get() producer.send_offsets_to_transaction(offsets, 'txn-test-group') producer.commit_transaction() producer.begin_transaction() - producer.send_offsets_to_transaction({TopicPartition('transactional_test_topic', 1): OffsetAndMetadata(1, 'bad', 1)}, 'txn-test-group') + producer.send_offsets_to_transaction({TopicPartition(topic, 1): OffsetAndMetadata(1, 'bad', 1)}, 'txn-test-group') producer.abort_transaction() admin = kafka_admin_client_factory() - assert admin.list_consumer_group_offsets('txn-test-group') == offsets + result = { + topic: { + 0: { + 'committed_offset': 0, + 'error_code': 0, + 'metadata': 'metadata', + }, + } + } + if env_kafka_version() >= (2, 1): + result[topic][0]['committed_leader_epoch'] = leader_epoch + assert admin.list_consumer_group_offsets('txn-test-group') == result From 51659fb565ff11e24b97e661d5acf8bb4b564bb1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 14 Apr 2026 11:21:54 -0700 Subject: [PATCH 2/4] dont return throttle_time_ms in describe_cluster --- kafka/admin/_cluster.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/admin/_cluster.py b/kafka/admin/_cluster.py index cdd1b5e47..d78e168a7 100644 --- a/kafka/admin/_cluster.py +++ b/kafka/admin/_cluster.py @@ -44,6 +44,7 @@ def describe_cluster(self): """ metadata = self._manager.run(self._get_cluster_metadata, []) metadata.pop('topics') + metadata.pop('throttle_time_ms', None) return metadata async def _async_describe_log_dirs(self, topic_partitions=(), brokers=None): From d4a4743ec43841fbf317e6cda23d43019aa0ed2e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 14 Apr 2026 11:55:14 -0700 Subject: [PATCH 3/4] Refactor describe_configs / return dict; decode source and config type ints; support group and logger configs --- kafka/admin/__init__.py | 9 +- kafka/admin/_configs.py | 107 ++++++++++++++------- kafka/cli/admin/configs/describe.py | 20 ++-- test/integration/test_admin_integration.py | 45 ++++++--- 4 files changed, 117 insertions(+), 64 deletions(-) diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index f120bfe63..cc8d29701 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -1,11 +1,12 @@ -from kafka.admin._configs import ConfigResource, ConfigResourceType from kafka.admin.client import KafkaAdminClient from kafka.admin._acls import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, ResourceType, ACLPermissionType, ACLResourcePatternType) +from kafka.admin._configs import ConfigResource, ConfigResourceType, ConfigType, ConfigSourceType from kafka.admin._topics import NewTopic, NewPartitions __all__ = [ - 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions', 'ACL', 'ACLFilter', - 'ResourcePattern', 'ResourcePatternFilter', 'ACLOperation', 'ResourceType', 'ACLPermissionType', - 'ACLResourcePatternType' + 'KafkaAdminClient', + 'ACL', 'ACLFilter', 'ACLOperation', 'ACLPermissionType', 'ACLResourcePatternType', + 'ResourceType', 'ResourcePattern', 'ResourcePatternFilter', + 'ConfigResource', 'ConfigResourceType', 'ConfigType', 'ConfigSourceType', ] diff --git a/kafka/admin/_configs.py b/kafka/admin/_configs.py index 2e2c276b0..a54994694 100644 --- a/kafka/admin/_configs.py +++ b/kafka/admin/_configs.py @@ -5,8 +5,9 @@ from __future__ import annotations -import logging +from collections import defaultdict from enum import IntEnum +import logging from typing import TYPE_CHECKING from kafka.errors import IncompatibleBrokerVersion @@ -29,18 +30,22 @@ def _convert_describe_config_resource_request(config_resource): return ( config_resource.resource_type, config_resource.name, - [config_key for config_key, config_value in config_resource.configs.items()] if config_resource.configs else None + list(config_resource.configs.keys()) if isinstance(config_resource.configs, dict) else config_resource.configs ) async def _async_describe_configs(self, config_resources, include_synonyms=False): - broker_resources = [] - topic_resources = [] + broker_resources = defaultdict(list) + other_resources = [] for config_resource in config_resources: - if config_resource.resource_type == ConfigResourceType.BROKER: - broker_resources.append(self._convert_describe_config_resource_request(config_resource)) + if config_resource.resource_type in (ConfigResourceType.BROKER, ConfigResourceType.BROKER_LOGGER): + try: + broker_id = int(config_resource.name) + except ValueError: + raise ValueError("Broker resource names must be an integer or a string represented integer") + broker_resources[broker_id].append(self._convert_describe_config_resource_request(config_resource)) else: - topic_resources.append(self._convert_describe_config_resource_request(config_resource)) + other_resources.append(self._convert_describe_config_resource_request(config_resource)) version = self._client.api_version(DescribeConfigsRequest, max_version=2) if include_synonyms and version == 0: @@ -48,28 +53,34 @@ async def _async_describe_configs(self, config_resources, include_synonyms=False "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." .format(self._manager.broker_version)) - results = [] - for broker_resource in broker_resources: - try: - broker_id = int(broker_resource[1]) - except ValueError: - raise ValueError("Broker resource names must be an integer or a string represented integer") - if version == 0: - request = DescribeConfigsRequest[version](resources=[broker_resource]) - else: - request = DescribeConfigsRequest[version]( - resources=[broker_resource], - include_synonyms=include_synonyms) - results.append(await self._manager.send(request, node_id=broker_id)) - - if topic_resources: - if version == 0: - request = DescribeConfigsRequest[version](resources=topic_resources) - else: - request = DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms) - results.append(await self._manager.send(request)) - - return results + responses = [] + for broker_id, resources in broker_resources.items(): + request = DescribeConfigsRequest( + resources=resources, + include_synonyms=include_synonyms) + responses.append(await self._manager.send(request, node_id=broker_id)) + if other_resources: + request = DescribeConfigsRequest(resources=other_resources, include_synonyms=include_synonyms) + responses.append(await self._manager.send(request)) + + ret = defaultdict(dict) + for response in responses: + for result in response.results: + result_type = ConfigResourceType(result.resource_type).name.lower() + ret[result_type][result.resource_name] = {} + for config in result.configs: + config = config.to_dict() + name = config.pop('name') + if 'config_source' in config: + config['config_source'] = ConfigSourceType(config['config_source']).name + if 'synonyms' in config: + for synonym in config['synonyms']: + synonym['source'] = ConfigSourceType(synonym['source']).name + + if 'config_type' in config: + config['config_type'] = ConfigType(config['config_type']).name + ret[result_type][result.resource_name][name] = config + return dict(ret) def describe_configs(self, config_resources, include_synonyms=False): """Fetch configuration parameters for one or more Kafka resources. @@ -125,9 +136,12 @@ def alter_configs(self, config_resources): class ConfigResourceType(IntEnum): - """An enumerated type of config resources""" - BROKER = 4 + UNKNOWN = 0 TOPIC = 2 + BROKER = 4 + BROKER_LOGGER = 8 + CLIENT_METRICS = 16 + GROUP = 32 class ConfigResource: @@ -136,7 +150,7 @@ class ConfigResource: Arguments: resource_type (ConfigResourceType): the type of kafka resource name (string): The name of the kafka resource - configs ({key : value}): A maps of config keys to values. + configs ([key] or {key : value}): config keys (values required to alter) """ def __init__(self, resource_type, name, configs=None): if not isinstance(resource_type, ConfigResourceType): @@ -146,7 +160,32 @@ def __init__(self, resource_type, name, configs=None): self.configs = configs def __str__(self): - return "ConfigResource %s=%s" % (self.resource_type, self.name) + return f"ConfigResource {self.name}={self.resource_type}" def __repr__(self): - return "ConfigResource(%s, %s, %s)" % (self.resource_type, self.name, self.configs) + return f"ConfigResource({self.resource_type}, {self.name}, {self.configs})" + + +class ConfigType(IntEnum): + UNKNOWN = 0 + BOOLEAN = 1 + STRING = 2 + INT = 3 + SHORT = 4 + LONG = 5 + DOUBLE = 6 + LIST = 7 + CLASS = 8 + PASSWORD = 9 + + +class ConfigSourceType(IntEnum): + UNKNOWN = 0 + TOPIC_CONFIG = 1 + DYNAMIC_BROKER_CONFIG = 2 + DYNAMIC_DEFAULT_BROKER_CONFIG = 3 + STATIC_BROKER_CONFIG = 4 + DEFAULT_CONFIG = 5 + DYNAMIC_BROKER_LOGGER_CONFIG = 6 + CLIENT_METRICS_CONFIG = 7 + GROUP_CONFIG = 8 diff --git a/kafka/cli/admin/configs/describe.py b/kafka/cli/admin/configs/describe.py index 9abb2eb9c..af4fc172a 100644 --- a/kafka/cli/admin/configs/describe.py +++ b/kafka/cli/admin/configs/describe.py @@ -8,21 +8,21 @@ def add_subparser(cls, subparsers): parser = subparsers.add_parser('describe', help='Describe Kafka Configs') parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[]) parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[]) + parser.add_argument('--broker-logger', type=str, action='append', dest='broker_loggers', default=[]) + parser.add_argument('-g', '--group', type=str, action='append', dest='groups', default=[]) + parser.add_argument('-k', '--key', type=str, action='append', dest='keys', default=None) parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): resources = [] for topic in args.topics: - resources.append(ConfigResource('TOPIC', topic)) + resources.append(ConfigResource('TOPIC', topic, args.keys)) for broker in args.brokers: - resources.append(ConfigResource('BROKER', broker)) + resources.append(ConfigResource('BROKER', broker, args.keys)) + for broker in args.broker_loggers: + resources.append(ConfigResource('BROKER_LOGGER', broker, args.keys)) + for group in args.groups: + resources.append(ConfigResource('GROUP', group, args.keys)) - responses = client.describe_configs(resources) - # Return shape is list of (resource, configs) tuples - # resource => (type, name) - # configs => {key: value} - return [( - (resources[i].resource_type.name, resources[i].name), - {str(config.name): config.value for config in r.results[0].configs} - ) for i, r in enumerate(responses)] + return client.describe_configs(resources) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 49fb47b4a..077ff2522 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -105,14 +105,23 @@ def test_describe_configs_broker_resource_returns_configs(kafka_admin_client): configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) assert len(configs) == 1 - assert configs[0].results[0][2] == ConfigResourceType.BROKER - assert configs[0].results[0][3] == str(broker_id) - assert len(configs[0].results[0][4]) > 1 + assert len(configs['broker']) == 1 + if env_kafka_version() >= (4, 0): + assert configs['broker'][str(broker_id)]['advertised.listeners']['config_source'] == 'STATIC_BROKER_CONFIG' + elif env_kafka_version() >= (1, 1): + assert configs['broker'][str(broker_id)]['advertised.listeners']['config_source'] == 'DEFAULT_CONFIG' + if env_kafka_version() >= (2, 6): + assert configs['broker'][str(broker_id)]['advertised.listeners']['config_type'] in ('LIST', 'STRING') + if env_kafka_version() >= (4, 0): + assert configs['broker'][str(broker_id)]['advertised.listeners']['read_only'] is True + elif env_kafka_version() >= (1, 0): + assert configs['broker'][str(broker_id)]['advertised.listeners']['read_only'] is False + assert configs['broker'][str(broker_id)]['advertised.listeners']['is_sensitive'] is False + if env_kafka_version() >= (1, 1): + assert configs['broker'][str(broker_id)]['advertised.listeners']['synonyms'] == [] + assert 'value' in configs['broker'][str(broker_id)]['advertised.listeners'] -@pytest.mark.xfail(condition=True, - reason="https://github.com/dpkp/kafka-python/issues/1929", - raises=AssertionError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_client): """Tests that describe config returns configs for topic @@ -120,9 +129,16 @@ def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_clie configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)]) assert len(configs) == 1 - assert configs[0].results[0][2] == ConfigResourceType.TOPIC - assert configs[0].results[0][3] == topic - assert len(configs[0].results[0][4]) > 1 + assert len(configs['topic']) == 1 + if env_kafka_version() >= (1, 1): + assert configs['topic'][topic]['retention.bytes']['config_source'] == 'DEFAULT_CONFIG' + if env_kafka_version() >= (2, 6): + assert configs['topic'][topic]['retention.bytes']['config_type'] == 'LONG' + assert configs['topic'][topic]['retention.bytes']['read_only'] is False + assert configs['topic'][topic]['retention.bytes']['is_sensitive'] is False + if env_kafka_version() >= (1, 1): + assert configs['topic'][topic]['retention.bytes']['synonyms'] == [] + assert configs['topic'][topic]['retention.bytes']['value'] == '-1' @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") @@ -135,13 +151,10 @@ def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_cli ConfigResource(ConfigResourceType.BROKER, broker_id)]) assert len(configs) == 2 - - for config in configs: - assert (config.results[0][2] == ConfigResourceType.TOPIC - and config.results[0][3] == topic) or \ - (config.results[0][2] == ConfigResourceType.BROKER - and config.results[0][3] == str(broker_id)) - assert len(config.results[0][4]) > 1 + assert topic in configs['topic'] + assert len(configs['topic'][topic]) > 1 + assert str(broker_id) in configs['broker'] + assert len(configs['broker'][str(broker_id)]) > 1 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") From 8259b9cbfb6e66cc326e5ac4d516b1282dcfb5b4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 14 Apr 2026 12:10:27 -0700 Subject: [PATCH 4/4] Cleanup create/delete topics results --- kafka/admin/_topics.py | 9 +++++++-- test/integration/test_admin_integration.py | 21 ++++++--------------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/kafka/admin/_topics.py b/kafka/admin/_topics.py index 97bd0094b..c503a20b1 100644 --- a/kafka/admin/_topics.py +++ b/kafka/admin/_topics.py @@ -140,7 +140,9 @@ def response_errors(r): response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) if wait_for_metadata: self.wait_for_topics([new_topic.name for new_topic in request.topics]) - return response.to_dict() + result = response.to_dict() + result.pop('throttle_time_ms', None) + return result def wait_for_topics(self, topic_names, timeout_ms=10000): """Block until each of the given topics is ready to use. @@ -238,7 +240,10 @@ def response_errors(r): for response in r.responses: yield Errors.for_code(response.error_code) response = self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors) - return response.to_dict() + result = response.to_dict() + result.pop('throttle_time_ms', None) + result['topics'] = result.pop('responses') + return result @staticmethod def _process_create_partitions_input(topic_partitions): diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 077ff2522..68b8c1ef1 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -401,26 +401,17 @@ def test_create_delete_topics(kafka_admin_client): assert response['topics'][0]['error_code'] == 0 # NoError response = kafka_admin_client.delete_topics([topic_name]) - assert response['responses'][0]['name'] == topic_name - assert response['responses'][0]['error_code'] == 0 # NoError - - topic_name = random_string(4) - response = kafka_admin_client.create_topics([topic_name]) assert response['topics'][0]['name'] == topic_name assert response['topics'][0]['error_code'] == 0 # NoError - response = kafka_admin_client.delete_topics([topic_name]) - assert response['responses'][0]['name'] == topic_name - assert response['responses'][0]['error_code'] == 0 # NoError - topic_name = random_string(4) response = kafka_admin_client.create_topics({topic_name: {'num_partitions': 1, 'replication_factor': 1}}) assert response['topics'][0]['name'] == topic_name assert response['topics'][0]['error_code'] == 0 # NoError response = kafka_admin_client.delete_topics([topic_name]) - assert response['responses'][0]['name'] == topic_name - assert response['responses'][0]['error_code'] == 0 # NoError + assert response['topics'][0]['name'] == topic_name + assert response['topics'][0]['error_code'] == 0 # NoError # Create topics requires explicit num_partitions/replication_factor on < 2.4 if env_kafka_version() < (2, 4): @@ -433,12 +424,12 @@ def test_create_delete_topics(kafka_admin_client): else: topic_name = random_string(4) response = kafka_admin_client.create_topics([topic_name]) - assert response.topics[0].name == topic_name - assert response.topics[0].error_code == 0 # NoError + assert response['topics'][0]['name'] == topic_name + assert response['topics'][0]['error_code'] == 0 # NoError response = kafka_admin_client.delete_topics([topic_name]) - assert response.responses[0].name == topic_name - assert response.responses[0].error_code == 0 # NoError + assert response['topics'][0]['name'] == topic_name + assert response['topics'][0]['error_code'] == 0 # NoError @pytest.mark.skipif(env_kafka_version() < (1, 0), reason="CreatePartitions requires broker >=1.0")