Skip to content

Commit

Permalink
Merge pull request #309 from moka-rs/async-cancellation-safety
Browse files Browse the repository at this point in the history
Improve async cancellation safety of `future::Cache` (v0.12.0-beta.X)
  • Loading branch information
tatsuya6502 committed Aug 27, 2023
2 parents 234a728 + 3cb2529 commit 3818083
Show file tree
Hide file tree
Showing 7 changed files with 742 additions and 295 deletions.
3 changes: 1 addition & 2 deletions MIGRATION-GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ let cache = Cache::builder()

`async_eviction_listener` takes a closure that returns a `Future`. If you need to
`.await` something in the eviction listener, use this method. The actual return type
of the closure is `future::ListenerFuture`, which is a type alias of
of the closure is `notification::ListenerFuture`, which is a type alias of
`Pin<Box<dyn Future<Output = ()> + Send>>`. You can use the `boxed` method of
`future::FutureExt` trait to convert a regular `Future` into this type.

Expand Down Expand Up @@ -242,7 +242,6 @@ free some memory), you do not need to call `run_pending_tasks` method.
- To enable it, see [Enabling the thread pool](#enabling-the-thread-pool) for more
details.


#### Enabling the thread pool

To enable the thread pool, do the followings:
Expand Down
57 changes: 56 additions & 1 deletion src/common/concurrent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::common::{deque::DeqNode, time::Instant};

use parking_lot::Mutex;
use std::{ptr::NonNull, sync::Arc};
use std::{fmt, ptr::NonNull, sync::Arc};
use tagptr::TagNonNull;
use triomphe::Arc as TrioArc;

Expand Down Expand Up @@ -112,6 +112,15 @@ impl<K, V> KvEntry<K, V> {
}
}

impl<K, V> Clone for KvEntry<K, V> {
fn clone(&self) -> Self {
Self {
key: Arc::clone(&self.key),
entry: TrioArc::clone(&self.entry),
}
}
}

impl<K> AccessTime for DeqNode<KeyHashDate<K>> {
#[inline]
fn last_accessed(&self) -> Option<Instant> {
Expand Down Expand Up @@ -316,3 +325,49 @@ pub(crate) enum WriteOp<K, V> {
},
Remove(KvEntry<K, V>),
}

/// Cloning a WriteOp is safe and cheap because it uses Arc and TrioArc pointers to
/// the actual data.
impl<K, V> Clone for WriteOp<K, V> {
fn clone(&self) -> Self {
match self {
Self::Upsert {
key_hash,
value_entry,
old_weight,
new_weight,
} => Self::Upsert {
key_hash: key_hash.clone(),
value_entry: TrioArc::clone(value_entry),
old_weight: *old_weight,
new_weight: *new_weight,
},
Self::Remove(kv_hash) => Self::Remove(kv_hash.clone()),
}
}
}

impl<K, V> fmt::Debug for WriteOp<K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Upsert { .. } => f.debug_struct("Upsert").finish(),
Self::Remove(..) => f.debug_tuple("Remove").finish(),
}
}
}

pub(crate) struct OldEntryInfo<K, V> {
pub(crate) entry: TrioArc<ValueEntry<K, V>>,
pub(crate) last_accessed: Option<Instant>,
pub(crate) last_modified: Option<Instant>,
}

impl<K, V> OldEntryInfo<K, V> {
pub(crate) fn new(entry: &TrioArc<ValueEntry<K, V>>) -> Self {
Self {
entry: TrioArc::clone(entry),
last_accessed: entry.last_accessed(),
last_modified: entry.last_modified(),
}
}
}
79 changes: 78 additions & 1 deletion src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
//! To use this module, enable a crate feature called "future".

use async_lock::Mutex;
use futures_util::future::BoxFuture;
use crossbeam_channel::Sender;
use futures_util::future::{BoxFuture, Shared};
use once_cell::sync::Lazy;
use std::{future::Future, hash::Hash, sync::Arc};

use crate::common::{concurrent::WriteOp, time::Instant};

mod base_cache;
mod builder;
mod cache;
Expand Down Expand Up @@ -71,6 +74,80 @@ where
}
}

/// Operation that has been interrupted (stopped polling) by async cancellation.
pub(crate) enum InterruptedOp<K, V> {
CallEvictionListener {
ts: Instant,
// 'static means that the future can capture only owned value and/or static
// references. No non-static references are allowed.
future: Shared<BoxFuture<'static, ()>>,
op: WriteOp<K, V>,
},
SendWriteOp {
ts: Instant,
op: WriteOp<K, V>,
},
}

/// Drop guard for an async task being performed. If this guard is dropped while it
/// is still having the shared `future` or the write `op`, it will convert them to an
/// `InterruptedOp` and send it to the interrupted operations channel. Later, the
/// interrupted op will be retried by `retry_interrupted_ops` method of
/// `BaseCache`.
struct CancelGuard<'a, K, V> {
interrupted_op_ch: &'a Sender<InterruptedOp<K, V>>,
ts: Instant,
future: Option<Shared<BoxFuture<'static, ()>>>,
op: Option<WriteOp<K, V>>,
}

impl<'a, K, V> CancelGuard<'a, K, V> {
fn new(interrupted_op_ch: &'a Sender<InterruptedOp<K, V>>, ts: Instant) -> Self {
Self {
interrupted_op_ch,
ts,
future: Default::default(),
op: Default::default(),
}
}

fn set_future_and_op(&mut self, future: Shared<BoxFuture<'static, ()>>, op: WriteOp<K, V>) {
self.future = Some(future);
self.op = Some(op);
}

fn set_op(&mut self, op: WriteOp<K, V>) {
self.op = Some(op);
}

fn unset_future(&mut self) {
self.future = None;
}

fn clear(&mut self) {
self.future = None;
self.op = None;
}
}

impl<'a, K, V> Drop for CancelGuard<'a, K, V> {
fn drop(&mut self) {
let interrupted_op = match (self.future.take(), self.op.take()) {
(Some(future), Some(op)) => InterruptedOp::CallEvictionListener {
ts: self.ts,
future,
op,
},
(None, Some(op)) => InterruptedOp::SendWriteOp { ts: self.ts, op },
_ => return,
};

self.interrupted_op_ch
.send(interrupted_op)
.expect("Failed to send a pending op");
}
}

/// May yield to other async tasks.
pub(crate) async fn may_yield() {
static LOCK: Lazy<Mutex<()>> = Lazy::new(Default::default);
Expand Down

0 comments on commit 3818083

Please sign in to comment.