Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Span events stored as Vector instead of EvictedQueue #1350

Merged
merged 11 commits into from Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 2 additions & 3 deletions opentelemetry-datadog/src/exporter/model/mod.rs
Expand Up @@ -195,7 +195,7 @@ pub(crate) mod tests {
};
use opentelemetry_sdk::{
self,
trace::{EvictedQueue, SpanLinks},
trace::{SpanEvents, SpanLinks},
InstrumentationLibrary, Resource,
};
use std::borrow::Cow;
Expand All @@ -217,9 +217,8 @@ pub(crate) mod tests {
let start_time = SystemTime::UNIX_EPOCH;
let end_time = start_time.checked_add(Duration::from_secs(1)).unwrap();

let capacity = 3;
let attributes = vec![KeyValue::new("span.type", "web")];
let events = EvictedQueue::new(capacity);
let events = SpanEvents::default();
let links = SpanLinks::default();
let resource = Resource::new(vec![KeyValue::new("host.name", "test")]);

Expand Down
12 changes: 5 additions & 7 deletions opentelemetry-jaeger/src/exporter/mod.rs
Expand Up @@ -19,13 +19,11 @@
trace::{Event, Link, SpanKind, Status},
InstrumentationLibrary, Key, KeyValue,
};
use opentelemetry_sdk::{
export::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
},
trace::EvictedQueue,
use opentelemetry_sdk::export::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
};
use opentelemetry_sdk::trace::SpanEvents;

use crate::exporter::uploader::Uploader;

Expand Down Expand Up @@ -255,7 +253,7 @@
}
}

