Skip to content

Commit

Permalink
WIP: Populate OM's created timestamps
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Silva Sens <arthur.sens@coralogix.com>
  • Loading branch information
ArthurSens committed Jul 27, 2023
1 parent 644c80d commit 7462b99
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 424 deletions.
2 changes: 1 addition & 1 deletion examples/exemplars/main.go
Expand Up @@ -66,4 +66,4 @@ func main() {
)
// To test: curl -H 'Accept: application/openmetrics-text' localhost:8080/metrics
log.Fatalln(http.ListenAndServe(":8080", nil))
}
}
13 changes: 8 additions & 5 deletions go.mod
Expand Up @@ -7,7 +7,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/davecgh/go-spew v1.1.1
github.com/json-iterator/go v1.1.12
github.com/prometheus/client_model v0.3.0
github.com/prometheus/client_model v0.4.0
github.com/prometheus/common v0.42.0
github.com/prometheus/procfs v0.10.1
golang.org/x/sys v0.8.0
Expand All @@ -22,12 +22,15 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

exclude github.com/prometheus/client_golang v1.12.1

replace github.com/prometheus/common => github.com/ArthurSens/common v0.0.0-20230718212508-8ffc995a3589

replace github.com/prometheus/client_model => github.com/ArthurSens/client_model v0.0.0-20230718164431-9a2bf3000d16
417 changes: 12 additions & 405 deletions go.sum

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions prometheus/counter.go
Expand Up @@ -20,6 +20,7 @@ import (
"time"

dto "github.com/prometheus/client_model/go"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

// Counter is a Metric that represents a single numerical value that only ever
Expand Down Expand Up @@ -106,10 +107,11 @@ type counter struct {
selfCollector
desc *Desc

createdTs *timestamppb.Timestamp
labelPairs []*dto.LabelPair
exemplar atomic.Value // Containing nil or a *dto.Exemplar.

now func() time.Time // To mock out time.Now() for testing.
now func() time.Time // To mock out time.Now() for testing.
}

func (c *counter) Desc() *Desc {
Expand All @@ -121,6 +123,10 @@ func (c *counter) Add(v float64) {
panic(errors.New("counter cannot decrease in value"))
}

if c.createdTs == nil {
c.setCreatedTimestamp()
}

ival := uint64(v)
if float64(ival) == v {
atomic.AddUint64(&c.valInt, ival)
Expand All @@ -143,6 +149,13 @@ func (c *counter) AddWithExemplar(v float64, e Labels) {

func (c *counter) Inc() {
atomic.AddUint64(&c.valInt, 1)
if c.createdTs == nil {
c.setCreatedTimestamp()
}
}

func (c *counter) setCreatedTimestamp() {
c.createdTs = timestamppb.New(c.now())
}

func (c *counter) get() float64 {
Expand All @@ -160,7 +173,7 @@ func (c *counter) Write(out *dto.Metric) error {
}
val := c.get()

return populateMetric(CounterValue, val, c.labelPairs, exemplar, out)
return populateMetric(CounterValue, val, c.labelPairs, exemplar, out, c.createdTs)
}

func (c *counter) updateExemplar(v float64, l Labels) {
Expand Down
24 changes: 24 additions & 0 deletions prometheus/counter_test.go
Expand Up @@ -273,3 +273,27 @@ func TestCounterExemplar(t *testing.T) {
t.Error("adding exemplar with oversized labels succeeded")
}
}

func TestCounterCreatedTimestamp(t *testing.T) {
now := time.Now()

counter := NewCounter(CounterOpts{
Name: "test",
Help: "test help",
}).(*counter)
counter.now = func() time.Time { return now }

ts := timestamppb.New(now)
if err := ts.CheckValid(); err != nil {
t.Fatal(err)
}

expectedCounter := &dto.Counter{
CreatedTimestamp: ts,
}

counter.Inc()
if counter.createdTs.AsTime() != expectedCounter.CreatedTimestamp.AsTime() {
t.Errorf("expected created timestamp %s, got %s", expectedCounter.CreatedTimestamp, counter.createdTs)
}
}
2 changes: 1 addition & 1 deletion prometheus/gauge.go
Expand Up @@ -135,7 +135,7 @@ func (g *gauge) Sub(val float64) {

func (g *gauge) Write(out *dto.Metric) error {
val := math.Float64frombits(atomic.LoadUint64(&g.valBits))
return populateMetric(GaugeValue, val, g.labelPairs, nil, out)
return populateMetric(GaugeValue, val, g.labelPairs, nil, out, nil)
}

// GaugeVec is a Collector that bundles a set of Gauges that all share the same
Expand Down
21 changes: 17 additions & 4 deletions prometheus/histogram.go
Expand Up @@ -25,6 +25,7 @@ import (
dto "github.com/prometheus/client_model/go"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

// nativeHistogramBounds for the frac of observed values. Only relevant for
Expand Down Expand Up @@ -694,6 +695,7 @@ type histogram struct {
counts [2]*histogramCounts

upperBounds []float64
createdTs *timestamppb.Timestamp
labelPairs []*dto.LabelPair
exemplars []atomic.Value // One more than buckets (to include +Inf), each a *dto.Exemplar.
nativeHistogramSchema int32 // The initial schema. Set to math.MinInt32 if no sparse buckets are used.
Expand Down Expand Up @@ -742,9 +744,10 @@ func (h *histogram) Write(out *dto.Metric) error {
waitForCooldown(count, coldCounts)

his := &dto.Histogram{
Bucket: make([]*dto.Bucket, len(h.upperBounds)),
SampleCount: proto.Uint64(count),
SampleSum: proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))),
Bucket: make([]*dto.Bucket, len(h.upperBounds)),
SampleCount: proto.Uint64(count),
SampleSum: proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))),
CreatedTimestamp: h.createdTs,
}
out.Histogram = his
out.Label = h.labelPairs
Expand Down Expand Up @@ -815,6 +818,13 @@ func (h *histogram) observe(v float64, bucket int) {
if doSparse {
h.limitBuckets(hotCounts, v, bucket)
}
if h.createdTs == nil {
h.setCreatedTimestamp()
}
}

func (h *histogram) setCreatedTimestamp() {
h.createdTs = timestamppb.New(h.now())
}

// limitBuckets applies a strategy to limit the number of populated sparse
Expand Down Expand Up @@ -1175,6 +1185,7 @@ type constHistogram struct {
count uint64
sum float64
buckets map[float64]uint64
createdTS *timestamppb.Timestamp
labelPairs []*dto.LabelPair
}

Expand All @@ -1183,7 +1194,9 @@ func (h *constHistogram) Desc() *Desc {
}

func (h *constHistogram) Write(out *dto.Metric) error {
his := &dto.Histogram{}
his := &dto.Histogram{
CreatedTimestamp: h.createdTS,
}

buckets := make([]*dto.Bucket, 0, len(h.buckets))

Expand Down
25 changes: 25 additions & 0 deletions prometheus/histogram_test.go
Expand Up @@ -875,3 +875,28 @@ func TestGetLe(t *testing.T) {
}
}
}

func TestHistogramCreatedTimestamp(t *testing.T) {
now := time.Now()

histogram := NewHistogram(HistogramOpts{
Name: "test",
Help: "test help",
Buckets: []float64{1, 2, 3, 4},
}).(*histogram)
histogram.now = func() time.Time { return now }

ts := timestamppb.New(now)
if err := ts.CheckValid(); err != nil {
t.Fatal(err)
}
expectedHistogram := &dto.Histogram{
CreatedTimestamp: ts,
}

histogram.Observe(1.5)

if histogram.createdTs.AsTime() != expectedHistogram.CreatedTimestamp.AsTime() {
t.Errorf("expected created timestamp %s, got %s", expectedHistogram.CreatedTimestamp, histogram.createdTs)
}
}
32 changes: 29 additions & 3 deletions prometheus/summary.go
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/beorn7/perks/quantile"
"google.golang.org/protobuf/proto"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

// quantileLabel is used for the label that defines the quantile in a
Expand Down Expand Up @@ -244,6 +245,7 @@ func newSummary(desc *Desc, opts SummaryOpts, labelValues ...string) Summary {
hotBuf: make([]float64, 0, opts.BufCap),
coldBuf: make([]float64, 0, opts.BufCap),
streamDuration: opts.MaxAge / time.Duration(opts.AgeBuckets),
now: time.Now,
}
s.headStreamExpTime = time.Now().Add(s.streamDuration)
s.hotBufExpTime = s.headStreamExpTime
Expand Down Expand Up @@ -274,6 +276,7 @@ type summary struct {
objectives map[float64]float64
sortedObjectives []float64

createdTs *timestamppb.Timestamp
labelPairs []*dto.LabelPair

sum float64
Expand All @@ -286,6 +289,8 @@ type summary struct {
headStream *quantile.Stream
headStreamIdx int
headStreamExpTime, hotBufExpTime time.Time

now func() time.Time // To mock out time.Now() for testing.
}

func (s *summary) Desc() *Desc {
Expand All @@ -304,10 +309,19 @@ func (s *summary) Observe(v float64) {
if len(s.hotBuf) == cap(s.hotBuf) {
s.asyncFlush(now)
}
if s.createdTs == nil {
s.setCreatedTimestamp()
}
}

func (s *summary) setCreatedTimestamp() {
s.createdTs = timestamppb.New(s.now())
}

func (s *summary) Write(out *dto.Metric) error {
sum := &dto.Summary{}
sum := &dto.Summary{
CreatedTimestamp: s.createdTs,
}
qs := make([]*dto.Quantile, 0, len(s.objectives))

s.bufMtx.Lock()
Expand Down Expand Up @@ -439,7 +453,10 @@ type noObjectivesSummary struct {
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG.
counts [2]*summaryCounts

createdTs *timestamppb.Timestamp
labelPairs []*dto.LabelPair

now func() time.Time // To mock out time.Now() for testing.
}

func (s *noObjectivesSummary) Desc() *Desc {
Expand All @@ -463,6 +480,14 @@ func (s *noObjectivesSummary) Observe(v float64) {
// Increment count last as we take it as a signal that the observation
// is complete.
atomic.AddUint64(&hotCounts.count, 1)

if s.createdTs == nil {
s.setCreatedTimestamp()
}
}

func (s *noObjectivesSummary) setCreatedTimestamp() {
s.createdTs = timestamppb.New(s.now())
}

func (s *noObjectivesSummary) Write(out *dto.Metric) error {
Expand Down Expand Up @@ -490,8 +515,9 @@ func (s *noObjectivesSummary) Write(out *dto.Metric) error {
}

sum := &dto.Summary{
SampleCount: proto.Uint64(count),
SampleSum: proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))),
SampleCount: proto.Uint64(count),
SampleSum: proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))),
CreatedTimestamp: s.createdTs,
}

