Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental synchronous gauge #1410

Merged
merged 17 commits into from Dec 12, 2023
13 changes: 13 additions & 0 deletions CONTRIBUTING.md
Expand Up @@ -137,6 +137,19 @@ OpenTelemetry supports multiple ways to configure the API, SDK and other compone
- Environment variables
- Compiling time configurations provided in the source code

### Experimental/Unstable features:

Use `otel_unstable` flag for implementation of specification with [experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.27.0/specification/document-status.md) status. This approach ensures clear demarcation and safe integration of new or evolving features. Utilize the following structure:

```rust
#[cfg(otel_unstable)]
{
// Your feature implementation
}
```

It's important to regularly review and remove the `otel_unstable` flag from the code once the feature becomes stable. This cleanup process is crucial to maintain the overall code quality and to ensure that stable features are accurately reflected in the main build.

## Style Guide

- Run `cargo clippy --all` - this will catch common mistakes and improve
Expand Down
2 changes: 1 addition & 1 deletion examples/metrics-basic/Cargo.toml
Expand Up @@ -10,4 +10,4 @@ opentelemetry = { path = "../../opentelemetry", features = ["metrics"] }
opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", "rt-tokio"] }
opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]}
tokio = { version = "1.0", features = ["full"] }
serde_json = {version = "1.0"}
serde_json = {version = "1.0"}
32 changes: 25 additions & 7 deletions examples/metrics-basic/src/main.rs
Expand Up @@ -110,17 +110,37 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

// Note that there is no ObservableHistogram instrument.

