Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow users to use different channels based on runtime in batch span processor #560

Merged
merged 4 commits into from Jun 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions opentelemetry-datadog/src/exporter/mod.rs
Expand Up @@ -7,9 +7,9 @@ pub use model::Error;
use async_trait::async_trait;
use http::{Method, Request, Uri};
use itertools::Itertools;
use opentelemetry::runtime::Runtime;
use opentelemetry::sdk::export::trace;
use opentelemetry::sdk::export::trace::SpanData;
use opentelemetry::sdk::trace::TraceRuntime;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk, trace::TracerProvider};
use opentelemetry_http::{HttpClient, ResponseExt};
Expand Down Expand Up @@ -127,7 +127,7 @@ impl DatadogPipelineBuilder {

/// Install the Datadog trace exporter pipeline using a batch span processor with the specified
/// runtime.
pub fn install_batch<R: Runtime>(
pub fn install_batch<R: TraceRuntime>(
mut self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
Expand Down
12 changes: 7 additions & 5 deletions opentelemetry-jaeger/src/exporter/mod.rs
Expand Up @@ -23,10 +23,9 @@ use isahc::prelude::Configurable;
use opentelemetry::sdk::export::ExportError;
use opentelemetry::trace::TraceError;
use opentelemetry::{
global,
runtime::Runtime,
sdk,
global, sdk,
sdk::export::trace,
sdk::trace::TraceRuntime,
trace::{Event, Link, SpanKind, StatusCode, TracerProvider},
Key, KeyValue,
};
Expand Down Expand Up @@ -268,7 +267,10 @@ impl PipelineBuilder {
}

/// Install a Jaeger pipeline with a batch span processor using the specified runtime.
pub fn install_batch<R: Runtime>(self, runtime: R) -> Result<sdk::trace::Tracer, TraceError> {
pub fn install_batch<R: TraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let tracer_provider = self.build_batch(runtime)?;
let tracer =
tracer_provider.get_tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION")));
Expand All @@ -290,7 +292,7 @@ impl PipelineBuilder {

/// Build a configured `sdk::trace::TracerProvider` with a batch span processor using the
/// specified runtime.
pub fn build_batch<R: Runtime>(
pub fn build_batch<R: TraceRuntime>(
mut self,
runtime: R,
) -> Result<sdk::trace::TracerProvider, TraceError> {
Expand Down
19 changes: 14 additions & 5 deletions opentelemetry-otlp/src/lib.rs
Expand Up @@ -146,7 +146,7 @@
#![cfg_attr(docsrs, feature(doc_cfg), deny(broken_intra_doc_links))]
#![cfg_attr(test, deny(warnings))]

use opentelemetry::{global, runtime::Runtime, sdk, trace::TracerProvider};
use opentelemetry::{global, sdk, sdk::trace::TraceRuntime, trace::TracerProvider};

#[cfg(any(feature = "grpc-sys", feature = "http-proto"))]
use std::collections::HashMap;
Expand Down Expand Up @@ -403,7 +403,10 @@ impl TonicPipelineBuilder {
///
/// [`Tracer`]: opentelemetry::trace::Tracer
/// [tonic]: https://github.com/hyperium/tonic
pub fn install_batch<R: Runtime>(self, runtime: R) -> Result<sdk::trace::Tracer, TraceError> {
pub fn install_batch<R: TraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let exporter = match self.channel {
Some(channel) => {
TraceExporter::from_tonic_channel(self.exporter_config, self.tonic_config, channel)
Expand Down Expand Up @@ -488,7 +491,10 @@ impl GrpcioPipelineBuilder {
///
/// [`Tracer`]: opentelemetry::trace::Tracer
/// [grpcio]: https://github.com/tikv/grpc-rs
pub fn install_batch<R: Runtime>(self, runtime: R) -> Result<sdk::trace::Tracer, TraceError> {
pub fn install_batch<R: TraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let exporter = TraceExporter::new_grpcio(self.exporter_config, self.grpcio_config);
Ok(build_batch_with_exporter(
exporter,
Expand Down Expand Up @@ -551,7 +557,10 @@ impl HttpPipelineBuilder {
/// `install_batch` will panic if not called within a tokio runtime
///
/// [`Tracer`]: opentelemetry::trace::Tracer
pub fn install_batch<R: Runtime>(self, runtime: R) -> Result<sdk::trace::Tracer, TraceError> {
pub fn install_batch<R: TraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let exporter = TraceExporter::new_http(self.exporter_config, self.http_config)?;
Ok(build_batch_with_exporter(
exporter,
Expand All @@ -575,7 +584,7 @@ fn build_simple_with_exporter(
tracer
}

fn build_batch_with_exporter<R: Runtime>(
fn build_batch_with_exporter<R: TraceRuntime>(
exporter: TraceExporter,
trace_config: Option<sdk::trace::Config>,
runtime: R,
Expand Down
7 changes: 3 additions & 4 deletions opentelemetry-zipkin/src/exporter/mod.rs
Expand Up @@ -5,10 +5,9 @@ use async_trait::async_trait;
use http::Uri;
use model::endpoint::Endpoint;
use opentelemetry::{
global,
runtime::Runtime,
sdk,
global, sdk,
sdk::export::{trace, ExportError},
sdk::trace::TraceRuntime,
trace::{TraceError, TracerProvider},
};
use opentelemetry_http::HttpClient;
Expand Down Expand Up @@ -120,7 +119,7 @@ impl ZipkinPipelineBuilder {

/// Install the Zipkin trace exporter pipeline with a batch span processor using the specified
/// runtime.
pub fn install_batch<R: Runtime>(
pub fn install_batch<R: TraceRuntime>(
mut self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
Expand Down
5 changes: 5 additions & 0 deletions opentelemetry/Cargo.toml
Expand Up @@ -60,6 +60,11 @@ rt-async-std = ["async-std"]
name = "trace"
harness = false

[[bench]]
name = "batch_span_processor"
harness = false
required-features = ["rt-tokio"]

[[bench]]
name = "metric"
harness = false
Expand Down
82 changes: 82 additions & 0 deletions opentelemetry/benches/batch_span_processor.rs
@@ -0,0 +1,82 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use opentelemetry::runtime::Tokio;
use opentelemetry::sdk::export::trace::SpanData;
use opentelemetry::sdk::trace::{BatchSpanProcessor, EvictedHashMap, EvictedQueue, SpanProcessor};
use opentelemetry::trace::{
NoopSpanExporter, SpanContext, SpanId, SpanKind, StatusCode, TraceId, TraceState,
};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::runtime::Runtime;

fn get_span_data() -> Vec<SpanData> {
(0..200)
.into_iter()
.map(|_| SpanData {
span_context: SpanContext::new(
TraceId::from_u128(12),
SpanId::from_u64(12),
0,
false,
TraceState::default(),
),
parent_span_id: SpanId::from_u64(12),
span_kind: SpanKind::Client,
name: Default::default(),
start_time: SystemTime::now(),
end_time: SystemTime::now(),
attributes: EvictedHashMap::new(12, 12),
events: EvictedQueue::new(12),
links: EvictedQueue::new(12),
status_code: StatusCode::Unset,
status_message: Default::default(),
resource: None,
instrumentation_lib: Default::default(),
})
.collect::<Vec<SpanData>>()
}

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("BatchSpanProcessor");
group.sample_size(50);

for task_num in [1, 2, 4, 8, 16, 32].iter() {
group.bench_with_input(
BenchmarkId::from_parameter(format!("with {} concurrent task", task_num)),
task_num,
|b, &task_num| {
b.iter(|| {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
let span_processor =
BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio)
.with_max_queue_size(10_000)
.build();
let mut shared_span_processor = Arc::new(span_processor);
let mut handles = Vec::with_capacity(10);
for _ in 0..task_num {
let span_processor = shared_span_processor.clone();
let spans = get_span_data();
handles.push(tokio::spawn(async move {
for span in spans {
span_processor.on_end(span);
tokio::task::yield_now().await;
}
}));
}
futures::future::join_all(handles).await;
let _ =
Arc::<BatchSpanProcessor<Tokio>>::get_mut(&mut shared_span_processor)
.unwrap()
.shutdown();
});
})
},
);
}

group.finish();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
7 changes: 4 additions & 3 deletions opentelemetry/src/global/trace.rs
Expand Up @@ -328,8 +328,9 @@ pub fn force_flush_tracer_provider() {
// threads Use cargo test -- --ignored --test-threads=1 to run those tests.
mod tests {
use super::*;
use crate::sdk::trace::TraceRuntime;
use crate::{
runtime::{self, Runtime},
runtime,
trace::{NoopTracer, Tracer},
};
use std::{
Expand Down Expand Up @@ -479,7 +480,7 @@ mod tests {
assert!(second_resp.contains("thread 2"));
}

fn build_batch_tracer_provider<R: Runtime>(
fn build_batch_tracer_provider<R: TraceRuntime>(
assert_writer: AssertWriter,
runtime: R,
) -> crate::sdk::trace::TracerProvider {
Expand All @@ -500,7 +501,7 @@ mod tests {
.build()
}

async fn test_set_provider_in_tokio<R: Runtime>(runtime: R) -> AssertWriter {
async fn test_set_provider_in_tokio<R: TraceRuntime>(runtime: R) -> AssertWriter {
let buffer = AssertWriter::new();
let _ = set_tracer_provider(build_batch_tracer_provider(buffer.clone(), runtime));
let tracer = tracer("opentelemetery");
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry/src/sdk/trace/mod.rs
Expand Up @@ -11,6 +11,7 @@ mod evicted_hash_map;
mod evicted_queue;
mod id_generator;
mod provider;
mod runtime;
mod sampler;
mod span;
mod span_limit;
Expand All @@ -22,10 +23,12 @@ pub use evicted_hash_map::EvictedHashMap;
pub use evicted_queue::EvictedQueue;
pub use id_generator::{aws::XrayIdGenerator, IdGenerator};
pub use provider::{Builder, TracerProvider};
pub use runtime::{TraceRuntime, TrySend};
pub use sampler::{Sampler, SamplingDecision, SamplingResult, ShouldSample};
pub use span::Span;
pub use span_limit::SpanLimits;
pub use span_processor::{
BatchConfig, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, SpanProcessor,
BatchConfig, BatchMessage, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor,
SpanProcessor,
};
pub use tracer::Tracer;
4 changes: 2 additions & 2 deletions opentelemetry/src/sdk/trace/provider.rs
Expand Up @@ -8,10 +8,10 @@
//! propagators) are provided by the `TracerProvider`. `Tracer` instances do
//! not duplicate this data to avoid that different `Tracer` instances
//! of the `TracerProvider` have different versions of these data.
use crate::sdk::trace::runtime::TraceRuntime;
use crate::trace::TraceResult;
use crate::{
global,
runtime::Runtime,
sdk::{self, export::trace::SpanExporter, trace::SpanProcessor},
};
use std::sync::Arc;
Expand Down Expand Up @@ -115,7 +115,7 @@ impl Builder {
}

/// The `SpanExporter` setup using a default `BatchSpanProcessor` that this provider should use.
pub fn with_batch_exporter<T: SpanExporter + 'static, R: Runtime>(
pub fn with_batch_exporter<T: SpanExporter + 'static, R: TraceRuntime>(
self,
exporter: T,
runtime: R,
Expand Down