Skip to content

Commit

Permalink
Adding a dynamic dispatch to Aggregator Selector
Browse files Browse the repository at this point in the history
  • Loading branch information
dawid-nowak committed Mar 29, 2021
1 parent b6d8494 commit 2eb9af1
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"examples/aws-xray",
"examples/basic",
"examples/basic-otlp",
"examples/basic-otlp-with-selector",
"examples/datadog",
"examples/external-otlp-tonic-tokio",
"examples/grpc",
Expand Down
12 changes: 12 additions & 0 deletions examples/basic-otlp-with-selector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "basic-otlp-with-selector"
version = "0.1.0"
edition = "2018"

[dependencies]
futures = "0.3"
lazy_static = "1.4"
opentelemetry = { path = "../../opentelemetry", features = ["rt-tokio", "metrics", "serialize"] }
opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["tonic", "metrics"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }
4 changes: 4 additions & 0 deletions examples/basic-otlp-with-selector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Basic OpenTelemetry Example

This example shows basic span and metric usage, and exports to the [OpenTelemetry Collector](https://github.com/open-telemetry/opentelemetry-collector) via OTLP with a custom metric aggregator selector.

132 changes: 132 additions & 0 deletions examples/basic-otlp-with-selector/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use futures::stream::Stream;
use futures::StreamExt;
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::sdk::{
export::metrics::{Aggregator, AggregatorSelector},
metrics::{aggregators, PushController},
};
use opentelemetry::trace::TraceError;
use opentelemetry::{
baggage::BaggageExt,
metrics::{self, Descriptor, ObserverResult},
trace::{TraceContextExt, Tracer},
Context, Key, KeyValue,
};
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry_otlp::ExporterConfig;
use opentelemetry_otlp::Protocol;
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
.with_endpoint("http://localhost:4317")
.with_tonic()
.install_batch(opentelemetry::runtime::Tokio)
}

// Skip first immediate tick from tokio, not needed for async_std.
fn delayed_interval(duration: Duration) -> impl Stream<Item = tokio::time::Instant> {
opentelemetry::util::tokio_interval_stream(duration).skip(1)
}

#[derive(Debug)]
struct CustomAggregator();

impl AggregatorSelector for CustomAggregator {
fn aggregator_for(
&self,
descriptor: &Descriptor,
) -> Option<Arc<(dyn Aggregator + Sync + std::marker::Send + 'static)>> {
match descriptor.name() {
"ex.com.one" => Some(Arc::new(aggregators::last_value())),
"ex.com.two" => Some(Arc::new(aggregators::array())),
_ => Some(Arc::new(aggregators::sum())),
}
}
}

fn init_meter() -> metrics::Result<PushController> {
let export_config = ExporterConfig {
endpoint: "http://localhost:4317".to_string(),
protocol: Protocol::Grpc,
..ExporterConfig::default()
};
opentelemetry_otlp::new_metrics_pipeline(tokio::spawn, delayed_interval)
.with_export_config(export_config)
.with_aggregator_selector(Box::new(CustomAggregator()))
.build()
}

const FOO_KEY: Key = Key::from_static_str("ex.com/foo");
const BAR_KEY: Key = Key::from_static_str("ex.com/bar");
const LEMONS_KEY: Key = Key::from_static_str("lemons");
const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another");

lazy_static::lazy_static! {
static ref COMMON_LABELS: [KeyValue; 4] = [
LEMONS_KEY.i64(10),
KeyValue::new("A", "1"),
KeyValue::new("B", "2"),
KeyValue::new("C", "3"),
];
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _ = init_tracer()?;
let _started = init_meter()?;

let tracer = global::tracer("ex.com/basic");
let meter = global::meter("ex.com/basic");

let one_metric_callback = |res: ObserverResult<f64>| res.observe(1.0, COMMON_LABELS.as_ref());
let _ = meter
.f64_value_observer("ex.com.one", one_metric_callback)
.with_description("A ValueObserver set to 1.0")
.init();

let value_recorder_two = meter.f64_value_recorder("ex.com.two").init();

let another_recorder = meter.f64_value_recorder("ex.com.two").init();
another_recorder.record(5.5, COMMON_LABELS.as_ref());

let _baggage =
Context::current_with_baggage(vec![FOO_KEY.string("foo1"), BAR_KEY.string("bar1")])
.attach();

let value_recorder = value_recorder_two.bind(COMMON_LABELS.as_ref());

tracer.in_span("operation", |cx| {
let span = cx.span();
span.add_event(
"Nice operation!".to_string(),
vec![Key::new("bogons").i64(100)],
);
span.set_attribute(ANOTHER_KEY.string("yes"));

meter.record_batch_with_context(
// Note: call-site variables added as context Entries:
&Context::current_with_baggage(vec![ANOTHER_KEY.string("xyz")]),
COMMON_LABELS.as_ref(),
vec![value_recorder_two.measurement(2.0)],
);

tracer.in_span("Sub operation...", |cx| {
let span = cx.span();
span.set_attribute(LEMONS_KEY.string("five"));

span.add_event("Sub span event".to_string(), vec![]);

value_recorder.record(1.3);
});
});

// wait for 1 minutes so that we could see metrics being pushed via OTLP every 10 seconds.
tokio::time::sleep(Duration::from_secs(60)).await;

shutdown_tracer_provider();

Ok(())
}
4 changes: 2 additions & 2 deletions examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ fn init_meter() -> metrics::Result<PushController> {
};
opentelemetry_otlp::new_metrics_pipeline(tokio::spawn, delayed_interval)
.with_export_config(export_config)
.with_aggregator_selector(selectors::simple::Selector::Exact)
.with_aggregator_selector(Box::new(selectors::simple::Selector::Exact))
.build()
}

