221 changes: 85 additions & 136 deletions kafka/admin/client.py
@@ -297,45 +297,6 @@ async def _find_coordinator_id(self, group_id):
self._coordinator_cache[group_id] = response.node_id
return response.node_id

def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.

Arguments:
node_id: The broker id to which to send the message.
request: The message to send.

Returns:
A future object that may be polled for status and results.
"""
return self._manager.send(request, node_id=node_id)

def _wait_for_futures(self, futures):
"""Block until all futures complete. If any fail, raise the encountered exception.

Arguments:
futures: A list of Future objects awaiting results.

Raises:
The first encountered exception if a future fails.
"""
failed = None
for future in futures:
self._manager.poll(future=future)
if failed is None and future.failed():
failed = future
if failed:
raise failed.exception # pylint: disable-msg=raising-bad-type

def send_request(self, request, node_id=None):
return self._manager.run(self._manager.send(request, node_id=node_id))

def send_requests(self, requests_and_node_ids, response_fn=lambda x: x):
futures = []
for request, node_id in requests_and_node_ids:
futures.append(self._manager.send(request, node_id=node_id))
self._wait_for_futures(futures)
return [response_fn(future.value) for future in futures]
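
The synchronous helpers removed above are superseded by the event-loop manager: single-request callers now wrap the send coroutine directly (self._manager.run(self._manager.send(request))), and multi-request paths move into async helpers handed to run, as the rest of this diff shows. A minimal sketch of that replacement pattern; the method names here are hypothetical and only illustrate the shape:

# Hypothetical illustration of the new pattern -- not part of this change.
async def _async_send_all(self, requests_and_node_ids):
    # Await each request on the manager's event loop, covering what the
    # removed send_requests()/_wait_for_futures() pair used to do.
    responses = []
    for request, node_id in requests_and_node_ids:
        responses.append(await self._manager.send(request, node_id=node_id))
    return responses

def send_all(self, requests_and_node_ids):
    # Synchronous facade, same shape as describe_configs()/delete_records() below.
    return self._manager.run(self._async_send_all, requests_and_node_ids)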

async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), raise_errors=True, ignore_errors=()):
"""Send a Kafka protocol message to the cluster controller.

@@ -693,7 +654,7 @@ def describe_acls(self, acl_filter):
operation=acl_filter.operation,
permission_type=acl_filter.permission_type
)
response = self.send_request(request) # pylint: disable=E0606
response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
# optionally we could retry if error_type.retriable
@@ -806,7 +767,7 @@ def create_acls(self, acls):
request = CreateAclsRequest[version](
creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls]
)
response = self.send_request(request) # pylint: disable=E0606
response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606
return self._convert_create_acls_response_to_acls(acls, response)

@staticmethod
@@ -920,7 +881,7 @@ def delete_acls(self, acl_filters):
request = DeleteAclsRequest[version](
filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters]
)
response = self.send_request(request) # pylint: disable=E0606
response = self._manager.run(self._manager.send(request)) # pylint: disable=E0606
return self._convert_delete_acls_response_to_matching_acls(acl_filters, response)

@staticmethod
@@ -941,23 +902,7 @@ def _convert_describe_config_resource_request(config_resource):
] if config_resource.configs else None
)

def describe_configs(self, config_resources, include_synonyms=False):
"""Fetch configuration parameters for one or more Kafka resources.

Arguments:
config_resources: A list of ConfigResource objects.
Any keys in ConfigResource.configs dict will be used to filter the
result. Setting the configs dict to None will get all values. An
empty dict will get zero values (as per Kafka protocol).

Keyword Arguments:
include_synonyms (bool, optional): If True, return synonyms in response. Not
supported by all versions. Default: False.

Returns:
List of DescribeConfigsResponses.
"""

async def _async_describe_configs(self, config_resources, include_synonyms=False):
# Break up requests by type - a broker config request must be sent to the specific broker.
# All other (currently just topic resources) can be sent to any broker.
broker_resources = []
Expand All @@ -975,30 +920,46 @@ def describe_configs(self, config_resources, include_synonyms=False):
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
.format(self._manager.broker_version))

requests = []
if len(broker_resources) > 0:
for broker_resource in broker_resources:
try:
broker_id = int(broker_resource[1])
except ValueError:
raise ValueError("Broker resource names must be an integer or a string represented integer")

if version == 0:
request = DescribeConfigsRequest[version](resources=[broker_resource])
else:
request = DescribeConfigsRequest[version](
resources=[broker_resource],
include_synonyms=include_synonyms)
requests.append((request, broker_id))
results = []
for broker_resource in broker_resources:
try:
broker_id = int(broker_resource[1])
except ValueError:
raise ValueError("Broker resource names must be an integer or a string represented integer")
if version == 0:
request = DescribeConfigsRequest[version](resources=[broker_resource])
else:
request = DescribeConfigsRequest[version](
resources=[broker_resource],
include_synonyms=include_synonyms)
results.append(await self._manager.send(request, node_id=broker_id))

if len(topic_resources) > 0:
if topic_resources:
if version == 0:
request = DescribeConfigsRequest[version](resources=topic_resources)
else:
request = DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
requests.append((request, None))
results.append(await self._manager.send(request))

return results

return self.send_requests(requests)
def describe_configs(self, config_resources, include_synonyms=False):
"""Fetch configuration parameters for one or more Kafka resources.

Arguments:
config_resources: A list of ConfigResource objects.
Any keys in ConfigResource.configs dict will be used to filter the
result. Setting the configs dict to None will get all values. An
empty dict will get zero values (as per Kafka protocol).

Keyword Arguments:
include_synonyms (bool, optional): If True, return synonyms in response. Not
supported by all versions. Default: False.

Returns:
List of DescribeConfigsResponses.
"""
return self._manager.run(self._async_describe_configs, config_resources, include_synonyms)
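
For reference, a hedged caller-side sketch of this unchanged public entry point. The bootstrap address, topic, and broker id are placeholders, and the class name and constructor are assumed to match kafka-python's KafkaAdminClient:

# Illustrative usage only; every name below is a placeholder.
from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
responses = admin.describe_configs([
    ConfigResource(ConfigResourceType.TOPIC, "my-topic"),  # can be sent to any broker
    ConfigResource(ConfigResourceType.BROKER, "1"),        # must be sent to broker 1
])
admin.close()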

@staticmethod
def _convert_alter_config_resource_request(config_resource):
Expand All @@ -1018,6 +979,19 @@ def _convert_alter_config_resource_request(config_resource):
]
)

async def _async_alter_configs(self, config_resources):
version = self._client.api_version(AlterConfigsRequest, max_version=1)
request = AlterConfigsRequest[version](
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
# TODO the Java client has the note:
# // We must make a separate AlterConfigs request for every BROKER resource we want to alter
# // and send the request to that specific broker. Other resources are grouped together into
# // a single request that may be sent to any broker.
#
# So this is currently broken as it always sends to the least_loaded_node()
return await self._manager.send(request)
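
A hedged sketch of the per-broker split the TODO describes, mirroring _async_describe_configs above. This is illustrative only and not part of this change; it assumes ConfigResourceType.BROKER and ConfigResource.name behave as they do in describe_configs:

# Hypothetical follow-up, not part of this diff.
async def _async_alter_configs_per_broker(self, config_resources):
    version = self._client.api_version(AlterConfigsRequest, max_version=1)
    broker_resources = [r for r in config_resources
                        if r.resource_type == ConfigResourceType.BROKER]
    other_resources = [r for r in config_resources
                       if r.resource_type != ConfigResourceType.BROKER]
    responses = []
    for resource in broker_resources:
        # A BROKER resource must be altered on the broker it names.
        request = AlterConfigsRequest[version](
            resources=[self._convert_alter_config_resource_request(resource)])
        responses.append(await self._manager.send(request, node_id=int(resource.name)))
    if other_resources:
        # Everything else can be grouped into one request to any broker.
        request = AlterConfigsRequest[version](
            resources=[self._convert_alter_config_resource_request(r)
                       for r in other_resources])
        responses.append(await self._manager.send(request))
    return responses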

def alter_configs(self, config_resources):
"""Alter configuration parameters of one or more Kafka resources.

Expand All @@ -1033,17 +1007,7 @@ def alter_configs(self, config_resources):
Returns:
Appropriate version of AlterConfigsResponse class.
"""
version = self._client.api_version(AlterConfigsRequest, max_version=1)
request = AlterConfigsRequest[version](
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
# TODO the Java client has the note:
# // We must make a separate AlterConfigs request for every BROKER resource we want to alter
# // and send the request to that specific broker. Other resources are grouped together into
# // a single request that may be sent to any broker.
#
# So this is currently broken as it always sends to the least_loaded_node()
return self.send_request(request)
return self._manager.run(self._async_alter_configs, config_resources)

# alter replica logs dir protocol not yet implemented
# Note: have to lookup the broker with the replica assignment and send the request to that broker
@@ -1102,27 +1066,12 @@ def response_errors(r):
yield Errors.for_code(result.error_code)
return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors)

def _get_leader_for_partitions(self, partitions, timeout_ms=None):
"""Finds ID of the leader node for every given topic partition.

Raises UnknownTopicOrPartitionError if no leader can be found for some partition.

Arguments:
partitions ([TopicPartition]): partitions for which to find leaders.

Keyword Arguments:
timeout_ms (numeric, optional): Timeout in milliseconds; if None (default), it will be read
from the config.

Returns:
dict with ``{leader_id -> {partitions}}``
"""
timeout_ms = self._validate_timeout(timeout_ms)

async def _async_get_leader_for_partitions(self, partitions):
"""Finds ID of the leader node for every given topic partition."""
partitions = set(partitions)
topics = set(tp.topic for tp in partitions)

metadata = self._manager.run(self._get_cluster_metadata, topics)
metadata = await self._get_cluster_metadata(topics)

leader2partitions = defaultdict(list)
valid_partitions = set()
Expand All @@ -1142,40 +1091,16 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):

return leader2partitions

def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
"""Delete records whose offset is smaller than the given offset of the corresponding partition.

Arguments:
records_to_delete ({TopicPartition: int}): The earliest available offsets for the
given partitions.

Keyword Arguments:
timeout_ms (numeric, optional): Timeout in milliseconds; if None (default), it will be read
from the config.
partition_leader_id (node_id / int, optional): If specified, all deletion requests will be sent to
this node. No check is performed to verify that this is indeed the leader for all
listed partitions: use with caution.

Returns:
dict {topicPartition -> metadata}, where metadata is returned by the broker.
See DeleteRecordsResponse for possible fields. error_code for all partitions is
guaranteed to be zero, otherwise an exception is raised.
"""
async def _async_delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
timeout_ms = self._validate_timeout(timeout_ms)
responses = []
version = self._client.api_version(DeleteRecordsRequest, max_version=0)

# We want to make as few requests as possible
# If a single node serves as a partition leader for multiple partitions (and/or
# topics), we can send all of those in a single request.
# For that we store {leader -> {partitions for leader}}, and do 1 request per leader
if partition_leader_id is None:
leader2partitions = self._get_leader_for_partitions(
set(records_to_delete), timeout_ms
)
leader2partitions = await self._async_get_leader_for_partitions(set(records_to_delete))
else:
leader2partitions = {partition_leader_id: set(records_to_delete)}

responses = []
for leader, partitions in leader2partitions.items():
topic2partitions = defaultdict(list)
for partition in partitions:
Expand All @@ -1188,7 +1113,7 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
],
timeout_ms=timeout_ms
)
response = self.send_request(request, node_id=leader)
response = await self._manager.send(request, node_id=leader)
responses.append(response.to_dict())

partition2result = {}
@@ -1219,6 +1144,27 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):

return partition2result

def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
"""Delete records whose offset is smaller than the given offset of the corresponding partition.

Arguments:
records_to_delete ({TopicPartition: int}): The earliest available offsets for the
given partitions.

Keyword Arguments:
timeout_ms (numeric, optional): Timeout in milliseconds; if None (default), it will be read
from the config.
partition_leader_id (node_id / int, optional): If specified, all deletion requests will be sent to
this node. No check is performed to verify that this is indeed the leader for all
listed partitions: use with caution.

Returns:
dict {topicPartition -> metadata}, where metadata is returned by the broker.
See DeleteRecordsResponse for possible fields. error_code for all partitions is
guaranteed to be zero, otherwise an exception is raised.
"""
return self._manager.run(self._async_delete_records, records_to_delete, timeout_ms, partition_leader_id)
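
A matching caller-side sketch for the wrapper above, with the same caveats (placeholder names, kafka-python-style constructor assumed):

# Illustrative usage only: truncate partition 0 of "my-topic" up to offset 42.
from kafka import KafkaAdminClient, TopicPartition

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
low_watermarks = admin.delete_records({TopicPartition("my-topic", 0): 42}, timeout_ms=30000)
admin.close()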

# create delegation token protocol not yet implemented
# Note: send the request to the least_loaded_node()

@@ -1653,11 +1599,14 @@ def response_errors(r):
ignore_errors = (Errors.ElectionNotNeededError,)
return self._manager.run(self._send_request_to_controller, request, response_errors, raise_errors, ignore_errors)

async def _async_describe_log_dirs(self):
version = self._client.api_version(DescribeLogDirsRequest, max_version=0)
return await self._manager.send(DescribeLogDirsRequest[version]())

def describe_log_dirs(self):
"""Send a DescribeLogDirsRequest request to a broker.

Returns:
DescribeLogDirsResponse object
"""
version = self._client.api_version(DescribeLogDirsRequest, max_version=0)
return self.send_request(DescribeLogDirsRequest[version]())
return self._manager.run(self._async_describe_log_dirs)