diff --git a/kafka/metrics/metrics.py b/kafka/metrics/metrics.py index 0b7ea919b..7f355cc85 100644 --- a/kafka/metrics/metrics.py +++ b/kafka/metrics/metrics.py @@ -162,20 +162,20 @@ def remove_sensor(self, name): Arguments: name (str): The name of the sensor to be removed """ - sensor = self._sensors.get(name) - if sensor: - child_sensors = None - with sensor._lock: - with self._lock: + with self._lock: + sensor = self._sensors.get(name) + if sensor: + child_sensors = None + with sensor._lock: val = self._sensors.pop(name, None) if val and val == sensor: for metric in sensor.metrics: self.remove_metric(metric.metric_name) logger.debug('Removed sensor with name %s', name) child_sensors = self._children_sensors.pop(sensor, None) - if child_sensors: - for child_sensor in child_sensors: - self.remove_sensor(child_sensor.name) + if child_sensors: + for child_sensor in child_sensors: + self.remove_sensor(child_sensor.name) def add_metric(self, metric_name, measurable, config=None): """ diff --git a/kafka/metrics/metrics_reporter.py b/kafka/metrics/metrics_reporter.py index fd7de8d5e..8423553bb 100644 --- a/kafka/metrics/metrics_reporter.py +++ b/kafka/metrics/metrics_reporter.py @@ -1,7 +1,7 @@ import abc -class AbstractMetricsReporter(object, metaclass=abc.ABCMeta): +class AbstractMetricsReporter(metaclass=abc.ABCMeta): """ An abstract class to allow things to listen as new metrics are created so they can be reported. diff --git a/kafka/metrics/stat.py b/kafka/metrics/stat.py index 2588493a7..bd70eb232 100644 --- a/kafka/metrics/stat.py +++ b/kafka/metrics/stat.py @@ -1,7 +1,7 @@ import abc -class AbstractStat(object, metaclass=abc.ABCMeta): +class AbstractStat(metaclass=abc.ABCMeta): """ An AbstractStat is a quantity such as average, max, etc that is computed off the stream of updates to a sensor diff --git a/kafka/metrics/stats/avg.py b/kafka/metrics/stats/avg.py index c7e7dc485..f3f8d1ddb 100644 --- a/kafka/metrics/stats/avg.py +++ b/kafka/metrics/stats/avg.py @@ -8,7 +8,7 @@ class Avg(AbstractSampledStat): __slots__ = ('_initial_value', '_samples', '_current') def __init__(self): - super(Avg, self).__init__(0.0) + super().__init__(0.0) def update(self, sample, config, value, now): sample.value += value diff --git a/kafka/metrics/stats/count.py b/kafka/metrics/stats/count.py index 060bd2d99..6f83fcbc2 100644 --- a/kafka/metrics/stats/count.py +++ b/kafka/metrics/stats/count.py @@ -8,7 +8,7 @@ class Count(AbstractSampledStat): __slots__ = ('_initial_value', '_samples', '_current') def __init__(self): - super(Count, self).__init__(0.0) + super().__init__(0.0) def update(self, sample, config, value, now): sample.value += 1.0 diff --git a/kafka/metrics/stats/histogram.py b/kafka/metrics/stats/histogram.py index ba5789345..87848116b 100644 --- a/kafka/metrics/stats/histogram.py +++ b/kafka/metrics/stats/histogram.py @@ -29,7 +29,7 @@ def counts(self): return self._hist def clear(self): - for i in range(self._hist): + for i in range(len(self._hist)): self._hist[i] = 0.0 self._count = 0 diff --git a/kafka/metrics/stats/max_stat.py b/kafka/metrics/stats/max_stat.py index 5e0382b3b..3ab68fcd6 100644 --- a/kafka/metrics/stats/max_stat.py +++ b/kafka/metrics/stats/max_stat.py @@ -6,7 +6,7 @@ class Max(AbstractSampledStat): __slots__ = ('_initial_value', '_samples', '_current') def __init__(self): - super(Max, self).__init__(float('-inf')) + super().__init__(float('-inf')) def update(self, sample, config, value, now): sample.value = max(sample.value, value) diff --git a/kafka/metrics/stats/min_stat.py b/kafka/metrics/stats/min_stat.py index a728fa2eb..dd04c3bcd 100644 --- a/kafka/metrics/stats/min_stat.py +++ b/kafka/metrics/stats/min_stat.py @@ -8,7 +8,7 @@ class Min(AbstractSampledStat): __slots__ = ('_initial_value', '_samples', '_current') def __init__(self): - super(Min, self).__init__(float(sys.maxsize)) + super().__init__(float(sys.maxsize)) def update(self, sample, config, value, now): sample.value = min(sample.value, value) diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py index a35b4ab75..9c3185237 100644 --- a/kafka/metrics/stats/percentiles.py +++ b/kafka/metrics/stats/percentiles.py @@ -16,7 +16,7 @@ class Percentiles(AbstractSampledStat, AbstractCompoundStat): def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0, percentiles=None): - super(Percentiles, self).__init__(0.0) + super().__init__(0.0) self._percentiles = percentiles or [] self._buckets = int(size_in_bytes / 4) if bucketing == BucketSizing.CONSTANT: @@ -26,7 +26,7 @@ def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0, if min_val != 0.0: raise ValueError('Linear bucket sizing requires min_val' ' to be 0.0.') - self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val) + self._bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val) else: raise ValueError('Unknown bucket type: %s' % (bucketing,)) @@ -71,5 +71,5 @@ def update(self, sample, config, value, time_ms): class HistogramSample(AbstractSampledStat.Sample): def __init__(self, scheme, now): - super(Percentiles.HistogramSample, self).__init__(0.0, now) + super().__init__(0.0, now) self.histogram = Histogram(scheme) diff --git a/kafka/metrics/stats/rate.py b/kafka/metrics/stats/rate.py index 85550c904..23232649c 100644 --- a/kafka/metrics/stats/rate.py +++ b/kafka/metrics/stats/rate.py @@ -109,7 +109,7 @@ class SampledTotal(AbstractSampledStat): def __init__(self, initial_value=None): if initial_value is not None: raise ValueError('initial_value cannot be set on SampledTotal') - super(SampledTotal, self).__init__(0.0) + super().__init__(0.0) def update(self, sample, config, value, time_ms): sample.value += value diff --git a/kafka/metrics/stats/total.py b/kafka/metrics/stats/total.py index d43ceee18..eefe4aa7c 100644 --- a/kafka/metrics/stats/total.py +++ b/kafka/metrics/stats/total.py @@ -3,7 +3,7 @@ class Total(AbstractMeasurableStat): """An un-windowed cumulative total maintained over all time.""" - __slots__ = ('_total') + __slots__ = ('_total',) def __init__(self, value=0.0): self._total = value diff --git a/kafka/net/connection.py b/kafka/net/connection.py index 75cecf576..e21fdcc6f 100644 --- a/kafka/net/connection.py +++ b/kafka/net/connection.py @@ -6,6 +6,7 @@ import kafka.errors as Errors from kafka.future import Future +from kafka.net.metrics import KafkaConnectionMetrics from kafka.protocol.metadata import ApiVersionsRequest from kafka.protocol.sasl import SaslAuthenticateRequest, SaslHandshakeRequest, SaslBytesRequest from kafka.protocol.broker_version_data import BrokerVersionData @@ -33,6 +34,8 @@ class KafkaConnection: 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, 'api_version_auto_timeout_ms': 2000, + 'metrics': None, + 'metric_group_prefix': '', } def __init__(self, net, node_id=None, broker_version_data=None, **configs): @@ -55,6 +58,11 @@ def __init__(self, net, node_id=None, broker_version_data=None, **configs): self.broker_version_data = broker_version_data self._api_versions_idx = ApiVersionsRequest.max_version # version of ApiVersionsRequest to try on first connect self._throttle_time = 0 + if self.config['metrics']: + self._sensors = KafkaConnectionMetrics( + self.config['metrics'], self.config['metric_group_prefix'], node_id) + else: + self._sensors = None self._init_future.add_errback(self.fail_in_flight_requests) self._close_future.add_both(self.fail_in_flight_requests) @@ -172,8 +180,8 @@ def data_received(self, data): return self.close(Errors.KafkaConnectionError('Received unrecognized correlation id')) latency_ms = (time.monotonic() - sent_time) * 1000 - # if self._sensors: - # self._sensors.request_time.record(latency_ms) + if self._sensors: + self._sensors.request_time.record(latency_ms) log.debug('%s: Response %d (%s ms): %s', self, resp_correlation_id, latency_ms, response) self._maybe_throttle(response) @@ -292,8 +300,8 @@ def close(self, error=None): def _maybe_throttle(self, response): throttle_time_ms = getattr(response, 'throttle_time_ms', 0) - #if self._sensors: - # self._sensors.throttle_time.record(throttle_time_ms) + if self._sensors: + self._sensors.throttle_time.record(throttle_time_ms) if not throttle_time_ms: return # Client side throttling enabled in v2.0 brokers diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 9ca088a89..6d6c3e5e2 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -7,6 +7,7 @@ from .inet import create_connection from .connection import KafkaConnection +from .metrics import KafkaManagerMetrics from .transport import KafkaSSLTransport, KafkaTCPTransport import kafka.errors as Errors from kafka.protocol.broker_version_data import BrokerVersionData @@ -42,6 +43,8 @@ class KafkaConnectionManager: 'socks5_proxy': None, 'api_version': None, 'api_version_auto_timeout_ms': 2000, + 'metrics': None, + 'metric_group_prefix': '', } def __init__(self, net, cluster, **configs): self.config = copy.copy(self.DEFAULT_CONFIG) @@ -58,6 +61,11 @@ def __init__(self, net, cluster, **configs): self.broker_version_data = None self._bootstrap_future = None self._metadata_future = None + if self.config['metrics']: + self._sensors = KafkaManagerMetrics( + self.config['metrics'], self.config['metric_group_prefix'], self._conns) + else: + self._sensors = None if self.config['api_version'] is not None: self.broker_version_data = BrokerVersionData(self.config['api_version']) @@ -190,6 +198,8 @@ async def _connect(self, node, conn): self.update_backoff(node.node_id) return + if self._sensors: + self._sensors.connection_created.record() self.reset_backoff(node.node_id) if conn.broker_version_data is not None: if self.cluster.is_bootstrap(node.node_id): @@ -208,6 +218,8 @@ def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=Tru conn = KafkaConnection(self._net, node_id=node_id, broker_version_data=self.broker_version_data, **self.config) if pop_on_close: conn.close_future.add_both(lambda _: self._conns.pop(node.node_id, None)) + if self._sensors: + conn.close_future.add_both(lambda _: self._sensors.connection_closed.record()) if refresh_metadata_on_err: conn.close_future.add_errback(lambda _: self.cluster.request_update()) self._conns[node_id] = conn diff --git a/kafka/net/metrics.py b/kafka/net/metrics.py new file mode 100644 index 000000000..727182384 --- /dev/null +++ b/kafka/net/metrics.py @@ -0,0 +1,152 @@ +"""Metrics for kafka.net connection manager and connections. + +Mirrors the metrics from kafka/client_async.py (KafkaClientMetrics) +and kafka/conn.py (BrokerConnectionMetrics). +""" +import logging + +from kafka.metrics.measurable import AnonMeasurable +from kafka.metrics.stats import Avg, Count, Max, Rate +from kafka.metrics.stats.rate import TimeUnit + +log = logging.getLogger(__name__) + + +class KafkaManagerMetrics: + """Metrics for KafkaConnectionManager (equivalent to KafkaClientMetrics). + Note that kafka.net does not track select_time or io_time. + """ + def __init__(self, metrics, metric_group_prefix, conns): + self.metrics = metrics + metric_group_name = metric_group_prefix + '-metrics' + + self.connection_closed = metrics.sensor('connections-closed') + self.connection_closed.add(metrics.metric_name( + 'connection-close-rate', metric_group_name, + 'Connections closed per second in the window.'), Rate()) + + self.connection_created = metrics.sensor('connections-created') + self.connection_created.add(metrics.metric_name( + 'connection-creation-rate', metric_group_name, + 'New connections established per second in the window.'), Rate()) + + metrics.add_metric(metrics.metric_name( + 'connection-count', metric_group_name, + 'The current number of active connections.'), AnonMeasurable( + lambda config, now: len(conns))) + + +class KafkaConnectionMetrics: + """Metrics for a single KafkaConnection (equivalent to BrokerConnectionMetrics).""" + def __init__(self, metrics, metric_group_prefix, node_id): + self.metrics = metrics + + # Global aggregate sensors (created once, shared across all connections) + if not metrics.get_sensor('bytes-sent-received'): + metric_group_name = metric_group_prefix + '-metrics' + + bytes_transferred = metrics.sensor('bytes-sent-received') + bytes_transferred.add(metrics.metric_name( + 'network-io-rate', metric_group_name, + 'The average number of network operations (reads or writes) on all' + ' connections per second.'), Rate(sampled_stat=Count())) + + bytes_sent = metrics.sensor('bytes-sent', + parents=[bytes_transferred]) + bytes_sent.add(metrics.metric_name( + 'outgoing-byte-rate', metric_group_name, + 'The average number of outgoing bytes sent per second to all' + ' servers.'), Rate()) + bytes_sent.add(metrics.metric_name( + 'request-rate', metric_group_name, + 'The average number of requests sent per second.'), + Rate(sampled_stat=Count())) + bytes_sent.add(metrics.metric_name( + 'request-size-avg', metric_group_name, + 'The average size of all requests in the window.'), Avg()) + bytes_sent.add(metrics.metric_name( + 'request-size-max', metric_group_name, + 'The maximum size of any request sent in the window.'), Max()) + + bytes_received = metrics.sensor('bytes-received', + parents=[bytes_transferred]) + bytes_received.add(metrics.metric_name( + 'incoming-byte-rate', metric_group_name, + 'Bytes/second read off all sockets'), Rate()) + bytes_received.add(metrics.metric_name( + 'response-rate', metric_group_name, + 'Responses received sent per second.'), + Rate(sampled_stat=Count())) + + request_latency = metrics.sensor('request-latency') + request_latency.add(metrics.metric_name( + 'request-latency-avg', metric_group_name, + 'The average request latency in ms.'), Avg()) + request_latency.add(metrics.metric_name( + 'request-latency-max', metric_group_name, + 'The maximum request latency in ms.'), Max()) + + throttle_time = metrics.sensor('throttle-time') + throttle_time.add(metrics.metric_name( + 'throttle-time-avg', metric_group_name, + 'The average throttle time in ms.'), Avg()) + throttle_time.add(metrics.metric_name( + 'throttle-time-max', metric_group_name, + 'The maximum throttle time in ms.'), Max()) + + # Per-node sensors (created per connection, parent to global sensors) + if not metrics.get_sensor(f'node-{node_id}.bytes-sent'): + metric_group_name = f'{metric_group_prefix}-node-metrics.node-{node_id}' + + bytes_sent = metrics.sensor( + f'node-{node_id}.bytes-sent', + parents=[metrics.get_sensor('bytes-sent')]) + bytes_sent.add(metrics.metric_name( + 'outgoing-byte-rate', metric_group_name, + 'The average number of outgoing bytes sent per second.'), Rate()) + bytes_sent.add(metrics.metric_name( + 'request-rate', metric_group_name, + 'The average number of requests sent per second.'), + Rate(sampled_stat=Count())) + bytes_sent.add(metrics.metric_name( + 'request-size-avg', metric_group_name, + 'The average size of all requests in the window.'), Avg()) + bytes_sent.add(metrics.metric_name( + 'request-size-max', metric_group_name, + 'The maximum size of any request sent in the window.'), Max()) + + bytes_received = metrics.sensor( + f'node-{node_id}.bytes-received', + parents=[metrics.get_sensor('bytes-received')]) + bytes_received.add(metrics.metric_name( + 'incoming-byte-rate', metric_group_name, + 'Bytes/second read off node-connection socket'), Rate()) + bytes_received.add(metrics.metric_name( + 'response-rate', metric_group_name, + 'The average number of responses received per second.'), + Rate(sampled_stat=Count())) + + request_time = metrics.sensor( + f'node-{node_id}.latency', + parents=[metrics.get_sensor('request-latency')]) + request_time.add(metrics.metric_name( + 'request-latency-avg', metric_group_name, + 'The average request latency in ms.'), Avg()) + request_time.add(metrics.metric_name( + 'request-latency-max', metric_group_name, + 'The maximum request latency in ms.'), Max()) + + throttle_time = metrics.sensor( + f'node-{node_id}.throttle', + parents=[metrics.get_sensor('throttle-time')]) + throttle_time.add(metrics.metric_name( + 'throttle-time-avg', metric_group_name, + 'The average throttle time in ms.'), Avg()) + throttle_time.add(metrics.metric_name( + 'throttle-time-max', metric_group_name, + 'The maximum throttle time in ms.'), Max()) + + self.bytes_sent = metrics.sensor(f'node-{node_id}.bytes-sent') + self.bytes_received = metrics.sensor(f'node-{node_id}.bytes-received') + self.request_time = metrics.sensor(f'node-{node_id}.latency') + self.throttle_time = metrics.sensor(f'node-{node_id}.throttle') diff --git a/kafka/net/transport.py b/kafka/net/transport.py index 83d2e6b83..9a4075014 100644 --- a/kafka/net/transport.py +++ b/kafka/net/transport.py @@ -88,8 +88,8 @@ async def _read_from_sock(self): recvd_data, err = self._sock_recv() log.debug('%s: received %d bytes', self, len(recvd_data)) self.last_read = time.monotonic() - # if self._sensors: - # self._sensors.bytes_received.record(len(recvd_data)) + if self._protocol and self._protocol._sensors: + self._protocol._sensors.bytes_received.record(len(recvd_data)) try: self._protocol.data_received(recvd_data) @@ -188,6 +188,8 @@ async def _write_to_sock(self): total_bytes, err = self._sock_send() log.debug('%s: sent %d bytes', self, total_bytes) self.last_write = time.monotonic() + if self._protocol and self._protocol._sensors: + self._protocol._sensors.bytes_sent.record(total_bytes) if self._closed: self._close() elif not self._write: diff --git a/test/test_metrics.py b/test/test_metrics.py index 04aac3956..ba1b96f19 100644 --- a/test/test_metrics.py +++ b/test/test_metrics.py @@ -342,7 +342,7 @@ def test_duplicate_MetricName(metrics): metrics.sensor('test2').add(metrics.metric_name('test', 'grp1'), Total()) -def test_Quotas(metrics): +def test_quotas(metrics): sensor = metrics.sensor('test') sensor.add(metrics.metric_name('test1.total', 'grp1'), Total(), MetricConfig(quota=Quota.upper_bound(5.0))) @@ -360,7 +360,7 @@ def test_Quotas(metrics): sensor.record(-1.0) -def test_Quotas_equality(): +def test_quotas_equality(): quota1 = Quota.upper_bound(10.5) quota2 = Quota.lower_bound(10.5) assert quota1 != quota2, 'Quota with different upper values should not be equal' @@ -369,7 +369,7 @@ def test_Quotas_equality(): assert quota2 == quota3, 'Quota with same upper and bound values should be equal' -def test_Percentiles(metrics): +def test_percentiles(metrics): buckets = 100 _percentiles = [ Percentile(metrics.metric_name('test.p25', 'grp1'), 25),