Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Resource Sharing Across Exporters with Arc Implementation #1526

Merged
merged 18 commits into from Feb 15, 2024
Merged
17 changes: 15 additions & 2 deletions opentelemetry-sdk/benches/log.rs
Expand Up @@ -7,10 +7,11 @@ use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::logs::{AnyValue, LogRecord, LogResult, Logger, LoggerProvider as _, Severity};
use opentelemetry::trace::Tracer;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::Key;
use opentelemetry::{Key, KeyValue};
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::logs::{Config, LoggerProvider};
use opentelemetry_sdk::trace::{config, Sampler, TracerProvider};
use opentelemetry_sdk::Resource;

#[derive(Debug)]
struct VoidExporter;
Expand All @@ -27,6 +28,12 @@ fn log_benchmark_group<F: Fn(&dyn Logger)>(c: &mut Criterion, name: &str, f: F)

group.bench_function("no-context", |b| {
let provider = LoggerProvider::builder()
.with_config(Config::default().with_resource(Resource::new(vec![
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
KeyValue::new("service.name", "my-service"),
KeyValue::new("service.version", "1.0.0"),
KeyValue::new("service.environment", "production"),
KeyValue::new("service.instance.id", "1234"),
])))
.with_simple_exporter(VoidExporter)
.build();

Expand All @@ -38,6 +45,12 @@ fn log_benchmark_group<F: Fn(&dyn Logger)>(c: &mut Criterion, name: &str, f: F)
group.bench_function("with-context", |b| {
let provider = LoggerProvider::builder()
.with_simple_exporter(VoidExporter)
.with_config(Config::default().with_resource(Resource::new(vec![
KeyValue::new("service.name", "my-service"),
KeyValue::new("service.version", "1.0.0"),
KeyValue::new("service.environment", "production"),
KeyValue::new("service.instance.id", "1234"),
])))
.build();

let logger = provider.logger("with-context");
Expand Down
189 changes: 101 additions & 88 deletions opentelemetry-sdk/src/resource/mod.rs
Expand Up @@ -34,13 +34,22 @@
use std::borrow::Cow;
use std::collections::{hash_map, HashMap};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

/// Inner structure of `Resource` holding the actual data.
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
/// This structure is designed to be shared among `Resource` instances via `Arc`.
#[derive(Debug, Clone, PartialEq)]
struct ResourceInner {
// Attribute key/value pairs of the resource, stored as a map from `Key` to `Value`.
attrs: HashMap<Key, Value>,
// Schema URL of the resource, or `None` when no schema URL is set.
schema_url: Option<Cow<'static, str>>,
}

/// An immutable representation of the entity producing telemetry as attributes.
/// Utilizes `Arc` for efficient sharing and cloning.
#[derive(Clone, Debug, PartialEq)]
pub struct Resource {
attrs: HashMap<Key, Value>,
schema_url: Option<Cow<'static, str>>,
inner: Arc<ResourceInner>,
}

impl Default for Resource {
Expand All @@ -58,10 +67,13 @@

impl Resource {
/// Creates an empty resource.
/// This is the basic constructor that initializes a resource with no attributes and no schema URL.
pub fn empty() -> Self {
Self {
attrs: Default::default(),
schema_url: None,
Resource {
inner: Arc::new(ResourceInner {
attrs: HashMap::new(),
schema_url: None,
}),
}
}

Expand All @@ -70,13 +82,17 @@
/// Values are de-duplicated by key; when the same key appears more than once,
/// the last key-value pair for that key will be retained
pub fn new<T: IntoIterator<Item = KeyValue>>(kvs: T) -> Self {
let mut resource = Resource::empty();

for kv in kvs.into_iter() {
resource.attrs.insert(kv.key, kv.value);
let mut attrs = HashMap::new();
for kv in kvs {
attrs.insert(kv.key, kv.value);
}

resource
Resource {
inner: Arc::new(ResourceInner {
attrs,
schema_url: None,
}),
}
}

/// Create a new `Resource` from key value pairs and a [schema url].
Expand All @@ -92,8 +108,21 @@
KV: IntoIterator<Item = KeyValue>,
S: Into<Cow<'static, str>>,
{
let schema_url_str = schema_url.into();
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
let normalized_schema_url = if schema_url_str.is_empty() {
None
} else {
Some(schema_url_str)
};

let mut resource = Self::new(kvs);
resource.schema_url = Some(schema_url.into());
let new_inner = ResourceInner {
attrs: Arc::make_mut(&mut resource.inner).attrs.clone(),
schema_url: normalized_schema_url,
};

resource.inner = Arc::new(new_inner);

resource
}

Expand All @@ -105,8 +134,11 @@
for detector in detectors {
let detected_res = detector.detect(timeout);
for (key, value) in detected_res.into_iter() {
// using insert instead of merge to avoid clone.
resource.attrs.insert(key, value);
// This call ensures that if the Arc is not uniquely owned,
// the data is cloned before modification, preserving safety.
// If the Arc is uniquely owned, it simply returns a mutable reference to the data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a constructor, at what point would the inner not be uniquely owned?
That being said, is there a reason we shouldn't put the make_mut before the loop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it would be safe to move make_mut before the loop at this point. The inner is ref-incremented only during LogData/SpanData creation.

let inner = Arc::make_mut(&mut resource.inner);
inner.attrs.insert(Key::new(key.clone()), value.clone());
}
}

Expand All @@ -129,87 +161,62 @@
///
/// [Schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn merge<T: Deref<Target = Self>>(&self, other: T) -> Self {
if self.attrs.is_empty() {
if self.is_empty() {
return other.clone();
}
if other.attrs.is_empty() {
if other.is_empty() {
return self.clone();
}

let mut resource = Resource::empty();

// attrs from self take the less priority, even when the new value is empty.
for (k, v) in self.attrs.iter() {
resource.attrs.insert(k.clone(), v.clone());
}
for (k, v) in other.attrs.iter() {
resource.attrs.insert(k.clone(), v.clone());
let mut combined_attrs = self.inner.attrs.clone();
for (k, v) in other.inner.attrs.iter() {
combined_attrs.insert(k.clone(), v.clone());
}

if self.schema_url == other.schema_url {
resource.schema_url = self.schema_url.clone();
} else if self.schema_url.is_none() {
// if the other resource has schema url, use it.
if other.schema_url.is_some() {
resource.schema_url = other.schema_url.clone();
}
// else empty schema url.
} else {
// if self has schema url, use it.
if other.schema_url.is_none() {
resource.schema_url = self.schema_url.clone();
}
// Resolve the schema URL according to the precedence rules
let combined_schema_url = match (&self.inner.schema_url, &other.inner.schema_url) {
// If both resources have a schema URL and it's the same, use it
(Some(url1), Some(url2)) if url1 == url2 => Some(url1.clone()),
// If both resources have a schema URL but they are not the same, the schema URL will be empty
(Some(_), Some(_)) => None,
// If this resource does not have a schema URL, and the other resource has a schema URL, it will be used
(None, Some(url)) => Some(url.clone()),
// If this resource has a schema URL, it will be used (covers case 1 and any other cases where `self` has a schema URL)
(Some(url), _) => Some(url.clone()),
// If both resources do not have a schema URL, the schema URL will be empty
(None, None) => None,
};
Resource {
inner: Arc::new(ResourceInner {
attrs: combined_attrs,
schema_url: combined_schema_url,
}),
}

resource
}

/// Return the [schema url] of the resource. If the resource does not have a schema url, return `None`.
///
/// [schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn schema_url(&self) -> Option<&str> {
self.schema_url.as_ref().map(|s| s.as_ref())
self.inner.schema_url.as_ref().map(|s| s.as_ref())
}

/// Returns the number of attributes for this resource
pub fn len(&self) -> usize {
self.attrs.len()
self.inner.attrs.len()
}

/// Returns `true` if the resource contains no attributes.
pub fn is_empty(&self) -> bool {
self.attrs.is_empty()
self.inner.attrs.is_empty()
}

/// Gets an iterator over the attributes of this resource, in arbitrary order.
pub fn iter(&self) -> Iter<'_> {
self.into_iter()
Iter(self.inner.attrs.iter())
}

/// Retrieve the value associated with the given key from the resource.
pub fn get(&self, key: Key) -> Option<Value> {
self.attrs.get(&key).cloned()
}
}

/// An owned iterator over the entries of a `Resource`.
#[derive(Debug)]
pub struct IntoIter(hash_map::IntoIter<Key, Value>);

impl Iterator for IntoIter {
type Item = (Key, Value);

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl IntoIterator for Resource {
type Item = (Key, Value);
type IntoIter = IntoIter;

fn into_iter(self) -> Self::IntoIter {
IntoIter(self.attrs.into_iter())
self.inner.attrs.get(&key).cloned()
}
}

Expand All @@ -230,7 +237,7 @@
type IntoIter = Iter<'a>;

fn into_iter(self) -> Self::IntoIter {
Iter(self.attrs.iter())
Iter(self.inner.attrs.iter())
}
}

Expand Down Expand Up @@ -264,13 +271,10 @@
let mut expected_attrs = HashMap::new();
expected_attrs.insert(Key::new("a"), Value::from("final"));

assert_eq!(
Resource::new(args_with_dupe_keys),
Resource {
attrs: expected_attrs,
schema_url: None,
}
);
let resource = Resource::new(args_with_dupe_keys);
let resource_inner = Arc::try_unwrap(resource.inner).expect("Failed to unwrap Arc");
assert_eq!(resource_inner.attrs, expected_attrs);
assert_eq!(resource_inner.schema_url, None);
}

#[test]
Expand All @@ -293,13 +297,14 @@
expected_attrs.insert(Key::new("c"), Value::from("c-value"));
expected_attrs.insert(Key::new("d"), Value::from(""));

assert_eq!(
resource_a.merge(&resource_b),
Resource {
let expected_resource = Resource {
inner: Arc::new(ResourceInner {
attrs: expected_attrs,
schema_url: None,
}
);
schema_url: None, // Assuming schema_url handling if needed
}),
};

assert_eq!(resource_a.merge(&resource_b), expected_resource);
}

#[test]
Expand All @@ -317,24 +322,32 @@
(None, None, None),
];

for (schema_url, other_schema_url, expect_schema_url) in test_cases.into_iter() {
let mut resource = Resource::new(vec![KeyValue::new("key", "")]);
resource.schema_url = schema_url.map(Into::into);
for (schema_url_a, schema_url_b, expected_schema_url) in test_cases.into_iter() {
let resource_a = Resource::from_schema_url(
vec![KeyValue::new("key", "")],
schema_url_a.unwrap_or(""),
);
let resource_b = Resource::from_schema_url(
vec![KeyValue::new("key", "")],
schema_url_b.unwrap_or(""),
);

let mut other_resource = Resource::new(vec![KeyValue::new("key", "")]);
other_resource.schema_url = other_schema_url.map(Into::into);
let merged_resource = resource_a.merge(&resource_b);
let result_schema_url = merged_resource.schema_url();

assert_eq!(
resource.merge(&other_resource).schema_url,
expect_schema_url.map(Into::into)
result_schema_url.map(|s| s as &str),
expected_schema_url,
"Merging schema_url_a {:?} with schema_url_b {:?} did not yield expected result {:?}",

Check warning on line 341 in opentelemetry-sdk/src/resource/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/resource/mod.rs#L341

Added line #L341 was not covered by tests
schema_url_a, schema_url_b, expected_schema_url
);
}

// if only one resource contains key value pairs
let resource = Resource::from_schema_url(vec![], "http://schema/a");
let other_resource = Resource::new(vec![KeyValue::new("key", "")]);

assert_eq!(resource.merge(&other_resource).schema_url, None);
assert_eq!(resource.merge(&other_resource).schema_url(), None);
}

#[test]
Expand Down