Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions kafka/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,20 @@ def remove_sensor(self, name):
Arguments:
name (str): The name of the sensor to be removed
"""
sensor = self._sensors.get(name)
if sensor:
child_sensors = None
with sensor._lock:
with self._lock:
with self._lock:
sensor = self._sensors.get(name)
if sensor:
child_sensors = None
with sensor._lock:
val = self._sensors.pop(name, None)
if val and val == sensor:
for metric in sensor.metrics:
self.remove_metric(metric.metric_name)
logger.debug('Removed sensor with name %s', name)
child_sensors = self._children_sensors.pop(sensor, None)
if child_sensors:
for child_sensor in child_sensors:
self.remove_sensor(child_sensor.name)
if child_sensors:
for child_sensor in child_sensors:
self.remove_sensor(child_sensor.name)

def add_metric(self, metric_name, measurable, config=None):
"""
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/metrics_reporter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc


class AbstractMetricsReporter(object, metaclass=abc.ABCMeta):
class AbstractMetricsReporter(metaclass=abc.ABCMeta):
"""
An abstract class to allow things to listen as new metrics
are created so they can be reported.
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/stat.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc


class AbstractStat(object, metaclass=abc.ABCMeta):
class AbstractStat(metaclass=abc.ABCMeta):
"""
An AbstractStat is a quantity such as average, max, etc that is computed
off the stream of updates to a sensor
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/stats/avg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class Avg(AbstractSampledStat):
__slots__ = ('_initial_value', '_samples', '_current')

def __init__(self):
super(Avg, self).__init__(0.0)
super().__init__(0.0)

def update(self, sample, config, value, now):
sample.value += value
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/stats/count.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class Count(AbstractSampledStat):
__slots__ = ('_initial_value', '_samples', '_current')

def __init__(self):
super(Count, self).__init__(0.0)
super().__init__(0.0)

def update(self, sample, config, value, now):
sample.value += 1.0
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/stats/histogram.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def counts(self):
return self._hist

def clear(self):
for i in range(self._hist):
for i in range(len(self._hist)):
self._hist[i] = 0.0
self._count = 0

Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/stats/max_stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Max(AbstractSampledStat):
__slots__ = ('_initial_value', '_samples', '_current')

def __init__(self):
super(Max, self).__init__(float('-inf'))
super().__init__(float('-inf'))

def update(self, sample, config, value, now):
sample.value = max(sample.value, value)
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/stats/min_stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class Min(AbstractSampledStat):
__slots__ = ('_initial_value', '_samples', '_current')

def __init__(self):
super(Min, self).__init__(float(sys.maxsize))
super().__init__(float(sys.maxsize))

def update(self, sample, config, value, now):
sample.value = min(sample.value, value)
Expand Down
6 changes: 3 additions & 3 deletions kafka/metrics/stats/percentiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Percentiles(AbstractSampledStat, AbstractCompoundStat):

def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
percentiles=None):
super(Percentiles, self).__init__(0.0)
super().__init__(0.0)
self._percentiles = percentiles or []
self._buckets = int(size_in_bytes / 4)
if bucketing == BucketSizing.CONSTANT:
Expand All @@ -26,7 +26,7 @@ def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
if min_val != 0.0:
raise ValueError('Linear bucket sizing requires min_val'
' to be 0.0.')
self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
self._bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
else:
raise ValueError('Unknown bucket type: %s' % (bucketing,))

Expand Down Expand Up @@ -71,5 +71,5 @@ def update(self, sample, config, value, time_ms):

class HistogramSample(AbstractSampledStat.Sample):
def __init__(self, scheme, now):
super(Percentiles.HistogramSample, self).__init__(0.0, now)
super().__init__(0.0, now)
self.histogram = Histogram(scheme)
2 changes: 1 addition & 1 deletion kafka/metrics/stats/rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class SampledTotal(AbstractSampledStat):
def __init__(self, initial_value=None):
if initial_value is not None:
raise ValueError('initial_value cannot be set on SampledTotal')
super(SampledTotal, self).__init__(0.0)
super().__init__(0.0)

def update(self, sample, config, value, time_ms):
sample.value += value
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/stats/total.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class Total(AbstractMeasurableStat):
"""An un-windowed cumulative total maintained over all time."""
__slots__ = ('_total')
__slots__ = ('_total',)

