Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always export batch when limit reached #519

Merged
merged 1 commit into from
Apr 14, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
125 changes: 60 additions & 65 deletions opentelemetry/src/sdk/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,30 +246,30 @@ impl SpanProcessor for BatchSpanProcessor {

fn force_flush(&self) -> TraceResult<()> {
let mut sender = self.message_sender.lock().map_err(|_| TraceError::from("When force flushing the BatchSpanProcessor, the message sender's lock has been poisoned"))?;
let (res_sender, res_receiver) = oneshot::channel::<Vec<ExportResult>>();
let (res_sender, res_receiver) = oneshot::channel();
sender.try_send(BatchMessage::Flush(Some(res_sender)))?;
for result in futures::executor::block_on(res_receiver)? {
result?;
}
Ok(())

futures::executor::block_on(res_receiver)
.map_err(|err| TraceError::Other(err.into()))
.and_then(|identity| identity)
}

fn shutdown(&mut self) -> TraceResult<()> {
let mut sender = self.message_sender.lock().map_err(|_| TraceError::from("When shutting down the BatchSpanProcessor, the message sender's lock has been poisoned"))?;
let (res_sender, res_receiver) = oneshot::channel::<Vec<ExportResult>>();
let (res_sender, res_receiver) = oneshot::channel();
sender.try_send(BatchMessage::Shutdown(res_sender))?;
for result in futures::executor::block_on(res_receiver)? {
result?;
}
Ok(())

futures::executor::block_on(res_receiver)
.map_err(|err| TraceError::Other(err.into()))
.and_then(|identity| identity)
}
}

#[derive(Debug)]
enum BatchMessage {
ExportSpan(SpanData),
Flush(Option<oneshot::Sender<Vec<ExportResult>>>),
Shutdown(oneshot::Sender<Vec<ExportResult>>),
Flush(Option<oneshot::Sender<ExportResult>>),
Shutdown(oneshot::Sender<ExportResult>),
}

impl BatchSpanProcessor {
Expand All @@ -296,74 +296,65 @@ impl BatchSpanProcessor {
match message {
// Span has finished, add to buffer of pending spans.
BatchMessage::ExportSpan(span) => {
if spans.len() < config.max_queue_size {
spans.push(span);
}
}
// Span batch interval time reached or a force flush has been invoked, export current spans.
BatchMessage::Flush(Some(ch)) => {
let mut results =
Vec::with_capacity(spans.len() / config.max_export_batch_size + 1);
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
);

results.push(
export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
batch,
).await
);
}
let send_result = ch.send(results);
if send_result.is_err() {
global::handle_error(TraceError::from("fail to send the export response from worker handle in BatchProcessor"))
}
}
BatchMessage::Flush(None) => {
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
);
spans.push(span);

if spans.len() == config.max_export_batch_size {
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
batch,
).await;
spans.split_off(0),
)
.await;

if let Err(err) = result {
global::handle_error(err);
}
}
}
// Span batch interval time reached or a force flush has been invoked, export current spans.
BatchMessage::Flush(res_channel) => {
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
spans.split_off(0),
)
.await;

if let Some(channel) = res_channel {
if let Err(err) = channel.send(result) {
global::handle_error(TraceError::from(format!(
"failed to send flush result: {:?}",
err
)));
}
} else if let Err(err) = result {
global::handle_error(TraceError::from(format!(
"failed to export batch {:?}",
err
)));
}
}
// Stream has terminated or processor is shutdown, return to finish execution.
BatchMessage::Shutdown(ch) => {
let mut results =
Vec::with_capacity(spans.len() / config.max_export_batch_size + 1);
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
);

results.push(
export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
batch,
).await
);
}
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
spans.split_off(0),
)
.await;

exporter.shutdown();
let send_result = ch.send(results);
if send_result.is_err() {
global::handle_error(TraceError::from("fail to send the export response from worker handle in BatchProcessor"))

if let Err(err) = ch.send(result) {
global::handle_error(TraceError::from(format!(
"failed to send batch processor shutdown result: {:?}",
err
)));
}

break;
}
}
Expand Down Expand Up @@ -400,6 +391,10 @@ where
R: Runtime,
E: SpanExporter + ?Sized,
{
if batch.is_empty() {
return Ok(());
}

let export = exporter.export(batch);
let timeout = runtime.delay(time_out);
pin_mut!(export);
Expand Down