From 6d16ce1afcf84cae89b29ec0b8c4de591f22a8ec Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Fri, 21 Oct 2022 17:28:21 -0400 Subject: [PATCH 01/15] Don't auto-advance time when a spawn_blocking task is running. Time shouldn't auto-advance unless there's really nothing else going on. In general that is hard to fix, but this special case is relatively easy. Fixes: #4614 --- tokio/src/runtime/blocking/pool.rs | 4 +++ tokio/src/runtime/blocking/task.rs | 7 ++++- tokio/src/runtime/time/mod.rs | 2 +- tokio/src/time/clock.rs | 42 ++++++++++++++++++++++++++-- tokio/src/time/mod.rs | 2 ++ tokio/tests/task_blocking.rs | 44 +++++++++++++++++++++++++++++- 6 files changed, 96 insertions(+), 5 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 9c536141996..60ddf86434c 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -380,6 +380,10 @@ impl Spawner { let _ = name; let (task, handle) = task::unowned(fut, NoopSchedule, id); + + #[cfg(feature = "test-util")] + crate::time::inhibit_auto_advance(); + let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); (handle, spawned) } diff --git a/tokio/src/runtime/blocking/task.rs b/tokio/src/runtime/blocking/task.rs index c4461754005..5e9f70a0bb2 100644 --- a/tokio/src/runtime/blocking/task.rs +++ b/tokio/src/runtime/blocking/task.rs @@ -39,6 +39,11 @@ where // we want it to start without any budgeting. crate::runtime::coop::stop(); - Poll::Ready(func()) + let r = func(); + + #[cfg(feature = "test-util")] + crate::time::allow_auto_advance(); + + Poll::Ready(r) } } diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 240f8f16e6d..f81cab8cc35 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -222,7 +222,7 @@ impl Driver { let handle = rt_handle.time(); let clock = &handle.time_source.clock; - if clock.is_paused() { + if clock.can_auto_advance() { self.park.park_timeout(rt_handle, Duration::from_secs(0)); // If the time driver was woken, then the park completed diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 0343c4f4cf0..02fc8921376 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -65,6 +65,9 @@ cfg_test_util! { /// Instant at which the clock was last unfrozen. unfrozen: Option, + + /// Number of `inhibit_auto_advance` calls still in effect. + auto_advance_inhibit_count: usize, } /// Pauses time. @@ -129,6 +132,30 @@ cfg_test_util! { inner.unfrozen = Some(std::time::Instant::now()); } + /// Stop auto-advancing the clock (see `tokio::time::pause`) until + /// `allow_auto_advance` is called. + /// + /// # Panics + /// + /// Panics if called from outsie of a `current_thread` Tokio runtime. + #[track_caller] + pub(crate) fn inhibit_auto_advance() { + let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); + clock.inhibit_auto_advance(); + } + + /// Resume auto-advance. This should only be called to balance out a previous + /// call to `inhibit_auto_advance`. + /// + /// # Panics + /// + /// Panics if called from outsie of a `current_thread` Tokio runtime. + #[track_caller] + pub(crate) fn allow_auto_advance() { + let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); + clock.allow_auto_advance(); + } + /// Advances time. /// /// Increments the saved `Instant::now()` value by `duration`. Subsequent @@ -187,6 +214,7 @@ cfg_test_util! { enable_pausing, base: now, unfrozen: Some(now), + auto_advance_inhibit_count: 0, })), }; @@ -212,9 +240,19 @@ cfg_test_util! { inner.unfrozen = None; } - pub(crate) fn is_paused(&self) -> bool { + fn inhibit_auto_advance(&self) { + let mut inner = self.inner.lock(); + inner.auto_advance_inhibit_count += 1; + } + + fn allow_auto_advance(&self) { + let mut inner = self.inner.lock(); + inner.auto_advance_inhibit_count -= 1; + } + + pub(crate) fn can_auto_advance(&self) -> bool { let inner = self.inner.lock(); - inner.unfrozen.is_none() + inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0 } #[track_caller] diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index a1f27b839e9..e15b674943f 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -88,6 +88,8 @@ mod clock; pub(crate) use self::clock::Clock; #[cfg(feature = "test-util")] pub use clock::{advance, pause, resume}; +#[cfg(feature = "test-util")] +pub(crate) use clock::{allow_auto_advance, inhibit_auto_advance}; pub mod error; diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index e5879332d0e..a5c3b3a7701 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads -use tokio::{runtime, task}; +use tokio::{runtime, task, time}; use tokio_test::assert_ok; use std::thread; @@ -226,3 +226,45 @@ fn coop_disabled_in_block_in_place_in_block_on() { done_rx.recv().unwrap().unwrap(); } + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn blocking_when_paused() { + // Do not auto-advance time when we have started a blocking task that has + // not yet finished. + time::timeout( + Duration::from_secs(3), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(250))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); + + // Really: Do not auto-advance time, even if the timeout is short and the + // blocking task runs for longer than that. It doesn't matter: Tokio time + // is paused; system time is not. + time::timeout( + Duration::from_millis(1), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(250))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); +} + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + time::timeout( + Duration::from_secs(15), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(250))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} From 282236f3306b2e949c7657abbb929bfb8c8db28c Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Wed, 26 Oct 2022 10:19:04 -0500 Subject: [PATCH 02/15] spawn_blocking: Re-enable auto-advance if the task panics. This uses a destructor, so it will also work if tokio machinery panics while trying to e.g. spawn a thread. --- tokio/src/runtime/blocking/pool.rs | 6 ++--- tokio/src/runtime/blocking/task.rs | 7 +---- tokio/src/time/clock.rs | 42 ++++++++++++++++-------------- tokio/src/time/mod.rs | 4 +-- tokio/tests/task_blocking.rs | 20 ++++++++++++++ 5 files changed, 49 insertions(+), 30 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 60ddf86434c..9308fae20b9 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -379,10 +379,10 @@ impl Spawner { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, NoopSchedule, id); - #[cfg(feature = "test-util")] - crate::time::inhibit_auto_advance(); + let fut = crate::time::inhibit_auto_advance(fut); + + let (task, handle) = task::unowned(fut, NoopSchedule, id); let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); (handle, spawned) diff --git a/tokio/src/runtime/blocking/task.rs b/tokio/src/runtime/blocking/task.rs index 5e9f70a0bb2..c4461754005 100644 --- a/tokio/src/runtime/blocking/task.rs +++ b/tokio/src/runtime/blocking/task.rs @@ -39,11 +39,6 @@ where // we want it to start without any budgeting. crate::runtime::coop::stop(); - let r = func(); - - #[cfg(feature = "test-util")] - crate::time::allow_auto_advance(); - - Poll::Ready(r) + Poll::Ready(func()) } } diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 02fc8921376..0b10885141b 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -28,6 +28,8 @@ cfg_not_test_util! { } cfg_test_util! { + use std::future::Future; + use crate::time::{Duration, Instant}; use crate::loom::sync::{Arc, Mutex}; @@ -132,28 +134,30 @@ cfg_test_util! { inner.unfrozen = Some(std::time::Instant::now()); } - /// Stop auto-advancing the clock (see `tokio::time::pause`) until - /// `allow_auto_advance` is called. - /// - /// # Panics - /// - /// Panics if called from outsie of a `current_thread` Tokio runtime. - #[track_caller] - pub(crate) fn inhibit_auto_advance() { - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - clock.inhibit_auto_advance(); + struct AutoAdvanceInhibit(Clock); + + impl Drop for AutoAdvanceInhibit { + fn drop(&mut self) { + self.0.allow_auto_advance(); + } } - /// Resume auto-advance. This should only be called to balance out a previous - /// call to `inhibit_auto_advance`. - /// - /// # Panics + /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`) + /// and decorate the given future with code to re-enable auto-advance when + /// it returns `Ready` or is dropped. /// - /// Panics if called from outsie of a `current_thread` Tokio runtime. - #[track_caller] - pub(crate) fn allow_auto_advance() { - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - clock.allow_auto_advance(); + /// This is a no-op when called from outside the Tokio runtime. + pub(crate) fn inhibit_auto_advance(fut: F) -> impl Future { + // Bump the inhibit count immediately, not inside the async block, to + // avoid a race condition when used by spawn_blocking. + let guard = clock().map(|clock| { + clock.inhibit_auto_advance(); + AutoAdvanceInhibit(clock) + }); + async move { + let _guard = guard; + fut.await + } } /// Advances time. diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index e15b674943f..42715de90f6 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -87,9 +87,9 @@ mod clock; pub(crate) use self::clock::Clock; #[cfg(feature = "test-util")] -pub use clock::{advance, pause, resume}; +pub(crate) use clock::inhibit_auto_advance; #[cfg(feature = "test-util")] -pub(crate) use clock::{allow_auto_advance, inhibit_auto_advance}; +pub use clock::{advance, pause, resume}; pub mod error; diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index a5c3b3a7701..5859368b4cd 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -268,3 +268,23 @@ async fn blocking_task_wakes_paused_runtime() { "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" ); } + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn panicking_blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + let result = time::timeout( + Duration::from_secs(15), + task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(250)); + panic!("blocking task panicked"); + }), + ) + .await + .expect("timeout should not trigger"); + assert!(result.is_err(), "blocking task should have panicked"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} From 53053f96013d1202369df82a37598796fe5fc158 Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Wed, 26 Oct 2022 15:09:11 -0500 Subject: [PATCH 03/15] Try to fix types to work in all feature combinations. --- tokio/src/runtime/blocking/pool.rs | 5 ++--- tokio/src/time/clock.rs | 5 ++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 9308fae20b9..34e00ddefc8 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -359,6 +359,8 @@ impl Spawner { R: Send + 'static, { let fut = BlockingTask::new(func); + #[cfg(feature = "test-util")] + let fut = crate::time::inhibit_auto_advance(fut); let id = task::Id::next(); #[cfg(all(tokio_unstable, feature = "tracing"))] let fut = { @@ -379,9 +381,6 @@ impl Spawner { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - #[cfg(feature = "test-util")] - let fut = crate::time::inhibit_auto_advance(fut); - let (task, handle) = task::unowned(fut, NoopSchedule, id); let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 0b10885141b..0ac5b2eb73d 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -147,7 +147,10 @@ cfg_test_util! { /// it returns `Ready` or is dropped. /// /// This is a no-op when called from outside the Tokio runtime. - pub(crate) fn inhibit_auto_advance(fut: F) -> impl Future { + pub(crate) fn inhibit_auto_advance(fut: F) -> impl Future + Send + 'static + where + F: Future + Send + 'static, + { // Bump the inhibit count immediately, not inside the async block, to // avoid a race condition when used by spawn_blocking. let guard = clock().map(|clock| { From 0474462fa1361ab810732d420177e876935fed20 Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Mon, 7 Nov 2022 16:51:47 -0600 Subject: [PATCH 04/15] Add a loom test that seems to pass. --- tokio/src/runtime/tests/loom_blocking.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs index 89de85e4362..3ecb967a4fa 100644 --- a/tokio/src/runtime/tests/loom_blocking.rs +++ b/tokio/src/runtime/tests/loom_blocking.rs @@ -73,6 +73,26 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread }); } +#[test] +fn spawn_blocking_when_paused() { + use std::time::Duration; + loom::model(|| { + let rt = crate::runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + let handle = rt.handle(); + let _enter = handle.enter(); + rt.block_on(crate::time::timeout( + Duration::from_millis(1), + crate::task::spawn_blocking(|| {}), + )) + .expect("timeout should not trigger") + .expect("blocking task should finish"); + }); +} + fn mk_runtime(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) From 58875586bdec8c4184a00750fcb9e4e7eb2d3a5a Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Mon, 7 Nov 2022 16:58:39 -0600 Subject: [PATCH 05/15] Modify the test so that it fails under loom. --- tokio/src/runtime/tests/loom_blocking.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs index 3ecb967a4fa..b98ee6ee959 100644 --- a/tokio/src/runtime/tests/loom_blocking.rs +++ b/tokio/src/runtime/tests/loom_blocking.rs @@ -84,12 +84,16 @@ fn spawn_blocking_when_paused() { .unwrap(); let handle = rt.handle(); let _enter = handle.enter(); + let a = crate::task::spawn_blocking(|| {}); + let b = crate::task::spawn_blocking(|| {}); rt.block_on(crate::time::timeout( Duration::from_millis(1), - crate::task::spawn_blocking(|| {}), + async move { + a.await.expect("blocking task should finish"); + b.await.expect("blocking task should finish"); + } )) - .expect("timeout should not trigger") - .expect("blocking task should finish"); + .expect("timeout should not trigger"); }); } From d62c616ed1d23e85168fd8bf16d7319170ae3803 Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Tue, 8 Nov 2022 10:19:32 -0600 Subject: [PATCH 06/15] Rename NoopSchedule to BlockingSchedule. --- tokio/src/runtime/blocking/mod.rs | 2 +- tokio/src/runtime/blocking/pool.rs | 8 ++++---- tokio/src/runtime/blocking/schedule.rs | 6 +++--- tokio/src/runtime/tests/loom_queue.rs | 4 ++-- tokio/src/runtime/tests/mod.rs | 10 +++++----- tokio/src/runtime/tests/task.rs | 16 ++++++++-------- 6 files changed, 23 insertions(+), 23 deletions(-) diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 88bdcfd6421..5b24fbfa463 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -18,7 +18,7 @@ mod schedule; mod shutdown; mod task; #[cfg(all(test, not(tokio_wasm)))] -pub(crate) use schedule::NoopSchedule; +pub(crate) use schedule::BlockingSchedule; pub(crate) use task::BlockingTask; use crate::runtime::Builder; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 34e00ddefc8..1713f7308bb 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -2,7 +2,7 @@ use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; -use crate::runtime::blocking::schedule::NoopSchedule; +use crate::runtime::blocking::schedule::BlockingSchedule; use crate::runtime::blocking::{shutdown, BlockingTask}; use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; @@ -120,7 +120,7 @@ struct Shared { } pub(crate) struct Task { - task: task::UnownedTask, + task: task::UnownedTask, mandatory: Mandatory, } @@ -151,7 +151,7 @@ impl From for io::Error { } impl Task { - pub(crate) fn new(task: task::UnownedTask, mandatory: Mandatory) -> Task { + pub(crate) fn new(task: task::UnownedTask, mandatory: Mandatory) -> Task { Task { task, mandatory } } @@ -381,7 +381,7 @@ impl Spawner { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, NoopSchedule, id); + let (task, handle) = task::unowned(fut, BlockingSchedule, id); let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); (handle, spawned) diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index 54252241d94..28626e3dfb8 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -5,10 +5,10 @@ use crate::runtime::task::{self, Task}; /// operations. /// /// We avoid storing the task by forgetting it in `bind` and re-materializing it -/// in `release. -pub(crate) struct NoopSchedule; +/// in `release`. +pub(crate) struct BlockingSchedule; -impl task::Schedule for NoopSchedule { +impl task::Schedule for BlockingSchedule { fn release(&self, _task: &Task) -> Option> { None } diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index 8d4e1d384e2..4f1e938906f 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,4 +1,4 @@ -use crate::runtime::blocking::NoopSchedule; +use crate::runtime::blocking::BlockingSchedule; use crate::runtime::scheduler::multi_thread::queue; use crate::runtime::task::Inject; use crate::runtime::MetricsBatch; @@ -117,7 +117,7 @@ fn steal_overflow() { fn multi_stealer() { const NUM_TASKS: usize = 5; - fn steal_tasks(steal: queue::Steal) -> usize { + fn steal_tasks(steal: queue::Steal) -> usize { let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index b4b8cb45844..2659836d9e0 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -5,11 +5,11 @@ use self::unowned_wrapper::unowned; mod unowned_wrapper { - use crate::runtime::blocking::NoopSchedule; + use crate::runtime::blocking::BlockingSchedule; use crate::runtime::task::{Id, JoinHandle, Notified}; #[cfg(all(tokio_unstable, feature = "tracing"))] - pub(crate) fn unowned(task: T) -> (Notified, JoinHandle) + pub(crate) fn unowned(task: T) -> (Notified, JoinHandle) where T: std::future::Future + Send + 'static, T::Output: Send + 'static, @@ -17,17 +17,17 @@ mod unowned_wrapper { use tracing::Instrument; let span = tracing::trace_span!("test_span"); let task = task.instrument(span); - let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next()); + let (task, handle) = crate::runtime::task::unowned(task, BlockingSchedule, Id::next()); (task.into_notified(), handle) } #[cfg(not(all(tokio_unstable, feature = "tracing")))] - pub(crate) fn unowned(task: T) -> (Notified, JoinHandle) + pub(crate) fn unowned(task: T) -> (Notified, JoinHandle) where T: std::future::Future + Send + 'static, T::Output: Send + 'static, { - let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next()); + let (task, handle) = crate::runtime::task::unowned(task, BlockingSchedule, Id::next()); (task.into_notified(), handle) } } diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs index 173e5b0b23f..199a6971e5f 100644 --- a/tokio/src/runtime/tests/task.rs +++ b/tokio/src/runtime/tests/task.rs @@ -1,4 +1,4 @@ -use crate::runtime::blocking::NoopSchedule; +use crate::runtime::blocking::BlockingSchedule; use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; use crate::util::TryLock; @@ -54,7 +54,7 @@ fn create_drop1() { drop(ad); unreachable!() }, - NoopSchedule, + BlockingSchedule, Id::next(), ); drop(notified); @@ -71,7 +71,7 @@ fn create_drop2() { drop(ad); unreachable!() }, - NoopSchedule, + BlockingSchedule, Id::next(), ); drop(join); @@ -88,7 +88,7 @@ fn drop_abort_handle1() { drop(ad); unreachable!() }, - NoopSchedule, + BlockingSchedule, Id::next(), ); let abort = join.abort_handle(); @@ -108,7 +108,7 @@ fn drop_abort_handle2() { drop(ad); unreachable!() }, - NoopSchedule, + BlockingSchedule, Id::next(), ); let abort = join.abort_handle(); @@ -129,7 +129,7 @@ fn create_shutdown1() { drop(ad); unreachable!() }, - NoopSchedule, + BlockingSchedule, Id::next(), ); drop(join); @@ -146,7 +146,7 @@ fn create_shutdown2() { drop(ad); unreachable!() }, - NoopSchedule, + BlockingSchedule, Id::next(), ); handle.assert_not_dropped(); @@ -157,7 +157,7 @@ fn create_shutdown2() { #[test] fn unowned_poll() { - let (task, _) = unowned(async {}, NoopSchedule, Id::next()); + let (task, _) = unowned(async {}, BlockingSchedule, Id::next()); task.run(); } From 0e6f71b4e36d74121bf2f5a4c4dd75ac65f931b7 Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Tue, 8 Nov 2022 10:27:40 -0600 Subject: [PATCH 07/15] Attempt to fix race condition with blocking tasks and paused time. The auto-advance inhibit count must be decremented after the blocking task is finished, and crucially after waking the JoinHandle, to prevent the tokio thread from auto-advancing before tasks awaiting that JoinHandle are rescheduled. --- tokio/src/runtime/blocking/schedule.rs | 8 +++++--- tokio/src/time/clock.rs | 24 +++++++++++++++++++----- tokio/src/time/mod.rs | 4 ++-- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index 28626e3dfb8..251dfc41e4e 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -1,8 +1,8 @@ use crate::runtime::task::{self, Task}; -/// `task::Schedule` implementation that does nothing. This is unique to the -/// blocking scheduler as tasks scheduled are not really futures but blocking -/// operations. +/// `task::Schedule` implementation that does nothing (except some bookkeeping +/// in test-util builds). This is unique to the blocking scheduler as tasks +/// scheduled are not really futures but blocking operations. /// /// We avoid storing the task by forgetting it in `bind` and re-materializing it /// in `release`. @@ -10,6 +10,8 @@ pub(crate) struct BlockingSchedule; impl task::Schedule for BlockingSchedule { fn release(&self, _task: &Task) -> Option> { + #[cfg(feature = "test-util")] + crate::time::allow_auto_advance(); None } diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 0ac5b2eb73d..a6c1a7ae14f 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -143,10 +143,12 @@ cfg_test_util! { } /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`) - /// and decorate the given future with code to re-enable auto-advance when - /// it returns `Ready` or is dropped. + /// and decorate the given future with code to re-enable auto-advance if + /// it is dropped before producing a result. /// - /// This is a no-op when called from outside the Tokio runtime. + /// If the future is polled and returns `Ready`, it becomes the caller's + /// responsibility to call `allow_auto_advance`. This quirk is to help + /// avoid a race between blocking task runners and a paused runtime. pub(crate) fn inhibit_auto_advance(fut: F) -> impl Future + Send + 'static where F: Future + Send + 'static, @@ -158,8 +160,20 @@ cfg_test_util! { AutoAdvanceInhibit(clock) }); async move { - let _guard = guard; - fut.await + let result = fut.await; + // On success, do not resume auto-advance. It must be done on the + // main tokio thread (`BlockingSchedule::release`) to avoid a race. + std::mem::forget(guard); + result + } + } + + + /// Resume auto-advance. This should only be called to balance out a + /// previous call to `inhibit_auto_advance`. + pub(crate) fn allow_auto_advance() { + if let Some(clock) = clock() { + clock.allow_auto_advance(); } } diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index 42715de90f6..e15b674943f 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -87,9 +87,9 @@ mod clock; pub(crate) use self::clock::Clock; #[cfg(feature = "test-util")] -pub(crate) use clock::inhibit_auto_advance; -#[cfg(feature = "test-util")] pub use clock::{advance, pause, resume}; +#[cfg(feature = "test-util")] +pub(crate) use clock::{allow_auto_advance, inhibit_auto_advance}; pub mod error; From b8aee576bfd7f86754cccb66b80c99e8d3a68ecf Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Thu, 10 Nov 2022 15:23:30 -0600 Subject: [PATCH 08/15] Move NoopSchedule into the test directory. --- tokio/src/runtime/blocking/mod.rs | 2 -- tokio/src/runtime/tests/loom_queue.rs | 4 ++-- tokio/src/runtime/tests/mod.rs | 28 ++++++++++++++++++++++----- tokio/src/runtime/tests/task.rs | 16 +++++++-------- 4 files changed, 33 insertions(+), 17 deletions(-) diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 5b24fbfa463..c42924be77d 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -17,8 +17,6 @@ cfg_trace! { mod schedule; mod shutdown; mod task; -#[cfg(all(test, not(tokio_wasm)))] -pub(crate) use schedule::BlockingSchedule; pub(crate) use task::BlockingTask; use crate::runtime::Builder; diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index 4f1e938906f..fc93bf3e4a2 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,6 +1,6 @@ -use crate::runtime::blocking::BlockingSchedule; use crate::runtime::scheduler::multi_thread::queue; use crate::runtime::task::Inject; +use crate::runtime::tests::NoopSchedule; use crate::runtime::MetricsBatch; use loom::thread; @@ -117,7 +117,7 @@ fn steal_overflow() { fn multi_stealer() { const NUM_TASKS: usize = 5; - fn steal_tasks(steal: queue::Steal) -> usize { + fn steal_tasks(steal: queue::Steal) -> usize { let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 2659836d9e0..2f63308e70c 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -2,14 +2,32 @@ // other code when running loom tests. #![cfg_attr(loom, warn(dead_code, unreachable_pub))] +use self::noop_scheduler::NoopSchedule; use self::unowned_wrapper::unowned; +mod noop_scheduler { + use crate::runtime::task::{self, Task}; + + /// `task::Schedule` implementation that does nothing, for testing. + pub(crate) struct NoopSchedule; + + impl task::Schedule for NoopSchedule { + fn release(&self, _task: &Task) -> Option> { + None + } + + fn schedule(&self, _task: task::Notified) { + unreachable!(); + } + } +} + mod unowned_wrapper { - use crate::runtime::blocking::BlockingSchedule; use crate::runtime::task::{Id, JoinHandle, Notified}; + use crate::runtime::tests::NoopSchedule; #[cfg(all(tokio_unstable, feature = "tracing"))] - pub(crate) fn unowned(task: T) -> (Notified, JoinHandle) + pub(crate) fn unowned(task: T) -> (Notified, JoinHandle) where T: std::future::Future + Send + 'static, T::Output: Send + 'static, @@ -17,17 +35,17 @@ mod unowned_wrapper { use tracing::Instrument; let span = tracing::trace_span!("test_span"); let task = task.instrument(span); - let (task, handle) = crate::runtime::task::unowned(task, BlockingSchedule, Id::next()); + let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next()); (task.into_notified(), handle) } #[cfg(not(all(tokio_unstable, feature = "tracing")))] - pub(crate) fn unowned(task: T) -> (Notified, JoinHandle) + pub(crate) fn unowned(task: T) -> (Notified, JoinHandle) where T: std::future::Future + Send + 'static, T::Output: Send + 'static, { - let (task, handle) = crate::runtime::task::unowned(task, BlockingSchedule, Id::next()); + let (task, handle) = crate::runtime::task::unowned(task, NoopSchedule, Id::next()); (task.into_notified(), handle) } } diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs index 199a6971e5f..3aca07c6aff 100644 --- a/tokio/src/runtime/tests/task.rs +++ b/tokio/src/runtime/tests/task.rs @@ -1,4 +1,4 @@ -use crate::runtime::blocking::BlockingSchedule; +use crate::runtime::tests::NoopSchedule; use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; use crate::util::TryLock; @@ -54,7 +54,7 @@ fn create_drop1() { drop(ad); unreachable!() }, - BlockingSchedule, + NoopSchedule, Id::next(), ); drop(notified); @@ -71,7 +71,7 @@ fn create_drop2() { drop(ad); unreachable!() }, - BlockingSchedule, + NoopSchedule, Id::next(), ); drop(join); @@ -88,7 +88,7 @@ fn drop_abort_handle1() { drop(ad); unreachable!() }, - BlockingSchedule, + NoopSchedule, Id::next(), ); let abort = join.abort_handle(); @@ -108,7 +108,7 @@ fn drop_abort_handle2() { drop(ad); unreachable!() }, - BlockingSchedule, + NoopSchedule, Id::next(), ); let abort = join.abort_handle(); @@ -129,7 +129,7 @@ fn create_shutdown1() { drop(ad); unreachable!() }, - BlockingSchedule, + NoopSchedule, Id::next(), ); drop(join); @@ -146,7 +146,7 @@ fn create_shutdown2() { drop(ad); unreachable!() }, - BlockingSchedule, + NoopSchedule, Id::next(), ); handle.assert_not_dropped(); @@ -157,7 +157,7 @@ fn create_shutdown2() { #[test] fn unowned_poll() { - let (task, _) = unowned(async {}, BlockingSchedule, Id::next()); + let (task, _) = unowned(async {}, NoopSchedule, Id::next()); task.run(); } From 895bd527756fe147985f97da8e7ad79f6c9d698f Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Thu, 10 Nov 2022 16:13:14 -0600 Subject: [PATCH 09/15] Address review comments. --- tokio/src/runtime/blocking/pool.rs | 4 +-- tokio/src/runtime/blocking/schedule.rs | 19 ++++++++-- tokio/src/time/clock.rs | 50 ++++---------------------- tokio/src/time/mod.rs | 4 +-- 4 files changed, 26 insertions(+), 51 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 1713f7308bb..3c2c15e830d 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -359,8 +359,6 @@ impl Spawner { R: Send + 'static, { let fut = BlockingTask::new(func); - #[cfg(feature = "test-util")] - let fut = crate::time::inhibit_auto_advance(fut); let id = task::Id::next(); #[cfg(all(tokio_unstable, feature = "tracing"))] let fut = { @@ -381,7 +379,7 @@ impl Spawner { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, BlockingSchedule, id); + let (task, handle) = task::unowned(fut, BlockingSchedule::new(), id); let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); (handle, spawned) diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index 251dfc41e4e..dae09791ee4 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -1,4 +1,5 @@ use crate::runtime::task::{self, Task}; +use crate::time::Clock; /// `task::Schedule` implementation that does nothing (except some bookkeeping /// in test-util builds). This is unique to the blocking scheduler as tasks @@ -6,12 +7,26 @@ use crate::runtime::task::{self, Task}; /// /// We avoid storing the task by forgetting it in `bind` and re-materializing it /// in `release`. -pub(crate) struct BlockingSchedule; +pub(crate) struct BlockingSchedule { + #[cfg(feature = "test-util")] + clock: Clock, +} + +impl BlockingSchedule { + pub(crate) fn new() -> Self { + BlockingSchedule { + #[cfg(feature = "test-util")] + clock: crate::time::inhibit_auto_advance(), + } + } +} impl task::Schedule for BlockingSchedule { fn release(&self, _task: &Task) -> Option> { #[cfg(feature = "test-util")] - crate::time::allow_auto_advance(); + { + self.clock.allow_auto_advance(); + } None } diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index a6c1a7ae14f..9e55c82e89e 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -28,8 +28,6 @@ cfg_not_test_util! { } cfg_test_util! { - use std::future::Future; - use crate::time::{Duration, Instant}; use crate::loom::sync::{Arc, Mutex}; @@ -134,47 +132,11 @@ cfg_test_util! { inner.unfrozen = Some(std::time::Instant::now()); } - struct AutoAdvanceInhibit(Clock); - - impl Drop for AutoAdvanceInhibit { - fn drop(&mut self) { - self.0.allow_auto_advance(); - } - } - - /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`) - /// and decorate the given future with code to re-enable auto-advance if - /// it is dropped before producing a result. - /// - /// If the future is polled and returns `Ready`, it becomes the caller's - /// responsibility to call `allow_auto_advance`. This quirk is to help - /// avoid a race between blocking task runners and a paused runtime. - pub(crate) fn inhibit_auto_advance(fut: F) -> impl Future + Send + 'static - where - F: Future + Send + 'static, - { - // Bump the inhibit count immediately, not inside the async block, to - // avoid a race condition when used by spawn_blocking. - let guard = clock().map(|clock| { - clock.inhibit_auto_advance(); - AutoAdvanceInhibit(clock) - }); - async move { - let result = fut.await; - // On success, do not resume auto-advance. It must be done on the - // main tokio thread (`BlockingSchedule::release`) to avoid a race. - std::mem::forget(guard); - result - } - } - - - /// Resume auto-advance. This should only be called to balance out a - /// previous call to `inhibit_auto_advance`. - pub(crate) fn allow_auto_advance() { - if let Some(clock) = clock() { - clock.allow_auto_advance(); - } + /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`). + pub(crate) fn inhibit_auto_advance() -> Clock { + let clock = clock().expect("can't inhibit auto-advance from outside the Tokio runtime"); + clock.inhibit_auto_advance(); + clock } /// Advances time. @@ -266,7 +228,7 @@ cfg_test_util! { inner.auto_advance_inhibit_count += 1; } - fn allow_auto_advance(&self) { + pub(crate) fn allow_auto_advance(&self) { let mut inner = self.inner.lock(); inner.auto_advance_inhibit_count -= 1; } diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index e15b674943f..42715de90f6 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -87,9 +87,9 @@ mod clock; pub(crate) use self::clock::Clock; #[cfg(feature = "test-util")] -pub use clock::{advance, pause, resume}; +pub(crate) use clock::inhibit_auto_advance; #[cfg(feature = "test-util")] -pub(crate) use clock::{allow_auto_advance, inhibit_auto_advance}; +pub use clock::{advance, pause, resume}; pub mod error; From 4ec397e8ea1782877d49e28e62578e23e246bf5b Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Mon, 14 Nov 2022 14:10:54 -0600 Subject: [PATCH 10/15] Fix race when main thread is parked as auto-advance is re-enabled. Specifically, when a blocking task is spawned, but no thread immediately awaits the JoinHandle, if the main thread parks itself, we previously failed to unpark it when the blocking task completed. Auto-advance would be re-enabled but could not happen, a potential deadlock. --- tokio/src/runtime/blocking/schedule.rs | 18 ++++++++++++++---- tokio/src/time/clock.rs | 10 ++-------- tokio/src/time/mod.rs | 2 -- tokio/tests/task_blocking.rs | 19 +++++++++++++++++++ 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index dae09791ee4..cece938fb0c 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -1,5 +1,6 @@ use crate::runtime::task::{self, Task}; -use crate::time::Clock; +#[cfg(feature = "test-util")] +use crate::runtime::{scheduler, Handle}; /// `task::Schedule` implementation that does nothing (except some bookkeeping /// in test-util builds). This is unique to the blocking scheduler as tasks @@ -9,14 +10,20 @@ use crate::time::Clock; /// in `release`. pub(crate) struct BlockingSchedule { #[cfg(feature = "test-util")] - clock: Clock, + handle: Handle, } impl BlockingSchedule { pub(crate) fn new() -> Self { BlockingSchedule { #[cfg(feature = "test-util")] - clock: crate::time::inhibit_auto_advance(), + handle: { + let handle = Handle::current(); + if let scheduler::Handle::CurrentThread(handle) = &handle.inner { + handle.driver.clock.inhibit_auto_advance(); + } + handle + }, } } } @@ -25,7 +32,10 @@ impl task::Schedule for BlockingSchedule { fn release(&self, _task: &Task) -> Option> { #[cfg(feature = "test-util")] { - self.clock.allow_auto_advance(); + if let scheduler::Handle::CurrentThread(handle) = &self.handle.inner { + handle.driver.clock.allow_auto_advance(); + handle.driver.unpark(); + } } None } diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 9e55c82e89e..cd11a67527f 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -132,13 +132,6 @@ cfg_test_util! { inner.unfrozen = Some(std::time::Instant::now()); } - /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`). - pub(crate) fn inhibit_auto_advance() -> Clock { - let clock = clock().expect("can't inhibit auto-advance from outside the Tokio runtime"); - clock.inhibit_auto_advance(); - clock - } - /// Advances time. /// /// Increments the saved `Instant::now()` value by `duration`. Subsequent @@ -223,7 +216,8 @@ cfg_test_util! { inner.unfrozen = None; } - fn inhibit_auto_advance(&self) { + /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`). + pub(crate) fn inhibit_auto_advance(&self) { let mut inner = self.inner.lock(); inner.auto_advance_inhibit_count += 1; } diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index 42715de90f6..a1f27b839e9 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -87,8 +87,6 @@ mod clock; pub(crate) use self::clock::Clock; #[cfg(feature = "test-util")] -pub(crate) use clock::inhibit_auto_advance; -#[cfg(feature = "test-util")] pub use clock::{advance, pause, resume}; pub mod error; diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index 5859368b4cd..b50bf393afc 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -269,6 +269,25 @@ async fn blocking_task_wakes_paused_runtime() { ); } +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn unawaited_blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + + // When this task finishes, time should auto-advance, even though the + // JoinHandle has not been awaited yet. + let a = task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(20)); + }); + + crate::time::sleep(Duration::from_secs(15)).await; + a.await.expect("blocking task should finish"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} + #[cfg(feature = "test-util")] #[tokio::test(start_paused = true)] async fn panicking_blocking_task_wakes_paused_runtime() { From 9c1bee9ccefbe7ddc47416d9477468739ab2334b Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Tue, 15 Nov 2022 19:12:55 -0600 Subject: [PATCH 11/15] Fix compile errors in several feature combinations. --- tokio/src/runtime/blocking/schedule.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index cece938fb0c..d6b27a8a256 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -19,8 +19,12 @@ impl BlockingSchedule { #[cfg(feature = "test-util")] handle: { let handle = Handle::current(); - if let scheduler::Handle::CurrentThread(handle) = &handle.inner { - handle.driver.clock.inhibit_auto_advance(); + match &handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.inhibit_auto_advance(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} } handle }, @@ -32,9 +36,13 @@ impl task::Schedule for BlockingSchedule { fn release(&self, _task: &Task) -> Option> { #[cfg(feature = "test-util")] { - if let scheduler::Handle::CurrentThread(handle) = &self.handle.inner { - handle.driver.clock.allow_auto_advance(); - handle.driver.unpark(); + match &self.handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.allow_auto_advance(); + handle.driver.unpark(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} } } None From f84b6b4ab027410726a39231f05ce4038252bb64 Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Tue, 15 Nov 2022 19:16:06 -0600 Subject: [PATCH 12/15] fmt --- tokio/src/runtime/tests/loom_blocking.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs index b98ee6ee959..5c4aeae39c5 100644 --- a/tokio/src/runtime/tests/loom_blocking.rs +++ b/tokio/src/runtime/tests/loom_blocking.rs @@ -86,13 +86,10 @@ fn spawn_blocking_when_paused() { let _enter = handle.enter(); let a = crate::task::spawn_blocking(|| {}); let b = crate::task::spawn_blocking(|| {}); - rt.block_on(crate::time::timeout( - Duration::from_millis(1), - async move { - a.await.expect("blocking task should finish"); - b.await.expect("blocking task should finish"); - } - )) + rt.block_on(crate::time::timeout(Duration::from_millis(1), async move { + a.await.expect("blocking task should finish"); + b.await.expect("blocking task should finish"); + })) .expect("timeout should not trigger"); }); } From 9ebd9eebc1c97f1834a02be6f32b47c0f144b2f0 Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Tue, 15 Nov 2022 21:50:17 -0600 Subject: [PATCH 13/15] BlockingSchedule::new: Avoid calling Handle::current(). It does not always work; and is it happens, the caller has a handle already and can just pass it in. --- tokio/src/runtime/blocking/pool.rs | 2 +- tokio/src/runtime/blocking/schedule.rs | 30 ++++++++++++++------------ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 3c2c15e830d..e9f6b66e0fc 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -379,7 +379,7 @@ impl Spawner { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, BlockingSchedule::new(), id); + let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); (handle, spawned) diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index d6b27a8a256..edf775be8be 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -1,6 +1,7 @@ -use crate::runtime::task::{self, Task}; #[cfg(feature = "test-util")] -use crate::runtime::{scheduler, Handle}; +use crate::runtime::scheduler; +use crate::runtime::task::{self, Task}; +use crate::runtime::Handle; /// `task::Schedule` implementation that does nothing (except some bookkeeping /// in test-util builds). This is unique to the blocking scheduler as tasks @@ -14,20 +15,21 @@ pub(crate) struct BlockingSchedule { } impl BlockingSchedule { - pub(crate) fn new() -> Self { + #[cfg_attr(not(feature = "test-util"), allow(unused_variables))] + pub(crate) fn new(handle: &Handle) -> Self { + #[cfg(feature = "test-util")] + { + match &handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.inhibit_auto_advance(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} + } + } BlockingSchedule { #[cfg(feature = "test-util")] - handle: { - let handle = Handle::current(); - match &handle.inner { - scheduler::Handle::CurrentThread(handle) => { - handle.driver.clock.inhibit_auto_advance(); - } - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - scheduler::Handle::MultiThread(_) => {} - } - handle - }, + handle: handle.clone(), } } } From 939841f4dd297a10116a241ec4b1ecf8c272726e Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Wed, 16 Nov 2022 05:54:22 -0600 Subject: [PATCH 14/15] fmt --- tokio/src/runtime/tests/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs index 3aca07c6aff..a79c0f50d15 100644 --- a/tokio/src/runtime/tests/task.rs +++ b/tokio/src/runtime/tests/task.rs @@ -1,5 +1,5 @@ -use crate::runtime::tests::NoopSchedule; use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::tests::NoopSchedule; use crate::util::TryLock; use std::collections::VecDeque; From 1104a8bea887d0c07d37e0da7c9f72c1f14a3532 Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Wed, 14 Dec 2022 14:34:03 -0600 Subject: [PATCH 15/15] Reduce sleeps, per review comment. --- tokio/tests/task_blocking.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index b50bf393afc..2999758ff36 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -234,7 +234,7 @@ async fn blocking_when_paused() { // not yet finished. time::timeout( Duration::from_secs(3), - task::spawn_blocking(|| thread::sleep(Duration::from_millis(250))), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))), ) .await .expect("timeout should not trigger") @@ -245,7 +245,7 @@ async fn blocking_when_paused() { // is paused; system time is not. time::timeout( Duration::from_millis(1), - task::spawn_blocking(|| thread::sleep(Duration::from_millis(250))), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))), ) .await .expect("timeout should not trigger") @@ -258,7 +258,7 @@ async fn blocking_task_wakes_paused_runtime() { let t0 = std::time::Instant::now(); time::timeout( Duration::from_secs(15), - task::spawn_blocking(|| thread::sleep(Duration::from_millis(250))), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))), ) .await .expect("timeout should not trigger") @@ -277,7 +277,7 @@ async fn unawaited_blocking_task_wakes_paused_runtime() { // When this task finishes, time should auto-advance, even though the // JoinHandle has not been awaited yet. let a = task::spawn_blocking(|| { - thread::sleep(Duration::from_millis(20)); + thread::sleep(Duration::from_millis(1)); }); crate::time::sleep(Duration::from_secs(15)).await; @@ -295,7 +295,7 @@ async fn panicking_blocking_task_wakes_paused_runtime() { let result = time::timeout( Duration::from_secs(15), task::spawn_blocking(|| { - thread::sleep(Duration::from_millis(250)); + thread::sleep(Duration::from_millis(1)); panic!("blocking task panicked"); }), )