Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove metrics quantiles #525

Merged
merged 2 commits into from
Apr 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 4 additions & 5 deletions examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use opentelemetry::sdk::{metrics::PushController, trace as sdktrace};
use opentelemetry::trace::TraceError;
use opentelemetry::{
baggage::BaggageExt,
metrics::{self, MetricsError, ObserverResult},
metrics::{MetricsError, ObserverResult},
trace::{TraceContextExt, Tracer},
Context, Key, KeyValue,
};
Expand All @@ -27,15 +27,14 @@ fn delayed_interval(duration: Duration) -> impl Stream<Item = tokio::time::Insta
opentelemetry::util::tokio_interval_stream(duration).skip(1)
}

fn init_meter() -> metrics::Result<PushController> {
fn init_meter() -> PushController {
opentelemetry::sdk::export::metrics::stdout(tokio::spawn, delayed_interval)
.with_quantiles(vec![0.5, 0.9, 0.99])
.with_formatter(|batch| {
serde_json::to_value(batch)
.map(|value| value.to_string())
.map_err(|err| MetricsError::Other(err.to_string()))
})
.try_init()
.init()
}

const FOO_KEY: Key = Key::from_static_str("ex.com/foo");
Expand All @@ -55,7 +54,7 @@ lazy_static::lazy_static! {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _tracer = init_tracer()?;
let _started = init_meter()?;
let _started = init_meter();

let tracer = global::tracer("ex.com/basic");
let meter = global::meter("ex.com/basic");
Expand Down
19 changes: 1 addition & 18 deletions opentelemetry/benches/ddsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use opentelemetry::metrics::{InstrumentKind, Number, NumberKind};
use opentelemetry::sdk::export::metrics::Aggregator;
use opentelemetry::{
metrics::Descriptor,
sdk::{
export::metrics::Quantile,
metrics::aggregators::{ArrayAggregator, DdSketchAggregator, DdSketchConfig},
},
sdk::metrics::aggregators::{ArrayAggregator, DdSketchAggregator, DdSketchConfig},
};
use rand::Rng;
use std::sync::Arc;
Expand All @@ -19,10 +16,6 @@ fn generate_normal_data(num: usize) -> Vec<f64> {
data
}

fn get_test_quantile() -> &'static [f64] {
&[0.0, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999, 1.0]
}

fn ddsketch(data: Vec<f64>) {
let aggregator =
DdSketchAggregator::new(&DdSketchConfig::new(0.001, 2048, 1e-9), NumberKind::F64);
Expand All @@ -43,11 +36,6 @@ fn ddsketch(data: Vec<f64>) {
aggregator
.synchronized_move(&new_aggregator, &descriptor)
.unwrap();
for quantile in get_test_quantile() {
if let Some(new_aggregator) = new_aggregator.as_any().downcast_ref::<DdSketchAggregator>() {
let _ = new_aggregator.quantile(*quantile);
}
}
}

fn array(data: Vec<f64>) {
Expand All @@ -66,11 +54,6 @@ fn array(data: Vec<f64>) {
aggregator
.synchronized_move(&new_aggregator, &descriptor)
.unwrap();
for quantile in get_test_quantile() {
if let Some(new_aggregator) = new_aggregator.as_any().downcast_ref::<ArrayAggregator>() {
let _ = new_aggregator.quantile(*quantile);
}
}
}

pub fn histogram(c: &mut Criterion) {
Expand Down
11 changes: 0 additions & 11 deletions opentelemetry/src/sdk/export/metrics/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ pub trait Max {
fn max(&self) -> Result<Number>;
}

/// Quantile returns an exact or estimated quantile over the
/// set of values that were aggregated.
pub trait Quantile {
/// The quantile of the currently aggregated metrics
fn quantile(&self, q: f64) -> Result<Number>;
}

/// LastValue returns the latest value that was aggregated.
pub trait LastValue {
/// The last value of the currently aggregated metrics
Expand Down Expand Up @@ -86,7 +79,3 @@ pub trait Histogram: Sum + Count {

/// MinMaxSumCount supports the Min, Max, Sum, and Count interfaces.
pub trait MinMaxSumCount: Min + Max + Sum + Count {}

/// Distribution supports the Min, Max, Sum, Count, and Quantile
/// interfaces.
pub trait Distribution: MinMaxSumCount + Quantile {}
3 changes: 1 addition & 2 deletions opentelemetry/src/sdk/export/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ mod aggregation;
pub mod stdout;

pub use aggregation::{
Buckets, Count, Distribution, Histogram, LastValue, Max, Min, MinMaxSumCount, Points, Quantile,
Sum,
Buckets, Count, Histogram, LastValue, Max, Min, MinMaxSumCount, Points, Sum,
};
pub use stdout::stdout;

Expand Down
81 changes: 12 additions & 69 deletions opentelemetry/src/sdk/export/metrics/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::global;
use crate::sdk::{
export::metrics::{
CheckpointSet, Count, ExportKind, ExportKindFor, ExportKindSelector, Exporter, LastValue,
Max, Min, Quantile, Sum,
Max, Min, Sum,
},
metrics::{
aggregators::{
Expand All @@ -16,7 +16,6 @@ use crate::sdk::{
};
use crate::{
labels::{default_encoder, Encoder, LabelSet},
metrics,
metrics::{Descriptor, MetricsError, Result},
KeyValue,
};
Expand Down Expand Up @@ -49,12 +48,6 @@ pub struct StdoutExporter<W> {
/// Suppresses timestamp printing. This is useful to create deterministic test
/// conditions.
do_not_print_time: bool,
/// Quantiles are the desired aggregation quantiles for distribution summaries,
/// used when the configured aggregator supports quantiles.
///
/// Note: this exporter is meant as a demonstration; a real exporter may wish to
/// configure quantiles on a per-metric basis.
quantiles: Vec<f64>,
/// Encodes the labels.
label_encoder: Box<dyn Encoder + Send + Sync>,
/// An optional user-defined function to format a given export batch.
Expand Down Expand Up @@ -84,9 +77,6 @@ struct ExportLine {
#[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
last_value: Option<ExportNumeric>,

#[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
quantiles: Option<Vec<ExporterQuantile>>,

#[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
timestamp: Option<SystemTime>,
}
Expand All @@ -111,13 +101,6 @@ impl Serialize for ExportNumeric {
}
}

#[cfg_attr(feature = "serialize", derive(Serialize))]
#[derive(Debug)]
struct ExporterQuantile {
q: f64,
v: ExportNumeric,
}

impl<W> Exporter for StdoutExporter<W>
where
W: fmt::Debug + io::Write,
Expand Down Expand Up @@ -145,22 +128,7 @@ where
let mut expose = ExportLine::default();

if let Some(array) = agg.as_any().downcast_ref::<ArrayAggregator>() {
expose.min = Some(ExportNumeric(array.min()?.to_debug(kind)));
expose.max = Some(ExportNumeric(array.max()?.to_debug(kind)));
expose.sum = Some(ExportNumeric(array.sum()?.to_debug(kind)));
expose.count = array.count()?;

let quantiles = self
.quantiles
.iter()
.map(|&q| {
Ok(ExporterQuantile {
q,
v: ExportNumeric(array.quantile(q)?.to_debug(kind)),
})
})
.collect::<Result<Vec<_>>>()?;
expose.quantiles = Some(quantiles);
}

if let Some(last_value) = agg.as_any().downcast_ref::<LastValueAggregator>() {
Expand Down Expand Up @@ -317,14 +285,6 @@ where
}
}

/// Set the quantiles that this exporter will use.
pub fn with_quantiles(self, quantiles: Vec<f64>) -> Self {
StdoutExporterBuilder {
quantiles: Some(quantiles),
..self
}
}

/// Set the label encoder that this exporter will use.
pub fn with_label_encoder<E>(self, label_encoder: E) -> Self
where
Expand Down Expand Up @@ -356,15 +316,21 @@ where
}

/// Build a new push controller, returning errors if they arise.
pub fn try_init(mut self) -> metrics::Result<PushController> {
pub fn init(mut self) -> PushController {
let period = self.period.take();
let (spawn, interval, exporter) = self.try_build()?;
let exporter = StdoutExporter {
writer: self.writer,
pretty_print: self.pretty_print,
do_not_print_time: self.do_not_print_time,
label_encoder: self.label_encoder.unwrap_or_else(default_encoder),
formatter: self.formatter,
};
let mut push_builder = controllers::push(
simple::Selector::Exact,
ExportKindSelector::Stateless,
exporter,
spawn,
interval,
self.spawn,
self.interval,
)
.with_stateful(true);
if let Some(period) = period {
Expand All @@ -373,29 +339,6 @@ where

let controller = push_builder.build();
global::set_meter_provider(controller.provider());
Ok(controller)
}

fn try_build(self) -> metrics::Result<(S, I, StdoutExporter<W>)> {
if let Some(quantiles) = self.quantiles.as_ref() {
for q in quantiles {
if *q < 0.0 || *q > 1.0 {
return Err(MetricsError::InvalidQuantile);
}
}
}

Ok((
self.spawn,
self.interval,
StdoutExporter {
writer: self.writer,
pretty_print: self.pretty_print,
do_not_print_time: self.do_not_print_time,
quantiles: self.quantiles.unwrap_or_else(|| vec![0.5, 0.9, 0.99]),
label_encoder: self.label_encoder.unwrap_or_else(default_encoder),
formatter: self.formatter,
},
))
controller
}
}
69 changes: 1 addition & 68 deletions opentelemetry/src/sdk/metrics/aggregators/array.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::metrics::{AtomicNumber, Descriptor, MetricsError, Number, NumberKind, Result};
use crate::sdk::{
export::metrics::{Count, Distribution, Max, Min, MinMaxSumCount, Points, Quantile, Sum},
export::metrics::{Count, Points},
metrics::Aggregator,
};
use std::any::Any;
Expand All @@ -18,37 +18,6 @@ pub struct ArrayAggregator {
inner: Mutex<Inner>,
}

impl Min for ArrayAggregator {
fn min(&self) -> Result<Number> {
self.inner.lock().map_err(Into::into).and_then(|inner| {
inner
.points
.as_ref()
.map_or(Ok(0u64.into()), |p| p.quantile(0.0))
})
}
}

impl Max for ArrayAggregator {
fn max(&self) -> Result<Number> {
self.inner.lock().map_err(Into::into).and_then(|inner| {
inner
.points
.as_ref()
.map_or(Ok(0u64.into()), |p| p.quantile(1.0))
})
}
}

impl Sum for ArrayAggregator {
fn sum(&self) -> Result<Number> {
self.inner
.lock()
.map_err(Into::into)
.map(|inner| inner.sum.load())
}
}

impl Count for ArrayAggregator {
fn count(&self) -> Result<u64> {
self.inner
Expand All @@ -58,21 +27,6 @@ impl Count for ArrayAggregator {
}
}

impl MinMaxSumCount for ArrayAggregator {}

impl Quantile for ArrayAggregator {
fn quantile(&self, q: f64) -> Result<Number> {
self.inner.lock().map_err(Into::into).and_then(|inner| {
inner
.points
.as_ref()
.map_or(Ok(0u64.into()), |p| p.quantile(q))
})
}
}

impl Distribution for ArrayAggregator {}

impl Points for ArrayAggregator {
fn points(&self) -> Result<Vec<Number>> {
self.inner
Expand Down Expand Up @@ -198,24 +152,3 @@ impl PointsData {
self.sort(kind)
}
}

impl Quantile for PointsData {
fn quantile(&self, q: f64) -> Result<Number> {
if self.0.is_empty() {
return Err(MetricsError::NoDataCollected);
}

if !(0.0..=1.0).contains(&q) {
return Err(MetricsError::InvalidQuantile);
}

if q == 0.0 || self.0.len() == 1 {
return Ok(self.0[0].clone());
} else if (q - 1.0).abs() < std::f64::EPSILON {
return Ok(self.0[self.0.len() - 1].clone());
}

let position = (self.0.len() as f64 - 1.0) * q;
Ok(self.0[position.ceil() as usize].clone())
}
}