Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow users to use different channels based on runtime in batch span processor #560

Merged
merged 4 commits into from Jun 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions opentelemetry/Cargo.toml
Expand Up @@ -60,6 +60,11 @@ rt-async-std = ["async-std"]
name = "trace"
harness = false

# Batch-span-processor benchmark: drives BatchSpanProcessor through the Tokio
# runtime abstraction, so it is only built when the `rt-tokio` feature is on.
[[bench]]
name = "batch_span_processor"
harness = false
required-features = ["rt-tokio"]

[[bench]]
name = "metric"
harness = false
Expand Down
82 changes: 82 additions & 0 deletions opentelemetry/benches/batch_span_processor.rs
@@ -0,0 +1,82 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use opentelemetry::runtime::Tokio;
use opentelemetry::sdk::export::trace::SpanData;
use opentelemetry::sdk::trace::{BatchSpanProcessor, EvictedHashMap, EvictedQueue, SpanProcessor};
use opentelemetry::trace::{
NoopSpanExporter, SpanContext, SpanId, SpanKind, StatusCode, TraceId, TraceState,
};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::runtime::Runtime;

/// Build a batch of 200 dummy spans used as benchmark input.
///
/// Every span carries the same fixed ids and fresh timestamps; the span
/// contents are irrelevant to the benchmark — only the number of spans
/// pushed through the processor matters.
fn get_span_data() -> Vec<SpanData> {
    // A `Range` is already an iterator; the previous `.into_iter()` was redundant.
    (0..200)
        .map(|_| SpanData {
            span_context: SpanContext::new(
                TraceId::from_u128(12),
                SpanId::from_u64(12),
                0,
                false,
                TraceState::default(),
            ),
            parent_span_id: SpanId::from_u64(12),
            span_kind: SpanKind::Client,
            name: Default::default(),
            start_time: SystemTime::now(),
            end_time: SystemTime::now(),
            attributes: EvictedHashMap::new(12, 12),
            events: EvictedQueue::new(12),
            links: EvictedQueue::new(12),
            status_code: StatusCode::Unset,
            status_message: Default::default(),
            resource: None,
            instrumentation_lib: Default::default(),
        })
        .collect()
}

/// Benchmark `BatchSpanProcessor::on_end` under varying numbers of
/// concurrent producer tasks (1 through 32).
///
/// Each iteration spins up a fresh Tokio runtime, shares one processor
/// across `task_num` tasks, pushes 200 spans per task (yielding after each
/// span so the tasks interleave), then shuts the processor down.
fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("BatchSpanProcessor");
    group.sample_size(50);

    for task_num in [1usize, 2, 4, 8, 16, 32].iter() {
        group.bench_with_input(
            BenchmarkId::from_parameter(format!("with {} concurrent task", task_num)),
            task_num,
            |b, &task_num| {
                b.iter(|| {
                    let rt = Runtime::new().unwrap();
                    rt.block_on(async move {
                        let span_processor =
                            BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio)
                                .with_max_queue_size(10_000)
                                .build();
                        let mut shared_span_processor = Arc::new(span_processor);
                        // Reserve one slot per spawned task (was a hard-coded 10,
                        // which under-allocated for the 16- and 32-task cases).
                        let mut handles = Vec::with_capacity(task_num);
                        for _ in 0..task_num {
                            let span_processor = shared_span_processor.clone();
                            let spans = get_span_data();
                            handles.push(tokio::spawn(async move {
                                for span in spans {
                                    span_processor.on_end(span);
                                    // Yield so producers interleave instead of each
                                    // running to completion in turn.
                                    tokio::task::yield_now().await;
                                }
                            }));
                        }
                        futures::future::join_all(handles).await;
                        // All tasks are joined, so this Arc is the sole owner again;
                        // `get_mut` succeeds and the worker shuts down cleanly.
                        let _ =
                            Arc::<BatchSpanProcessor<Tokio>>::get_mut(&mut shared_span_processor)
                                .unwrap()
                                .shutdown();
                    });
                })
            },
        );
    }

    group.finish();
}

