Skip to content

Commit

Permalink
resolve most of the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
maminrayej committed Sep 8, 2023
1 parent 488b8f6 commit 199b52f
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions tokio/src/sync/semaphore.rs
Expand Up @@ -109,31 +109,34 @@ use std::sync::Arc;
/// performance or even errors.
///
/// The provided example uses a `TokenBucket`, implemented using a semaphore, that
/// limits operations to a specific rate. The token bucket will be refilled gradually.
/// When the rate is exceeded, the `acquire` method will await until a token is available.
/// limits operations to a specific rate although token buckets allow short bursts that are faster
/// than the allowed rate. The token bucket will be refilled after each interval. When the rate is exceeded,
/// the `acquire` method will await until a token is available. Note that this implementation is suboptimal
/// when the rate is large, because it consumes a lot of cpu constantly looping and sleeping.
/// ```
/// use std::sync::Arc;
/// use tokio::sync::{AcquireError, Semaphore, SemaphorePermit};
/// use tokio::time::{sleep, Duration};
/// use tokio::sync::{AcquireError, Semaphore};
/// use tokio::time::{interval, Duration};
///
/// struct TokenBucket {
/// sem: Arc<Semaphore>,
/// jh: tokio::task::JoinHandle<()>,
/// }
///
/// impl TokenBucket {
/// fn new(rate: usize) -> Self {
/// fn new(duration: Duration) -> Self {
/// let rate = (1.0 / duration.as_secs_f32()) as usize;
/// let sem = Arc::new(Semaphore::new(rate));
///
/// // refills the permits each 1/rate seconds.
/// // refills the tokens at the end of each interval
/// let jh = tokio::spawn({
/// let sem = sem.clone();
/// let mut interval = interval(duration);
/// interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
///
/// async move {
/// let time_slice = 1.0 / (rate as f32);
///
/// loop {
/// sleep(Duration::from_secs_f32(time_slice)).await;
/// interval.tick().await;
///
/// let cap = rate - sem.available_permits();
/// sem.add_permits(std::cmp::min(cap, 1));
Expand All @@ -144,8 +147,8 @@ use std::sync::Arc;
/// Self { jh, sem }
/// }
///
/// async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
/// self.sem.acquire().await
/// async fn acquire(&self) -> Result<(), AcquireError> {
/// self.sem.acquire().await.map(|p| p.forget())
/// }
///
/// async fn close(self) {
Expand All @@ -158,14 +161,12 @@ use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
/// let bucket = TokenBucket::new(1);
/// let ops = 1.0; // operation per second
/// let update_interval = 1.0 / ops;
/// let bucket = TokenBucket::new(Duration::from_secs_f32(update_interval));
///
/// for _ in 0..5 {
/// bucket
/// .acquire()
/// .await
/// .map(|permit| permit.forget())
/// .unwrap();
/// bucket.acquire().await.unwrap();
///
/// // do the operation
/// }
Expand Down

0 comments on commit 199b52f

Please sign in to comment.