Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorendorff committed Nov 14, 2022
1 parent d6ac8e2 commit 5d38959
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 51 deletions.
4 changes: 1 addition & 3 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,6 @@ impl Spawner {
R: Send + 'static,
{
let fut = BlockingTask::new(func);
#[cfg(feature = "test-util")]
let fut = crate::time::inhibit_auto_advance(fut);
let id = task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
Expand All @@ -381,7 +379,7 @@ impl Spawner {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;

let (task, handle) = task::unowned(fut, BlockingSchedule, id);
let (task, handle) = task::unowned(fut, BlockingSchedule::new(), id);

let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
(handle, spawned)
Expand Down
19 changes: 17 additions & 2 deletions tokio/src/runtime/blocking/schedule.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
use crate::runtime::task::{self, Task};
use crate::time::Clock;

/// `task::Schedule` implementation that does nothing (except some bookkeeping
/// in test-util builds). This is unique to the blocking scheduler as tasks
/// scheduled are not really futures but blocking operations.
///
/// We avoid storing the task by forgetting it in `bind` and re-materializing it
/// in `release`.
pub(crate) struct BlockingSchedule;
pub(crate) struct BlockingSchedule {
#[cfg(feature = "test-util")]
clock: Clock,
}

impl BlockingSchedule {
pub(crate) fn new() -> Self {
BlockingSchedule {
#[cfg(feature = "test-util")]
clock: crate::time::inhibit_auto_advance(),
}
}
}

impl task::Schedule for BlockingSchedule {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
#[cfg(feature = "test-util")]
crate::time::allow_auto_advance();
{
self.clock.allow_auto_advance();
}
None
}

Expand Down
50 changes: 6 additions & 44 deletions tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ cfg_not_test_util! {
}

cfg_test_util! {
use std::future::Future;

use crate::time::{Duration, Instant};
use crate::loom::sync::{Arc, Mutex};

Expand Down Expand Up @@ -134,47 +132,11 @@ cfg_test_util! {
inner.unfrozen = Some(std::time::Instant::now());
}

struct AutoAdvanceInhibit(Clock);

impl Drop for AutoAdvanceInhibit {
fn drop(&mut self) {
self.0.allow_auto_advance();
}
}

/// Temporarily stop auto-advancing the clock (see `tokio::time::pause`)
/// and decorate the given future with code to re-enable auto-advance if
/// it is dropped before producing a result.
///
/// If the future is polled and returns `Ready`, it becomes the caller's
/// responsibility to call `allow_auto_advance`. This quirk is to help
/// avoid a race between blocking task runners and a paused runtime.
pub(crate) fn inhibit_auto_advance<F>(fut: F) -> impl Future<Output = F::Output> + Send + 'static
where
F: Future + Send + 'static,
{
// Bump the inhibit count immediately, not inside the async block, to
// avoid a race condition when used by spawn_blocking.
let guard = clock().map(|clock| {
clock.inhibit_auto_advance();
AutoAdvanceInhibit(clock)
});
async move {
let result = fut.await;
// On success, do not resume auto-advance. It must be done on the
// main tokio thread (`BlockingSchedule::release`) to avoid a race.
std::mem::forget(guard);
result
}
}


/// Resume auto-advance. This should only be called to balance out a
/// previous call to `inhibit_auto_advance`.
pub(crate) fn allow_auto_advance() {
if let Some(clock) = clock() {
clock.allow_auto_advance();
}
/// Temporarily stop auto-advancing the clock (see `tokio::time::pause`).
pub(crate) fn inhibit_auto_advance() -> Clock {
let clock = clock().expect("can't inhibit auto-advance from outside the Tokio runtime");
clock.inhibit_auto_advance();
clock
}

/// Advances time.
Expand Down Expand Up @@ -266,7 +228,7 @@ cfg_test_util! {
inner.auto_advance_inhibit_count += 1;
}

fn allow_auto_advance(&self) {
pub(crate) fn allow_auto_advance(&self) {
let mut inner = self.inner.lock();
inner.auto_advance_inhibit_count -= 1;
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@
mod clock;
pub(crate) use self::clock::Clock;
#[cfg(feature = "test-util")]
pub use clock::{advance, pause, resume};
pub(crate) use clock::inhibit_auto_advance;
#[cfg(feature = "test-util")]
pub(crate) use clock::{allow_auto_advance, inhibit_auto_advance};
pub use clock::{advance, pause, resume};

pub mod error;

Expand Down

0 comments on commit 5d38959

Please sign in to comment.