Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mostrecent aggregation to Gauge #967

Merged
merged 1 commit into from Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -711,9 +711,10 @@ Gauges have several modes they can run in, which can be selected with the `multi
- 'min': Return a single timeseries that is the minimum of the values of all processes (alive or dead).
- 'max': Return a single timeseries that is the maximum of the values of all processes (alive or dead).
- 'sum': Return a single timeseries that is the sum of the values of all processes (alive or dead).
- 'mostrecent': Return a single timeseries that is the most recent value among all processes (alive or dead).

Prepend 'live' to the beginning of the mode to return the same result but only considering living processes
(e.g., 'liveall', 'livesum', 'livemax', 'livemin', 'livemostrecent').

```python
from prometheus_client import Gauge
Expand Down
15 changes: 12 additions & 3 deletions prometheus_client/metrics.py
Expand Up @@ -346,7 +346,8 @@ def f():
d.set_function(lambda: len(my_dict))
"""
_type = 'gauge'
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'))
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'))
_MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent'))

def __init__(self,
name: str,
Expand All @@ -357,7 +358,7 @@ def __init__(self,
unit: str = '',
registry: Optional[CollectorRegistry] = REGISTRY,
_labelvalues: Optional[Sequence[str]] = None,
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'] = 'all',
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
):
self._multiprocess_mode = multiprocess_mode
if multiprocess_mode not in self._MULTIPROC_MODES:
Expand All @@ -373,6 +374,7 @@ def __init__(self,
_labelvalues=_labelvalues,
)
self._kwargs['multiprocess_mode'] = self._multiprocess_mode
self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES

def _metric_init(self) -> None:
self._value = values.ValueClass(
Expand All @@ -382,18 +384,25 @@ def _metric_init(self) -> None:

def inc(self, amount: float = 1) -> None:
    """Increment gauge by the given amount.

    Raises:
        RuntimeError: if this gauge uses a 'mostrecent'/'livemostrecent'
            multiprocess mode, where only absolute ``set()`` writes carry
            a meaningful timestamp.
    """
    # Relative updates have no well-defined "most recent value" across
    # processes, so they are rejected up front in mostrecent modes.
    if self._is_most_recent:
        raise RuntimeError("inc must not be used with the mostrecent mode")
    self._raise_if_not_observable()
    self._value.inc(amount)

def dec(self, amount: float = 1) -> None:
    """Decrement gauge by the given amount.

    Raises:
        RuntimeError: if this gauge uses a 'mostrecent'/'livemostrecent'
            multiprocess mode, where only absolute ``set()`` writes carry
            a meaningful timestamp.
    """
    # Relative updates have no well-defined "most recent value" across
    # processes, so they are rejected up front in mostrecent modes.
    if self._is_most_recent:
        raise RuntimeError("dec must not be used with the mostrecent mode")
    self._raise_if_not_observable()
    # A decrement is an increment by the negated amount.
    self._value.inc(-amount)

def set(self, value: float) -> None:
    """Set gauge to the given value.

    In a 'mostrecent'/'livemostrecent' multiprocess mode the value is
    written together with the current wall-clock time, so the collector
    side can pick the most recently written sample across processes.
    """
    self._raise_if_not_observable()
    if self._is_most_recent:
        # Stamp the write so multiprocess aggregation can order samples.
        self._value.set(float(value), timestamp=time.time())
    else:
        self._value.set(float(value))

def set_to_current_time(self) -> None:
"""Set gauge to the current unixtime."""
Expand Down
40 changes: 20 additions & 20 deletions prometheus_client/mmap_dict.py
Expand Up @@ -6,25 +6,26 @@

_INITIAL_MMAP_SIZE = 1 << 16
_pack_integer_func = struct.Struct(b'i').pack
_pack_double_func = struct.Struct(b'd').pack
_pack_two_doubles_func = struct.Struct(b'dd').pack
_unpack_integer = struct.Struct(b'i').unpack_from
_unpack_double = struct.Struct(b'd').unpack_from
_unpack_two_doubles = struct.Struct(b'dd').unpack_from


# struct.pack_into has atomicity issues because it will temporarily write 0 into
# the mmap, resulting in false reads to 0 when experiencing a lot of writes.
# Using direct assignment solves this issue.

def _pack_two_doubles(data, pos, value, timestamp):
    """Write (value, timestamp) as two consecutive 8-byte doubles at *pos*.

    Uses direct slice assignment rather than struct.pack_into: pack_into
    transiently writes zeroes into the mmap, which concurrent readers could
    observe as a false 0 value (see module comment above).
    """
    data[pos:pos + 16] = _pack_two_doubles_func(value, timestamp)


def _pack_integer(data, pos, value):
    """Write *value* as a 4-byte int at *pos* via atomic slice assignment."""
    # Direct assignment avoids struct.pack_into's transient 0 write,
    # which readers could otherwise observe (see module comment above).
    data[pos:pos + 4] = _pack_integer_func(value)


def _read_all_values(data, used=0):
"""Yield (key, value, pos). No locking is performed."""
"""Yield (key, value, timestamp, pos). No locking is performed."""

if used <= 0:
# If not valid `used` value is passed in, read it from the file.
Expand All @@ -41,9 +42,9 @@ def _read_all_values(data, used=0):
encoded_key = data[pos:pos + encoded_len]
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
pos += padded_len
value = _unpack_double(data, pos)[0]
yield encoded_key.decode('utf-8'), value, pos
pos += 8
value, timestamp = _unpack_two_doubles(data, pos)
yield encoded_key.decode('utf-8'), value, timestamp, pos
pos += 16


class MmapedDict:
Expand All @@ -53,7 +54,8 @@ class MmapedDict:
Then 4 bytes of padding.
There's then a number of entries, consisting of a 4 byte int which is the
size of the next field, a utf-8 encoded string key, padding to a 8 byte
alignment, and then a 8 byte float which is the value.
alignment, then an 8-byte float which is the value, and finally an 8-byte
float which is a UNIX timestamp in seconds.

Not thread safe.
"""
Expand All @@ -76,7 +78,7 @@ def __init__(self, filename, read_mode=False):
_pack_integer(self._m, 0, self._used)
else:
if not read_mode:
for key, _, pos in self._read_all_values():
for key, _, _, pos in self._read_all_values():
self._positions[key] = pos

