Skip to content

Commit

Permalink
Span events stored as Vector instead of EvictedQueue (#1350)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas committed Nov 9, 2023
1 parent 7478954 commit d7e4629
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 55 deletions.
5 changes: 2 additions & 3 deletions opentelemetry-datadog/src/exporter/model/mod.rs
Expand Up @@ -195,7 +195,7 @@ pub(crate) mod tests {
};
use opentelemetry_sdk::{
self,
trace::{EvictedQueue, SpanLinks},
trace::{SpanEvents, SpanLinks},
InstrumentationLibrary, Resource,
};
use std::borrow::Cow;
Expand All @@ -217,9 +217,8 @@ pub(crate) mod tests {
let start_time = SystemTime::UNIX_EPOCH;
let end_time = start_time.checked_add(Duration::from_secs(1)).unwrap();

let capacity = 3;
let attributes = vec![KeyValue::new("span.type", "web")];
let events = EvictedQueue::new(capacity);
let events = SpanEvents::default();
let links = SpanLinks::default();
let resource = Resource::new(vec![KeyValue::new("host.name", "test")]);

Expand Down
12 changes: 5 additions & 7 deletions opentelemetry-jaeger/src/exporter/mod.rs
Expand Up @@ -19,13 +19,11 @@ use opentelemetry::{
trace::{Event, Link, SpanKind, Status},
InstrumentationLibrary, Key, KeyValue,
};
use opentelemetry_sdk::{
export::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
},
trace::EvictedQueue,
use opentelemetry_sdk::export::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
};
use opentelemetry_sdk::trace::SpanEvents;

use crate::exporter::uploader::Uploader;

Expand Down Expand Up @@ -255,7 +253,7 @@ impl UserOverrides {
}
}

