Skip to content

Commit

Permalink
Adding a dynamic dispatch to Aggregator Selector
Browse files Browse the repository at this point in the history
Rationale:
As a user I would like to be able to supply my own aggregator selector
  • Loading branch information
dawid-nowak committed Mar 27, 2021
1 parent b6d8494 commit f23311c
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 19 deletions.
30 changes: 25 additions & 5 deletions examples/basic-otlp/src/main.rs
Expand Up @@ -23,18 +23,38 @@ fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {

// Skip first immediate tick from tokio, not needed for async_std.
fn delayed_interval(duration: Duration) -> impl Stream<Item = tokio::time::Instant> {
opentelemetry::util::tokio_interval_stream(duration).skip(1)
opentelemetry::util::tokio_interval_stream(duration).skip(1)
}

use opentelemetry::metrics::Descriptor;
use opentelemetry::sdk::export::metrics::Aggregator;
use opentelemetry::sdk::export::metrics::AggregatorSelector;
use std::sync::Arc;

#[derive(Debug)]
struct NOPAggregator();

impl AggregatorSelector for NOPAggregator {
fn aggregator_for(
&self,
_: &Descriptor,
) -> Option<Arc<(dyn Aggregator + Sync + std::marker::Send + 'static)>> {
None
}
}

fn init_meter() -> metrics::Result<PushController> {
let export_config = ExporterConfig {
endpoint: "http://localhost:4317".to_string(),
..ExporterConfig::default()
};
opentelemetry_otlp::new_metrics_pipeline(tokio::spawn, delayed_interval)
.with_export_config(export_config)
.with_aggregator_selector(selectors::simple::Selector::Exact)
.build()
opentelemetry_otlp::new_metrics_pipeline(
tokio::spawn,
delayed_interval,
)
.with_export_config(export_config)
.with_aggregator_selector(Box::new(NOPAggregator()))
.build()
}

const FOO_KEY: Key = Key::from_static_str("ex.com/foo");
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-otlp/src/lib.rs
Expand Up @@ -188,7 +188,10 @@ pub use crate::span::TonicConfig;
pub use crate::span::GrpcioConfig;

#[cfg(feature = "metrics")]
pub use crate::metric::{new_metrics_pipeline, MetricsExporter, OtlpMetricPipelineBuilder};
pub use crate::metric::{
new_metrics_pipeline, new_metrics_pipeline_with_selector, MetricsExporter,
OtlpMetricPipelineBuilder,
};

#[cfg(feature = "grpc-sys")]
pub use crate::span::{Compression, Credentials};
Expand Down
37 changes: 29 additions & 8 deletions opentelemetry-otlp/src/metric.rs
Expand Up @@ -37,13 +37,33 @@ use tonic::Request;
pub fn new_metrics_pipeline<SP, SO, I, IO>(
spawn: SP,
interval: I,
) -> OtlpMetricPipelineBuilder<selectors::simple::Selector, ExportKindSelector, SP, SO, I, IO>
) -> OtlpMetricPipelineBuilder<ExportKindSelector, SP, SO, I, IO>
where
SP: Fn(PushControllerWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
new_metrics_pipeline_with_selector(
spawn,
interval,
Box::new(selectors::simple::Selector::Inexpensive),
)
}

/// Return a pipeline to build OTLP metrics exporter.
///
/// Note that currently the OTLP metrics exporter only supports tonic as it's grpc layer and tokio as
/// runtime.
pub fn new_metrics_pipeline_with_selector<SP, SO, I, IO>(
spawn: SP,
interval: I,
aggregator_selector: Box<dyn AggregatorSelector + Send + Sync + 'static>,
) -> OtlpMetricPipelineBuilder<ExportKindSelector, SP, SO, I, IO>
where
SP: Fn(PushControllerWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
OtlpMetricPipelineBuilder {
aggregator_selector: selectors::simple::Selector::Inexpensive,
aggregator_selector,
export_selector: ExportKindSelector::Cumulative,
spawn,
interval,
Expand All @@ -61,14 +81,13 @@ where
/// Note that currently the OTLP metrics exporter only supports tonic as it's grpc layer and tokio as
/// runtime.
#[derive(Debug)]
pub struct OtlpMetricPipelineBuilder<AS, ES, SP, SO, I, IO>
pub struct OtlpMetricPipelineBuilder<ES, SP, SO, I, IO>
where
AS: AggregatorSelector + Send + Sync + 'static,
ES: ExportKindFor + Send + Sync + Clone + 'static,
SP: Fn(PushControllerWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
aggregator_selector: AS,
aggregator_selector: Box<dyn AggregatorSelector + Send + Sync + 'static>,
export_selector: ES,
spawn: SP,
interval: I,
Expand All @@ -80,9 +99,8 @@ where
timeout: Option<time::Duration>,
}

impl<AS, ES, SP, SO, I, IO, IOI> OtlpMetricPipelineBuilder<AS, ES, SP, SO, I, IO>
impl<ES, SP, SO, I, IO, IOI> OtlpMetricPipelineBuilder<ES, SP, SO, I, IO>
where
AS: AggregatorSelector + Send + Sync + 'static,
ES: ExportKindFor + Send + Sync + Clone + 'static,
SP: Fn(PushControllerWorker) -> SO,
I: Fn(time::Duration) -> IO,
Expand Down Expand Up @@ -113,7 +131,10 @@ where
}

/// Build with the aggregator selector
pub fn with_aggregator_selector(self, aggregator_selector: AS) -> Self {
pub fn with_aggregator_selector(
self,
aggregator_selector: Box<dyn AggregatorSelector + Send + Sync + 'static>,
) -> Self {
OtlpMetricPipelineBuilder {
aggregator_selector,
..self
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/sdk/export/metrics/stdout.rs
Expand Up @@ -360,7 +360,7 @@ where
let period = self.period.take();
let (spawn, interval, exporter) = self.try_build()?;
let mut push_builder = controllers::push(
simple::Selector::Exact,
Box::new(simple::Selector::Exact),
ExportKindSelector::Stateless,
exporter,
spawn,
Expand Down
7 changes: 3 additions & 4 deletions opentelemetry/src/sdk/metrics/controllers/push.rs
Expand Up @@ -19,22 +19,21 @@ lazy_static::lazy_static! {
}

/// Create a new `PushControllerBuilder`.
pub fn push<AS, ES, E, SP, SO, I, IO>(
aggregator_selector: AS,
pub fn push<ES, E, SP, SO, I, IO>(
aggregator_selector: Box<dyn AggregatorSelector + Send + Sync + 'static>,
export_selector: ES,
exporter: E,
spawn: SP,
interval: I,
) -> PushControllerBuilder<SP, I>
where
AS: AggregatorSelector + Send + Sync + 'static,
ES: ExportKindFor + Send + Sync + 'static,
E: Exporter + Send + Sync + 'static,
SP: Fn(PushControllerWorker) -> SO,
I: Fn(time::Duration) -> IO,
{
PushControllerBuilder {
aggregator_selector: Box::new(aggregator_selector),
aggregator_selector: aggregator_selector,
export_selector: Box::new(export_selector),
exporter: Box::new(exporter),
spawn,
Expand Down

0 comments on commit f23311c

Please sign in to comment.