From 969b50b8213cdaa38b6ffb7bb652b6c6f407bbb5 Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Mon, 14 Nov 2022 14:10:54 -0600 Subject: [PATCH] Fix race when main thread is parked as auto-advance is re-enabled. Specifically, when a blocking task is spawned, but no thread immediately awaits the JoinHandle, if the main thread parks itself, we previously failed to unpark it when the blocking task completed. Auto-advance would be re-enabled but could not happen, a potential deadlock. --- tokio/src/runtime/blocking/schedule.rs | 17 +++++++++++++---- tokio/src/time/clock.rs | 10 ++-------- tokio/src/time/mod.rs | 2 -- tokio/tests/task_blocking.rs | 19 +++++++++++++++++++ 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index dae09791ee4..2c29472e657 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -1,5 +1,5 @@ use crate::runtime::task::{self, Task}; -use crate::time::Clock; +use crate::runtime::{scheduler, Handle}; /// `task::Schedule` implementation that does nothing (except some bookkeeping /// in test-util builds). This is unique to the blocking scheduler as tasks @@ -9,14 +9,20 @@ use crate::time::Clock; /// in `release`. pub(crate) struct BlockingSchedule { #[cfg(feature = "test-util")] - clock: Clock, + handle: Handle, } impl BlockingSchedule { pub(crate) fn new() -> Self { BlockingSchedule { #[cfg(feature = "test-util")] - clock: crate::time::inhibit_auto_advance(), + handle: { + let handle = Handle::current(); + if let scheduler::Handle::CurrentThread(handle) = &handle.inner { + handle.driver.clock.inhibit_auto_advance(); + } + handle + }, } } } @@ -25,7 +31,10 @@ impl task::Schedule for BlockingSchedule { fn release(&self, _task: &Task) -> Option> { #[cfg(feature = "test-util")] { - self.clock.allow_auto_advance(); + if let scheduler::Handle::CurrentThread(handle) = &self.handle.inner { + handle.driver.clock.allow_auto_advance(); + handle.driver.unpark(); + } } None } diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 9e55c82e89e..cd11a67527f 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -132,13 +132,6 @@ cfg_test_util! { inner.unfrozen = Some(std::time::Instant::now()); } - /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`). - pub(crate) fn inhibit_auto_advance() -> Clock { - let clock = clock().expect("can't inhibit auto-advance from outside the Tokio runtime"); - clock.inhibit_auto_advance(); - clock - } - /// Advances time. /// /// Increments the saved `Instant::now()` value by `duration`. Subsequent @@ -223,7 +216,8 @@ cfg_test_util! { inner.unfrozen = None; } - fn inhibit_auto_advance(&self) { + /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`). + pub(crate) fn inhibit_auto_advance(&self) { let mut inner = self.inner.lock(); inner.auto_advance_inhibit_count += 1; } diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index 42715de90f6..a1f27b839e9 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -87,8 +87,6 @@ mod clock; pub(crate) use self::clock::Clock; #[cfg(feature = "test-util")] -pub(crate) use clock::inhibit_auto_advance; -#[cfg(feature = "test-util")] pub use clock::{advance, pause, resume}; pub mod error; diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index 5859368b4cd..b50bf393afc 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -269,6 +269,25 @@ async fn blocking_task_wakes_paused_runtime() { ); } +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn unawaited_blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + + // When this task finishes, time should auto-advance, even though the + // JoinHandle has not been awaited yet. + let a = task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(20)); + }); + + crate::time::sleep(Duration::from_secs(15)).await; + a.await.expect("blocking task should finish"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} + #[cfg(feature = "test-util")] #[tokio::test(start_paused = true)] async fn panicking_blocking_task_wakes_paused_runtime() {