out.Summary = sum
Expand Down
38 changes: 38 additions & 0 deletions prometheus/summary_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"time"

dto "github.com/prometheus/client_model/go"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

func TestSummaryWithDefaultObjectives(t *testing.T) {
Expand Down Expand Up @@ -420,3 +421,40 @@ func getBounds(vars []float64, q, ε float64) (min, max float64) {
}
return
}

func TestSummaryCreatedTimestamp(t *testing.T) {
now := time.Now()

noObjSummary := NewSummary(SummaryOpts{
Name: "test",
Help: "test help",
}).(*noObjectivesSummary)
noObjSummary.now = func() time.Time { return now }

summary := NewSummary(SummaryOpts{
Name: "test2",
Help: "test2",
Objectives: map[float64]float64{
1.0: 1.0,
},
}).(*summary)
summary.now = func() time.Time { return now }

ts := timestamppb.New(now)
if err := ts.CheckValid(); err != nil {
t.Fatal(err)
}

expectedSummary := &dto.Summary{
CreatedTimestamp: ts,
}

noObjSummary.Observe(1.5)
summary.Observe(1.5)
if noObjSummary.createdTs.AsTime() != expectedSummary.CreatedTimestamp.AsTime() {
t.Errorf("expected created timestamp %s, got %s", expectedSummary.CreatedTimestamp, noObjSummary.createdTs)
}
if summary.createdTs.AsTime() != expectedSummary.CreatedTimestamp.AsTime() {
t.Errorf("expected created timestamp %s, got %s", expectedSummary.CreatedTimestamp, summary.createdTs)
}
}
7 changes: 4 additions & 3 deletions prometheus/value.go
Expand Up @@ -91,7 +91,7 @@ func (v *valueFunc) Desc() *Desc {
}

