Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 34 additions & 48 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def __init__(self, **configs):
# Bootstrap on __init__
self._manager.run(self._manager.bootstrap(timeout_ms=self.config['bootstrap_timeout_ms']))
self._closed = False
self._refresh_controller_id()
self._controller_id = None
log.debug("KafkaAdminClient started.")

def close(self):
Expand All @@ -248,32 +248,22 @@ def _validate_timeout(self, timeout_ms):
"""
return timeout_ms or self.config['request_timeout_ms']

def _refresh_controller_id(self, timeout_ms=30000):
async def _refresh_controller_id(self, timeout_ms=30000):
"""Determine the Kafka cluster controller."""
version = self._client.api_version(MetadataRequest, max_version=8)
if version == 0:
if self._manager.broker_version < (0, 10):
raise UnrecognizedBrokerVersion(
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
.format(version))
# use defaults for allow_auto_topic_creation / include_authorized_operations in v6+
request = MetadataRequest[version]()
"Kafka Admin Client controller requests requires broker version >= (0, 10)")

request = MetadataRequest()
timeout_at = time.monotonic() + timeout_ms / 1000
while time.monotonic() < timeout_at:
response = self.send_request(request)
response = await self._manager.send(request)
controller_id = response.controller_id
if controller_id == -1:
log.warning("Controller ID not available, got -1")
time.sleep(1)
await self._manager._net.sleep(1)
continue
# verify the controller is new enough to support our requests
controller_version = self._client.check_version(node_id=controller_id)
if controller_version < (0, 10, 0):
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
.format(controller_version))
self._controller_id = controller_id
return
return controller_id
else:
raise Errors.NodeNotReadyError('controller')

Expand Down Expand Up @@ -385,7 +375,7 @@ def send_requests(self, requests_and_node_ids, response_fn=lambda x: x):
self._wait_for_futures(futures)
return [response_fn(future.value) for future in futures]

def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), ignore_errors=(), raise_errors=True, tries=2):
async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), raise_errors=True, ignore_errors=()):
"""Send a Kafka protocol message to the cluster controller.

Will block until the message result is received.
Expand All @@ -397,31 +387,31 @@ def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), ignor
get_errors_fn (func): Function to process response and return an iterable of Error types.
ignore_errors (tuple): Any non-zero error codes that should be ignored. Not used if raise_errors=False.
raise_errors (bool): Whether to raise unhandled errors (True, default) or return response with errors (False).
tries (int): Number of times to refresh controller id and retry on NotControllerIdError.

Returns:
The Kafka protocol response for the message.
"""
# retry in case our controller_id is out of date
while tries:
tries -= 1
response = self.send_request(request, node_id=self._controller_id)
for error_type in get_errors_fn(response):
if tries and error_type is Errors.NotControllerError:
# No need to inspect the rest of the errors for
# non-retriable errors because NotControllerError should
# either be thrown for all errors or no errors.
self._refresh_controller_id()
break
elif raise_errors:
if error_type is not Errors.NoError and error_type not in ignore_errors:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
else:
# No controller refresh needed
return response
raise RuntimeError("Failed to find active controller id!")
if self._controller_id is None or self._controller_id == -1:
self._controller_id = await self._refresh_controller_id()

response = await self._manager.send(request, node_id=self._controller_id)

# Refresh controller and retry on NotControllerError
if Errors.NotControllerError in get_errors_fn(response):
self._controller_id = await self._refresh_controller_id()
response = await self._manager.send(request, node_id=self._controller_id)

for error_type in get_errors_fn(response):
if error_type is Errors.NoError:
continue
elif error_type is Errors.NotControllerError:
raise RuntimeError("Failed to find active controller id!")
elif raise_errors and error_type not in ignore_errors:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
return response

@staticmethod
def _convert_new_topic_request(new_topic):
Expand Down Expand Up @@ -471,8 +461,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
call :meth:`wait_for_topics` directly for finer control.
Mutually exclusive with validate_only. Default: False

Returns:
Appropriate version of CreateTopicResponse class.
Returns: CreateTopicResponse
"""
if validate_only and wait_for_metadata:
raise ValueError('validate_only and wait_for_metadata are mutually exclusive')
Expand All @@ -499,7 +488,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
def get_response_errors(r):
for topic in r.topics:
yield Errors.for_code(topic[1])
response = self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)
response = self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors)
if wait_for_metadata: # implies not validate_only
self.wait_for_topics([new_topic.name for new_topic in new_topics])
return response
Expand Down Expand Up @@ -601,7 +590,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
def get_response_errors(r):
for response in r.responses:
yield Errors.for_code(response[1])
return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)
return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors)

def _process_acl_operations(self, obj):
if obj.get('authorized_operations', None) is not None:
Expand Down Expand Up @@ -1138,7 +1127,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
def get_response_errors(r):
for result in r.results:
yield Errors.for_code(result[1])
return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)
return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors)

def _get_leader_for_partitions(self, partitions, timeout_ms=None):
"""Finds ID of the leader node for every given topic partition.
Expand Down Expand Up @@ -1685,10 +1674,7 @@ def get_response_errors(r):
for partition in result[1]:
yield Errors.for_code(partition[1])
ignore_errors = (Errors.ElectionNotNeededError,)
return self._send_request_to_controller(request,
get_errors_fn=get_response_errors,
ignore_errors=ignore_errors,
raise_errors=raise_errors)
return self._manager.run(self._send_request_to_controller, request, get_response_errors, raise_errors, ignore_errors)

def describe_log_dirs(self):
"""Send a DescribeLogDirsRequest request to a broker.
Expand Down
Loading