Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add force_push_tracer_provider function. #512

Merged
merged 3 commits into from Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions opentelemetry/src/global/mod.rs
Expand Up @@ -156,6 +156,7 @@ pub use propagation::{get_text_map_propagator, set_text_map_propagator};
#[cfg(feature = "trace")]
#[cfg_attr(docsrs, doc(cfg(feature = "trace")))]
pub use trace::{
set_tracer_provider, shutdown_tracer_provider, tracer, tracer_provider, tracer_with_version,
BoxedSpan, BoxedTracer, GenericTracer, GenericTracerProvider, GlobalTracerProvider,
force_flush_tracer_provider, set_tracer_provider, shutdown_tracer_provider, tracer,
tracer_provider, tracer_with_version, BoxedSpan, BoxedTracer, GenericTracer,
GenericTracerProvider, GlobalTracerProvider,
};
37 changes: 36 additions & 1 deletion opentelemetry/src/global/trace.rs
@@ -1,4 +1,5 @@
use crate::trace::NoopTracerProvider;
use crate::global::handle_error;
use crate::trace::{NoopTracerProvider, TraceResult};
use crate::{trace, trace::TracerProvider, Context, KeyValue};
use std::borrow::Cow;
use std::fmt;
Expand Down Expand Up @@ -168,6 +169,9 @@ pub trait GenericTracerProvider: fmt::Debug + 'static {
name: &'static str,
version: Option<&'static str>,
) -> Box<dyn GenericTracer + Send + Sync>;

/// Force flush all remaining spans in span processors and return results.
fn force_flush(&self) -> Vec<TraceResult<()>>;
}

impl<S, T, P> GenericTracerProvider for P
Expand All @@ -184,6 +188,10 @@ where
) -> Box<dyn GenericTracer + Send + Sync> {
Box::new(self.get_tracer(name, version))
}

fn force_flush(&self) -> Vec<TraceResult<()>> {
self.force_flush()
}
}

/// Represents the globally configured [`TracerProvider`] instance for this
Expand Down Expand Up @@ -217,6 +225,11 @@ impl trace::TracerProvider for GlobalTracerProvider {
fn get_tracer(&self, name: &'static str, version: Option<&'static str>) -> Self::Tracer {
BoxedTracer(self.provider.get_tracer_boxed(name, version))
}

/// Force flush all remaining spans in span processors and return results.
fn force_flush(&self) -> Vec<TraceResult<()>> {
self.provider.force_flush()
}
}

lazy_static::lazy_static! {
Expand Down Expand Up @@ -291,6 +304,24 @@ pub fn shutdown_tracer_provider() {
);
}

/// Force flush all remaining spans in span processors.
///
/// Use the [`global::handle_error`] to handle errors happened during force flush.
///
/// [`global::handle_error`]: crate::global::handle_error
pub fn force_flush_tracer_provider() {
let tracer_provider = GLOBAL_TRACER_PROVIDER
.write()
.expect("GLOBAL_TRACER_PROVIDER RwLock poisoned");

let results = trace::TracerProvider::force_flush(&*tracer_provider);
for result in results {
if let Err(err) = result {
handle_error(err)
}
}
}

#[cfg(test)]
// Note that all tests here should be marked as ignore so that it won't be picked up by default We
// need to run those tests one by one as the GlobalTracerProvider is a shared object between
Expand Down Expand Up @@ -378,6 +409,10 @@ mod tests {
fn get_tracer(&self, _name: &'static str, _version: Option<&'static str>) -> Self::Tracer {
NoopTracer::default()
}

fn force_flush(&self) -> Vec<TraceResult<()>> {
Vec::new()
}
}

#[test]
Expand Down
60 changes: 60 additions & 0 deletions opentelemetry/src/sdk/trace/provider.rs
Expand Up @@ -8,6 +8,7 @@
//! propagators) are provided by the `TracerProvider`. `Tracer` instances do
//! not duplicate this data to avoid that different `Tracer` instances
//! of the `TracerProvider` have different versions of these data.
use crate::trace::TraceResult;
use crate::{
global,
runtime::Runtime,
Expand Down Expand Up @@ -85,6 +86,14 @@ impl crate::trace::TracerProvider for TracerProvider {

sdk::trace::Tracer::new(instrumentation_lib, Arc::downgrade(&self.inner))
}

/// Force flush all remaining spans in span processors and return results.
fn force_flush(&self) -> Vec<TraceResult<()>> {
self.span_processors()
.iter()
.map(|processor| processor.force_flush())
.collect()
}
}

/// Builder for provider attributes.
Expand Down Expand Up @@ -146,3 +155,54 @@ impl Builder {
}
}
}

#[cfg(test)]
mod tests {
use crate::sdk::export::trace::SpanData;
use crate::sdk::trace::provider::TracerProviderInner;
use crate::sdk::trace::{Span, SpanProcessor};
use crate::trace::{TraceError, TraceResult, TracerProvider};
use crate::Context;
use std::sync::Arc;

#[derive(Debug)]
struct TestSpanProcessor {
success: bool,
}

impl SpanProcessor for TestSpanProcessor {
fn on_start(&self, _span: &Span, _cx: &Context) {
unimplemented!()
}

fn on_end(&self, _span: SpanData) {
unimplemented!()
}

fn force_flush(&self) -> TraceResult<()> {
if self.success {
Ok(())
} else {
Err(TraceError::from("cannot export"))
}
}

fn shutdown(&mut self) -> TraceResult<()> {
self.force_flush()
}
}

#[test]
fn test_force_flush() {
let tracer_provider = super::TracerProvider::new(Arc::from(TracerProviderInner {
processors: vec![
Box::from(TestSpanProcessor { success: true }),
Box::from(TestSpanProcessor { success: false }),
],
config: Default::default(),
}));

let results = tracer_provider.force_flush();
assert_eq!(results.len(), 2);
}
}
6 changes: 6 additions & 0 deletions opentelemetry/src/trace/noop.rs
Expand Up @@ -3,6 +3,7 @@
//! This implementation is returned as the global tracer if no `Tracer`
//! has been set. It is also useful for testing purposes as it is intended
//! to have minimal resource utilization and runtime impact.
use crate::trace::TraceResult;
use crate::{
sdk::export::trace::{ExportResult, SpanData, SpanExporter},
trace,
Expand Down Expand Up @@ -32,6 +33,11 @@ impl trace::TracerProvider for NoopTracerProvider {
fn get_tracer(&self, _name: &'static str, _version: Option<&'static str>) -> Self::Tracer {
NoopTracer::new()
}

/// Return an empty `Vec` as there isn't any span processors in `NoopTracerProvider`
fn force_flush(&self) -> Vec<TraceResult<()>> {
Vec::new()
}
}

/// A no-op instance of a `Span`.
Expand Down
5 changes: 4 additions & 1 deletion opentelemetry/src/trace/provider.rs
Expand Up @@ -19,7 +19,7 @@
//!
//! Implementations might require the user to specify configuration properties at
//! `TracerProvider` creation time, or rely on external configurations.
use crate::trace::Tracer;
use crate::trace::{TraceResult, Tracer};
use std::fmt;

/// An interface to create `Tracer` instances.
Expand All @@ -30,4 +30,7 @@ pub trait TracerProvider: fmt::Debug + 'static {
/// Creates a named tracer instance of `Self::Tracer`.
/// If the name is an empty string then provider uses default name.
fn get_tracer(&self, name: &'static str, version: Option<&'static str>) -> Self::Tracer;

/// Force flush all remaining spans in span processors and return results.
fn force_flush(&self) -> Vec<TraceResult<()>>;
}