Skip to content

Commit

Permalink
Add mostrecent aggregation to Gauge
Browse files Browse the repository at this point in the history
In the multiprocess mode, the process that exposes the metrics needs to
aggregate the samples from other processes. Gauge metric allows users to
choose the aggregation mode. This implements 'mostrecent' (and
'livemostrecent') mode where the last observed value is exposed.

In order to support this, the file format is expanded to store a
timestamp alongside each value. The stored timestamps are read by the
reader process and used to find the latest value.

Closes #847

Consideration on the atomicity:

Previously, mmap_dict.py had a comment saying "We assume that reading
from an 8 byte aligned value is atomic". With this change, the value
write becomes a 16-byte, 8-byte-aligned write. The code author tried to
find a basis for the original assumption, but couldn't find any.
According to write(2), **if a file descriptor is shared**, the write
becomes atomic. However, we do not share the file descriptors in the
current architecture.

Considering that the Ruby implementation does the same and has not seen
an issue with it, this write atomicity problem is likely not an issue
in practice.

See also:

* prometheus/client_ruby#172

  The approach and naming are taken from client_ruby.

* https://github.com/prometheus/client_golang/blob/v1.17.0/prometheus/metric.go#L149-L161

  client_golang has an API for setting timestamp already. It explains
  the use case for the timestamp beyond the client-local aggregation. In
  order to support the same use case in Python, further changes are
  needed.

Signed-off-by: Masaya Suzuki <draftcode@gmail.com>
  • Loading branch information
draftcode committed Oct 19, 2023
1 parent 249490e commit 172733f
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 41 deletions.
21 changes: 17 additions & 4 deletions prometheus_client/metrics.py
Expand Up @@ -346,7 +346,8 @@ def f():
d.set_function(lambda: len(my_dict))
"""
_type = 'gauge'
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'))
_MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'))
_MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent'))

def __init__(self,
name: str,
Expand All @@ -357,7 +358,7 @@ def __init__(self,
unit: str = '',
registry: Optional[CollectorRegistry] = REGISTRY,
_labelvalues: Optional[Sequence[str]] = None,
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'] = 'all',
multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
):
self._multiprocess_mode = multiprocess_mode
if multiprocess_mode not in self._MULTIPROC_MODES:
Expand All @@ -382,18 +383,30 @@ def _metric_init(self) -> None:

def inc(self, amount: float = 1) -> None:
    """Increment gauge by the given amount.

    Raises:
        RuntimeError: if this gauge was created with the 'mostrecent' or
            'livemostrecent' multiprocess mode. Those modes expose the
            last observed value, so per-process increments cannot be
            meaningfully aggregated; only set() is allowed.
    """
    # The mode check runs before the observability check so misuse of
    # the mostrecent modes is reported even on non-observable instances.
    if self._multiprocess_mode in self._MOST_RECENT_MODES:
        raise RuntimeError("inc must not be used with the mostrecent mode")
    self._raise_if_not_observable()
    self._value.inc(amount)

def dec(self, amount: float = 1) -> None:
    """Decrement gauge by the given amount.

    Raises:
        RuntimeError: if this gauge was created with the 'mostrecent' or
            'livemostrecent' multiprocess mode. Those modes expose the
            last observed value, so per-process decrements cannot be
            meaningfully aggregated; only set() is allowed.
    """
    # Implemented as inc(-amount); the underlying value type only
    # exposes an inc() primitive.
    if self._multiprocess_mode in self._MOST_RECENT_MODES:
        raise RuntimeError("dec must not be used with the mostrecent mode")
    self._raise_if_not_observable()
    self._value.inc(-amount)

def set(self, value: float) -> None:
"""Set gauge to the given value."""
"""Set gauge to the given value.
This can take an optional timestamp to indicate when the sample was
taken. This is used for the most_recent aggregation and Prometheus
exposition.
"""
self._raise_if_not_observable()
self._value.set(float(value))
if self._multiprocess_mode in self._MOST_RECENT_MODES:
self._value.set(float(value), timestamp=time.time())
else:
self._value.set(float(value))

def set_to_current_time(self) -> None:
"""Set gauge to the current unixtime."""
Expand Down
40 changes: 20 additions & 20 deletions prometheus_client/mmap_dict.py
Expand Up @@ -6,25 +6,26 @@

_INITIAL_MMAP_SIZE = 1 << 16
_pack_integer_func = struct.Struct(b'i').pack
_pack_double_func = struct.Struct(b'd').pack
_pack_two_doubles_func = struct.Struct(b'dd').pack
_unpack_integer = struct.Struct(b'i').unpack_from
_unpack_double = struct.Struct(b'd').unpack_from
_unpack_two_doubles = struct.Struct(b'dd').unpack_from


# struct.pack_into has atomicity issues because it will temporarily write 0 into
# the mmap, resulting in false reads to 0 when experiencing a lot of writes.
# Using direct assignment solves this issue.

def _pack_double(data, pos, value):
data[pos:pos + 8] = _pack_double_func(value)

def _pack_two_doubles(data, pos, value, timestamp):
    """Write value and timestamp as two consecutive 8-byte doubles at pos.

    Slice assignment is used instead of struct.pack_into because
    pack_into temporarily writes zeros into the mmap, so a concurrent
    reader could observe a spurious 0.
    """
    end = pos + 16
    data[pos:end] = _pack_two_doubles_func(value, timestamp)


def _pack_integer(data, pos, value):
    """Write value as a 4-byte native int at pos.

    Uses slice assignment rather than struct.pack_into to avoid the
    transient zero-write pack_into performs on the buffer.
    """
    encoded = _pack_integer_func(value)
    data[pos:pos + 4] = encoded


def _read_all_values(data, used=0):
"""Yield (key, value, pos). No locking is performed."""
"""Yield (key, value, timestamp, pos). No locking is performed."""

if used <= 0:
# If not valid `used` value is passed in, read it from the file.
Expand All @@ -41,9 +42,9 @@ def _read_all_values(data, used=0):
encoded_key = data[pos:pos + encoded_len]
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
pos += padded_len
value = _unpack_double(data, pos)[0]
yield encoded_key.decode('utf-8'), value, pos
pos += 8
value, timestamp = _unpack_two_doubles(data, pos)
yield encoded_key.decode('utf-8'), value, timestamp, pos
pos += 16


class MmapedDict:
Expand All @@ -53,7 +54,8 @@ class MmapedDict:
Then 4 bytes of padding.
There's then a number of entries, consisting of a 4 byte int which is the
size of the next field, a utf-8 encoded string key, padding to a 8 byte
alignment, and then a 8 byte float which is the value.
alignment, and then a 8 byte float which is the value and a 8 byte float
which is a UNIX timestamp in seconds.
Not thread safe.
"""
Expand All @@ -76,7 +78,7 @@ def __init__(self, filename, read_mode=False):
_pack_integer(self._m, 0, self._used)
else:
if not read_mode:
for key, _, pos in self._read_all_values():
for key, _, _, pos in self._read_all_values():
self._positions[key] = pos

