Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve async cancellation safety of future::Cache (v0.12.0-beta.X) #309

Merged
merged 8 commits into from
Aug 27, 2023
3 changes: 1 addition & 2 deletions MIGRATION-GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ let cache = Cache::builder()

`async_eviction_listener` takes a closure that returns a `Future`. If you need to
`.await` something in the eviction listener, use this method. The actual return type
of the closure is `future::ListenerFuture`, which is a type alias of
of the closure is `notification::ListenerFuture`, which is a type alias of
`Pin<Box<dyn Future<Output = ()> + Send>>`. You can use the `boxed` method of
`future::FutureExt` trait to convert a regular `Future` into this type.

Expand Down Expand Up @@ -242,7 +242,6 @@ free some memory), you do not need to call `run_pending_tasks` method.
- To enable it, see [Enabling the thread pool](#enabling-the-thread-pool) for more
details.


#### Enabling the thread pool

To enable the thread pool, do the followings:
Expand Down
57 changes: 56 additions & 1 deletion src/common/concurrent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::common::{deque::DeqNode, time::Instant};

use parking_lot::Mutex;
use std::{ptr::NonNull, sync::Arc};
use std::{fmt, ptr::NonNull, sync::Arc};
use tagptr::TagNonNull;
use triomphe::Arc as TrioArc;

Expand Down Expand Up @@ -112,6 +112,15 @@ impl<K, V> KvEntry<K, V> {
}
}

impl<K, V> Clone for KvEntry<K, V> {
fn clone(&self) -> Self {
Self {
key: Arc::clone(&self.key),
entry: TrioArc::clone(&self.entry),
}
}
}

impl<K> AccessTime for DeqNode<KeyHashDate<K>> {
#[inline]
fn last_accessed(&self) -> Option<Instant> {
Expand Down Expand Up @@ -316,3 +325,49 @@ pub(crate) enum WriteOp<K, V> {
},
Remove(KvEntry<K, V>),
}

/// Cloning a WriteOp is safe and cheap because it uses Arc and TrioArc pointers to
/// the actual data.
impl<K, V> Clone for WriteOp<K, V> {
fn clone(&self) -> Self {
match self {
Self::Upsert {
key_hash,
value_entry,
old_weight,
new_weight,
} => Self::Upsert {
key_hash: key_hash.clone(),
value_entry: TrioArc::clone(value_entry),
old_weight: *old_weight,
new_weight: *new_weight,
},
Self::Remove(kv_hash) => Self::Remove(kv_hash.clone()),
}
}
}

impl<K, V> fmt::Debug for WriteOp<K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Upsert { .. } => f.debug_struct("Upsert").finish(),
Self::Remove(..) => f.debug_tuple("Remove").finish(),
}
}
}

pub(crate) struct OldEntryInfo<K, V> {
pub(crate) entry: TrioArc<ValueEntry<K, V>>,
pub(crate) last_accessed: Option<Instant>,
pub(crate) last_modified: Option<Instant>,
}

impl<K, V> OldEntryInfo<K, V> {
pub(crate) fn new(entry: &TrioArc<ValueEntry<K, V>>) -> Self {
Self {
entry: TrioArc::clone(entry),
last_accessed: entry.last_accessed(),
last_modified: entry.last_modified(),
}
}
}
79 changes: 78 additions & 1 deletion src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
//! To use this module, enable a crate feature called "future".

use async_lock::Mutex;
use futures_util::future::BoxFuture;
use crossbeam_channel::Sender;
use futures_util::future::{BoxFuture, Shared};
use once_cell::sync::Lazy;
use std::{future::Future, hash::Hash, sync::Arc};

use crate::common::{concurrent::WriteOp, time::Instant};

mod base_cache;
mod builder;
mod cache;
Expand Down Expand Up @@ -71,6 +74,80 @@ where
}
}

/// Operation that has been interrupted (stopped polling) by async cancellation.
pub(crate) enum InterruptedOp<K, V> {
CallEvictionListener {
ts: Instant,
// 'static means that the future can capture only owned value and/or static
// references. No non-static references are allowed.
future: Shared<BoxFuture<'static, ()>>,
op: WriteOp<K, V>,
},
SendWriteOp {
ts: Instant,
op: WriteOp<K, V>,
},
}

/// Drop guard for an async task being performed. If this guard is dropped while it
/// is still having the shared `future` or the write `op`, it will convert them to an
/// `InterruptedOp` and send it to the interrupted operations channel. Later, the
/// interrupted op will be retried by `retry_interrupted_ops` method of
/// `BaseCache`.
struct CancelGuard<'a, K, V> {
interrupted_op_ch: &'a Sender<InterruptedOp<K, V>>,
ts: Instant,
future: Option<Shared<BoxFuture<'static, ()>>>,
op: Option<WriteOp<K, V>>,
}

impl<'a, K, V> CancelGuard<'a, K, V> {
fn new(interrupted_op_ch: &'a Sender<InterruptedOp<K, V>>, ts: Instant) -> Self {
Self {
interrupted_op_ch,
ts,
future: Default::default(),
op: Default::default(),
}
}

fn set_future_and_op(&mut self, future: Shared<BoxFuture<'static, ()>>, op: WriteOp<K, V>) {
self.future = Some(future);
self.op = Some(op);
}

fn set_op(&mut self, op: WriteOp<K, V>) {
self.op = Some(op);
}

fn unset_future(&mut self) {
self.future = None;
}

fn clear(&mut self) {
self.future = None;
self.op = None;
}
}

impl<'a, K, V> Drop for CancelGuard<'a, K, V> {
fn drop(&mut self) {
let interrupted_op = match (self.future.take(), self.op.take()) {
(Some(future), Some(op)) => InterruptedOp::CallEvictionListener {
ts: self.ts,
future,
op,
},
(None, Some(op)) => InterruptedOp::SendWriteOp { ts: self.ts, op },
_ => return,
};

self.interrupted_op_ch
.send(interrupted_op)
.expect("Failed to send a pending op");
}
}

/// May yield to other async tasks.
pub(crate) async fn may_yield() {
static LOCK: Lazy<Mutex<()>> = Lazy::new(Default::default);
Expand Down