diff --git a/kafka/admin/client.py b/kafka/admin/client.py index a536389fc..38c605ac0 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -124,9 +124,9 @@ class KafkaAdminClient: api_version (tuple): Specify which Kafka API version to use. If set to None, KafkaClient will attempt to infer the broker version by probing various APIs. Example: (0, 10, 2). Default: None - api_version_auto_timeout_ms (int): number of milliseconds to throw a - timeout exception from the constructor when checking the broker - api version. Only applies if api_version is None + bootstrap_timeout_ms (int): number of milliseconds to throw a + timeout exception from the constructor when bootstrapping. + Default: 2000. selector (selectors.BaseSelector): Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector @@ -177,7 +177,7 @@ class KafkaAdminClient: 'ssl_password': None, 'ssl_crlfile': None, 'api_version': None, - 'api_version_auto_timeout_ms': 2000, + 'bootstrap_timeout_ms': 2000, 'selector': selectors.DefaultSelector, 'sasl_mechanism': None, 'sasl_plain_username': None, @@ -220,9 +220,8 @@ def __init__(self, **configs): # Goal: migrate all self._client calls -> self._manager (skipping compat layer) self._manager = self._client._manager - # Get auto-discovered version from client if necessary - self.config['api_version'] = self._client.get_broker_version(timeout_ms=self.config['api_version_auto_timeout_ms']) - + # Bootstrap on __init__ + self._manager.run(self._manager.bootstrap(timeout_ms=self.config['bootstrap_timeout_ms'])) self._closed = False self._refresh_controller_id() log.debug("KafkaAdminClient started.") @@ -483,7 +482,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ if validate_only: raise IncompatibleBrokerVersion( "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." - .format(self.config['api_version'])) + .format(self._manager.broker_version)) request = CreateTopicsRequest[version]( topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], timeout_ms=timeout_ms @@ -1016,7 +1015,7 @@ def describe_configs(self, config_resources, include_synonyms=False): if include_synonyms and version == 0: raise IncompatibleBrokerVersion( "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." - .format(self.config['api_version'])) + .format(self._manager.broker_version)) requests = [] if len(broker_resources) > 0: diff --git a/kafka/net/manager.py b/kafka/net/manager.py index cf2790739..571a6ca19 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -70,6 +70,12 @@ def __init__(self, net, cluster, **configs): if self.config['api_version'] is not None: self.broker_version_data = BrokerVersionData(self.config['api_version']) + @property + def broker_version(self): + if self.broker_version_data is None: + return None + return self.broker_version_data.broker_version + def least_used_connections(self): return sorted(filter(lambda conn: conn.connected, self._conns.values()), key=lambda conn: conn.transport.last_activity)