@staticmethod
Expand All @@ -95,7 +97,7 @@ def _init_value(self, key):
encoded = key.encode('utf-8')
# Pad to be 8-byte aligned.
padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8))
value = struct.pack(f'i{len(padded)}sd'.encode(), len(encoded), padded, 0.0)
value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0)
while self._used + len(value) > self._capacity:
self._capacity *= 2
self._f.truncate(self._capacity)
Expand All @@ -105,30 +107,28 @@ def _init_value(self, key):
# Update how much space we've used.
self._used += len(value)
_pack_integer(self._m, 0, self._used)
self._positions[key] = self._used - 8
self._positions[key] = self._used - 16

def _read_all_values(self):
    """Yield (key, value, timestamp, pos). No locking is performed.

    Delegates to the module-level _read_all_values over this dict's
    mmap, bounded by the recorded used-bytes count.
    """
    return _read_all_values(data=self._m, used=self._used)

def read_all_values(self):
"""Yield (key, value). No locking is performed."""
for k, v, _ in self._read_all_values():
yield k, v
"""Yield (key, value, timestamp). No locking is performed."""
for k, v, ts, _ in self._read_all_values():
yield k, v, ts

def read_value(self, key):
if key not in self._positions:
self._init_value(key)
pos = self._positions[key]
# We assume that reading from an 8 byte aligned value is atomic
return _unpack_double(self._m, pos)[0]
return _unpack_two_doubles(self._m, pos)

def write_value(self, key, value):
def write_value(self, key, value, timestamp):
if key not in self._positions:
self._init_value(key)
pos = self._positions[key]
# We assume that writing to an 8 byte aligned value is atomic
_pack_double(self._m, pos, value)
_pack_two_doubles(self._m, pos, value, timestamp)

def close(self):
if self._f:
Expand Down
11 changes: 9 additions & 2 deletions prometheus_client/multiprocess.py
Expand Up @@ -68,7 +68,7 @@ def _parse_key(key):
# the file is missing
continue
raise
for key, value, _ in file_values:
for key, value, timestamp, _ in file_values:
metric_name, name, labels, labels_key, help_text = _parse_key(key)

metric = metrics.get(metric_name)
Expand All @@ -79,7 +79,7 @@ def _parse_key(key):
if typ == 'gauge':
pid = parts[2][:-3]
metric._multiprocess_mode = parts[1]
metric.add_sample(name, labels_key + (('pid', pid),), value)
metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp)
else:
# The duplicates and labels are fixed in the next for.
metric.add_sample(name, labels_key, value)
Expand All @@ -89,6 +89,7 @@ def _parse_key(key):
def _accumulate_metrics(metrics, accumulate):
for metric in metrics.values():
samples = defaultdict(float)
sample_timestamps = defaultdict(float)
buckets = defaultdict(lambda: defaultdict(float))
samples_setdefault = samples.setdefault
for s in metric.samples:
Expand All @@ -105,6 +106,12 @@ def _accumulate_metrics(metrics, accumulate):
samples[without_pid_key] = value
elif metric._multiprocess_mode in ('sum', 'livesum'):
samples[without_pid_key] += value
elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'):
current_timestamp = sample_timestamps[without_pid_key]
timestamp = float(timestamp or 0)
if current_timestamp < timestamp:
samples[without_pid_key] = value
sample_timestamps[without_pid_key] = timestamp
else: # all/liveall
samples[(name, labels)] = value