def __init__(self, value=0.0):
self._total = value
Expand Down
16 changes: 12 additions & 4 deletions kafka/net/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import kafka.errors as Errors
from kafka.future import Future
from kafka.net.metrics import KafkaConnectionMetrics
from kafka.protocol.metadata import ApiVersionsRequest
from kafka.protocol.sasl import SaslAuthenticateRequest, SaslHandshakeRequest, SaslBytesRequest
from kafka.protocol.broker_version_data import BrokerVersionData
Expand Down Expand Up @@ -33,6 +34,8 @@ class KafkaConnection:
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'api_version_auto_timeout_ms': 2000,
'metrics': None,
'metric_group_prefix': '',
}

def __init__(self, net, node_id=None, broker_version_data=None, **configs):
Expand All @@ -55,6 +58,11 @@ def __init__(self, net, node_id=None, broker_version_data=None, **configs):
self.broker_version_data = broker_version_data
self._api_versions_idx = ApiVersionsRequest.max_version # version of ApiVersionsRequest to try on first connect
self._throttle_time = 0
if self.config['metrics']:
self._sensors = KafkaConnectionMetrics(
self.config['metrics'], self.config['metric_group_prefix'], node_id)
else:
self._sensors = None
self._init_future.add_errback(self.fail_in_flight_requests)
self._close_future.add_both(self.fail_in_flight_requests)

Expand Down Expand Up @@ -172,8 +180,8 @@ def data_received(self, data):
return self.close(Errors.KafkaConnectionError('Received unrecognized correlation id'))

latency_ms = (time.monotonic() - sent_time) * 1000
# if self._sensors:
# self._sensors.request_time.record(latency_ms)
if self._sensors:
self._sensors.request_time.record(latency_ms)

log.debug('%s: Response %d (%s ms): %s', self, resp_correlation_id, latency_ms, response)
self._maybe_throttle(response)
Expand Down Expand Up @@ -292,8 +300,8 @@ def close(self, error=None):

def _maybe_throttle(self, response):
throttle_time_ms = getattr(response, 'throttle_time_ms', 0)
#if self._sensors:
# self._sensors.throttle_time.record(throttle_time_ms)
if self._sensors:
self._sensors.throttle_time.record(throttle_time_ms)
if not throttle_time_ms:
return
# Client side throttling enabled in v2.0 brokers
Expand Down
12 changes: 12 additions & 0 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from .inet import create_connection
from .connection import KafkaConnection
from .metrics import KafkaManagerMetrics
from .transport import KafkaSSLTransport, KafkaTCPTransport
import kafka.errors as Errors
from kafka.protocol.broker_version_data import BrokerVersionData
Expand Down Expand Up @@ -42,6 +43,8 @@ class KafkaConnectionManager:
'socks5_proxy': None,
'api_version': None,
'api_version_auto_timeout_ms': 2000,
'metrics': None,
'metric_group_prefix': '',
}
def __init__(self, net, cluster, **configs):
self.config = copy.copy(self.DEFAULT_CONFIG)
Expand All @@ -58,6 +61,11 @@ def __init__(self, net, cluster, **configs):
self.broker_version_data = None
self._bootstrap_future = None
self._metadata_future = None
if self.config['metrics']:
self._sensors = KafkaManagerMetrics(
self.config['metrics'], self.config['metric_group_prefix'], self._conns)
else:
self._sensors = None
if self.config['api_version'] is not None:
self.broker_version_data = BrokerVersionData(self.config['api_version'])

Expand Down Expand Up @@ -190,6 +198,8 @@ async def _connect(self, node, conn):
self.update_backoff(node.node_id)
return

