Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Resource Sharing Across Exporters with Arc Implementation #1526

Merged
merged 18 commits into from Feb 15, 2024
Merged
3 changes: 3 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Expand Up @@ -8,6 +8,9 @@
- [#1471](https://github.com/open-telemetry/opentelemetry-rust/pull/1471) Configure batch log record processor via [`OTEL_BLRP_*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/configuration/sdk-environment-variables.md#batch-logrecord-processor) environment variables and via `OtlpLogPipeline::with_batch_config`
- [#1503](https://github.com/open-telemetry/opentelemetry-rust/pull/1503) Make the documentation for In-Memory exporters visible.

- [#1526](https://github.com/open-telemetry/opentelemetry-rust/pull/1526)
Performance Improvement : Creating Spans and LogRecords are now faster, by avoiding expensive cloning of `Resource` for every Span/LogRecord.

### Changed

- **Breaking**
Expand Down
193 changes: 103 additions & 90 deletions opentelemetry-sdk/src/resource/mod.rs
Expand Up @@ -34,13 +34,22 @@
use std::borrow::Cow;
use std::collections::{hash_map, HashMap};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

/// Inner structure of `Resource` holding the actual data.
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
/// This structure is designed to be shared among `Resource` instances via `Arc`.
#[derive(Debug, Clone, PartialEq)]
struct ResourceInner {
attrs: HashMap<Key, Value>,
schema_url: Option<Cow<'static, str>>,
}

/// An immutable representation of the entity producing telemetry as attributes.
/// Utilizes `Arc` for efficient sharing and cloning.
#[derive(Clone, Debug, PartialEq)]
pub struct Resource {
attrs: HashMap<Key, Value>,
schema_url: Option<Cow<'static, str>>,
inner: Arc<ResourceInner>,
}

impl Default for Resource {
Expand All @@ -58,10 +67,13 @@

impl Resource {
/// Creates an empty resource.
/// This is the basic constructor that initializes a resource with no attributes and no schema URL.
pub fn empty() -> Self {
Self {
attrs: Default::default(),
schema_url: None,
Resource {
inner: Arc::new(ResourceInner {
attrs: HashMap::new(),
schema_url: None,
}),
}
}

Expand All @@ -70,13 +82,17 @@
/// Values are de-duplicated by key, and the first key-value pair with a non-empty string value
/// will be retained
pub fn new<T: IntoIterator<Item = KeyValue>>(kvs: T) -> Self {
let mut resource = Resource::empty();

for kv in kvs.into_iter() {
resource.attrs.insert(kv.key, kv.value);
let mut attrs = HashMap::new();
for kv in kvs {
attrs.insert(kv.key, kv.value);
}

resource
Resource {
inner: Arc::new(ResourceInner {
attrs,
schema_url: None,
}),
}
}

/// Create a new `Resource` from a key value pairs and [schema url].
Expand All @@ -92,9 +108,22 @@
KV: IntoIterator<Item = KeyValue>,
S: Into<Cow<'static, str>>,
{
let mut resource = Self::new(kvs);
resource.schema_url = Some(schema_url.into());
resource
let schema_url_str = schema_url.into();
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
let normalized_schema_url = if schema_url_str.is_empty() {
None
} else {
Some(schema_url_str)
};
let mut attrs = HashMap::new();
for kv in kvs {
attrs.insert(kv.key, kv.value);
}
Resource {
inner: Arc::new(ResourceInner {
attrs,
schema_url: normalized_schema_url,
}),
}
}

/// Create a new `Resource` from resource detectors.
Expand All @@ -104,9 +133,12 @@
let mut resource = Resource::empty();
for detector in detectors {
let detected_res = detector.detect(timeout);
// This call ensures that if the Arc is not uniquely owned,
// the data is cloned before modification, preserving safety.
// If the Arc is uniquely owned, it simply returns a mutable reference to the data.
let inner = Arc::make_mut(&mut resource.inner);
for (key, value) in detected_res.into_iter() {
// using insert instead of merge to avoid clone.
resource.attrs.insert(key, value);
inner.attrs.insert(Key::new(key.clone()), value.clone());
}
}

Expand All @@ -129,87 +161,62 @@
///
/// [Schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn merge<T: Deref<Target = Self>>(&self, other: T) -> Self {
if self.attrs.is_empty() {
if self.is_empty() {
return other.clone();
}
if other.attrs.is_empty() {
if other.is_empty() {
return self.clone();
}

let mut resource = Resource::empty();

// attrs from self take the less priority, even when the new value is empty.
for (k, v) in self.attrs.iter() {
resource.attrs.insert(k.clone(), v.clone());
}
for (k, v) in other.attrs.iter() {
resource.attrs.insert(k.clone(), v.clone());
let mut combined_attrs = self.inner.attrs.clone();
for (k, v) in other.inner.attrs.iter() {
combined_attrs.insert(k.clone(), v.clone());
}

if self.schema_url == other.schema_url {
resource.schema_url = self.schema_url.clone();
} else if self.schema_url.is_none() {
// if the other resource has schema url, use it.
if other.schema_url.is_some() {
resource.schema_url = other.schema_url.clone();
}
// else empty schema url.
} else {
// if self has schema url, use it.
if other.schema_url.is_none() {
resource.schema_url = self.schema_url.clone();
}
// Resolve the schema URL according to the precedence rules
let combined_schema_url = match (&self.inner.schema_url, &other.inner.schema_url) {
// If both resources have a schema URL and it's the same, use it
(Some(url1), Some(url2)) if url1 == url2 => Some(url1.clone()),
// If both resources have a schema URL but they are not the same, the schema URL will be empty
(Some(_), Some(_)) => None,
// If this resource does not have a schema URL, and the other resource has a schema URL, it will be used
(None, Some(url)) => Some(url.clone()),
// If this resource has a schema URL, it will be used (covers case 1 and any other cases where `self` has a schema URL)
(Some(url), _) => Some(url.clone()),
// If both resources do not have a schema URL, the schema URL will be empty
(None, None) => None,
};
Resource {
inner: Arc::new(ResourceInner {
attrs: combined_attrs,
schema_url: combined_schema_url,
}),
}

resource
}

/// Return the [schema url] of the resource. If the resource does not have a schema url, return `None`.
///
/// [schema url]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.9.0/specification/schemas/overview.md#schema-url
pub fn schema_url(&self) -> Option<&str> {
self.schema_url.as_ref().map(|s| s.as_ref())
self.inner.schema_url.as_ref().map(|s| s.as_ref())
}

/// Returns the number of attributes for this resource
pub fn len(&self) -> usize {
self.attrs.len()
self.inner.attrs.len()
}

/// Returns `true` if the resource contains no attributes.
pub fn is_empty(&self) -> bool {
self.attrs.is_empty()
self.inner.attrs.is_empty()
}

/// Gets an iterator over the attributes of this resource, sorted by key.
pub fn iter(&self) -> Iter<'_> {
self.into_iter()
Iter(self.inner.attrs.iter())
}

/// Retrieve the value from resource associate with given key.
pub fn get(&self, key: Key) -> Option<Value> {
self.attrs.get(&key).cloned()
}
}

/// An owned iterator over the entries of a `Resource`.
#[derive(Debug)]
pub struct IntoIter(hash_map::IntoIter<Key, Value>);

impl Iterator for IntoIter {
type Item = (Key, Value);

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl IntoIterator for Resource {
type Item = (Key, Value);
type IntoIter = IntoIter;

fn into_iter(self) -> Self::IntoIter {
IntoIter(self.attrs.into_iter())
self.inner.attrs.get(&key).cloned()
}
}

Expand All @@ -230,7 +237,7 @@
type IntoIter = Iter<'a>;

fn into_iter(self) -> Self::IntoIter {
Iter(self.attrs.iter())
Iter(self.inner.attrs.iter())
}
}

Expand Down Expand Up @@ -264,13 +271,10 @@
let mut expected_attrs = HashMap::new();
expected_attrs.insert(Key::new("a"), Value::from("final"));

assert_eq!(
Resource::new(args_with_dupe_keys),
Resource {
attrs: expected_attrs,
schema_url: None,
}
);
let resource = Resource::new(args_with_dupe_keys);
let resource_inner = Arc::try_unwrap(resource.inner).expect("Failed to unwrap Arc");
assert_eq!(resource_inner.attrs, expected_attrs);
assert_eq!(resource_inner.schema_url, None);
}

#[test]
Expand All @@ -293,13 +297,14 @@
expected_attrs.insert(Key::new("c"), Value::from("c-value"));
expected_attrs.insert(Key::new("d"), Value::from(""));

assert_eq!(
resource_a.merge(&resource_b),
Resource {
let expected_resource = Resource {
inner: Arc::new(ResourceInner {
attrs: expected_attrs,
schema_url: None,
}
);
schema_url: None, // Assuming schema_url handling if needed
}),
};

assert_eq!(resource_a.merge(&resource_b), expected_resource);
}

#[test]
Expand All @@ -317,24 +322,32 @@
(None, None, None),
];

for (schema_url, other_schema_url, expect_schema_url) in test_cases.into_iter() {
let mut resource = Resource::new(vec![KeyValue::new("key", "")]);
resource.schema_url = schema_url.map(Into::into);
for (schema_url_a, schema_url_b, expected_schema_url) in test_cases.into_iter() {
let resource_a = Resource::from_schema_url(
vec![KeyValue::new("key", "")],
schema_url_a.unwrap_or(""),
);
let resource_b = Resource::from_schema_url(
vec![KeyValue::new("key", "")],
schema_url_b.unwrap_or(""),
);

let mut other_resource = Resource::new(vec![KeyValue::new("key", "")]);
other_resource.schema_url = other_schema_url.map(Into::into);
let merged_resource = resource_a.merge(&resource_b);
let result_schema_url = merged_resource.schema_url();

assert_eq!(
resource.merge(&other_resource).schema_url,
expect_schema_url.map(Into::into)
result_schema_url.map(|s| s as &str),
expected_schema_url,
"Merging schema_url_a {:?} with schema_url_b {:?} did not yield expected result {:?}",

Check warning on line 341 in opentelemetry-sdk/src/resource/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/resource/mod.rs#L341

Added line #L341 was not covered by tests
schema_url_a, schema_url_b, expected_schema_url
);
}

// if only one resource contains key value pairs
let resource = Resource::from_schema_url(vec![], "http://schema/a");
let other_resource = Resource::new(vec![KeyValue::new("key", "")]);

assert_eq!(resource.merge(&other_resource).schema_url, None);
assert_eq!(resource.merge(&other_resource).schema_url(), None);
}

#[test]
Expand Down