Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Logs SDK] Send resource once to processor and exporter, and not for every event. #1636

Merged
merged 34 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
57fa974
initial commit
lalitb Mar 21, 2024
63fdf18
Merge branch 'main' into set-resource-optimize
lalitb Mar 25, 2024
597a2af
more changes
lalitb Mar 27, 2024
b6a16ac
Merge branch 'main' into set-resource-optimize
lalitb Mar 27, 2024
732b9d6
lint error
lalitb Mar 27, 2024
c18499a
fix lint
lalitb Mar 27, 2024
00c0eaa
lint errors, rename LogEvent to LogData ; and LogData to LogDataWithR…
lalitb Mar 27, 2024
5361996
optimize otlp
lalitb Mar 28, 2024
a0dc96e
leftover comments
lalitb Mar 28, 2024
36d9900
propagate resource
lalitb Mar 28, 2024
7ecf106
update changelog
lalitb Mar 28, 2024
f031e29
fix doc error
lalitb Mar 28, 2024
112a053
add tests
lalitb Mar 28, 2024
becd779
Remove todo
lalitb Mar 28, 2024
aaa3c89
add empty implementation for set_resource for processor and exporter
lalitb Mar 28, 2024
71a4efd
Update opentelemetry-sdk/CHANGELOG.md
lalitb Mar 28, 2024
ac4b788
Merge branch 'main' into set-resource-optimize
lalitb Mar 28, 2024
e4d5a70
leftover commit
lalitb Mar 28, 2024
2e5a2d9
Merge branch 'main' into set-resource-optimize
lalitb Mar 29, 2024
8f14674
Merge branch 'main' into set-resource-optimize
lalitb Mar 29, 2024
de7dc51
remove redundant set_resource definitions
lalitb Mar 29, 2024
0634469
avoid log data cloning
lalitb Apr 2, 2024
18645cf
Merge branch 'main' into set-resource-optimize
lalitb Apr 4, 2024
92e4864
Merge branch 'main' into set-resource-optimize
cijothomas Apr 8, 2024
38e1bf4
Merge branch 'main' into set-resource-optimize
lalitb Apr 16, 2024
f7786e1
Merge branch 'main' into set-resource-optimize
lalitb Apr 22, 2024
b220635
remove single processor optimization
lalitb Apr 25, 2024
abc7507
Merge branch 'main' into set-resource-optimize
lalitb Apr 26, 2024
98c11b1
resolve merge conflict
lalitb Apr 26, 2024
2ce3e0f
review comment - remove set_resource overwrite method
lalitb Apr 26, 2024
0aef2fc
fix merge conflict
lalitb Apr 26, 2024
dfecf91
Merge branch 'main' into set-resource-optimize
lalitb Apr 30, 2024
896f672
fix merge conflict
lalitb Apr 30, 2024
3576450
Merge branch 'main' into set-resource-optimize
cijothomas Apr 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl LogExporter for OtlpHttpClient {
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = self.build_logs_export_body(batch)?;
let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down Expand Up @@ -50,4 +50,8 @@ impl LogExporter for OtlpHttpClient {
fn shutdown(&mut self) {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
15 changes: 12 additions & 3 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::{
};
use http::{HeaderName, HeaderValue, Uri};
use opentelemetry_http::HttpClient;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;

#[cfg(feature = "logs")]
use opentelemetry_sdk::export::logs::LogData;
#[cfg(feature = "trace")]
Expand Down Expand Up @@ -274,6 +276,9 @@ struct OtlpHttpClient {
headers: HashMap<HeaderName, HeaderValue>,
protocol: Protocol,
_timeout: Duration,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

impl OtlpHttpClient {
Expand All @@ -291,6 +296,7 @@ impl OtlpHttpClient {
headers,
protocol,
_timeout: timeout,
resource: ResourceAttributesWithSchema::default(),
}
}

Expand Down Expand Up @@ -318,12 +324,15 @@ impl OtlpHttpClient {
fn build_logs_export_body(
&self,
logs: Vec<LogData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = logs
.into_iter()
.map(|log_event| (log_event, resource).into())
.collect::<Vec<_>>();
let req = ExportLogsServiceRequest { resource_logs };

let req = ExportLogsServiceRequest {
resource_logs: logs.into_iter().map(Into::into).collect(),
};
match self.protocol {
#[cfg(feature = "http-json")]
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
Expand Down
23 changes: 18 additions & 5 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use core::fmt;

use async_trait::async_trait;
use core::fmt;
use opentelemetry::logs::{LogError, LogResult};
use opentelemetry_proto::tonic::collector::logs::v1::{
logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
Expand All @@ -12,6 +11,9 @@ use super::BoxInterceptor;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down Expand Up @@ -43,6 +45,7 @@ impl TonicLogsClient {
client,
interceptor,
}),
resource: Default::default(),
}
}
}
Expand All @@ -62,13 +65,19 @@ impl LogExporter for TonicLogsClient {
None => return Err(LogError::Other("exporter is already shut down".into())),
};

let resource_logs = {
batch
.into_iter()
.map(|log_data| (log_data, &self.resource))
.map(Into::into)
.collect()
};

client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest {
resource_logs: batch.into_iter().map(Into::into).collect(),
},
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(crate::Error::from)?;
Expand All @@ -79,4 +88,8 @@ impl LogExporter for TonicLogsClient {
fn shutdown(&mut self) {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
4 changes: 4 additions & 0 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export(&mut self, batch: Vec<LogData>) -> opentelemetry::logs::LogResult<()> {
self.client.export(batch).await
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.client.set_resource(resource);
}
}

/// Recommended configuration for an OTLP exporter pipeline.
Expand Down
19 changes: 18 additions & 1 deletion opentelemetry-proto/src/transform/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@ pub mod tonic {
use opentelemetry::{Array, Value};
use std::borrow::Cow;

#[cfg(any(feature = "trace", feature = "logs"))]
#[derive(Debug, Default)]
pub struct ResourceAttributesWithSchema {
pub attributes: Attributes,
pub schema_url: Option<String>,
}

#[cfg(any(feature = "trace", feature = "logs"))]
impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema {
fn from(resource: &opentelemetry_sdk::Resource) -> Self {
ResourceAttributesWithSchema {
attributes: resource_attributes(resource),
schema_url: resource.schema_url().map(ToString::to_string),
}
}
}

#[cfg(any(feature = "trace", feature = "logs"))]
use opentelemetry_sdk::Resource;

Expand Down Expand Up @@ -52,7 +69,7 @@ pub mod tonic {
}

/// Wrapper type for Vec<`KeyValue`>
#[derive(Default)]
#[derive(Default, Debug)]
pub struct Attributes(pub ::std::vec::Vec<crate::proto::tonic::common::v1::KeyValue>);

impl From<Vec<opentelemetry::KeyValue>> for Attributes {
Expand Down
26 changes: 17 additions & 9 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod tonic {
resource::v1::Resource,
Attributes,
},
transform::common::{to_nanos, tonic::resource_attributes},
transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
};
use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};

Expand Down Expand Up @@ -110,18 +110,26 @@ pub mod tonic {
}
}

impl From<opentelemetry_sdk::export::logs::LogData> for ResourceLogs {
fn from(log_data: opentelemetry_sdk::export::logs::LogData) -> Self {
impl
From<(
opentelemetry_sdk::export::logs::LogData,
&ResourceAttributesWithSchema,
)> for ResourceLogs
{
fn from(
data: (
opentelemetry_sdk::export::logs::LogData,
&ResourceAttributesWithSchema,
),
) -> Self {
let (log_data, resource) = data;

ResourceLogs {
resource: Some(Resource {
attributes: resource_attributes(&log_data.resource).0,
attributes: resource.attributes.0.clone(),
dropped_attributes_count: 0,
}),
schema_url: log_data
.resource
.schema_url()
.map(Into::into)
.unwrap_or_default(),
schema_url: resource.schema_url.clone().unwrap(),
scope_logs: vec![ScopeLogs {
schema_url: log_data
.instrumentation
Expand Down
8 changes: 8 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
- **Breaking** [#1624](https://github.com/open-telemetry/opentelemetry-rust/pull/1624) Remove `OsResourceDetector` and
`ProcessResourceDetector` resource detectors, use the
[`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead.
- [#1636](https://github.com/open-telemetry/opentelemetry-rust/pull/1636) [Logs SDK] Improves performance by sending
Resource information to processors (and exporters) once, instead of sending with every log. If you are an author
of Processor, Exporter, the following are *BREAKING* changes.
- Implement `set_resource` method in your custom LogProcessor, which invokes exporter's `set_resource`.
- Implement `set_resource` method in your custom LogExporter. This method should save the resource object
in original or serialized format, to be merged with every log event during export.
- `LogData` doesn't have the resource attributes. The `LogExporter::export()` method needs to merge it
with the earlier preserved resource before export.
- Baggage propagation error will be reported to global error handler [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640)

## v0.22.1
Expand Down
11 changes: 5 additions & 6 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use opentelemetry::{
logs::{LogError, LogRecord, LogResult},
InstrumentationLibrary,
};
use std::{borrow::Cow, fmt::Debug};
use std::fmt::Debug;

/// `LogExporter` defines the interface that log exporters should implement.
#[async_trait]
Expand All @@ -21,17 +21,16 @@ pub trait LogExporter: Send + Sync + Debug {
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
}

/// `LogData` associates a [`LogRecord`] with a [`Resource`] and
/// [`InstrumentationLibrary`].
/// `LogData` represents a single log event without resource context.
#[derive(Clone, Debug)]
pub struct LogData {
/// Log record
pub record: LogRecord,
/// Resource for the emitter who produced this `LogData`.
pub resource: Cow<'static, Resource>,
/// Instrumentation details for the emitter who produced this `LogData`.
/// Instrumentation details for the emitter who produced this `LogEvent`.
pub instrumentation: InstrumentationLibrary,
}

Expand Down
13 changes: 8 additions & 5 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ impl Builder {

/// Create a new provider from this configuration.
pub fn build(self) -> LoggerProvider {
// invoke set_resource on all the processors
for processor in &self.processors {
processor.set_resource(&self.config.resource);
}
LoggerProvider {
inner: Arc::new(LoggerProviderInner {
processors: self.processors,
Expand Down Expand Up @@ -207,20 +211,19 @@ impl opentelemetry::logs::Logger for Logger {
/// Emit a `LogRecord`.
fn emit(&self, record: LogRecord) {
let provider = self.provider();
let config = provider.config();
let processors = provider.log_processors();
let trace_context = Context::map_current(|cx| {
cx.has_active_span()
.then(|| TraceContext::from(cx.span().span_context()))
});

for p in processors {
let mut record = record.clone();
let mut cloned_record = record.clone();
if let Some(ref trace_context) = trace_context {
record.trace_context = Some(trace_context.clone())
cloned_record.trace_context = Some(trace_context.clone());
}
let data = LogData {
record,
resource: config.resource.clone(),
record: cloned_record,
instrumentation: self.instrumentation_library().clone(),
};
p.emit(data);
Expand Down