if self._sensors:
self._sensors.connection_created.record()
self.reset_backoff(node.node_id)
if conn.broker_version_data is not None:
if self.cluster.is_bootstrap(node.node_id):
Expand All @@ -208,6 +218,8 @@ def get_connection(self, node_id, pop_on_close=True, refresh_metadata_on_err=Tru
conn = KafkaConnection(self._net, node_id=node_id, broker_version_data=self.broker_version_data, **self.config)
if pop_on_close:
conn.close_future.add_both(lambda _: self._conns.pop(node.node_id, None))
if self._sensors:
conn.close_future.add_both(lambda _: self._sensors.connection_closed.record())
if refresh_metadata_on_err:
conn.close_future.add_errback(lambda _: self.cluster.request_update())
self._conns[node_id] = conn
Expand Down
152 changes: 152 additions & 0 deletions kafka/net/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""Metrics for kafka.net connection manager and connections.

Mirrors the metrics from kafka/client_async.py (KafkaClientMetrics)
and kafka/conn.py (BrokerConnectionMetrics).
"""
import logging

from kafka.metrics.measurable import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.metrics.stats.rate import TimeUnit

log = logging.getLogger(__name__)


class KafkaManagerMetrics:
    """Metrics for KafkaConnectionManager (equivalent to KafkaClientMetrics).

    Registers two rate sensors (connections closed / created) plus a
    connection-count gauge backed by the live ``conns`` mapping.
    Note that kafka.net does not track select_time or io_time.
    """
    def __init__(self, metrics, metric_group_prefix, conns):
        self.metrics = metrics
        group = metric_group_prefix + '-metrics'

        def _rate_sensor(sensor_name, metric, description):
            # Register a sensor carrying a single windowed Rate metric.
            sensor = metrics.sensor(sensor_name)
            sensor.add(metrics.metric_name(metric, group, description), Rate())
            return sensor

        self.connection_closed = _rate_sensor(
            'connections-closed', 'connection-close-rate',
            'Connections closed per second in the window.')

        self.connection_created = _rate_sensor(
            'connections-created', 'connection-creation-rate',
            'New connections established per second in the window.')

        # Gauge: evaluated lazily against the live connection dict, so it
        # always reflects the current number of open connections.
        metrics.add_metric(
            metrics.metric_name('connection-count', group,
                                'The current number of active connections.'),
            AnonMeasurable(lambda config, now: len(conns)))


class KafkaConnectionMetrics:
    """Metrics for a single KafkaConnection (equivalent to BrokerConnectionMetrics).

    Registers two tiers of sensors on the shared ``metrics`` registry:

    * Global aggregate sensors ('bytes-sent-received', 'bytes-sent',
      'bytes-received', 'request-latency', 'throttle-time'), shared by every
      connection and created only once (guarded by ``metrics.get_sensor``).
    * Per-node sensors ('node-<id>.*') that declare the globals as parents,
      so recording on a per-node sensor also updates the aggregates.

    The per-node sensors are exposed as ``self.bytes_sent``,
    ``self.bytes_received``, ``self.request_time`` and ``self.throttle_time``.
    """
    def __init__(self, metrics, metric_group_prefix, node_id):
        # Keep a handle on the shared registry for later use by callers.
        self.metrics = metrics

        # Global aggregate sensors (created once, shared across all connections)
        if not metrics.get_sensor('bytes-sent-received'):
            metric_group_name = metric_group_prefix + '-metrics'

            # Parent of both directions: one Count-based Rate covering every
            # network operation, read or write.
            bytes_transferred = metrics.sensor('bytes-sent-received')
            bytes_transferred.add(metrics.metric_name(
                'network-io-rate', metric_group_name,
                'The average number of network operations (reads or writes) on all'
                ' connections per second.'), Rate(sampled_stat=Count()))

            # Outgoing traffic: byte rate, request rate and request sizes.
            # Recording here also feeds bytes_transferred via the parent link.
            bytes_sent = metrics.sensor('bytes-sent',
                                        parents=[bytes_transferred])
            bytes_sent.add(metrics.metric_name(
                'outgoing-byte-rate', metric_group_name,
                'The average number of outgoing bytes sent per second to all'
                ' servers.'), Rate())
            bytes_sent.add(metrics.metric_name(
                'request-rate', metric_group_name,
                'The average number of requests sent per second.'),
                Rate(sampled_stat=Count()))
            bytes_sent.add(metrics.metric_name(
                'request-size-avg', metric_group_name,
                'The average size of all requests in the window.'), Avg())
            bytes_sent.add(metrics.metric_name(
                'request-size-max', metric_group_name,
                'The maximum size of any request sent in the window.'), Max())

            # Incoming traffic: byte rate and response rate.
            bytes_received = metrics.sensor('bytes-received',
                                            parents=[bytes_transferred])
            bytes_received.add(metrics.metric_name(
                'incoming-byte-rate', metric_group_name,
                'Bytes/second read off all sockets'), Rate())
            # NOTE(review): 'received sent' wording mirrors the description in
            # kafka/conn.py (BrokerConnectionMetrics); fix both if changed.
            bytes_received.add(metrics.metric_name(
                'response-rate', metric_group_name,
                'Responses received sent per second.'),
                Rate(sampled_stat=Count()))

            # Request round-trip latency, aggregated over all connections.
            request_latency = metrics.sensor('request-latency')
            request_latency.add(metrics.metric_name(
                'request-latency-avg', metric_group_name,
                'The average request latency in ms.'), Avg())
            request_latency.add(metrics.metric_name(
                'request-latency-max', metric_group_name,
                'The maximum request latency in ms.'), Max())

            # Broker-reported throttle time, aggregated over all connections.
            throttle_time = metrics.sensor('throttle-time')
            throttle_time.add(metrics.metric_name(
                'throttle-time-avg', metric_group_name,
                'The average throttle time in ms.'), Avg())
            throttle_time.add(metrics.metric_name(
                'throttle-time-max', metric_group_name,
                'The maximum throttle time in ms.'), Max())

        # Per-node sensors (created per connection, parent to global sensors)
        if not metrics.get_sensor(f'node-{node_id}.bytes-sent'):
            # Distinct metric group per node so metric names don't collide
            # with the aggregate group above.
            metric_group_name = f'{metric_group_prefix}-node-metrics.node-{node_id}'

            # Same metric set as the global 'bytes-sent' sensor, scoped to
            # this node; parented so one record() updates both tiers.
            bytes_sent = metrics.sensor(
                f'node-{node_id}.bytes-sent',
                parents=[metrics.get_sensor('bytes-sent')])
            bytes_sent.add(metrics.metric_name(
                'outgoing-byte-rate', metric_group_name,
                'The average number of outgoing bytes sent per second.'), Rate())
            bytes_sent.add(metrics.metric_name(
                'request-rate', metric_group_name,
                'The average number of requests sent per second.'),
                Rate(sampled_stat=Count()))
            bytes_sent.add(metrics.metric_name(
                'request-size-avg', metric_group_name,
                'The average size of all requests in the window.'), Avg())
            bytes_sent.add(metrics.metric_name(
                'request-size-max', metric_group_name,
                'The maximum size of any request sent in the window.'), Max())

            bytes_received = metrics.sensor(
                f'node-{node_id}.bytes-received',
                parents=[metrics.get_sensor('bytes-received')])
            bytes_received.add(metrics.metric_name(
                'incoming-byte-rate', metric_group_name,
                'Bytes/second read off node-connection socket'), Rate())
            bytes_received.add(metrics.metric_name(
                'response-rate', metric_group_name,
                'The average number of responses received per second.'),
                Rate(sampled_stat=Count()))

            request_time = metrics.sensor(
                f'node-{node_id}.latency',
                parents=[metrics.get_sensor('request-latency')])
            request_time.add(metrics.metric_name(
                'request-latency-avg', metric_group_name,
                'The average request latency in ms.'), Avg())
            request_time.add(metrics.metric_name(
                'request-latency-max', metric_group_name,
                'The maximum request latency in ms.'), Max())

            throttle_time = metrics.sensor(
                f'node-{node_id}.throttle',
                parents=[metrics.get_sensor('throttle-time')])
            throttle_time.add(metrics.metric_name(
                'throttle-time-avg', metric_group_name,
                'The average throttle time in ms.'), Avg())
            throttle_time.add(metrics.metric_name(
                'throttle-time-max', metric_group_name,
                'The maximum throttle time in ms.'), Max())

        # Look up (or, if another connection to this node registered them,
        # reuse) the per-node sensors; presumably metrics.sensor() returns the
        # existing sensor for a known name — matches kafka.metrics.Metrics.
        self.bytes_sent = metrics.sensor(f'node-{node_id}.bytes-sent')
        self.bytes_received = metrics.sensor(f'node-{node_id}.bytes-received')
        self.request_time = metrics.sensor(f'node-{node_id}.latency')
        self.throttle_time = metrics.sensor(f'node-{node_id}.throttle')
Loading
Loading