Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for exports on the simple span processor's ForceFlush #1030

Merged
merged 1 commit into from
Apr 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
86 changes: 53 additions & 33 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,45 +92,64 @@
fn shutdown(&mut self) -> TraceResult<()>;
}

/// A [`SpanProcessor`] that exports synchronously when spans are finished.
///
/// # Examples
///
/// Note that the simple processor exports synchronously every time a span is
/// ended. If you find this limiting, consider the batch processor instead.
/// A [`SpanProcessor`] that exports when spans are finished.
#[derive(Debug)]
pub struct SimpleSpanProcessor {
sender: crossbeam_channel::Sender<Option<SpanData>>,
shutdown: crossbeam_channel::Receiver<()>,
message_sender: crossbeam_channel::Sender<Message>,
}

impl SimpleSpanProcessor {
pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
let (span_tx, span_rx) = crossbeam_channel::unbounded();
let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0);
let (message_sender, rx) = crossbeam_channel::unbounded();

let _ = thread::Builder::new()
.name("opentelemetry-exporter".to_string())
.spawn(move || {
while let Ok(Some(span)) = span_rx.recv() {
if let Err(err) = futures_executor::block_on(exporter.export(vec![span])) {
global::handle_error(err);
while let Ok(msg) = rx.recv() {
match msg {
Message::ExportSpan(span) => {
if let Err(err) =

Check warning on line 111 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L111

Added line #L111 was not covered by tests
futures_executor::block_on(exporter.export(vec![span]))
{
global::handle_error(err);

Check warning on line 114 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L113-L114

Added lines #L113 - L114 were not covered by tests
}
}
Message::Flush(sender) => {
Self::respond(&sender, "sync");
}

Check warning on line 119 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L117-L119

Added lines #L117 - L119 were not covered by tests
Message::Shutdown(sender) => {
exporter.shutdown();

Self::respond(&sender, "shutdown");

return;
}
}
}

exporter.shutdown();

if let Err(err) = shutdown_tx.send(()) {
global::handle_error(TraceError::from(format!(
"could not send shutdown: {:?}",
err
)));
}
});

SimpleSpanProcessor {
sender: span_tx,
shutdown: shutdown_rx,
Self { message_sender }
}

fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) {
let (tx, rx) = crossbeam_channel::bounded(0);

if self.message_sender.send(msg(tx)).is_ok() {
if let Err(err) = rx.recv() {
global::handle_error(TraceError::from(format!(
"error {description} span processor: {err:?}"
)));

Check warning on line 143 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L141-L143

Added lines #L141 - L143 were not covered by tests
}
}

Check warning on line 145 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L145

Added line #L145 was not covered by tests
}

fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) {
if let Err(err) = sender.send(()) {
global::handle_error(TraceError::from(format!(
"could not send {description}: {err:?}"
)));

Check warning on line 152 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L150-L152

Added lines #L150 - L152 were not covered by tests
}
}
}
Expand All @@ -145,30 +164,31 @@
return;
}

if let Err(err) = self.sender.send(Some(span)) {
if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) {
global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
}
}

fn force_flush(&self) -> TraceResult<()> {
// Ignored since all spans in Simple Processor will be exported as they ended.
self.signal(Message::Flush, "flushing");

Check warning on line 174 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L173-L174

Added lines #L173 - L174 were not covered by tests
Ok(())
}

fn shutdown(&mut self) -> TraceResult<()> {
if self.sender.send(None).is_ok() {
if let Err(err) = self.shutdown.recv() {
global::handle_error(TraceError::from(format!(
"error shutting down span processor: {:?}",
err
)))
}
}
self.signal(Message::Shutdown, "shutting down");

Ok(())
}
}

#[derive(Debug)]

Check warning on line 185 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L185

Added line #L185 was not covered by tests
enum Message {
ExportSpan(SpanData),
Flush(crossbeam_channel::Sender<()>),
Shutdown(crossbeam_channel::Sender<()>),
}

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
Expand Down