Skip to content

Commit 4ace340

Browse files
authoredJul 18, 2023
fix(client): remove Send bounds for request Body (#3266)
Closes #3184
1 parent 0df9d7e commit 4ace340

File tree

3 files changed

+180
-59
lines changed

3 files changed

+180
-59
lines changed
 

‎examples/single_threaded.rs

+178-57
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl HttpBody for Body {
5252
fn main() {
5353
pretty_env_logger::init();
5454

55-
let server = thread::spawn(move || {
55+
let server_http2 = thread::spawn(move || {
5656
// Configure a runtime for the server that runs everything on the current thread
5757
let rt = tokio::runtime::Builder::new_current_thread()
5858
.enable_all()
@@ -61,10 +61,10 @@ fn main() {
6161

6262
// Combine it with a `LocalSet, which means it can spawn !Send futures...
6363
let local = tokio::task::LocalSet::new();
64-
local.block_on(&rt, server()).unwrap();
64+
local.block_on(&rt, http2_server()).unwrap();
6565
});
6666

67-
let client = thread::spawn(move || {
67+
let client_http2 = thread::spawn(move || {
6868
// Configure a runtime for the client that runs everything on the current thread
6969
let rt = tokio::runtime::Builder::new_current_thread()
7070
.enable_all()
@@ -76,16 +76,137 @@ fn main() {
7676
local
7777
.block_on(
7878
&rt,
79-
client("http://localhost:3000".parse::<hyper::Uri>().unwrap()),
79+
http2_client("http://localhost:3000".parse::<hyper::Uri>().unwrap()),
8080
)
8181
.unwrap();
8282
});
8383

84-
server.join().unwrap();
85-
client.join().unwrap();
84+
let server_http1 = thread::spawn(move || {
85+
// Configure a runtime for the server that runs everything on the current thread
86+
let rt = tokio::runtime::Builder::new_current_thread()
87+
.enable_all()
88+
.build()
89+
.expect("build runtime");
90+
91+
// Combine it with a `LocalSet, which means it can spawn !Send futures...
92+
let local = tokio::task::LocalSet::new();
93+
local.block_on(&rt, http1_server()).unwrap();
94+
});
95+
96+
let client_http1 = thread::spawn(move || {
97+
// Configure a runtime for the client that runs everything on the current thread
98+
let rt = tokio::runtime::Builder::new_current_thread()
99+
.enable_all()
100+
.build()
101+
.expect("build runtime");
102+
103+
// Combine it with a `LocalSet, which means it can spawn !Send futures...
104+
let local = tokio::task::LocalSet::new();
105+
local
106+
.block_on(
107+
&rt,
108+
http1_client("http://localhost:3001".parse::<hyper::Uri>().unwrap()),
109+
)
110+
.unwrap();
111+
});
112+
113+
server_http2.join().unwrap();
114+
client_http2.join().unwrap();
115+
116+
server_http1.join().unwrap();
117+
client_http1.join().unwrap();
118+
}
119+
120+
async fn http1_server() -> Result<(), Box<dyn std::error::Error>> {
121+
let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
122+
123+
let listener = TcpListener::bind(addr).await?;
124+
125+
// For each connection, clone the counter to use in our service...
126+
let counter = Rc::new(Cell::new(0));
127+
128+
loop {
129+
let (stream, _) = listener.accept().await?;
130+
131+
let io = TokioIo::new(stream);
132+
133+
let cnt = counter.clone();
134+
135+
let service = service_fn(move |_| {
136+
let prev = cnt.get();
137+
cnt.set(prev + 1);
138+
let value = cnt.get();
139+
async move { Ok::<_, Error>(Response::new(Body::from(format!("Request #{}", value)))) }
140+
});
141+
142+
tokio::task::spawn_local(async move {
143+
if let Err(err) = hyper::server::conn::http1::Builder::new()
144+
.serve_connection(io, service)
145+
.await
146+
{
147+
println!("Error serving connection: {:?}", err);
148+
}
149+
});
150+
}
151+
}
152+
153+
async fn http1_client(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>> {
154+
let host = url.host().expect("uri has no host");
155+
let port = url.port_u16().unwrap_or(80);
156+
let addr = format!("{}:{}", host, port);
157+
let stream = TcpStream::connect(addr).await?;
158+
159+
let io = TokioIo::new(stream);
160+
161+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
162+
163+
tokio::task::spawn_local(async move {
164+
if let Err(err) = conn.await {
165+
let mut stdout = io::stdout();
166+
stdout
167+
.write_all(format!("Connection failed: {:?}", err).as_bytes())
168+
.await
169+
.unwrap();
170+
stdout.flush().await.unwrap();
171+
}
172+
});
173+
174+
let authority = url.authority().unwrap().clone();
175+
176+
// Make 4 requests
177+
for _ in 0..4 {
178+
let req = Request::builder()
179+
.uri(url.clone())
180+
.header(hyper::header::HOST, authority.as_str())
181+
.body(Body::from("test".to_string()))?;
182+
183+
let mut res = sender.send_request(req).await?;
184+
185+
let mut stdout = io::stdout();
186+
stdout
187+
.write_all(format!("Response: {}\n", res.status()).as_bytes())
188+
.await
189+
.unwrap();
190+
stdout
191+
.write_all(format!("Headers: {:#?}\n", res.headers()).as_bytes())
192+
.await
193+
.unwrap();
194+
stdout.flush().await.unwrap();
195+
196+
// Print the response body
197+
while let Some(next) = res.frame().await {
198+
let frame = next?;
199+
if let Some(chunk) = frame.data_ref() {
200+
stdout.write_all(&chunk).await.unwrap();
201+
}
202+
}
203+
stdout.write_all(b"\n-----------------\n").await.unwrap();
204+
stdout.flush().await.unwrap();
205+
}
206+
Ok(())
86207
}
87208

88-
async fn server() -> Result<(), Box<dyn std::error::Error>> {
209+
async fn http2_server() -> Result<(), Box<dyn std::error::Error>> {
89210
let mut stdout = io::stdout();
90211

91212
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
@@ -102,7 +223,7 @@ async fn server() -> Result<(), Box<dyn std::error::Error>> {
102223

103224
loop {
104225
let (stream, _) = listener.accept().await?;
105-
let io = TokioIo::new(stream);
226+
let io = IOTypeNotSend::new(TokioIo::new(stream));
106227

107228
// For each connection, clone the counter to use in our service...
108229
let cnt = counter.clone();
@@ -130,55 +251,7 @@ async fn server() -> Result<(), Box<dyn std::error::Error>> {
130251
}
131252
}
132253

133-
struct IOTypeNotSend {
134-
_marker: PhantomData<*const ()>,
135-
stream: TokioIo<TcpStream>,
136-
}
137-
138-
impl IOTypeNotSend {
139-
fn new(stream: TokioIo<TcpStream>) -> Self {
140-
Self {
141-
_marker: PhantomData,
142-
stream,
143-
}
144-
}
145-
}
146-
147-
impl hyper::rt::Write for IOTypeNotSend {
148-
fn poll_write(
149-
mut self: Pin<&mut Self>,
150-
cx: &mut Context<'_>,
151-
buf: &[u8],
152-
) -> Poll<Result<usize, std::io::Error>> {
153-
Pin::new(&mut self.stream).poll_write(cx, buf)
154-
}
155-
156-
fn poll_flush(
157-
mut self: Pin<&mut Self>,
158-
cx: &mut Context<'_>,
159-
) -> Poll<Result<(), std::io::Error>> {
160-
Pin::new(&mut self.stream).poll_flush(cx)
161-
}
162-
163-
fn poll_shutdown(
164-
mut self: Pin<&mut Self>,
165-
cx: &mut Context<'_>,
166-
) -> Poll<Result<(), std::io::Error>> {
167-
Pin::new(&mut self.stream).poll_shutdown(cx)
168-
}
169-
}
170-
171-
impl hyper::rt::Read for IOTypeNotSend {
172-
fn poll_read(
173-
mut self: Pin<&mut Self>,
174-
cx: &mut Context<'_>,
175-
buf: hyper::rt::ReadBufCursor<'_>,
176-
) -> Poll<std::io::Result<()>> {
177-
Pin::new(&mut self.stream).poll_read(cx, buf)
178-
}
179-
}
180-
181-
async fn client(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>> {
254+
async fn http2_client(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>> {
182255
let host = url.host().expect("uri has no host");
183256
let port = url.port_u16().unwrap_or(80);
184257
let addr = format!("{}:{}", host, port);
@@ -250,3 +323,51 @@ where
250323
tokio::task::spawn_local(fut);
251324
}
252325
}
326+
327+
struct IOTypeNotSend {
328+
_marker: PhantomData<*const ()>,
329+
stream: TokioIo<TcpStream>,
330+
}
331+
332+
impl IOTypeNotSend {
333+
fn new(stream: TokioIo<TcpStream>) -> Self {
334+
Self {
335+
_marker: PhantomData,
336+
stream,
337+
}
338+
}
339+
}
340+
341+
impl hyper::rt::Write for IOTypeNotSend {
342+
fn poll_write(
343+
mut self: Pin<&mut Self>,
344+
cx: &mut Context<'_>,
345+
buf: &[u8],
346+
) -> Poll<Result<usize, std::io::Error>> {
347+
Pin::new(&mut self.stream).poll_write(cx, buf)
348+
}
349+
350+
fn poll_flush(
351+
mut self: Pin<&mut Self>,
352+
cx: &mut Context<'_>,
353+
) -> Poll<Result<(), std::io::Error>> {
354+
Pin::new(&mut self.stream).poll_flush(cx)
355+
}
356+
357+
fn poll_shutdown(
358+
mut self: Pin<&mut Self>,
359+
cx: &mut Context<'_>,
360+
) -> Poll<Result<(), std::io::Error>> {
361+
Pin::new(&mut self.stream).poll_shutdown(cx)
362+
}
363+
}
364+
365+
impl hyper::rt::Read for IOTypeNotSend {
366+
fn poll_read(
367+
mut self: Pin<&mut Self>,
368+
cx: &mut Context<'_>,
369+
buf: hyper::rt::ReadBufCursor<'_>,
370+
) -> Poll<std::io::Result<()>> {
371+
Pin::new(&mut self.stream).poll_read(cx, buf)
372+
}
373+
}

‎src/client/conn/http1.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ where
249249
impl<T, B> Future for Connection<T, B>
250250
where
251251
T: Read + Write + Unpin + Send + 'static,
252-
B: Body + Send + 'static,
252+
B: Body + 'static,
253253
B::Data: Send,
254254
B::Error: Into<Box<dyn StdError + Send + Sync>>,
255255
{

‎src/client/conn/http2.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl<B> fmt::Debug for SendRequest<B> {
194194
impl<T, B, E> Connection<T, B, E>
195195
where
196196
T: Read + Write + Unpin + 'static,
197-
B: Body + Unpin + Send + 'static,
197+
B: Body + Unpin + 'static,
198198
B::Data: Send,
199199
B::Error: Into<Box<dyn Error + Send + Sync>>,
200200
E: ExecutorClient<B, T> + Unpin,

0 commit comments

Comments
 (0)
Please sign in to comment.