diff --git a/prometheus_client/metrics.py b/prometheus_client/metrics.py index 66f51fa6..4ab6e900 100644 --- a/prometheus_client/metrics.py +++ b/prometheus_client/metrics.py @@ -346,7 +346,8 @@ def f(): d.set_function(lambda: len(my_dict)) """ _type = 'gauge' - _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum')) + _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent')) + _MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent')) def __init__(self, name: str, @@ -357,7 +358,7 @@ def __init__(self, unit: str = '', registry: Optional[CollectorRegistry] = REGISTRY, _labelvalues: Optional[Sequence[str]] = None, - multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum'] = 'all', + multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all', ): self._multiprocess_mode = multiprocess_mode if multiprocess_mode not in self._MULTIPROC_MODES: @@ -373,6 +374,7 @@ def __init__(self, _labelvalues=_labelvalues, ) self._kwargs['multiprocess_mode'] = self._multiprocess_mode + self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES def _metric_init(self) -> None: self._value = values.ValueClass( @@ -382,18 +384,25 @@ def _metric_init(self) -> None: def inc(self, amount: float = 1) -> None: """Increment gauge by the given amount.""" + if self._is_most_recent: + raise RuntimeError("inc must not be used with the mostrecent mode") self._raise_if_not_observable() self._value.inc(amount) def dec(self, amount: float = 1) -> None: """Decrement gauge by the given amount.""" + if self._is_most_recent: + raise RuntimeError("dec must not be used with the mostrecent mode") self._raise_if_not_observable() self._value.inc(-amount) def set(self, value: float) -> None: """Set gauge to the given value.""" self._raise_if_not_observable() - self._value.set(float(value)) + if self._multiprocess_mode in self._MOST_RECENT_MODES: + self._value.set(float(value), timestamp=time.time()) + else: + self._value.set(float(value)) def set_to_current_time(self) -> None: """Set gauge to the current unixtime.""" diff --git a/prometheus_client/mmap_dict.py b/prometheus_client/mmap_dict.py index c3de38fa..edd895cd 100644 --- a/prometheus_client/mmap_dict.py +++ b/prometheus_client/mmap_dict.py @@ -6,17 +6,18 @@ _INITIAL_MMAP_SIZE = 1 << 16 _pack_integer_func = struct.Struct(b'i').pack -_pack_double_func = struct.Struct(b'd').pack +_pack_two_doubles_func = struct.Struct(b'dd').pack _unpack_integer = struct.Struct(b'i').unpack_from -_unpack_double = struct.Struct(b'd').unpack_from +_unpack_two_doubles = struct.Struct(b'dd').unpack_from # struct.pack_into has atomicity issues because it will temporarily write 0 into # the mmap, resulting in false reads to 0 when experiencing a lot of writes. # Using direct assignment solves this issue. -def _pack_double(data, pos, value): - data[pos:pos + 8] = _pack_double_func(value) + +def _pack_two_doubles(data, pos, value, timestamp): + data[pos:pos + 16] = _pack_two_doubles_func(value, timestamp) def _pack_integer(data, pos, value): @@ -24,7 +25,7 @@ def _pack_integer(data, pos, value): def _read_all_values(data, used=0): - """Yield (key, value, pos). No locking is performed.""" + """Yield (key, value, timestamp, pos). No locking is performed.""" if used <= 0: # If not valid `used` value is passed in, read it from the file. @@ -41,9 +42,9 @@ def _read_all_values(data, used=0): encoded_key = data[pos:pos + encoded_len] padded_len = encoded_len + (8 - (encoded_len + 4) % 8) pos += padded_len - value = _unpack_double(data, pos)[0] - yield encoded_key.decode('utf-8'), value, pos - pos += 8 + value, timestamp = _unpack_two_doubles(data, pos) + yield encoded_key.decode('utf-8'), value, timestamp, pos + pos += 16 class MmapedDict: @@ -53,7 +54,8 @@ class MmapedDict: Then 4 bytes of padding. There's then a number of entries, consisting of a 4 byte int which is the size of the next field, a utf-8 encoded string key, padding to a 8 byte - alignment, and then a 8 byte float which is the value. + alignment, and then a 8 byte float which is the value and a 8 byte float + which is a UNIX timestamp in seconds. Not thread safe. """ @@ -76,7 +78,7 @@ def __init__(self, filename, read_mode=False): _pack_integer(self._m, 0, self._used) else: if not read_mode: - for key, _, pos in self._read_all_values(): + for key, _, _, pos in self._read_all_values(): self._positions[key] = pos @staticmethod @@ -95,7 +97,7 @@ def _init_value(self, key): encoded = key.encode('utf-8') # Pad to be 8-byte aligned. padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8)) - value = struct.pack(f'i{len(padded)}sd'.encode(), len(encoded), padded, 0.0) + value = struct.pack(f'i{len(padded)}sdd'.encode(), len(encoded), padded, 0.0, 0.0) while self._used + len(value) > self._capacity: self._capacity *= 2 self._f.truncate(self._capacity) @@ -105,30 +107,28 @@ def _init_value(self, key): # Update how much space we've used. self._used += len(value) _pack_integer(self._m, 0, self._used) - self._positions[key] = self._used - 8 + self._positions[key] = self._used - 16 def _read_all_values(self): """Yield (key, value, pos). No locking is performed.""" return _read_all_values(data=self._m, used=self._used) def read_all_values(self): - """Yield (key, value). No locking is performed.""" - for k, v, _ in self._read_all_values(): - yield k, v + """Yield (key, value, timestamp). No locking is performed.""" + for k, v, ts, _ in self._read_all_values(): + yield k, v, ts def read_value(self, key): if key not in self._positions: self._init_value(key) pos = self._positions[key] - # We assume that reading from an 8 byte aligned value is atomic - return _unpack_double(self._m, pos)[0] + return _unpack_two_doubles(self._m, pos) - def write_value(self, key, value): + def write_value(self, key, value, timestamp): if key not in self._positions: self._init_value(key) pos = self._positions[key] - # We assume that writing to an 8 byte aligned value is atomic - _pack_double(self._m, pos, value) + _pack_two_doubles(self._m, pos, value, timestamp) def close(self): if self._f: diff --git a/prometheus_client/multiprocess.py b/prometheus_client/multiprocess.py index dd343913..7021b49a 100644 --- a/prometheus_client/multiprocess.py +++ b/prometheus_client/multiprocess.py @@ -68,7 +68,7 @@ def _parse_key(key): # the file is missing continue raise - for key, value, _ in file_values: + for key, value, timestamp, _ in file_values: metric_name, name, labels, labels_key, help_text = _parse_key(key) metric = metrics.get(metric_name) @@ -79,7 +79,7 @@ def _parse_key(key): if typ == 'gauge': pid = parts[2][:-3] metric._multiprocess_mode = parts[1] - metric.add_sample(name, labels_key + (('pid', pid),), value) + metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp) else: # The duplicates and labels are fixed in the next for. metric.add_sample(name, labels_key, value) @@ -89,6 +89,7 @@ def _parse_key(key): def _accumulate_metrics(metrics, accumulate): for metric in metrics.values(): samples = defaultdict(float) + sample_timestamps = defaultdict(float) buckets = defaultdict(lambda: defaultdict(float)) samples_setdefault = samples.setdefault for s in metric.samples: @@ -105,6 +106,12 @@ def _accumulate_metrics(metrics, accumulate): samples[without_pid_key] = value elif metric._multiprocess_mode in ('sum', 'livesum'): samples[without_pid_key] += value + elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'): + current_timestamp = sample_timestamps[without_pid_key] + timestamp = float(timestamp or 0) + if current_timestamp < timestamp: + samples[without_pid_key] = value + sample_timestamps[without_pid_key] = timestamp else: # all/liveall samples[(name, labels)] = value diff --git a/prometheus_client/values.py b/prometheus_client/values.py index 3373379b..6ff85e3b 100644 --- a/prometheus_client/values.py +++ b/prometheus_client/values.py @@ -19,7 +19,7 @@ def inc(self, amount): with self._lock: self._value += amount - def set(self, value): + def set(self, value, timestamp=None): with self._lock: self._value = value @@ -82,7 +82,7 @@ def __reset(self): files[file_prefix] = MmapedDict(filename) self._file = files[file_prefix] self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text) - self._value = self._file.read_value(self._key) + self._value, self._timestamp = self._file.read_value(self._key) def __check_for_pid_change(self): actual_pid = process_identifier() @@ -99,13 +99,15 @@ def inc(self, amount): with lock: self.__check_for_pid_change() self._value += amount - self._file.write_value(self._key, self._value) + self._timestamp = 0.0 + self._file.write_value(self._key, self._value, self._timestamp) - def set(self, value): + def set(self, value, timestamp=None): with lock: self.__check_for_pid_change() self._value = value - self._file.write_value(self._key, self._value) + self._timestamp = timestamp or 0.0 + self._file.write_value(self._key, self._value, self._timestamp) def set_exemplar(self, exemplar): # TODO: Implement exemplars for multiprocess mode. diff --git a/tests/test_multiprocess.py b/tests/test_multiprocess.py index 10990ad3..6e188e51 100644 --- a/tests/test_multiprocess.py +++ b/tests/test_multiprocess.py @@ -185,6 +185,26 @@ def test_gauge_livesum(self): mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) self.assertEqual(2, self.registry.get_sample_value('g')) + def test_gauge_mostrecent(self): + g1 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent') + values.ValueClass = MultiProcessValue(lambda: 456) + g2 = Gauge('g', 'help', registry=None, multiprocess_mode='mostrecent') + g2.set(2) + g1.set(1) + self.assertEqual(1, self.registry.get_sample_value('g')) + mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) + self.assertEqual(1, self.registry.get_sample_value('g')) + + def test_gauge_livemostrecent(self): + g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent') + values.ValueClass = MultiProcessValue(lambda: 456) + g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livemostrecent') + g2.set(2) + g1.set(1) + self.assertEqual(1, self.registry.get_sample_value('g')) + mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) + self.assertEqual(2, self.registry.get_sample_value('g')) + def test_namespace_subsystem(self): c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss') c1.inc(1) @@ -369,28 +389,28 @@ def setUp(self): self.d = mmap_dict.MmapedDict(self.tempfile) def test_process_restart(self): - self.d.write_value('abc', 123.0) + self.d.write_value('abc', 123.0, 987.0) self.d.close() self.d = mmap_dict.MmapedDict(self.tempfile) - self.assertEqual(123, self.d.read_value('abc')) - self.assertEqual([('abc', 123.0)], list(self.d.read_all_values())) + self.assertEqual((123, 987.0), self.d.read_value('abc')) + self.assertEqual([('abc', 123.0, 987.0)], list(self.d.read_all_values())) def test_expansion(self): key = 'a' * mmap_dict._INITIAL_MMAP_SIZE - self.d.write_value(key, 123.0) - self.assertEqual([(key, 123.0)], list(self.d.read_all_values())) + self.d.write_value(key, 123.0, 987.0) + self.assertEqual([(key, 123.0, 987.0)], list(self.d.read_all_values())) def test_multi_expansion(self): key = 'a' * mmap_dict._INITIAL_MMAP_SIZE * 4 - self.d.write_value('abc', 42.0) - self.d.write_value(key, 123.0) - self.d.write_value('def', 17.0) + self.d.write_value('abc', 42.0, 987.0) + self.d.write_value(key, 123.0, 876.0) + self.d.write_value('def', 17.0, 765.0) self.assertEqual( - [('abc', 42.0), (key, 123.0), ('def', 17.0)], + [('abc', 42.0, 987.0), (key, 123.0, 876.0), ('def', 17.0, 765.0)], list(self.d.read_all_values())) def test_corruption_detected(self): - self.d.write_value('abc', 42.0) + self.d.write_value('abc', 42.0, 987.0) # corrupt the written data self.d._m[8:16] = b'somejunk' with self.assertRaises(RuntimeError):