Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose log batchconfig #1471

Merged
merged 13 commits into from
Jan 22, 2024
19 changes: 17 additions & 2 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
OtlpLogPipeline {
log_config: None,
exporter_builder: NoExporterConfig(()),
batch_config: None,

Check warning on line 45 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L45

Added line #L45 was not covered by tests
}
}
}
Expand Down Expand Up @@ -124,6 +125,7 @@
pub struct OtlpLogPipeline<EB> {
exporter_builder: EB,
log_config: Option<opentelemetry_sdk::logs::Config>,
batch_config: Option<opentelemetry_sdk::logs::BatchConfig>,
}

impl<EB> OtlpLogPipeline<EB> {
Expand All @@ -132,6 +134,12 @@
self.log_config = Some(log_config);
self
}

/// Set the batch span processor configuration, and it will override the env vars.
CosminLazar marked this conversation as resolved.
Show resolved Hide resolved
pub fn with_batch_config(mut self, batch_config: opentelemetry_sdk::logs::BatchConfig) -> Self {
self.batch_config = Some(batch_config);
self
}

Check warning on line 142 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L139-L142

Added lines #L139 - L142 were not covered by tests
}

impl OtlpLogPipeline<NoExporterConfig> {
Expand All @@ -143,6 +151,7 @@
OtlpLogPipeline {
exporter_builder: pipeline.into(),
log_config: self.log_config,
batch_config: self.batch_config,

Check warning on line 154 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L154

Added line #L154 was not covered by tests
}
}
}
Expand Down Expand Up @@ -174,6 +183,7 @@
self.exporter_builder.build_log_exporter()?,
self.log_config,
runtime,
self.batch_config,

Check warning on line 186 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L186

Added line #L186 was not covered by tests
))
}
}
Expand Down Expand Up @@ -202,9 +212,14 @@
exporter: LogExporter,
log_config: Option<opentelemetry_sdk::logs::Config>,
runtime: R,
batch_config: Option<opentelemetry_sdk::logs::BatchConfig>,

Check warning on line 215 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L215

Added line #L215 was not covered by tests
) -> opentelemetry_sdk::logs::Logger {
let mut provider_builder =
opentelemetry_sdk::logs::LoggerProvider::builder().with_batch_exporter(exporter, runtime);
let mut provider_builder = opentelemetry_sdk::logs::LoggerProvider::builder();
let batch_processor = opentelemetry_sdk::logs::BatchLogProcessor::builder(exporter, runtime)
.with_batch_config(batch_config.unwrap_or_default())
.build();
provider_builder = provider_builder.with_log_processor(batch_processor);

Check warning on line 221 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L217-L221

Added lines #L217 - L221 were not covered by tests

if let Some(config) = log_config {
provider_builder = provider_builder.with_config(config);
}
Expand Down
290 changes: 284 additions & 6 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,30 @@
global,
logs::{LogError, LogResult},
};
use std::sync::Mutex;
use std::{env, sync::Mutex};
use std::{
fmt::{self, Debug, Formatter},
str::FromStr,
time::Duration,
};

/// Delay interval between two consecutive exports.
const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports.
const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: u64 = 1_000;
/// Maximum allowed time to export data.
const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
/// Default maximum allowed time to export data.
const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
/// Maximum queue size.
const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
/// Default maximum queue size.
const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Maximum batch size, must be less than or equal to OTEL_BLRP_MAX_QUEUE_SIZE.
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size.
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;

/// The interface for plugging into a [`Logger`].
///
/// [`Logger`]: crate::logs::Logger
Expand Down Expand Up @@ -281,12 +299,87 @@

impl Default for BatchConfig {
fn default() -> Self {
BatchConfig {
max_queue_size: 2_048,
scheduled_delay: Duration::from_millis(1_000),
max_export_batch_size: 512,
max_export_timeout: Duration::from_millis(30_000),
let mut config = BatchConfig {
max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
scheduled_delay: Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT),
max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
max_export_timeout: Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT),
};

if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
.ok()
.and_then(|queue_size| usize::from_str(&queue_size).ok())
{
config.max_queue_size = max_queue_size;
}

if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
.ok()
.and_then(|batch_size| usize::from_str(&batch_size).ok())
{
config.max_export_batch_size = max_export_batch_size;
}

// max export batch size must be less or equal to max queue size.
// we set max export batch size to max queue size if it's larger than max queue size.
if config.max_export_batch_size > config.max_queue_size {
config.max_export_batch_size = config.max_queue_size;
}

if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
.ok()
.or_else(|| env::var("OTEL_BLRP_SCHEDULE_DELAY_MILLIS").ok())
.and_then(|delay| u64::from_str(&delay).ok())
{
config.scheduled_delay = Duration::from_millis(scheduled_delay);
}

if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
.ok()
.or_else(|| env::var("OTEL_BLRP_EXPORT_TIMEOUT_MILLIS").ok())
.and_then(|s| u64::from_str(&s).ok())
{
config.max_export_timeout = Duration::from_millis(max_export_timeout);
}

config
}
}

