Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ class KafkaAdminClient:
api_version (tuple): Specify which Kafka API version to use. If set
to None, KafkaClient will attempt to infer the broker version by
probing various APIs. Example: (0, 10, 2). Default: None
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker
api version. Only applies if api_version is None
bootstrap_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when bootstrapping.
Default: 2000.
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
Expand Down Expand Up @@ -177,7 +177,7 @@ class KafkaAdminClient:
'ssl_password': None,
'ssl_crlfile': None,
'api_version': None,
'api_version_auto_timeout_ms': 2000,
'bootstrap_timeout_ms': 2000,
'selector': selectors.DefaultSelector,
'sasl_mechanism': None,
'sasl_plain_username': None,
Expand Down Expand Up @@ -220,9 +220,8 @@ def __init__(self, **configs):
# Goal: migrate all self._client calls -> self._manager (skipping compat layer)
self._manager = self._client._manager

# Get auto-discovered version from client if necessary
self.config['api_version'] = self._client.get_broker_version(timeout_ms=self.config['api_version_auto_timeout_ms'])

# Bootstrap on __init__
self._manager.run(self._manager.bootstrap(timeout_ms=self.config['bootstrap_timeout_ms']))
self._closed = False
self._refresh_controller_id()
log.debug("KafkaAdminClient started.")
Expand Down Expand Up @@ -483,7 +482,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
if validate_only:
raise IncompatibleBrokerVersion(
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
.format(self._manager.broker_version))
request = CreateTopicsRequest[version](
topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout_ms=timeout_ms
Expand Down Expand Up @@ -1016,7 +1015,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
if include_synonyms and version == 0:
raise IncompatibleBrokerVersion(
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
.format(self._manager.broker_version))

requests = []
if len(broker_resources) > 0:
Expand Down
6 changes: 6 additions & 0 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ def __init__(self, net, cluster, **configs):
if self.config['api_version'] is not None:
self.broker_version_data = BrokerVersionData(self.config['api_version'])

@property
def broker_version(self):
if self.broker_version_data is None:
return None
return self.broker_version_data.broker_version

def least_used_connections(self):
return sorted(filter(lambda conn: conn.connected, self._conns.values()), key=lambda conn: conn.transport.last_activity)

Expand Down
Loading