Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add token bucket example to Semaphore #5978

Merged
merged 10 commits into from
Sep 23, 2023
73 changes: 73 additions & 0 deletions tokio/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,79 @@ use std::sync::Arc;
/// }
/// ```
///
/// Implement a simple token bucket for rate limiting
///
/// Many applications and systems have constraints on the rate at which certain
/// operations should occur. Exceeding this rate can result in suboptimal
/// performance or even errors.
///
/// The provided example uses a `TokenBucket`, implemented using a semaphore, that
/// limits operations to a specific rate although token buckets allow short bursts that are faster
/// than the allowed rate. The token bucket will be refilled after each interval. When the rate is exceeded,
/// the `acquire` method will await until a token is available. Note that this implementation is suboptimal
/// when the rate is large, because it consumes a lot of cpu constantly looping and sleeping.
maminrayej marked this conversation as resolved.
Show resolved Hide resolved
/// ```
/// use std::sync::Arc;
/// use tokio::sync::{AcquireError, Semaphore};
/// use tokio::time::{interval, Duration};
///
/// struct TokenBucket {
/// sem: Arc<Semaphore>,
/// jh: tokio::task::JoinHandle<()>,
/// }
///
/// impl TokenBucket {
/// fn new(duration: Duration) -> Self {
/// let rate = (1.0 / duration.as_secs_f32()) as usize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're only using this to compute the maximum capacity. Instead, I suggest making the capacity into a separate argument.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well the capacity is calculated based on the duration. If one specifies the rate, the duration is determined, and visa versa. Do you mean two new functions, one with duration and one with rate as an argument?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I mean one function with both parameters. Token buckets usually don't compute the capacity based on the duration.

/// let sem = Arc::new(Semaphore::new(rate));
///
/// // refills the tokens at the end of each interval
/// let jh = tokio::spawn({
/// let sem = sem.clone();
/// let mut interval = interval(duration);
/// interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
///
/// async move {
/// loop {
/// interval.tick().await;
///
/// let cap = rate - sem.available_permits();
/// sem.add_permits(cap.min(1));
maminrayej marked this conversation as resolved.
Show resolved Hide resolved
/// }
/// }
/// });
///
/// Self { jh, sem }
/// }
///
/// async fn acquire(&self) -> Result<(), AcquireError> {
/// self.sem.acquire().await.map(|p| p.forget())
/// }
///
/// async fn close(self) {
/// self.jh.abort();
/// let _ = self.jh.await;
///
/// self.sem.close();
maminrayej marked this conversation as resolved.
Show resolved Hide resolved
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let ops = 1.0; // operation per second
/// let update_interval = 1.0 / ops;
/// let bucket = TokenBucket::new(Duration::from_secs_f32(update_interval));
///
/// for _ in 0..5 {
/// bucket.acquire().await.unwrap();
///
/// // do the operation
/// }
///
/// bucket.close().await;
/// }
/// ```
///
/// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html
/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
#[derive(Debug)]
Expand Down