diff --git a/docs/apidoc/BrokerConnection.rst b/docs/apidoc/BrokerConnection.rst deleted file mode 100644 index c5366faf5..000000000 --- a/docs/apidoc/BrokerConnection.rst +++ /dev/null @@ -1,2 +0,0 @@ -.. autoclass:: kafka.BrokerConnection - :members: diff --git a/docs/apidoc/KafkaClient.rst b/docs/apidoc/KafkaClient.rst deleted file mode 100644 index 5a1630149..000000000 --- a/docs/apidoc/KafkaClient.rst +++ /dev/null @@ -1,2 +0,0 @@ -.. autoclass:: kafka.KafkaClient - :members: diff --git a/docs/apidoc/modules.rst b/docs/apidoc/modules.rst index cc72f7511..34a11349a 100644 --- a/docs/apidoc/modules.rst +++ b/docs/apidoc/modules.rst @@ -7,8 +7,6 @@ kafka-python API KafkaConsumer KafkaProducer KafkaAdminClient - KafkaClient - BrokerConnection ClusterMetadata OffsetAndMetadata TopicPartition diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 5dfab3ea8..c5d6d0133 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -121,8 +121,9 @@ class KafkaAdminClient( providing a file, only the leaf certificate will be checked against this CRL. Default: None. api_version (tuple): Specify which Kafka API version to use. If set - to None, KafkaClient will attempt to infer the broker version by - probing various APIs. Example: (0, 10, 2). Default: None + to None, KafkaConnectionManager will attempt to infer the + broker version by probing various APIs. Example: (0, 10, 2). + Default: None bootstrap_timeout_ms (int): number of milliseconds to throw a timeout exception from the constructor when bootstrapping. Default: 2000. @@ -149,7 +150,7 @@ class KafkaAdminClient( sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer token provider instance. Default: None socks5_proxy (str): Socks5 proxy url. Default: None - kafka_client (callable): Custom class / callable for creating KafkaClient instances + kafka_client (callable): Custom class / callable for creating KafkaNetClient instances """ DEFAULT_CONFIG = { # client configs diff --git a/kafka/client_async.py b/kafka/client_async.py deleted file mode 100644 index 053701021..000000000 --- a/kafka/client_async.py +++ /dev/null @@ -1,1155 +0,0 @@ -import collections -import copy -import logging -import random -import selectors -import socket -import threading -import time -import weakref - -from kafka.cluster import ClusterMetadata -from kafka.conn import BrokerConnection, ConnectionStates, get_ip_port_afi -from kafka import errors as Errors -from kafka.future import Future -from kafka.metrics import AnonMeasurable -from kafka.metrics.stats import Avg, Count, Rate -from kafka.metrics.stats.rate import TimeUnit -from kafka.protocol.broker_version_data import BrokerVersionData -from kafka.protocol.metadata import MetadataRequest -from kafka.util import Dict, Timer, WeakMethod -from kafka.version import __version__ - - -log = logging.getLogger('kafka.client') - - -class KafkaClient: - """ - A network client for asynchronous request/response network I/O. - - This is an internal class used to implement the user-facing producer and - consumer clients. - - This class is not thread-safe! - - Attributes: - cluster (:any:`ClusterMetadata`): Local cache of cluster metadata, retrieved - via MetadataRequests during :meth:`~kafka.KafkaClient.poll`. - - Keyword Arguments: - bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' - strings) that the client should contact to bootstrap initial - cluster metadata. This does not have to be the full node list. 
- It just needs to have at least one broker that will respond to a - Metadata API Request. Default port is 9092. If no servers are - specified, will default to localhost:9092. - client_id (str): a name for this client. This string is passed in - each request to servers and can be used to identify specific - server-side log entries that correspond to this client. Also - submitted to GroupCoordinator for logging with respect to - consumer group administration. Default: 'kafka-python-{version}' - reconnect_backoff_ms (int): The amount of time in milliseconds to - wait before attempting to reconnect to a given host. - Default: 50. - reconnect_backoff_max_ms (int): The maximum amount of time in - milliseconds to backoff/wait when reconnecting to a broker that has - repeatedly failed to connect. If provided, the backoff per host - will increase exponentially for each consecutive connection - failure, up to this maximum. Once the maximum is reached, - reconnection attempts will continue periodically with this fixed - rate. To avoid connection storms, a randomization factor of 0.2 - will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 30000. - request_timeout_ms (int): Client request timeout in milliseconds. - Default: 30000. - connections_max_idle_ms: Close idle connections after the number of - milliseconds specified by this config. The broker closes idle - connections after connections.max.idle.ms, so this avoids hitting - unexpected socket disconnected errors on the client. - Default: 540000 - retry_backoff_ms (int): Milliseconds to backoff when retrying on - errors. Default: 100. - max_in_flight_requests_per_connection (int): Requests are pipelined - to kafka brokers up to this number of maximum requests per - broker connection. Default: 5. - receive_buffer_bytes (int): The size of the TCP receive buffer - (SO_RCVBUF) to use when reading data. Default: None (relies on - system defaults). Java client defaults to 32768. - send_buffer_bytes (int): The size of the TCP send buffer - (SO_SNDBUF) to use when sending data. Default: None (relies on - system defaults). Java client defaults to 131072. - socket_options (list): List of tuple-arguments to socket.setsockopt - to apply to broker connection sockets. Default: - [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)] - metadata_max_age_ms (int): The period of time in milliseconds after - which we force a refresh of metadata even if we haven't seen any - partition leadership changes to proactively discover any new - brokers or partitions. Default: 300000 - allow_auto_create_topics (bool): Enable/disable auto topic creation - on metadata request. Only available with api_version >= (0, 11). - Default: True - security_protocol (str): Protocol used to communicate with brokers. - Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. - Default: PLAINTEXT. - ssl_context (ssl.SSLContext): Pre-configured SSLContext for wrapping - socket connections. If provided, all other ssl_* configurations - will be ignored. Default: None. - ssl_check_hostname (bool): Flag to configure whether SSL handshake - should verify that the certificate matches the broker's hostname. - Default: True. - ssl_cafile (str): Optional filename of CA file to use in certificate - verification. Default: None. - ssl_certfile (str): Optional filename of file in PEM format containing - the client certificate, as well as any CA certificates needed to - establish the certificate's authenticity. Default: None. 
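The exponential reconnect backoff described above is easiest to see as arithmetic. A minimal sketch, assuming the per-host delay doubles on each consecutive failure (the docstring says "exponentially" without fixing the base) and applying the documented 0.2 randomization factor:

```python
import random

def reconnect_backoff_ms(failures, base_ms=50.0, max_ms=30000.0, jitter=0.2):
    # Exponential per-host backoff, capped at reconnect_backoff_max_ms,
    # then randomized +/-20% to avoid reconnect storms.
    backoff = min(base_ms * 2 ** (failures - 1), max_ms)
    return backoff * random.uniform(1.0 - jitter, 1.0 + jitter)

# Values are random; once the cap is hit, delays stay within +/-20% of 30000ms.
print([round(reconnect_backoff_ms(n)) for n in (1, 2, 3, 4, 5, 10)])
```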
- ssl_keyfile (str): Optional filename containing the client private key. - Default: None. - ssl_password (str): Optional password to be used when loading the - certificate chain. Default: None. - ssl_crlfile (str): Optional filename containing the CRL to check for - certificate expiration. By default, no CRL check is done. When - providing a file, only the leaf certificate will be checked against - this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. - Default: None. - ssl_ciphers (str): optionally set the available ciphers for ssl - connections. It should be a string in the OpenSSL cipher list - format. If no cipher can be selected (because compile-time options - or other configuration forbids use of all the specified ciphers), - an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers - api_version (tuple): Specify which Kafka API version to use. If set to - None, the client will attempt to determine the broker version via - ApiVersionsRequest API or, for brokers earlier than 0.10, probing - various known APIs. Dynamic version checking is performed eagerly - during __init__ and can raise NoBrokersAvailableError if no connection - was made before timeout (see api_version_auto_timeout_ms below). - Different versions enable different functionality. - - Examples: - (3, 9) most recent broker release, enable all supported features - (0, 10, 0) enables sasl authentication - (0, 8, 0) enables basic functionality only - - Default: None - api_version_auto_timeout_ms (int): number of milliseconds to throw a - timeout exception from the constructor when checking the broker - api version. Only applies if api_version set to None. - Default: 2000 - selector (selectors.BaseSelector): Provide a specific selector - implementation to use for I/O multiplexing. - Default: selectors.DefaultSelector - metrics (kafka.metrics.Metrics): Optionally provide a metrics - instance for capturing network IO stats. Default: None. - metric_group_prefix (str): Prefix for metric names. Default: '' - sasl_mechanism (str): Authentication mechanism when security_protocol - is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. - sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. - Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. - sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. - Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. - sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with - sasl mechanism handshake. If provided, sasl_kerberos_service_name and - sasl_kerberos_domain name are ignored. Default: None. - sasl_kerberos_service_name (str): Service name to include in GSSAPI - sasl mechanism handshake. Default: 'kafka' - sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI - sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer - token provider instance. Default: None - socks5_proxy (str): Socks5 proxy URL. 
Default: None - """ - - DEFAULT_CONFIG = { - 'bootstrap_servers': 'localhost', - 'client_id': 'kafka-python-' + __version__, - 'request_timeout_ms': 30000, - 'wakeup_timeout_ms': 3000, - 'connections_max_idle_ms': 9 * 60 * 1000, - 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 30000, - 'max_in_flight_requests_per_connection': 5, - 'receive_buffer_bytes': None, - 'send_buffer_bytes': None, - 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], - 'sock_chunk_bytes': 4096, # undocumented experimental option - 'sock_chunk_buffer_count': 1000, # undocumented experimental option - 'retry_backoff_ms': 100, - 'allow_auto_create_topics': True, - 'metadata_max_age_ms': 300000, - 'security_protocol': 'PLAINTEXT', - 'ssl_context': None, - 'ssl_check_hostname': True, - 'ssl_cafile': None, - 'ssl_certfile': None, - 'ssl_keyfile': None, - 'ssl_password': None, - 'ssl_crlfile': None, - 'ssl_ciphers': None, - 'api_version': None, - 'api_version_auto_timeout_ms': 2000, - 'selector': selectors.DefaultSelector, - 'metrics': None, - 'metric_group_prefix': '', - 'sasl_mechanism': None, - 'sasl_plain_username': None, - 'sasl_plain_password': None, - 'sasl_kerberos_name': None, - 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None, - 'socks5_proxy': None, - } - - def __init__(self, **configs): - self.config = copy.copy(self.DEFAULT_CONFIG) - for key in self.config: - if key in configs: - self.config[key] = configs[key] - - # these properties need to be set on top of the initialization pipeline - # because they are used when __del__ method is called - self._closed = False - self._selector = self.config['selector']() - self._init_wakeup_socketpair() - self._wake_lock = threading.Lock() - - self.cluster = ClusterMetadata(**self.config) - self._conns = Dict() # object to support weakrefs - self.broker_version_data = None # set on bootstrap or via user config - self.broker_version_data_future = Future() - self._connecting = set() - self._sending = set() - - # Not currently used, but data is collected internally - self._last_bootstrap = 0 - self._bootstrap_fails = 0 - - self._lock = threading.RLock() - - # when requests complete, they are transferred to this queue prior to - # invocation. The purpose is to avoid invoking them while holding the - # lock above. 
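The comment above captures a lock-ordering rule that is easy to get wrong: completions are recorded while the client lock is held, but the callbacks run only after it is released, so a handler that re-enters the client cannot deadlock. The `_pending_completion` deque that follows implements it; a generic sketch of the pattern (class and method names are illustrative, not this module's API):

```python
import collections
import threading

class DeferredCompletions:
    def __init__(self):
        self.lock = threading.RLock()
        self._pending = collections.deque()

    def complete(self, callback, response):
        with self.lock:
            # Defer rather than invoke: the callback might try to
            # re-acquire self.lock and deadlock.
            self._pending.append((callback, response))

    def fire_pending(self):
        fired = []
        while True:
            try:
                # deque.popleft is thread-safe, so more than one thread
                # can drain completions concurrently.
                callback, response = self._pending.popleft()
            except IndexError:
                break
            callback(response)  # runs without holding self.lock
            fired.append(response)
        return fired
```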
- self._pending_completion = collections.deque() - - self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) - self._sensors = None - if self.config['metrics']: - self._sensors = KafkaClientMetrics(self.config['metrics'], - self.config['metric_group_prefix'], - weakref.proxy(self._conns)) - - # Check Broker Version if not set explicitly - if self.config['api_version'] is None: - self.get_broker_version(timeout_ms=self.config['api_version_auto_timeout_ms']) - else: - self.broker_version_data = BrokerVersionData(self.config['api_version']) - self.broker_version_data_future.success(self.broker_version_data) - - def _init_wakeup_socketpair(self): - self._wake_r, self._wake_w = socket.socketpair() - self._wake_r.setblocking(False) - self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) - self._waking = False - self._selector.register(self._wake_r, selectors.EVENT_READ) - - def _close_wakeup_socketpair(self): - if self._wake_r is not None: - try: - self._selector.unregister(self._wake_r) - except (KeyError, ValueError, TypeError): - pass - self._wake_r.close() - if self._wake_w is not None: - self._wake_w.close() - self._wake_r = None - self._wake_w = None - - def _can_connect(self, node_id): - if node_id not in self._conns: - if self.cluster.broker_metadata(node_id): - return True - return False - conn = self._conns[node_id] - return conn.disconnected() and not conn.blacked_out() - - def _conn_state_change(self, node_id, sock, conn): - with self._lock: - if conn.state is ConnectionStates.CONNECTING: - # SSL connections can enter this state 2x (second during Handshake) - if node_id not in self._connecting: - self._connecting.add(node_id) - try: - self._selector.register(sock, selectors.EVENT_WRITE, conn) - except KeyError: - self._selector.modify(sock, selectors.EVENT_WRITE, conn) - - if self.cluster.is_bootstrap(node_id): - self._last_bootstrap = time.monotonic() - - elif conn.state is ConnectionStates.API_VERSIONS_SEND: - try: - self._selector.register(sock, selectors.EVENT_WRITE, conn) - except KeyError: - self._selector.modify(sock, selectors.EVENT_WRITE, conn) - - elif conn.state in (ConnectionStates.API_VERSIONS_RECV, ConnectionStates.AUTHENTICATING): - try: - self._selector.register(sock, selectors.EVENT_READ, conn) - except KeyError: - self._selector.modify(sock, selectors.EVENT_READ, conn) - - elif conn.state is ConnectionStates.CONNECTED: - log.debug("Node %s connected", node_id) - if node_id in self._connecting: - self._connecting.remove(node_id) - - try: - self._selector.modify(sock, selectors.EVENT_READ, conn) - except KeyError: - self._selector.register(sock, selectors.EVENT_READ, conn) - - if self._sensors: - self._sensors.connection_created.record() - - self._idle_expiry_manager.update(node_id) - - if self.cluster.is_bootstrap(node_id): - self._bootstrap_fails = 0 - if self.broker_version_data is None: - self.broker_version_data = conn.broker_version_data - self.broker_version_data_future.success(self.broker_version_data) - - else: - for node_id in list(self._conns.keys()): - if self.cluster.is_bootstrap(node_id): - self._conns.pop(node_id).close() - - # Connection failures imply that our metadata is stale, so let's refresh - elif conn.state is ConnectionStates.DISCONNECTED: - if node_id in self._connecting: - self._connecting.remove(node_id) - try: - self._selector.unregister(sock) - except (KeyError, ValueError): - pass - - if self._sensors: - self._sensors.connection_closed.record() - - idle_disconnect = False - if 
self._idle_expiry_manager.is_expired(node_id): - idle_disconnect = True - self._idle_expiry_manager.remove(node_id) - - # If the connection has already by popped from self._conns, - # we can assume the disconnect was intentional and not a failure - if node_id not in self._conns: - pass - - elif self.cluster.is_bootstrap(node_id): - self._bootstrap_fails += 1 - - elif conn.connect_failed() and not self._closed and not idle_disconnect: - log.warning("Node %s connection failed -- refreshing metadata", node_id) - self.cluster.request_update() - - def maybe_connect(self, node_id, wakeup=True): - """Queues a node for asynchronous connection during the next .poll()""" - if self._can_connect(node_id): - self._connecting.add(node_id) - # Wakeup signal is useful in case another thread is - # blocked waiting for incoming network traffic while holding - # the client lock in poll(). - if wakeup: - self.wakeup() - return True - return False - - def connection_failed(self, node_id): - if node_id not in self._conns: - return False - return self._conns[node_id].connect_failed() - - def _should_recycle_connection(self, conn): - # Never recycle unless disconnected - if not conn.disconnected(): - return False - - # Otherwise, only recycle when broker metadata has changed - broker = self.cluster.broker_metadata(conn.node_id) - if broker is None: - return False - - host, _, _ = get_ip_port_afi(broker.host) - if conn.host != host or conn.port != broker.port: - log.info("Broker metadata change detected for node %s" - " from %s:%s to %s:%s", conn.node_id, conn.host, conn.port, - broker.host, broker.port) - return True - - return False - - def _init_connect(self, node_id): - """Idempotent non-blocking connection attempt to the given node id. - - Returns True if connection object exists and is connected / connecting - """ - with self._lock: - conn = self._conns.get(node_id) - - # Check if existing connection should be recreated because host/port changed - if conn is not None and self._should_recycle_connection(conn): - self._conns.pop(node_id).close() - conn = None - - if conn is None: - broker = self.cluster.broker_metadata(node_id) - if broker is None: - log.debug('Broker id %s not in current metadata', node_id) - return False - - log.debug("Initiating connection to node %s at %s:%s", - node_id, broker.host, broker.port) - host, port, afi = get_ip_port_afi(broker.host) - cb = WeakMethod(self._conn_state_change) - conn = BrokerConnection(host, broker.port, afi, - state_change_callback=cb, - node_id=node_id, - broker_version_data=self.broker_version_data, - **self.config) - self._conns[node_id] = conn - - if conn.disconnected(): - conn.connect() - return not conn.disconnected() - - def ready(self, node_id, metadata_priority=True): - """Check whether a node is connected and ok to send more requests. - - Arguments: - node_id (int): the id of the node to check - metadata_priority (bool): Mark node as not-ready if a metadata - refresh is required. Default: True - - Returns: - bool: True if we are ready to send to the given node - """ - self.maybe_connect(node_id) - return self.is_ready(node_id, metadata_priority=metadata_priority) - - def connected(self, node_id): - """Return True iff the node_id is connected.""" - conn = self._conns.get(node_id) - if conn is None: - return False - return conn.connected() - - def _close(self): - if not self._closed: - self._closed = True - self._close_wakeup_socketpair() - self._selector.close() - - def close(self, node_id=None): - """Close one or all broker connections. 
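For context on the connection methods above: `maybe_connect()` only queues a node, and the handshake itself is driven by later `poll()` calls. A hypothetical caller against the pre-removal `KafkaClient` API documented in this hunk (variable names assumed):

```python
client = KafkaClient(bootstrap_servers='localhost:9092')
node_id = client.least_loaded_node()
if node_id is not None:
    client.maybe_connect(node_id)  # queues only; returns False if not connectable
    while not client.connected(node_id) and not client.connection_failed(node_id):
        client.poll(timeout_ms=100)  # drives the actual connection I/O
```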
- - Arguments: - node_id (int, optional): the id of the node to close - """ - with self._lock: - if node_id is None: - self._close() - conns = list(self._conns.values()) - self._conns.clear() - for conn in conns: - conn.close() - elif node_id in self._conns: - self._conns.pop(node_id).close() - else: - log.warning("Node %s not found in current connection list; skipping", node_id) - return - - def __del__(self): - self._close() - - def is_disconnected(self, node_id): - """Check whether the node connection has been disconnected or failed. - - A disconnected node has either been closed or has failed. Connection - failures are usually transient and can be resumed in the next ready() - call, but there are cases where transient failures need to be caught - and re-acted upon. - - Arguments: - node_id (int): the id of the node to check - - Returns: - bool: True iff the node exists and is disconnected - """ - conn = self._conns.get(node_id) - if conn is None: - return False - return conn.disconnected() - - def connection_delay(self, node_id): - """ - Return the number of milliseconds to wait, based on the connection - state, before attempting to send data. When connecting or disconnected, - this respects the reconnect backoff time. When connected, returns a very large - number to handle slow/stalled connections. - - Arguments: - node_id (int): The id of the node to check - - Returns: - int: The number of milliseconds to wait. - """ - conn = self._conns.get(node_id) - if conn is None: - return 0 - return conn.connection_delay() - - def throttle_delay(self, node_id): - """ - Return the number of milliseconds to wait until a broker is no longer throttled. - When disconnected / connecting, returns 0. - """ - conn = self._conns.get(node_id) - if conn is None: - return 0 - return conn.throttle_delay() - - def is_ready(self, node_id, metadata_priority=True): - """Check whether a node is ready to send more requests. - - In addition to connection-level checks, this method also is used to - block additional requests from being sent during a metadata refresh. - - Arguments: - node_id (int): id of the node to check - metadata_priority (bool): Mark node as not-ready if a metadata - refresh is required. Default: True - - Returns: - bool: True if the node is ready and metadata is not refreshing - """ - if not self._can_send_request(node_id): - return False - - # if we need to update our metadata now declare all requests unready to - # make metadata requests first priority - if metadata_priority: - if self.cluster.metadata_refresh_in_progress: - return False - if self.cluster.ttl() == 0: - return False - return True - - def _can_send_request(self, node_id): - conn = self._conns.get(node_id) - if not conn: - return False - return conn.connected() and conn.can_send_more() - - def send(self, node_id, request, wakeup=True, request_timeout_ms=None): - """Send a request to a specific node. Bytes are placed on an - internal per-connection send-queue. Actual network I/O will be - triggered in a subsequent call to .poll() - - Arguments: - node_id (int): destination node - request (Struct): request object (not-encoded) - - Keyword Arguments: - wakeup (bool, optional): optional flag to disable thread-wakeup. - request_timeout_ms (int, optional): Provide custom timeout in milliseconds. - If response is not processed before timeout, client will fail the - request and close the connection. 
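The send/poll split documented here means `send()` never performs I/O itself; it queues bytes and returns a `Future` that a later `poll()` resolves. A hedged usage sketch (`client`, `node_id`, and `request` assumed from surrounding context; this is essentially what the `send_and_receive()` helper later in this module wraps):

```python
future = client.send(node_id, request, request_timeout_ms=5000)
client.poll(future=future)   # loops until future.is_done
if future.failed():
    raise future.exception   # e.g. NodeNotReadyError, RequestTimedOutError
response = future.value
```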
- Default: None (uses value from client configuration) - - Raises: - AssertionError: if node_id is not in current cluster metadata - - Returns: - Future: resolves to Response struct or Error - """ - conn = self._conns.get(node_id) - if not conn or not self._can_send_request(node_id): - self.maybe_connect(node_id, wakeup=wakeup) - return Future().failure(Errors.NodeNotReadyError(node_id)) - - # conn.send will queue the request internally - # we will need to call send_pending_requests() - # to trigger network I/O - future = conn.send(request, blocking=False, request_timeout_ms=request_timeout_ms) - if not future.is_done: - self._sending.add(conn) - - # Wakeup signal is useful in case another thread is - # blocked waiting for incoming network traffic while holding - # the client lock in poll(). - if wakeup: - self.wakeup() - - return future - - def poll(self, timeout_ms=None, future=None): - """Try to read and write to sockets. - - This method will also attempt to complete node connections, refresh - stale metadata, and run previously-scheduled tasks. - - Arguments: - timeout_ms (int, optional): maximum amount of time to wait (in ms) - for at least one response. Must be non-negative. The actual - timeout will be the minimum of timeout, request timeout and - metadata timeout. Default: request_timeout_ms - future (Future, optional): if provided, blocks until future.is_done - - Returns: - list: responses received (can be empty) - """ - if not isinstance(timeout_ms, (int, float, type(None))): - raise TypeError('Invalid type for timeout: %s' % type(timeout_ms)) - timer = Timer(timeout_ms) - - # Loop for futures, break after first loop if None - responses = [] - while True: - with self._lock: - if self._closed: - break - - # Attempt to complete pending connections - for node_id in list(self._connecting): - # False return means no more connection progress is possible - # Connected nodes will update _connecting via state_change callback - if not self._init_connect(node_id): - # It's possible that the connection attempt triggered a state change - # but if not, make sure to remove from _connecting list - if node_id in self._connecting: - self._connecting.remove(node_id) - - # Send a metadata request if needed (or initiate new connection) - metadata_timeout_ms = self._maybe_refresh_metadata() - - # If we got a future that is already done, don't block in _poll - if future is not None and future.is_done: - timeout = 0 - else: - user_timeout_ms = timer.timeout_ms if timeout_ms is not None else self.config['request_timeout_ms'] - idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() - request_timeout_ms = self._next_ifr_request_timeout_ms() - log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", user_timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms) - timeout = min( - user_timeout_ms, - metadata_timeout_ms, - idle_connection_timeout_ms, - request_timeout_ms) - timeout = max(0, timeout) # avoid negative timeouts - - self._poll(timeout / 1000) - - # called without the lock to avoid deadlock potential - # if handlers need to acquire locks - responses.extend(self._fire_pending_completed_requests()) - - # If all we had was a timeout (future is None) - only do one poll - # If we do have a future, we keep looping until it is done - if future is None: - break - elif future.is_done: - break - elif timeout_ms is not None and timer.expired: - break - - return responses - - def _register_send_sockets(self): - while self._sending: - conn = 
self._sending.pop() - if conn._sock is None: - continue - try: - key = self._selector.get_key(conn._sock) - events = key.events | selectors.EVENT_WRITE - self._selector.modify(key.fileobj, events, key.data) - except KeyError: - self._selector.register(conn._sock, selectors.EVENT_WRITE, conn) - - def _poll(self, timeout): - # Python throws OverflowError if timeout is > 2147483647 milliseconds - # (though the param to selector.select is in seconds) - # so convert any too-large timeout to blocking - if timeout > 2147483: - timeout = None - # This needs to be locked, but since it is only called from within the - # locked section of poll(), there is no additional lock acquisition here - processed = set() - - # Send pending requests first, before polling for responses - self._register_send_sockets() - - start_select = time.monotonic() - ready = self._selector.select(timeout) - end_select = time.monotonic() - if self._sensors: - self._sensors.select_time.record((end_select - start_select) * 1000000000) - - for key, events in ready: - if key.fileobj is self._wake_r: - self._clear_wake_fd() - continue - - # Send pending requests if socket is ready to write - if events & selectors.EVENT_WRITE: - conn = key.data - if conn.connecting(): - conn.connect() - else: - if conn.send_pending_requests_v2(): - # If send is complete, we dont need to track write readiness - # for this socket anymore - if key.events ^ selectors.EVENT_WRITE: - self._selector.modify( - key.fileobj, - key.events ^ selectors.EVENT_WRITE, - key.data) - else: - self._selector.unregister(key.fileobj) - - if not (events & selectors.EVENT_READ): - continue - conn = key.data - processed.add(conn) - - if not conn.in_flight_requests: - # if we got an EVENT_READ but there were no in-flight requests, one of - # two things has happened: - # - # 1. The remote end closed the connection (because it died, or because - # a firewall timed out, or whatever) - # 2. The protocol is out of sync. - # - # either way, we can no longer safely use this connection - # - # Do a 1-byte read to check protocol didnt get out of sync, and then close the conn - try: - unexpected_data = key.fileobj.recv(1) - if unexpected_data: # anything other than a 0-byte read means protocol issues - log.warning('Protocol out of sync on %r, closing', conn) - except socket.error: - pass - conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests')) - continue - - self._idle_expiry_manager.update(conn.node_id) - self._pending_completion.extend(conn.recv()) - - # Check for additional pending SSL bytes - if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): - # TODO: optimize - for conn in self._conns.values(): - if conn not in processed and conn.connected() and conn._sock.pending(): - self._pending_completion.extend(conn.recv()) - - for conn in self._conns.values(): - if conn.requests_timed_out(): - timed_out = conn.timed_out_ifrs() - timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000 - log.warning('%s timed out after %s ms. Closing connection.', - conn, timeout_ms) - conn.close(error=Errors.RequestTimedOutError( - 'Request timed out after %s ms' % - timeout_ms)) - - if self._sensors: - self._sensors.io_time.record((time.monotonic() - end_select) * 1000000000) - - self._maybe_close_oldest_connection() - - def in_flight_request_count(self, node_id=None): - """Get the number of in-flight requests for a node or all nodes. - - Arguments: - node_id (int, optional): a specific node to check. 
If unspecified, - return the total for all nodes - - Returns: - int: pending in-flight requests for the node, or all nodes if None - """ - if node_id is not None: - conn = self._conns.get(node_id) - if conn is None: - return 0 - return len(conn.in_flight_requests) - else: - return sum([len(conn.in_flight_requests) - for conn in list(self._conns.values())]) - - def _fire_pending_completed_requests(self): - responses = [] - while True: - try: - # We rely on deque.popleft remaining threadsafe - # to allow both the heartbeat thread and the main thread - # to process responses - response, future = self._pending_completion.popleft() - except IndexError: - break - future.success(response) - responses.append(response) - - return responses - - def least_loaded_node(self, bootstrap_fallback=False): - """Choose the node with fewest outstanding requests, with fallbacks. - - This method will prefer a node with an existing connection (not throttled) - with no in-flight-requests. If no such node is found, a node will be chosen - randomly from all nodes that are not throttled or "blacked out" (i.e., - are not subject to a reconnect backoff). If no node metadata has been - obtained, will return a bootstrap node. - - Returns: - node_id or None if no suitable node was found - """ - brokers = self.cluster.brokers() - if not brokers and bootstrap_fallback: - brokers = self.cluster.bootstrap_brokers() - nodes = [broker.node_id for broker in brokers] - random.shuffle(nodes) - - inflight = float('inf') - found = None - for node_id in nodes: - conn = self._conns.get(node_id) - connected = conn is not None and conn.connected() and conn.can_send_more() - blacked_out = conn is not None and (conn.blacked_out() or conn.throttled()) - curr_inflight = len(conn.in_flight_requests) if conn is not None else 0 - if connected and curr_inflight == 0: - # if we find an established connection (not throttled) - # with no in-flight requests, we can stop right away - return node_id - elif not blacked_out and curr_inflight < inflight: - # otherwise if this is the best we have found so far, record that - inflight = curr_inflight - found = node_id - return found - - def _refresh_delay_ms(self, node_id): - conn = self._conns.get(node_id) - if conn is not None and conn.connected(): - return self.throttle_delay(node_id) - else: - return self.connection_delay(node_id) - - def least_loaded_node_refresh_ms(self, bootstrap_fallback=False): - """Return connection or throttle delay in milliseconds for next available node. - - This method is used primarily for retry/backoff during metadata refresh - during / after a cluster outage, in which there are no available nodes. - - Returns: - float: delay_ms - """ - brokers = self.cluster.brokers() - if not brokers and bootstrap_fallback: - brokers = self.cluster.bootstrap_brokers() - if not brokers: - return self.config['retry_backoff_ms'] - return min([self._refresh_delay_ms(broker.node_id) for broker in brokers]) - - def _next_ifr_request_timeout_ms(self): - if self._conns: - return min([conn.next_ifr_request_timeout_ms() for conn in self._conns.values()]) - else: - return float('inf') - - # This method should be locked when running multi-threaded - def _maybe_refresh_metadata(self, wakeup=False): - """Send a metadata request if needed. 
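A usage note on the accounting methods above: `in_flight_request_count()` reports the pipelined-request depth per node (or cluster-wide with no argument), which is the same signal `least_loaded_node()` ranks by. A hypothetical drain loop (`client` and `node_id` assumed from context):

```python
pending = client.in_flight_request_count(node_id)   # one node
total = client.in_flight_request_count()            # all nodes
print('node %s: %d in flight (%d total)' % (node_id, pending, total))

# Simple back-pressure: wait for outstanding responses before sending more.
# Note that is_ready() already refuses nodes whose pipeline (default 5) is full.
while client.in_flight_request_count(node_id) > 0:
    client.poll(timeout_ms=100)
```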
- - Returns: - float: milliseconds until next refresh - """ - metadata_timeout = self.cluster.ttl() - if metadata_timeout > 0: - return metadata_timeout - - # Beware that the behavior of this method and the computation of - # timeouts for poll() are highly dependent on the behavior of - # least_loaded_node() - node_id = self.least_loaded_node(bootstrap_fallback=True) - if node_id is None: - next_connect_ms = self.least_loaded_node_refresh_ms(bootstrap_fallback=True) - log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms) - return next_connect_ms - - if not self._can_send_request(node_id): - # If there's any connection establishment underway, wait until it completes. This prevents - # the client from unnecessarily connecting to additional nodes while a previous connection - # attempt has not been completed. - if self._connecting: - return float('inf') - - elif self._can_connect(node_id): - log.debug("Initializing connection to node %s for metadata request", node_id) - self._connecting.add(node_id) - if not self._init_connect(node_id): - if node_id in self._connecting: - self._connecting.remove(node_id) - # Connection attempt failed immediately, need to retry with a different node - return self.config['reconnect_backoff_ms'] - else: - # Existing connection throttled or max in flight requests. - return self.throttle_delay(node_id) or self.config['request_timeout_ms'] - - # Recheck node_id in case we were able to connect immediately above - if self._can_send_request(node_id): - request = self.cluster.metadata_request() - log.debug("Sending metadata request %s to node %s", request, node_id) - future = self.send(node_id, request, wakeup=wakeup) - future.add_callback(self.cluster.update_metadata) - future.add_errback(self.cluster.failed_update) - return self.config['request_timeout_ms'] - - # Should only get here if still connecting - if self._connecting: - return float('inf') - else: - return self.config['reconnect_backoff_ms'] - - def get_api_versions(self): - """Return the ApiVersions map, if available. - - Note: Only available after bootstrap - - Returns: a map of dict mapping {api_key : (min_version, max_version)}, - """ - assert self.broker_version_data is not None - return self.broker_version_data.api_versions - - def check_version(self, node_id=None, timeout_ms=None): - """Attempt to guess the version of a Kafka broker. - - Keyword Arguments: - node_id (str, optional): Broker node id from cluster metadata. If None, attempts - to connect to any available broker until version is identified. - Default: None - timeout (num, optional): Maximum time in seconds to try to check broker version. - If unable to identify version before timeout, raise error (see below). - Default: None - - Returns: version tuple, i.e. 
(3, 9), (2, 0), (0, 10, 2) etc - - Raises: - NodeNotReadyError (if node_id is provided) - NoBrokersAvailable (if node_id is None) - """ - if node_id is not None: - self.await_ready(node_id, timeout_ms=timeout_ms) - return self._conns[node_id].broker_version - - # node_id is None, we can use the bootstrap broker_version - self.poll(future=self.broker_version_data_future, timeout_ms=timeout_ms) - if not self.broker_version_data_future.is_done: - raise Errors.NoBrokersAvailable() - elif self.broker_version_data_future.failed(): - raise self.broker_version_data_future.exception - return self.broker_version_data.broker_version - - def api_version(self, operation, max_version=None): - assert self.broker_version_data is not None - return self.broker_version_data.api_version(operation, max_version=max_version) - - def wakeup(self): - if self._closed or self._waking or self._wake_w is None: - return - with self._wake_lock: - try: - self._wake_w.sendall(b'x') - self._waking = True - except socket.timeout as e: - log.warning('Timeout to send to wakeup socket!') - raise Errors.KafkaTimeoutError(e) - except socket.error as e: - log.warning('Unable to send to wakeup socket! %s', e) - raise e - - def _clear_wake_fd(self): - # reading from wake socket should only happen in a single thread - with self._wake_lock: - self._waking = False - while True: - try: - if not self._wake_r.recv(1024): - # Non-blocking socket returns empty on error - log.warning("Error reading wakeup socket. Rebuilding socketpair.") - self._close_wakeup_socketpair() - self._init_wakeup_socketpair() - break - except socket.error: - # Non-blocking socket raises when socket is ok but no data available to read - break - - def _maybe_close_oldest_connection(self): - expired_connection = self._idle_expiry_manager.poll_expired_connection() - if expired_connection: - conn_id, ts = expired_connection - idle_ms = (time.monotonic() - ts) * 1000 - log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms) - self.close(node_id=conn_id) - - def get_broker_version(self, timeout_ms=None): - self.poll(future=self.broker_version_data_future, timeout_ms=timeout_ms) - if not self.broker_version_data_future.is_done: - raise Errors.KafkaTimeoutError('Timeout attempting to get broker api version!') - elif self.broker_version_data_future.failed(): - raise self.broker_version_data_future.exception - else: - return self.broker_version_data.broker_version - - def bootstrap_connected(self): - """Return True if a bootstrap node is connected""" - for node_id in self._conns: - if not self.cluster.is_bootstrap(node_id): - continue - if self._conns[node_id].connected(): - return True - else: - return False - - def await_ready(self, node_id, timeout_ms=30000): - """ - Invokes `poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll` - invocations until the connection to `node` is ready, the timeoutMs expires or the connection fails. - - It returns `true` if the call completes normally or `false` if the timeoutMs expires. If the connection fails, - an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive - connection timeoutMs, it is possible for this method to raise an `IOException` for a previous connection which - has recently disconnected. - - This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with - care. 
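Taken together, `check_version()` above and `await_ready()` give blocking behaviour on top of the non-blocking client. A hypothetical bootstrap sequence (`client`, `node_id`, and `request` assumed; `send_and_receive()` is the helper defined just below):

```python
version = client.check_version(timeout_ms=5000)  # e.g. (3, 9) or (0, 10, 2)
# await_ready() returns False on timeout and raises KafkaConnectionError
# if the connection to the node fails outright.
if client.await_ready(node_id, timeout_ms=30000):
    response = client.send_and_receive(node_id, request)
```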
- """ - timer = Timer(timeout_ms) - self.poll(timeout_ms=0) - if self.is_ready(node_id): - return True - - while not self.is_ready(node_id) and not timer.expired: - if self.connection_failed(node_id): - raise Errors.KafkaConnectionError("Connection to %s failed." % (node_id,)) - self.maybe_connect(node_id) - self.poll(timeout_ms=timer.timeout_ms) - return self.is_ready(node_id) - - def send_and_receive(self, node_id, request): - future = self.send(node_id, request) - self.poll(future=future) - assert future.is_done - if future.failed(): - raise future.exception - return future.value - - -class IdleConnectionManager: - def __init__(self, connections_max_idle_ms): - if connections_max_idle_ms > 0: - self.connections_max_idle = connections_max_idle_ms / 1000 - else: - self.connections_max_idle = float('inf') - self.next_idle_close_check_time = None - self.update_next_idle_close_check_time(time.monotonic()) - self.lru_connections = collections.OrderedDict() - - def update(self, conn_id): - # order should reflect last-update - if conn_id in self.lru_connections: - del self.lru_connections[conn_id] - self.lru_connections[conn_id] = time.monotonic() - - def remove(self, conn_id): - if conn_id in self.lru_connections: - del self.lru_connections[conn_id] - - def is_expired(self, conn_id): - if conn_id not in self.lru_connections: - return None - return time.monotonic() >= self.lru_connections[conn_id] + self.connections_max_idle - - def next_check_ms(self): - now = time.monotonic() - if not self.lru_connections or self.next_idle_close_check_time == float('inf'): - return float('inf') - elif self.next_idle_close_check_time <= now: - return 0 - else: - return int((self.next_idle_close_check_time - now) * 1000) - - def update_next_idle_close_check_time(self, ts): - self.next_idle_close_check_time = ts + self.connections_max_idle - - def poll_expired_connection(self): - if time.monotonic() < self.next_idle_close_check_time: - return None - - if not len(self.lru_connections): - return None - - oldest_conn_id = None - oldest_ts = None - (oldest_conn_id, oldest_ts) = next(iter(self.lru_connections.items())) - - self.update_next_idle_close_check_time(oldest_ts) - - if time.monotonic() >= oldest_ts + self.connections_max_idle: - return (oldest_conn_id, oldest_ts) - else: - return None - - -class KafkaClientMetrics: - def __init__(self, metrics, metric_group_prefix, conns): - self.metrics = metrics - self.metric_group_name = metric_group_prefix + '-metrics' - - self.connection_closed = metrics.sensor('connections-closed') - self.connection_closed.add(metrics.metric_name( - 'connection-close-rate', self.metric_group_name, - 'Connections closed per second in the window.'), Rate()) - self.connection_created = metrics.sensor('connections-created') - self.connection_created.add(metrics.metric_name( - 'connection-creation-rate', self.metric_group_name, - 'New connections established per second in the window.'), Rate()) - - self.select_time = metrics.sensor('select-time') - self.select_time.add(metrics.metric_name( - 'select-rate', self.metric_group_name, - 'Number of times the I/O layer checked for new I/O to perform per' - ' second'), Rate(sampled_stat=Count())) - self.select_time.add(metrics.metric_name( - 'io-wait-time-ns-avg', self.metric_group_name, - 'The average length of time the I/O thread spent waiting for a' - ' socket ready for reads or writes in nanoseconds.'), Avg()) - self.select_time.add(metrics.metric_name( - 'io-wait-ratio', self.metric_group_name, - 'The fraction of time the I/O thread spent 
waiting.'), - Rate(time_unit=TimeUnit.NANOSECONDS)) - - self.io_time = metrics.sensor('io-time') - self.io_time.add(metrics.metric_name( - 'io-time-ns-avg', self.metric_group_name, - 'The average length of time for I/O per select call in nanoseconds.'), - Avg()) - self.io_time.add(metrics.metric_name( - 'io-ratio', self.metric_group_name, - 'The fraction of time the I/O thread spent doing I/O'), - Rate(time_unit=TimeUnit.NANOSECONDS)) - - metrics.add_metric(metrics.metric_name( - 'connection-count', self.metric_group_name, - 'The current number of active connections.'), AnonMeasurable( - lambda config, now: len(conns))) diff --git a/kafka/cluster.py b/kafka/cluster.py index ba70e1d98..2c4a4424a 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -3,11 +3,11 @@ import logging import random import re +import socket import threading import time from kafka import errors as Errors -from kafka.conn import get_ip_port_afi from kafka.future import Future from kafka.protocol.metadata import MetadataRequest, MetadataResponse from kafka.structs import TopicPartition @@ -484,3 +484,73 @@ def collect_hosts(hosts, randomize=True): if randomize: random.shuffle(result) return result + + +def _address_family(address): + """ + Attempt to determine the family of an address (or hostname) + + :return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if the address family + could not be determined + """ + if address.startswith('[') and address.endswith(']'): + return socket.AF_INET6 + for af in (socket.AF_INET, socket.AF_INET6): + try: + socket.inet_pton(af, address) + return af + except (ValueError, AttributeError, socket.error): + continue + return socket.AF_UNSPEC + + +DEFAULT_KAFKA_PORT = 9092 + + +def get_ip_port_afi(host_and_port_str): + """ + Parse the IP and port from a string in the format of: + + * host_or_ip <- Can be either IPv4 address literal or hostname/fqdn + * host_or_ipv4:port <- Can be either IPv4 address literal or hostname/fqdn + * [host_or_ip] <- IPv6 address literal + * [host_or_ip]:port. <- IPv6 address literal + + .. note:: IPv6 address literals with ports *must* be enclosed in brackets + + .. note:: If the port is not specified, default will be returned. + + :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC + """ + host_and_port_str = host_and_port_str.strip() + if host_and_port_str.startswith('['): + af = socket.AF_INET6 + host, rest = host_and_port_str[1:].split(']') + if rest: + port = int(rest[1:]) + else: + port = DEFAULT_KAFKA_PORT + return host, port, af + else: + if ':' not in host_and_port_str: + af = _address_family(host_and_port_str) + return host_and_port_str, DEFAULT_KAFKA_PORT, af + else: + # now we have something with a colon in it and no square brackets. It could be + # either an IPv6 address literal (e.g., "::1") or an IP:port pair or a host:port pair + try: + # if it decodes as an IPv6 address, use that + socket.inet_pton(socket.AF_INET6, host_and_port_str) + return host_and_port_str, DEFAULT_KAFKA_PORT, socket.AF_INET6 + except AttributeError: + log.warning('socket.inet_pton not available on this platform.' 
+ ' consider `pip install win_inet_pton`') + pass + except (ValueError, socket.error): + # it's a host:port pair + pass + host, port = host_and_port_str.rsplit(':', 1) + port = int(port) + + af = _address_family(host) + return host, port, af diff --git a/kafka/conn.py b/kafka/conn.py deleted file mode 100644 index ac3b50fee..000000000 --- a/kafka/conn.py +++ /dev/null @@ -1,1429 +0,0 @@ -import copy -import errno -import io -import logging -from random import uniform -import selectors -import socket -import threading -import time - -import kafka.errors as Errors -from kafka.future import Future -from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.metadata import ApiVersionsRequest -from kafka.protocol.broker_version_data import BrokerVersionData, VERSION_CHECKS -from kafka.protocol.parser import KafkaProtocol -from kafka.protocol.sasl import SaslAuthenticateRequest, SaslHandshakeRequest -from kafka.protocol.schemas.fields.codecs import Int32 -from kafka.sasl import get_sasl_mechanism -from kafka.socks5_wrapper import Socks5Wrapper -from kafka.version import __version__ - - -log = logging.getLogger(__name__) - -DEFAULT_KAFKA_PORT = 9092 - -try: - import ssl - ssl_available = True - try: - SSLEOFError = ssl.SSLEOFError - SSLWantReadError = ssl.SSLWantReadError - SSLWantWriteError = ssl.SSLWantWriteError - SSLZeroReturnError = ssl.SSLZeroReturnError - except AttributeError: - # support older ssl libraries - log.warning('Old SSL module detected.' - ' SSL error handling may not operate cleanly.' - ' Consider upgrading to Python 3.3 or 2.7.9') - SSLEOFError = ssl.SSLError - SSLWantReadError = ssl.SSLError - SSLWantWriteError = ssl.SSLError - SSLZeroReturnError = ssl.SSLError -except ImportError: - # support Python without ssl libraries - ssl_available = False - class SSLWantReadError(Exception): - pass - class SSLWantWriteError(Exception): - pass - - -AFI_NAMES = { - socket.AF_UNSPEC: "unspecified", - socket.AF_INET: "IPv4", - socket.AF_INET6: "IPv6", -} - - -class ConnectionStates: - DISCONNECTED = '' - CONNECTING = '' - HANDSHAKE = '' - CONNECTED = '' - AUTHENTICATING = '' - API_VERSIONS_SEND = '' - API_VERSIONS_RECV = '' - - -class BrokerConnection: - """Initialize a Kafka broker connection - - Keyword Arguments: - client_id (str): a name for this client. This string is passed in - each request to servers and can be used to identify specific - server-side log entries that correspond to this client. Also - submitted to GroupCoordinator for logging with respect to - consumer group administration. Default: 'kafka-python-{version}' - client_software_name (str): Sent to kafka broker for KIP-511. - Default: 'kafka-python' - client_software_version (str): Sent to kafka broker for KIP-511. - Default: The kafka-python version (via kafka.version). - reconnect_backoff_ms (int): The amount of time in milliseconds to - wait before attempting to reconnect to a given host. - Default: 50. - reconnect_backoff_max_ms (int): The maximum amount of time in - milliseconds to backoff/wait when reconnecting to a broker that has - repeatedly failed to connect. If provided, the backoff per host - will increase exponentially for each consecutive connection - failure, up to this maximum. Once the maximum is reached, - reconnection attempts will continue periodically with this fixed - rate. To avoid connection storms, a randomization factor of 0.2 - will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 30000. 
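The `get_ip_port_afi()` helper moved into kafka/cluster.py above is easiest to pin down with examples. Expected results per the parsing rules in this diff (assuming the patch is applied, so the function is importable from `kafka.cluster`):

```python
import socket
from kafka.cluster import get_ip_port_afi

assert get_ip_port_afi('broker1:9093') == ('broker1', 9093, socket.AF_UNSPEC)
assert get_ip_port_afi('127.0.0.1') == ('127.0.0.1', 9092, socket.AF_INET)  # default port
assert get_ip_port_afi('[::1]:9093') == ('::1', 9093, socket.AF_INET6)      # bracketed IPv6
assert get_ip_port_afi('::1') == ('::1', 9092, socket.AF_INET6)             # bare IPv6 literal
```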
- request_timeout_ms (int): Client request timeout in milliseconds. - Default: 30000. - max_in_flight_requests_per_connection (int): Requests are pipelined - to kafka brokers up to this number of maximum requests per - broker connection. Default: 5. - receive_buffer_bytes (int): The size of the TCP receive buffer - (SO_RCVBUF) to use when reading data. Default: None (relies on - system defaults). Java client defaults to 32768. - send_buffer_bytes (int): The size of the TCP send buffer - (SO_SNDBUF) to use when sending data. Default: None (relies on - system defaults). Java client defaults to 131072. - socket_options (list): List of tuple-arguments to socket.setsockopt - to apply to broker connection sockets. Default: - [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)] - security_protocol (str): Protocol used to communicate with brokers. - Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. - Default: PLAINTEXT. - ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping - socket connections. If provided, all other ssl_* configurations - will be ignored. Default: None. - ssl_check_hostname (bool): flag to configure whether ssl handshake - should verify that the certificate matches the brokers hostname. - default: True. - ssl_cafile (str): optional filename of ca file to use in certificate - verification. default: None. - ssl_certfile (str): optional filename of file in pem format containing - the client certificate, as well as any ca certificates needed to - establish the certificate's authenticity. default: None. - ssl_keyfile (str): optional filename containing the client private key. - default: None. - ssl_password (callable, str, bytes, bytearray): optional password or - callable function that returns a password, for decrypting the - client private key. Default: None. - ssl_crlfile (str): optional filename containing the CRL to check for - certificate expiration. By default, no CRL check is done. When - providing a file, only the leaf certificate will be checked against - this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. - default: None. - ssl_ciphers (str): optionally set the available ciphers for ssl - connections. It should be a string in the OpenSSL cipher list - format. If no cipher can be selected (because compile-time options - or other configuration forbids use of all the specified ciphers), - an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers - api_version (tuple): Specify which Kafka API version to use. - Must be None or >= (0, 10, 0) to enable SASL authentication. - Default: None - api_version_auto_timeout_ms (int): number of milliseconds to throw a - timeout exception from the constructor when checking the broker - api version. Only applies if api_version is None. Default: 2000. - selector (selectors.BaseSelector): Provide a specific selector - implementation to use for I/O multiplexing. - Default: selectors.DefaultSelector - state_change_callback (callable): function to be called when the - connection state changes from CONNECTING to CONNECTED etc. - metrics (kafka.metrics.Metrics): Optionally provide a metrics - instance for capturing network IO stats. Default: None. - metric_group_prefix (str): Prefix for metric names. Default: '' - sasl_mechanism (str): Authentication mechanism when security_protocol - is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. - sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. 
- Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. - sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. - Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. - sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with - sasl mechanism handshake. If provided, sasl_kerberos_service_name and - sasl_kerberos_domain name are ignored. Default: None. - sasl_kerberos_service_name (str): Service name to include in GSSAPI - sasl mechanism handshake. Default: 'kafka' - sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI - sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer - token provider instance. Default: None - socks5_proxy (str): Socks5 proxy url. Default: None - """ - - DEFAULT_CONFIG = { - 'client_id': 'kafka-python-' + __version__, - 'client_software_name': 'kafka-python', - 'client_software_version': __version__, - 'node_id': 0, - 'request_timeout_ms': 30000, - 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 30000, - 'max_in_flight_requests_per_connection': 5, - 'receive_buffer_bytes': None, - 'send_buffer_bytes': None, - 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], - 'sock_chunk_bytes': 4096, # undocumented experimental option - 'sock_chunk_buffer_count': 1000, # undocumented experimental option - 'security_protocol': 'PLAINTEXT', - 'ssl_context': None, - 'ssl_check_hostname': True, - 'ssl_cafile': None, - 'ssl_certfile': None, - 'ssl_keyfile': None, - 'ssl_crlfile': None, - 'ssl_password': None, - 'ssl_ciphers': None, - 'api_version_auto_timeout_ms': 2000, - 'selector': selectors.DefaultSelector, - 'state_change_callback': lambda node_id, sock, conn: True, - 'metrics': None, - 'metric_group_prefix': '', - 'sasl_mechanism': None, - 'sasl_plain_username': None, - 'sasl_plain_password': None, - 'sasl_kerberos_name': None, - 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None, - 'socks5_proxy': None, - } - SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - - def __init__(self, host, port, afi, broker_version_data=None, **configs): - self.host = host - self.port = port - self.afi = afi - self._sock_afi = afi - self._sock_addr = None - self._api_versions_idx = None - self._api_versions_future = None - self._broker_version_data = None - self._check_version_idx = None - self._throttle_time = None - self._socks5_proxy = None - - self.config = copy.copy(self.DEFAULT_CONFIG) - for key in self.config: - if key in configs: - self.config[key] = configs[key] - - self.node_id = self.config.pop('node_id') - - # Accept cached data if provided - self.broker_version_data = broker_version_data - - if self.config['receive_buffer_bytes'] is not None: - self.config['socket_options'].append( - (socket.SOL_SOCKET, socket.SO_RCVBUF, - self.config['receive_buffer_bytes'])) - if self.config['send_buffer_bytes'] is not None: - self.config['socket_options'].append( - (socket.SOL_SOCKET, socket.SO_SNDBUF, - self.config['send_buffer_bytes'])) - - assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, ( - 'security_protocol must be in ' + ', '.join(self.SECURITY_PROTOCOLS)) - - if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): - assert ssl_available, "Python wasn't built with SSL support" - - self._init_sasl_mechanism() - - # This is not a general lock / this class is not generally thread-safe yet - # 
However, to avoid pushing responsibility for maintaining - # per-connection locks to the upstream client, we will use this lock to - # make sure that access to the protocol buffer is synchronized - # when sends happen on multiple threads - self._lock = threading.Lock() - - # the protocol parser instance manages actual tracking of the - # sequence of in-flight requests to responses, which should - # function like a FIFO queue. For additional request data, - # including tracking request futures and timestamps, we - # can use a simple dictionary of correlation_id => request data - self.in_flight_requests = dict() - - self._protocol = self._new_protocol_parser() - self.state = ConnectionStates.DISCONNECTED - self._reset_reconnect_backoff() - self._sock = None - self._send_buffer = b'' - self._ssl_context = None - if self.config['ssl_context'] is not None: - self._ssl_context = self.config['ssl_context'] - self._api_versions_check_timeout = self.config['api_version_auto_timeout_ms'] - self._sasl_auth_future = None - self.last_attempt = 0 - self._gai = [] - self._sensors = None - if self.config['metrics']: - self._sensors = BrokerConnectionMetrics(self.config['metrics'], - self.config['metric_group_prefix'], - self.node_id) - - @property - def broker_version(self): - if self.broker_version_data is None: - return None - return self.broker_version_data.broker_version - - @property - def broker_version_data(self): - return self._broker_version_data - - @broker_version_data.setter - def broker_version_data(self, value): - if value is None: - self._api_versions_idx = ApiVersionsRequest.max_version - self._check_version_idx = 0 - elif not isinstance(value, BrokerVersionData): - raise TypeError('expected BrokerVersionData') - else: - self._broker_version_data = value - # If we got cached broker data, we'll skip to the max supported ApiVersionsRequest - # or, if not supported at all, we'll just rely on the cached api_versions data - try: - self._api_versions_idx = self._broker_version_data.api_version(ApiVersionsRequest) - except Errors.IncompatibleBrokerVersion: - self._api_versions_idx = None - self._check_version_idx = None - self._api_versions_check_timeout = self.config['api_version_auto_timeout_ms'] - - def _new_protocol_parser(self): - return KafkaProtocol( - ident=f'node={self.node_id}[{self.host}:{self.port}]', - client_id=self.config['client_id'], - api_version=self.broker_version) - - def _init_sasl_mechanism(self): - if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): - self._sasl_mechanism = get_sasl_mechanism(self.config['sasl_mechanism'])(host=self.host, **self.config) - else: - self._sasl_mechanism = None - - def _dns_lookup(self): - self._gai = dns_lookup(self.host, self.port, self.afi) - if not self._gai: - log.error('%s: DNS lookup failed for %s:%i (%s)', - self, self.host, self.port, self.afi) - return False - return True - - def _next_afi_sockaddr(self): - if self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]): - return (socket.AF_UNSPEC, (self.host, self.port)) - - if not self._gai: - if not self._dns_lookup(): - return - afi, _, __, ___, sockaddr = self._gai.pop(0) - return (afi, sockaddr) - - def connect(self): - """Attempt to connect and return ConnectionState""" - if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out(): - self.state = ConnectionStates.CONNECTING - self.last_attempt = time.monotonic() - next_lookup = self._next_afi_sockaddr() - if not next_lookup: - 
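The DNS handling here (_dns_lookup / _next_afi_sockaddr) resolves the host once and then pops one candidate address per connection attempt, so multi-homed brokers are tried address by address. A rough standalone sketch of that rotation, using only the standard library:

    import socket

    # Sketch of the candidate rotation in _next_afi_sockaddr: each value
    # yielded is the (family, sockaddr) pair for one connect attempt.
    def addr_candidates(host, port, afi=socket.AF_UNSPEC):
        for family, _, _, _, sockaddr in socket.getaddrinfo(
                host, port, afi, socket.SOCK_STREAM):
            if family in (socket.AF_INET, socket.AF_INET6):
                yield family, sockaddr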
self.close(Errors.KafkaConnectionError('DNS failure')) - return self.state - else: - log.debug('%s: creating new socket', self) - assert self._sock is None - self._sock_afi, self._sock_addr = next_lookup - try: - if self.config["socks5_proxy"] is not None: - log.debug('%s: initializing Socks5 proxy at %s', self, self.config["socks5_proxy"]) - self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi) - self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM) - else: - self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) - except (socket.error, OSError) as e: - self.close(e) - return self.state - - for option in self.config['socket_options']: - log.debug('%s: setting socket option %s', self, option) - self._sock.setsockopt(*option) - - self._sock.setblocking(False) - self.config['state_change_callback'](self.node_id, self._sock, self) - log.info('%s: connecting to %s:%d [%s %s]', self, self.host, - self.port, self._sock_addr, AFI_NAMES[self._sock_afi]) - - if self.state is ConnectionStates.CONNECTING: - # in non-blocking mode, use repeated calls to socket.connect_ex - # to check connection status - ret = None - try: - if self._socks5_proxy: - ret = self._socks5_proxy.connect_ex(self._sock_addr) - else: - ret = self._sock.connect_ex(self._sock_addr) - except socket.error as err: - ret = err.errno - - # Connection succeeded - if not ret or ret == errno.EISCONN: - log.debug('%s: established TCP connection', self) - - if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): - self.state = ConnectionStates.HANDSHAKE - log.debug('%s: initiating SSL handshake', self) - self.config['state_change_callback'](self.node_id, self._sock, self) - # _wrap_ssl can alter the connection state -- disconnects on failure - self._wrap_ssl() - else: - self.state = ConnectionStates.API_VERSIONS_SEND - log.debug('%s: checking broker Api Versions', self) - self.config['state_change_callback'](self.node_id, self._sock, self) - - # Connection failed - # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems - elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022): - log.error('%s: Connect attempt returned error %s.' 
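The connect() logic relies on the standard non-blocking connect idiom: connect_ex() is re-polled until it reports success (0 or EISCONN), while EINPROGRESS, EALREADY and EWOULDBLOCK mean the TCP handshake is still in flight and anything else is a hard failure. A self-contained sketch of that loop ('localhost', 9092 is a placeholder address):

    import errno
    import select
    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    while True:
        ret = sock.connect_ex(('localhost', 9092))
        if ret in (0, errno.EISCONN):
            break  # TCP connection established
        if ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
            raise OSError(ret, errno.errorcode.get(ret, 'UNKNOWN'))
        select.select([], [sock], [], 0.1)  # wait until writable, then re-check
    sock.close()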
- ' Disconnecting.', self, ret) - errstr = errno.errorcode.get(ret, 'UNKNOWN') - self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr))) - return self.state - - # Needs retry - else: - pass - - if self.state is ConnectionStates.HANDSHAKE: - if self._try_handshake(): - log.debug('%s: completed SSL handshake.', self) - self.state = ConnectionStates.API_VERSIONS_SEND - log.debug('%s: checking broker Api Versions', self) - self.config['state_change_callback'](self.node_id, self._sock, self) - - if self.state in (ConnectionStates.API_VERSIONS_SEND, ConnectionStates.API_VERSIONS_RECV): - if self._try_api_versions_check(): - # _try_api_versions_check has side-effects: possibly disconnected on socket errors - if self.state in (ConnectionStates.API_VERSIONS_SEND, ConnectionStates.API_VERSIONS_RECV): - if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): - self.state = ConnectionStates.AUTHENTICATING - log.debug('%s: initiating SASL authentication', self) - self.config['state_change_callback'](self.node_id, self._sock, self) - else: - # security_protocol PLAINTEXT - self.state = ConnectionStates.CONNECTED - log.info('%s: Connection complete.', self) - self._reset_reconnect_backoff() - self.config['state_change_callback'](self.node_id, self._sock, self) - - if self.state is ConnectionStates.AUTHENTICATING: - assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') - if self._try_authenticate(): - # _try_authenticate has side-effects: possibly disconnected on socket errors - if self.state is ConnectionStates.AUTHENTICATING: - self.state = ConnectionStates.CONNECTED - log.info('%s: Connection complete.', self) - self._reset_reconnect_backoff() - self.config['state_change_callback'](self.node_id, self._sock, self) - - if self.state not in (ConnectionStates.CONNECTED, - ConnectionStates.DISCONNECTED): - # Connection timed out - request_timeout = self.config['request_timeout_ms'] / 1000.0 - if time.monotonic() > request_timeout + self.last_attempt: - log.error('%s: Connection attempt timed out', self) - self.close(Errors.KafkaConnectionError('timeout')) - return self.state - - return self.state - - def _wrap_ssl(self): - assert self.config['security_protocol'] in ('SSL', 'SASL_SSL') - if self._ssl_context is None: - log.debug('%s: configuring default SSL Context', self) - self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - self._ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2 - self._ssl_context.verify_mode = ssl.CERT_OPTIONAL - if self.config['ssl_check_hostname']: - self._ssl_context.check_hostname = True - if self.config['ssl_cafile']: - log.info('%s: Loading SSL CA from %s', self, self.config['ssl_cafile']) - self._ssl_context.load_verify_locations(self.config['ssl_cafile']) - self._ssl_context.verify_mode = ssl.CERT_REQUIRED - else: - log.info('%s: Loading system default SSL CAs from %s', self, ssl.get_default_verify_paths()) - self._ssl_context.load_default_certs() - if self.config['ssl_certfile'] and self.config['ssl_keyfile']: - log.info('%s: Loading SSL Cert from %s', self, self.config['ssl_certfile']) - log.info('%s: Loading SSL Key from %s', self, self.config['ssl_keyfile']) - self._ssl_context.load_cert_chain( - certfile=self.config['ssl_certfile'], - keyfile=self.config['ssl_keyfile'], - password=self.config['ssl_password']) - if self.config['ssl_crlfile']: - if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'): - raise RuntimeError('This version of Python does not support ssl_crlfile!') - log.info('%s: Loading SSL CRL from %s', self, 
self.config['ssl_crlfile']) - self._ssl_context.load_verify_locations(self.config['ssl_crlfile']) - self._ssl_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF - if self.config['ssl_ciphers']: - log.info('%s: Setting SSL Ciphers: %s', self, self.config['ssl_ciphers']) - self._ssl_context.set_ciphers(self.config['ssl_ciphers']) - log.debug('%s: wrapping socket in ssl context', self) - try: - self._sock = self._ssl_context.wrap_socket( - self._sock, - server_hostname=self.host.rstrip("."), - do_handshake_on_connect=False) - except ssl.SSLError as e: - log.exception('%s: Failed to wrap socket in SSLContext!', self) - self.close(e) - - def _try_handshake(self): - assert self.config['security_protocol'] in ('SSL', 'SASL_SSL') - try: - self._sock.do_handshake() - return True - # old ssl in python2.6 will swallow all SSLErrors here... - except (SSLWantReadError, SSLWantWriteError): - pass - except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError): - log.warning('%s: SSL connection closed by server during handshake.', self) - self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake')) - # Other SSLErrors will be raised to user - - return False - - def _try_api_versions_check(self): - if self._api_versions_future is None: - if self._api_versions_idx is not None: - version = self._api_versions_idx - request = ApiVersionsRequest(version=version, - client_software_name=self.config['client_software_name'], - client_software_version=self.config['client_software_version']) - future = Future() - self._api_versions_check_timeout /= 2 - response = self._send(request, blocking=True, request_timeout_ms=self._api_versions_check_timeout) - response.add_callback(self._handle_api_versions_response, future) - response.add_errback(self._handle_api_versions_failure, future) - self._api_versions_future = future - self.state = ConnectionStates.API_VERSIONS_RECV - self.config['state_change_callback'](self.node_id, self._sock, self) - # Fallback for early brokers without ApiVersions api support - elif self._check_version_idx is not None and self._check_version_idx < len(VERSION_CHECKS): - version, request = VERSION_CHECKS[self._check_version_idx] - log.debug('%s: Probing version %s with %s', self, version, request) - future = Future() - self._api_versions_check_timeout /= 2 - response = self._send(request, blocking=True, request_timeout_ms=self._api_versions_check_timeout) - response.add_callback(self._handle_check_version_response, future, version) - response.add_errback(self._handle_check_version_failure, future) - self._api_versions_future = future - self.state = ConnectionStates.API_VERSIONS_RECV - self.config['state_change_callback'](self.node_id, self._sock, self) - elif self.broker_version_data is None: - self.close(Errors.KafkaConnectionError('Unable to determine broker version.')) - return False - else: - log.debug('%s: Using pre-configured broker_version %s.', self, self.broker_version) - return True - - # Handle any immediate responses - self.recv(resolve_futures=True) - - # A connection error during blocking send could trigger close() which will reset the future - if self._api_versions_future is None: - return False - elif self._api_versions_future.failed(): - ex = self._api_versions_future.exception - if not isinstance(ex, Errors.KafkaConnectionError): - raise ex - return self._api_versions_future.succeeded() - - def _handle_api_versions_response(self, future, response): - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: 
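_try_handshake shows the matching non-blocking TLS pattern: on a non-blocking socket, do_handshake() raises SSLWantReadError / SSLWantWriteError until both peers have exchanged enough bytes, so the call is simply retried on the next poll cycle. A minimal sketch:

    import ssl

    def try_handshake(ssock):
        # ssock: an ssl-wrapped, non-blocking socket (hypothetical caller)
        try:
            ssock.do_handshake()
            return True   # handshake complete
        except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
            return False  # not ready yet; retry once the socket is readable/writable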
- if error_type is Errors.UnsupportedVersionError: - future.failure(error_type()) - self._api_versions_future = None - # Look for fallback ApiVersionsRequest if needed - # Starting with 2.4, brokers return the max supported version in api_keys - # Prior to that, just use version 0. - if self._api_versions_idx > 0: - for api_version_data in response.api_keys: - api_key, min_version, max_version = api_version_data[:3] - # If broker provides a lower max_version, skip to that - if api_key == response.API_KEY: - self._api_versions_idx = min(self._api_versions_idx, max_version) - break - else: - self._api_versions_idx = 0 - self.state = ConnectionStates.API_VERSIONS_SEND - self.config['state_change_callback'](self.node_id, self._sock, self) - return - self.close(error=error_type()) - return - api_versions = dict([ - (api_version_data[0], (api_version_data[1], api_version_data[2])) - for api_version_data in response.api_keys - ]) - discovered_bvd = BrokerVersionData(api_versions=api_versions) - if self.broker_version_data is None: - self.broker_version_data = discovered_bvd - log.info('%s: Broker version identified as %s.', self, '.'.join(map(str, self.broker_version_data.broker_version))) - elif discovered_bvd < self.broker_version_data: - log.warning('%s: Broker version identified as %s. Ignoring pre-configured broker_version %s.', self, - discovered_bvd.broker_version, self.broker_version_data.broker_version) - self.broker_version_data = discovered_bvd - else: - log.debug('%s: Using pre-configured broker_version %s.', self, self.broker_version) - future.success(self.broker_version) - self.connect() - - def _handle_api_versions_failure(self, future, ex): - future.failure(ex) - # If we have VERSION_CHECKS fallback enabled, disable api versions - if self._check_version_idx is not None: - self._api_versions_idx = None - # Otherwise, we'll just keep repeating api versions request on reconnect - # after failure connection is closed, so state should already be DISCONNECTED - - def _handle_check_version_response(self, future, version, _response): - log.info('%s: Broker version identified as %s', self, '.'.join(map(str, version))) - log.info('Set configuration api_version=%s to skip auto' - ' check_version requests on startup', version) - self.broker_version_data = BrokerVersionData(version) - future.success(version) - self.connect() - - def _handle_check_version_failure(self, future, ex): - future.failure(ex) - self._check_version_idx += 1 - # after failure connection is closed, so state should already be DISCONNECTED - - def _sasl_handshake_version(self): - if self.broker_version_data is None: - raise RuntimeError('broker_version not set') - if SaslHandshakeRequest[0].API_KEY not in self.broker_version_data.api_versions: - raise Errors.UnsupportedVersionError('SaslHandshake') - - # Build a SaslHandshakeRequest message - min_version, max_version = self.broker_version_data.api_versions[SaslHandshakeRequest[0].API_KEY] - if min_version > 1: - raise Errors.UnsupportedVersionError('SaslHandshake %s' % min_version) - return min(max_version, 1) - - def _try_authenticate(self): - if self._sasl_auth_future is None: - version = self._sasl_handshake_version() - request = SaslHandshakeRequest[version](self.config['sasl_mechanism']) - future = Future() - sasl_response = self._send(request, blocking=True) - sasl_response.add_callback(self._handle_sasl_handshake_response, future) - sasl_response.add_errback(lambda f, e: f.failure(e), future) - self._sasl_auth_future = future - - for r, f in self.recv(): - 
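The ApiVersions handling above boils the response down to a map of api_key -> (min_version, max_version) and later picks the highest request version both sides support. A toy sketch of that negotiation (the api_keys rows are illustrative, not real broker data):

    # api_keys rows are (api_key, min_version, max_version, ...)
    api_keys = [(0, 0, 9), (3, 0, 12), (18, 0, 3)]
    api_versions = {row[0]: (row[1], row[2]) for row in api_keys}

    def pick_version(api_key, client_max_version):
        broker_min, broker_max = api_versions[api_key]
        version = min(client_max_version, broker_max)
        if version < broker_min:
            raise ValueError('no overlapping version for api_key %d' % api_key)
        return version

    assert pick_version(18, 4) == 3  # client caps at the broker maximum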
f.success(r) - - # A connection error could trigger close() which will reset the future - if self._sasl_auth_future is None: - return False - elif self._sasl_auth_future.failed(): - ex = self._sasl_auth_future.exception - if not isinstance(ex, Errors.KafkaConnectionError): - raise ex # pylint: disable-msg=raising-bad-type - return self._sasl_auth_future.succeeded() - - def _handle_sasl_handshake_response(self, future, response): - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: - error = error_type(self) - self.close(error=error) - return future.failure(error_type(self)) - - if self.config['sasl_mechanism'] not in response.mechanisms: - future.failure( - Errors.UnsupportedSaslMechanismError( - 'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s' - % (self.config['sasl_mechanism'], response.mechanisms))) - else: - self._sasl_authenticate(future) - - assert future.is_done, 'SASL future not complete after mechanism processing!' - if future.failed(): - self.close(error=future.exception) - else: - self.connect() - - def _send_bytes(self, data): - """Send some data via non-blocking IO - - Note: this method is not synchronized internally; you should - always hold the _lock before calling - - Returns: number of bytes - Raises: socket exception - """ - total_sent = 0 - while total_sent < len(data): - try: - sent_bytes = self._sock.send(data[total_sent:]) - total_sent += sent_bytes - except (SSLWantReadError, SSLWantWriteError): - break - except (ConnectionError, TimeoutError) as e: - raise - except BlockingIOError: - break - return total_sent - - def _send_bytes_blocking(self, data): - self._sock.setblocking(True) - self._sock.settimeout(self.config['request_timeout_ms'] / 1000) - total_sent = 0 - try: - while total_sent < len(data): - sent_bytes = self._sock.send(data[total_sent:]) - total_sent += sent_bytes - if total_sent != len(data): - raise ConnectionError('Buffer overrun during socket send') - return total_sent - finally: - self._sock.settimeout(0.0) - self._sock.setblocking(False) - - def _recv_bytes_blocking(self, n): - self._sock.setblocking(True) - self._sock.settimeout(self.config['request_timeout_ms'] / 1000) - try: - data = b'' - while len(data) < n: - fragment = self._sock.recv(n - len(data)) - if not fragment: - raise ConnectionError('Connection reset during recv') - data += fragment - return data - finally: - self._sock.settimeout(0.0) - self._sock.setblocking(False) - - def _send_sasl_authenticate(self, sasl_auth_bytes): - version = self._sasl_handshake_version() - if version == 1: - request = SaslAuthenticateRequest[0](sasl_auth_bytes) - self._send(request, blocking=True) - else: - log.debug('%s: Sending %d raw sasl auth bytes to server', self, len(sasl_auth_bytes)) - try: - self._send_bytes_blocking(Int32.encode(len(sasl_auth_bytes)) + sasl_auth_bytes) - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error sending sasl auth bytes to server", self) - err = Errors.KafkaConnectionError("%s: %s" % (self, e)) - self.close(error=err) - - def _recv_sasl_authenticate(self): - version = self._sasl_handshake_version() - # GSSAPI mechanism does not get a final recv in old non-framed mode - if version == 0 and self._sasl_mechanism.is_done(): - return b'' - - try: - data = self._recv_bytes_blocking(4) - nbytes = Int32.decode(io.BytesIO(data)) - data += self._recv_bytes_blocking(nbytes) - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error receiving sasl auth bytes from server", self) - 
err = Errors.KafkaConnectionError("%s: %s" % (self, e))
-            self.close(error=err)
-            return
-
-        if version == 1:
-            ((correlation_id, response),) = self._protocol.receive_bytes(data)
-            (future, timestamp, _timeout) = self.in_flight_requests.pop(correlation_id)
-            latency_ms = (time.monotonic() - timestamp) * 1000
-            if self._sensors:
-                self._sensors.request_time.record(latency_ms)
-            log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
-
-            error_type = Errors.for_code(response.error_code)
-            if error_type is not Errors.NoError:
-                log.error("%s: SaslAuthenticate error: %s (%s)",
-                          self, error_type.__name__, response.error_message)
-                self.close(error=error_type(response.error_message))
-                return
-            return response.auth_bytes
-        else:
-            # unframed bytes w/ SaslHandshake v0
-            log.debug('%s: Received %d raw sasl auth bytes from server', self, nbytes)
-            return data[4:]
-
-    def _sasl_authenticate(self, future):
-        while not self._sasl_mechanism.is_done():
-            send_token = self._sasl_mechanism.auth_bytes()
-            self._send_sasl_authenticate(send_token)
-            if not self._can_send_recv():
-                return future.failure(Errors.KafkaConnectionError("%s: Connection failure during Sasl Authenticate" % self))
-
-            recv_token = self._recv_sasl_authenticate()
-            if recv_token is None:
-                return future.failure(Errors.KafkaConnectionError("%s: Connection failure during Sasl Authenticate" % self))
-            else:
-                self._sasl_mechanism.receive(recv_token)
-
-        if self._sasl_mechanism.is_authenticated():
-            log.info('%s: %s', self, self._sasl_mechanism.auth_details())
-            return future.success(True)
-        else:
-            return future.failure(Errors.SaslAuthenticationFailedError('Failed to authenticate via SASL %s' % self.config['sasl_mechanism']))
-
-    def blacked_out(self):
-        """
-        Return True if we are disconnected from the given node and can't
-        re-establish a connection yet
-        """
-        if self.state is ConnectionStates.DISCONNECTED:
-            return self.connection_delay() > 0
-        return False
-
-    def throttled(self):
-        """
-        Return True if we are connected but currently throttled.
-        """
-        if self.state is not ConnectionStates.CONNECTED:
-            return False
-        return self.throttle_delay() > 0
-
-    def throttle_delay(self):
-        """
-        Return the number of milliseconds to wait until connection is no longer throttled.
-        """
-        if self._throttle_time is not None:
-            remaining_ms = (self._throttle_time - time.monotonic()) * 1000
-            if remaining_ms > 0:
-                return remaining_ms
-            else:
-                self._throttle_time = None
-                return 0
-        return 0
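The raw SASL path in _send_sasl_authenticate / _recv_sasl_authenticate above uses the same framing as the Kafka wire protocol generally: a signed 4-byte big-endian length prefix followed by the payload. A standalone sketch of that framing:

    import struct

    def frame(payload):
        # Int32 size prefix, then the opaque token bytes
        return struct.pack('>i', len(payload)) + payload

    def read_framed(recv_exactly):
        # recv_exactly(n) is any callable returning exactly n bytes
        (size,) = struct.unpack('>i', recv_exactly(4))
        return recv_exactly(size)

    assert frame(b'token')[:4] == b'\x00\x00\x00\x05'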
-    def connection_delay(self):
-        """
-        Return the number of milliseconds to wait, based on the connection
-        state, before attempting to send data. When connecting or disconnected,
-        this respects the reconnect backoff time. When connected, returns a very
-        large number to handle slow/stalled connections.
-        """
-        if self.disconnected() or self.connecting():
-            if len(self._gai) > 0:
-                return 0
-            elif self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]):
-                return 0
-            else:
-                time_waited = time.monotonic() - self.last_attempt
-                return max(self._reconnect_backoff - time_waited, 0) * 1000
-        else:
-            # When connecting or connected, we should be able to delay
-            # indefinitely since other events (connection or data acked) will
-            # cause a wakeup once data can be sent.
-            return float('inf')
-
-    def connected(self):
-        """Return True iff socket is connected."""
-        return self.state is ConnectionStates.CONNECTED
-
-    def connecting(self):
-        """Returns True if still connecting (this may encompass several
-        different states, such as SSL handshake, authorization, etc)."""
-        return self.state in (ConnectionStates.CONNECTING,
-                              ConnectionStates.HANDSHAKE,
-                              ConnectionStates.AUTHENTICATING,
-                              ConnectionStates.API_VERSIONS_SEND,
-                              ConnectionStates.API_VERSIONS_RECV)
-
-    def initializing(self):
-        """Returns True if socket is connected but full connection is not complete.
-        During this time the connection may send api requests to the broker to
-        check api versions and perform SASL authentication."""
-        return self.state in (ConnectionStates.AUTHENTICATING,
-                              ConnectionStates.API_VERSIONS_SEND,
-                              ConnectionStates.API_VERSIONS_RECV)
-
-    def disconnected(self):
-        """Return True iff socket is closed"""
-        return self.state is ConnectionStates.DISCONNECTED
-
-    def connect_failed(self):
-        """Return True iff connection attempt failed after attempting all DNS records"""
-        return self.disconnected() and self.last_attempt >= 0 and len(self._gai) == 0
-
-    def _reset_reconnect_backoff(self):
-        self._failures = 0
-        self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0
-
-    def _reconnect_jitter_pct(self):
-        return uniform(0.8, 1.2)
-
-    def _update_reconnect_backoff(self):
-        # Do not mark as failure if there are more dns entries available to try
-        if len(self._gai) > 0:
-            return
-        if self.config['reconnect_backoff_max_ms'] > self.config['reconnect_backoff_ms']:
-            self._failures += 1
-            self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
-            self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms'])
-            self._reconnect_backoff *= self._reconnect_jitter_pct()
-            self._reconnect_backoff /= 1000.0
-            log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
-
-    def _close_socket(self):
-        if hasattr(self, '_sock') and self._sock is not None:
-            self._sock.close()
-            self._sock = None
-
-    def __del__(self):
-        self._close_socket()
-
-    def close(self, error=None):
-        """Close socket and fail all in-flight-requests.
-
-        Arguments:
-            error (Exception, optional): pending in-flight-requests
-                will be failed with this exception.
-                Default: kafka.errors.KafkaConnectionError.
-        """
-        if self.state is ConnectionStates.DISCONNECTED:
-            return
-        with self._lock:
-            if self.state is ConnectionStates.DISCONNECTED:
-                return
-            log.log(logging.ERROR if error else logging.INFO, '%s: Closing connection. %s', self, error or '')
-            if error:
-                self._update_reconnect_backoff()
-            self._api_versions_future = None
-            self._sasl_auth_future = None
-            self._init_sasl_mechanism()
-            self._protocol = self._new_protocol_parser()
-            self._send_buffer = b''
-            if error is None:
-                error = Errors.Cancelled(str(self))
-            ifrs = list(self.in_flight_requests.items())
-            self.in_flight_requests.clear()
-            self.state = ConnectionStates.DISCONNECTED
-            # To avoid race conditions and/or deadlocks
-            # keep a reference to the socket but leave it
-            # open until after the state_change_callback
-            # This should give clients a chance to deregister
-            # the socket fd from selectors cleanly.
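_update_reconnect_backoff above implements the schedule promised in the constructor docs: exponential growth per consecutive failure, capped at reconnect_backoff_max_ms, then +/-20% jitter so reconnecting clients do not stampede the broker. A self-contained sketch with the library defaults as arguments:

    import random

    def reconnect_backoff_s(failures, base_ms=50, max_ms=30000):
        backoff_ms = min(base_ms * 2 ** (failures - 1), max_ms)
        return backoff_ms * random.uniform(0.8, 1.2) / 1000.0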
- sock = self._sock - self._sock = None - self._socks5_proxy = None - - # drop lock before state change callback and processing futures - self.config['state_change_callback'](self.node_id, sock, self) - if sock: - sock.close() - for (_correlation_id, (future, _timestamp, _timeout)) in ifrs: - future.failure(error) - - def _can_send_recv(self): - """Return True iff socket is ready for requests / responses""" - return self.connected() or self.initializing() - - def send(self, request, blocking=True, request_timeout_ms=None): - """Queue request for async network send, return Future() - - Arguments: - request (Request): kafka protocol request object to send. - - Keyword Arguments: - blocking (bool, optional): Whether to immediately send via - blocking socket I/O. Default: True. - request_timeout_ms: Custom timeout in milliseconds for request. - Default: None (uses value from connection configuration) - - Returns: future - """ - future = Future() - if self.connecting(): - return future.failure(Errors.NodeNotReadyError(str(self))) - elif not self.connected(): - return future.failure(Errors.KafkaConnectionError(str(self))) - elif not self.can_send_more(): - # very small race here, but prefer it over breaking abstraction to check self._throttle_time - if self.throttled(): - return future.failure(Errors.ThrottlingQuotaExceededError(str(self))) - return future.failure(Errors.TooManyInFlightRequests(str(self))) - return self._send(request, blocking=blocking, request_timeout_ms=request_timeout_ms) - - def _send(self, request, blocking=True, request_timeout_ms=None): - if request.API_VERSION is None: - request.API_VERSION = self.broker_version_data.api_version(request) - request_timeout_ms = request_timeout_ms or self.config['request_timeout_ms'] - future = Future() - with self._lock: - if not self._can_send_recv(): - # In this case, since we created the future above, - # we know there are no callbacks/errbacks that could fire w/ - # lock. So failing + returning inline should be safe - return future.failure(Errors.NodeNotReadyError(str(self))) - - correlation_id = self._protocol.send_request(request) - - log.debug('%s: Request %d (timeout_ms %s): %s', self, correlation_id, request_timeout_ms, request) - if request.expect_response(): - assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!' - sent_time = time.monotonic() - timeout_at = sent_time + (request_timeout_ms / 1000) - self.in_flight_requests[correlation_id] = (future, sent_time, timeout_at) - else: - future.success(None) - - # Attempt to replicate behavior from prior to introduction of - # send_pending_requests() / async sends - if blocking: - self.send_pending_requests() - - return future - - def send_pending_requests(self): - """Attempts to send pending requests messages via blocking IO - If all requests have been sent, return True - Otherwise, if the socket is blocked and there are more bytes to send, - return False. 
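The send()/_send() pair above centers on one dict: every request that expects a response is parked under its correlation id until recv() (or a timeout) resolves the future. A stripped-down sketch of that bookkeeping (Future here stands in for kafka.future.Future):

    import itertools
    import time

    correlation_ids = itertools.count()
    in_flight = {}  # correlation_id -> (future, sent_at, timeout_at)

    def track_request(future, request_timeout_ms=30000):
        cid = next(correlation_ids)
        sent_at = time.monotonic()
        in_flight[cid] = (future, sent_at, sent_at + request_timeout_ms / 1000)
        return cid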
- """ - try: - with self._lock: - if not self._can_send_recv(): - return False - data = self._protocol.send_bytes() - total_bytes = self._send_bytes_blocking(data) - - if self._sensors: - self._sensors.bytes_sent.record(total_bytes) - return True - - except (ConnectionError, TimeoutError) as e: - log.exception("%s: Error sending request data", self) - error = Errors.KafkaConnectionError("%s: %s" % (self, e)) - self.close(error=error) - return False - - def send_pending_requests_v2(self): - """Attempts to send pending requests messages via non-blocking IO - If all requests have been sent, return True - Otherwise, if the socket is blocked and there are more bytes to send, - return False. - """ - try: - with self._lock: - if not self._can_send_recv(): - return False - - # _protocol.send_bytes returns encoded requests to send - # we send them via _send_bytes() - # and hold leftover bytes in _send_buffer - if not self._send_buffer: - self._send_buffer = self._protocol.send_bytes() - - total_bytes = 0 - if self._send_buffer: - total_bytes = self._send_bytes(self._send_buffer) - self._send_buffer = self._send_buffer[total_bytes:] - - # If all data was sent, we need to get the new data from the protocol now, otherwise - # this function would return True, indicating that there are no more pending - # requests. This could cause the calling thread to wait indefinitely as it won't - # know that there is still buffered data to send. - if not self._send_buffer: - self._send_buffer = self._protocol.send_bytes() - - if self._sensors: - self._sensors.bytes_sent.record(total_bytes) - # Return True iff send buffer is empty - return len(self._send_buffer) == 0 - - except (ConnectionError, TimeoutError, Exception) as e: - log.exception("%s: Error sending request data", self) - error = Errors.KafkaConnectionError("%s: %s" % (self, e)) - self.close(error=error) - return False - - def _maybe_throttle(self, response): - throttle_time_ms = getattr(response, 'throttle_time_ms', 0) - if self._sensors: - self._sensors.throttle_time.record(throttle_time_ms) - if not throttle_time_ms: - if self._throttle_time is not None: - self._throttle_time = None - return - # Client side throttling enabled in v2.0 brokers - # prior to that throttling (if present) was managed broker-side - if self.broker_version_data is not None and self.broker_version_data.broker_version >= (2, 0): - throttle_time = time.monotonic() + throttle_time_ms / 1000 - self._throttle_time = max(throttle_time, self._throttle_time or 0) - log.warning("%s: %s throttled by broker (%d ms)", self, - response.__class__.__name__, throttle_time_ms) - - def can_send_more(self): - """Check for throttling / quota violations and max in-flight-requests""" - if self.throttle_delay() > 0: - return False - max_ifrs = self.config['max_in_flight_requests_per_connection'] - return len(self.in_flight_requests) < max_ifrs - - def recv(self, responses=None, resolve_futures=False): - """Non-blocking network receive. - - Return list of (response, future) tuples - """ - if responses is None: - responses = self._recv() - if not responses and self.requests_timed_out(): - timed_out = self.timed_out_ifrs() - timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000 - log.warning('%s: timed out after %s ms. 
-    def recv(self, responses=None, resolve_futures=False):
-        """Non-blocking network receive.
-
-        Return list of (response, future) tuples
-        """
-        if responses is None:
-            responses = self._recv()
-        if not responses and self.requests_timed_out():
-            timed_out = self.timed_out_ifrs()
-            timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000
-            log.warning('%s: timed out after %s ms. Closing connection.',
-                        self, timeout_ms)
-            self.close(error=Errors.RequestTimedOutError(
-                'Request timed out after %s ms' %
-                timeout_ms))
-            return ()
-
-        # augment responses w/ correlation_id, future, and timestamp
-        for i, (correlation_id, response) in enumerate(responses):
-            try:
-                with self._lock:
-                    (future, timestamp, _timeout) = self.in_flight_requests.pop(correlation_id)
-            except KeyError:
-                self.close(Errors.KafkaConnectionError('Received unrecognized correlation id'))
-                return ()
-            latency_ms = (time.monotonic() - timestamp) * 1000
-            if self._sensors:
-                self._sensors.request_time.record(latency_ms)
-
-            log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
-            self._maybe_throttle(response)
-            responses[i] = (response, future)
-
-        if resolve_futures:
-            for r, f in responses:
-                f.success(r)
-        return responses
-
-    def _recv(self):
-        """Take all available bytes from socket, return list of any responses from parser"""
-        recvd = []
-        err = None
-        with self._lock:
-            if not self._can_send_recv():
-                log.warning('%s: cannot recv: socket not connected', self)
-                return ()
-
-            while len(recvd) < self.config['sock_chunk_buffer_count']:
-                try:
-                    data = self._sock.recv(self.config['sock_chunk_bytes'])
-                    # We expect socket.recv to raise an exception if there are no
-                    # bytes available to read from the socket in non-blocking mode.
-                    # but if the socket is disconnected, we will get empty data
-                    # without an exception raised
-                    if not data:
-                        log.error('%s: socket disconnected', self)
-                        err = Errors.KafkaConnectionError('socket disconnected')
-                        break
-                    else:
-                        recvd.append(data)
-
-                except (SSLWantReadError, SSLWantWriteError):
-                    break
-                except (ConnectionError, TimeoutError) as e:
-                    log.exception('%s: Error receiving network data'
-                                  ' closing socket', self)
-                    err = Errors.KafkaConnectionError(e)
-                    break
-                except BlockingIOError:
-                    break
-
-            # Only process bytes if there was no connection exception
-            if err is None:
-                recvd_data = b''.join(recvd)
-                if self._sensors:
-                    self._sensors.bytes_received.record(len(recvd_data))
-
-                # We need to keep the lock through protocol receipt
-                # so that we ensure that the processed byte order is the
-                # same as the received byte order
-                try:
-                    return self._protocol.receive_bytes(recvd_data)
-                except Errors.KafkaProtocolError as e:
-                    err = e
-
-        self.close(error=err)
-        return ()
-
-    def requests_timed_out(self):
-        return self.next_ifr_request_timeout_ms() == 0
-
-    def timed_out_ifrs(self):
-        now = time.monotonic()
-        ifrs = sorted(self.in_flight_requests.values(), reverse=True, key=lambda ifr: ifr[2])
-        return list(filter(lambda ifr: ifr[2] <= now, ifrs))
-
-    def next_ifr_request_timeout_ms(self):
-        with self._lock:
-            if self.in_flight_requests:
-                def get_timeout(v):
-                    return v[2]
-                next_timeout = min(map(get_timeout,
-                                       self.in_flight_requests.values()))
-                return max(0, (next_timeout - time.monotonic()) * 1000)
-            else:
-                return float('inf')
-
-    def __str__(self):
-        return "<BrokerConnection client_id=%s, node_id=%s ver=%s host=%s:%d%s %s [%s %s]>" % (
-            self.config['client_id'], self.node_id, self.broker_version,
-            self.host, self.port, '<-%d' % self._sock.getsockname()[1] if self._sock is not None else '',
-            self.state, AFI_NAMES[self._sock_afi], self._sock_addr)
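requests_timed_out / next_ifr_request_timeout_ms above let poll() sleep exactly until the next in-flight request would expire. A condensed sketch of the same computation over the (future, sent_at, timeout_at) triples:

    import time

    def next_request_timeout_ms(in_flight):
        if not in_flight:
            return float('inf')
        next_at = min(timeout_at for (_future, _sent_at, timeout_at) in in_flight.values())
        return max(0, (next_at - time.monotonic()) * 1000)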

-
-class BrokerConnectionMetrics:
-    def __init__(self, metrics, metric_group_prefix, node_id):
-        self.metrics = metrics
-
-        # Any broker may have registered summary metrics already
-        # but if not, we need to create them so we can set as parents below
-        all_conns_transferred = metrics.get_sensor('bytes-sent-received')
-        if not all_conns_transferred:
-            metric_group_name = metric_group_prefix + '-metrics'
-
-            bytes_transferred = metrics.sensor('bytes-sent-received')
-            bytes_transferred.add(metrics.metric_name(
-                'network-io-rate', metric_group_name,
-                'The average number of network operations (reads or writes) on all'
-                ' connections per second.'), Rate(sampled_stat=Count()))
-
-            bytes_sent = metrics.sensor('bytes-sent',
-                                        parents=[bytes_transferred])
-            bytes_sent.add(metrics.metric_name(
-                'outgoing-byte-rate', metric_group_name,
-                'The average number of outgoing bytes sent per second to all'
-                ' servers.'), Rate())
-            bytes_sent.add(metrics.metric_name(
-                'request-rate', metric_group_name,
-                'The average number of requests sent per second.'),
-                Rate(sampled_stat=Count()))
-            bytes_sent.add(metrics.metric_name(
-                'request-size-avg', metric_group_name,
-                'The average size of all requests in the window.'), Avg())
-            bytes_sent.add(metrics.metric_name(
-                'request-size-max', metric_group_name,
-                'The maximum size of any request sent in the window.'), Max())
-
-            bytes_received = metrics.sensor('bytes-received',
-                                            parents=[bytes_transferred])
-            bytes_received.add(metrics.metric_name(
-                'incoming-byte-rate', metric_group_name,
-                'Bytes/second read off all sockets'), Rate())
-            bytes_received.add(metrics.metric_name(
-                'response-rate', metric_group_name,
-                'The average number of responses received per second.'),
-                Rate(sampled_stat=Count()))
-
-            request_latency = metrics.sensor('request-latency')
-            request_latency.add(metrics.metric_name(
-                'request-latency-avg', metric_group_name,
-                'The average request latency in ms.'),
-                Avg())
-            request_latency.add(metrics.metric_name(
-                'request-latency-max', metric_group_name,
-                'The maximum request latency in ms.'),
-                Max())
-
-            throttle_time = metrics.sensor('throttle-time')
-            throttle_time.add(metrics.metric_name(
-                'throttle-time-avg', metric_group_name,
-                'The average throttle time in ms.'),
-                Avg())
-            throttle_time.add(metrics.metric_name(
-                'throttle-time-max', metric_group_name,
-                'The maximum throttle time in ms.'),
-                Max())
-
-        # if one sensor of the metrics has been registered for the connection,
-        # then all other sensors should have been registered; and vice versa
-        node_str = 'node-{0}'.format(node_id)
-        node_sensor = metrics.get_sensor(node_str + '.bytes-sent')
-        if not node_sensor:
-            metric_group_name = metric_group_prefix + '-node-metrics.'
+ node_str - - bytes_sent = metrics.sensor( - node_str + '.bytes-sent', - parents=[metrics.get_sensor('bytes-sent')]) - bytes_sent.add(metrics.metric_name( - 'outgoing-byte-rate', metric_group_name, - 'The average number of outgoing bytes sent per second.'), - Rate()) - bytes_sent.add(metrics.metric_name( - 'request-rate', metric_group_name, - 'The average number of requests sent per second.'), - Rate(sampled_stat=Count())) - bytes_sent.add(metrics.metric_name( - 'request-size-avg', metric_group_name, - 'The average size of all requests in the window.'), - Avg()) - bytes_sent.add(metrics.metric_name( - 'request-size-max', metric_group_name, - 'The maximum size of any request sent in the window.'), - Max()) - - bytes_received = metrics.sensor( - node_str + '.bytes-received', - parents=[metrics.get_sensor('bytes-received')]) - bytes_received.add(metrics.metric_name( - 'incoming-byte-rate', metric_group_name, - 'Bytes/second read off node-connection socket'), - Rate()) - bytes_received.add(metrics.metric_name( - 'response-rate', metric_group_name, - 'The average number of responses received per second.'), - Rate(sampled_stat=Count())) - - request_time = metrics.sensor( - node_str + '.latency', - parents=[metrics.get_sensor('request-latency')]) - request_time.add(metrics.metric_name( - 'request-latency-avg', metric_group_name, - 'The average request latency in ms.'), - Avg()) - request_time.add(metrics.metric_name( - 'request-latency-max', metric_group_name, - 'The maximum request latency in ms.'), - Max()) - - throttle_time = metrics.sensor( - node_str + '.throttle', - parents=[metrics.get_sensor('throttle-time')]) - throttle_time.add(metrics.metric_name( - 'throttle-time-avg', metric_group_name, - 'The average throttle time in ms.'), - Avg()) - throttle_time.add(metrics.metric_name( - 'throttle-time-max', metric_group_name, - 'The maximum throttle time in ms.'), - Max()) - - - self.bytes_sent = metrics.sensor(node_str + '.bytes-sent') - self.bytes_received = metrics.sensor(node_str + '.bytes-received') - self.request_time = metrics.sensor(node_str + '.latency') - self.throttle_time = metrics.sensor(node_str + '.throttle') - - -def _address_family(address): - """ - Attempt to determine the family of an address (or hostname) - - :return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if the address family - could not be determined - """ - if address.startswith('[') and address.endswith(']'): - return socket.AF_INET6 - for af in (socket.AF_INET, socket.AF_INET6): - try: - socket.inet_pton(af, address) - return af - except (ValueError, AttributeError, socket.error): - continue - return socket.AF_UNSPEC - - -def get_ip_port_afi(host_and_port_str): - """ - Parse the IP and port from a string in the format of: - - * host_or_ip <- Can be either IPv4 address literal or hostname/fqdn - * host_or_ipv4:port <- Can be either IPv4 address literal or hostname/fqdn - * [host_or_ip] <- IPv6 address literal - * [host_or_ip]:port. <- IPv6 address literal - - .. note:: IPv6 address literals with ports *must* be enclosed in brackets - - .. note:: If the port is not specified, default will be returned. 
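The sensor wiring above follows a parent/child pattern: each per-node sensor lists the matching aggregate sensor as a parent, so one record() call updates both levels. A small sketch assuming the kafka.metrics API used in this file:

    from kafka.metrics import Metrics

    metrics = Metrics()
    total_sent = metrics.sensor('bytes-sent')
    node_sent = metrics.sensor('node-0.bytes-sent', parents=[total_sent])
    node_sent.record(1024)  # feeds both the node-0 and the aggregate sensor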
- - :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC - """ - host_and_port_str = host_and_port_str.strip() - if host_and_port_str.startswith('['): - af = socket.AF_INET6 - host, rest = host_and_port_str[1:].split(']') - if rest: - port = int(rest[1:]) - else: - port = DEFAULT_KAFKA_PORT - return host, port, af - else: - if ':' not in host_and_port_str: - af = _address_family(host_and_port_str) - return host_and_port_str, DEFAULT_KAFKA_PORT, af - else: - # now we have something with a colon in it and no square brackets. It could be - # either an IPv6 address literal (e.g., "::1") or an IP:port pair or a host:port pair - try: - # if it decodes as an IPv6 address, use that - socket.inet_pton(socket.AF_INET6, host_and_port_str) - return host_and_port_str, DEFAULT_KAFKA_PORT, socket.AF_INET6 - except AttributeError: - log.warning('socket.inet_pton not available on this platform.' - ' consider `pip install win_inet_pton`') - pass - except (ValueError, socket.error): - # it's a host:port pair - pass - host, port = host_and_port_str.rsplit(':', 1) - port = int(port) - - af = _address_family(host) - return host, port, af - - -def is_inet_4_or_6(gai): - """Given a getaddrinfo struct, return True iff ipv4 or ipv6""" - return gai[0] in (socket.AF_INET, socket.AF_INET6) - - -def dns_lookup(host, port, afi=socket.AF_UNSPEC): - """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)""" - # XXX: all DNS functions in Python are blocking. If we really - # want to be non-blocking here, we need to use a 3rd-party - # library like python-adns, or move resolution onto its - # own thread. This will be subject to the default libc - # name resolution timeout (5s on most Linux boxes) - try: - return list(filter(is_inet_4_or_6, - socket.getaddrinfo(host, port, afi, - socket.SOCK_STREAM))) - except socket.gaierror as ex: - log.warning('DNS lookup failed for %s:%d,' - ' exception was %s. Is your' - ' advertised.listeners (called' - ' advertised.host.name before Kafka 9)' - ' correct and resolvable?', - host, port, ex) - return [] diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 15606aa59..6cad5602b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -271,7 +271,7 @@ class KafkaConsumer: sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer token provider instance. Default: None socks5_proxy (str): Socks5 proxy URL. Default: None - kafka_client (callable): Custom class / callable for creating KafkaClient instances + kafka_client (callable): Custom class / callable for creating KafkaNetClient instances Note: Configuration parameters are described in more detail at diff --git a/kafka/net/metrics.py b/kafka/net/metrics.py index 727182384..040a9a4c6 100644 --- a/kafka/net/metrics.py +++ b/kafka/net/metrics.py @@ -1,7 +1,4 @@ """Metrics for kafka.net connection manager and connections. - -Mirrors the metrics from kafka/client_async.py (KafkaClientMetrics) -and kafka/conn.py (BrokerConnectionMetrics). """ import logging diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 291efccdd..1080f26cb 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -370,7 +370,7 @@ class KafkaProducer: sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer token provider instance. Default: None socks5_proxy (str): Socks5 proxy URL. 
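For reference, here is how get_ip_port_afi above behaves for the formats it documents (assuming the helper remains importable from kafka.conn, as the imports at the top of the deleted file suggest, and that DEFAULT_KAFKA_PORT is 9092):

    import socket
    from kafka.conn import get_ip_port_afi

    assert get_ip_port_afi('10.0.0.5:9093') == ('10.0.0.5', 9093, socket.AF_INET)
    assert get_ip_port_afi('[::1]') == ('::1', 9092, socket.AF_INET6)
    assert get_ip_port_afi('[fe80::1]:9094') == ('fe80::1', 9094, socket.AF_INET6)
    assert get_ip_port_afi('kafka1')[1:] == (9092, socket.AF_UNSPEC)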
Default: None - kafka_client (callable): Custom class / callable for creating KafkaClient instances + kafka_client (callable): Custom class / callable for creating KafkaNetClient instances Note: Configuration parameters are described in more detail at diff --git a/test/conftest.py b/test/conftest.py index 946078955..4917e7f8e 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,5 +1,9 @@ import pytest +from .mock_broker import MockBroker +from kafka.net.compat import KafkaNetClient +from kafka.protocol.metadata import MetadataResponse + @pytest.fixture def metrics(): @@ -13,40 +17,35 @@ def metrics(): @pytest.fixture -def conn(mocker): - """Return a connection mocker fixture""" - from kafka.conn import ConnectionStates - from kafka.future import Future - from kafka.protocol.metadata import MetadataResponse - conn = mocker.patch('kafka.client_async.BrokerConnection') - conn.return_value = conn - conn.state = ConnectionStates.CONNECTED - conn.send.return_value = Future().success( - MetadataResponse[0]( - [(0, 'foo', 12), (1, 'bar', 34)], # brokers - [])) # topics - conn.connection_delay.return_value = 0 - conn.blacked_out.return_value = False - conn.next_ifr_request_timeout_ms.return_value = float('inf') - def _set_conn_state(state): - conn.state = state - return state - conn._set_conn_state = _set_conn_state - conn.connect.side_effect = lambda: conn.state - conn.connecting = lambda: conn.state in (ConnectionStates.CONNECTING, - ConnectionStates.HANDSHAKE) - conn.connected = lambda: conn.state is ConnectionStates.CONNECTED - conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED - return conn +def broker(request): + """ + To set broker version, use indirect parametrize: + + @pytest.mark.parametrize("broker", [(2, 3)], indirect=True) + """ + broker_version = getattr(request, 'param', (4, 2)) + return MockBroker(broker_version=broker_version) @pytest.fixture -def client(conn, mocker): - from kafka.client_async import KafkaClient +def multi_broker(broker): + Broker = MetadataResponse.MetadataResponseBroker + broker.set_metadata(brokers=[ + Broker(node_id=0, host=broker.host, port=broker.port, rack=None), + Broker(node_id=1, host=broker.host, port=broker.port, rack=None), + ]) + return broker - cli = KafkaClient(api_version=(0, 9)) - mocker.patch.object(cli, '_init_connect', return_value=True) + +@pytest.fixture +def client(broker): + cli = KafkaNetClient( + bootstrap_servers='%s:%d' % (broker.host, broker.port), + api_version=broker.broker_version, + request_timeout_ms=5000, + ) + broker.attach(cli._manager) try: yield cli finally: - cli._close() + cli.close() diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 9debee423..b17a1f899 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -3,7 +3,6 @@ import pytest -from kafka.client_async import KafkaClient from kafka.consumer.subscription_state import SubscriptionState, ConsumerRebalanceListener from kafka.coordinator.assignors.abstract import ( ConsumerProtocolSubscription, ConsumerProtocolAssignment, @@ -43,8 +42,8 @@ def test_init(client, coordinator): @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_autocommit_enable_api_version(conn, metrics, api_version): - coordinator = ConsumerCoordinator(KafkaClient(api_version=api_version), +def test_autocommit_enable_api_version(client, metrics, api_version): + coordinator = ConsumerCoordinator(client, SubscriptionState(), metrics=metrics, enable_auto_commit=True, 
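The new broker fixture's indirect-parametrize hook can be exercised like this (hypothetical test; broker.broker_version is the attribute the client fixture above already reads):

    import pytest

    @pytest.mark.parametrize("broker", [(2, 3)], indirect=True)
    def test_pinned_broker_version(broker):
        assert broker.broker_version == (2, 3)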
@@ -90,8 +89,8 @@ def test_group_protocols(coordinator): @pytest.mark.parametrize('api_version', [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_pattern_subscription(conn, metrics, api_version): - coordinator = ConsumerCoordinator(KafkaClient(api_version=api_version), +def test_pattern_subscription(client, metrics, api_version): + coordinator = ConsumerCoordinator(client, SubscriptionState(), metrics=metrics, api_version=api_version, @@ -384,12 +383,11 @@ def test_commit_offsets_sync(mocker, coordinator, offsets): ((0, 9), 'foobar', True, None, True, True, False, False), ((0, 9), None, True, None, False, False, True, False), ]) -def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, +def test_maybe_auto_commit_offsets_sync(mocker, client, api_version, group_id, enable, error, has_auto_commit, commit_offsets, warn, exc): mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning') mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception') - client = KafkaClient(api_version=api_version) coordinator = ConsumerCoordinator(client, SubscriptionState(), api_version=api_version, session_timeout_ms=30000, @@ -462,7 +460,7 @@ def test_send_offset_commit_request_fail(mocker, patched_coord, offsets): def test_send_offset_commit_request_versions(patched_coord, offsets, api_version, version): expect_node = 0 - patched_coord._client.broker_version_data = BrokerVersionData(api_version) + patched_coord._client._manager.broker_version_data = BrokerVersionData(api_version) patched_coord._send_offset_commit_request(offsets) (node, request), _ = patched_coord._client.send.call_args @@ -577,7 +575,7 @@ def test_send_offset_fetch_request_versions(patched_coord, partitions, api_version, version): # assuming fixture sets coordinator=0, least_loaded_node=1 expect_node = 0 - patched_coord._client.broker_version_data = BrokerVersionData(api_version) + patched_coord._client._manager.broker_version_data = BrokerVersionData(api_version) patched_coord._send_offset_fetch_request(partitions) (node, request), _ = patched_coord._client.send.call_args diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py index e3c7058a2..cf72dc8a5 100644 --- a/test/consumer/test_fetcher.py +++ b/test/consumer/test_fetcher.py @@ -101,7 +101,7 @@ def build_fetch_offsets(request): ((0, 8, 2), 0) ]) def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): - fetcher._client.broker_version_data = BrokerVersionData(api_version) + fetcher._client._manager.broker_version_data = BrokerVersionData(api_version) mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) mocker.patch.object(fetcher._client, "ready", return_value=True) diff --git a/test/integration/conftest.py b/test/integration/conftest.py index fd7da01ce..93bcf3ce9 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -5,7 +5,7 @@ import pytest from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer -from kafka.client_async import KafkaClient +from kafka.net.compat import KafkaNetClient from kafka.util import TOPIC_LEGAL_CHARS, TOPIC_MAX_LENGTH, ensure_valid_topic_name from test.testutil import env_kafka_version, random_string from test.integration.fixtures import KafkaFixture, ZookeeperFixture, create_topics, client_params @@ -68,8 +68,8 @@ def factory(**broker_params): @pytest.fixture def kafka_client(kafka_broker, request): - """Return a KafkaClient 
fixture""" - client = KafkaClient(**client_params(kafka_broker, request.node.name)) + """Return a KafkaNetClient fixture""" + client = KafkaNetClient(**client_params(kafka_broker, request.node.name)) yield client client.close() diff --git a/test/integration/fixtures.py b/test/integration/fixtures.py index 56f31184a..61842e3d4 100644 --- a/test/integration/fixtures.py +++ b/test/integration/fixtures.py @@ -726,18 +726,16 @@ def get_api_versions(): k = KafkaFixture.instance(0) zk = k.zookeeper - from kafka.client_async import KafkaClient - client = KafkaClient(bootstrap_servers='localhost:{}'.format(k.port)) - client.check_version() - - from pprint import pprint - - print(client.get_api_versions()) + try: + from kafka.admin import KafkaAdminClient + client = KafkaAdminClient(bootstrap_servers='localhost:{}'.format(k.port)) + print(client.api_versions()) + client.close() - client.close() - k.close() - if zk: - zk.close() + finally: + k.close() + if zk: + zk.close() def run_brokers(args=()): diff --git a/test/integration/test_sasl_integration.py b/test/integration/test_sasl_integration.py index c74152a2c..f75f6eccf 100644 --- a/test/integration/test_sasl_integration.py +++ b/test/integration/test_sasl_integration.py @@ -7,7 +7,7 @@ from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer from kafka.admin import NewTopic -from kafka.client_async import KafkaClient +from kafka.net.compat import KafkaNetClient from kafka.protocol.metadata import MetadataRequest from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore from test.integration.fixtures import client_params, create_topics @@ -73,15 +73,11 @@ def test_client(request, sasl_kafka): topic_name = special_to_underscore(request.node.name + random_string(4)) create_topics(sasl_kafka, [topic_name], num_partitions=1) - client = KafkaClient(**client_params(sasl_kafka, 'client')) + client = KafkaNetClient(**client_params(sasl_kafka, 'client')) + client._manager.run(client._manager.bootstrap) request = MetadataRequest(topics=None, version=1) timeout_at = time.time() + 1 - while not client.is_ready(0): - client.maybe_connect(0) - client.poll(timeout_ms=100) - if time.time() > timeout_at: - raise RuntimeError("Couldn't connect to node 0") - future = client.send(0, request) + future = client.send(None, request) client.poll(future=future, timeout_ms=10000) if not future.is_done: raise RuntimeError("Couldn't fetch topic response from Broker.") diff --git a/test/integration/test_ssl_integration.py b/test/integration/test_ssl_integration.py index a2adebb96..57f2a0318 100644 --- a/test/integration/test_ssl_integration.py +++ b/test/integration/test_ssl_integration.py @@ -35,22 +35,8 @@ def test_ssl_handshake(self, ssl_kafka): assert ssock.version() is not None ssock.close() - def test_kafka_client_ssl(self, ssl_kafka): - """Test KafkaClient can connect and fetch metadata over SSL.""" - from kafka.client_async import KafkaClient - - client = KafkaClient( - bootstrap_servers='localhost:%d' % ssl_kafka.port, - security_protocol='SSL', - ssl_cafile=os.path.join(ssl_kafka.ssl_dir, 'ca-cert'), - ssl_check_hostname=False, - api_version=env_kafka_version(), - ) - assert client.broker_version_data - client.close() - def test_legacy_kafka_client_ssl(self, ssl_kafka): - """Test LegacyKafkaClient (kafka.net) can connect over SSL.""" + """Test KafkaNetClient (kafka.net) can connect over SSL.""" from kafka.net.compat import KafkaNetClient client = KafkaNetClient( diff --git a/test/producer/test_sender.py 
b/test/producer/test_sender.py index 918176cd9..b00a66066 100644 --- a/test/producer/test_sender.py +++ b/test/producer/test_sender.py @@ -8,7 +8,6 @@ import pytest -from kafka.client_async import KafkaClient from kafka.cluster import ClusterMetadata import kafka.errors as Errors from kafka.protocol.broker_version_data import BrokerVersionData @@ -103,7 +102,7 @@ def transaction_manager(): ((0, 8, 0), 0) ]) def test_produce_request(sender, api_version, produce_version): - sender._client.broker_version_data = BrokerVersionData(api_version) + sender._client._manager.broker_version_data = BrokerVersionData(api_version) magic = KafkaProducer.max_usable_produce_magic(api_version) batch = producer_batch(magic=magic) produce_request = sender._produce_request(0, 0, 0, [batch]) @@ -115,7 +114,7 @@ def test_produce_request(sender, api_version, produce_version): ((2, 1), 7), ]) def test_create_produce_requests(sender, api_version, produce_version): - sender._client.broker_version_data = BrokerVersionData(api_version) + sender._client._manager.broker_version_data = BrokerVersionData(api_version) tp = TopicPartition('foo', 0) magic = KafkaProducer.max_usable_produce_magic(api_version) batches_by_node = collections.defaultdict(list) diff --git a/test/protocol/test_api_compatibility.py b/test/protocol/test_api_compatibility.py index 62531e1e3..b9d7d26f9 100644 --- a/test/protocol/test_api_compatibility.py +++ b/test/protocol/test_api_compatibility.py @@ -293,8 +293,3 @@ def test_tagged_fields_retention(): assert decoded_unknown.unknown_tags is not None assert '_99' in decoded_unknown.unknown_tags assert decoded_unknown.unknown_tags['_99'] == b'\x01\x02\x03' - - -def test_new_class_len(): - # Used by kafka/client_async.py api_version() - assert len(NewApiVersionsRequest) == NewApiVersionsRequest.max_version + 1 diff --git a/test/test_client_async.py b/test/test_client_async.py deleted file mode 100644 index d157ac1d5..000000000 --- a/test/test_client_async.py +++ /dev/null @@ -1,363 +0,0 @@ -import selectors -import socket -import time - -import pytest - -from kafka.client_async import KafkaClient, IdleConnectionManager -from kafka.conn import ConnectionStates -import kafka.errors as Errors -from kafka.future import Future -from kafka.protocol.metadata import MetadataRequest, MetadataResponse -from kafka.protocol.producer import ProduceRequest - - -@pytest.fixture -def client_poll_mocked(mocker): - cli = KafkaClient(request_timeout_ms=9999999, - reconnect_backoff_ms=2222, - connections_max_idle_ms=float('inf'), - api_version=(0, 9)) - mocker.patch.object(cli, '_poll') - try: - yield cli - finally: - cli._close() - - -@pytest.fixture -def client_selector_mocked(mocker, conn): - client = KafkaClient(api_version=(0, 9)) - mocker.patch.object(client, '_selector') - client.poll(future=client.cluster.request_update()) - try: - yield client - finally: - client._close() - -def test_bootstrap(mocker, conn): - conn.state = ConnectionStates.CONNECTED - cli = KafkaClient(api_version=(2, 1)) - mocker.patch.object(cli, '_selector') - future = cli.cluster.request_update() - cli.poll(future=future) - - assert future.succeeded() - args, kwargs = conn.call_args - assert args == ('localhost', 9092, socket.AF_UNSPEC) - kwargs.pop('state_change_callback') - kwargs.pop('node_id') - kwargs.pop('broker_version_data') - assert kwargs == cli.config - conn.send.assert_called_once_with(MetadataRequest(topics=[]), blocking=False, request_timeout_ms=None) - assert cli._bootstrap_fails == 0 - assert cli.cluster.brokers() == 
list([MetadataResponse.MetadataResponseBroker(0, 'foo', 12, None), - MetadataResponse.MetadataResponseBroker(1, 'bar', 34, None)]) - - -def test_can_connect(client_selector_mocked, conn): - # Node is not in broker metadata - can't connect - assert not client_selector_mocked._can_connect(2) - - # Node is in broker metadata but not in _conns - assert 0 not in client_selector_mocked._conns - assert client_selector_mocked._can_connect(0) - - # Node is connected, can't reconnect - assert client_selector_mocked._init_connect(0) is True - assert not client_selector_mocked._can_connect(0) - - # Node is disconnected, can connect - client_selector_mocked._conns[0].state = ConnectionStates.DISCONNECTED - assert client_selector_mocked._can_connect(0) - - # Node is disconnected, but blacked out - conn.blacked_out.return_value = True - assert not client_selector_mocked._can_connect(0) - - -def test_init_connect(client_selector_mocked, conn): - # Node not in metadata, return False - assert not client_selector_mocked._init_connect(2) - - # New node_id creates a conn object - assert 0 not in client_selector_mocked._conns - conn.state = ConnectionStates.DISCONNECTED - conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING) - assert client_selector_mocked._init_connect(0) is True - assert client_selector_mocked._conns[0] is conn - - -def test_conn_state_change(client_selector_mocked, conn): - sel = client_selector_mocked._selector - - node_id = 0 - client_selector_mocked._conns[node_id] = conn - conn.state = ConnectionStates.CONNECTING - sock = conn._sock - client_selector_mocked._conn_state_change(node_id, sock, conn) - assert node_id in client_selector_mocked._connecting - sel.register.assert_called_with(sock, selectors.EVENT_WRITE, conn) - - conn.state = ConnectionStates.CONNECTED - client_selector_mocked._conn_state_change(node_id, sock, conn) - assert node_id not in client_selector_mocked._connecting - sel.modify.assert_called_with(sock, selectors.EVENT_READ, conn) - - # Failure to connect should trigger metadata update - assert client_selector_mocked.cluster._need_update is False - conn.state = ConnectionStates.DISCONNECTED - client_selector_mocked._conn_state_change(node_id, sock, conn) - assert node_id not in client_selector_mocked._connecting - assert client_selector_mocked.cluster._need_update is True - sel.unregister.assert_called_with(sock) - - conn.state = ConnectionStates.CONNECTING - client_selector_mocked._conn_state_change(node_id, sock, conn) - assert node_id in client_selector_mocked._connecting - conn.state = ConnectionStates.DISCONNECTED - client_selector_mocked._conn_state_change(node_id, sock, conn) - assert node_id not in client_selector_mocked._connecting - - -def test_ready(mocker, client_selector_mocked, conn): - maybe_connect = mocker.patch.object(client_selector_mocked, 'maybe_connect') - node_id = 1 - client_selector_mocked.ready(node_id) - maybe_connect.assert_called_with(node_id) - - -def test_is_ready(client_selector_mocked, conn): - client_selector_mocked._init_connect(0) - client_selector_mocked._init_connect(1) - - # metadata refresh blocks ready nodes - assert client_selector_mocked.is_ready(0) - assert client_selector_mocked.is_ready(1) - client_selector_mocked.cluster.metadata_refresh_in_progress = True - assert not client_selector_mocked.is_ready(0) - assert not client_selector_mocked.is_ready(1) - - # requesting metadata update also blocks ready nodes - client_selector_mocked.cluster.metadata_refresh_in_progress = False - assert 
client_selector_mocked.is_ready(0) - assert client_selector_mocked.is_ready(1) - client_selector_mocked.cluster.request_update() - client_selector_mocked.cluster.config['retry_backoff_ms'] = 0 - assert not client_selector_mocked.cluster.metadata_refresh_in_progress - assert not client_selector_mocked.is_ready(0) - assert not client_selector_mocked.is_ready(1) - client_selector_mocked.cluster._need_update = False - - # if connection can't send more, not ready - assert client_selector_mocked.is_ready(0) - conn.can_send_more.return_value = False - assert not client_selector_mocked.is_ready(0) - conn.can_send_more.return_value = True - - # disconnected nodes, not ready - assert client_selector_mocked.is_ready(0) - conn.state = ConnectionStates.DISCONNECTED - assert not client_selector_mocked.is_ready(0) - - -def test_close(client_selector_mocked, conn): - call_count = conn.close.call_count - - # Unknown node - silent - client_selector_mocked.close(2) - call_count += 0 - assert conn.close.call_count == call_count - - # Single node close - client_selector_mocked._init_connect(0) - assert conn.close.call_count == call_count - client_selector_mocked.close(0) - call_count += 1 - assert conn.close.call_count == call_count - - # All node close - client_selector_mocked._init_connect(1) - client_selector_mocked.close() - # +2 close: node 1, node bootstrap (node 0 already closed) - call_count += 2 - assert conn.close.call_count == call_count - - -def test_is_disconnected(client_selector_mocked, conn): - # False if not connected yet - conn.state = ConnectionStates.DISCONNECTED - assert not client_selector_mocked.is_disconnected(0) - - client_selector_mocked._init_connect(0) - assert client_selector_mocked.is_disconnected(0) - - conn.state = ConnectionStates.CONNECTING - assert not client_selector_mocked.is_disconnected(0) - - conn.state = ConnectionStates.CONNECTED - assert not client_selector_mocked.is_disconnected(0) - - -def test_send(client_selector_mocked, conn): - # Send to unknown node => raises AssertionError - try: - client_selector_mocked.send(2, None) - assert False, 'Exception not raised' - except AssertionError: - pass - - # Send to disconnected node => NodeNotReady - conn.state = ConnectionStates.DISCONNECTED - f = client_selector_mocked.send(0, None) - assert f.failed() - assert isinstance(f.exception, Errors.NodeNotReadyError) - - conn.state = ConnectionStates.CONNECTED - client_selector_mocked._init_connect(0) - # ProduceRequest w/ 0 required_acks -> no response - request = ProduceRequest[0](0, 0, []) - assert request.expect_response() is False - ret = client_selector_mocked.send(0, request) - conn.send.assert_called_with(request, blocking=False, request_timeout_ms=None) - assert isinstance(ret, Future) - - request = MetadataRequest[0]([]) - client_selector_mocked.send(0, request) - conn.send.assert_called_with(request, blocking=False, request_timeout_ms=None) - - -def test_poll(mocker, client_poll_mocked): - metadata = mocker.patch.object(client_poll_mocked, '_maybe_refresh_metadata') - ifr_request_timeout = mocker.patch.object(client_poll_mocked, '_next_ifr_request_timeout_ms') - now = time.monotonic() - t = mocker.patch('time.monotonic') - t.return_value = now - - # metadata timeout wins - ifr_request_timeout.return_value = float('inf') - metadata.return_value = 1000 - client_poll_mocked.poll() - client_poll_mocked._poll.assert_called_with(1.0) - - # user timeout wins - client_poll_mocked.poll(timeout_ms=250) - client_poll_mocked._poll.assert_called_with(0.25) - - # ifr request 
timeout wins - ifr_request_timeout.return_value = 30000 - metadata.return_value = 1000000 - client_poll_mocked.poll() - client_poll_mocked._poll.assert_called_with(30.0) - - -def test__poll(): - pass - - -def test_in_flight_request_count(): - pass - - -def test_least_loaded_node(): - pass - - -def test_maybe_refresh_metadata_ttl(mocker, client_poll_mocked): - ttl = mocker.patch.object(client_poll_mocked.cluster, 'ttl') - ttl.return_value = 1234 - - client_poll_mocked.poll(timeout_ms=12345678) - client_poll_mocked._poll.assert_called_with(1.234) - - -def test_maybe_refresh_metadata_backoff(mocker, client_poll_mocked): - mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value=None) - mocker.patch.object(client_poll_mocked, 'least_loaded_node_refresh_ms', return_value=4321) - now = time.monotonic() - t = mocker.patch('time.monotonic') - t.return_value = now - - client_poll_mocked.poll(timeout_ms=12345678) - client_poll_mocked._poll.assert_called_with(4.321) - - -def test_maybe_refresh_metadata_in_progress(client_poll_mocked): - client_poll_mocked.cluster.metadata_refresh_in_progress = True - - client_poll_mocked.poll(timeout_ms=12345678) - client_poll_mocked._poll.assert_called_with(0.100) # retry_backoff_ms - - -def test_maybe_refresh_metadata_update(mocker, client_poll_mocked): - mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value='foobar') - mocker.patch.object(client_poll_mocked, '_can_send_request', return_value=True) - send = mocker.patch.object(client_poll_mocked, 'send') - client_poll_mocked.cluster.need_all_topic_metadata = True - - client_poll_mocked.poll(timeout_ms=12345678) - client_poll_mocked._poll.assert_called_with(9999.999) # request_timeout_ms - assert client_poll_mocked.cluster.metadata_refresh_in_progress - request = MetadataRequest(topics=MetadataRequest.ALL_TOPICS) - send.assert_called_once_with('foobar', request, wakeup=False) - - -def test_maybe_refresh_metadata_cant_send(mocker, client_poll_mocked): - mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value='foobar') - mocker.patch.object(client_poll_mocked, '_can_send_request', return_value=False) - mocker.patch.object(client_poll_mocked, '_can_connect', return_value=True) - mocker.patch.object(client_poll_mocked, '_init_connect', return_value=True) - - now = time.monotonic() - t = mocker.patch('time.monotonic') - t.return_value = now - - # first poll attempts connection - client_poll_mocked.poll() - client_poll_mocked._poll.assert_called() - client_poll_mocked._init_connect.assert_called_once_with('foobar') - - # poll while connecting should not attempt a new connection - client_poll_mocked._connecting.add('foobar') - client_poll_mocked._can_connect.reset_mock() - client_poll_mocked.poll() - client_poll_mocked._poll.assert_called() - assert not client_poll_mocked._can_connect.called - assert not client_poll_mocked.cluster.metadata_refresh_in_progress - - -def test_schedule(): - pass - - -def test_unschedule(): - pass - - -def test_idle_connection_manager(mocker): - t = mocker.patch.object(time, 'monotonic') - t.return_value = 0 - - idle = IdleConnectionManager(100) - assert idle.next_check_ms() == float('inf') - - idle.update('foo') - assert not idle.is_expired('foo') - assert idle.poll_expired_connection() is None - assert idle.next_check_ms() == 100 - - t.return_value = 90 / 1000 - assert not idle.is_expired('foo') - assert idle.poll_expired_connection() is None - assert idle.next_check_ms() == 10 - - t.return_value = 100 / 1000 - assert 
idle.is_expired('foo') - assert idle.next_check_ms() == 0 - - conn_id, conn_ts = idle.poll_expired_connection() - assert conn_id == 'foo' - assert conn_ts == 0 - - idle.remove('foo') - assert idle.next_check_ms() == float('inf') diff --git a/test/test_conn.py b/test/test_conn.py deleted file mode 100644 index bc3cbb8d3..000000000 --- a/test/test_conn.py +++ /dev/null @@ -1,697 +0,0 @@ -# pylint: skip-file - -from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET -import socket -from unittest import mock - -import pytest - -from kafka.conn import BrokerConnection, ConnectionStates -from kafka.future import Future -from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError -from kafka.metrics.metrics import Metrics -from kafka.metrics.stats.sensor import Sensor -from kafka.protocol.broker_version_data import BrokerVersionData, BROKER_API_VERSIONS, VERSION_CHECKS -from kafka.protocol.admin import ListGroupsResponse -from kafka.protocol.consumer import HeartbeatResponse -from kafka.protocol.metadata import MetadataRequest, ApiVersionsRequest, ApiVersionsResponse -from kafka.protocol.producer import ProduceRequest -from kafka.version import __version__ - -import kafka.errors as Errors - - -@pytest.fixture -def dns_lookup(mocker): - return mocker.patch('kafka.conn.dns_lookup', - return_value=[(socket.AF_INET, - None, None, None, - ('localhost', 9092))]) - -@pytest.fixture -def _socket(mocker): - socket = mocker.MagicMock() - socket.connect_ex.return_value = 0 - socket.send.side_effect = lambda d: len(d) - socket.recv.side_effect = BlockingIOError("mocked recv") - mocker.patch('socket.socket', return_value=socket) - return socket - -@pytest.fixture -def metrics(mocker): - metrics = mocker.MagicMock(Metrics) - metrics.mocked_sensors = {} - def sensor(name, **kwargs): - if name not in metrics.mocked_sensors: - metrics.mocked_sensors[name] = mocker.MagicMock(Sensor) - return metrics.mocked_sensors[name] - metrics.sensor.side_effect = sensor - return metrics - -@pytest.fixture -def conn(_socket, dns_lookup, metrics, mocker): - conn = BrokerConnection('localhost', 9092, socket.AF_INET, metrics=metrics) - mocker.patch.object(conn, '_try_api_versions_check', return_value=True) - return conn - - -@pytest.mark.parametrize("states", [ - (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),), - (([EALREADY, EALREADY], ConnectionStates.CONNECTING),), - (([0], ConnectionStates.CONNECTED),), - (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING), - ([ECONNRESET], ConnectionStates.DISCONNECTED)), - (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING), - ([EALREADY], ConnectionStates.CONNECTING), - ([EISCONN], ConnectionStates.CONNECTED)), -]) -def test_connect(_socket, conn, states): - assert conn.state is ConnectionStates.DISCONNECTED - - for errno, state in states: - _socket.connect_ex.side_effect = errno - conn.connect() - assert conn.state is state - - -def test_api_versions_check(_socket, mocker): - conn = BrokerConnection('localhost', 9092, socket.AF_INET) - mocker.patch.object(conn, 'send_pending_requests') - mocker.patch.object(conn, 'connection_delay', return_value=0) - mocker.spy(conn, '_send') - assert conn._api_versions_future is None - conn.connect() - assert conn._api_versions_future is not None - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=ApiVersionsRequest.max_version, client_software_name='kafka-python', client_software_version=__version__), - 
blocking=True, - request_timeout_ms=1000.0 - ) - send_call_count = conn._send.call_count - - assert conn._try_api_versions_check() is False - assert conn._send.call_count == send_call_count - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - - api_versions_response = ApiVersionsResponse( - version=ApiVersionsRequest.max_version, - error_code=0, - api_keys=[(MetadataRequest.API_KEY, 0, 5)] - ) - conn.recv(responses=[(1, api_versions_response)], resolve_futures=True) - assert conn.broker_version_data.broker_version == (1, 0) - assert conn.broker_version_data.api_versions == {MetadataRequest.API_KEY: (0, 5)} - assert conn.state is ConnectionStates.CONNECTED - - -def test_api_versions_unsupported_versions(_socket, mocker): - conn = BrokerConnection('localhost', 9092, socket.AF_INET) - mocker.patch.object(conn, 'send_pending_requests') - mocker.patch.object(conn, 'connection_delay', return_value=0) - mocker.spy(conn, '_send') - assert conn._api_versions_future is None - conn.connect() - assert conn._api_versions_future is not None - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=ApiVersionsRequest.max_version, client_software_name='kafka-python', client_software_version=__version__), - blocking=True, - request_timeout_ms=1000.0 - ) - send_call_count = conn._send.call_count - - assert conn._try_api_versions_check() is False - assert conn._send.call_count == send_call_count - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - - # If we sent a higher-than-supported version, the broker sends back the min/max - # We should skip to the max supported for our next try. - # Note: this only happens for brokers >= 2.4 - api_versions_error = ApiVersionsResponse( - version=0, - error_code=Errors.UnsupportedVersionError.errno, - api_keys=[(ApiVersionsRequest.API_KEY, 0, 3)] - ) - conn.recv(responses=[(1, api_versions_error)], resolve_futures=True) - assert conn._api_versions_idx == 3 - assert conn._api_versions_future is None - assert conn.state is ConnectionStates.API_VERSIONS_SEND - - assert conn._try_api_versions_check() is False - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=3, client_software_name='kafka-python', client_software_version=__version__), - blocking=True, - request_timeout_ms=500.0 - ) - - api_versions_response = ApiVersionsResponse( - version=3, error_code=0, - api_keys=[(key, *vals) for key, vals in BROKER_API_VERSIONS[(2, 4)].items()], - ) - conn.recv(responses=[(2, api_versions_response)], resolve_futures=True) - assert conn.broker_version_data.broker_version == (2, 4) - assert conn.broker_version_data.api_versions == BROKER_API_VERSIONS[(2, 4)] - assert conn.state is ConnectionStates.CONNECTED - - conn.close() - assert conn.state is ConnectionStates.DISCONNECTED - - # Reconnect uses the last api versions check - conn.connect() - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=3, client_software_name='kafka-python', client_software_version=__version__), - blocking=True, - request_timeout_ms=1000.0 - ) - - -def test_api_versions_unsupported_versions_empty(_socket, mocker): - conn = BrokerConnection('localhost', 9092, socket.AF_INET) - mocker.patch.object(conn, 'send_pending_requests') - mocker.patch.object(conn, 
'connection_delay', return_value=0) - mocker.spy(conn, '_send') - assert conn._api_versions_future is None - conn.connect() - assert conn._api_versions_future is not None - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=ApiVersionsRequest.max_version, client_software_name='kafka-python', client_software_version=__version__), - blocking=True, - request_timeout_ms=1000.0 - ) - send_call_count = conn._send.call_count - - assert conn._try_api_versions_check() is False - assert conn._send.call_count == send_call_count - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - - # For broker versions < 2.4, the UnsupportedVersionError does not include api_keys - # We should fallback to v0 - api_versions_error = ApiVersionsResponse( - version=0, - error_code=Errors.UnsupportedVersionError.errno, - api_keys=[] - ) - conn.recv(responses=[(1, api_versions_error)], resolve_futures=True) - assert conn._api_versions_idx == 0 - assert conn._api_versions_future is None - assert conn.state is ConnectionStates.API_VERSIONS_SEND - - assert conn._try_api_versions_check() is False - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=0, client_software_name='kafka-python', client_software_version=__version__), - blocking=True, - request_timeout_ms=500.0 - ) - - api_versions_response = ApiVersionsResponse( - version=0, error_code=0, - api_keys=[(key, *vals) for key, vals in BROKER_API_VERSIONS[(2, 0)].items()], - ) - conn.recv(responses=[(2, api_versions_response)], resolve_futures=True) - assert conn.broker_version_data.broker_version == (2, 0) - assert conn.broker_version_data.api_versions == BROKER_API_VERSIONS[(2, 0)] - assert conn.state is ConnectionStates.CONNECTED - - conn.close() - assert conn.state is ConnectionStates.DISCONNECTED - - # Reconnect uses the max supported api versions check (v2 for 2.0 brokers) - conn.connect() - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=2, client_software_name='kafka-python', client_software_version=__version__), - blocking=True, - request_timeout_ms=1000.0 - ) - - -def test_api_versions_fallback(_socket, mocker): - conn = BrokerConnection('localhost', 9092, socket.AF_INET) - mocker.patch.object(conn, 'send_pending_requests') - mocker.patch.object(conn, 'connection_delay', return_value=0) - mocker.spy(conn, '_send') - assert conn._api_versions_future is None - conn.connect() - assert conn._api_versions_future is not None - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=ApiVersionsRequest.max_version, client_software_name='kafka-python', client_software_version=__version__), - blocking=True, - request_timeout_ms=1000.0 - ) - send_call_count = conn._send.call_count - - assert conn._try_api_versions_check() is False - assert conn._send.call_count == send_call_count - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - - future = conn._api_versions_future - assert not future.is_done - conn.close(Errors.RequestTimedOutError()) - assert conn._api_versions_idx == None - assert conn._api_versions_future is None - assert conn._check_version_idx == 0 - assert conn.state is ConnectionStates.DISCONNECTED - - conn.connect() - 
assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - VERSION_CHECKS[0][1], - blocking=True, - request_timeout_ms=500.0 - ) - - conn.recv(responses=[(1, ListGroupsResponse())], resolve_futures=True) - - assert conn.broker_version_data == BrokerVersionData((0, 9)) - assert conn.state is ConnectionStates.CONNECTED - - conn.close() - assert conn.state is ConnectionStates.DISCONNECTED - - # Reconnect skips check versions - conn.connect() - assert conn.state is ConnectionStates.CONNECTED - - -def test_api_versions_check_with_broker_version_data(_socket, mocker): - conn = BrokerConnection('localhost', 9092, socket.AF_INET, broker_version_data=BrokerVersionData((1, 0))) - mocker.patch.object(conn, 'send_pending_requests') - mocker.patch.object(conn, 'connection_delay', return_value=0) - mocker.spy(conn, '_send') - assert conn._api_versions_future is None - conn.connect() - assert conn._api_versions_future is not None - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=1, client_software_name='kafka-python', client_software_version=__version__), - blocking=True, - request_timeout_ms=1000.0 - ) - send_call_count = conn._send.call_count - - assert conn._try_api_versions_check() is False - assert conn._send.call_count == send_call_count - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - - future = conn._api_versions_future - assert not future.is_done - conn.close(Errors.RequestTimedOutError()) - assert future.failed() - assert future.exception == Errors.RequestTimedOutError() - assert conn._api_versions_idx == 1 - assert conn._api_versions_future is None - assert conn.state is ConnectionStates.DISCONNECTED - - conn.connect() - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=1, client_software_name='kafka-python', client_software_version=__version__), - blocking=True, - request_timeout_ms=500.0 - ) - - # Even if we connect to a newer broker, we should still use the cached broker_version_data - api_versions_response = ApiVersionsResponse( - version=0, error_code=0, - api_keys=[(key, *vals) for key, vals in BROKER_API_VERSIONS[(2, 0)].items()], - ) - conn.recv(responses=[(1, api_versions_response)], resolve_futures=True) - assert conn.broker_version_data.broker_version == (1, 0) - assert conn.broker_version_data.api_versions == BROKER_API_VERSIONS[(1, 0)] - assert conn.state is ConnectionStates.CONNECTED - - conn.close() - conn.connect() - assert conn.connecting() is True - assert conn.state is ConnectionStates.API_VERSIONS_RECV - conn._send.assert_called_with( - ApiVersionsRequest(version=1, client_software_name='kafka-python', client_software_version=__version__), - blocking=True, - request_timeout_ms=250.0 - ) - - # But if we connect to an older broker, we should use it instead of the cached broker_version_data - api_versions_response = ApiVersionsResponse( - version=0, error_code=0, - api_keys=[(key, *vals) for key, vals in BROKER_API_VERSIONS[(0, 11)].items()], - ) - conn.recv(responses=[(1, api_versions_response)], resolve_futures=True) - assert conn.broker_version_data.broker_version == (0, 11) - assert conn.broker_version_data.api_versions == BROKER_API_VERSIONS[(0, 11)] - assert conn.state is ConnectionStates.CONNECTED - - -def test_connect_timeout(_socket, conn): - assert 
conn.state is ConnectionStates.DISCONNECTED - - # Initial connect returns EINPROGRESS - # immediate inline connect returns EALREADY - # second explicit connect returns EALREADY - # third explicit connect returns EALREADY and times out via last_attempt - _socket.connect_ex.side_effect = [EINPROGRESS, EALREADY, EALREADY, EALREADY] - conn.connect() - assert conn.state is ConnectionStates.CONNECTING - conn.connect() - assert conn.state is ConnectionStates.CONNECTING - conn.last_attempt = 0 - conn.connect() - assert conn.state is ConnectionStates.DISCONNECTED - - -def test_blacked_out(conn): - with mock.patch("time.monotonic", return_value=1000): - conn.last_attempt = 0 - assert conn.blacked_out() is False - conn.last_attempt = 1000 - assert conn.blacked_out() is True - - -def test_connection_delay(conn, mocker): - mocker.patch.object(conn, '_reconnect_jitter_pct', return_value=1.0) - with mock.patch("time.monotonic", return_value=1000): - conn.last_attempt = 1000 - assert conn.connection_delay() == conn.config['reconnect_backoff_ms'] - conn.state = ConnectionStates.CONNECTING - assert conn.connection_delay() == conn.config['reconnect_backoff_ms'] - conn.state = ConnectionStates.CONNECTED - assert conn.connection_delay() == float('inf') - - del conn._gai[:] - conn._update_reconnect_backoff() - conn.state = ConnectionStates.DISCONNECTED - assert conn.connection_delay() == 1.0 * conn.config['reconnect_backoff_ms'] - conn.state = ConnectionStates.CONNECTING - assert conn.connection_delay() == 1.0 * conn.config['reconnect_backoff_ms'] - - conn._update_reconnect_backoff() - conn.state = ConnectionStates.DISCONNECTED - assert conn.connection_delay() == 2.0 * conn.config['reconnect_backoff_ms'] - conn.state = ConnectionStates.CONNECTING - assert conn.connection_delay() == 2.0 * conn.config['reconnect_backoff_ms'] - - conn._update_reconnect_backoff() - conn.state = ConnectionStates.DISCONNECTED - assert conn.connection_delay() == 4.0 * conn.config['reconnect_backoff_ms'] - conn.state = ConnectionStates.CONNECTING - assert conn.connection_delay() == 4.0 * conn.config['reconnect_backoff_ms'] - - -def test_connected(conn): - assert conn.connected() is False - conn.state = ConnectionStates.CONNECTED - assert conn.connected() is True - - -def test_connecting(conn): - assert conn.connecting() is False - conn.state = ConnectionStates.CONNECTING - assert conn.connecting() is True - conn.state = ConnectionStates.CONNECTED - assert conn.connecting() is False - - -def test_send_disconnected(conn): - conn.state = ConnectionStates.DISCONNECTED - f = conn.send('foobar') - assert f.failed() is True - assert isinstance(f.exception, Errors.KafkaConnectionError) - - -def test_send_connecting(conn): - conn.state = ConnectionStates.CONNECTING - f = conn.send('foobar') - assert f.failed() is True - assert isinstance(f.exception, Errors.NodeNotReadyError) - - -def test_send_max_ifr(conn): - conn.state = ConnectionStates.CONNECTED - max_ifrs = conn.config['max_in_flight_requests_per_connection'] - for i in range(max_ifrs): - conn.in_flight_requests[i] = 'foo' - f = conn.send('foobar') - assert f.failed() is True - assert isinstance(f.exception, Errors.TooManyInFlightRequests) - - -def test_send_no_response(_socket, conn): - conn.connect() - assert conn.state is ConnectionStates.CONNECTED - req = ProduceRequest[0](acks=0, timeout_ms=0, topic_data=()) - req.with_header(correlation_id=0, client_id=conn.config['client_id']) - payload_bytes = len(req.encode(header=True, framed=False)) - third = payload_bytes // 3 - remainder = 
payload_bytes % 3 - _socket.send.side_effect = [4, third, third, third, remainder] - - assert len(conn.in_flight_requests) == 0 - f = conn.send(req) - assert f.succeeded() is True - assert f.value is None - assert len(conn.in_flight_requests) == 0 - - -def test_send_response(_socket, conn): - conn.connect() - assert conn.state is ConnectionStates.CONNECTED - req = MetadataRequest[0]([]) - req.with_header(correlation_id=0, client_id=conn.config['client_id']) - payload_bytes = len(req.encode(header=True, framed=False)) - third = payload_bytes // 3 - remainder = payload_bytes % 3 - _socket.send.side_effect = [4, third, third, third, remainder] - - assert len(conn.in_flight_requests) == 0 - f = conn.send(req) - assert f.is_done is False - assert len(conn.in_flight_requests) == 1 - - -def test_send_async_request_while_other_request_is_already_in_buffer(_socket, conn, metrics): - conn.connect() - assert conn.state is ConnectionStates.CONNECTED - assert 'node-0.bytes-sent' in metrics.mocked_sensors - bytes_sent_sensor = metrics.mocked_sensors['node-0.bytes-sent'] - - req1 = MetadataRequest[0](topics='foo') - req1.with_header(correlation_id=0, client_id=conn.config['client_id']) - payload_bytes1 = len(req1.encode(header=True, framed=False)) - req2 = MetadataRequest[0]([]) - req2.with_header(correlation_id=0, client_id=conn.config['client_id']) - payload_bytes2 = len(req2.encode(header=True, framed=False)) - - # The first call to the socket will raise a transient SSL exception. This will make the first - # request to be kept in the internal buffer to be sent in the next call of - # send_pending_requests_v2. - _socket.send.side_effect = [SSLWantWriteError, 4 + payload_bytes1, 4 + payload_bytes2] - - conn.send(req1, blocking=False) - # This won't send any bytes because of the SSL exception and the request bytes will be kept in - # the buffer. - assert conn.send_pending_requests_v2() is False - assert bytes_sent_sensor.record.call_args_list[0].args == (0,) - - conn.send(req2, blocking=False) - # This will send the remaining bytes in the buffer from the first request, but should notice - # that the second request was queued, therefore it should return False. 
- bytes_sent_sensor.record.reset_mock() - assert conn.send_pending_requests_v2() is False - bytes_sent_sensor.record.assert_called_once_with(4 + payload_bytes1) - - bytes_sent_sensor.record.reset_mock() - assert conn.send_pending_requests_v2() is True - bytes_sent_sensor.record.assert_called_once_with(4 + payload_bytes2) - - bytes_sent_sensor.record.reset_mock() - assert conn.send_pending_requests_v2() is True - bytes_sent_sensor.record.assert_called_once_with(0) - - -def test_send_error(_socket, conn): - conn.connect() - assert conn.state is ConnectionStates.CONNECTED - req = MetadataRequest[0]([]) - try: - _socket.send.side_effect = ConnectionError - except NameError: - _socket.send.side_effect = socket.error - f = conn.send(req) - assert f.failed() is True - assert isinstance(f.exception, Errors.KafkaConnectionError) - assert _socket.close.call_count == 1 - assert conn.state is ConnectionStates.DISCONNECTED - - -def test_can_send_more(conn): - assert conn.can_send_more() is True - max_ifrs = conn.config['max_in_flight_requests_per_connection'] - for i in range(max_ifrs): - assert conn.can_send_more() is True - conn.in_flight_requests[i] = 'foo' - assert conn.can_send_more() is False - - -def test_recv_disconnected(_socket, conn): - conn.connect() - assert conn.connected() - - req = MetadataRequest[0]([]) - req.with_header(correlation_id=0, client_id=conn.config['client_id']) - payload_bytes = len(req.encode(header=True, framed=False)) - _socket.send.side_effect = [4, payload_bytes] - conn.send(req) - - # Empty data on recv means the socket is disconnected - _socket.recv.side_effect = None - _socket.recv.return_value = b'' - - # Attempt to receive should mark connection as disconnected - assert conn.connected(), 'Not connected: %s' % conn.state - conn.recv() - assert conn.disconnected(), 'Not disconnected: %s' % conn.state - - -def test_recv(_socket, conn): - pass # TODO - - -def test_close(conn): - pass # TODO - - -def test_lookup_on_connect(): - hostname = 'example.org' - port = 9092 - conn = BrokerConnection(hostname, port, socket.AF_UNSPEC) - assert conn.host == hostname - assert conn.port == port - assert conn.afi == socket.AF_UNSPEC - afi1 = socket.AF_INET - sockaddr1 = ('127.0.0.1', 9092) - mock_return1 = [ - (afi1, socket.SOCK_STREAM, 6, '', sockaddr1), - ] - with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m: - conn.connect() - m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM) - assert conn._sock_afi == afi1 - assert conn._sock_addr == sockaddr1 - conn.close() - - afi2 = socket.AF_INET6 - sockaddr2 = ('::1', 9092, 0, 0) - mock_return2 = [ - (afi2, socket.SOCK_STREAM, 6, '', sockaddr2), - ] - - with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m: - conn.last_attempt = 0 - conn.connect() - m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM) - assert conn._sock_afi == afi2 - assert conn._sock_addr == sockaddr2 - conn.close() - - -def test_relookup_on_failure(): - hostname = 'example.org' - port = 9092 - conn = BrokerConnection(hostname, port, socket.AF_UNSPEC) - assert conn.host == hostname - mock_return1 = [] - with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m: - last_attempt = conn.last_attempt - conn.connect() - m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM) - assert conn.disconnected() - assert conn.last_attempt > last_attempt - - afi2 = socket.AF_INET - sockaddr2 = ('127.0.0.2', 9092) - mock_return2 = [ - (afi2, socket.SOCK_STREAM, 6, '', sockaddr2), - ] - - with 
mock.patch("socket.getaddrinfo", return_value=mock_return2) as m: - conn.last_attempt = 0 - conn.connect() - m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM) - assert conn._sock_afi == afi2 - assert conn._sock_addr == sockaddr2 - conn.close() - - -def test_requests_timed_out(conn): - with mock.patch("time.monotonic", return_value=0): - # No in-flight requests, not timed out - assert not conn.requests_timed_out() - - # Single request, timeout_at > now (0) - conn.in_flight_requests[0] = ('foo', 0, 1) - assert not conn.requests_timed_out() - - # Add another request w/ timestamp > request_timeout ago - request_timeout = conn.config['request_timeout_ms'] - expired_timestamp = 0 - request_timeout - 1 - conn.in_flight_requests[1] = ('bar', 0, expired_timestamp) - assert conn.requests_timed_out() - - # Drop the expired request and we should be good to go again - conn.in_flight_requests.pop(1) - assert not conn.requests_timed_out() - - -def test_maybe_throttle(conn): - assert conn.state is ConnectionStates.DISCONNECTED - assert not conn.throttled() - - conn.state = ConnectionStates.CONNECTED - assert not conn.throttled() - - # No throttle_time_ms attribute - conn._maybe_throttle(HeartbeatResponse[0](error_code=0)) - assert not conn.throttled() - - with mock.patch("time.monotonic", return_value=1000) as time: - # server-side throttling in v1.0 - conn.broker_version_data = BrokerVersionData((1, 0)) - conn._maybe_throttle(HeartbeatResponse[1](throttle_time_ms=1000, error_code=0)) - assert not conn.throttled() - - # client-side throttling in v2.0 - conn.broker_version_data = BrokerVersionData((2, 0)) - conn._maybe_throttle(HeartbeatResponse[2](throttle_time_ms=1000, error_code=0)) - assert conn.throttled() - - time.return_value = 3000 - assert not conn.throttled() - - -def test_host_in_sasl_config(): - hostname = 'example.org' - port = 9092 - for security_protocol in ('SASL_PLAINTEXT', 'SASL_SSL'): - with mock.patch("kafka.conn.get_sasl_mechanism") as get_sasl_mechanism: - BrokerConnection(hostname, port, socket.AF_UNSPEC, security_protocol=security_protocol) - call_config = get_sasl_mechanism.mock_calls[1].kwargs - assert call_config['host'] == hostname