From 68a2567b1011936b3dd34c3a570c9ec9b7601ac5 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Mon, 5 Apr 2021 20:38:52 -0400 Subject: [PATCH 1/2] feat: add force_push_tracer_provider function. This helps user to force push all remaining spans within span processors --- opentelemetry/src/global/mod.rs | 5 ++- opentelemetry/src/global/trace.rs | 33 +++++++++++++- opentelemetry/src/sdk/trace/provider.rs | 60 +++++++++++++++++++++++++ opentelemetry/src/trace/noop.rs | 6 +++ opentelemetry/src/trace/provider.rs | 5 ++- 5 files changed, 105 insertions(+), 4 deletions(-) diff --git a/opentelemetry/src/global/mod.rs b/opentelemetry/src/global/mod.rs index 3922ee96c3..2e12bfccf2 100644 --- a/opentelemetry/src/global/mod.rs +++ b/opentelemetry/src/global/mod.rs @@ -156,6 +156,7 @@ pub use propagation::{get_text_map_propagator, set_text_map_propagator}; #[cfg(feature = "trace")] #[cfg_attr(docsrs, doc(cfg(feature = "trace")))] pub use trace::{ - set_tracer_provider, shutdown_tracer_provider, tracer, tracer_provider, tracer_with_version, - BoxedSpan, BoxedTracer, GenericTracer, GenericTracerProvider, GlobalTracerProvider, + force_push_tracer_provider, set_tracer_provider, shutdown_tracer_provider, tracer, + tracer_provider, tracer_with_version, BoxedSpan, BoxedTracer, GenericTracer, + GenericTracerProvider, GlobalTracerProvider, }; diff --git a/opentelemetry/src/global/trace.rs b/opentelemetry/src/global/trace.rs index 98635c7cc9..12b74cad9a 100644 --- a/opentelemetry/src/global/trace.rs +++ b/opentelemetry/src/global/trace.rs @@ -1,4 +1,5 @@ -use crate::trace::NoopTracerProvider; +use crate::global::handle_error; +use crate::trace::{NoopTracerProvider, TraceResult}; use crate::{trace, trace::TracerProvider, Context, KeyValue}; use std::borrow::Cow; use std::fmt; @@ -168,6 +169,9 @@ pub trait GenericTracerProvider: fmt::Debug + 'static { name: &'static str, version: Option<&'static str>, ) -> Box; + + /// Force push all remaining spans in span processors and return results. + fn force_push(&self) -> Vec>; } impl GenericTracerProvider for P @@ -184,6 +188,10 @@ where ) -> Box { Box::new(self.get_tracer(name, version)) } + + fn force_push(&self) -> Vec> { + self.force_push() + } } /// Represents the globally configured [`TracerProvider`] instance for this @@ -217,6 +225,11 @@ impl trace::TracerProvider for GlobalTracerProvider { fn get_tracer(&self, name: &'static str, version: Option<&'static str>) -> Self::Tracer { BoxedTracer(self.provider.get_tracer_boxed(name, version)) } + + /// Force push all remaining spans in span processors and return results. + fn force_push(&self) -> Vec> { + self.provider.force_push() + } } lazy_static::lazy_static! { @@ -291,6 +304,20 @@ pub fn shutdown_tracer_provider() { ); } +/// Force push all remaining spans in span processors. +pub fn force_push_tracer_provider() { + let tracer_provider = GLOBAL_TRACER_PROVIDER + .write() + .expect("GLOBAL_TRACER_PROVIDER RwLock poisoned"); + + let results = trace::TracerProvider::force_push(&*tracer_provider); + for result in results { + if let Err(err) = result { + handle_error(err) + } + } +} + #[cfg(test)] // Note that all tests here should be marked as ignore so that it won't be picked up by default We // need to run those tests one by one as the GlobalTracerProvider is a shared object between @@ -378,6 +405,10 @@ mod tests { fn get_tracer(&self, _name: &'static str, _version: Option<&'static str>) -> Self::Tracer { NoopTracer::default() } + + fn force_push(&self) -> Vec> { + Vec::new() + } } #[test] diff --git a/opentelemetry/src/sdk/trace/provider.rs b/opentelemetry/src/sdk/trace/provider.rs index 60e4eb2bd3..e148257079 100644 --- a/opentelemetry/src/sdk/trace/provider.rs +++ b/opentelemetry/src/sdk/trace/provider.rs @@ -8,6 +8,7 @@ //! propagators) are provided by the `TracerProvider`. `Tracer` instances do //! not duplicate this data to avoid that different `Tracer` instances //! of the `TracerProvider` have different versions of these data. +use crate::trace::TraceResult; use crate::{ global, runtime::Runtime, @@ -85,6 +86,14 @@ impl crate::trace::TracerProvider for TracerProvider { sdk::trace::Tracer::new(instrumentation_lib, Arc::downgrade(&self.inner)) } + + /// Force push all remaining spans in span processors and return results. + fn force_push(&self) -> Vec> { + self.span_processors() + .iter() + .map(|processor| processor.force_flush()) + .collect() + } } /// Builder for provider attributes. @@ -146,3 +155,54 @@ impl Builder { } } } + +#[cfg(test)] +mod tests { + use crate::sdk::export::trace::SpanData; + use crate::sdk::trace::provider::TracerProviderInner; + use crate::sdk::trace::{Span, SpanProcessor}; + use crate::trace::{TraceError, TraceResult, TracerProvider}; + use crate::Context; + use std::sync::Arc; + + #[derive(Debug)] + struct TestSpanProcessor { + success: bool, + } + + impl SpanProcessor for TestSpanProcessor { + fn on_start(&self, _span: &Span, _cx: &Context) { + unimplemented!() + } + + fn on_end(&self, _span: SpanData) { + unimplemented!() + } + + fn force_flush(&self) -> TraceResult<()> { + if self.success { + Ok(()) + } else { + Err(TraceError::from("cannot export")) + } + } + + fn shutdown(&mut self) -> TraceResult<()> { + self.force_flush() + } + } + + #[test] + fn test_force_push() { + let tracer_provider = super::TracerProvider::new(Arc::from(TracerProviderInner { + processors: vec![ + Box::from(TestSpanProcessor { success: true }), + Box::from(TestSpanProcessor { success: false }), + ], + config: Default::default(), + })); + + let results = tracer_provider.force_push(); + assert_eq!(results.len(), 2); + } +} diff --git a/opentelemetry/src/trace/noop.rs b/opentelemetry/src/trace/noop.rs index 19250744c3..ca418db350 100644 --- a/opentelemetry/src/trace/noop.rs +++ b/opentelemetry/src/trace/noop.rs @@ -3,6 +3,7 @@ //! This implementation is returned as the global tracer if no `Tracer` //! has been set. It is also useful for testing purposes as it is intended //! to have minimal resource utilization and runtime impact. +use crate::trace::TraceResult; use crate::{ sdk::export::trace::{ExportResult, SpanData, SpanExporter}, trace, @@ -32,6 +33,11 @@ impl trace::TracerProvider for NoopTracerProvider { fn get_tracer(&self, _name: &'static str, _version: Option<&'static str>) -> Self::Tracer { NoopTracer::new() } + + /// Return an empty `Vec` as there isn't any span processors in `NoopTracerProvider` + fn force_push(&self) -> Vec> { + Vec::new() + } } /// A no-op instance of a `Span`. diff --git a/opentelemetry/src/trace/provider.rs b/opentelemetry/src/trace/provider.rs index 82d5991e29..1e84067d81 100644 --- a/opentelemetry/src/trace/provider.rs +++ b/opentelemetry/src/trace/provider.rs @@ -19,7 +19,7 @@ //! //! Implementations might require the user to specify configuration properties at //! `TracerProvider` creation time, or rely on external configurations. -use crate::trace::Tracer; +use crate::trace::{TraceResult, Tracer}; use std::fmt; /// An interface to create `Tracer` instances. @@ -30,4 +30,7 @@ pub trait TracerProvider: fmt::Debug + 'static { /// Creates a named tracer instance of `Self::Tracer`. /// If the name is an empty string then provider uses default name. fn get_tracer(&self, name: &'static str, version: Option<&'static str>) -> Self::Tracer; + + /// Force push all remaining spans in span processors and return results. + fn force_push(&self) -> Vec>; } From 34f260c8f603f072250c07a77a5fa0ca0b2fb3ab Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Mon, 5 Apr 2021 23:06:01 -0400 Subject: [PATCH 2/2] fix: rename the function from force_push to force_flush to keep it consistent across span processors and tracer providers. --- opentelemetry/src/global/mod.rs | 2 +- opentelemetry/src/global/trace.rs | 26 ++++++++++++++----------- opentelemetry/src/sdk/trace/provider.rs | 8 ++++---- opentelemetry/src/trace/noop.rs | 2 +- opentelemetry/src/trace/provider.rs | 4 ++-- 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/opentelemetry/src/global/mod.rs b/opentelemetry/src/global/mod.rs index 2e12bfccf2..bacf516397 100644 --- a/opentelemetry/src/global/mod.rs +++ b/opentelemetry/src/global/mod.rs @@ -156,7 +156,7 @@ pub use propagation::{get_text_map_propagator, set_text_map_propagator}; #[cfg(feature = "trace")] #[cfg_attr(docsrs, doc(cfg(feature = "trace")))] pub use trace::{ - force_push_tracer_provider, set_tracer_provider, shutdown_tracer_provider, tracer, + force_flush_tracer_provider, set_tracer_provider, shutdown_tracer_provider, tracer, tracer_provider, tracer_with_version, BoxedSpan, BoxedTracer, GenericTracer, GenericTracerProvider, GlobalTracerProvider, }; diff --git a/opentelemetry/src/global/trace.rs b/opentelemetry/src/global/trace.rs index 12b74cad9a..7f3bae226d 100644 --- a/opentelemetry/src/global/trace.rs +++ b/opentelemetry/src/global/trace.rs @@ -170,8 +170,8 @@ pub trait GenericTracerProvider: fmt::Debug + 'static { version: Option<&'static str>, ) -> Box; - /// Force push all remaining spans in span processors and return results. - fn force_push(&self) -> Vec>; + /// Force flush all remaining spans in span processors and return results. + fn force_flush(&self) -> Vec>; } impl GenericTracerProvider for P @@ -189,8 +189,8 @@ where Box::new(self.get_tracer(name, version)) } - fn force_push(&self) -> Vec> { - self.force_push() + fn force_flush(&self) -> Vec> { + self.force_flush() } } @@ -226,9 +226,9 @@ impl trace::TracerProvider for GlobalTracerProvider { BoxedTracer(self.provider.get_tracer_boxed(name, version)) } - /// Force push all remaining spans in span processors and return results. - fn force_push(&self) -> Vec> { - self.provider.force_push() + /// Force flush all remaining spans in span processors and return results. + fn force_flush(&self) -> Vec> { + self.provider.force_flush() } } @@ -304,13 +304,17 @@ pub fn shutdown_tracer_provider() { ); } -/// Force push all remaining spans in span processors. -pub fn force_push_tracer_provider() { +/// Force flush all remaining spans in span processors. +/// +/// Use the [`global::handle_error`] to handle errors happened during force flush. +/// +/// [`global::handle_error`]: crate::global::handle_error +pub fn force_flush_tracer_provider() { let tracer_provider = GLOBAL_TRACER_PROVIDER .write() .expect("GLOBAL_TRACER_PROVIDER RwLock poisoned"); - let results = trace::TracerProvider::force_push(&*tracer_provider); + let results = trace::TracerProvider::force_flush(&*tracer_provider); for result in results { if let Err(err) = result { handle_error(err) @@ -406,7 +410,7 @@ mod tests { NoopTracer::default() } - fn force_push(&self) -> Vec> { + fn force_flush(&self) -> Vec> { Vec::new() } } diff --git a/opentelemetry/src/sdk/trace/provider.rs b/opentelemetry/src/sdk/trace/provider.rs index e148257079..0659cad23f 100644 --- a/opentelemetry/src/sdk/trace/provider.rs +++ b/opentelemetry/src/sdk/trace/provider.rs @@ -87,8 +87,8 @@ impl crate::trace::TracerProvider for TracerProvider { sdk::trace::Tracer::new(instrumentation_lib, Arc::downgrade(&self.inner)) } - /// Force push all remaining spans in span processors and return results. - fn force_push(&self) -> Vec> { + /// Force flush all remaining spans in span processors and return results. + fn force_flush(&self) -> Vec> { self.span_processors() .iter() .map(|processor| processor.force_flush()) @@ -193,7 +193,7 @@ mod tests { } #[test] - fn test_force_push() { + fn test_force_flush() { let tracer_provider = super::TracerProvider::new(Arc::from(TracerProviderInner { processors: vec![ Box::from(TestSpanProcessor { success: true }), @@ -202,7 +202,7 @@ mod tests { config: Default::default(), })); - let results = tracer_provider.force_push(); + let results = tracer_provider.force_flush(); assert_eq!(results.len(), 2); } } diff --git a/opentelemetry/src/trace/noop.rs b/opentelemetry/src/trace/noop.rs index ca418db350..b7c3e084a1 100644 --- a/opentelemetry/src/trace/noop.rs +++ b/opentelemetry/src/trace/noop.rs @@ -35,7 +35,7 @@ impl trace::TracerProvider for NoopTracerProvider { } /// Return an empty `Vec` as there isn't any span processors in `NoopTracerProvider` - fn force_push(&self) -> Vec> { + fn force_flush(&self) -> Vec> { Vec::new() } } diff --git a/opentelemetry/src/trace/provider.rs b/opentelemetry/src/trace/provider.rs index 1e84067d81..ef357a5a5e 100644 --- a/opentelemetry/src/trace/provider.rs +++ b/opentelemetry/src/trace/provider.rs @@ -31,6 +31,6 @@ pub trait TracerProvider: fmt::Debug + 'static { /// If the name is an empty string then provider uses default name. fn get_tracer(&self, name: &'static str, version: Option<&'static str>) -> Self::Tracer; - /// Force push all remaining spans in span processors and return results. - fn force_push(&self) -> Vec>; + /// Force flush all remaining spans in span processors and return results. + fn force_flush(&self) -> Vec>; }