Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental synchronous gauge #1410

Merged
merged 17 commits into from Dec 12, 2023
8 changes: 6 additions & 2 deletions examples/metrics-basic/Cargo.toml
Expand Up @@ -6,8 +6,12 @@ license = "Apache-2.0"
publish = false

[dependencies]
opentelemetry = { path = "../../opentelemetry", features = ["metrics"] }
opentelemetry = { path = "../../opentelemetry", features = ["metrics", "sync-gauge-experimental"] }
shaun-cox marked this conversation as resolved.
Show resolved Hide resolved
opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", "rt-tokio"] }
opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]}
tokio = { version = "1.0", features = ["full"] }
serde_json = {version = "1.0"}
serde_json = {version = "1.0"}

[features]
default = ["sync-gauge-experimental"]
sync-gauge-experimental = ["opentelemetry/sync-gauge-experimental"]
28 changes: 22 additions & 6 deletions examples/metrics-basic/src/main.rs
Expand Up @@ -110,17 +110,35 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

// Note that there is no ObservableHistogram instrument.

// Create a ObservableGauge instrument and register a callback that reports the measurement.
// Create a Gauge Instrument.
// Note that the Gauge instrument is experimental, and can be changed/removed in the future releases.
#[cfg(feature = "sync-gauge-experimental")]
let gauge = meter
.f64_observable_gauge("my_gauge")
.f64_gauge("my_gauge")
.with_description("A gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.init();
#[cfg(feature = "sync-gauge-experimental")]
gauge.record(
1.0,
[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
]
.as_ref(),
);

// Create a ObservableGauge instrument and register a callback that reports the measurement.
let observable_gauge = meter
.f64_observable_gauge("my_observable_gauge")
.with_description("An observable gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.init();

// Register a callback that reports the measurement.
meter.register_callback(&[gauge.as_any()], move |observer| {
meter.register_callback(&[observable_gauge.as_any()], move |observer| {
observer.observe_f64(
&gauge,
&observable_gauge,
1.0,
[
KeyValue::new("mykey1", "myvalue1"),
Expand All @@ -130,8 +148,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
)
})?;

// Note that Gauge only has a Observable version.

// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
Expand Down
16 changes: 15 additions & 1 deletion opentelemetry-sdk/src/metrics/instrument.rs
Expand Up @@ -2,7 +2,8 @@ use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, marker, sync:

use opentelemetry::{
metrics::{
AsyncInstrument, MetricsError, Result, SyncCounter, SyncHistogram, SyncUpDownCounter, Unit,
AsyncInstrument, MetricsError, Result, SyncCounter, SyncGauge, SyncHistogram,
SyncUpDownCounter, Unit,
},
Key, KeyValue,
};
Expand Down Expand Up @@ -33,6 +34,11 @@ pub enum InstrumentKind {
/// A group of instruments that record increasing and decreasing values in an
/// asynchronous callback.
ObservableUpDownCounter,

/// a group of instruments that record current value synchronously with
/// the code path they are measuring.
Gauge,
///
/// a group of instruments that record current values in an asynchronous callback.
ObservableGauge,
}
Expand Down Expand Up @@ -268,6 +274,14 @@ impl<T: Copy + 'static> SyncUpDownCounter<T> for ResolvedMeasures<T> {
}
}

impl<T: Copy + 'static> SyncGauge<T> for ResolvedMeasures<T> {
fn record(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
}
}
}

impl<T: Copy + 'static> SyncHistogram<T> for ResolvedMeasures<T> {
fn record(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
Expand Down
60 changes: 57 additions & 3 deletions opentelemetry-sdk/src/metrics/meter.rs
Expand Up @@ -5,9 +5,9 @@ use opentelemetry::{
global,
metrics::{
noop::{NoopAsyncInstrument, NoopRegistration},
AsyncInstrument, Callback, CallbackRegistration, Counter, Histogram, InstrumentProvider,
MetricsError, ObservableCounter, ObservableGauge, ObservableUpDownCounter,
Observer as ApiObserver, Result, Unit, UpDownCounter,
AsyncInstrument, Callback, CallbackRegistration, Counter, Gauge, Histogram,
InstrumentProvider, MetricsError, ObservableCounter, ObservableGauge,
ObservableUpDownCounter, Observer as ApiObserver, Result, Unit, UpDownCounter,
},
KeyValue,
};
Expand Down Expand Up @@ -299,6 +299,57 @@ impl InstrumentProvider for SdkMeter {
Ok(ObservableUpDownCounter::new(observable))
}

fn u64_gauge(
&self,
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Unit>,
) -> Result<Gauge<u64>> {
validate_instrument_config(name.as_ref(), unit.as_ref(), self.validation_policy)?;
let p = InstrumentResolver::new(self, &self.u64_resolver);
p.lookup(
InstrumentKind::Gauge,
name,
description,
unit.unwrap_or_default(),
)
.map(|i| Gauge::new(Arc::new(i)))
}

fn f64_gauge(
&self,
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Unit>,
) -> Result<Gauge<f64>> {
validate_instrument_config(name.as_ref(), unit.as_ref(), self.validation_policy)?;
let p = InstrumentResolver::new(self, &self.f64_resolver);
p.lookup(
InstrumentKind::Gauge,
name,
description,
unit.unwrap_or_default(),
)
.map(|i| Gauge::new(Arc::new(i)))
}

fn i64_gauge(
&self,
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Unit>,
) -> Result<Gauge<i64>> {
validate_instrument_config(name.as_ref(), unit.as_ref(), self.validation_policy)?;
let p = InstrumentResolver::new(self, &self.i64_resolver);
p.lookup(
InstrumentKind::Gauge,
name,
description,
unit.unwrap_or_default(),
)
.map(|i| Gauge::new(Arc::new(i)))
}

fn u64_observable_gauge(
&self,
name: Cow<'static, str>,
Expand Down Expand Up @@ -784,6 +835,9 @@ mod tests {
.f64_observable_up_down_counter(name.into(), None, None, Vec::new())
.map(|_| ()),
);
assert(meter.u64_gauge(name.into(), None, None).map(|_| ()));
assert(meter.f64_gauge(name.into(), None, None).map(|_| ()));
assert(meter.i64_gauge(name.into(), None, None).map(|_| ()));
assert(
meter
.u64_observable_gauge(name.into(), None, None, Vec::new())
Expand Down
14 changes: 9 additions & 5 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Expand Up @@ -536,6 +536,7 @@ fn aggregate_fn<T: Number<T>>(
/// | Histogram | ✓ | | ✓ | ✓ | ✓ |
/// | Observable Counter | ✓ | | ✓ | ✓ | ✓ |
/// | Observable UpDownCounter | ✓ | | ✓ | ✓ | ✓ |
/// | Gauge | ✓ | ✓ | | ✓ | ✓ |
/// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ |
fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregation) -> Result<()> {
use aggregation::Aggregation;
Expand All @@ -547,6 +548,7 @@ fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregatio
kind,
InstrumentKind::Counter
| InstrumentKind::UpDownCounter
| InstrumentKind::Gauge
| InstrumentKind::Histogram
| InstrumentKind::ObservableCounter
| InstrumentKind::ObservableUpDownCounter
Expand All @@ -571,12 +573,14 @@ fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregatio
}
}
Aggregation::LastValue => {
if kind == &InstrumentKind::ObservableGauge {
return Ok(());
match kind {
InstrumentKind::Gauge | InstrumentKind::ObservableGauge => Ok(()),
_ => {
// TODO: review need for aggregation check after
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
Err(MetricsError::Other("incompatible aggregation".into()))
}
}
// TODO: review need for aggregation check after
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
Err(MetricsError::Other("incompatible aggregation".into()))
}
Aggregation::Drop => Ok(()),
}
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/src/metrics/reader.rs
Expand Up @@ -121,6 +121,7 @@ where
/// * Observable Counter ⇨ Sum
/// * UpDownCounter ⇨ Sum
/// * Observable UpDownCounter ⇨ Sum
/// * Gauge ⇨ LastValue
/// * Observable Gauge ⇨ LastValue
/// * Histogram ⇨ ExplicitBucketHistogram
///
Expand All @@ -144,6 +145,7 @@ impl AggregationSelector for DefaultAggregationSelector {
| InstrumentKind::UpDownCounter
| InstrumentKind::ObservableCounter
| InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
InstrumentKind::Gauge => Aggregation::LastValue,
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
InstrumentKind::ObservableGauge => Aggregation::LastValue,
InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
boundaries: vec![
Expand Down
1 change: 1 addition & 0 deletions opentelemetry/Cargo.toml
Expand Up @@ -38,6 +38,7 @@ metrics = []
testing = ["trace", "metrics"]
logs = []
logs_level_enabled = ["logs"]
sync-gauge-experimental = []

[dev-dependencies]
opentelemetry_sdk = { path = "../opentelemetry-sdk" } # for documentation tests
68 changes: 66 additions & 2 deletions opentelemetry/src/metrics/instruments/gauge.rs
@@ -1,12 +1,76 @@
use crate::{
metrics::{AsyncInstrument, AsyncInstrumentBuilder, MetricsError},
metrics::{AsyncInstrument, AsyncInstrumentBuilder, InstrumentBuilder, MetricsError},
KeyValue,
};
use core::fmt;
use std::sync::Arc;
use std::{any::Any, convert::TryFrom};

/// An instrument that records independent readings.
/// An SDK implemented instrument that records independent values
pub trait SyncGauge<T> {
/// Records an independent value.
fn record(&self, value: T, attributes: &[KeyValue]);
}

/// An instrument that records independent values
#[derive(Clone)]
pub struct Gauge<T>(Arc<dyn SyncGauge<T> + Send + Sync>);

impl<T> fmt::Debug for Gauge<T>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("Gauge<{}>", std::any::type_name::<T>()))
}
}

impl<T> Gauge<T> {
/// Create a new gauge.
pub fn new(inner: Arc<dyn SyncGauge<T> + Send + Sync>) -> Self {
Gauge(inner)
}

/// Records an independent value.
pub fn record(&self, value: T, attributes: &[KeyValue]) {
self.0.record(value, attributes)
}
}

impl TryFrom<InstrumentBuilder<'_, Gauge<u64>>> for Gauge<u64> {
type Error = MetricsError;

fn try_from(builder: InstrumentBuilder<'_, Gauge<u64>>) -> Result<Self, Self::Error> {
builder
.meter
.instrument_provider
.u64_gauge(builder.name, builder.description, builder.unit)
}
}

impl TryFrom<InstrumentBuilder<'_, Gauge<f64>>> for Gauge<f64> {
type Error = MetricsError;

fn try_from(builder: InstrumentBuilder<'_, Gauge<f64>>) -> Result<Self, Self::Error> {
builder
.meter
.instrument_provider
.f64_gauge(builder.name, builder.description, builder.unit)
}
}

