Skip to content

Commit

Permalink
Add optionalwatch cookie support (#7379)
Browse files Browse the repository at this point in the history
### Description

We have two types of APIs in the daemon, one is request-based, and one
is 'watch-based'. In the first case, a request comes in from the system
and we want to make sure that the answer that is given has observed at
least the filesystem events up to the point of the request. In the
second case, there is constantly updating state (ie, in response to
filesystem changes) that also needs to be validated against some cookie
index.

CookiedOptionalWatch provides an API around some of the more complex
data dependencies, making sure that complicated derived state is able to
easily respect cookies when their values are requested. A cookie watch
can have siblings from the same source, or children, which effectively
composes two cookie watches. This is necessary because the file watching
will be processed by multiple actors whose individual processing queues
must all be up-to-date for the derived state to be considered up to
date.

### Testing Instructions

Existing tests suffice
  • Loading branch information
arlyon committed Feb 16, 2024
1 parent 4928e80 commit 7518da8
Show file tree
Hide file tree
Showing 3 changed files with 352 additions and 74 deletions.
242 changes: 223 additions & 19 deletions crates/turborepo-filewatch/src/cookies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@

use std::{collections::BinaryHeap, fs::OpenOptions, time::Duration};

use futures::FutureExt;
use notify::EventKind;
use thiserror::Error;
use tokio::{
sync::{broadcast, mpsc, oneshot},
sync::{broadcast, mpsc, oneshot, watch},
time::error::Elapsed,
};
use tracing::trace;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};

use crate::{NotifyError, OptionalWatch};
use crate::{optional_watch::SomeRef, NotifyError, OptionalWatch};

