Skip to content

Commit

Permalink
Adding a dynamic dispatch to Aggregator Selector (#497)
Browse files Browse the repository at this point in the history
  • Loading branch information
dawid-nowak committed Apr 7, 2021
1 parent 406eb9a commit b7bd0d9
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -18,6 +18,7 @@ members = [
"examples/aws-xray",
"examples/basic",
"examples/basic-otlp",
"examples/basic-otlp-with-selector",
"examples/datadog",
"examples/external-otlp-tonic-tokio",
"examples/grpc",
Expand Down
12 changes: 12 additions & 0 deletions examples/basic-otlp-with-selector/Cargo.toml
@@ -0,0 +1,12 @@
[package]
name = "basic-otlp-with-selector"
version = "0.1.0"
edition = "2018"

[dependencies]
futures = "0.3"
lazy_static = "1.4"
opentelemetry = { path = "../../opentelemetry", features = ["rt-tokio", "metrics", "serialize"] }
opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["tonic", "metrics"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }
4 changes: 4 additions & 0 deletions examples/basic-otlp-with-selector/README.md
@@ -0,0 +1,4 @@
# Basic OpenTelemetry Example

This example shows basic span and metric usage, and exports to the [OpenTelemetry Collector](https://github.com/open-telemetry/opentelemetry-collector) via OTLP with a custom metric aggregator selector.

134 changes: 134 additions & 0 deletions examples/basic-otlp-with-selector/src/main.rs
@@ -0,0 +1,134 @@
use futures::stream::Stream;
use futures::StreamExt;
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::sdk::{
export::metrics::{Aggregator, AggregatorSelector},
metrics::{aggregators, PushController},
};
use opentelemetry::trace::TraceError;
use opentelemetry::{
baggage::BaggageExt,
metrics::{self, Descriptor, ObserverResult},
trace::{TraceContextExt, Tracer},
Context, Key, KeyValue,
};
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry_otlp::ExporterConfig;
use opentelemetry_otlp::Protocol;
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
.with_endpoint("http://localhost:4317")
.with_tonic()
.install_batch(opentelemetry::runtime::Tokio)
}

// Skip first immediate tick from tokio, not needed for async_std.
fn delayed_interval(duration: Duration) -> impl Stream<Item = tokio::time::Instant> {
opentelemetry::util::tokio_interval_stream(duration).skip(1)
}

#[derive(Debug)]
struct CustomAggregator();

impl AggregatorSelector for CustomAggregator {
fn aggregator_for(
&self,
descriptor: &Descriptor,
) -> Option<Arc<(dyn Aggregator + Sync + std::marker::Send + 'static)>> {
match descriptor.name() {
"ex.com.one" => Some(Arc::new(aggregators::last_value())),
"ex.com.two" => Some(Arc::new(aggregators::histogram(
descriptor,
&[0.0, 0.5, 1.0, 10.0],
))),
_ => Some(Arc::new(aggregators::sum())),
}
}
}

fn init_meter() -> metrics::Result<PushController> {
let export_config = ExporterConfig {
endpoint: "http://localhost:4317".to_string(),
protocol: Protocol::Grpc,
..ExporterConfig::default()
};
opentelemetry_otlp::new_metrics_pipeline(tokio::spawn, delayed_interval)
.with_export_config(export_config)
.with_aggregator_selector(CustomAggregator())
.build()
}

const FOO_KEY: Key = Key::from_static_str("ex.com/foo");
const BAR_KEY: Key = Key::from_static_str("ex.com/bar");
const LEMONS_KEY: Key = Key::from_static_str("lemons");
const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another");

lazy_static::lazy_static! {
static ref COMMON_LABELS: [KeyValue; 4] = [
LEMONS_KEY.i64(10),
KeyValue::new("A", "1"),
KeyValue::new("B", "2"),
KeyValue::new("C", "3"),
];
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _ = init_tracer()?;
let _started = init_meter()?;

let tracer = global::tracer("ex.com/basic");
let meter = global::meter("ex.com/basic");

let one_metric_callback = |res: ObserverResult<f64>| res.observe(1.0, COMMON_LABELS.as_ref());
let _ = meter
.f64_value_observer("ex.com.one", one_metric_callback)
.with_description("A ValueObserver set to 1.0")
.init();

let value_recorder_two = meter.f64_value_recorder("ex.com.two").init();

let another_recorder = meter.f64_value_recorder("ex.com.two").init();
another_recorder.record(5.5, COMMON_LABELS.as_ref());

let _baggage =
Context::current_with_baggage(vec![FOO_KEY.string("foo1"), BAR_KEY.string("bar1")])
.attach();

let value_recorder = value_recorder_two.bind(COMMON_LABELS.as_ref());
tracer.in_span("operation", |cx| {
let span = cx.span();
span.add_event(
"Nice operation!".to_string(),
vec![Key::new("bogons").i64(100)],
);
span.set_attribute(ANOTHER_KEY.string("yes"));

meter.record_batch_with_context(
// Note: call-site variables added as context Entries:
&Context::current_with_baggage(vec![ANOTHER_KEY.string("xyz")]),
COMMON_LABELS.as_ref(),
vec![value_recorder_two.measurement(2.0)],
);

tracer.in_span("Sub operation...", |cx| {
let span = cx.span();
span.set_attribute(LEMONS_KEY.string("five"));

span.add_event("Sub span event".to_string(), vec![]);

value_recorder.record(1.3);
});
});

// wait for 1 minutes so that we could see metrics being pushed via OTLP every 10 seconds.
tokio::time::sleep(Duration::from_secs(60)).await;

shutdown_tracer_provider();

Ok(())
}
2 changes: 1 addition & 1 deletion examples/basic-otlp/src/main.rs
Expand Up @@ -39,7 +39,7 @@ fn init_meter() -> metrics::Result<PushController> {

const FOO_KEY: Key = Key::from_static_str("ex.com/foo");
const BAR_KEY: Key = Key::from_static_str("ex.com/bar");
const LEMONS_KEY: Key = Key::from_static_str("ex.com/lemons");
const LEMONS_KEY: Key = Key::from_static_str("lemons");
const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another");

lazy_static::lazy_static! {
Expand Down
18 changes: 16 additions & 2 deletions opentelemetry-otlp/src/metric.rs
Expand Up @@ -113,10 +113,24 @@ where
}

/// Build with the aggregator selector
pub fn with_aggregator_selector(self, aggregator_selector: AS) -> Self {
pub fn with_aggregator_selector<T>(
self,
aggregator_selector: T,
) -> OtlpMetricPipelineBuilder<T, ES, SP, SO, I, IO>
where
T: AggregatorSelector + Send + Sync + 'static,
{
OtlpMetricPipelineBuilder {
aggregator_selector,
..self
export_selector: self.export_selector,
spawn: self.spawn,
interval: self.interval,
export_config: self.export_config,
tonic_config: self.tonic_config,
resource: self.resource,
stateful: self.stateful,
period: self.period,
timeout: self.timeout,
}
}

Expand Down

0 comments on commit b7bd0d9

Please sign in to comment.