// Register the benchmark and let criterion generate the `main` entry point.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
68 changes: 68 additions & 0 deletions opentelemetry/src/runtime.rs
Expand Up @@ -6,7 +6,10 @@
//! [Tokio]: https://crates.io/crates/tokio
//! [async-std]: https://crates.io/crates/async-std

use crate::sdk::trace::BatchMessage;
use crate::trace::TraceError;
use futures::{future::BoxFuture, Stream};
use std::fmt::Debug;
use std::{future::Future, time::Duration};

/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
Expand All @@ -23,6 +26,12 @@ pub trait Runtime: Clone + Send + Sync + 'static {
/// not important.
type Delay: Future + Send;

/// A future stream to receive the batch messages from channels.
type Receiver: Stream<Item = BatchMessage> + Send;

/// A batch messages sender that could be sent across thread safely.
type Sender: TrySend + Debug;

/// Create a [Stream][futures::Stream], which returns a new item every
/// [Duration][std::time::Duration].
fn interval(&self, duration: Duration) -> Self::Interval;
Expand All @@ -40,6 +49,28 @@ pub trait Runtime: Clone + Send + Sync + 'static {

/// Return a new future, which resolves after the specified [Duration][std::time::Duration].
fn delay(&self, duration: Duration) -> Self::Delay;

/// Return the sender and receiver used to send batch message between tasks.
fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver);
}

/// TrySend is an abstraction of a sender that is capable of sending a
/// `BatchMessage` through a shared reference (no `&mut self` needed), so the
/// processor can enqueue messages from any thread.
pub trait TrySend: Sync + Send {
    /// Try to send one batch message to the worker thread without blocking.
    ///
    /// It can fail because either the receiver has closed or the buffer is full.
    fn try_send(&self, item: BatchMessage) -> Result<(), TraceError>;
}

#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
impl TrySend for tokio::sync::mpsc::Sender<BatchMessage> {
    /// Forward to tokio's non-blocking `Sender::try_send`, mapping the error
    /// onto `TraceError` with a message reflecting the actual failure cause.
    fn try_send(&self, item: BatchMessage) -> Result<(), TraceError> {
        // The inherent `Sender::try_send` takes precedence over this trait
        // method in resolution, so this is not a recursive call.
        self.try_send(item).map_err(|err| match err {
            // Previously both cases reported "channel is full", which was
            // misleading when the receiver had actually been dropped.
            tokio::sync::mpsc::error::TrySendError::Full(_) => TraceError::from(
                "cannot send message to batch span processor because the channel is full",
            ),
            tokio::sync::mpsc::error::TrySendError::Closed(_) => TraceError::from(
                "cannot send message to batch span processor because the channel is closed",
            ),
        })
    }
}

