Skip to content

Commit

Permalink
Remove metrics quantiles (#525)
Browse files Browse the repository at this point in the history
  • Loading branch information
jtescher committed Apr 17, 2021
1 parent 1cdf725 commit e756cf6
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 337 deletions.
9 changes: 4 additions & 5 deletions examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use opentelemetry::sdk::{metrics::PushController, trace as sdktrace};
use opentelemetry::trace::TraceError;
use opentelemetry::{
baggage::BaggageExt,
metrics::{self, MetricsError, ObserverResult},
metrics::{MetricsError, ObserverResult},
trace::{TraceContextExt, Tracer},
Context, Key, KeyValue,
};
Expand All @@ -27,15 +27,14 @@ fn delayed_interval(duration: Duration) -> impl Stream<Item = tokio::time::Insta
opentelemetry::util::tokio_interval_stream(duration).skip(1)
}

fn init_meter() -> metrics::Result<PushController> {
fn init_meter() -> PushController {
opentelemetry::sdk::export::metrics::stdout(tokio::spawn, delayed_interval)
.with_quantiles(vec![0.5, 0.9, 0.99])
.with_formatter(|batch| {
serde_json::to_value(batch)
.map(|value| value.to_string())
.map_err(|err| MetricsError::Other(err.to_string()))
})
.try_init()
.init()
}

const FOO_KEY: Key = Key::from_static_str("ex.com/foo");
Expand All @@ -55,7 +54,7 @@ lazy_static::lazy_static! {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _tracer = init_tracer()?;
let _started = init_meter()?;
let _started = init_meter();

let tracer = global::tracer("ex.com/basic");
let meter = global::meter("ex.com/basic");
Expand Down
19 changes: 1 addition & 18 deletions opentelemetry/benches/ddsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use opentelemetry::metrics::{InstrumentKind, Number, NumberKind};
use opentelemetry::sdk::export::metrics::Aggregator;
use opentelemetry::{
metrics::Descriptor,
sdk::{
export::metrics::Quantile,
metrics::aggregators::{ArrayAggregator, DdSketchAggregator, DdSketchConfig},
},
sdk::metrics::aggregators::{ArrayAggregator, DdSketchAggregator, DdSketchConfig},
};
use rand::Rng;
use std::sync::Arc;
Expand All @@ -19,10 +16,6 @@ fn generate_normal_data(num: usize) -> Vec<f64> {
data
}

fn get_test_quantile() -> &'static [f64] {
&[0.0, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999, 1.0]
}

fn ddsketch(data: Vec<f64>) {
let aggregator =
DdSketchAggregator::new(&DdSketchConfig::new(0.001, 2048, 1e-9), NumberKind::F64);
Expand All @@ -43,11 +36,6 @@ fn ddsketch(data: Vec<f64>) {
aggregator
.synchronized_move(&new_aggregator, &descriptor)
.unwrap();
for quantile in get_test_quantile() {
if let Some(new_aggregator) = new_aggregator.as_any().downcast_ref::<DdSketchAggregator>() {
let _ = new_aggregator.quantile(*quantile);
}
}
}

fn array(data: Vec<f64>) {
Expand All @@ -66,11 +54,6 @@ fn array(data: Vec<f64>) {
aggregator
.synchronized_move(&new_aggregator, &descriptor)
.unwrap();
for quantile in get_test_quantile() {
if let Some(new_aggregator) = new_aggregator.as_any().downcast_ref::<ArrayAggregator>() {
let _ = new_aggregator.quantile(*quantile);
}
}
}

pub fn histogram(c: &mut Criterion) {
Expand Down
11 changes: 0 additions & 11 deletions opentelemetry/src/sdk/export/metrics/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ pub trait Max {
fn max(&self) -> Result<Number>;
}

/// Quantile returns an exact or estimated quantile over the
/// set of values that were aggregated.
pub trait Quantile {
/// The quantile of the currently aggregated metrics
fn quantile(&self, q: f64) -> Result<Number>;
}

/// LastValue returns the latest value that was aggregated.
pub trait LastValue {
/// The last value of the currently aggregated metrics
Expand Down Expand Up @@ -86,7 +79,3 @@ pub trait Histogram: Sum + Count {

/// MinMaxSumCount supports the Min, Max, Sum, and Count interfaces.
pub trait MinMaxSumCount: Min + Max + Sum + Count {}

/// Distribution supports the Min, Max, Sum, Count, and Quantile
/// interfaces.
pub trait Distribution: MinMaxSumCount + Quantile {}
3 changes: 1 addition & 2 deletions opentelemetry/src/sdk/export/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ mod aggregation;
pub mod stdout;

pub use aggregation::{
Buckets, Count, Distribution, Histogram, LastValue, Max, Min, MinMaxSumCount, Points, Quantile,
Sum,
Buckets, Count, Histogram, LastValue, Max, Min, MinMaxSumCount, Points, Sum,
};
pub use stdout::stdout;

Expand Down
81 changes: 12 additions & 69 deletions opentelemetry/src/sdk/export/metrics/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::global;
use crate::sdk::{
export::metrics::{
CheckpointSet, Count, ExportKind, ExportKindFor, ExportKindSelector, Exporter, LastValue,
Max, Min, Quantile, Sum,
Max, Min, Sum,
},
metrics::{
aggregators::{
Expand All @@ -16,7 +16,6 @@ use crate::sdk::{
};
use crate::{
labels::{default_encoder, Encoder, LabelSet},
metrics,
metrics::{Descriptor, MetricsError, Result},
KeyValue,
};
Expand Down Expand Up @@ -49,12 +48,6 @@ pub struct StdoutExporter<W> {
/// Suppresses timestamp printing. This is useful to create deterministic test
/// conditions.
do_not_print_time: bool,
/// Quantiles are the desired aggregation quantiles for distribution summaries,
/// used when the configured aggregator supports quantiles.
///
/// Note: this exporter is meant as a demonstration; a real exporter may wish to
/// configure quantiles on a per-metric basis.
quantiles: Vec<f64>,
/// Encodes the labels.
label_encoder: Box<dyn Encoder + Send + Sync>,
/// An optional user-defined function to format a given export batch.
Expand Down Expand Up @@ -84,9 +77,6 @@ struct ExportLine {
#[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
last_value: Option<ExportNumeric>,

#[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
quantiles: Option<Vec<ExporterQuantile>>,

#[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
timestamp: Option<SystemTime>,
}
Expand All @@ -111,13 +101,6 @@ impl Serialize for ExportNumeric {
}
}

#[cfg_attr(feature = "serialize", derive(Serialize))]
#[derive(Debug)]
struct ExporterQuantile {
q: f64,
v: ExportNumeric,
}

impl<W> Exporter for StdoutExporter<W>
where
W: fmt::Debug + io::Write,
Expand Down Expand Up @@ -145,22 +128,7 @@ where
let mut expose = ExportLine::default();

if let Some(array) = agg.as_any().downcast_ref::<ArrayAggregator>() {
expose.min = Some(ExportNumeric(array.min()?.to_debug(kind)));
expose.max = Some(ExportNumeric(array.max()?.to_debug(kind)));
expose.sum = Some(ExportNumeric(array.sum()?.to_debug(kind)));
expose.count = array.count()?;

let quantiles = self
.quantiles
.iter()
.map(|&q| {
Ok(ExporterQuantile {
q,
v: ExportNumeric(array.quantile(q)?.to_debug(kind)),
})
})
.collect::<Result<Vec<_>>>()?;
expose.quantiles = Some(quantiles);
}

if let Some(last_value) = agg.as_any().downcast_ref::<LastValueAggregator>() {
Expand Down Expand Up @@ -317,14 +285,6 @@ where
}
}

/// Set the quantiles that this exporter will use.
pub fn with_quantiles(self, quantiles: Vec<f64>) -> Self {
StdoutExporterBuilder {
quantiles: Some(quantiles),
..self
}
}

/// Set the label encoder that this exporter will use.
pub fn with_label_encoder<E>(self, label_encoder: E) -> Self
where
Expand Down Expand Up @@ -356,15 +316,21 @@ where
}

/// Build a new push controller, returning errors if they arise.
pub fn try_init(mut self) -> metrics::Result<PushController> {
pub fn init(mut self) -> PushController {
let period = self.period.take();
let (spawn, interval, exporter) = self.try_build()?;
let exporter = StdoutExporter {
writer: self.writer,
pretty_print: self.pretty_print,
do_not_print_time: self.do_not_print_time,
label_encoder: self.label_encoder.unwrap_or_else(default_encoder),
formatter: self.formatter,
};
let mut push_builder = controllers::push(
simple::Selector::Exact,
ExportKindSelector::Stateless,
exporter,
spawn,
interval,
self.spawn,
self.interval,
)
.with_stateful(true);
if let Some(period) = period {
Expand All @@ -373,29 +339,6 @@ where

let controller = push_builder.build();
global::set_meter_provider(controller.provider());
Ok(controller)
}

fn try_build(self) -> metrics::Result<(S, I, StdoutExporter<W>)> {
if let Some(quantiles) = self.quantiles.as_ref() {
for q in quantiles {
if *q < 0.0 || *q > 1.0 {
return Err(MetricsError::InvalidQuantile);
}
}
}

Ok((
self.spawn,
self.interval,
StdoutExporter {
writer: self.writer,
pretty_print: self.pretty_print,
do_not_print_time: self.do_not_print_time,
quantiles: self.quantiles.unwrap_or_else(|| vec![0.5, 0.9, 0.99]),
label_encoder: self.label_encoder.unwrap_or_else(default_encoder),
formatter: self.formatter,
},
))
controller
}
}
69 changes: 1 addition & 68 deletions opentelemetry/src/sdk/metrics/aggregators/array.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::metrics::{AtomicNumber, Descriptor, MetricsError, Number, NumberKind, Result};
use crate::sdk::{
export::metrics::{Count, Distribution, Max, Min, MinMaxSumCount, Points, Quantile, Sum},
export::metrics::{Count, Points},
metrics::Aggregator,
};
use std::any::Any;
Expand All @@ -18,37 +18,6 @@ pub struct ArrayAggregator {
inner: Mutex<Inner>,
}

impl Min for ArrayAggregator {
fn min(&self) -> Result<Number> {
self.inner.lock().map_err(Into::into).and_then(|inner| {
inner
.points
.as_ref()
.map_or(Ok(0u64.into()), |p| p.quantile(0.0))
})
}
}

impl Max for ArrayAggregator {
fn max(&self) -> Result<Number> {
self.inner.lock().map_err(Into::into).and_then(|inner| {
inner
.points
.as_ref()
.map_or(Ok(0u64.into()), |p| p.quantile(1.0))
})
}
}

impl Sum for ArrayAggregator {
fn sum(&self) -> Result<Number> {
self.inner
.lock()
.map_err(Into::into)
.map(|inner| inner.sum.load())
}
}

impl Count for ArrayAggregator {
fn count(&self) -> Result<u64> {
self.inner
Expand All @@ -58,21 +27,6 @@ impl Count for ArrayAggregator {
}
}

impl MinMaxSumCount for ArrayAggregator {}

impl Quantile for ArrayAggregator {
fn quantile(&self, q: f64) -> Result<Number> {
self.inner.lock().map_err(Into::into).and_then(|inner| {
inner
.points
.as_ref()
.map_or(Ok(0u64.into()), |p| p.quantile(q))
})
}
}

impl Distribution for ArrayAggregator {}

impl Points for ArrayAggregator {
fn points(&self) -> Result<Vec<Number>> {
self.inner
Expand Down Expand Up @@ -198,24 +152,3 @@ impl PointsData {
self.sort(kind)
}
}

impl Quantile for PointsData {
fn quantile(&self, q: f64) -> Result<Number> {
if self.0.is_empty() {
return Err(MetricsError::NoDataCollected);
}

if !(0.0..=1.0).contains(&q) {
return Err(MetricsError::InvalidQuantile);
}

if q == 0.0 || self.0.len() == 1 {
return Ok(self.0[0].clone());
} else if (q - 1.0).abs() < std::f64::EPSILON {
return Ok(self.0[self.0.len() - 1].clone());
}

let position = (self.0.len() as f64 - 1.0) * q;
Ok(self.0[position.ceil() as usize].clone())
}
}

0 comments on commit e756cf6

Please sign in to comment.