Expand Down
12 changes: 7 additions & 5 deletions prometheus_client/values.py
Expand Up @@ -19,7 +19,7 @@ def inc(self, amount):
with self._lock:
self._value += amount

def set(self, value):
def set(self, value, timestamp=None):
with self._lock:
self._value = value

Expand Down Expand Up @@ -82,7 +82,7 @@ def __reset(self):
files[file_prefix] = MmapedDict(filename)
self._file = files[file_prefix]
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
self._value = self._file.read_value(self._key)
self._value, self._timestamp = self._file.read_value(self._key)

def __check_for_pid_change(self):
actual_pid = process_identifier()
Expand All @@ -99,13 +99,15 @@ def inc(self, amount):
with lock:
self.__check_for_pid_change()
self._value += amount
self._file.write_value(self._key, self._value)
self._timestamp = 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set(self, value):
def set(self, value, timestamp=None):
with lock:
self.__check_for_pid_change()
self._value = value
self._file.write_value(self._key, self._value)
self._timestamp = timestamp or 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set_exemplar(self, exemplar):
# TODO: Implement exemplars for multiprocess mode.
Expand Down
40 changes: 30 additions & 10 deletions tests/test_multiprocess.py
Expand Up @@ -185,6 +185,26 @@ def test_gauge_livesum(self):
mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
self.assertEqual(2, self.registry.get_sample_value('g'))

def test_gauge_mostrecent(self):
    """'mostrecent' exposes the last written value across processes."""
    g1 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
    # Switch the value backend to simulate a second process (pid 456).
    values.ValueClass = MultiProcessValue(lambda: 456)
    g2 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent')
    # g1.set(1) happens after g2.set(2), so 1 is the most recent value
    # even though it was written by a different process.
    g2.set(2)
    g1.set(1)
    self.assertEqual(1, self.registry.get_sample_value('g'))
    # Non-live mode: values survive process death. Presumably pid 123
    # is the first process's identifier configured in setUp — verify
    # against the test fixture.
    mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
    self.assertEqual(1, self.registry.get_sample_value('g'))

def test_gauge_livemostrecent(self):
    """'livemostrecent' drops samples from dead processes."""
    g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
    # Switch the value backend to simulate a second process (pid 456).
    values.ValueClass = MultiProcessValue(lambda: 456)
    g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent')
    # While both processes are alive, the latest write (g1.set(1)) wins.
    g2.set(2)
    g1.set(1)
    self.assertEqual(1, self.registry.get_sample_value('g'))
    # Live mode: once the first process (presumably pid 123, from
    # setUp — verify against the fixture) is marked dead, its sample is
    # discarded and the surviving process's value (2) is exposed.
    mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR'])
    self.assertEqual(2, self.registry.get_sample_value('g'))

def test_namespace_subsystem(self):
c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss')
c1.inc(1)
Expand Down Expand Up @@ -369,28 +389,28 @@ def setUp(self):
self.d = mmap_dict.MmapedDict(self.tempfile)

def test_process_restart(self):
self.d.write_value('abc', 123.0)
self.d.write_value('abc', 123.0, 987.0)
self.d.close()
self.d = mmap_dict.MmapedDict(self.tempfile)
self.assertEqual(123, self.d.read_value('abc'))
self.assertEqual([('abc', 123.0)], list(self.d.read_all_values()))
self.assertEqual((123, 987.0), self.d.read_value('abc'))
self.assertEqual([('abc', 123.0, 987.0)], list(self.d.read_all_values()))

def test_expansion(self):
key = 'a' * mmap_dict._INITIAL_MMAP_SIZE
self.d.write_value(key, 123.0)
self.assertEqual([(key, 123.0)], list(self.d.read_all_values()))
self.d.write_value(key, 123.0, 987.0)
self.assertEqual([(key, 123.0, 987.0)], list(self.d.read_all_values()))

def test_multi_expansion(self):
key = 'a' * mmap_dict._INITIAL_MMAP_SIZE * 4
self.d.write_value('abc', 42.0)
self.d.write_value(key, 123.0)
self.d.write_value('def', 17.0)
self.d.write_value('abc', 42.0, 987.0)
self.d.write_value(key, 123.0, 876.0)
self.d.write_value('def', 17.0, 765.0)
self.assertEqual(
[('abc', 42.0), (key, 123.0), ('def', 17.0)],
[('abc', 42.0, 987.0), (key, 123.0, 876.0), ('def', 17.0, 765.0)],
list(self.d.read_all_values()))

def test_corruption_detected(self):
self.d.write_value('abc', 42.0)
self.d.write_value('abc', 42.0, 987.0)
# corrupt the written data
self.d._m[8:16] = b'somejunk'
with self.assertRaises(RuntimeError):
Expand Down

0 comments on commit 172733f

Please sign in to comment.