/// Runtime implementation, which works with Tokio's multi thread runtime.
Expand All @@ -53,6 +84,8 @@ pub struct Tokio;
impl Runtime for Tokio {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = tokio::time::Sleep;
type Receiver = tokio_stream::wrappers::ReceiverStream<BatchMessage>;
type Sender = tokio::sync::mpsc::Sender<BatchMessage>;

fn interval(&self, duration: Duration) -> Self::Interval {
crate::util::tokio_interval_stream(duration)
Expand All @@ -65,6 +98,14 @@ impl Runtime for Tokio {
fn delay(&self, duration: Duration) -> Self::Delay {
tokio::time::sleep(duration)
}

fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
(
sender,
tokio_stream::wrappers::ReceiverStream::new(receiver),
)
}
}

/// Runtime implementation, which works with Tokio's current thread runtime.
Expand All @@ -78,6 +119,8 @@ pub struct TokioCurrentThread;
impl Runtime for TokioCurrentThread {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = tokio::time::Sleep;
type Receiver = tokio_stream::wrappers::ReceiverStream<BatchMessage>;
type Sender = tokio::sync::mpsc::Sender<BatchMessage>;

fn interval(&self, duration: Duration) -> Self::Interval {
crate::util::tokio_interval_stream(duration)
Expand All @@ -102,6 +145,14 @@ impl Runtime for TokioCurrentThread {
fn delay(&self, duration: Duration) -> Self::Delay {
tokio::time::sleep(duration)
}

fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
(
sender,
tokio_stream::wrappers::ReceiverStream::new(receiver),
)
}
}

/// Runtime implementation, which works with async-std.
Expand All @@ -110,11 +161,24 @@ impl Runtime for TokioCurrentThread {
#[derive(Debug, Clone)]
pub struct AsyncStd;

#[cfg(feature = "rt-async-std")]
impl TrySend for async_std::channel::Sender<BatchMessage> {
    /// Forward to async-std's non-blocking `Sender::try_send`, mapping the
    /// error onto `TraceError` with a message reflecting the actual cause.
    fn try_send(&self, item: BatchMessage) -> Result<(), TraceError> {
        // The inherent `Sender::try_send` takes precedence over this trait
        // method in resolution, so this is not a recursive call.
        self.try_send(item).map_err(|err| match err {
            // Previously both cases reported "channel is full", which was
            // misleading when the channel had actually been closed.
            async_std::channel::TrySendError::Full(_) => TraceError::from(
                "cannot send message to batch span processor because the channel is full",
            ),
            async_std::channel::TrySendError::Closed(_) => TraceError::from(
                "cannot send message to batch span processor because the channel is closed",
            ),
        })
    }
}

#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl Runtime for AsyncStd {
type Interval = async_std::stream::Interval;
type Delay = BoxFuture<'static, ()>;
type Receiver = async_std::channel::Receiver<BatchMessage>;
type Sender = async_std::channel::Sender<BatchMessage>;

fn interval(&self, duration: Duration) -> Self::Interval {
async_std::stream::interval(duration)
Expand All @@ -127,4 +191,8 @@ impl Runtime for AsyncStd {
fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(async_std::task::sleep(duration))
}

fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
async_std::channel::bounded(capacity)
}
}
3 changes: 2 additions & 1 deletion opentelemetry/src/sdk/trace/mod.rs
Expand Up @@ -26,6 +26,7 @@ pub use sampler::{Sampler, SamplingDecision, SamplingResult, ShouldSample};
pub use span::Span;
pub use span_limit::SpanLimits;
pub use span_processor::{
BatchConfig, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, SpanProcessor,
BatchConfig, BatchMessage, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor,
SpanProcessor,
};
pub use tracer::Tracer;
62 changes: 28 additions & 34 deletions opentelemetry/src/sdk/trace/span_processor.rs
Expand Up @@ -35,15 +35,15 @@
//! [`TracerProvider`]: crate::trace::TracerProvider

use crate::global;
use crate::runtime::Runtime;
use crate::runtime::{Runtime, TrySend};
use crate::sdk::trace::Span;
use crate::{
sdk::export::trace::{ExportResult, SpanData, SpanExporter},
trace::{TraceError, TraceResult},
Context,
};
use futures::{channel::mpsc, channel::oneshot, executor, future::Either, pin_mut, StreamExt};
use std::{env, fmt, str::FromStr, sync::Mutex, thread, time::Duration};
use futures::{channel::oneshot, executor, future::Either, pin_mut, StreamExt};
use std::{env, fmt, str::FromStr, thread, time::Duration};

/// Delay interval between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
Expand Down Expand Up @@ -211,77 +211,74 @@ impl SpanProcessor for SimpleSpanProcessor {
/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html
/// [`tokio`]: https://tokio.rs
/// [`async-std`]: https://async.rs
pub struct BatchSpanProcessor {
message_sender: Mutex<mpsc::Sender<BatchMessage>>,
pub struct BatchSpanProcessor<R: Runtime> {
message_sender: R::Sender,
}

impl fmt::Debug for BatchSpanProcessor {
impl<R: Runtime> fmt::Debug for BatchSpanProcessor<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BatchSpanProcessor")
.field("message_sender", &self.message_sender)
.finish()
}
}

impl SpanProcessor for BatchSpanProcessor {
impl<R: Runtime> SpanProcessor for BatchSpanProcessor<R> {
fn on_start(&self, _span: &Span, _cx: &Context) {
// Ignored
}

fn on_end(&self, span: SpanData) {
let result = self
.message_sender
.lock()
.map_err(|_| TraceError::Other("batch span processor mutex poisoned".into()))
.and_then(|mut sender| {
sender
.try_send(BatchMessage::ExportSpan(span))
.map_err(|err| TraceError::Other(err.into()))
});
let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));

if let Err(err) = result {
global::handle_error(err);
}
}

fn force_flush(&self) -> TraceResult<()> {
let mut sender = self.message_sender.lock().map_err(|_| TraceError::from("When force flushing the BatchSpanProcessor, the message sender's lock has been poisoned"))?;
let (res_sender, res_receiver) = oneshot::channel();
sender.try_send(BatchMessage::Flush(Some(res_sender)))?;
&self
.message_sender
.try_send(BatchMessage::Flush(Some(res_sender)))?;

futures::executor::block_on(res_receiver)
.map_err(|err| TraceError::Other(err.into()))
.and_then(|identity| identity)
}

fn shutdown(&mut self) -> TraceResult<()> {
let mut sender = self.message_sender.lock().map_err(|_| TraceError::from("When shutting down the BatchSpanProcessor, the message sender's lock has been poisoned"))?;
let (res_sender, res_receiver) = oneshot::channel();
sender.try_send(BatchMessage::Shutdown(res_sender))?;
&self
.message_sender
.try_send(BatchMessage::Shutdown(res_sender))?;

futures::executor::block_on(res_receiver)
.map_err(|err| TraceError::Other(err.into()))
.and_then(|identity| identity)
}
}

/// Messages sent between application thread and batch span processor's work thread.
#[derive(Debug)]
enum BatchMessage {
pub enum BatchMessage {
/// Export spans, usually called when span ends
ExportSpan(SpanData),
/// Flush the current buffer to the backend, it can be triggered by
    /// preconfigured interval or a call to the `force_flush` function.
Flush(Option<oneshot::Sender<ExportResult>>),
/// Shut down the worker thread, push all spans in buffer to the backend.
Shutdown(oneshot::Sender<ExportResult>),
}

impl BatchSpanProcessor {
pub(crate) fn new<R>(
impl<R: Runtime> BatchSpanProcessor<R> {
pub(crate) fn new(
mut exporter: Box<dyn SpanExporter>,
config: BatchConfig,
runtime: R,
) -> Self
where
R: Runtime,
{
let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size);
) -> Self {
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);
let ticker = runtime
.interval(config.scheduled_delay)
.map(|_| BatchMessage::Flush(None));
Expand Down Expand Up @@ -362,16 +359,13 @@ impl BatchSpanProcessor {
}));

// Return batch processor with link to worker
BatchSpanProcessor {
message_sender: Mutex::new(message_sender),
}
BatchSpanProcessor { message_sender }
}

/// Create a new batch processor builder
pub fn builder<E, R>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
pub fn builder<E>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
where
E: SpanExporter,
R: Runtime,
{
BatchSpanProcessorBuilder {
exporter,
Expand Down Expand Up @@ -528,7 +522,7 @@ where
}

/// Build a batch processor
pub fn build(self) -> BatchSpanProcessor {
pub fn build(self) -> BatchSpanProcessor<R> {
BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
}
}
Expand Down