Skip to content

Commit

Permalink
Return response from HttpClient (#511)
Browse files Browse the repository at this point in the history
Make the HttpClient trait in opentelemetry-http return a HTTP reponse
instead of `()`. This enables exporters using this trait to implement
retry behaviour based on the result of the call.
  • Loading branch information
frigus02 committed Apr 5, 2021
1 parent 49b2654 commit 4d58d4b
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 136 deletions.
2 changes: 2 additions & 0 deletions opentelemetry-datadog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ lazy_static = "1.4"

[dev-dependencies]
base64 = "0.13"
bytes = "1"
futures-util = "0.3"
isahc = "0.9"
opentelemetry = { path = "../opentelemetry", features = ["trace", "testing"] }
5 changes: 3 additions & 2 deletions opentelemetry-datadog/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use opentelemetry::sdk::export::trace;
use opentelemetry::sdk::export::trace::SpanData;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk, trace::TracerProvider};
use opentelemetry_http::HttpClient;
use opentelemetry_http::{HttpClient, ResponseExt};

/// Default Datadog collector endpoint
const DEFAULT_AGENT_ENDPOINT: &str = "http://127.0.0.1:8126";
Expand Down Expand Up @@ -201,7 +201,8 @@ impl trace::SpanExporter for DatadogExporter {
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
.body(data)
.map_err::<Error, _>(Into::into)?;
self.client.send(req).await
let _ = self.client.send(req).await?.error_for_status()?;
Ok(())
}
}

Expand Down
27 changes: 17 additions & 10 deletions opentelemetry-datadog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,28 +81,35 @@
//! use opentelemetry::{KeyValue, trace::Tracer};
//! use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource};
//! use opentelemetry::sdk::export::trace::ExportResult;
//! use opentelemetry_datadog::{new_pipeline, ApiVersion, Error};
//! use opentelemetry::global::shutdown_tracer_provider;
//! use opentelemetry_http::HttpClient;
//! use opentelemetry_datadog::{new_pipeline, ApiVersion, Error};
//! use opentelemetry_http::{HttpClient, HttpError};
//! use async_trait::async_trait;
//! use bytes::Bytes;
//! use futures_util::io::AsyncReadExt as _;
//! use http::{Request, Response};
//! use std::convert::TryInto as _;
//!
//! // `reqwest` and `surf` are supported through features, if you prefer an
//! // alternate http client you can add support by implementing `HttpClient` as
//! // shown here.
//! #[derive(Debug)]
//! struct IsahcClient(isahc::HttpClient);
//!
//! async fn body_to_bytes(mut body: isahc::Body) -> Result<Bytes, HttpError> {
//! let mut bytes = Vec::with_capacity(body.len().unwrap_or(0).try_into()?);
//! let _ = body.read_to_end(&mut bytes).await?;
//! Ok(bytes.into())
//! }
//!
//! #[async_trait]
//! impl HttpClient for IsahcClient {
//! async fn send(&self, request: http::Request<Vec<u8>>) -> ExportResult {
//! let result = self.0.send_async(request).await.map_err(|err| Error::Other(err.to_string()))?;
//!
//! if result.status().is_success() {
//! Ok(())
//! } else {
//! Err(Error::Other(result.status().to_string()).into())
//! async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
//! let response = self.0.send_async(request).await?;
//! Ok(Response::builder()
//! .status(response.status())
//! .body(body_to_bytes(response.into_body()).await?)?)
//! }
//! }
//! }
//!
//! fn main() -> Result<(), opentelemetry::trace::TraceError> {
Expand Down
7 changes: 4 additions & 3 deletions opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ license = "Apache-2.0"
edition = "2018"

[dependencies]
async-trait = "0.1.42"
http = "0.2.2"
async-trait = "0.1"
bytes = "1"
futures-util = { version = "0.3", default-features = false, features = ["io"] }
http = "0.2"
isahc = { version = "0.9", default-features = false, optional = true }
opentelemetry = { version = "0.13", path = "../opentelemetry", features = ["trace"] }
reqwest = { version = "0.11", default-features = false, features = ["blocking"], optional = true }
surf = { version = "2.0", default-features = false, optional = true }
thiserror = "1"
171 changes: 71 additions & 100 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::fmt::Debug;