fn events_to_logs(events: EvictedQueue<Event>) -> Option<Vec<jaeger::Log>> {
fn events_to_logs(events: SpanEvents) -> Option<Vec<jaeger::Log>> {
if events.is_empty() {
None
} else {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-proto/src/transform/trace.rs
Expand Up @@ -82,7 +82,7 @@ pub mod tonic {
end_time_unix_nano: to_nanos(source_span.end_time),
dropped_attributes_count: source_span.dropped_attributes_count,
attributes: Attributes::from(source_span.attributes).0,
dropped_events_count: source_span.events.dropped_count(),
dropped_events_count: source_span.events.dropped_count,
events: source_span
.events
.into_iter()
Expand Down Expand Up @@ -193,7 +193,7 @@ pub mod grpcio {
end_time_unix_nano: to_nanos(source_span.end_time),
dropped_attributes_count: source_span.dropped_attributes_count,
attributes: Attributes::from(source_span.attributes).0,
dropped_events_count: source_span.events.dropped_count(),
dropped_events_count: source_span.events.dropped_count,
events: source_span
.events
.into_iter()
Expand Down
18 changes: 11 additions & 7 deletions opentelemetry-sdk/CHANGELOG.md
Expand Up @@ -5,19 +5,23 @@
### Changed

- **Breaking**
[#1313](https://github.com/open-telemetry/opentelemetry-rust/issues/1313)
Changes how Span links are stored to achieve performance gains. See below for
details:

*Behavior Change*: When enforcing `max_links_per_span` from `SpanLimits`,
links are kept in the first-come order. The previous "eviction" based approach
is no longer performed.
[#1313](https://github.com/open-telemetry/opentelemetry-rust/pull/1313)
[#1350](https://github.com/open-telemetry/opentelemetry-rust/pull/1350)
Changes how Span links/events are stored to achieve performance gains. See
below for details:

*Behavior Change*: When enforcing `max_links_per_span`, `max_events_per_span`
from `SpanLimits`, links/events are kept in the first-come order. The previous
"eviction" based approach is no longer performed.

*Breaking Change Affecting Exporter authors*:

`SpanData` now stores `links` as `SpanLinks` instead of `EvictedQueue` where
`SpanLinks` is a struct with a `Vec` of links and `dropped_count`.

`SpanData` now stores `events` as `SpanEvents` instead of `EvictedQueue` where
`SpanEvents` is a struct with a `Vec` of events and `dropped_count`.

## v0.21.0

### Added
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Expand Up @@ -5,7 +5,7 @@ use opentelemetry::trace::{
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::{BatchSpanProcessor, EvictedQueue, SpanLinks, SpanProcessor};
use opentelemetry_sdk::trace::{BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor};
use opentelemetry_sdk::Resource;
use std::borrow::Cow;
use std::sync::Arc;
Expand All @@ -29,7 +29,7 @@ fn get_span_data() -> Vec<SpanData> {
end_time: SystemTime::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: EvictedQueue::new(12),
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: Cow::Owned(Resource::empty()),
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/export/trace.rs
@@ -1,7 +1,7 @@
//! Trace exporters
use crate::Resource;
use futures_util::future::BoxFuture;
use opentelemetry::trace::{Event, SpanContext, SpanId, SpanKind, Status, TraceError};
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceError};
use opentelemetry::KeyValue;
use std::borrow::Cow;
use std::fmt::Debug;
Expand Down Expand Up @@ -87,7 +87,7 @@ pub struct SpanData {
/// dropped.
pub dropped_attributes_count: u32,
/// Span events
pub events: crate::trace::EvictedQueue<Event>,
pub events: crate::trace::SpanEvents,
/// Span Links
pub links: crate::trace::SpanLinks,
/// Span status
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/testing/trace/mod.rs
Expand Up @@ -7,7 +7,7 @@ use crate::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
},
trace::{Config, EvictedQueue, SpanLinks},
trace::{Config, SpanEvents, SpanLinks},
InstrumentationLibrary,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -36,7 +36,7 @@ pub fn new_test_export_span_data() -> SpanData {
end_time: opentelemetry::time::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: EvictedQueue::new(config.span_limits.max_events_per_span),
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: config.resource,
Expand Down
37 changes: 37 additions & 0 deletions opentelemetry-sdk/src/trace/events.rs
@@ -0,0 +1,37 @@
//! # Span Events

use std::ops::Deref;

use opentelemetry::trace::Event;
/// Stores span events along with dropped count.
#[derive(Clone, Debug, Default, PartialEq)]
#[non_exhaustive]
pub struct SpanEvents {
/// The events stored as a vector. Could be empty if there are no events.
pub events: Vec<Event>,
/// The number of Events dropped from the span.
pub dropped_count: u32,
}

impl Deref for SpanEvents {
type Target = [Event];

fn deref(&self) -> &Self::Target {
&self.events
}
}

impl IntoIterator for SpanEvents {
type Item = Event;
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.events.into_iter()
}
}

impl SpanEvents {
pub(crate) fn add_event(&mut self, event: Event) {
self.events.push(event);
}
}
44 changes: 42 additions & 2 deletions opentelemetry-sdk/src/trace/mod.rs
Expand Up @@ -7,6 +7,7 @@
//! current operation execution.
//! * The [`TracerProvider`] struct which configures and produces [`Tracer`]s.
mod config;
mod events;
mod evicted_hash_map;
mod evicted_queue;
mod id_generator;
Expand All @@ -19,6 +20,7 @@ mod span_processor;
mod tracer;

pub use config::{config, Config};
pub use events::SpanEvents;
pub use evicted_hash_map::EvictedHashMap;
pub use evicted_queue::EvictedQueue;
pub use id_generator::{aws::XrayIdGenerator, IdGenerator, RandomIdGenerator};
Expand All @@ -42,11 +44,12 @@ mod runtime_tests;
mod tests {
use super::*;
use crate::{
testing::trace::InMemorySpanExporterBuilder, trace::span_limit::DEFAULT_MAX_LINKS_PER_SPAN,
testing::trace::InMemorySpanExporterBuilder,
trace::span_limit::{DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN},
};
use opentelemetry::{
trace::{
Link, Span, SpanBuilder, SpanContext, SpanId, TraceFlags, TraceId, Tracer,
Event, Link, Span, SpanBuilder, SpanContext, SpanId, TraceFlags, TraceId, Tracer,
TracerProvider as _,
},
KeyValue,
Expand Down Expand Up @@ -140,4 +143,41 @@ mod tests {
assert_eq!(span.name, "span_name");
assert_eq!(span.links.len(), DEFAULT_MAX_LINKS_PER_SPAN as usize);
}

#[test]
fn exceed_span_events_limit() {
// Arrange
let exporter = InMemorySpanExporterBuilder::new().build();
let provider = TracerProvider::builder()
.with_span_processor(SimpleSpanProcessor::new(Box::new(exporter.clone())))
.build();

// Act
let tracer = provider.tracer("test_tracer");

let mut events = Vec::new();
for _i in 0..(DEFAULT_MAX_EVENT_PER_SPAN * 2) {
events.push(Event::with_name("test event"))
}

// add events via span builder
let span_builder = SpanBuilder::from_name("span_name").with_events(events);
let mut span = tracer.build(span_builder);

// add events using span api after building the span
span.add_event("test event again, after span builder", Vec::new());
span.add_event("test event once again, after span builder", Vec::new());
span.end();
provider.force_flush();

// Assert
let exported_spans = exporter
.get_finished_spans()
.expect("Spans are expected to be exported.");
assert_eq!(exported_spans.len(), 1);
let span = &exported_spans[0];
assert_eq!(span.name, "span_name");
assert_eq!(span.events.len(), DEFAULT_MAX_EVENT_PER_SPAN as usize);
assert_eq!(span.events.dropped_count, DEFAULT_MAX_EVENT_PER_SPAN + 2);
}
}
66 changes: 50 additions & 16 deletions opentelemetry-sdk/src/trace/span.rs
Expand Up @@ -11,7 +11,7 @@
use crate::trace::SpanLimits;
use crate::Resource;
use opentelemetry::trace::{Event, SpanContext, SpanId, SpanKind, Status};
use opentelemetry::{trace, KeyValue};
use opentelemetry::KeyValue;
use std::borrow::Cow;
use std::time::SystemTime;

Expand Down Expand Up @@ -42,7 +42,7 @@ pub(crate) struct SpanData {
/// dropped.
pub(crate) dropped_attributes_count: u32,
/// Span events
pub(crate) events: crate::trace::EvictedQueue<trace::Event>,
pub(crate) events: crate::trace::SpanEvents,
/// Span Links
pub(crate) links: crate::trace::SpanLinks,
/// Span status
Expand Down Expand Up @@ -99,17 +99,23 @@ impl opentelemetry::trace::Span for Span {
) where
T: Into<Cow<'static, str>>,
{
let span_events_limit = self.span_limits.max_events_per_span as usize;
let event_attributes_limit = self.span_limits.max_attributes_per_event as usize;
self.with_data(|data| {
let dropped_attributes_count = attributes.len().saturating_sub(event_attributes_limit);
attributes.truncate(event_attributes_limit);

data.events.push_back(Event::new(
name,
timestamp,
attributes,
dropped_attributes_count as u32,
))
if data.events.len() < span_events_limit {
let dropped_attributes_count =
attributes.len().saturating_sub(event_attributes_limit);
attributes.truncate(event_attributes_limit);

data.events.add_event(Event::new(
name,
timestamp,
attributes,
dropped_attributes_count as u32,
));
} else {
data.events.dropped_count += 1;
}
});
}

Expand Down Expand Up @@ -252,17 +258,16 @@ mod tests {
use crate::testing::trace::NoopSpanExporter;
use crate::trace::span_limit::{
DEFAULT_MAX_ATTRIBUTES_PER_EVENT, DEFAULT_MAX_ATTRIBUTES_PER_LINK,
DEFAULT_MAX_ATTRIBUTES_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN,
DEFAULT_MAX_ATTRIBUTES_PER_SPAN, DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN,
};
use crate::trace::SpanLinks;
use opentelemetry::trace::{Link, SpanBuilder, TraceFlags, TraceId, Tracer};
use crate::trace::{SpanEvents, SpanLinks};
use opentelemetry::trace::{self, Link, SpanBuilder, TraceFlags, TraceId, Tracer};
use opentelemetry::{trace::Span as _, trace::TracerProvider, KeyValue};
use std::time::Duration;
use std::vec;

fn init() -> (crate::trace::Tracer, SpanData) {
let provider = crate::trace::TracerProvider::default();
let config = provider.config();
let tracer = provider.tracer("opentelemetry");
let data = SpanData {
parent_span_id: SpanId::from_u64(0),
Expand All @@ -272,7 +277,7 @@ mod tests {
end_time: opentelemetry::time::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: crate::trace::EvictedQueue::new(config.span_limits.max_events_per_span),
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
};
Expand Down Expand Up @@ -649,6 +654,35 @@ mod tests {
assert_eq!(link_vec.len(), DEFAULT_MAX_LINKS_PER_SPAN as usize);
}

#[test]
fn exceed_span_events_limit() {
let exporter = NoopSpanExporter::new();
let provider_builder =
crate::trace::TracerProvider::builder().with_simple_exporter(exporter);
let provider = provider_builder.build();
let tracer = provider.tracer("opentelemetry-test");

let mut events = Vec::new();
for _i in 0..(DEFAULT_MAX_EVENT_PER_SPAN * 2) {
events.push(Event::with_name("test event"))
}

// add events via span builder
let span_builder = tracer.span_builder("test").with_events(events);
let mut span = tracer.build(span_builder);

// add events using span api after building the span
span.add_event("test event again, after span builder", Vec::new());
span.add_event("test event once again, after span builder", Vec::new());
let span_events = span
.data
.clone()
.expect("span data should not be empty as we already set it before")
.events;
let event_vec: Vec<_> = span_events.events;
assert_eq!(event_vec.len(), DEFAULT_MAX_EVENT_PER_SPAN as usize);
}

#[test]
fn test_span_exported_data() {
let provider = crate::trace::TracerProvider::builder()
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/trace/span_processor.rs
Expand Up @@ -721,7 +721,7 @@ mod tests {
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
};
use crate::trace::{BatchConfig, EvictedQueue, SpanLinks};
use crate::trace::{BatchConfig, SpanEvents, SpanLinks};
use async_trait::async_trait;
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
use std::fmt::Debug;
Expand Down Expand Up @@ -750,7 +750,7 @@ mod tests {
end_time: opentelemetry::time::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: EvictedQueue::new(0),
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: Default::default(),
Expand Down

0 comments on commit d7e4629

Please sign in to comment.