@staticmethod
Expand All @@ -95,7 +97,7 @@ def _init_value(self, key):
encoded = key.encode('utf-8')
# Pad to be 8-byte aligned.
padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
value = struct.pack(f'i{len(padded)}sd'.encode(), len(encoded), padded, 0.0)
value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0)
while self._used + len(value) > self._capacity:
self._capacity *= 2
self._f.truncate(self._capacity)
Expand All @@ -105,30 +107,28 @@ def _init_value(self, key):
# Update how much space we've used.
self._used += len(value)
_pack_integer(self._m, 0, self._used)
self._positions[key] = self._used - 8
self._positions[key] = self._used - 16

def _read_all_values(self):
    """Yield (key, value, timestamp, pos). No locking is performed."""
    return _read_all_values(data=self._m, used=self._used)

def read_all_values(self):
    """Yield (key, value, timestamp). No locking is performed."""
    # Drop the file position; callers only need the logical entry.
    for k, v, ts, _ in self._read_all_values():
        yield k, v, ts

def read_value(self, key):
    """Return the (value, timestamp) pair stored for *key*.

    The entry is created as (0.0, 0.0) first if it does not yet exist.
    """
    if key not in self._positions:
        self._init_value(key)
    pos = self._positions[key]
    # We assume that reading from an 8 byte aligned value is atomic
    return _unpack_two_doubles(self._m, pos)

def write_value(self, key, value, timestamp):
    """Write *value* and *timestamp* for *key*, creating the entry if needed.

    Args:
        key: utf-8 string key.
        value: float stored as the first 8-byte double of the entry.
        timestamp: UNIX timestamp in seconds, stored as the second double.
    """
    if key not in self._positions:
        self._init_value(key)
    pos = self._positions[key]
    # We assume that writing to an 8 byte aligned value is atomic
    _pack_two_doubles(self._m, pos, value, timestamp)

def close(self):
if self._f:
Expand Down
11 changes: 9 additions & 2 deletions prometheus_client/multiprocess.py
Expand Up @@ -68,7 +68,7 @@ def _parse_key(key):
# the file is missing
continue
raise
for key, value, _ in file_values:
for key, value, timestamp, _ in file_values:
metric_name, name, labels, labels_key, help_text = _parse_key(key)

metric = metrics.get(metric_name)
Expand All @@ -79,7 +79,7 @@ def _parse_key(key):
if typ == 'gauge':
pid = parts[2][:-3]
metric._multiprocess_mode = parts[1]
metric.add_sample(name, labels_key + (('pid', pid),), value)
metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp)
else:
# The duplicates and labels are fixed in the next for.
metric.add_sample(name, labels_key, value)
Expand All @@ -89,6 +89,7 @@ def _parse_key(key):
def _accumulate_metrics(metrics, accumulate):
for metric in metrics.values():
samples = defaultdict(float)
sample_timestamps = defaultdict(float)
buckets = defaultdict(lambda: defaultdict(float))
samples_setdefault = samples.setdefault
for s in metric.samples:
Expand All @@ -105,6 +106,12 @@ def _accumulate_metrics(metrics, accumulate):
samples[without_pid_key] = value
elif metric._multiprocess_mode in ('sum', 'livesum'):
samples[without_pid_key] += value
elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'):
current_timestamp = sample_timestamps[without_pid_key]
timestamp = float(timestamp or 0)
if current_timestamp < timestamp:
samples[without_pid_key] = value
sample_timestamps[without_pid_key] = timestamp
else: # all/liveall
samples[(name, labels)] = value

