Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/content/multiprocess/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,45 @@ from prometheus_client import Gauge
# Example gauge
IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum')
```

**5. Customizing metric values**:

It's possible to customize the behavior of metric values by providing your own implementation of the `ValueClass`. This is useful if you want to add logging, custom synchronization, or change the data storage mechanism.

The `MmapedValue` and `MutexValue` classes are available in `prometheus_client.values` for this purpose. These are top-level classes, which makes it easy to inherit from them and override their methods.

To provide a custom `ValueClass`, set the `PROMETHEUS_VALUE_CLASS` environment variable to the full Python path of your class (e.g., `myapp.custom_values.MyValueClass`).

The class should inherit from `prometheus_client.values.MutexValue` (for single-process applications) or `prometheus_client.values.MmapedValue` (for multiprocess applications) to reuse the existing logic.

#### Example: Custom Mmaped Value

If you're using multiprocess mode and want to override the default increment behavior:

```python
# myapp/custom_values.py
from prometheus_client.values import MmapedValue

class MyMmapedValue(MmapedValue):
def inc(self, amount):
print(f"Incrementing metric by {amount}")
# Always call the superclass method to ensure the value is
# correctly stored and shared state is handled.
super().inc(amount)
```

Then, set the environment variable:

```bash
export PROMETHEUS_VALUE_CLASS=myapp.custom_values.MyMmapedValue
```

#### Behavior and Requirements:
- The environment variable must be set before any metric is instantiated. Therefore, preferrably, before python process start.
- The path must be a valid Python path to a class (including the class name).
- If the class cannot be imported, an `ImportError` will be raised during initialization.
- By default, `prometheus_client` uses `MmapedValue` if `PROMETHEUS_MULTIPROC_DIR` is set, and `MutexValue` otherwise.

**6. Advanced Customization with `MultiProcessValue`**:

For specialized use cases where you need a different process identifier than `os.getpid()`, you can use the `MultiProcessValue(process_identifier)` factory function. This returns a subclass of `MmapedValue` that uses the provided function to identify the process. Note that this cannot be set via the `PROMETHEUS_VALUE_CLASS` environment variable.
180 changes: 98 additions & 82 deletions prometheus_client/values.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import importlib
import os
from threading import Lock
import warnings
Expand Down Expand Up @@ -36,6 +37,82 @@ def get_exemplar(self):
return self._exemplar


class MmapedValue:
"""A float protected by a mutex backed by a per-process mmaped file."""

_multiprocess = True
_files = {}
_values = []
_pid = {'value': os.getpid()}
_lock = Lock()
_process_identifier = staticmethod(os.getpid)

def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', **kwargs):
self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode
# This deprecation warning can go away in a few releases when removing the compatibility
if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
with self._lock:
self.__check_for_pid_change()
self.__reset()
self._values.append(self)

def __reset(self):
typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode = self._params
if typ == 'gauge':
file_prefix = typ + '_' + multiprocess_mode
else:
file_prefix = typ
if file_prefix not in self._files:
filename = os.path.join(
os.environ.get('PROMETHEUS_MULTIPROC_DIR'),
'{}_{}.db'.format(file_prefix, self._pid['value']))

self._files[file_prefix] = MmapedDict(filename)
self._file = self._files[file_prefix]
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
self._value, self._timestamp = self._file.read_value(self._key)

def __check_for_pid_change(self):
actual_pid = self._process_identifier()
if self._pid['value'] != actual_pid:
self._pid['value'] = actual_pid
# There has been a fork(), reset all the values.
for f in self._files.values():
f.close()
self._files.clear()
for value in self._values:
value.__reset()

def inc(self, amount):
with self._lock:
self.__check_for_pid_change()
self._value += amount
self._timestamp = 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set(self, value, timestamp=None):
with self._lock:
self.__check_for_pid_change()
self._value = value
self._timestamp = timestamp or 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set_exemplar(self, exemplar):
# TODO: Implement exemplars for multiprocess mode.
return

def get(self):
with self._lock:
self.__check_for_pid_change()
return self._value

def get_exemplar(self):
# TODO: Implement exemplars for multiprocess mode.
return None


def MultiProcessValue(process_identifier=os.getpid):
"""Returns a MmapedValue class based on a process_identifier function.

Expand All @@ -44,96 +121,35 @@ def MultiProcessValue(process_identifier=os.getpid):

Using a different function than the default 'os.getpid' is at your own risk.
"""
files = {}
values = []
pid = {'value': process_identifier()}
# Use a single global lock when in multi-processing mode
# as we presume this means there is no threading going on.
# This avoids the need to also have mutexes in __MmapDict.
lock = Lock()

class MmapedValue:
"""A float protected by a mutex backed by a per-process mmaped file."""

_multiprocess = True

def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', **kwargs):
self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode
# This deprecation warning can go away in a few releases when removing the compatibility
if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
with lock:
self.__check_for_pid_change()
self.__reset()
values.append(self)

def __reset(self):
typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode = self._params
if typ == 'gauge':
file_prefix = typ + '_' + multiprocess_mode
else:
file_prefix = typ
if file_prefix not in files:
filename = os.path.join(
os.environ.get('PROMETHEUS_MULTIPROC_DIR'),
'{}_{}.db'.format(file_prefix, pid['value']))

files[file_prefix] = MmapedDict(filename)
self._file = files[file_prefix]
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
self._value, self._timestamp = self._file.read_value(self._key)

def __check_for_pid_change(self):
actual_pid = process_identifier()
if pid['value'] != actual_pid:
pid['value'] = actual_pid
# There has been a fork(), reset all the values.
for f in files.values():
f.close()
files.clear()
for value in values:
value.__reset()

def inc(self, amount):
with lock:
self.__check_for_pid_change()
self._value += amount
self._timestamp = 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set(self, value, timestamp=None):
with lock:
self.__check_for_pid_change()
self._value = value
self._timestamp = timestamp or 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set_exemplar(self, exemplar):
# TODO: Implement exemplars for multiprocess mode.
return

def get(self):
with lock:
self.__check_for_pid_change()
return self._value

def get_exemplar(self):
# TODO: Implement exemplars for multiprocess mode.
return None

return MmapedValue
class _MmapedValue(MmapedValue):
_files = {}
_values = []
_pid = {'value': process_identifier()}
_lock = Lock()
_process_identifier = staticmethod(process_identifier)

return _MmapedValue


def get_value_class():
# Should we enable multi-process mode?
# This needs to be chosen before the first metric is constructed,
# and as that may be in some arbitrary library the user/admin has
# no control over we use an environment variable.
value_class_path = os.environ.get('PROMETHEUS_VALUE_CLASS')
if value_class_path:
if '.' not in value_class_path:
raise ImportError(f"PROMETHEUS_VALUE_CLASS must be a full python path (e.g. module.ClassName), got '{value_class_path}'")
try:
module_path, class_name = value_class_path.rsplit('.', 1)
module = importlib.import_module(module_path)
return getattr(module, class_name)
except (ImportError, AttributeError) as e:
raise ImportError(f"Could not import PROMETHEUS_VALUE_CLASS '{value_class_path}': {e}") from None

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise ImportError(f"Could not import PROMETHEUS_VALUE_CLASS '{value_class_path}': {e}") from None
raise ImportError(f"Could not import PROMETHEUS_VALUE_CLASS '{value_class_path}'") from e


if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
return MultiProcessValue()
else:
return MutexValue
return MmapedValue
return MutexValue


ValueClass = get_value_class()
90 changes: 90 additions & 0 deletions tests/e2e/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import sys

# Use LockedMmapedValue for this server
os.environ['PROMETHEUS_VALUE_CLASS'] = 'prometheus_client.values.LockedMmapedValue'

import http.server
import json
from urllib.parse import urlparse, parse_qs
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, generate_latest, values
from prometheus_client.multiprocess import MultiProcessAggregateCollector

# Define metrics at module level
C = Counter('c', 'test counter', ['l'])
G_SUM = Gauge('g_sum', 'test gauge sum', ['l'], multiprocess_mode='sum')
G_MAX = Gauge('g_max', 'test gauge max', ['l'], multiprocess_mode='max')
G_MIN = Gauge('g_min', 'test gauge min', ['l'], multiprocess_mode='min')
G_MOSTRECENT = Gauge('g_mostrecent', 'test gauge mostrecent', ['l'], multiprocess_mode='mostrecent')
G_ALL = Gauge('g_all', 'test gauge all', ['l'], multiprocess_mode='all')
G_LIVESUM = Gauge('g_livesum', 'test gauge livesum', ['l'], multiprocess_mode='livesum')
G_LIVEMAX = Gauge('g_livemax', 'test gauge livemax', ['l'], multiprocess_mode='livemax')
G_LIVEMIN = Gauge('g_livemin', 'test gauge livemin', ['l'], multiprocess_mode='livemin')
G_LIVEMOSTRECENT = Gauge('g_livemostrecent', 'test gauge livemostrecent', ['l'], multiprocess_mode='livemostrecent')
G_LIVEALL = Gauge('g_liveall', 'test gauge liveall', ['l'], multiprocess_mode='liveall')
H = Histogram('h', 'test histogram', ['l'], buckets=(1.0, 5.0, 10.0))

METRICS = {
'c': C,
'g_sum': G_SUM,
'g_max': G_MAX,
'g_min': G_MIN,
'g_mostrecent': G_MOSTRECENT,
'g_all': G_ALL,
'g_livesum': G_LIVESUM,
'g_livemax': G_LIVEMAX,
'g_livemin': G_LIVEMIN,
'g_livemostrecent': G_LIVEMOSTRECENT,
'g_liveall': G_LIVEALL,
'h': H,
}

class MetricHandler(http.server.BaseHTTPRequestHandler):
def send_ok(self, data=b'OK', content_type='text/plain'):
self.send_response(200)
self.send_header('Content-Type', content_type)
self.end_headers()
self.wfile.write(data)

def send_error(self, code=404):
self.send_response(code)
self.end_headers()

def do_GET(self):
parsed_url = urlparse(self.path)
query = parse_qs(parsed_url.query)
path = parsed_url.path

if path == '/metrics':
registry = CollectorRegistry()
MultiProcessAggregateCollector(registry)
self.send_ok(generate_latest(registry))
elif path in ('/inc', '/set', '/observe'):
name = query.get('name', [None])[0]
labels_json = query.get('labels', ['{}'])[0]
labels = json.loads(labels_json)
value = float(query.get('value', query.get('amount', [1]))[0])

if name not in METRICS:
self.send_error(400)
return

m = METRICS[name]
metric_with_labels = m.labels(**labels) if labels else m

if path == '/inc':
metric_with_labels.inc(value)
elif path == '/set':
metric_with_labels.set(value)
elif path == '/observe':
metric_with_labels.observe(value)

self.send_ok()
else:
self.send_error()

if __name__ == '__main__':
port = int(sys.argv[1])
server = http.server.HTTPServer(('127.0.0.1', port), MetricHandler)
print(f'Starting server on port {port}')
server.serve_forever()
Loading