#[derive(Debug, Error)]
pub enum CookieError {
Expand All @@ -60,7 +61,7 @@ pub enum CookieError {
path: AbsoluteSystemPathBuf,
},
#[error("cookie queue is not available")]
Unavailable,
Unavailable(#[from] watch::error::RecvError),
}

/// CookieWriter is responsible for assigning filesystem cookies to a request
Expand Down Expand Up @@ -152,7 +153,7 @@ impl<T> CookieWatcher<T> {
if !matches!(event_kind, EventKind::Create(_)) {
return None;
}
if let Some(serial) = self.serial_for_path(path) {
if let Some(serial) = serial_for_path(&self.root, path) {
self.latest = serial;
let mut ready_requests = Vec::new();
while let Some(cookied_request) = self.pending_requests.pop() {
Expand All @@ -168,14 +169,14 @@ impl<T> CookieWatcher<T> {
None
}
}
}

fn serial_for_path(&self, path: &AbsoluteSystemPath) -> Option<usize> {
if self.root.relation_to_path(path) == PathRelation::Parent {
let filename = path.file_name()?;
filename.strip_suffix(".cookie")?.parse().ok()
} else {
None
}
fn serial_for_path(root: &AbsoluteSystemPath, path: &AbsoluteSystemPath) -> Option<usize> {
if root.relation_to_path(path) == PathRelation::Parent {
let filename = path.file_name()?;
filename.strip_suffix(".cookie")?.parse().ok()
} else {
None
}
}

Expand Down Expand Up @@ -231,18 +232,38 @@ impl CookieWriter {
request: T,
) -> Result<CookiedRequest<T>, CookieError> {
// we need to write the cookie from a single task so as to serialize them
tokio::time::timeout(self.timeout, self.cookie_request_inner(request)).await?
tokio::time::timeout(self.timeout, self.cookie_request_inner(request, None)).await?
}

async fn cookie_request_inner<T>(&self, request: T) -> Result<CookiedRequest<T>, CookieError> {
/// Internal services may want to wait for the cookie
/// system to be set up, before issuing a timeout for the
/// cookie request.
pub(crate) async fn initialized_cookie_request<T>(
&self,
request: T,
) -> Result<CookiedRequest<T>, CookieError> {
self.cookie_request_inner(request, Some(self.timeout)).await
}

async fn cookie_request_inner<T>(
&self,
request: T,
timeout: Option<Duration>,
) -> Result<CookiedRequest<T>, CookieError> {
let (resp_tx, resp_rx) = oneshot::channel();
let mut cookie_request_tx = self.cookie_request_sender_lazy.clone();
let Ok(cookie_request_tx) = cookie_request_tx.get().await.map(|s| s.to_owned()) else {
// the cookie queue is not ready and will never be ready
return Err(CookieError::Unavailable);
let mut cookie_request_sender_lazy = self.cookie_request_sender_lazy.clone();
let cookie_request_sender_lazy = cookie_request_sender_lazy
.get()
.await
.map(|s| s.to_owned())?;
cookie_request_sender_lazy.send(resp_tx).await?;
let serial = match timeout {
Some(timeout) => {
let resp_rx = tokio::time::timeout(timeout, resp_rx).await??;
resp_rx?
}
None => resp_rx.await??,
};
cookie_request_tx.send(resp_tx).await?;
let serial = resp_rx.await??;
Ok(CookiedRequest { request, serial })
}
}
Expand Down Expand Up @@ -289,6 +310,189 @@ fn handle_cookie_request(
}
}

/// a lightweight wrapper around OptionalWatch that embeds cookie ids into the
/// get call. for requests that require cookies (ie, waiting for filesystem
/// flushes) then a cookie watch is ideal
pub struct CookiedOptionalWatch<T, P: CookieReady> {
value: watch::Receiver<Option<T>>,
cookie_index: watch::Receiver<usize>,
cookie_writer: CookieWriter,
parent: P,
}

impl<T, P: CookieReady + Clone> Clone for CookiedOptionalWatch<T, P> {
fn clone(&self) -> Self {
Self {
value: self.value.clone(),
cookie_index: self.cookie_index.clone(),
cookie_writer: self.cookie_writer.clone(),
parent: self.parent.clone(),
}
}
}

pub trait CookieReady {
fn ready(&mut self, id: usize) -> impl std::future::Future<Output = ()>;
}

impl CookieReady for () {
async fn ready(&mut self, _id: usize) {}
}

impl<T, U: CookieReady> CookieReady for CookiedOptionalWatch<T, U> {
async fn ready(&mut self, id: usize) {
tracing::debug!("waiting for cookie {}", id);
self.parent.ready(id).await;
_ = self.cookie_index.wait_for(|v| v >= &id).await;
}
}

impl<T> CookiedOptionalWatch<T, ()> {
pub fn new(
update: CookieWriter,
) -> (
watch::Sender<Option<T>>,
CookieRegister,
CookiedOptionalWatch<T, ()>,
) {
let (tx, rx) = watch::channel(None);
let (cookie_tx, cookie_rx) = watch::channel(0);
tracing::debug!("starting cookied optional watch in {}", update.root());
(
tx,
CookieRegister(cookie_tx, update.root().to_owned()),
CookiedOptionalWatch {
value: rx,
cookie_index: cookie_rx,
cookie_writer: update,
parent: (),
},
)
}
}

impl<T, U: CookieReady + Clone> CookiedOptionalWatch<T, U> {
/// Create a new sibling cookie watcher that inherits the same fs source as
/// this one.
pub fn new_sibling<T2>(&self) -> (watch::Sender<Option<T2>>, CookiedOptionalWatch<T2, U>) {
let (tx, rx) = watch::channel(None);
(
tx,
CookiedOptionalWatch {
value: rx,
cookie_index: self.cookie_index.clone(),
cookie_writer: self.cookie_writer.clone(),
parent: self.parent.clone(),
},
)
}

/// Create a new child cookie watcher that inherits the same fs source as
/// this one, but also has its own cookie source. This allows you to
/// synchronize two independent cookie streams.
pub fn new_child<T2>(
&self,
) -> (
watch::Sender<Option<T2>>,
CookieRegister,
CookiedOptionalWatch<T2, Self>,
) {
let (tx, rx) = watch::channel(None);
let (cookie_tx, cookie_rx) = watch::channel(0);

(
tx,
CookieRegister(cookie_tx, self.cookie_writer.root().to_owned()),
CookiedOptionalWatch {
value: rx,
cookie_index: cookie_rx,
cookie_writer: self.cookie_writer.clone(),
parent: self.clone(),
},
)
}

#[tracing::instrument(skip(self))]
pub async fn get(&mut self) -> Result<SomeRef<'_, T>, CookieError> {
let next_id = self
.cookie_writer
.initialized_cookie_request(())
.await?
.serial;
self.ready(next_id).await;
tracing::debug!("waiting for data");
Ok(self.get_inner().await?)
}

#[tracing::instrument(skip(self))]
pub async fn get_change(&mut self) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.value.changed().await?;
self.get_inner().await
}

/// Please do not use this data from a user-facing query. It should only
/// really be used for internal state management. Equivalent to
/// `OptionalWatch::get`
///
/// For an example as to why we need this, sometimes file event processing
/// needs to access data but issuing a cookie request would deadlock.
///
/// `_reason` is purely for documentation purposes and is not used.
pub async fn get_raw(
&mut self,
_reason: &str,
) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.get_inner().await
}

/// Get the current value, if it is available.
///
/// Unlike `OptionalWatch::get_immediate`, this method will block until the
/// cookie has been seen, at which point it will call `now_or_never` on the
/// value watch.
#[tracing::instrument(skip(self))]
pub async fn get_immediate(
&mut self,
) -> Option<Result<SomeRef<'_, T>, watch::error::RecvError>> {
let next_id = self.cookie_writer.cookie_request(()).await.ok()?.serial;
self.cookie_index.wait_for(|v| v >= &next_id).await.ok()?;
self.get_inner().now_or_never()
}

/// Please do not use this data from a user-facing query. It should only
/// really be used for internal state management. Equivalent to
/// `OptionalWatch::get`
///
/// For an example as to why we need this, sometimes file event processing
/// needs to access data but issuing a cookie request would deadlock.
///
/// `_reason` is purely for documentation purposes and is not used.
pub async fn get_immediate_raw(
&mut self,
reason: &str,
) -> Option<Result<SomeRef<'_, T>, watch::error::RecvError>> {
self.get_raw(reason).now_or_never()
}

async fn get_inner(&mut self) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.value.wait_for(|f| f.is_some()).await?;
Ok(SomeRef(self.value.borrow()))
}
}

pub struct CookieRegister(watch::Sender<usize>, AbsoluteSystemPathBuf);
impl CookieRegister {
pub fn register(&self, paths: &[&AbsoluteSystemPath]) {
tracing::trace!("registering cookie for {:?}", paths);
for path in paths {
if let Some(serial) = serial_for_path(&self.1, path) {
tracing::trace!("updating cookie to {}", serial);
let _ = self.0.send(serial);
}
}
}
}

#[cfg(test)]
mod test {
use std::time::Duration;
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-filewatch/src/optional_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<T> OptionalWatch<T> {
}
}

pub struct SomeRef<'a, T>(Ref<'a, Option<T>>);
pub struct SomeRef<'a, T>(pub(crate) Ref<'a, Option<T>>);

impl<'a, T> std::ops::Deref for SomeRef<'a, T> {
type Target = T;
Expand Down

0 comments on commit 7518da8

Please sign in to comment.