Skip to content

Commit

Permalink
feat: allow users to use different channels based on runtime in batch…
Browse files Browse the repository at this point in the history
… span processor (#560)
  • Loading branch information
TommyCpp committed Jun 5, 2021
1 parent fb576b0 commit 1ca62d3
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 58 deletions.
4 changes: 2 additions & 2 deletions opentelemetry-datadog/src/exporter/mod.rs
Expand Up @@ -7,9 +7,9 @@ pub use model::Error;
use async_trait::async_trait;
use http::{Method, Request, Uri};
use itertools::Itertools;
use opentelemetry::runtime::Runtime;
use opentelemetry::sdk::export::trace;
use opentelemetry::sdk::export::trace::SpanData;
use opentelemetry::sdk::trace::TraceRuntime;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk, trace::TracerProvider};
use opentelemetry_http::{HttpClient, ResponseExt};
Expand Down Expand Up @@ -127,7 +127,7 @@ impl DatadogPipelineBuilder {

/// Install the Datadog trace exporter pipeline using a batch span processor with the specified
/// runtime.
pub fn install_batch<R: Runtime>(
pub fn install_batch<R: TraceRuntime>(
mut self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
Expand Down
12 changes: 7 additions & 5 deletions opentelemetry-jaeger/src/exporter/mod.rs
Expand Up @@ -23,10 +23,9 @@ use isahc::prelude::Configurable;
use opentelemetry::sdk::export::ExportError;
use opentelemetry::trace::TraceError;
use opentelemetry::{
global,
runtime::Runtime,
sdk,
global, sdk,
sdk::export::trace,
sdk::trace::TraceRuntime,
trace::{Event, Link, SpanKind, StatusCode, TracerProvider},
Key, KeyValue,
};
Expand Down Expand Up @@ -268,7 +267,10 @@ impl PipelineBuilder {
}

/// Install a Jaeger pipeline with a batch span processor using the specified runtime.
pub fn install_batch<R: Runtime>(self, runtime: R) -> Result<sdk::trace::Tracer, TraceError> {
pub fn install_batch<R: TraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let tracer_provider = self.build_batch(runtime)?;
let tracer =
tracer_provider.get_tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION")));
Expand All @@ -290,7 +292,7 @@ impl PipelineBuilder {

/// Build a configured `sdk::trace::TracerProvider` with a batch span processor using the
/// specified runtime.
pub fn build_batch<R: Runtime>(
pub fn build_batch<R: TraceRuntime>(
mut self,
runtime: R,
) -> Result<sdk::trace::TracerProvider, TraceError> {
Expand Down
19 changes: 14 additions & 5 deletions opentelemetry-otlp/src/lib.rs
Expand Up @@ -146,7 +146,7 @@
#![cfg_attr(docsrs, feature(doc_cfg), deny(broken_intra_doc_links))]
#![cfg_attr(test, deny(warnings))]

use opentelemetry::{global, runtime::Runtime, sdk, trace::TracerProvider};
use opentelemetry::{global, sdk, sdk::trace::TraceRuntime, trace::TracerProvider};

#[cfg(any(feature = "grpc-sys", feature = "http-proto"))]
use std::collections::HashMap;
Expand Down Expand Up @@ -403,7 +403,10 @@ impl TonicPipelineBuilder {
///
/// [`Tracer`]: opentelemetry::trace::Tracer
/// [tonic]: https://github.com/hyperium/tonic
pub fn install_batch<R: Runtime>(self, runtime: R) -> Result<sdk::trace::Tracer, TraceError> {
pub fn install_batch<R: TraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let exporter = match self.channel {
Some(channel) => {
TraceExporter::from_tonic_channel(self.exporter_config, self.tonic_config, channel)
Expand Down Expand Up @@ -488,7 +491,10 @@ impl GrpcioPipelineBuilder {
///
/// [`Tracer`]: opentelemetry::trace::Tracer
/// [grpcio]: https://github.com/tikv/grpc-rs
pub fn install_batch<R: Runtime>(self, runtime: R) -> Result<sdk::trace::Tracer, TraceError> {
pub fn install_batch<R: TraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let exporter = TraceExporter::new_grpcio(self.exporter_config, self.grpcio_config);
Ok(build_batch_with_exporter(
exporter,
Expand Down Expand Up @@ -551,7 +557,10 @@ impl HttpPipelineBuilder {
/// `install_batch` will panic if not called within a tokio runtime
///
/// [`Tracer`]: opentelemetry::trace::Tracer
pub fn install_batch<R: Runtime>(self, runtime: R) -> Result<sdk::trace::Tracer, TraceError> {
pub fn install_batch<R: TraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let exporter = TraceExporter::new_http(self.exporter_config, self.http_config)?;
Ok(build_batch_with_exporter(
exporter,
Expand All @@ -575,7 +584,7 @@ fn build_simple_with_exporter(
tracer
}

fn build_batch_with_exporter<R: Runtime>(
fn build_batch_with_exporter<R: TraceRuntime>(
exporter: TraceExporter,
trace_config: Option<sdk::trace::Config>,
runtime: R,
Expand Down
7 changes: 3 additions & 4 deletions opentelemetry-zipkin/src/exporter/mod.rs
Expand Up @@ -5,10 +5,9 @@ use async_trait::async_trait;
use http::Uri;
use model::endpoint::Endpoint;
use opentelemetry::{
global,
runtime::Runtime,
sdk,
global, sdk,
sdk::export::{trace, ExportError},
sdk::trace::TraceRuntime,
trace::{TraceError, TracerProvider},
};
use opentelemetry_http::HttpClient;
Expand Down Expand Up @@ -120,7 +119,7 @@ impl ZipkinPipelineBuilder {

/// Install the Zipkin trace exporter pipeline with a batch span processor using the specified
/// runtime.
pub fn install_batch<R: Runtime>(
pub fn install_batch<R: TraceRuntime>(
mut self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
Expand Down
5 changes: 5 additions & 0 deletions opentelemetry/Cargo.toml
Expand Up @@ -60,6 +60,11 @@ rt-async-std = ["async-std"]
name = "trace"
harness = false

[[bench]]
name = "batch_span_processor"
harness = false
required-features = ["rt-tokio"]

[[bench]]
name = "metric"
harness = false
Expand Down
82 changes: 82 additions & 0 deletions opentelemetry/benches/batch_span_processor.rs
@@ -0,0 +1,82 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use opentelemetry::runtime::Tokio;
use opentelemetry::sdk::export::trace::SpanData;
use opentelemetry::sdk::trace::{BatchSpanProcessor, EvictedHashMap, EvictedQueue, SpanProcessor};
use opentelemetry::trace::{
NoopSpanExporter, SpanContext, SpanId, SpanKind, StatusCode, TraceId, TraceState,
};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::runtime::Runtime;

fn get_span_data() -> Vec<SpanData> {
(0..200)
.into_iter()
.map(|_| SpanData {
span_context: SpanContext::new(
TraceId::from_u128(12),
SpanId::from_u64(12),
0,
false,
TraceState::default(),
),
parent_span_id: SpanId::from_u64(12),
span_kind: SpanKind::Client,
name: Default::default(),
start_time: SystemTime::now(),
end_time: SystemTime::now(),
attributes: EvictedHashMap::new(12, 12),
events: EvictedQueue::new(12),
links: EvictedQueue::new(12),
status_code: StatusCode::Unset,
status_message: Default::default(),
resource: None,
instrumentation_lib: Default::default(),
})
.collect::<Vec<SpanData>>()
}

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("BatchSpanProcessor");
group.sample_size(50);

for task_num in [1, 2, 4, 8, 16, 32].iter() {
group.bench_with_input(
BenchmarkId::from_parameter(format!("with {} concurrent task", task_num)),
task_num,
|b, &task_num| {
b.iter(|| {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
let span_processor =
BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio)
.with_max_queue_size(10_000)
.build();
let mut shared_span_processor = Arc::new(span_processor);
let mut handles = Vec::with_capacity(10);
for _ in 0..task_num {
let span_processor = shared_span_processor.clone();
let spans = get_span_data();
handles.push(tokio::spawn(async move {
for span in spans {
span_processor.on_end(span);
tokio::task::yield_now().await;
}
}));
}
futures::future::join_all(handles).await;
let _ =
Arc::<BatchSpanProcessor<Tokio>>::get_mut(&mut shared_span_processor)
.unwrap()
.shutdown();
});
})
},
);
}

group.finish();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
7 changes: 4 additions & 3 deletions opentelemetry/src/global/trace.rs
Expand Up @@ -328,8 +328,9 @@ pub fn force_flush_tracer_provider() {
// threads Use cargo test -- --ignored --test-threads=1 to run those tests.
mod tests {
use super::*;
use crate::sdk::trace::TraceRuntime;
use crate::{
runtime::{self, Runtime},
runtime,
trace::{NoopTracer, Tracer},
};
use std::{
Expand Down Expand Up @@ -479,7 +480,7 @@ mod tests {
assert!(second_resp.contains("thread 2"));
}

fn build_batch_tracer_provider<R: Runtime>(
fn build_batch_tracer_provider<R: TraceRuntime>(
assert_writer: AssertWriter,
runtime: R,
) -> crate::sdk::trace::TracerProvider {
Expand All @@ -500,7 +501,7 @@ mod tests {
.build()
}

async fn test_set_provider_in_tokio<R: Runtime>(runtime: R) -> AssertWriter {
async fn test_set_provider_in_tokio<R: TraceRuntime>(runtime: R) -> AssertWriter {
let buffer = AssertWriter::new();
let _ = set_tracer_provider(build_batch_tracer_provider(buffer.clone(), runtime));
let tracer = tracer("opentelemetery");
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry/src/sdk/trace/mod.rs
Expand Up @@ -11,6 +11,7 @@ mod evicted_hash_map;
mod evicted_queue;
mod id_generator;
mod provider;
mod runtime;
mod sampler;
mod span;
mod span_limit;
Expand All @@ -22,10 +23,12 @@ pub use evicted_hash_map::EvictedHashMap;
pub use evicted_queue::EvictedQueue;
pub use id_generator::{aws::XrayIdGenerator, IdGenerator};
pub use provider::{Builder, TracerProvider};
pub use runtime::{TraceRuntime, TrySend};
pub use sampler::{Sampler, SamplingDecision, SamplingResult, ShouldSample};
pub use span::Span;
pub use span_limit::SpanLimits;
pub use span_processor::{
BatchConfig, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, SpanProcessor,
BatchConfig, BatchMessage, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor,
SpanProcessor,
};
pub use tracer::Tracer;
4 changes: 2 additions & 2 deletions opentelemetry/src/sdk/trace/provider.rs
Expand Up @@ -8,10 +8,10 @@
//! propagators) are provided by the `TracerProvider`. `Tracer` instances do
//! not duplicate this data to avoid that different `Tracer` instances
//! of the `TracerProvider` have different versions of these data.
use crate::sdk::trace::runtime::TraceRuntime;
use crate::trace::TraceResult;
use crate::{
global,
runtime::Runtime,
sdk::{self, export::trace::SpanExporter, trace::SpanProcessor},
};
use std::sync::Arc;
Expand Down Expand Up @@ -115,7 +115,7 @@ impl Builder {
}

/// The `SpanExporter` setup using a default `BatchSpanProcessor` that this provider should use.
pub fn with_batch_exporter<T: SpanExporter + 'static, R: Runtime>(
pub fn with_batch_exporter<T: SpanExporter + 'static, R: TraceRuntime>(
self,
exporter: T,
runtime: R,
Expand Down

0 comments on commit 1ca62d3

Please sign in to comment.