const FOO_KEY: Key = Key::from_static_str("ex.com/foo");
const BAR_KEY: Key = Key::from_static_str("ex.com/bar");
const LEMONS_KEY: Key = Key::from_static_str("ex.com/lemons");
const LEMONS_KEY: Key = Key::from_static_str("lemons");
const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another");

lazy_static::lazy_static! {
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ pub use crate::span::TonicConfig;
pub use crate::span::GrpcioConfig;

#[cfg(feature = "metrics")]
pub use crate::metric::{new_metrics_pipeline, MetricsExporter, OtlpMetricPipelineBuilder};
pub use crate::metric::{
new_metrics_pipeline, new_metrics_pipeline_with_selector, MetricsExporter,
OtlpMetricPipelineBuilder,
};

#[cfg(feature = "grpc-sys")]
pub use crate::span::{Compression, Credentials};
Expand Down
37 changes: 29 additions & 8 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,33 @@ use tonic::Request;
pub fn new_metrics_pipeline<SP, SO, I, IO>(
spawn: SP,
interval: I,
) -> OtlpMetricPipelineBuilder<selectors::simple::Selector, ExportKindSelector, SP, SO, I, IO>
) -> OtlpMetricPipelineBuilder<ExportKindSelector, SP, SO, I, IO>
where
SP: Fn(PushControllerWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
new_metrics_pipeline_with_selector(
spawn,
interval,
Box::new(selectors::simple::Selector::Inexpensive),
)
}

/// Return a pipeline to build OTLP metrics exporter.
///
/// Note that currently the OTLP metrics exporter only supports tonic as it's grpc layer and tokio as
/// runtime.
pub fn new_metrics_pipeline_with_selector<SP, SO, I, IO>(
spawn: SP,
interval: I,
aggregator_selector: Box<dyn AggregatorSelector + Send + Sync + 'static>,
) -> OtlpMetricPipelineBuilder<ExportKindSelector, SP, SO, I, IO>
where
SP: Fn(PushControllerWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
OtlpMetricPipelineBuilder {
aggregator_selector: selectors::simple::Selector::Inexpensive,
aggregator_selector,
export_selector: ExportKindSelector::Cumulative,
spawn,
interval,
Expand All @@ -61,14 +81,13 @@ where
/// Note that currently the OTLP metrics exporter only supports tonic as it's grpc layer and tokio as
/// runtime.
#[derive(Debug)]
pub struct OtlpMetricPipelineBuilder<AS, ES, SP, SO, I, IO>
pub struct OtlpMetricPipelineBuilder<ES, SP, SO, I, IO>
where
AS: AggregatorSelector + Send + Sync + 'static,
ES: ExportKindFor + Send + Sync + Clone + 'static,
SP: Fn(PushControllerWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
aggregator_selector: AS,
aggregator_selector: Box<dyn AggregatorSelector + Send + Sync + 'static>,
export_selector: ES,
spawn: SP,
interval: I,
Expand All @@ -80,9 +99,8 @@ where
timeout: Option<time::Duration>,
}

impl<AS, ES, SP, SO, I, IO, IOI> OtlpMetricPipelineBuilder<AS, ES, SP, SO, I, IO>
impl<ES, SP, SO, I, IO, IOI> OtlpMetricPipelineBuilder<ES, SP, SO, I, IO>
where
AS: AggregatorSelector + Send + Sync + 'static,
ES: ExportKindFor + Send + Sync + Clone + 'static,
SP: Fn(PushControllerWorker) -> SO,
I: Fn(time::Duration) -> IO,
Expand Down Expand Up @@ -113,7 +131,10 @@ where
}

/// Build with the aggregator selector
pub fn with_aggregator_selector(self, aggregator_selector: AS) -> Self {
pub fn with_aggregator_selector(
self,
aggregator_selector: Box<dyn AggregatorSelector + Send + Sync + 'static>,
) -> Self {
OtlpMetricPipelineBuilder {
aggregator_selector,
..self
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/sdk/export/metrics/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ where
let period = self.period.take();
let (spawn, interval, exporter) = self.try_build()?;
let mut push_builder = controllers::push(
simple::Selector::Exact,
Box::new(simple::Selector::Exact),
ExportKindSelector::Stateless,
exporter,
spawn,
Expand Down
7 changes: 3 additions & 4 deletions opentelemetry/src/sdk/metrics/controllers/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,21 @@ lazy_static::lazy_static! {
}

/// Create a new `PushControllerBuilder`.
pub fn push<AS, ES, E, SP, SO, I, IO>(
aggregator_selector: AS,
pub fn push<ES, E, SP, SO, I, IO>(
aggregator_selector: Box<dyn AggregatorSelector + Send + Sync + 'static>,
export_selector: ES,
exporter: E,
spawn: SP,
interval: I,
) -> PushControllerBuilder<SP, I>
where
AS: AggregatorSelector + Send + Sync + 'static,
ES: ExportKindFor + Send + Sync + 'static,
E: Exporter + Send + Sync + 'static,
SP: Fn(PushControllerWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
PushControllerBuilder {
aggregator_selector: Box::new(aggregator_selector),
aggregator_selector: aggregator_selector,
export_selector: Box::new(export_selector),
exporter: Box::new(exporter),
spawn,
Expand Down

0 comments on commit 2eb9af1

Please sign in to comment.