Expand Down
12 changes: 7 additions & 5 deletions prometheus_client/values.py
Expand Up @@ -19,7 +19,7 @@ def inc(self, amount):
with self._lock:
self._value += amount

def set(self, value, timestamp=None):
    """Set the value under the lock.

    *timestamp* is accepted for signature parity with the multiprocess
    value class and is ignored in single-process mode.
    """
    with self._lock:
        self._value = value

Expand Down Expand Up @@ -82,7 +82,7 @@ def __reset(self):
files[file_prefix] = MmapedDict(filename)
self._file = files[file_prefix]
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
self._value = self._file.read_value(self._key)
self._value, self._timestamp = self._file.read_value(self._key)

def __check_for_pid_change(self):
actual_pid = process_identifier()
Expand All @@ -99,13 +99,15 @@ def inc(self, amount):
with lock:
self.__check_for_pid_change()
self._value += amount
self._file.write_value(self._key, self._value)
self._timestamp = 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set(self, value, timestamp=None):
    """Set the value and persist it (with its timestamp) to the mmap file."""
    with lock:
        self.__check_for_pid_change()
        self._value = value
        # NOTE: `or` maps both None and 0 to 0.0; 0.0 means "no timestamp".
        self._timestamp = timestamp or 0.0
        self._file.write_value(self._key, self._value, self._timestamp)

def set_exemplar(self, exemplar):
# TODO: Implement exemplars for multiprocess mode.
Expand Down
40 changes: 30 additions & 10 deletions tests/test_multiprocess.py
Expand Up @@ -185,6 +185,26 @@ def test_gauge_livesum(self):
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(2, self.registry.get_sample_value('g'))

def test_gauge_mostrecent(self):
    # g1 writes under the original test pid (123, per mark_process_dead
    # below); g2 writes under pid 456 — TODO confirm setUp pins pid 123.
    g1 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
    values.ValueClass = MultiProcessValue(lambda: 456)
    g2 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
    g2.set(2)
    g1.set(1)  # later write: most recent value is 1 regardless of pid order
    self.assertEqual(1, self.registry.get_sample_value('g'))
    # 'mostrecent' considers dead processes too, so the result is unchanged.
    mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
    self.assertEqual(1, self.registry.get_sample_value('g'))

def test_gauge_livemostrecent(self):
    # g1 writes under the original test pid (123, per mark_process_dead
    # below); g2 writes under pid 456 — TODO confirm setUp pins pid 123.
    g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
    values.ValueClass = MultiProcessValue(lambda: 456)
    g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
    g2.set(2)
    g1.set(1)  # later write: most recent among all live processes is 1
    self.assertEqual(1, self.registry.get_sample_value('g'))
    # Once pid 123 is dead, only live processes count: pid 456's value wins.
    mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
    self.assertEqual(2, self.registry.get_sample_value('g'))

def test_namespace_subsystem(self):
c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss')
c1.inc(1)
Expand Down Expand Up @@ -369,28 +389,28 @@ def setUp(self):
self.d = mmap_dict.MmapedDict(self.tempfile)

def test_process_restart(self):
    # Values and their timestamps must survive a close/reopen cycle.
    self.d.write_value('abc', 123.0, 987.0)
    self.d.close()
    self.d = mmap_dict.MmapedDict(self.tempfile)
    self.assertEqual((123, 987.0), self.d.read_value('abc'))
    self.assertEqual([('abc', 123.0, 987.0)], list(self.d.read_all_values()))

def test_expansion(self):
    # A key as large as the initial mmap size forces the file to grow.
    key = 'a' * mmap_dict._INITIAL_MMAP_SIZE
    self.d.write_value(key, 123.0, 987.0)
    self.assertEqual([(key, 123.0, 987.0)], list(self.d.read_all_values()))

def test_multi_expansion(self):
    # Entries written before and after a multi-step expansion are all kept,
    # in insertion order, with their timestamps intact.
    key = 'a' * mmap_dict._INITIAL_MMAP_SIZE * 4
    self.d.write_value('abc', 42.0, 987.0)
    self.d.write_value(key, 123.0, 876.0)
    self.d.write_value('def', 17.0, 765.0)
    self.assertEqual(
        [('abc', 42.0, 987.0), (key, 123.0, 876.0), ('def', 17.0, 765.0)],
        list(self.d.read_all_values()))

def test_corruption_detected(self):
self.d.write_value('abc', 42.0)
self.d.write_value('abc', 42.0, 987.0)
# corrupt the written data
self.d._m[8:16] = b'somejunk'
with self.assertRaises(RuntimeError):
Expand Down