impl BatchConfig {
/// Set max_queue_size for [`BatchConfig`].
/// It's the maximum queue size to buffer logs for delayed processing.
/// If the queue gets full it will drop the logs.
/// The default value of is 2048.
pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
self.max_queue_size = max_queue_size;
self
}

/// Set scheduled_delay for [`BatchConfig`].
/// It's the delay interval in milliseconds between two consecutive processing of batches.
/// The default value is 1000 milliseconds.
pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
self.scheduled_delay = scheduled_delay;
self
}

/// Set max_export_timeout for [`BatchConfig`].
/// It's the maximum duration to export a batch of data.
/// The default value is 30000 milliseconds.
pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
self.max_export_timeout = max_export_timeout;
self
}

/// Set max_export_batch_size for [`BatchConfig`].
/// It's the maximum number of logs to process in a single batch. If there are
/// more than one batch worth of logs then it processes multiple batches
/// of logs one batch after the other without any delay.
/// The default value is 512.
pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
self.max_export_batch_size = max_export_batch_size;
self
}
}

Expand Down Expand Up @@ -342,6 +435,11 @@
BatchLogProcessorBuilder { config, ..self }
}

/// Set the BatchConfig for [`BatchLogProcessorBuilder`]
pub fn with_batch_config(self, config: BatchConfig) -> Self {
BatchLogProcessorBuilder { config, ..self }
}

Check warning on line 441 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L439-L441

Added lines #L439 - L441 were not covered by tests

/// Build a batch processor
pub fn build(self) -> BatchLogProcessor<R> {
BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime)
Expand All @@ -360,3 +458,183 @@
/// Shut down the worker thread, push all logs in buffer to the backend.
Shutdown(oneshot::Sender<ExportResult>),
}

#[cfg(all(test, feature = "testing", feature = "logs"))]
mod tests {
use super::{
BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY,
};
use crate::{
logs::{
log_processor::{
OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
},
BatchConfig,
},
runtime,
testing::logs::InMemoryLogsExporter,
};
use std::time::Duration;

#[test]
fn test_default_const_values() {
assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, 1_000);
assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, 30_000);
assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
assert_eq!(
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
"OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
);
assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
}

#[test]
fn test_default_batch_config_adheres_to_specification() {
let config = BatchConfig::default();

assert_eq!(
config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
config.max_export_timeout,
Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
);
assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
assert_eq!(
config.max_export_batch_size,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
);
}

#[test]
fn test_batch_config_configurable_by_env_vars() {
let env_vars = vec![
(OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
(OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
];

let config = temp_env::with_vars(env_vars, BatchConfig::default);

assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
assert_eq!(config.max_queue_size, 4096);
assert_eq!(config.max_export_batch_size, 1024);
}

#[test]
fn test_batch_config_configurable_by_env_vars_millis() {
let env_vars = vec![
("OTEL_BLRP_SCHEDULE_DELAY_MILLIS", Some("3000")),
("OTEL_BLRP_EXPORT_TIMEOUT_MILLIS", Some("70000")),
];

let config = temp_env::with_vars(env_vars, BatchConfig::default);

assert_eq!(config.scheduled_delay, Duration::from_millis(3000));
assert_eq!(config.max_export_timeout, Duration::from_millis(70000));
assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
assert_eq!(
config.max_export_batch_size,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
);
}

#[test]
fn test_batch_config_configurable_by_env_vars_precedence() {
let env_vars = vec![
(OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
("OTEL_BLRP_SCHEDULE_DELAY_MILLIS", Some("3000")),
(OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
("OTEL_BLRP_EXPORT_TIMEOUT_MILLIS", Some("70000")),
];

let config = temp_env::with_vars(env_vars, BatchConfig::default);

assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
assert_eq!(
config.max_export_batch_size,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
);
}

#[test]
fn test_batch_config_max_export_batch_size_validation() {
let env_vars = vec![
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
];

let config = temp_env::with_vars(env_vars, BatchConfig::default);

assert_eq!(config.max_queue_size, 256);
assert_eq!(config.max_export_batch_size, 256);
assert_eq!(
config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
config.max_export_timeout,
Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
);
}

#[test]
fn test_batch_config_with_fields() {
let batch = BatchConfig::default()
.with_max_export_batch_size(1)
.with_scheduled_delay(Duration::from_millis(2))
.with_max_export_timeout(Duration::from_millis(3))
.with_max_queue_size(4);

assert_eq!(batch.max_export_batch_size, 1);
assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
assert_eq!(batch.max_queue_size, 4);
}

#[test]
fn test_build_batch_log_processor_builder() {
let mut env_vars = vec![
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
(OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
(OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder =
BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio);

assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
builder.config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
builder.config.max_queue_size,
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
);
assert_eq!(
builder.config.max_export_timeout,
Duration::from_millis(2046)
);
});

env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));

temp_env::with_vars(env_vars, || {
let builder =
BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
});
}
}