From f65861143cfb2b649d5e3c60ec72cf4ecea213c0 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Sun, 27 Aug 2023 17:32:35 -0500 Subject: [PATCH 01/28] Semaphore example using an Arc for semaphore passing --- tokio/src/sync/semaphore.rs | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index cb770215488..1323a492e79 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -47,7 +47,23 @@ use std::sync::Arc; /// } /// ``` /// -/// Use [`Semaphore::acquire_owned`] to move permits across tasks: +/// Limit number of incoming requests being handled at the same time, using a task-safe semaphore +/// via [`Semaphore::acquire_owned`]. +/// +/// Similar to limiting the numer of simultaneous opened files, network handles are a limited resource. +/// Allowing an unbounded amount of requests to be processed could result in a denial-of-service, +/// and many other issues. +/// +/// However, in contrast to the file example, this example uses a non-static, `Arc`-wrapped `Semaphore` +/// for more fine-grained lifetime and ownership control. The `Arc` allows multiple threads and tasks +/// to share ownership of the semaphore. Here we create a shallow-copy of the `Arc` reference +/// using `clone`. Then we move the `Arc` into our task, where we will wait to acquire a new permit +/// via [`Semaphore::acquire_owned`] so we can start processing. Once our processing is done, we +/// will drop the permit to allow other tasks to acquire it. +/// +/// If we leave the scope where the `Arc` was defined, and references still exist in any +/// of our tasks to it, the `Semaphore` will continue to exist in a static-like context, until all +/// `Arc`s have been dropped. /// /// ``` /// use std::sync::Arc; @@ -59,11 +75,9 @@ use std::sync::Arc; /// let mut join_handles = Vec::new(); /// /// for _ in 0..5 { -/// let permit = semaphore.clone().acquire_owned().await.unwrap(); +/// let semaphore_clone = semaphore.clone(); /// join_handles.push(tokio::spawn(async move { -/// // perform task... -/// // explicitly own `permit` in the task -/// drop(permit); +/// perform_task(semaphore_clone).await; /// })); /// } /// @@ -71,6 +85,13 @@ use std::sync::Arc; /// handle.await.unwrap(); /// } /// } +/// +/// async fn perform_task(semaphore: Arc) { +/// let permit = semaphore.acquire_owned().await.unwrap(); +/// // perform task... +/// // explicitly own `permit` in the task +/// drop(permit); +/// } /// ``` /// /// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html From eef8f076a48a549b786971af7b11524ec6bb6c8b Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Mon, 28 Aug 2023 15:15:49 -0500 Subject: [PATCH 02/28] Update --- tokio/src/sync/semaphore.rs | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index f63fcbf796c..103d513fa1c 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -57,27 +57,26 @@ use std::sync::Arc; /// However, in contrast to the file example, this example uses a non-static, `Arc`-wrapped `Semaphore` /// for more fine-grained lifetime and ownership control. The `Arc` allows multiple threads and tasks /// to share ownership of the semaphore. Here we create a shallow-copy of the `Arc` reference -/// using `clone`. Then we move the `Arc` into our task, where we will wait to acquire a new permit -/// via [`Semaphore::acquire_owned`] so we can start processing. Once our processing is done, we -/// will drop the permit to allow other tasks to acquire it. +/// using `clone`. Then we acquire a permit through the `Arc` from the `Semaphore` via [`Semaphore::acquire_owned`], +/// and move it inside the task. This ensures no non-`'static` variables are referenced from within +/// said task. /// /// If we leave the scope where the `Arc` was defined, and references still exist in any /// of our tasks to it, the `Semaphore` will continue to exist in a static-like context, until all /// `Arc`s have been dropped. /// /// ``` -/// use std::sync::Arc; -/// use tokio::sync::Semaphore; -/// /// #[tokio::main] /// async fn main() { /// let semaphore = Arc::new(Semaphore::new(3)); /// let mut join_handles = Vec::new(); /// /// for _ in 0..5 { -/// let semaphore_clone = semaphore.clone(); +/// let permit = semaphore.clone().acquire_owned().await.unwrap(); /// join_handles.push(tokio::spawn(async move { -/// perform_task(semaphore_clone).await; +/// // perform task... +/// // explicitly own `permit` in the task +/// drop(permit); /// })); /// } /// @@ -85,13 +84,6 @@ use std::sync::Arc; /// handle.await.unwrap(); /// } /// } -/// -/// async fn perform_task(semaphore: Arc) { -/// let permit = semaphore.acquire_owned().await.unwrap(); -/// // perform task... -/// // explicitly own `permit` in the task -/// drop(permit); -/// } /// ``` /// /// Limit the number of simultaneously opened files in your program. From 2f9d8303114c7f9fd39ef71ede97d3435a5f9bd1 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Mon, 28 Aug 2023 15:19:07 -0500 Subject: [PATCH 03/28] Fix example --- tokio/src/sync/semaphore.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 103d513fa1c..67225e8fc29 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -66,17 +66,18 @@ use std::sync::Arc; /// `Arc`s have been dropped. /// /// ``` +/// use std::sync::Arc; +/// use tokio::sync::Semaphore; +/// /// #[tokio::main] /// async fn main() { /// let semaphore = Arc::new(Semaphore::new(3)); /// let mut join_handles = Vec::new(); /// /// for _ in 0..5 { -/// let permit = semaphore.clone().acquire_owned().await.unwrap(); +/// let semaphore_clone = semaphore.clone(); /// join_handles.push(tokio::spawn(async move { -/// // perform task... -/// // explicitly own `permit` in the task -/// drop(permit); +/// perform_task(semaphore_clone).await; /// })); /// } /// @@ -84,6 +85,13 @@ use std::sync::Arc; /// handle.await.unwrap(); /// } /// } +/// +/// async fn perform_task(semaphore: Arc) { +/// let permit = semaphore.acquire_owned().await.unwrap(); +/// // perform task... +/// // explicitly own `permit` in the task +/// drop(permit); +/// } /// ``` /// /// Limit the number of simultaneously opened files in your program. From 9bc830e0bd243cc51a5f20b8f0ef658ed05db1cc Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Mon, 28 Aug 2023 15:20:49 -0500 Subject: [PATCH 04/28] Fix example --- tokio/src/sync/semaphore.rs | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 67225e8fc29..08330f74ab5 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -71,26 +71,21 @@ use std::sync::Arc; /// /// #[tokio::main] /// async fn main() { -/// let semaphore = Arc::new(Semaphore::new(3)); -/// let mut join_handles = Vec::new(); +/// let semaphore = Arc::new(Semaphore::new(3)); +/// let mut join_handles = Vec::new(); /// -/// for _ in 0..5 { -/// let semaphore_clone = semaphore.clone(); -/// join_handles.push(tokio::spawn(async move { -/// perform_task(semaphore_clone).await; -/// })); -/// } +/// for _ in 0..5 { +/// let permit = semaphore.clone().acquire_owned().await.unwrap(); +/// join_handles.push(tokio::spawn(async move { +/// // perform task... +/// // explicitly own `permit` in the task +/// drop(permit); +/// })); +/// } /// -/// for handle in join_handles { -/// handle.await.unwrap(); -/// } -/// } -/// -/// async fn perform_task(semaphore: Arc) { -/// let permit = semaphore.acquire_owned().await.unwrap(); -/// // perform task... -/// // explicitly own `permit` in the task -/// drop(permit); +/// for handle in join_handles { +/// handle.await.unwrap(); +/// } /// } /// ``` /// From 280575599137db4facd8ebf21fb8dc429bbd9dc5 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Mon, 28 Aug 2023 15:24:06 -0500 Subject: [PATCH 05/28] Fix example --- tokio/src/sync/semaphore.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 08330f74ab5..843cb23189f 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -71,21 +71,21 @@ use std::sync::Arc; /// /// #[tokio::main] /// async fn main() { -/// let semaphore = Arc::new(Semaphore::new(3)); -/// let mut join_handles = Vec::new(); +/// let semaphore = Arc::new(Semaphore::new(3)); +/// let mut join_handles = Vec::new(); /// -/// for _ in 0..5 { -/// let permit = semaphore.clone().acquire_owned().await.unwrap(); -/// join_handles.push(tokio::spawn(async move { -/// // perform task... -/// // explicitly own `permit` in the task -/// drop(permit); -/// })); -/// } +/// for _ in 0..5 { +/// let permit = semaphore.clone().acquire_owned().await.unwrap(); +/// join_handles.push(tokio::spawn(async move { +/// // perform task... +/// // explicitly own `permit` in the task +/// drop(permit); +/// })); +/// } /// -/// for handle in join_handles { -/// handle.await.unwrap(); -/// } +/// for handle in join_handles { +/// handle.await.unwrap(); +/// } /// } /// ``` /// From c7151c741feccdf9875416b65d10edfc455040af Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Tue, 29 Aug 2023 09:07:43 -0500 Subject: [PATCH 06/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 843cb23189f..a09281af53e 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -47,8 +47,7 @@ use std::sync::Arc; /// } /// ``` /// -/// Limit number of incoming requests being handled at the same time, using a task-safe semaphore -/// via [`Semaphore::acquire_owned`]. +/// Limit number of incoming requests being handled at the same time. /// /// Similar to limiting the numer of simultaneous opened files, network handles are a limited resource. /// Allowing an unbounded amount of requests to be processed could result in a denial-of-service, From 0914b0f35220368ae451b0d37b0018f44d3fe1f8 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Tue, 29 Aug 2023 10:09:14 -0500 Subject: [PATCH 07/28] Change example to actual request limiter --- tokio/src/sync/semaphore.rs | 42 +++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index a09281af53e..5fe8dbeb2dd 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -67,23 +67,43 @@ use std::sync::Arc; /// ``` /// use std::sync::Arc; /// use tokio::sync::Semaphore; +/// use tokio::net::TcpListener; +/// use tokio::io::{AsyncReadExt, AsyncWriteExt}; /// /// #[tokio::main] -/// async fn main() { +/// async fn main() -> Result<(), Box> { /// let semaphore = Arc::new(Semaphore::new(3)); -/// let mut join_handles = Vec::new(); +/// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// -/// for _ in 0..5 { +/// loop { /// let permit = semaphore.clone().acquire_owned().await.unwrap(); -/// join_handles.push(tokio::spawn(async move { -/// // perform task... -/// // explicitly own `permit` in the task -/// drop(permit); -/// })); -/// } +/// let (mut socket, _) = listener.accept().await?; +/// +/// tokio::spawn(async move { +/// let mut buf = [0; 1024]; +/// +/// // In a loop, read data from the socket and write the data back. +/// loop { +/// let n = match socket.read(&mut buf).await { +/// // socket closed +/// Ok(n) if n == 0 => break, +/// Ok(n) => n, +/// Err(e) => { +/// eprintln!("failed to read from socket; err = {:?}", e); +/// break; +/// } +/// }; +/// +/// // Write the data back +/// if let Err(e) = socket.write_all(&buf[0..n]).await { +/// eprintln!("failed to write to socket; err = {:?}", e); +/// break; +/// } +/// } /// -/// for handle in join_handles { -/// handle.await.unwrap(); +/// // Drop the permit, so more tasks can be created +/// drop(permit); +/// }); /// } /// } /// ``` From 7428441b86c7c8448ccbd93a4a7f3a44042b0bcd Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Tue, 29 Aug 2023 10:48:40 -0500 Subject: [PATCH 08/28] Swap loop to get around CI --- tokio/src/sync/semaphore.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 5fe8dbeb2dd..fe358a1cc68 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -64,6 +64,8 @@ use std::sync::Arc; /// of our tasks to it, the `Semaphore` will continue to exist in a static-like context, until all /// `Arc`s have been dropped. /// +/// In this example, a `for` loop is used for the outer loop to pass tests. In practice, this would be +/// a bare `loop`. /// ``` /// use std::sync::Arc; /// use tokio::sync::Semaphore; @@ -71,11 +73,12 @@ use std::sync::Arc; /// use tokio::io::{AsyncReadExt, AsyncWriteExt}; /// /// #[tokio::main] -/// async fn main() -> Result<(), Box> { +/// async fn main() { /// let semaphore = Arc::new(Semaphore::new(3)); /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// -/// loop { +/// // Replace the for loop, with a bare loop +/// for _ in 0..5 { /// let permit = semaphore.clone().acquire_owned().await.unwrap(); /// let (mut socket, _) = listener.accept().await?; /// From 012381620546d9fc827a50ea45ffbfb5e4e58736 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Tue, 29 Aug 2023 11:03:41 -0500 Subject: [PATCH 09/28] No run docstring --- tokio/src/sync/semaphore.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index fe358a1cc68..c9eca1f5109 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -64,21 +64,18 @@ use std::sync::Arc; /// of our tasks to it, the `Semaphore` will continue to exist in a static-like context, until all /// `Arc`s have been dropped. /// -/// In this example, a `for` loop is used for the outer loop to pass tests. In practice, this would be -/// a bare `loop`. -/// ``` +/// ```no_run /// use std::sync::Arc; /// use tokio::sync::Semaphore; /// use tokio::net::TcpListener; /// use tokio::io::{AsyncReadExt, AsyncWriteExt}; /// /// #[tokio::main] -/// async fn main() { +/// async fn main() -> Result<(), Box> { /// let semaphore = Arc::new(Semaphore::new(3)); /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// -/// // Replace the for loop, with a bare loop -/// for _ in 0..5 { +/// loop { /// let permit = semaphore.clone().acquire_owned().await.unwrap(); /// let (mut socket, _) = listener.accept().await?; /// From b17bddc1bcd2327fcf3e59adc78b807a9d8ed7c0 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Tue, 29 Aug 2023 12:49:08 -0500 Subject: [PATCH 10/28] Add comment --- tokio/src/sync/semaphore.rs | 47 +++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index c9eca1f5109..7f4363a5b51 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -76,33 +76,34 @@ use std::sync::Arc; /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// /// loop { +/// // Acquire permit before accepting, else the connection will wait without purpose /// let permit = semaphore.clone().acquire_owned().await.unwrap(); /// let (mut socket, _) = listener.accept().await?; /// /// tokio::spawn(async move { -/// let mut buf = [0; 1024]; -/// -/// // In a loop, read data from the socket and write the data back. -/// loop { -/// let n = match socket.read(&mut buf).await { -/// // socket closed -/// Ok(n) if n == 0 => break, -/// Ok(n) => n, -/// Err(e) => { -/// eprintln!("failed to read from socket; err = {:?}", e); -/// break; -/// } -/// }; -/// -/// // Write the data back -/// if let Err(e) = socket.write_all(&buf[0..n]).await { -/// eprintln!("failed to write to socket; err = {:?}", e); -/// break; -/// } -/// } -/// -/// // Drop the permit, so more tasks can be created -/// drop(permit); +/// let mut buf = [0; 1024]; +/// +/// // In a loop, read data from the socket and write the data back. +/// loop { +/// let n = match socket.read(&mut buf).await { +/// // socket closed +/// Ok(n) if n == 0 => break, +/// Ok(n) => n, +/// Err(e) => { +/// eprintln!("failed to read from socket; err = {:?}", e); +/// break; +/// } +/// }; +/// +/// // Write the data back +/// if let Err(e) = socket.write_all(&buf[0..n]).await { +/// eprintln!("failed to write to socket; err = {:?}", e); +/// break; +/// } +/// } +/// +/// // Drop the permit, so more tasks can be created +/// drop(permit); /// }); /// } /// } From 1a7a25b1f65b3338e8874657dd4a0946f389cc74 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Tue, 29 Aug 2023 12:50:01 -0500 Subject: [PATCH 11/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index c9eca1f5109..3f41ed0e493 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -101,6 +101,7 @@ use std::sync::Arc; /// } /// } /// +/// drop(socket); /// // Drop the permit, so more tasks can be created /// drop(permit); /// }); From bd208130b8b193ed9a19001b7cb9e4a159e1223a Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Tue, 29 Aug 2023 13:21:09 -0500 Subject: [PATCH 12/28] Update --- tokio/src/sync/semaphore.rs | 56 ++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 1fda4f59dff..d1ad3f6d49e 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -67,7 +67,7 @@ use std::sync::Arc; /// ```no_run /// use std::sync::Arc; /// use tokio::sync::Semaphore; -/// use tokio::net::TcpListener; +/// use tokio::net::{TcpListener, TcpStream}; /// use tokio::io::{AsyncReadExt, AsyncWriteExt}; /// /// #[tokio::main] @@ -78,36 +78,40 @@ use std::sync::Arc; /// loop { /// // Acquire permit before accepting, else the connection will wait without purpose /// let permit = semaphore.clone().acquire_owned().await.unwrap(); -/// let (mut socket, _) = listener.accept().await?; +/// let (socket, _) = listener.accept().await?; /// /// tokio::spawn(async move { -/// let mut buf = [0; 1024]; -/// -/// // In a loop, read data from the socket and write the data back. -/// loop { -/// let n = match socket.read(&mut buf).await { -/// // socket closed -/// Ok(n) if n == 0 => break, -/// Ok(n) => n, -/// Err(e) => { -/// eprintln!("failed to read from socket; err = {:?}", e); -/// break; -/// } -/// }; -/// -/// // Write the data back -/// if let Err(e) = socket.write_all(&buf[0..n]).await { -/// eprintln!("failed to write to socket; err = {:?}", e); -/// break; -/// } -/// } -/// -/// drop(socket); -/// // Drop the permit, so more tasks can be created -/// drop(permit); +/// handle_connection(socket).await; +/// // Explicitly drop the permit, so more tasks can be created +/// drop(permit); /// }); /// } /// } +/// +/// async fn handle_connection(mut socket: TcpStream) { +/// let mut buf = [0; 1024]; +/// +/// // In a loop, read data from the socket and write the data back. +/// loop { +/// let n = match socket.read(&mut buf).await { +/// // socket closed +/// Ok(n) if n == 0 => break, +/// Ok(n) => n, +/// Err(e) => { +/// eprintln!("failed to read from socket; err = {:?}", e); +/// break; +/// } +/// }; +/// +/// // Write the data back +/// if let Err(e) = socket.write_all(&buf[0..n]).await { +/// eprintln!("failed to write to socket; err = {:?}", e); +/// break; +/// } +/// } +/// +/// // Implicitly drop the socket here +/// } /// ``` /// /// Limit the number of simultaneously opened files in your program. From a8bb79da01870faf84141fea96547e63a2f30aed Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Tue, 29 Aug 2023 17:11:03 -0500 Subject: [PATCH 13/28] Update --- tokio/src/sync/semaphore.rs | 35 +++++------------------------------ 1 file changed, 5 insertions(+), 30 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index d1ad3f6d49e..d33bbac16ce 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -64,11 +64,10 @@ use std::sync::Arc; /// of our tasks to it, the `Semaphore` will continue to exist in a static-like context, until all /// `Arc`s have been dropped. /// -/// ```no_run +/// ```compile_fail /// use std::sync::Arc; /// use tokio::sync::Semaphore; -/// use tokio::net::{TcpListener, TcpStream}; -/// use tokio::io::{AsyncReadExt, AsyncWriteExt}; +/// use tokio::net::{TcpListener}; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { @@ -78,40 +77,16 @@ use std::sync::Arc; /// loop { /// // Acquire permit before accepting, else the connection will wait without purpose /// let permit = semaphore.clone().acquire_owned().await.unwrap(); -/// let (socket, _) = listener.accept().await?; +/// let (mut socket, _) = listener.accept().await?; /// /// tokio::spawn(async move { -/// handle_connection(socket).await; +/// // Do work, and drop socket after +/// handle_connection(&mut socket).await; /// // Explicitly drop the permit, so more tasks can be created /// drop(permit); /// }); /// } /// } -/// -/// async fn handle_connection(mut socket: TcpStream) { -/// let mut buf = [0; 1024]; -/// -/// // In a loop, read data from the socket and write the data back. -/// loop { -/// let n = match socket.read(&mut buf).await { -/// // socket closed -/// Ok(n) if n == 0 => break, -/// Ok(n) => n, -/// Err(e) => { -/// eprintln!("failed to read from socket; err = {:?}", e); -/// break; -/// } -/// }; -/// -/// // Write the data back -/// if let Err(e) = socket.write_all(&buf[0..n]).await { -/// eprintln!("failed to write to socket; err = {:?}", e); -/// break; -/// } -/// } -/// -/// // Implicitly drop the socket here -/// } /// ``` /// /// Limit the number of simultaneously opened files in your program. From d605549011fc8cfaadd3c65084e3f26d7553ae9c Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Tue, 29 Aug 2023 17:13:31 -0500 Subject: [PATCH 14/28] Update --- tokio/src/sync/semaphore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index d33bbac16ce..b56cb0652f3 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -75,7 +75,7 @@ use std::sync::Arc; /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// /// loop { -/// // Acquire permit before accepting, else the connection will wait without purpose +/// // Acquire permit before accepting, to avoid needlessly taking up a (finite) file descriptor /// let permit = semaphore.clone().acquire_owned().await.unwrap(); /// let (mut socket, _) = listener.accept().await?; /// From ae49b96212551354bf69b76db50889c8ac384b5a Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Thu, 31 Aug 2023 07:16:00 -0500 Subject: [PATCH 15/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index b56cb0652f3..4d294de1993 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -80,9 +80,11 @@ use std::sync::Arc; /// let (mut socket, _) = listener.accept().await?; /// /// tokio::spawn(async move { -/// // Do work, and drop socket after +/// // Do work using the socket. /// handle_connection(&mut socket).await; -/// // Explicitly drop the permit, so more tasks can be created +/// // Drop socket while the permit is still live. +/// drop(socket); +/// // Drop the permit, so more tasks can be created. /// drop(permit); /// }); /// } From 77d465ba1601f9d6af74a80cb556561311520f73 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Thu, 31 Aug 2023 07:44:58 -0500 Subject: [PATCH 16/28] Update --- tokio/src/sync/semaphore.rs | 90 +++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 43 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 4d294de1993..d833566b76a 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -47,27 +47,56 @@ use std::sync::Arc; /// } /// ``` /// -/// Limit number of incoming requests being handled at the same time. +/// Limit the number of simultaneously opened files in your program. +/// +/// Most operating systems have limits on the number of open file +/// handles. Even in systems without explicit limits, resource constraints +/// implicitly set an upper bound on the number of open files. If your +/// program attempts to open a large number of files and exceeds this +/// limit, it will result in an error. +/// +/// This example uses a Semaphore with 100 permits. By acquiring a permit from +/// the Semaphore before accessing a file, you ensure that your program opens +/// no more than 100 files at a time. When trying to open the 101st +/// file, the program will wait until a permit becomes available before +/// proceeding to open another file. +/// ``` +/// use std::io::Result; +/// use tokio::fs::File; +/// use tokio::sync::Semaphore; +/// use tokio::io::AsyncWriteExt; +/// +/// static PERMITS: Semaphore = Semaphore::const_new(100); +/// +/// async fn write_to_file(message: &[u8]) -> Result<()> { +/// let _permit = PERMITS.acquire().await.unwrap(); +/// let mut buffer = File::create("example.txt").await?; +/// buffer.write_all(message).await?; +/// Ok(()) // Permit goes out of scope here, and is available again for acquisition +/// } +/// ``` /// -/// Similar to limiting the numer of simultaneous opened files, network handles are a limited resource. -/// Allowing an unbounded amount of requests to be processed could result in a denial-of-service, -/// and many other issues. +/// Limit number of incoming requests being handled at the same time. /// -/// However, in contrast to the file example, this example uses a non-static, `Arc`-wrapped `Semaphore` -/// for more fine-grained lifetime and ownership control. The `Arc` allows multiple threads and tasks -/// to share ownership of the semaphore. Here we create a shallow-copy of the `Arc` reference -/// using `clone`. Then we acquire a permit through the `Arc` from the `Semaphore` via [`Semaphore::acquire_owned`], -/// and move it inside the task. This ensures no non-`'static` variables are referenced from within -/// said task. +/// Similar to limiting the number of simultaneous opened files, network handles +/// are a limited resource. Allowing an unbounded amount of requests to be processed +/// could result in a denial-of-service, among many other issues. /// -/// If we leave the scope where the `Arc` was defined, and references still exist in any -/// of our tasks to it, the `Semaphore` will continue to exist in a static-like context, until all -/// `Arc`s have been dropped. +/// However, in contrast to the file example, this example uses an `Arc`. +/// Before we spawn a new task to process a request, we must acquire a permit from +/// this semaphore, in order to limit usage of system resources (like file descriptors, +/// memory usage, CPU time, etc.). Once acquired, a new task is spawned; and once +/// finished, the permit is dropped inside of the task to allow others to spawn. +/// Permits must be acquired via [`Semaphore::acquire_owned`], in order to be +/// movable across the task boundary. Contrastingly, if the semaphore were a global, +/// as in the file-limiting example, only ['Semaphore::acquire'] would be necessary. +/// This is because a `Permit<'static>` would be returned, which is movable across +/// task boundaries, since the semaphore lives forever. /// -/// ```compile_fail +/// ```no_run /// use std::sync::Arc; /// use tokio::sync::Semaphore; -/// use tokio::net::{TcpListener}; +/// use tokio::net::{TcpListener, TcpStream}; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { @@ -89,35 +118,10 @@ use std::sync::Arc; /// }); /// } /// } -/// ``` -/// -/// Limit the number of simultaneously opened files in your program. /// -/// Most operating systems have limits on the number of open file -/// handles. Even in systems without explicit limits, resource constraints -/// implicitly set an upper bound on the number of open files. If your -/// program attempts to open a large number of files and exceeds this -/// limit, it will result in an error. -/// -/// This example uses a Semaphore with 100 permits. By acquiring a permit from -/// the Semaphore before accessing a file, you ensure that your program opens -/// no more than 100 files at a time. When trying to open the 101st -/// file, the program will wait until a permit becomes available before -/// proceeding to open another file. -/// ``` -/// use std::io::Result; -/// use tokio::fs::File; -/// use tokio::sync::Semaphore; -/// use tokio::io::AsyncWriteExt; -/// -/// static PERMITS: Semaphore = Semaphore::const_new(100); -/// -/// async fn write_to_file(message: &[u8]) -> Result<()> { -/// let _permit = PERMITS.acquire().await.unwrap(); -/// let mut buffer = File::create("example.txt").await?; -/// buffer.write_all(message).await?; -/// Ok(()) // Permit goes out of scope here, and is available again for acquisition -/// } +/// # async fn handle_connection(socket: &mut TcpStream) { +/// # todo!() +/// # } /// ``` /// /// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html From 8de8bef051567f3613b9db5cbee75d24984cce07 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Thu, 31 Aug 2023 07:47:30 -0500 Subject: [PATCH 17/28] Update --- tokio/src/sync/semaphore.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index d833566b76a..f34a7600c44 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -76,9 +76,9 @@ use std::sync::Arc; /// } /// ``` /// -/// Limit number of incoming requests being handled at the same time. +/// Limit the number of incoming requests being handled at the same time. /// -/// Similar to limiting the number of simultaneous opened files, network handles +/// Similar to limiting the number of simultaneously opened files, network handles /// are a limited resource. Allowing an unbounded amount of requests to be processed /// could result in a denial-of-service, among many other issues. /// From 8fd7a16e8bc7913c927c5907075ca6ebad5f4e94 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Thu, 31 Aug 2023 07:51:16 -0500 Subject: [PATCH 18/28] Update --- tokio/src/sync/semaphore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index f34a7600c44..0c078e890b1 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -120,7 +120,7 @@ use std::sync::Arc; /// } /// /// # async fn handle_connection(socket: &mut TcpStream) { -/// # todo!() +/// # // Do work /// # } /// ``` /// From b92d933f5d0ca8819d85d6af048d815f584bed90 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Fri, 1 Sep 2023 08:54:55 -0500 Subject: [PATCH 19/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 0c078e890b1..dbfdce0446a 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -82,7 +82,7 @@ use std::sync::Arc; /// are a limited resource. Allowing an unbounded amount of requests to be processed /// could result in a denial-of-service, among many other issues. /// -/// However, in contrast to the file example, this example uses an `Arc`. +/// This example uses an `Arc` instead of a global variable. /// Before we spawn a new task to process a request, we must acquire a permit from /// this semaphore, in order to limit usage of system resources (like file descriptors, /// memory usage, CPU time, etc.). Once acquired, a new task is spawned; and once From cf4f6fbf245f76fbcb2a8628c237e11d194632bd Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Fri, 1 Sep 2023 08:59:08 -0500 Subject: [PATCH 20/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index dbfdce0446a..c2bd587b212 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -118,7 +118,6 @@ use std::sync::Arc; /// }); /// } /// } -/// /// # async fn handle_connection(socket: &mut TcpStream) { /// # // Do work /// # } From 87af82544ce80ed370947b0c84f9a91d888e078f Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Fri, 1 Sep 2023 09:01:50 -0500 Subject: [PATCH 21/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index c2bd587b212..fc09af99c6c 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -87,7 +87,7 @@ use std::sync::Arc; /// this semaphore, in order to limit usage of system resources (like file descriptors, /// memory usage, CPU time, etc.). Once acquired, a new task is spawned; and once /// finished, the permit is dropped inside of the task to allow others to spawn. -/// Permits must be acquired via [`Semaphore::acquire_owned`], in order to be +/// Permits must be acquired via [`Semaphore::acquire_owned`] to be /// movable across the task boundary. Contrastingly, if the semaphore were a global, /// as in the file-limiting example, only ['Semaphore::acquire'] would be necessary. /// This is because a `Permit<'static>` would be returned, which is movable across From 05a2900605d0df0e7c317d49898ad73552ae1118 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Sat, 2 Sep 2023 12:18:36 -0500 Subject: [PATCH 22/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index fc09af99c6c..a9f74f2ff46 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -88,10 +88,7 @@ use std::sync::Arc; /// memory usage, CPU time, etc.). Once acquired, a new task is spawned; and once /// finished, the permit is dropped inside of the task to allow others to spawn. /// Permits must be acquired via [`Semaphore::acquire_owned`] to be -/// movable across the task boundary. Contrastingly, if the semaphore were a global, -/// as in the file-limiting example, only ['Semaphore::acquire'] would be necessary. -/// This is because a `Permit<'static>` would be returned, which is movable across -/// task boundaries, since the semaphore lives forever. +/// movable across the task boundary. (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.) /// /// ```no_run /// use std::sync::Arc; From f8afd1f6445293661e2990736cccdff33fe243dc Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Sat, 2 Sep 2023 12:20:03 -0500 Subject: [PATCH 23/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index a9f74f2ff46..fcd07b8e678 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -101,7 +101,7 @@ use std::sync::Arc; /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// /// loop { -/// // Acquire permit before accepting, to avoid needlessly taking up a (finite) file descriptor +/// // Acquire permit before accepting the next socket. /// let permit = semaphore.clone().acquire_owned().await.unwrap(); /// let (mut socket, _) = listener.accept().await?; /// From b4790c49ea0292cd1a493b5026140be0361416b2 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Sat, 2 Sep 2023 12:23:27 -0500 Subject: [PATCH 24/28] Update --- tokio/src/sync/semaphore.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index fcd07b8e678..3b419c362b5 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -83,12 +83,12 @@ use std::sync::Arc; /// could result in a denial-of-service, among many other issues. /// /// This example uses an `Arc` instead of a global variable. -/// Before we spawn a new task to process a request, we must acquire a permit from -/// this semaphore, in order to limit usage of system resources (like file descriptors, -/// memory usage, CPU time, etc.). Once acquired, a new task is spawned; and once -/// finished, the permit is dropped inside of the task to allow others to spawn. -/// Permits must be acquired via [`Semaphore::acquire_owned`] to be -/// movable across the task boundary. (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.) +/// To limit the number of requests that can be processed at the time, +/// we acquire a permit for each task before spawning it. Once acquired, +/// a new task is spawned; and once finished, the permit is dropped inside +/// of the task to allow others to spawn. Permits must be acquired via +/// [`Semaphore::acquire_owned`] to be movable across the task boundary. +/// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.) /// /// ```no_run /// use std::sync::Arc; From b364ca8b1356e91c8fd402558bc4a38d4f0988ac Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Wed, 6 Sep 2023 07:31:26 -0500 Subject: [PATCH 25/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 3b419c362b5..e34c34c2e00 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -102,6 +102,9 @@ use std::sync::Arc; /// /// loop { /// // Acquire permit before accepting the next socket. +/// // +/// // We use `acquire_owned` so that we can move `permit` into +/// // other tasks. /// let permit = semaphore.clone().acquire_owned().await.unwrap(); /// let (mut socket, _) = listener.accept().await?; /// From 9180840c89c29801adf77a2fa10cc31080b5275c Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Wed, 6 Sep 2023 07:31:36 -0500 Subject: [PATCH 26/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index e34c34c2e00..2fb79fb8840 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -96,7 +96,7 @@ use std::sync::Arc; /// use tokio::net::{TcpListener, TcpStream}; /// /// #[tokio::main] -/// async fn main() -> Result<(), Box> { +/// async fn main() -> std::io::Result<()> { /// let semaphore = Arc::new(Semaphore::new(3)); /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// From e18f86c312dbb9ffb5194118e46eb57d3653ee04 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin <62392572+alexanderkirilin@users.noreply.github.com> Date: Wed, 6 Sep 2023 07:31:44 -0500 Subject: [PATCH 27/28] Update tokio/src/sync/semaphore.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/semaphore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 2fb79fb8840..7db2c8f222c 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -93,7 +93,7 @@ use std::sync::Arc; /// ```no_run /// use std::sync::Arc; /// use tokio::sync::Semaphore; -/// use tokio::net::{TcpListener, TcpStream}; +/// use tokio::net::TcpListener; /// /// #[tokio::main] /// async fn main() -> std::io::Result<()> { From 115841222ce6907b781d45aa5c8b1b39eb94bb39 Mon Sep 17 00:00:00 2001 From: Alexander Kirilin Date: Wed, 6 Sep 2023 07:35:35 -0500 Subject: [PATCH 28/28] Update --- tokio/src/sync/semaphore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 7db2c8f222c..b1f9093bba4 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -118,7 +118,7 @@ use std::sync::Arc; /// }); /// } /// } -/// # async fn handle_connection(socket: &mut TcpStream) { +/// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) { /// # // Do work /// # } /// ```