
rt(alt): fix memory leak and increase max preemption when running Loom CI tests #5911

Merged · 7 commits · Aug 10, 2023
2 changes: 0 additions & 2 deletions .github/workflows/loom.yml
@@ -120,5 +120,3 @@ jobs:
working-directory: tokio
env:
SCOPE: ${{ matrix.scope }}
-# TODO: remove this before stabilizing
-LOOM_MAX_PREEMPTIONS: 1
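For context on the workflow change: Loom caps how many thread preemptions it explores per execution through the LOOM_MAX_PREEMPTIONS environment variable, and deleting the override lifts the CI bound to loom's default (no cap), which is the "increase max preemption" half of the PR title. A minimal sketch, assuming loom's public `loom::model::Builder` API rather than anything introduced by this PR:

```rust
// Minimal sketch, assuming loom's public `Builder` API: how the
// preemption bound reaches the model checker.
use loom::model::Builder;

fn main() {
    // Builder::new() reads LOOM_MAX_PREEMPTIONS from the environment.
    // With the variable unset (as in this PR), `preemption_bound` stays
    // `None` and loom explores interleavings without capping preemptions.
    let mut model = Builder::new();
    model.preemption_bound = Some(2); // what `LOOM_MAX_PREEMPTIONS: 2` would set

    model.check(|| {
        // concurrent code under test goes here
    });
}
```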
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/handle.rs
@@ -39,7 +39,7 @@ impl Handle {
}

pub(crate) fn shutdown(&self) {
-self.shared.close();
+self.shared.close(self);
self.driver.unpark();
}

34 changes: 16 additions & 18 deletions tokio/src/runtime/scheduler/multi_thread_alt/idle.rs
@@ -4,7 +4,7 @@

use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
-use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Shared};
+use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared};

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};

@@ -146,23 +146,13 @@ impl Idle {
// Find a sleeping worker
if let Some(worker) = synced.idle.sleepers.pop() {
// Find an available core
-if let Some(mut core) = synced.idle.available_cores.pop() {
+if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) {
debug_assert!(!core.is_searching);
core.is_searching = true;

-self.idle_map.unset(core.index);
-debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

// Assign the core to the worker
synced.assigned_cores[worker] = Some(core);

-let num_idle = synced.idle.available_cores.len();
-#[cfg(not(loom))]
-debug_assert_eq!(num_idle, self.num_idle.load(Acquire) - 1);
-
-// Update the number of sleeping workers
-self.num_idle.store(num_idle, Release);

// Drop the lock before notifying the condvar.
drop(synced);
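`try_acquire_available_core` itself is outside this hunk. A plausible shape with toy types, inferred only from the bookkeeping the old call sites did inline (pop a core, unset its idle-map bit, publish the idle count); the signature and types below are assumptions, not the PR's code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Release};

// Hypothetical, simplified types; the real Core and idle map live in this module.
struct Core { index: usize }
struct Idle { num_idle: AtomicUsize }

impl Idle {
    // Plausible shape of try_acquire_available_core: centralize the
    // pop plus idle-count bookkeeping the old call sites repeated inline.
    fn try_acquire_available_core(&self, available_cores: &mut Vec<Core>) -> Option<Core> {
        let core = available_cores.pop()?;
        // (the real version also unsets `core.index` in the idle bitmap)
        self.num_idle.store(available_cores.len(), Release);
        Some(core)
    }
}

fn main() {
    let idle = Idle { num_idle: AtomicUsize::new(1) };
    let mut cores = vec![Core { index: 0 }];
    let core = idle.try_acquire_available_core(&mut cores).unwrap();
    println!("acquired core {}", core.index);
}
```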

@@ -198,6 +188,7 @@ impl Idle {

for _ in 0..num {
if let Some(worker) = synced.idle.sleepers.pop() {
+// TODO: can this be switched to use next_available_core?
if let Some(core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);

@@ -236,15 +227,10 @@ impl Idle {
// eventually find the cores and shut them down.
while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() {
let worker = synced.idle.sleepers.pop().unwrap();
-let core = synced.idle.available_cores.pop().unwrap();
-
-self.idle_map.unset(core.index);
+let core = self.try_acquire_available_core(&mut synced.idle).unwrap();

synced.assigned_cores[worker] = Some(core);
shared.condvars[worker].notify_one();

-self.num_idle
-.store(synced.idle.available_cores.len(), Release);
}

debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
@@ -255,6 +241,18 @@
}
}

+pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) {
+// If there are any remaining cores, shut them down here.
+//
+// This code is a bit convoluted to avoid lock-reentry.
+while let Some(core) = {
+let mut synced = shared.synced.lock();
+self.try_acquire_available_core(&mut synced.idle)
+} {
+shared.shutdown_core(handle, core);
+}
+}

/// The worker releases the given core, making it available to other workers
/// that are waiting.
pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) {
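A note on the `shutdown_unassigned_cores` loop above: the mutex is taken inside the `while let` scrutinee block, so the guard is dropped before the loop body runs, and `shutdown_core` may re-acquire the same lock without deadlocking. A standalone sketch of the pattern, with `std::sync::Mutex` standing in for the loom-aware mutex:

```rust
use std::sync::Mutex;

// Toy stand-in for draining cores: the guard created in the scrutinee
// block is dropped before the body executes, so the body may lock the
// same mutex again (directly or via a callee) without self-deadlock.
fn drain_and_shutdown(cores: &Mutex<Vec<u32>>) {
    while let Some(core) = {
        let mut guard = cores.lock().unwrap();
        guard.pop()
    } {
        // `guard` is gone here; re-locking `cores` would be safe.
        println!("shutting down core {core}");
    }
}

fn main() {
    drain_and_shutdown(&Mutex::new(vec![0, 1, 2]));
}
```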
194 changes: 107 additions & 87 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
@@ -560,7 +560,7 @@ impl Worker {
// First try to acquire an available core
if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
// Try to poll a task from the global queue
-let maybe_task = self.next_remote_task_synced(cx, &mut synced);
+let maybe_task = cx.shared().next_remote_task_synced(&mut synced);
(maybe_task, core)
} else {
// block the thread to wait for a core to be assigned to us
@@ -589,10 +589,7 @@
}
}

-self.pre_shutdown(cx, &mut core);
-
-// Signal shutdown
-self.shutdown_core(cx, core);
+cx.shared().shutdown_core(&cx.handle, core);

// It is possible that tasks wake others during drop, so we need to
// clear the defer list.
@@ -746,7 +743,7 @@ impl Worker {
}
}

-if let Some(task) = self.next_local_task(&mut core) {
+if let Some(task) = core.next_local_task() {
return Ok((Some(task), core));
}

@@ -759,12 +756,7 @@
}

let mut synced = cx.shared().synced.lock();
-self.next_remote_task_synced(cx, &mut synced)
-}
-
-fn next_remote_task_synced(&self, cx: &Context, synced: &mut Synced) -> Option<Notified> {
-// safety: we only have access to a valid `Synced` in this file.
-unsafe { cx.shared().inject.pop(&mut synced.inject) }
+cx.shared().next_remote_task_synced(&mut synced)
}

fn next_remote_task_batch(&self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
@@ -818,14 +810,6 @@ impl Worker {
ret
}

-fn next_local_task(&self, core: &mut Core) -> Option<Notified> {
-self.next_lifo_task(core).or_else(|| core.run_queue.pop())
-}
-
-fn next_lifo_task(&self, core: &mut Core) -> Option<Notified> {
-core.lifo_slot.take()
-}

/// Function responsible for stealing tasks from another worker
///
/// Note: Only if less than half the workers are searching for tasks to steal
@@ -948,7 +932,7 @@ impl Worker {
};

// Check for a task in the LIFO slot
-let task = match self.next_lifo_task(&mut core) {
+let task = match core.next_lifo_task() {
Some(task) => task,
None => {
self.reset_lifo_enabled(cx);
@@ -1229,7 +1213,7 @@ impl Worker {
if cx.shared().inject.is_closed(&mut synced.inject) {
synced.shutdown_driver = Some(driver);
self.shutdown_clear_defer(cx);
-self.shutdown_finalize(cx, synced);
+cx.shared().shutdown_finalize(&cx.handle, &mut synced);
return Err(());
}

@@ -1281,61 +1265,6 @@ impl Worker {
core.lifo_slot.is_some() || !core.run_queue.is_empty()
}

-/// Signals all tasks to shut down, and waits for them to complete. Must run
-/// before we enter the single-threaded phase of shutdown processing.
-fn pre_shutdown(&self, cx: &Context, core: &mut Core) {
-// Signal to all tasks to shut down.
-cx.shared().owned.close_and_shutdown_all();
-
-core.stats.submit(&cx.shared().worker_metrics[core.index]);
-}
-
-/// Signals that a worker has observed the shutdown signal and has replaced
-/// its core back into its handle.
-///
-/// If all workers have reached this point, the final cleanup is performed.
-fn shutdown_core(&self, cx: &Context, core: Box<Core>) {
-let mut synced = cx.shared().synced.lock();
-synced.shutdown_cores.push(core);
-
-self.shutdown_finalize(cx, synced);
-}
-
-fn shutdown_finalize(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) {
-// Wait for all cores
-if synced.shutdown_cores.len() != cx.shared().remotes.len() {
-return;
-}
-
-let driver = synced.shutdown_driver.take();
-
-if cx.shared().driver_enabled() && driver.is_none() {
-return;
-}
-
-debug_assert!(cx.shared().owned.is_empty());
-
-for mut core in synced.shutdown_cores.drain(..) {
-// Drain tasks from the local queue
-while self.next_local_task(&mut core).is_some() {}
-}
-
-// Shutdown the driver
-if let Some(mut driver) = driver {
-driver.shutdown(&cx.handle.driver);
-}
-
-// Drain the injection queue
-//
-// We already shut down every task, so we can simply drop the tasks. We
-// cannot call `next_remote_task()` because we already hold the lock.
-//
-// safety: passing in correct `idle::Synced`
-while let Some(task) = self.next_remote_task_synced(cx, &mut synced) {
-drop(task);
-}
-}

fn reset_lifo_enabled(&self, cx: &Context) {
cx.lifo_enabled
.set(!cx.handle.shared.config.disable_lifo_slot);
@@ -1379,7 +1308,22 @@ impl Context {
}
}

+impl Core {
+fn next_local_task(&mut self) -> Option<Notified> {
+self.next_lifo_task().or_else(|| self.run_queue.pop())
+}
+
+fn next_lifo_task(&mut self) -> Option<Notified> {
+self.lifo_slot.take()
+}
+}
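The two helpers above moved onto `Core` since they touch no `Worker` state. Note the polling order: the LIFO slot is drained before the FIFO run queue, which tends to keep a freshly woken task's data cache-hot. A toy sketch of that order (types here are stand-ins, not the PR's):

```rust
use std::collections::VecDeque;

// Toy model of the polling order above: LIFO slot first, then FIFO queue.
struct ToyCore {
    lifo_slot: Option<u32>,
    run_queue: VecDeque<u32>,
}

impl ToyCore {
    fn next_local_task(&mut self) -> Option<u32> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop_front())
    }
}

fn main() {
    let mut core = ToyCore { lifo_slot: Some(7), run_queue: VecDeque::from([1, 2]) };
    assert_eq!(core.next_local_task(), Some(7)); // LIFO slot wins
    assert_eq!(core.next_local_task(), Some(1)); // then the queue
}
```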

impl Shared {
+fn next_remote_task_synced(&self, synced: &mut Synced) -> Option<Notified> {
+// safety: we only have access to a valid `Synced` in this file.
+unsafe { self.inject.pop(&mut synced.inject) }
+}

pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
use std::ptr;

@@ -1449,17 +1393,25 @@ impl Shared {
self.idle.notify_remote(synced, self);
}

-pub(super) fn close(&self) {
-let mut synced = self.synced.lock();
+pub(super) fn close(&self, handle: &Handle) {
+{
+let mut synced = self.synced.lock();

-if let Some(driver) = self.driver.take() {
-synced.shutdown_driver = Some(driver);
-}
+if let Some(driver) = self.driver.take() {
+synced.shutdown_driver = Some(driver);
+}

-if !self.inject.close(&mut synced.inject) {
-return;
-}

+if self.inject.close(&mut synced.inject) {
+// Set the shutdown flag on all available cores
+self.idle.shutdown(&mut synced, self);
+}
+}

+// Any unassigned cores need to be shut down, but we have to first drop
+// the lock
+self.idle.shutdown_unassigned_cores(handle, self);
}

fn push_remote_task(&self, synced: &mut Synced, task: Notified) {
@@ -1498,6 +1450,52 @@ impl Shared {
fn driver_enabled(&self) -> bool {
self.condvars.len() > self.remotes.len()
}

+pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
+self.owned.close_and_shutdown_all();
+
+core.stats.submit(&self.worker_metrics[core.index]);
+
+let mut synced = self.synced.lock();
+synced.shutdown_cores.push(core);
+
+self.shutdown_finalize(handle, &mut synced);
+}

+pub(super) fn shutdown_finalize(&self, handle: &Handle, synced: &mut Synced) {
+// Wait for all cores
+if synced.shutdown_cores.len() != self.remotes.len() {
+return;
+}
+
+let driver = synced.shutdown_driver.take();
+
+if self.driver_enabled() && driver.is_none() {
+return;
+}
+
+debug_assert!(self.owned.is_empty());
+
+for mut core in synced.shutdown_cores.drain(..) {
+// Drain tasks from the local queue
+while core.next_local_task().is_some() {}
+}
+
+// Shutdown the driver
+if let Some(mut driver) = driver {
+driver.shutdown(&handle.driver);
+}
+
+// Drain the injection queue
+//
+// We already shut down every task, so we can simply drop the tasks. We
+// cannot call `next_remote_task()` because we already hold the lock.
+//
+// safety: passing in correct `idle::Synced`
+while let Some(task) = self.next_remote_task_synced(synced) {
+drop(task);
+}
+}
}
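`shutdown_finalize` returns early until every worker has parked its core in `shutdown_cores`, so the last worker to arrive performs the single-threaded cleanup exactly once. A toy model of that rendezvous (types and counts here are assumptions, not the PR's):

```rust
use std::sync::Mutex;

// Toy rendezvous: each worker parks its core; the one delivering the
// final core (len == num_workers) runs the cleanup exactly once.
struct Shutdown {
    num_workers: usize,
    shutdown_cores: Mutex<Vec<u32>>,
}

impl Shutdown {
    fn shutdown_core(&self, core: u32) {
        let mut cores = self.shutdown_cores.lock().unwrap();
        cores.push(core);
        if cores.len() == self.num_workers {
            println!("last worker in: finalizing {cores:?}");
        }
    }
}

fn main() {
    let s = Shutdown { num_workers: 2, shutdown_cores: Mutex::new(Vec::new()) };
    s.shutdown_core(0);
    s.shutdown_core(1); // triggers the finalize branch
}
```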

impl Overflow<Arc<Handle>> for Shared {
Expand All @@ -1514,10 +1512,20 @@ impl Overflow<Arc<Handle>> for Shared {
}

impl<'a> Lock<inject::Synced> for &'a Shared {
-type Handle = InjectGuard<'a>;
+type Handle = SyncedGuard<'a>;

fn lock(self) -> Self::Handle {
-InjectGuard {
+SyncedGuard {
lock: self.synced.lock(),
}
}
}

+impl<'a> Lock<Synced> for &'a Shared {
+type Handle = SyncedGuard<'a>;
+
+fn lock(self) -> Self::Handle {
+SyncedGuard {
+lock: self.synced.lock(),
+}
+}
@@ -1537,16 +1545,28 @@ impl task::Schedule for Arc<Handle> {
}
}

-pub(crate) struct InjectGuard<'a> {
+impl AsMut<Synced> for Synced {
+fn as_mut(&mut self) -> &mut Synced {
+self
+}
+}
+
+pub(crate) struct SyncedGuard<'a> {
lock: crate::loom::sync::MutexGuard<'a, Synced>,
}

-impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
+impl<'a> AsMut<inject::Synced> for SyncedGuard<'a> {
fn as_mut(&mut self) -> &mut inject::Synced {
&mut self.lock.inject
}
}

+impl<'a> AsMut<Synced> for SyncedGuard<'a> {
+fn as_mut(&mut self) -> &mut Synced {
+&mut self.lock
+}
+}
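The identity `AsMut<Synced> for Synced` impl added earlier pairs with the guard impls above: code generic over `impl AsMut<Synced>` can then accept either a `SyncedGuard` or a bare `&mut Synced` (via the standard blanket `AsMut` impl for mutable references). A self-contained toy illustration, with a hypothetical `State` type standing in for `Synced`:

```rust
// Hypothetical `State` stands in for `Synced`.
struct State { counter: usize }

// Identity impl, mirroring `impl AsMut<Synced> for Synced` above.
impl AsMut<State> for State {
    fn as_mut(&mut self) -> &mut State { self }
}

// One helper now serves both guard types and bare mutable references.
fn bump(mut state: impl AsMut<State>) {
    state.as_mut().counter += 1;
}

fn main() {
    let mut s = State { counter: 0 };
    bump(&mut s); // `&mut State: AsMut<State>` via the blanket impl
    assert_eq!(s.counter, 1);
}
```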

#[track_caller]
fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
use scheduler::Context::MultiThreadAlt;