func (v *valueFunc) Write(out *dto.Metric) error {
return populateMetric(v.valType, v.function(), v.labelPairs, nil, out)
return populateMetric(v.valType, v.function(), v.labelPairs, nil, out, nil)
}

// NewConstMetric returns a metric with one fixed value that cannot be
Expand All @@ -110,7 +110,7 @@ func NewConstMetric(desc *Desc, valueType ValueType, value float64, labelValues
}

metric := &dto.Metric{}
if err := populateMetric(valueType, value, MakeLabelPairs(desc, labelValues), nil, metric); err != nil {
if err := populateMetric(valueType, value, MakeLabelPairs(desc, labelValues), nil, metric, timestamppb.Now()); err != nil {
return nil, err
}

Expand Down Expand Up @@ -153,11 +153,12 @@ func populateMetric(
labelPairs []*dto.LabelPair,
e *dto.Exemplar,
m *dto.Metric,
cTs *timestamppb.Timestamp,
) error {
m.Label = labelPairs
switch t {
case CounterValue:
m.Counter = &dto.Counter{Value: proto.Float64(v), Exemplar: e}
m.Counter = &dto.Counter{Value: proto.Float64(v), Exemplar: e, CreatedTimestamp: cTs}
case GaugeValue:
m.Gauge = &dto.Gauge{Value: proto.Float64(v)}
case UntypedValue:
Expand Down

0 comments on commit 7462b99

Please sign in to comment.