use async_trait::async_trait;
use http::Request;
use opentelemetry::propagation::{Extractor, Injector};
use opentelemetry::trace::TraceError;
use bytes::Bytes;
use http::{Request, Response};
use opentelemetry::{
propagation::{Extractor, Injector},
trace::TraceError,
};

pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap);

Expand Down Expand Up @@ -35,145 +38,113 @@ impl<'a> Extractor for HeaderExtractor<'a> {
}
}

pub type HttpError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// A minimal interface necessary for export spans over HTTP.
///
/// Users sometime choose http clients that relay on certain runtime. This trait
/// allows users to bring their choice of http clients.
/// Users sometime choose HTTP clients that relay on a certain async runtime. This trait allows
/// users to bring their choice of HTTP client.
#[async_trait]
pub trait HttpClient: Debug + Send + Sync {
/// Send a batch of spans to collectors
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError>;
/// Send the specified HTTP request
///
/// Returns the HTTP response including the status code and body.
///
/// Returns an error if it can't connect to the server or the request could not be completed,
/// e.g. because of a timeout, infinite redirects, or a loss of connection.
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError>;
}

#[cfg(feature = "reqwest")]
mod reqwest {
use super::{async_trait, HttpClient, Request, TraceError};
use opentelemetry::sdk::export::ExportError;
use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
use std::convert::TryInto;
use thiserror::Error;

#[async_trait]
impl HttpClient for reqwest::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let request = request.try_into().map_err(ReqwestError::from)?;
let _ = self
.execute(request)
.await
.and_then(|rsp| rsp.error_for_status())
.map_err(ReqwestError::from)?;
Ok(())
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let request = request.try_into()?;
let response = self.execute(request).await?;
Ok(Response::builder()
.status(response.status())
.body(response.bytes().await?)?)
}
}

#[async_trait]
impl HttpClient for reqwest::blocking::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let _ = request
.try_into()
.and_then(|req| self.execute(req))
.and_then(|rsp| rsp.error_for_status())
.map_err(ReqwestError::from)?;
Ok(())
}
}

#[derive(Debug, Error)]
#[error(transparent)]
struct ReqwestError(#[from] reqwest::Error);

impl ExportError for ReqwestError {
fn exporter_name(&self) -> &'static str {
"reqwest"
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let request = request.try_into()?;
let response = self.execute(request)?;
Ok(Response::builder()
.status(response.status())
.body(response.bytes()?)?)
}
}
}

#[cfg(feature = "surf")]
mod surf {
use super::{async_trait, HttpClient, Request, TraceError};
use opentelemetry::sdk::export::ExportError;
use std::fmt::{Display, Formatter};
use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};

#[async_trait]
impl HttpClient for surf::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let uri = parts
.uri
.to_string()
.parse()
.map_err(|_err: surf::http::url::ParseError| TraceError::from("error parse url"))?;

let req = surf::Request::builder(surf::http::Method::Post, uri)
.content_type("application/json")
.body(body);
let result = self.send(req).await.map_err::<SurfError, _>(Into::into)?;

if result.status().is_success() {
Ok(())
} else {
Err(SurfError(surf::Error::from_str(
result.status(),
result.status().canonical_reason(),
))
.into())
let method = parts.method.as_str().parse()?;
let uri = parts.uri.to_string().parse()?;

let mut request_builder = surf::Request::builder(method, uri).body(body);
let mut prev_name = None;
for (new_name, value) in parts.headers.into_iter() {
let name = new_name.or(prev_name).expect("the first time new_name should be set and from then on we always have a prev_name");
request_builder = request_builder.header(name.as_str(), value.to_str()?);
prev_name = Some(name);
}
}
}

#[derive(Debug)]
struct SurfError(surf::Error);

impl ExportError for SurfError {
fn exporter_name(&self) -> &'static str {
"surf"
}
}

impl From<surf::Error> for SurfError {
fn from(err: surf::Error) -> Self {
SurfError(err)
}
}

impl std::error::Error for SurfError {}

impl Display for SurfError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_string())
let mut response = self.send(request_builder).await?;
Ok(Response::builder()
.status(response.status() as u16)
.body(response.body_bytes().await?.into())?)
}
}
}