fn events_to_logs(events: EvictedQueue<Event>) -> Option<Vec<jaeger::Log>> {
fn events_to_logs(events: SpanEvents) -> Option<Vec<jaeger::Log>> {

Check warning on line 256 in opentelemetry-jaeger/src/exporter/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-jaeger/src/exporter/mod.rs#L256

Added line #L256 was not covered by tests
if events.is_empty() {
None
} else {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-proto/src/transform/trace.rs
Expand Up @@ -82,7 +82,7 @@
end_time_unix_nano: to_nanos(source_span.end_time),
dropped_attributes_count: source_span.dropped_attributes_count,
attributes: Attributes::from(source_span.attributes).0,
dropped_events_count: source_span.events.dropped_count(),
dropped_events_count: source_span.events.dropped_count,
events: source_span
.events
.into_iter()
Expand Down Expand Up @@ -193,7 +193,7 @@
end_time_unix_nano: to_nanos(source_span.end_time),
dropped_attributes_count: source_span.dropped_attributes_count,
attributes: Attributes::from(source_span.attributes).0,
dropped_events_count: source_span.events.dropped_count(),
dropped_events_count: source_span.events.dropped_count,

Check warning on line 196 in opentelemetry-proto/src/transform/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/trace.rs#L196

Added line #L196 was not covered by tests
events: source_span
.events
.into_iter()
Expand Down
18 changes: 11 additions & 7 deletions opentelemetry-sdk/CHANGELOG.md
Expand Up @@ -5,19 +5,23 @@
### Changed

- **Breaking**
[#1313](https://github.com/open-telemetry/opentelemetry-rust/issues/1313)
Changes how Span links are stored to achieve performance gains. See below for
details:

*Behavior Change*: When enforcing `max_links_per_span` from `SpanLimits`,
links are kept in the first-come order. The previous "eviction" based approach
is no longer performed.
[#1313](https://github.com/open-telemetry/opentelemetry-rust/pull/1313)
[#1350](https://github.com/open-telemetry/opentelemetry-rust/pull/1350)
Changes how Span links/events are stored to achieve performance gains. See
below for details:

*Behavior Change*: When enforcing `max_links_per_span`, `max_events_per_span`
from `SpanLimits`, links/events are kept in the first-come order. The previous
"eviction" based approach is no longer performed.

*Breaking Change Affecting Exporter authors*:

`SpanData` now stores `links` as `SpanLinks` instead of `EvictedQueue` where
`SpanLinks` is a struct with a `Vec` of links and `dropped_count`.

`SpanData` now stores `events` as `SpanEvents` instead of `EvictedQueue` where
`SpanEvents` is a struct with a `Vec` of events and `dropped_count`.

## v0.21.0

### Added
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Expand Up @@ -5,7 +5,7 @@ use opentelemetry::trace::{
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::{BatchSpanProcessor, EvictedQueue, SpanLinks, SpanProcessor};
use opentelemetry_sdk::trace::{BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor};
use opentelemetry_sdk::Resource;
use std::borrow::Cow;
use std::sync::Arc;
Expand All @@ -29,7 +29,7 @@ fn get_span_data() -> Vec<SpanData> {
end_time: SystemTime::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: EvictedQueue::new(12),
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: Cow::Owned(Resource::empty()),
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/export/trace.rs
@@ -1,7 +1,7 @@
//! Trace exporters
use crate::Resource;
use futures_util::future::BoxFuture;
use opentelemetry::trace::{Event, SpanContext, SpanId, SpanKind, Status, TraceError};
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceError};
use opentelemetry::KeyValue;
use std::borrow::Cow;
use std::fmt::Debug;
Expand Down Expand Up @@ -87,7 +87,7 @@ pub struct SpanData {
/// dropped.
pub dropped_attributes_count: u32,
/// Span events
pub events: crate::trace::EvictedQueue<Event>,
pub events: crate::trace::SpanEvents,
/// Span Links
pub links: crate::trace::SpanLinks,
/// Span status
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/testing/trace/mod.rs
Expand Up @@ -7,7 +7,7 @@ use crate::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
},
trace::{Config, EvictedQueue, SpanLinks},
trace::{Config, SpanEvents, SpanLinks},
InstrumentationLibrary,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -36,7 +36,7 @@ pub fn new_test_export_span_data() -> SpanData {
end_time: opentelemetry::time::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: EvictedQueue::new(config.span_limits.max_events_per_span),
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: config.resource,
Expand Down
37 changes: 37 additions & 0 deletions opentelemetry-sdk/src/trace/events.rs
@@ -0,0 +1,37 @@
//! # Span Events

use std::ops::Deref;

use opentelemetry::trace::Event;
/// Stores span events along with dropped count.
#[derive(Clone, Debug, Default, PartialEq)]
#[non_exhaustive]
pub struct SpanEvents {
/// The events stored as a vector. Could be empty if there are no events.
pub events: Vec<Event>,
/// The number of Events dropped from the span.
pub dropped_count: u32,
}

impl Deref for SpanEvents {
type Target = [Event];

fn deref(&self) -> &Self::Target {
&self.events
}
}

impl IntoIterator for SpanEvents {
type Item = Event;
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.events.into_iter()
}
}

impl SpanEvents {
pub(crate) fn add_event(&mut self, event: Event) {
self.events.push(event);
}
}
44 changes: 42 additions & 2 deletions opentelemetry-sdk/src/trace/mod.rs
Expand Up @@ -7,6 +7,7 @@
//! current operation execution.
//! * The [`TracerProvider`] struct which configures and produces [`Tracer`]s.
mod config;
mod events;
mod evicted_hash_map;
mod evicted_queue;
mod id_generator;
Expand All @@ -19,6 +20,7 @@ mod span_processor;
mod tracer;

pub use config::{config, Config};
pub use events::SpanEvents;
pub use evicted_hash_map::EvictedHashMap;
pub use evicted_queue::EvictedQueue;
pub use id_generator::{aws::XrayIdGenerator, IdGenerator, RandomIdGenerator};
Expand All @@ -42,11 +44,12 @@ mod runtime_tests;
mod tests {
use super::*;
use crate::{
testing::trace::InMemorySpanExporterBuilder, trace::span_limit::DEFAULT_MAX_LINKS_PER_SPAN,
testing::trace::InMemorySpanExporterBuilder,
trace::span_limit::{DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN},
};
use opentelemetry::{
trace::{
Link, Span, SpanBuilder, SpanContext, SpanId, TraceFlags, TraceId, Tracer,
Event, Link, Span, SpanBuilder, SpanContext, SpanId, TraceFlags, TraceId, Tracer,
TracerProvider as _,
},
KeyValue,
Expand Down Expand Up @@ -140,4 +143,41 @@ mod tests {
assert_eq!(span.name, "span_name");
assert_eq!(span.links.len(), DEFAULT_MAX_LINKS_PER_SPAN as usize);
}

#[test]
fn exceed_span_events_limit() {
// Arrange
let exporter = InMemorySpanExporterBuilder::new().build();
let provider = TracerProvider::builder()
.with_span_processor(SimpleSpanProcessor::new(Box::new(exporter.clone())))
.build();

// Act
let tracer = provider.tracer("test_tracer");

let mut events = Vec::new();
for _i in 0..(DEFAULT_MAX_EVENT_PER_SPAN * 2) {
events.push(Event::with_name("test event"))
}

// add events via span builder
let span_builder = SpanBuilder::from_name("span_name").with_events(events);
let mut span = tracer.build(span_builder);

// add events using span api after building the span
span.add_event("test event again, after span builder", Vec::new());
span.add_event("test event once again, after span builder", Vec::new());
span.end();
provider.force_flush();

// Assert
let exported_spans = exporter
.get_finished_spans()
.expect("Spans are expected to be exported.");
assert_eq!(exported_spans.len(), 1);
let span = &exported_spans[0];
assert_eq!(span.name, "span_name");
assert_eq!(span.events.len(), DEFAULT_MAX_EVENT_PER_SPAN as usize);
assert_eq!(span.events.dropped_count, DEFAULT_MAX_EVENT_PER_SPAN + 2);
}
}
66 changes: 50 additions & 16 deletions opentelemetry-sdk/src/trace/span.rs
Expand Up @@ -11,7 +11,7 @@
use crate::trace::SpanLimits;
use crate::Resource;
use opentelemetry::trace::{Event, SpanContext, SpanId, SpanKind, Status};
use opentelemetry::{trace, KeyValue};
use opentelemetry::KeyValue;
use std::borrow::Cow;
use std::time::SystemTime;

Expand Down Expand Up @@ -42,7 +42,7 @@ pub(crate) struct SpanData {
/// dropped.
pub(crate) dropped_attributes_count: u32,
/// Span events
pub(crate) events: crate::trace::EvictedQueue<trace::Event>,
pub(crate) events: crate::trace::SpanEvents,
/// Span Links
pub(crate) links: crate::trace::SpanLinks,
/// Span status
Expand Down Expand Up @@ -99,17 +99,23 @@ impl opentelemetry::trace::Span for Span {
) where
T: Into<Cow<'static, str>>,
{
let span_events_limit = self.span_limits.max_events_per_span as usize;
let event_attributes_limit = self.span_limits.max_attributes_per_event as usize;
self.with_data(|data| {
let dropped_attributes_count = attributes.len().saturating_sub(event_attributes_limit);
attributes.truncate(event_attributes_limit);

data.events.push_back(Event::new(
name,
timestamp,
attributes,
dropped_attributes_count as u32,
))
if data.events.len() < span_events_limit {
let dropped_attributes_count =
attributes.len().saturating_sub(event_attributes_limit);
attributes.truncate(event_attributes_limit);

data.events.add_event(Event::new(
name,
timestamp,
attributes,
dropped_attributes_count as u32,
));
} else {
data.events.dropped_count += 1;
}
});
}

Expand Down Expand Up @@ -252,17 +258,16 @@ mod tests {
use crate::testing::trace::NoopSpanExporter;
use crate::trace::span_limit::{
DEFAULT_MAX_ATTRIBUTES_PER_EVENT, DEFAULT_MAX_ATTRIBUTES_PER_LINK,
DEFAULT_MAX_ATTRIBUTES_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN,
DEFAULT_MAX_ATTRIBUTES_PER_SPAN, DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN,
};
use crate::trace::SpanLinks;
use opentelemetry::trace::{Link, SpanBuilder, TraceFlags, TraceId, Tracer};
use crate::trace::{SpanEvents, SpanLinks};
use opentelemetry::trace::{self, Link, SpanBuilder, TraceFlags, TraceId, Tracer};
use opentelemetry::{trace::Span as _, trace::TracerProvider, KeyValue};
use std::time::Duration;
use std::vec;

fn init() -> (crate::trace::Tracer, SpanData) {
let provider = crate::trace::TracerProvider::default();
let config = provider.config();
let tracer = provider.tracer("opentelemetry");
let data = SpanData {
parent_span_id: SpanId::from_u64(0),
Expand All @@ -272,7 +277,7 @@ mod tests {
end_time: opentelemetry::time::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: crate::trace::EvictedQueue::new(config.span_limits.max_events_per_span),
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
};
Expand Down Expand Up @@ -649,6 +654,35 @@ mod tests {
assert_eq!(link_vec.len(), DEFAULT_MAX_LINKS_PER_SPAN as usize);
}

#[test]
fn exceed_span_events_limit() {
let exporter = NoopSpanExporter::new();
let provider_builder =
crate::trace::TracerProvider::builder().with_simple_exporter(exporter);
let provider = provider_builder.build();
let tracer = provider.tracer("opentelemetry-test");

let mut events = Vec::new();
for _i in 0..(DEFAULT_MAX_EVENT_PER_SPAN * 2) {
events.push(Event::with_name("test event"))
}

// add events via span builder
let span_builder = tracer.span_builder("test").with_events(events);
let mut span = tracer.build(span_builder);

// add events using span api after building the span
span.add_event("test event again, after span builder", Vec::new());
span.add_event("test event once again, after span builder", Vec::new());
let span_events = span
.data
.clone()
.expect("span data should not be empty as we already set it before")
.events;
let event_vec: Vec<_> = span_events.events;
assert_eq!(event_vec.len(), DEFAULT_MAX_EVENT_PER_SPAN as usize);
}

#[test]
fn test_span_exported_data() {
let provider = crate::trace::TracerProvider::builder()
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/trace/span_processor.rs
Expand Up @@ -721,7 +721,7 @@ mod tests {
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
};
use crate::trace::{BatchConfig, EvictedQueue, SpanLinks};
use crate::trace::{BatchConfig, SpanEvents, SpanLinks};
use async_trait::async_trait;
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
use std::fmt::Debug;
Expand Down Expand Up @@ -750,7 +750,7 @@ mod tests {
end_time: opentelemetry::time::now(),
attributes: Vec::new(),
dropped_attributes_count: 0,
events: EvictedQueue::new(0),
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: Default::default(),
Expand Down