impl TryFrom<InstrumentBuilder<'_, Gauge<i64>>> for Gauge<i64> {
type Error = MetricsError;

fn try_from(builder: InstrumentBuilder<'_, Gauge<i64>>) -> Result<Self, Self::Error> {
builder
.meter
.instrument_provider
.i64_gauge(builder.name, builder.description, builder.unit)
}
}

/// An async instrument that records independent readings.
#[derive(Clone)]
pub struct ObservableGauge<T>(Arc<dyn AsyncInstrument<T>>);

Expand Down
35 changes: 35 additions & 0 deletions opentelemetry/src/metrics/meter.rs
Expand Up @@ -3,6 +3,8 @@ use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;

#[cfg(feature = "sync-gauge-experimental")]
use crate::metrics::Gauge;
use crate::metrics::{
AsyncInstrumentBuilder, Counter, Histogram, InstrumentBuilder, InstrumentProvider,
ObservableCounter, ObservableGauge, ObservableUpDownCounter, Result, UpDownCounter,
Expand Down Expand Up @@ -333,6 +335,39 @@ impl Meter {
AsyncInstrumentBuilder::new(self, name.into())
}

/// # Experimental
/// This method is experimental and can be changed/removed in future releases.
/// creates an instrument builder for recording independent values.
#[cfg(feature = "sync-gauge-experimental")]
pub fn u64_gauge(
&self,
name: impl Into<Cow<'static, str>>,
) -> InstrumentBuilder<'_, Gauge<u64>> {
InstrumentBuilder::new(self, name.into())
}

/// # Experimental
/// This method is experimental and can be changed/removed in future releases.
/// creates an instrument builder for recording independent values.
#[cfg(feature = "sync-gauge-experimental")]
pub fn f64_gauge(
&self,
name: impl Into<Cow<'static, str>>,
) -> InstrumentBuilder<'_, Gauge<f64>> {
InstrumentBuilder::new(self, name.into())
}

/// # Experimental
/// This method is experimental and can be changed/removed in future releases.
/// creates an instrument builder for recording indenpendent values.
#[cfg(feature = "sync-gauge-experimental")]
pub fn i64_gauge(
&self,
name: impl Into<Cow<'static, str>>,
) -> InstrumentBuilder<'_, Gauge<i64>> {
InstrumentBuilder::new(self, name.into())
}

/// creates an instrument builder for recording the current value via callback.
pub fn u64_observable_gauge(
&self,
Expand Down