#[cfg(feature = "isahc")]
mod isahc {
use super::{async_trait, HttpClient, Request, TraceError};
use opentelemetry::sdk::export::ExportError;
use thiserror::Error;
use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
use futures_util::io::AsyncReadExt as _;
use std::convert::TryInto as _;

#[async_trait]
impl HttpClient for isahc::HttpClient {
async fn send(&self, request: Request<Vec<u8>>) -> Result<(), TraceError> {
let res = self.send_async(request).await.map_err(IsahcError::from)?;

if !res.status().is_success() {
return Err(TraceError::from(format!(
"Expected success response, got {:?}",
res.status()
)));
}

Ok(())
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let response = self.send_async(request).await?;
Ok(Response::builder()
.status(response.status())
.body(body_to_bytes(response.into_body()).await?)?)
}
}

#[derive(Debug, Error)]
#[error(transparent)]
struct IsahcError(#[from] isahc::Error);
async fn body_to_bytes(mut body: isahc::Body) -> Result<Bytes, HttpError> {
let mut bytes = Vec::with_capacity(body.len().unwrap_or(0).try_into()?);
let _ = body.read_to_end(&mut bytes).await?;
Ok(bytes.into())
}
}

/// Methods to make working with responses from the [`HttpClient`] trait easier.
pub trait ResponseExt: Sized {
/// Turn a response into an error if the HTTP status does not indicate success (200 - 299).
fn error_for_status(self) -> Result<Self, TraceError>;
}

impl ExportError for IsahcError {
fn exporter_name(&self) -> &'static str {
"isahc"
impl<T> ResponseExt for Response<T> {
fn error_for_status(self) -> Result<Self, TraceError> {
if self.status().is_success() {
Ok(self)
} else {
Err(format!("request failed with status {}", self.status()).into())
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ headers = { version = "0.3.2", optional = true }
surf = { version = "2.0", optional = true }

[dev-dependencies]
opentelemetry = { version = "0.13", default-features = false, features = ["trace", "testing"], path = "../opentelemetry" }
bytes = "1"
futures = "0.3"
opentelemetry = { version = "0.13", default-features = false, features = ["trace", "testing"], path = "../opentelemetry" }

[dependencies.web-sys]
version = "0.3.4"
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-jaeger/src/exporter/collector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! # HTTP Jaeger Collector Client
use http::Uri;
#[cfg(feature = "collector_client")]
use opentelemetry_http::HttpClient;
use opentelemetry_http::{HttpClient, ResponseExt as _};
use std::sync::atomic::AtomicUsize;

/// `CollectorAsyncClientHttp` implements an async version of the
Expand Down Expand Up @@ -68,7 +68,8 @@ mod collector_client {
.expect("request should always be valid");

// Send request to collector
self.client.send(req).await
let _ = self.client.send(req).await?.error_for_status()?;
Ok(())
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions opentelemetry-jaeger/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,10 +672,9 @@ mod collector_client_tests {

mod test_http_client {
use async_trait::async_trait;
use http::Request;
use opentelemetry::sdk::export::trace::ExportResult;
use opentelemetry::trace::TraceError;
use opentelemetry_http::HttpClient;
use bytes::Bytes;
use http::{Request, Response};
use opentelemetry_http::{HttpClient, HttpError};
use std::fmt::Debug;

pub(crate) struct TestHttpClient;
Expand All @@ -688,8 +687,8 @@ mod collector_client_tests {

#[async_trait]
impl HttpClient for TestHttpClient {
async fn send(&self, _request: Request<Vec<u8>>) -> ExportResult {
Err(TraceError::from("wrong uri set in http client"))
async fn send(&self, _request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
Err("wrong uri set in http client".into())
}
}
}
Expand All @@ -707,7 +706,7 @@ mod collector_client_tests {
});
assert_eq!(
format!("{:?}", res.err().unwrap()),
"Other(Custom(\"wrong uri set in http client\"))"
"Other(\"wrong uri set in http client\")"
);

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-zipkin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,7 @@ surf = { version = "2.0", optional = true }
thiserror = { version = "1.0"}

[dev-dependencies]
bytes = "1"
futures-util = "0.3"
isahc = "=0.9.6"
opentelemetry = { version = "0.13", default-features = false, features = ["trace", "testing"], path = "../opentelemetry" }

0 comments on commit 4d58d4b

Please sign in to comment.