// Create a Gauge Instrument.
// Note that the Gauge instrument is experimental, and can be changed/removed in the future releases.
#[cfg(otel_unstable)]
{
let gauge = meter
.f64_gauge("my_gauge")
.with_description("A gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.init();

gauge.record(
1.0,
[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
]
.as_ref(),
);
}

// Create a ObservableGauge instrument and register a callback that reports the measurement.
let gauge = meter
.f64_observable_gauge("my_gauge")
.with_description("A gauge set to 1.0")
let observable_gauge = meter
.f64_observable_gauge("my_observable_gauge")
.with_description("An observable gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.init();

// Register a callback that reports the measurement.
meter.register_callback(&[gauge.as_any()], move |observer| {
meter.register_callback(&[observable_gauge.as_any()], move |observer| {
observer.observe_f64(
&gauge,
&observable_gauge,
1.0,
[
KeyValue::new("mykey1", "myvalue1"),
Expand All @@ -130,8 +150,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
)
})?;

// Note that Gauge only has a Observable version.

// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Expand Up @@ -2,6 +2,10 @@

## vNext

### Added

- [#1410](https://github.com/open-telemetry/opentelemetry-rust/pull/1410) Add experimental synchronous gauge
cijothomas marked this conversation as resolved.
Show resolved Hide resolved

### Changed

- **Breaking**
Expand Down
16 changes: 15 additions & 1 deletion opentelemetry-sdk/src/metrics/instrument.rs
Expand Up @@ -2,7 +2,8 @@ use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, marker, sync:

use opentelemetry::{
metrics::{
AsyncInstrument, MetricsError, Result, SyncCounter, SyncHistogram, SyncUpDownCounter, Unit,
AsyncInstrument, MetricsError, Result, SyncCounter, SyncGauge, SyncHistogram,
SyncUpDownCounter, Unit,
},
Key, KeyValue,
};
Expand Down Expand Up @@ -33,6 +34,11 @@ pub enum InstrumentKind {
/// A group of instruments that record increasing and decreasing values in an
/// asynchronous callback.
ObservableUpDownCounter,

/// a group of instruments that record current value synchronously with
/// the code path they are measuring.
Gauge,
///
/// a group of instruments that record current values in an asynchronous callback.
ObservableGauge,
}
Expand Down Expand Up @@ -268,6 +274,14 @@ impl<T: Copy + 'static> SyncUpDownCounter<T> for ResolvedMeasures<T> {
}
}

impl<T: Copy + 'static> SyncGauge<T> for ResolvedMeasures<T> {
fn record(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
}
}
}

impl<T: Copy + 'static> SyncHistogram<T> for ResolvedMeasures<T> {
fn record(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
Expand Down
60 changes: 57 additions & 3 deletions opentelemetry-sdk/src/metrics/meter.rs
Expand Up @@ -5,9 +5,9 @@ use opentelemetry::{
global,
metrics::{
noop::{NoopAsyncInstrument, NoopRegistration},
AsyncInstrument, Callback, CallbackRegistration, Counter, Histogram, InstrumentProvider,
MetricsError, ObservableCounter, ObservableGauge, ObservableUpDownCounter,
Observer as ApiObserver, Result, Unit, UpDownCounter,
AsyncInstrument, Callback, CallbackRegistration, Counter, Gauge, Histogram,
InstrumentProvider, MetricsError, ObservableCounter, ObservableGauge,
ObservableUpDownCounter, Observer as ApiObserver, Result, Unit, UpDownCounter,
},
KeyValue,
};
Expand Down Expand Up @@ -299,6 +299,57 @@ impl InstrumentProvider for SdkMeter {
Ok(ObservableUpDownCounter::new(observable))
}

fn u64_gauge(
&self,
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Unit>,
) -> Result<Gauge<u64>> {
validate_instrument_config(name.as_ref(), unit.as_ref(), self.validation_policy)?;
let p = InstrumentResolver::new(self, &self.u64_resolver);
p.lookup(
InstrumentKind::Gauge,
name,
description,
unit.unwrap_or_default(),
)
.map(|i| Gauge::new(Arc::new(i)))
}

fn f64_gauge(
&self,
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Unit>,
) -> Result<Gauge<f64>> {
validate_instrument_config(name.as_ref(), unit.as_ref(), self.validation_policy)?;
let p = InstrumentResolver::new(self, &self.f64_resolver);
p.lookup(
InstrumentKind::Gauge,
name,
description,
unit.unwrap_or_default(),
)
.map(|i| Gauge::new(Arc::new(i)))
}

fn i64_gauge(
&self,
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Unit>,
) -> Result<Gauge<i64>> {
validate_instrument_config(name.as_ref(), unit.as_ref(), self.validation_policy)?;
let p = InstrumentResolver::new(self, &self.i64_resolver);
p.lookup(
InstrumentKind::Gauge,
name,
description,
unit.unwrap_or_default(),
)
.map(|i| Gauge::new(Arc::new(i)))
}

fn u64_observable_gauge(
&self,
name: Cow<'static, str>,
Expand Down Expand Up @@ -784,6 +835,9 @@ mod tests {
.f64_observable_up_down_counter(name.into(), None, None, Vec::new())
.map(|_| ()),
);
assert(meter.u64_gauge(name.into(), None, None).map(|_| ()));
assert(meter.f64_gauge(name.into(), None, None).map(|_| ()));
assert(meter.i64_gauge(name.into(), None, None).map(|_| ()));
assert(
meter
.u64_observable_gauge(name.into(), None, None, Vec::new())
Expand Down
14 changes: 9 additions & 5 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Expand Up @@ -536,6 +536,7 @@ fn aggregate_fn<T: Number<T>>(
/// | Histogram | ✓ | | ✓ | ✓ | ✓ |
/// | Observable Counter | ✓ | | ✓ | ✓ | ✓ |
/// | Observable UpDownCounter | ✓ | | ✓ | ✓ | ✓ |
/// | Gauge | ✓ | ✓ | | ✓ | ✓ |
/// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ |
fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregation) -> Result<()> {
use aggregation::Aggregation;
Expand All @@ -547,6 +548,7 @@ fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregatio
kind,
InstrumentKind::Counter
| InstrumentKind::UpDownCounter
| InstrumentKind::Gauge
| InstrumentKind::Histogram
| InstrumentKind::ObservableCounter
| InstrumentKind::ObservableUpDownCounter
Expand All @@ -571,12 +573,14 @@ fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregatio
}
}
Aggregation::LastValue => {
if kind == &InstrumentKind::ObservableGauge {
return Ok(());
match kind {
InstrumentKind::Gauge | InstrumentKind::ObservableGauge => Ok(()),
_ => {
// TODO: review need for aggregation check after
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
Err(MetricsError::Other("incompatible aggregation".into()))
}
}
// TODO: review need for aggregation check after
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
Err(MetricsError::Other("incompatible aggregation".into()))
}
Aggregation::Drop => Ok(()),
}
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/src/metrics/reader.rs
Expand Up @@ -121,6 +121,7 @@ where
/// * Observable Counter ⇨ Sum
/// * UpDownCounter ⇨ Sum
/// * Observable UpDownCounter ⇨ Sum
/// * Gauge ⇨ LastValue
/// * Observable Gauge ⇨ LastValue
/// * Histogram ⇨ ExplicitBucketHistogram
///
Expand All @@ -144,6 +145,7 @@ impl AggregationSelector for DefaultAggregationSelector {
| InstrumentKind::UpDownCounter
| InstrumentKind::ObservableCounter
| InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
InstrumentKind::Gauge => Aggregation::LastValue,
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
InstrumentKind::ObservableGauge => Aggregation::LastValue,
InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
boundaries: vec![
Expand Down
6 changes: 6 additions & 0 deletions opentelemetry/CHANGELOG.md
Expand Up @@ -2,6 +2,12 @@

## vNext

### Added

- [#1410](https://github.com/open-telemetry/opentelemetry-rust/pull/1410) Add experimental synchronous gauge.

- [#1410](https://github.com/open-telemetry/opentelemetry-rust/pull/1410) Guidelines to add new unstable/experimental features.

### Changed

- Modified `AnyValue.Map` to be backed by `HashMap` instead of custom `OrderMap`,
Expand Down
1 change: 1 addition & 0 deletions opentelemetry/Cargo.toml
Expand Up @@ -38,6 +38,7 @@ metrics = []
testing = ["trace", "metrics"]
logs = []
logs_level_enabled = ["logs"]
unstable = []

[dev-dependencies]
opentelemetry_sdk = { path = "../opentelemetry-sdk" } # for documentation tests
68 changes: 66 additions & 2 deletions opentelemetry/src/metrics/instruments/gauge.rs
@@ -1,12 +1,76 @@
use crate::{
metrics::{AsyncInstrument, AsyncInstrumentBuilder, MetricsError},
metrics::{AsyncInstrument, AsyncInstrumentBuilder, InstrumentBuilder, MetricsError},
KeyValue,
};
use core::fmt;
use std::sync::Arc;
use std::{any::Any, convert::TryFrom};

/// An instrument that records independent readings.
/// An SDK implemented instrument that records independent values
pub trait SyncGauge<T> {
/// Records an independent value.
fn record(&self, value: T, attributes: &[KeyValue]);
}

/// An instrument that records independent values
#[derive(Clone)]
pub struct Gauge<T>(Arc<dyn SyncGauge<T> + Send + Sync>);

impl<T> fmt::Debug for Gauge<T>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("Gauge<{}>", std::any::type_name::<T>()))
}
}

impl<T> Gauge<T> {
/// Create a new gauge.
pub fn new(inner: Arc<dyn SyncGauge<T> + Send + Sync>) -> Self {
Gauge(inner)
}

/// Records an independent value.
pub fn record(&self, value: T, attributes: &[KeyValue]) {
self.0.record(value, attributes)
}
}

impl TryFrom<InstrumentBuilder<'_, Gauge<u64>>> for Gauge<u64> {
type Error = MetricsError;

fn try_from(builder: InstrumentBuilder<'_, Gauge<u64>>) -> Result<Self, Self::Error> {
builder
.meter
.instrument_provider
.u64_gauge(builder.name, builder.description, builder.unit)
}
}

impl TryFrom<InstrumentBuilder<'_, Gauge<f64>>> for Gauge<f64> {
type Error = MetricsError;

fn try_from(builder: InstrumentBuilder<'_, Gauge<f64>>) -> Result<Self, Self::Error> {
builder
.meter
.instrument_provider
.f64_gauge(builder.name, builder.description, builder.unit)
}
}

impl TryFrom<InstrumentBuilder<'_, Gauge<i64>>> for Gauge<i64> {
type Error = MetricsError;

fn try_from(builder: InstrumentBuilder<'_, Gauge<i64>>) -> Result<Self, Self::Error> {
builder
.meter
.instrument_provider
.i64_gauge(builder.name, builder.description, builder.unit)
}
}

/// An async instrument that records independent readings.
#[derive(Clone)]
pub struct ObservableGauge<T>(Arc<dyn AsyncInstrument<T>>);

Expand Down