Skip to content

Commit

Permalink
Flush fix for SimpleLogProcessor (#1308)
Browse files Browse the repository at this point in the history
SimpleLogProcessor's `force_flush` was incorrectly implemented as a no-op, on the assumption that there was nothing to flush. But the SimpleLogProcessor uses a queue with a pub-sub mechanism to operate, so flush is not a no-op. This is handled correctly in SimpleSpanProcessor.

Instead of borrowing the approach from SimpleSpanProcessor, I have refactored the SimpleLogProcessor to use no queue and instead use a simple Mutex protected exporter/shutdown. I feel this is sufficient (and want to port this to SimpleSpanProcessor as well), but would like to get feedback on the approach - was there some scenario which prompted the queue/pub-sub for SimpleProcessors? Or this approach is sufficient?

There should not be any perf concerns, as SimpleProcessors are used for learning/dev scenarios, not for production. (The exception is exporting to operating-system-native tracing such as ETW or user_events, but for those we have written a ReentrantProcessor separately.)
  • Loading branch information
cijothomas committed Nov 12, 2023
1 parent dd03fda commit cb2d127
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 40 deletions.
59 changes: 19 additions & 40 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use opentelemetry::{
global,
logs::{LogError, LogResult},
};
use std::thread;
use std::sync::Mutex;
use std::{
fmt::{self, Debug, Formatter},
time::Duration,
Expand Down Expand Up @@ -42,63 +42,42 @@ pub trait LogProcessor: Send + Sync + Debug {
/// emitted. If you find this limiting, consider the batch processor instead.
#[derive(Debug)]
pub struct SimpleLogProcessor {
sender: crossbeam_channel::Sender<Option<LogData>>,
shutdown: crossbeam_channel::Receiver<()>,
exporter: Mutex<Box<dyn LogExporter>>,
}

impl SimpleLogProcessor {
pub(crate) fn new(mut exporter: Box<dyn LogExporter>) -> Self {
let (log_tx, log_rx) = crossbeam_channel::unbounded();
let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0);

let _ = thread::Builder::new()
.name("opentelemetry-log-exporter".to_string())
.spawn(move || {
while let Ok(Some(log)) = log_rx.recv() {
if let Err(err) = futures_executor::block_on(exporter.export(vec![log])) {
global::handle_error(err);
}
}

exporter.shutdown();

if let Err(err) = shutdown_tx.send(()) {
global::handle_error(LogError::from(format!(
"could not send shutdown: {:?}",
err
)));
}
});

pub(crate) fn new(exporter: Box<dyn LogExporter>) -> Self {
SimpleLogProcessor {
sender: log_tx,
shutdown: shutdown_rx,
exporter: Mutex::new(exporter),
}
}
}

impl LogProcessor for SimpleLogProcessor {
fn emit(&self, data: LogData) {
if let Err(err) = self.sender.send(Some(data)) {
global::handle_error(LogError::from(format!("error processing log {:?}", err)));
let result = self
.exporter
.lock()
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
.and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![data])));
if let Err(err) = result {
global::handle_error(err);
}
}

fn force_flush(&self) -> LogResult<()> {
// Nothing to flush: `emit` exports each record synchronously (blocking on
// the exporter under the mutex), so this processor never buffers records.
Ok(())
}

fn shutdown(&mut self) -> LogResult<()> {
if self.sender.send(None).is_ok() {
if let Err(err) = self.shutdown.recv() {
global::handle_error(LogError::from(format!(
"error shutting down log processor: {:?}",
err
)))
}
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(LogError::Other(
"simple logprocessor mutex poison during shutdown".into(),
))
}
Ok(())
}

#[cfg(feature = "logs_level_enabled")]
Expand All @@ -108,7 +87,7 @@ impl LogProcessor for SimpleLogProcessor {
}

/// A [`LogProcessor`] that asynchronously buffers log records and reports
/// them at a preconfigured interval.
/// them at a pre-configured interval.
pub struct BatchLogProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,
}
Expand Down
48 changes: 48 additions & 0 deletions opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,51 @@ pub use log_emitter::{Builder, Logger, LoggerProvider};
pub use log_processor::{
BatchConfig, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, SimpleLogProcessor,
};

#[cfg(all(test, feature = "testing"))]
mod tests {
    use super::*;
    use crate::testing::logs::InMemoryLogsExporter;
    use opentelemetry::logs::{LogRecord, Logger, LoggerProvider as _, Severity};
    use opentelemetry::{logs::AnyValue, Key};

    /// End-to-end check of the logging pipeline with a `SimpleLogProcessor`:
    /// a record emitted through the SDK must reach the exporter with its
    /// severity and attributes intact after `force_flush`.
    #[test]
    fn logging_sdk_test() {
        // Arrange: wire an in-memory exporter into the simple (synchronous) processor.
        let exporter: InMemoryLogsExporter = InMemoryLogsExporter::default();
        let logger_provider = LoggerProvider::builder()
            .with_log_processor(SimpleLogProcessor::new(Box::new(exporter.clone())))
            .build();

        // Act: emit a single error-severity record carrying two attributes.
        let logger = logger_provider.logger("test-logger");
        let mut log_record: LogRecord = LogRecord::default();
        log_record.severity_number = Some(Severity::Error);
        log_record.severity_text = Some("Error".into());
        let attributes = vec![
            (Key::new("key1"), "value1".into()),
            (Key::new("key2"), "value2".into()),
        ];
        log_record.attributes = Some(attributes);
        logger.emit(log_record);

        logger_provider.force_flush();

        // Assert: exactly one record was exported, with scope and fields intact.
        let exported_logs = exporter
            .get_emitted_logs()
            .expect("Logs are expected to be exported.");
        assert_eq!(exported_logs.len(), 1);
        // `first()` is the idiomatic (clippy-clean) form of `get(0)`.
        let log = exported_logs
            .first()
            .expect("At least one log is expected to be present.");
        assert_eq!(log.instrumentation.name, "test-logger");
        assert_eq!(log.record.severity_number, Some(Severity::Error));
        let attributes: Vec<(Key, AnyValue)> = log
            .record
            .attributes
            .clone()
            .expect("Attributes are expected");
        assert_eq!(attributes.len(), 2);
    }
}

0 comments on commit cb2d127

Please sign in to comment.