Skip to content

Commit

Permalink
add cookie wrapper around OptionalWatch to synchronize updates
Browse files Browse the repository at this point in the history
  • Loading branch information
arlyon authored and Alexander Lyon committed Feb 16, 2024
1 parent d438018 commit 46838c9
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 47 deletions.
227 changes: 208 additions & 19 deletions crates/turborepo-filewatch/src/cookies.rs
Expand Up @@ -35,16 +35,17 @@

use std::{collections::BinaryHeap, fs::OpenOptions, time::Duration};

use futures::FutureExt;
use notify::EventKind;
use thiserror::Error;
use tokio::{
sync::{broadcast, mpsc, oneshot},
sync::{broadcast, mpsc, oneshot, watch},
time::error::Elapsed,
};
use tracing::trace;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};

use crate::{NotifyError, OptionalWatch};
use crate::{optional_watch::SomeRef, NotifyError, OptionalWatch};

#[derive(Debug, Error)]
pub enum CookieError {
Expand All @@ -60,7 +61,7 @@ pub enum CookieError {
path: AbsoluteSystemPathBuf,
},
#[error("cookie queue is not available")]
Unavailable,
Unavailable(#[from] watch::error::RecvError),
}

/// CookieWriter is responsible for assigning filesystem cookies to a request
Expand Down Expand Up @@ -152,7 +153,7 @@ impl<T> CookieWatcher<T> {
if !matches!(event_kind, EventKind::Create(_)) {
return None;
}
if let Some(serial) = self.serial_for_path(path) {
if let Some(serial) = serial_for_path(&self.root, path) {
self.latest = serial;
let mut ready_requests = Vec::new();
while let Some(cookied_request) = self.pending_requests.pop() {
Expand All @@ -168,14 +169,14 @@ impl<T> CookieWatcher<T> {
None
}
}
}

fn serial_for_path(&self, path: &AbsoluteSystemPath) -> Option<usize> {
if self.root.relation_to_path(path) == PathRelation::Parent {
let filename = path.file_name()?;
filename.strip_suffix(".cookie")?.parse().ok()
} else {
None
}
fn serial_for_path(root: &AbsoluteSystemPath, path: &AbsoluteSystemPath) -> Option<usize> {
if root.relation_to_path(path) == PathRelation::Parent {
let filename = path.file_name()?;
filename.strip_suffix(".cookie")?.parse().ok()
} else {
None
}
}

Expand Down Expand Up @@ -231,18 +232,38 @@ impl CookieWriter {
request: T,
) -> Result<CookiedRequest<T>, CookieError> {
// we need to write the cookie from a single task so as to serialize them
tokio::time::timeout(self.timeout, self.cookie_request_inner(request)).await?
tokio::time::timeout(self.timeout, self.cookie_request_inner(request, None)).await?
}

/// Internal services may want to wait for the cookie
/// system to be set up, before issuing a timeout for the
/// cookie request.
pub(crate) async fn initialized_cookie_request<T>(
&self,
request: T,
) -> Result<CookiedRequest<T>, CookieError> {
self.cookie_request_inner(request, Some(self.timeout)).await
}

async fn cookie_request_inner<T>(&self, request: T) -> Result<CookiedRequest<T>, CookieError> {
async fn cookie_request_inner<T>(
&self,
request: T,
timeout: Option<Duration>,
) -> Result<CookiedRequest<T>, CookieError> {
let (resp_tx, resp_rx) = oneshot::channel();
let mut cookie_request_tx = self.cookie_request_sender_lazy.clone();
let Ok(cookie_request_tx) = cookie_request_tx.get().await.map(|s| s.to_owned()) else {
// the cookie queue is not ready and will never be ready
return Err(CookieError::Unavailable);
let mut cookie_request_sender_lazy = self.cookie_request_sender_lazy.clone();
let cookie_request_sender_lazy = cookie_request_sender_lazy
.get()
.await
.map(|s| s.to_owned())?;
cookie_request_sender_lazy.send(resp_tx).await?;
let serial = match timeout {
Some(timeout) => {
let resp_rx = tokio::time::timeout(timeout, resp_rx).await??;
resp_rx?
}
None => resp_rx.await??,
};
cookie_request_tx.send(resp_tx).await?;
let serial = resp_rx.await??;
Ok(CookiedRequest { request, serial })
}
}
Expand Down Expand Up @@ -289,6 +310,174 @@ fn handle_cookie_request(
}
}

/// a lightweight wrapper around OptionalWatch that embeds cookie ids into the
/// get call. for requests that require cookies (ie, waiting for filesystem
/// flushes) then a cookie watch is ideal
pub struct CookiedOptionalWatch<T, P: CookieReady> {
value: watch::Receiver<Option<T>>,
cookie_index: watch::Receiver<usize>,
cookie_writer: CookieWriter,
parent: P,
}

impl<T, P: CookieReady + Clone> Clone for CookiedOptionalWatch<T, P> {
fn clone(&self) -> Self {
Self {
value: self.value.clone(),
cookie_index: self.cookie_index.clone(),
cookie_writer: self.cookie_writer.clone(),
parent: self.parent.clone(),
}
}
}

pub trait CookieReady {
fn ready(&mut self, id: usize) -> impl std::future::Future<Output = ()>;
}

impl CookieReady for () {
async fn ready(&mut self, _id: usize) {}
}

impl<T, U: CookieReady> CookieReady for CookiedOptionalWatch<T, U> {
async fn ready(&mut self, id: usize) {
tracing::debug!("waiting for cookie {}", id);
self.parent.ready(id).await;
_ = self.cookie_index.wait_for(|v| v >= &id).await;
}
}

impl<T> CookiedOptionalWatch<T, ()> {
pub fn new(
update: CookieWriter,
) -> (
watch::Sender<Option<T>>,
CookieRegister,
CookiedOptionalWatch<T, ()>,
) {
let (tx, rx) = watch::channel(None);
let (cookie_tx, cookie_rx) = watch::channel(0);
tracing::debug!("starting cookied optional watch in {}", update.root());
(
tx,
CookieRegister(cookie_tx, update.root().to_owned()),
CookiedOptionalWatch {
value: rx,
cookie_index: cookie_rx,
cookie_writer: update,
parent: (),
},
)
}
}

impl<T, U: CookieReady + Clone> CookiedOptionalWatch<T, U> {
/// Create a new sibling cookie watcher that inherits the same fs source as
/// this one.
pub fn new_sibling<T2>(&self) -> (watch::Sender<Option<T2>>, CookiedOptionalWatch<T2, U>) {
let (tx, rx) = watch::channel(None);
(
tx,
CookiedOptionalWatch {
value: rx,
cookie_index: self.cookie_index.clone(),
cookie_writer: self.cookie_writer.clone(),
parent: self.parent.clone(),
},
)
}

/// Create a new child cookie watcher that inherits the same fs source as
/// this one, but also has its own cookie source. This allows you to
/// synchronize two independent cookie streams.
pub fn new_child<T2>(
&self,
) -> (
watch::Sender<Option<T2>>,
CookieRegister,
CookiedOptionalWatch<T2, Self>,
) {
let (tx, rx) = watch::channel(None);
let (cookie_tx, cookie_rx) = watch::channel(0);

(
tx,
CookieRegister(cookie_tx, self.cookie_writer.root().to_owned()),
CookiedOptionalWatch {
value: rx,
cookie_index: cookie_rx,
cookie_writer: self.cookie_writer.clone(),
parent: self.clone(),
},
)
}

#[tracing::instrument(skip(self))]
pub async fn get(&mut self) -> Result<SomeRef<'_, T>, CookieError> {
let next_id = self
.cookie_writer
.initialized_cookie_request(())
.await?
.serial;
self.ready(next_id).await;
tracing::debug!("waiting for data");
Ok(self.get_inner().await?)
}

#[tracing::instrument(skip(self))]
pub async fn get_change(&mut self) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.value.changed().await?;
self.get_inner().await
}

/// Please do not use this data from a user-facing query. It should only
/// really be used for internal state management. Equivalent to
/// `OptionalWatch::get`
///
/// For an example as to why we need this, sometimes file event processing
/// needs to access data but issuing a cookie request would deadlock.
///
/// `_reason` is purely for documentation purposes and is not used.
pub async fn get_raw(
&mut self,
_reason: &str,
) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.get_inner().await
}

/// Get the current value, if it is available.
///
/// Unlike `OptionalWatch::get_immediate`, this method will block until the
/// cookie has been seen, at which point it will call `now_or_never` on the
/// value watch.
#[tracing::instrument(skip(self))]
pub async fn get_immediate(
&mut self,
) -> Option<Result<SomeRef<'_, T>, watch::error::RecvError>> {
let next_id = self.cookie_writer.cookie_request(()).await.ok()?.serial;
self.cookie_index.wait_for(|v| v >= &next_id).await.ok()?;
self.get_inner().now_or_never()
}

async fn get_inner(&mut self) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.value.wait_for(|f| f.is_some()).await?;
Ok(SomeRef(self.value.borrow()))
}
}

pub struct CookieRegister(watch::Sender<usize>, AbsoluteSystemPathBuf);
impl CookieRegister {
pub fn register(&self, paths: &[&AbsoluteSystemPath]) {
tracing::trace!("registering cookie for {:?}", paths);
for path in paths {
if let Some(serial) = serial_for_path(&self.1, path) {
tracing::trace!("updating cookie to {}", serial);
let _ = self.0.send(serial);
}
}
}
}

#[cfg(test)]
mod test {
use std::time::Duration;
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-filewatch/src/optional_watch.rs
Expand Up @@ -46,7 +46,7 @@ impl<T> OptionalWatch<T> {
}
}

pub struct SomeRef<'a, T>(Ref<'a, Option<T>>);
pub struct SomeRef<'a, T>(pub(crate) Ref<'a, Option<T>>);

impl<'a, T> std::ops::Deref for SomeRef<'a, T> {
type Target = T;
Expand Down

0 comments on commit 46838c9

Please sign in to comment.