Skip to content

Commit

Permalink
Wait for exports on the simple span processor's ForceFlush
Browse files Browse the repository at this point in the history
ForceFlush seems to have been left behind in #502. With those changes, the processing is not really synchronous anymore, i.e. OnEnd now only sends the span down the pipe to be processed in the separate thread as soon as possible.

https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#forceflush-1 says:

> In particular, if any SpanProcessor has any associated exporter, it SHOULD try to call the exporter's Export with all spans for which this was not already done and then invoke ForceFlush on it.

As the comment states, all spans previously got exported synchronounsly right away, so that no such spans existed, but now they might be anywhere between the channel and (the end of) the export call. Doin
g nothing in ForceFlush even violates the specification as...

> The built-in SpanProcessors MUST do so.

Awaiting all open tasks from the channel on ForceFlush fixes this.

Previous discussions regarding parts of the specification that this does not tackle in line with Shutdown:

> ForceFlush SHOULD provide a way to let the caller know whether it succeeded, failed or timed out.

#358 (comment)

> ForceFlush SHOULD complete or abort within some timeout.

https://github.com/open-telemetry/opentelemetry-rust/pull/502/files#r603722431

This brings the simple processor a step closer to the batch processor with the obvious main difference of batches and the (not so obvious, also see #502 (comment)) difference that it works without a presumed async runtime.
  • Loading branch information
cschramm committed Apr 24, 2023
1 parent f42c11d commit d127d85
Showing 1 changed file with 53 additions and 33 deletions.
86 changes: 53 additions & 33 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,45 +92,64 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
fn shutdown(&mut self) -> TraceResult<()>;
}

/// A [`SpanProcessor`] that exports synchronously when spans are finished.
///
/// # Examples
///
/// Note that the simple processor exports synchronously every time a span is
/// ended. If you find this limiting, consider the batch processor instead.
/// A [`SpanProcessor`] that exports when spans are finished.
#[derive(Debug)]
pub struct SimpleSpanProcessor {
sender: crossbeam_channel::Sender<Option<SpanData>>,
shutdown: crossbeam_channel::Receiver<()>,
message_sender: crossbeam_channel::Sender<Message>,
}

impl SimpleSpanProcessor {
pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
let (span_tx, span_rx) = crossbeam_channel::unbounded();
let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0);
let (message_sender, rx) = crossbeam_channel::unbounded();

let _ = thread::Builder::new()
.name("opentelemetry-exporter".to_string())
.spawn(move || {
while let Ok(Some(span)) = span_rx.recv() {
if let Err(err) = futures_executor::block_on(exporter.export(vec![span])) {
global::handle_error(err);
while let Ok(msg) = rx.recv() {
match msg {
Message::ExportSpan(span) => {
if let Err(err) =

Check warning on line 111 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L111

Added line #L111 was not covered by tests
futures_executor::block_on(exporter.export(vec![span]))
{
global::handle_error(err);

Check warning on line 114 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L113-L114

Added lines #L113 - L114 were not covered by tests
}
}
Message::Flush(sender) => {
Self::respond(&sender, "sync");
}

Check warning on line 119 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L117-L119

Added lines #L117 - L119 were not covered by tests
Message::Shutdown(sender) => {
exporter.shutdown();

Self::respond(&sender, "shutdown");

return;
}
}
}

exporter.shutdown();

if let Err(err) = shutdown_tx.send(()) {
global::handle_error(TraceError::from(format!(
"could not send shutdown: {:?}",
err
)));
}
});

SimpleSpanProcessor {
sender: span_tx,
shutdown: shutdown_rx,
Self { message_sender }
}

fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) {
let (tx, rx) = crossbeam_channel::bounded(0);

if self.message_sender.send(msg(tx)).is_ok() {
if let Err(err) = rx.recv() {
global::handle_error(TraceError::from(format!(
"error {description} span processor: {err:?}"
)));

Check warning on line 143 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L141-L143

Added lines #L141 - L143 were not covered by tests
}
}

Check warning on line 145 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L145

Added line #L145 was not covered by tests
}

fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) {
if let Err(err) = sender.send(()) {
global::handle_error(TraceError::from(format!(
"could not send {description}: {err:?}"
)));

Check warning on line 152 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L150-L152

Added lines #L150 - L152 were not covered by tests
}
}
}
Expand All @@ -145,30 +164,31 @@ impl SpanProcessor for SimpleSpanProcessor {
return;
}

if let Err(err) = self.sender.send(Some(span)) {
if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) {
global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
}
}

fn force_flush(&self) -> TraceResult<()> {
// Ignored since all spans in Simple Processor will be exported as they ended.
self.signal(Message::Flush, "flushing");

Check warning on line 174 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L173-L174

Added lines #L173 - L174 were not covered by tests
Ok(())
}

fn shutdown(&mut self) -> TraceResult<()> {
if self.sender.send(None).is_ok() {
if let Err(err) = self.shutdown.recv() {
global::handle_error(TraceError::from(format!(
"error shutting down span processor: {:?}",
err
)))
}
}
self.signal(Message::Shutdown, "shutting down");

Ok(())
}
}

#[derive(Debug)]

Check warning on line 185 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L185

Added line #L185 was not covered by tests
enum Message {
ExportSpan(SpanData),
Flush(crossbeam_channel::Sender<()>),
Shutdown(crossbeam_channel::Sender<()>),
}

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
Expand Down

0 comments on commit d127d85

Please sign in to comment.