Skip to content

Commit

Permalink
[Logs SDK] Send resource once to processor and exporter, and not for …
Browse files Browse the repository at this point in the history
…every event. (#1636)
  • Loading branch information
lalitb committed Apr 30, 2024
1 parent 213fa3f commit 8c60749
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 50 deletions.
6 changes: 5 additions & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl LogExporter for OtlpHttpClient {
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = self.build_logs_export_body(batch)?;
let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down Expand Up @@ -50,4 +50,8 @@ impl LogExporter for OtlpHttpClient {
fn shutdown(&mut self) {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
15 changes: 12 additions & 3 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::{
};
use http::{HeaderName, HeaderValue, Uri};
use opentelemetry_http::HttpClient;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;

#[cfg(feature = "logs")]
use opentelemetry_sdk::export::logs::LogData;
#[cfg(feature = "trace")]
Expand Down Expand Up @@ -274,6 +276,9 @@ struct OtlpHttpClient {
headers: HashMap<HeaderName, HeaderValue>,
protocol: Protocol,
_timeout: Duration,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

impl OtlpHttpClient {
Expand All @@ -291,6 +296,7 @@ impl OtlpHttpClient {
headers,
protocol,
_timeout: timeout,
resource: ResourceAttributesWithSchema::default(),
}
}

Expand Down Expand Up @@ -318,12 +324,15 @@ impl OtlpHttpClient {
fn build_logs_export_body(
&self,
logs: Vec<LogData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = logs
.into_iter()
.map(|log_event| (log_event, resource).into())
.collect::<Vec<_>>();
let req = ExportLogsServiceRequest { resource_logs };

let req = ExportLogsServiceRequest {
resource_logs: logs.into_iter().map(Into::into).collect(),
};
match self.protocol {
#[cfg(feature = "http-json")]
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
Expand Down
23 changes: 18 additions & 5 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use core::fmt;

use async_trait::async_trait;
use core::fmt;
use opentelemetry::logs::{LogError, LogResult};
use opentelemetry_proto::tonic::collector::logs::v1::{
logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
Expand All @@ -12,6 +11,9 @@ use super::BoxInterceptor;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down Expand Up @@ -43,6 +45,7 @@ impl TonicLogsClient {
client,
interceptor,
}),
resource: Default::default(),
}
}
}
Expand All @@ -62,13 +65,19 @@ impl LogExporter for TonicLogsClient {
None => return Err(LogError::Other("exporter is already shut down".into())),
};

let resource_logs = {
batch
.into_iter()
.map(|log_data| (log_data, &self.resource))
.map(Into::into)
.collect()
};

client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest {
resource_logs: batch.into_iter().map(Into::into).collect(),
},
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(crate::Error::from)?;
Expand All @@ -79,4 +88,8 @@ impl LogExporter for TonicLogsClient {
fn shutdown(&mut self) {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
4 changes: 4 additions & 0 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export(&mut self, batch: Vec<LogData>) -> opentelemetry::logs::LogResult<()> {
self.client.export(batch).await
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.client.set_resource(resource);
}
}

/// Recommended configuration for an OTLP exporter pipeline.
Expand Down
19 changes: 18 additions & 1 deletion opentelemetry-proto/src/transform/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@ pub mod tonic {
use opentelemetry::{Array, Value};
use std::borrow::Cow;

#[cfg(any(feature = "trace", feature = "logs"))]
#[derive(Debug, Default)]
pub struct ResourceAttributesWithSchema {
pub attributes: Attributes,
pub schema_url: Option<String>,
}

#[cfg(any(feature = "trace", feature = "logs"))]
impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema {
fn from(resource: &opentelemetry_sdk::Resource) -> Self {
ResourceAttributesWithSchema {
attributes: resource_attributes(resource),
schema_url: resource.schema_url().map(ToString::to_string),
}
}
}

#[cfg(any(feature = "trace", feature = "logs"))]
use opentelemetry_sdk::Resource;

Expand Down Expand Up @@ -52,7 +69,7 @@ pub mod tonic {
}

/// Wrapper type for Vec<`KeyValue`>
#[derive(Default)]
#[derive(Default, Debug)]
pub struct Attributes(pub ::std::vec::Vec<crate::proto::tonic::common::v1::KeyValue>);

impl From<Vec<opentelemetry::KeyValue>> for Attributes {
Expand Down
26 changes: 17 additions & 9 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod tonic {
resource::v1::Resource,
Attributes,
},
transform::common::{to_nanos, tonic::resource_attributes},
transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
};
use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};

Expand Down Expand Up @@ -110,18 +110,26 @@ pub mod tonic {
}
}

impl From<opentelemetry_sdk::export::logs::LogData> for ResourceLogs {
fn from(log_data: opentelemetry_sdk::export::logs::LogData) -> Self {
impl
From<(
opentelemetry_sdk::export::logs::LogData,
&ResourceAttributesWithSchema,
)> for ResourceLogs
{
fn from(
data: (
opentelemetry_sdk::export::logs::LogData,
&ResourceAttributesWithSchema,
),
) -> Self {
let (log_data, resource) = data;

ResourceLogs {
resource: Some(Resource {
attributes: resource_attributes(&log_data.resource).0,
attributes: resource.attributes.0.clone(),
dropped_attributes_count: 0,
}),
schema_url: log_data
.resource
.schema_url()
.map(Into::into)
.unwrap_or_default(),
schema_url: resource.schema_url.clone().unwrap_or_default(),
scope_logs: vec![ScopeLogs {
schema_url: log_data
.instrumentation
Expand Down
8 changes: 8 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
- **Breaking** [#1624](https://github.com/open-telemetry/opentelemetry-rust/pull/1624) Remove `OsResourceDetector` and
`ProcessResourceDetector` resource detectors, use the
[`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead.
- [#1636](https://github.com/open-telemetry/opentelemetry-rust/pull/1636) [Logs SDK] Improves performance by sending
Resource information to processors (and exporters) once, instead of sending with every log. If you are an author
of Processor, Exporter, the following are *BREAKING* changes.
- Implement `set_resource` method in your custom LogProcessor, which invokes exporter's `set_resource`.
- Implement `set_resource` method in your custom LogExporter. This method should save the resource object
in original or serialized format, to be merged with every log event during export.
- `LogData` doesn't have the resource attributes. The `LogExporter::export()` method needs to merge it
with the earlier preserved resource before export.
- Baggage propagation error will be reported to global error handler [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640)
- Improves `shutdown` behavior of `LoggerProvider` and `LogProcessor` [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643).
- `shutdown` can be called by any clone of the `LoggerProvider` without the need of waiting on all `Logger` drops. Thus, `try_shutdown` has been removed.
Expand Down
11 changes: 5 additions & 6 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use opentelemetry::{
logs::{LogError, LogRecord, LogResult},
InstrumentationLibrary,
};
use std::{borrow::Cow, fmt::Debug};
use std::fmt::Debug;

/// `LogExporter` defines the interface that log exporters should implement.
#[async_trait]
Expand All @@ -21,17 +21,16 @@ pub trait LogExporter: Send + Sync + Debug {
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
}

/// `LogData` associates a [`LogRecord`] with a [`Resource`] and
/// [`InstrumentationLibrary`].
/// `LogData` represents a single log event without resource context.
#[derive(Clone, Debug)]
pub struct LogData {
/// Log record
pub record: LogRecord,
/// Resource for the emitter who produced this `LogData`.
pub resource: Cow<'static, Resource>,
/// Instrumentation details for the emitter who produced this `LogData`.
/// Instrumentation details for the emitter who produced this `LogEvent`.
pub instrumentation: InstrumentationLibrary,
}

Expand Down
13 changes: 8 additions & 5 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ impl Builder {

/// Create a new provider from this configuration.
pub fn build(self) -> LoggerProvider {
// invoke set_resource on all the processors
for processor in &self.processors {
processor.set_resource(&self.config.resource);
}
LoggerProvider {
inner: Arc::new(LoggerProviderInner {
processors: self.processors,
Expand Down Expand Up @@ -221,20 +225,19 @@ impl opentelemetry::logs::Logger for Logger {
/// Emit a `LogRecord`.
fn emit(&self, record: LogRecord) {
let provider = self.provider();
let config = provider.config();
let processors = provider.log_processors();
let trace_context = Context::map_current(|cx| {
cx.has_active_span()
.then(|| TraceContext::from(cx.span().span_context()))
});

for p in processors {
let mut record = record.clone();
let mut cloned_record = record.clone();
if let Some(ref trace_context) = trace_context {
record.trace_context = Some(trace_context.clone())
cloned_record.trace_context = Some(trace_context.clone());
}
let data = LogData {
record,
resource: config.resource.clone(),
record: cloned_record,
instrumentation: self.instrumentation_library().clone(),
};
p.emit(data);
Expand Down

0 comments on commit 8c60749

Please sign in to comment.