diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 21f8fbb0ed..bb69b06238 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -42,6 +42,7 @@ impl OtlpPipeline { OtlpLogPipeline { log_config: None, exporter_builder: NoExporterConfig(()), + batch_config: None, } } } @@ -124,6 +125,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { pub struct OtlpLogPipeline { exporter_builder: EB, log_config: Option, + batch_config: Option, } impl OtlpLogPipeline { @@ -132,6 +134,12 @@ impl OtlpLogPipeline { self.log_config = Some(log_config); self } + + /// Set the batch log processor configuration, and it will override the env vars. + pub fn with_batch_config(mut self, batch_config: opentelemetry_sdk::logs::BatchConfig) -> Self { + self.batch_config = Some(batch_config); + self + } } impl OtlpLogPipeline { @@ -143,6 +151,7 @@ impl OtlpLogPipeline { OtlpLogPipeline { exporter_builder: pipeline.into(), log_config: self.log_config, + batch_config: self.batch_config, } } } @@ -160,7 +169,7 @@ impl OtlpLogPipeline { )) } - /// Install the configured log exporter and a batch span processor using the + /// Install the configured log exporter and a batch log processor using the /// specified runtime. /// /// Returns a [`Logger`] with the name `opentelemetry-otlp` and the current crate version. @@ -174,6 +183,7 @@ impl OtlpLogPipeline { self.exporter_builder.build_log_exporter()?, self.log_config, runtime, + self.batch_config, )) } } @@ -202,9 +212,14 @@ fn build_batch_with_exporter( exporter: LogExporter, log_config: Option, runtime: R, + batch_config: Option, ) -> opentelemetry_sdk::logs::Logger { - let mut provider_builder = - opentelemetry_sdk::logs::LoggerProvider::builder().with_batch_exporter(exporter, runtime); + let mut provider_builder = opentelemetry_sdk::logs::LoggerProvider::builder(); + let batch_processor = opentelemetry_sdk::logs::BatchLogProcessor::builder(exporter, runtime) + .with_batch_config(batch_config.unwrap_or_default()) + .build(); + provider_builder = provider_builder.with_log_processor(batch_processor); + if let Some(config) = log_config { provider_builder = provider_builder.with_config(config); } diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 363e71e5d7..a68a812d9c 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added - [#1410](https://github.com/open-telemetry/opentelemetry-rust/pull/1410) Add experimental synchronous gauge +- [#1471](https://github.com/open-telemetry/opentelemetry-rust/pull/1471) Configure batch log record processor via [`OTEL_BLRP_*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/configuration/sdk-environment-variables.md#batch-logrecord-processor) environment variables and via `OtlpLogPipeline::with_batch_config` ### Changed diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index cc3ffd5e13..f62e68413b 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -13,12 +13,30 @@ use opentelemetry::{ global, logs::{LogError, LogResult}, }; -use std::sync::Mutex; +use std::{env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, + str::FromStr, time::Duration, }; +/// Delay interval between two consecutive exports. +const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY"; +/// Default delay interval between two consecutive exports. +const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: u64 = 1_000; +/// Maximum allowed time to export data. +const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT"; +/// Default maximum allowed time to export data. +const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000; +/// Maximum queue size. +const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE"; +/// Default maximum queue size. +const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048; +/// Maximum batch size, must be less than or equal to OTEL_BLRP_MAX_QUEUE_SIZE. +const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"; +/// Default maximum batch size. +const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; + /// The interface for plugging into a [`Logger`]. /// /// [`Logger`]: crate::logs::Logger @@ -281,12 +299,87 @@ pub struct BatchConfig { impl Default for BatchConfig { fn default() -> Self { - BatchConfig { - max_queue_size: 2_048, - scheduled_delay: Duration::from_millis(1_000), - max_export_batch_size: 512, - max_export_timeout: Duration::from_millis(30_000), + let mut config = BatchConfig { + max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, + scheduled_delay: Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT), + max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, + max_export_timeout: Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT), + }; + + if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE) + .ok() + .and_then(|queue_size| usize::from_str(&queue_size).ok()) + { + config.max_queue_size = max_queue_size; + } + + if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE) + .ok() + .and_then(|batch_size| usize::from_str(&batch_size).ok()) + { + config.max_export_batch_size = max_export_batch_size; + } + + // max export batch size must be less or equal to max queue size. + // we set max export batch size to max queue size if it's larger than max queue size. + if config.max_export_batch_size > config.max_queue_size { + config.max_export_batch_size = config.max_queue_size; } + + if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY) + .ok() + .or_else(|| env::var("OTEL_BLRP_SCHEDULE_DELAY_MILLIS").ok()) + .and_then(|delay| u64::from_str(&delay).ok()) + { + config.scheduled_delay = Duration::from_millis(scheduled_delay); + } + + if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT) + .ok() + .or_else(|| env::var("OTEL_BLRP_EXPORT_TIMEOUT_MILLIS").ok()) + .and_then(|s| u64::from_str(&s).ok()) + { + config.max_export_timeout = Duration::from_millis(max_export_timeout); + } + + config + } +} + +impl BatchConfig { + /// Set max_queue_size for [`BatchConfig`]. + /// It's the maximum queue size to buffer logs for delayed processing. + /// If the queue gets full it will drop the logs. + /// The default value of is 2048. + pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self { + self.max_queue_size = max_queue_size; + self + } + + /// Set scheduled_delay for [`BatchConfig`]. + /// It's the delay interval in milliseconds between two consecutive processing of batches. + /// The default value is 1000 milliseconds. + pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self { + self.scheduled_delay = scheduled_delay; + self + } + + /// Set max_export_timeout for [`BatchConfig`]. + /// It's the maximum duration to export a batch of data. + /// The default value is 30000 milliseconds. + pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self { + self.max_export_timeout = max_export_timeout; + self + } + + /// Set max_export_batch_size for [`BatchConfig`]. + /// It's the maximum number of logs to process in a single batch. If there are + /// more than one batch worth of logs then it processes multiple batches + /// of logs one batch after the other without any delay. + /// The default value is 512. + pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self { + self.max_export_batch_size = max_export_batch_size; + self } } @@ -342,6 +435,11 @@ where BatchLogProcessorBuilder { config, ..self } } + /// Set the BatchConfig for [`BatchLogProcessorBuilder`] + pub fn with_batch_config(self, config: BatchConfig) -> Self { + BatchLogProcessorBuilder { config, ..self } + } + /// Build a batch processor pub fn build(self) -> BatchLogProcessor { BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime) @@ -360,3 +458,201 @@ enum BatchMessage { /// Shut down the worker thread, push all logs in buffer to the backend. Shutdown(oneshot::Sender), } + +#[cfg(all(test, feature = "testing", feature = "logs"))] +mod tests { + use super::{ + BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, + }; + use crate::{ + logs::{ + log_processor::{ + OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, + OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, + }, + BatchConfig, + }, + runtime, + testing::logs::InMemoryLogsExporter, + }; + use std::time::Duration; + + #[test] + fn test_default_const_values() { + assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY"); + assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, 1_000); + assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT"); + assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, 30_000); + assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE"); + assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048); + assert_eq!( + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE" + ); + assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512); + } + + #[test] + fn test_default_batch_config_adheres_to_specification() { + let config = BatchConfig::default(); + + assert_eq!( + config.scheduled_delay, + Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT) + ); + assert_eq!( + config.max_export_timeout, + Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT) + ); + assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT); + assert_eq!( + config.max_export_batch_size, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT + ); + } + + #[test] + fn test_batch_config_configurable_by_env_vars() { + let env_vars = vec![ + (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")), + (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")), + (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")), + (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")), + ]; + + let config = temp_env::with_vars(env_vars, BatchConfig::default); + + assert_eq!(config.scheduled_delay, Duration::from_millis(2000)); + assert_eq!(config.max_export_timeout, Duration::from_millis(60000)); + assert_eq!(config.max_queue_size, 4096); + assert_eq!(config.max_export_batch_size, 1024); + } + + #[test] + fn test_batch_config_configurable_by_env_vars_millis() { + let env_vars = vec![ + ("OTEL_BLRP_SCHEDULE_DELAY_MILLIS", Some("3000")), + ("OTEL_BLRP_EXPORT_TIMEOUT_MILLIS", Some("70000")), + ]; + + let config = temp_env::with_vars(env_vars, BatchConfig::default); + + assert_eq!(config.scheduled_delay, Duration::from_millis(3000)); + assert_eq!(config.max_export_timeout, Duration::from_millis(70000)); + assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT); + assert_eq!( + config.max_export_batch_size, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT + ); + } + + #[test] + fn test_batch_config_configurable_by_env_vars_precedence() { + let env_vars = vec![ + (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")), + ("OTEL_BLRP_SCHEDULE_DELAY_MILLIS", Some("3000")), + (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")), + ("OTEL_BLRP_EXPORT_TIMEOUT_MILLIS", Some("70000")), + ]; + + let config = temp_env::with_vars(env_vars, BatchConfig::default); + + assert_eq!(config.scheduled_delay, Duration::from_millis(2000)); + assert_eq!(config.max_export_timeout, Duration::from_millis(60000)); + assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT); + assert_eq!( + config.max_export_batch_size, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT + ); + } + + #[test] + fn test_batch_config_max_export_batch_size_validation() { + let env_vars = vec![ + (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")), + (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")), + ]; + + let config = temp_env::with_vars(env_vars, BatchConfig::default); + + assert_eq!(config.max_queue_size, 256); + assert_eq!(config.max_export_batch_size, 256); + assert_eq!( + config.scheduled_delay, + Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT) + ); + assert_eq!( + config.max_export_timeout, + Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT) + ); + } + + #[test] + fn test_batch_config_with_fields() { + let batch = BatchConfig::default() + .with_max_export_batch_size(1) + .with_scheduled_delay(Duration::from_millis(2)) + .with_max_export_timeout(Duration::from_millis(3)) + .with_max_queue_size(4); + + assert_eq!(batch.max_export_batch_size, 1); + assert_eq!(batch.scheduled_delay, Duration::from_millis(2)); + assert_eq!(batch.max_export_timeout, Duration::from_millis(3)); + assert_eq!(batch.max_queue_size, 4); + } + + #[test] + fn test_build_batch_log_processor_builder() { + let mut env_vars = vec![ + (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")), + (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")), + (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")), + ]; + temp_env::with_vars(env_vars.clone(), || { + let builder = + BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio); + + assert_eq!(builder.config.max_export_batch_size, 500); + assert_eq!( + builder.config.scheduled_delay, + Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT) + ); + assert_eq!( + builder.config.max_queue_size, + OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT + ); + assert_eq!( + builder.config.max_export_timeout, + Duration::from_millis(2046) + ); + }); + + env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120"))); + + temp_env::with_vars(env_vars, || { + let builder = + BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio); + assert_eq!(builder.config.max_export_batch_size, 120); + assert_eq!(builder.config.max_queue_size, 120); + }); + } + + #[test] + fn test_build_batch_log_processor_builder_with_custom_config() { + let expected = BatchConfig::default() + .with_max_export_batch_size(1) + .with_scheduled_delay(Duration::from_millis(2)) + .with_max_export_timeout(Duration::from_millis(3)) + .with_max_queue_size(4); + + let builder = BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio) + .with_batch_config(expected); + + let actual = &builder.config; + assert_eq!(actual.max_export_batch_size, 1); + assert_eq!(actual.scheduled_delay, Duration::from_millis(2)); + assert_eq!(actual.max_export_timeout, Duration::from_millis(3)); + assert_eq!(actual.max_queue_size, 4); + } +}