diff --git a/Cargo.toml b/Cargo.toml index 6274257ba1..22010cdf3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "examples/aws-xray", "examples/basic", "examples/basic-otlp", + "examples/basic-otlp-with-selector", "examples/datadog", "examples/external-otlp-tonic-tokio", "examples/grpc", diff --git a/examples/basic-otlp-with-selector/Cargo.toml b/examples/basic-otlp-with-selector/Cargo.toml new file mode 100644 index 0000000000..9224157aa2 --- /dev/null +++ b/examples/basic-otlp-with-selector/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "basic-otlp-with-selector" +version = "0.1.0" +edition = "2018" + +[dependencies] +futures = "0.3" +lazy_static = "1.4" +opentelemetry = { path = "../../opentelemetry", features = ["rt-tokio", "metrics", "serialize"] } +opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["tonic", "metrics"] } +serde_json = "1.0" +tokio = { version = "1.0", features = ["full"] } diff --git a/examples/basic-otlp-with-selector/README.md b/examples/basic-otlp-with-selector/README.md new file mode 100644 index 0000000000..db067418cb --- /dev/null +++ b/examples/basic-otlp-with-selector/README.md @@ -0,0 +1,4 @@ +# Basic OpenTelemetry Example + +This example shows basic span and metric usage, and exports to the [OpenTelemetry Collector](https://github.com/open-telemetry/opentelemetry-collector) via OTLP with a custom metric aggregator selector. + diff --git a/examples/basic-otlp-with-selector/src/main.rs b/examples/basic-otlp-with-selector/src/main.rs new file mode 100644 index 0000000000..7cbc46d176 --- /dev/null +++ b/examples/basic-otlp-with-selector/src/main.rs @@ -0,0 +1,134 @@ +use futures::stream::Stream; +use futures::StreamExt; +use opentelemetry::global::shutdown_tracer_provider; +use opentelemetry::sdk::{ + export::metrics::{Aggregator, AggregatorSelector}, + metrics::{aggregators, PushController}, +}; +use opentelemetry::trace::TraceError; +use opentelemetry::{ + baggage::BaggageExt, + metrics::{self, Descriptor, ObserverResult}, + trace::{TraceContextExt, Tracer}, + Context, Key, KeyValue, +}; +use opentelemetry::{global, sdk::trace as sdktrace}; +use opentelemetry_otlp::ExporterConfig; +use opentelemetry_otlp::Protocol; +use std::error::Error; +use std::sync::Arc; +use std::time::Duration; + +fn init_tracer() -> Result { + opentelemetry_otlp::new_pipeline() + .with_endpoint("http://localhost:4317") + .with_tonic() + .install_batch(opentelemetry::runtime::Tokio) +} + +// Skip first immediate tick from tokio, not needed for async_std. +fn delayed_interval(duration: Duration) -> impl Stream { + opentelemetry::util::tokio_interval_stream(duration).skip(1) +} + +#[derive(Debug)] +struct CustomAggregator(); + +impl AggregatorSelector for CustomAggregator { + fn aggregator_for( + &self, + descriptor: &Descriptor, + ) -> Option> { + match descriptor.name() { + "ex.com.one" => Some(Arc::new(aggregators::last_value())), + "ex.com.two" => Some(Arc::new(aggregators::histogram( + descriptor, + &[0.0, 0.5, 1.0, 10.0], + ))), + _ => Some(Arc::new(aggregators::sum())), + } + } +} + +fn init_meter() -> metrics::Result { + let export_config = ExporterConfig { + endpoint: "http://localhost:4317".to_string(), + protocol: Protocol::Grpc, + ..ExporterConfig::default() + }; + opentelemetry_otlp::new_metrics_pipeline(tokio::spawn, delayed_interval) + .with_export_config(export_config) + .with_aggregator_selector(CustomAggregator()) + .build() +} + +const FOO_KEY: Key = Key::from_static_str("ex.com/foo"); +const BAR_KEY: Key = Key::from_static_str("ex.com/bar"); +const LEMONS_KEY: Key = Key::from_static_str("lemons"); +const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another"); + +lazy_static::lazy_static! { + static ref COMMON_LABELS: [KeyValue; 4] = [ + LEMONS_KEY.i64(10), + KeyValue::new("A", "1"), + KeyValue::new("B", "2"), + KeyValue::new("C", "3"), + ]; +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let _ = init_tracer()?; + let _started = init_meter()?; + + let tracer = global::tracer("ex.com/basic"); + let meter = global::meter("ex.com/basic"); + + let one_metric_callback = |res: ObserverResult| res.observe(1.0, COMMON_LABELS.as_ref()); + let _ = meter + .f64_value_observer("ex.com.one", one_metric_callback) + .with_description("A ValueObserver set to 1.0") + .init(); + + let value_recorder_two = meter.f64_value_recorder("ex.com.two").init(); + + let another_recorder = meter.f64_value_recorder("ex.com.two").init(); + another_recorder.record(5.5, COMMON_LABELS.as_ref()); + + let _baggage = + Context::current_with_baggage(vec![FOO_KEY.string("foo1"), BAR_KEY.string("bar1")]) + .attach(); + + let value_recorder = value_recorder_two.bind(COMMON_LABELS.as_ref()); + tracer.in_span("operation", |cx| { + let span = cx.span(); + span.add_event( + "Nice operation!".to_string(), + vec![Key::new("bogons").i64(100)], + ); + span.set_attribute(ANOTHER_KEY.string("yes")); + + meter.record_batch_with_context( + // Note: call-site variables added as context Entries: + &Context::current_with_baggage(vec![ANOTHER_KEY.string("xyz")]), + COMMON_LABELS.as_ref(), + vec![value_recorder_two.measurement(2.0)], + ); + + tracer.in_span("Sub operation...", |cx| { + let span = cx.span(); + span.set_attribute(LEMONS_KEY.string("five")); + + span.add_event("Sub span event".to_string(), vec![]); + + value_recorder.record(1.3); + }); + }); + + // wait for 1 minutes so that we could see metrics being pushed via OTLP every 10 seconds. + tokio::time::sleep(Duration::from_secs(60)).await; + + shutdown_tracer_provider(); + + Ok(()) +} diff --git a/examples/basic-otlp/src/main.rs b/examples/basic-otlp/src/main.rs index 2739cdaf5d..51ac886ec4 100644 --- a/examples/basic-otlp/src/main.rs +++ b/examples/basic-otlp/src/main.rs @@ -39,7 +39,7 @@ fn init_meter() -> metrics::Result { const FOO_KEY: Key = Key::from_static_str("ex.com/foo"); const BAR_KEY: Key = Key::from_static_str("ex.com/bar"); -const LEMONS_KEY: Key = Key::from_static_str("ex.com/lemons"); +const LEMONS_KEY: Key = Key::from_static_str("lemons"); const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another"); lazy_static::lazy_static! { diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index e8df777a94..3f784fc510 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -113,10 +113,24 @@ where } /// Build with the aggregator selector - pub fn with_aggregator_selector(self, aggregator_selector: AS) -> Self { + pub fn with_aggregator_selector( + self, + aggregator_selector: T, + ) -> OtlpMetricPipelineBuilder + where + T: AggregatorSelector + Send + Sync + 'static, + { OtlpMetricPipelineBuilder { aggregator_selector, - ..self + export_selector: self.export_selector, + spawn: self.spawn, + interval: self.interval, + export_config: self.export_config, + tonic_config: self.tonic_config, + resource: self.resource, + stateful: self.stateful, + period: self.period, + timeout: self.timeout, } }