Skip to content

Commit

Permalink
Always export batch when limit reached (#519)
Browse files Browse the repository at this point in the history
Currently spans will begin to drop if more than the max queue size are
generated within a given batch window. This change causes the batch to
export when full, allowing more spans to be exported within a given
window even if the scheduled export time has not yet been reached.
  • Loading branch information
jtescher committed Apr 14, 2021
1 parent 607b32a commit 1cdf725
Showing 1 changed file with 60 additions and 65 deletions.
125 changes: 60 additions & 65 deletions opentelemetry/src/sdk/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,30 +246,30 @@ impl SpanProcessor for BatchSpanProcessor {

fn force_flush(&self) -> TraceResult<()> {
let mut sender = self.message_sender.lock().map_err(|_| TraceError::from("When force flushing the BatchSpanProcessor, the message sender's lock has been poisoned"))?;
let (res_sender, res_receiver) = oneshot::channel::<Vec<ExportResult>>();
let (res_sender, res_receiver) = oneshot::channel();
sender.try_send(BatchMessage::Flush(Some(res_sender)))?;
for result in futures::executor::block_on(res_receiver)? {
result?;
}
Ok(())

futures::executor::block_on(res_receiver)
.map_err(|err| TraceError::Other(err.into()))
.and_then(|identity| identity)
}

fn shutdown(&mut self) -> TraceResult<()> {
let mut sender = self.message_sender.lock().map_err(|_| TraceError::from("When shutting down the BatchSpanProcessor, the message sender's lock has been poisoned"))?;
let (res_sender, res_receiver) = oneshot::channel::<Vec<ExportResult>>();
let (res_sender, res_receiver) = oneshot::channel();
sender.try_send(BatchMessage::Shutdown(res_sender))?;
for result in futures::executor::block_on(res_receiver)? {
result?;
}
Ok(())

futures::executor::block_on(res_receiver)
.map_err(|err| TraceError::Other(err.into()))
.and_then(|identity| identity)
}
}

#[derive(Debug)]
enum BatchMessage {
ExportSpan(SpanData),
Flush(Option<oneshot::Sender<Vec<ExportResult>>>),
Shutdown(oneshot::Sender<Vec<ExportResult>>),
Flush(Option<oneshot::Sender<ExportResult>>),
Shutdown(oneshot::Sender<ExportResult>),
}

impl BatchSpanProcessor {
Expand All @@ -296,74 +296,65 @@ impl BatchSpanProcessor {
match message {
// Span has finished, add to buffer of pending spans.
BatchMessage::ExportSpan(span) => {
if spans.len() < config.max_queue_size {
spans.push(span);
}
}
// Span batch interval time reached or a force flush has been invoked, export current spans.
BatchMessage::Flush(Some(ch)) => {
let mut results =
Vec::with_capacity(spans.len() / config.max_export_batch_size + 1);
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
);

results.push(
export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
batch,
).await
);
}
let send_result = ch.send(results);
if send_result.is_err() {
global::handle_error(TraceError::from("fail to send the export response from worker handle in BatchProcessor"))
}
}
BatchMessage::Flush(None) => {
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
);
spans.push(span);

if spans.len() == config.max_export_batch_size {
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
batch,
).await;
spans.split_off(0),
)
.await;

if let Err(err) = result {
global::handle_error(err);
}
}
}
// Span batch interval time reached or a force flush has been invoked, export current spans.
BatchMessage::Flush(res_channel) => {
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
spans.split_off(0),
)
.await;

if let Some(channel) = res_channel {
if let Err(err) = channel.send(result) {
global::handle_error(TraceError::from(format!(
"failed to send flush result: {:?}",
err
)));
}
} else if let Err(err) = result {
global::handle_error(TraceError::from(format!(
"failed to export batch {:?}",
err
)));
}
}
// Stream has terminated or processor is shutdown, return to finish execution.
BatchMessage::Shutdown(ch) => {
let mut results =
Vec::with_capacity(spans.len() / config.max_export_batch_size + 1);
while !spans.is_empty() {
let batch = spans.split_off(
spans.len().saturating_sub(config.max_export_batch_size),
);

results.push(
export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
batch,
).await
);
}
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
spans.split_off(0),
)
.await;

exporter.shutdown();
let send_result = ch.send(results);
if send_result.is_err() {
global::handle_error(TraceError::from("fail to send the export response from worker handle in BatchProcessor"))

if let Err(err) = ch.send(result) {
global::handle_error(TraceError::from(format!(
"failed to send batch processor shutdown result: {:?}",
err
)));
}

break;
}
}
Expand Down Expand Up @@ -400,6 +391,10 @@ where
R: Runtime,
E: SpanExporter + ?Sized,
{
if batch.is_empty() {
return Ok(());
}

let export = exporter.export(batch);
let timeout = runtime.delay(time_out);
pin_mut!(export);
Expand Down

0 comments on commit 1cdf725

Please sign in to comment.