-
Notifications
You must be signed in to change notification settings - Fork 384
/
smoke.rs
102 lines (92 loc) · 3.09 KB
/
smoke.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use futures::StreamExt;
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::trace::{Span, SpanKind, Tracer};
use opentelemetry_otlp::proto::collector::trace::v1::{
trace_service_server::{TraceService, TraceServiceServer},
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use std::{net::SocketAddr, sync::Mutex};
use tokio::sync::mpsc;
use tokio_stream::wrappers::TcpListenerStream;
/// Mock OTLP trace collector used by the smoke test in this file.
///
/// It does not process spans itself: its `TraceService::export` implementation
/// pushes every received `ExportTraceServiceRequest` into `tx`, so the test can
/// read the request back from the receiving half of the channel.
struct MockServer {
// Sending half of the channel that carries received export requests to the test.
// It is wrapped in a `Mutex`, which `export` locks for each send.
tx: Mutex<mpsc::Sender<ExportTraceServiceRequest>>,
}
impl MockServer {
pub fn new(tx: mpsc::Sender<ExportTraceServiceRequest>) -> Self {
Self { tx: Mutex::new(tx) }
}
}
#[tonic::async_trait]
impl TraceService for MockServer {
async fn export(
&self,
request: tonic::Request<ExportTraceServiceRequest>,
) -> Result<tonic::Response<ExportTraceServiceResponse>, tonic::Status> {
println!("Sending request into channel...");
self.tx
.lock()
.unwrap()
.try_send(request.into_inner())
.expect("Channel full");
Ok(tonic::Response::new(ExportTraceServiceResponse {}))
}
}
/// Starts a `MockServer`-backed trace service on an OS-assigned port of the
/// IPv6 loopback address and returns how to reach it.
///
/// Returns the socket address the listener is actually bound to, together with
/// the receiving half of the channel (capacity 10) into which the mock server
/// pushes every export request it receives. The server runs on a spawned Tokio
/// task that is not joined here.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
async fn setup() -> (SocketAddr, mpsc::Receiver<ExportTraceServiceRequest>) {
    // Channel connecting the mock service to the caller.
    let (req_tx, req_rx) = mpsc::channel(10);
    let service = TraceServiceServer::new(MockServer::new(req_tx));

    // Port 0 lets the OS choose a free port; the real one is read back below.
    let requested = "[::1]:0".parse::<SocketAddr>().unwrap();
    let listener = tokio::net::TcpListener::bind(requested)
        .await
        .expect("failed to bind");
    let bound = listener.local_addr().unwrap();

    // Log each successfully accepted connection, passing the item through unchanged.
    let incoming = TcpListenerStream::new(listener).map(|conn| {
        if let Ok(socket) = &conn {
            println!("Got new conn at {}", socket.peer_addr().unwrap());
        }
        conn
    });

    tokio::task::spawn(async move {
        let served = tonic::transport::Server::builder()
            .add_service(service)
            .serve_with_incoming(incoming)
            .await;
        served.expect("Server failed");
    });

    (bound, req_rx)
}
/// End-to-end smoke test for the OTLP tonic trace pipeline.
///
/// Starts the mock collector, installs a batch tracer pointed at it, emits one
/// span named `my-test-span` carrying one event named `my-test-event`, shuts
/// the tracer provider down, and then checks that the first span of the first
/// export request received by the mock collector has those names.
///
/// The wait for the export request is bounded by a timeout: the mock server
/// task keeps the sending half of the channel alive, so without a bound a
/// missing export would make the test hang rather than fail.
#[tokio::test(flavor = "multi_thread")]
async fn smoke_tracer() {
    println!("Starting server setup...");
    let (addr, mut req_rx) = setup().await;

    {
        println!("Installing tracer...");
        let tracer = opentelemetry_otlp::new_pipeline()
            .with_endpoint(format!("http://{}", addr))
            .with_tonic()
            .install_batch(opentelemetry::runtime::Tokio)
            .expect("failed to install");

        println!("Sending span...");
        let mut span = tracer
            .span_builder("my-test-span")
            .with_kind(SpanKind::Server)
            .start(&tracer);
        span.add_event("my-test-event".into(), vec![]);
        span.end();

        // NOTE(review): presumably this flushes the batched span before the
        // provider goes away - confirm against the opentelemetry docs.
        shutdown_tracer_provider();
    }

    println!("Waiting for request...");
    // Outer `expect`: the timeout elapsed. Inner `expect`: the channel closed
    // without ever delivering a request.
    let req = tokio::time::timeout(std::time::Duration::from_secs(10), req_rx.recv())
        .await
        .expect("timed out waiting for export request")
        .expect("missing export request");

    let first_span = req
        .resource_spans
        .first()
        .expect("export request has no resource spans")
        .instrumentation_library_spans
        .first()
        .expect("resource spans have no instrumentation library spans")
        .spans
        .first()
        .expect("instrumentation library spans have no spans");
    assert_eq!("my-test-span", first_span.name);

    let first_event = first_span
        .events
        .first()
        .expect("span has no events");
    assert_eq!("my-test-event", first_event.name);
}