Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optionalwatch cookie support #7379

Merged
merged 2 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
242 changes: 223 additions & 19 deletions crates/turborepo-filewatch/src/cookies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@

use std::{collections::BinaryHeap, fs::OpenOptions, time::Duration};

use futures::FutureExt;
use notify::EventKind;
use thiserror::Error;
use tokio::{
sync::{broadcast, mpsc, oneshot},
sync::{broadcast, mpsc, oneshot, watch},
time::error::Elapsed,
};
use tracing::trace;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};

use crate::{NotifyError, OptionalWatch};
use crate::{optional_watch::SomeRef, NotifyError, OptionalWatch};

#[derive(Debug, Error)]
pub enum CookieError {
Expand All @@ -60,7 +61,7 @@ pub enum CookieError {
path: AbsoluteSystemPathBuf,
},
#[error("cookie queue is not available")]
Unavailable,
Unavailable(#[from] watch::error::RecvError),
}

/// CookieWriter is responsible for assigning filesystem cookies to a request
Expand Down Expand Up @@ -152,7 +153,7 @@ impl<T> CookieWatcher<T> {
if !matches!(event_kind, EventKind::Create(_)) {
return None;
}
if let Some(serial) = self.serial_for_path(path) {
if let Some(serial) = serial_for_path(&self.root, path) {
self.latest = serial;
let mut ready_requests = Vec::new();
while let Some(cookied_request) = self.pending_requests.pop() {
Expand All @@ -168,14 +169,14 @@ impl<T> CookieWatcher<T> {
None
}
}
}

fn serial_for_path(&self, path: &AbsoluteSystemPath) -> Option<usize> {
if self.root.relation_to_path(path) == PathRelation::Parent {
let filename = path.file_name()?;
filename.strip_suffix(".cookie")?.parse().ok()
} else {
None
}
fn serial_for_path(root: &AbsoluteSystemPath, path: &AbsoluteSystemPath) -> Option<usize> {
if root.relation_to_path(path) == PathRelation::Parent {
let filename = path.file_name()?;
filename.strip_suffix(".cookie")?.parse().ok()
} else {
None
}
}

Expand Down Expand Up @@ -231,18 +232,38 @@ impl CookieWriter {
request: T,
) -> Result<CookiedRequest<T>, CookieError> {
// we need to write the cookie from a single task so as to serialize them
tokio::time::timeout(self.timeout, self.cookie_request_inner(request)).await?
tokio::time::timeout(self.timeout, self.cookie_request_inner(request, None)).await?
}

async fn cookie_request_inner<T>(&self, request: T) -> Result<CookiedRequest<T>, CookieError> {
/// Internal services may want to wait for the cookie
/// system to be set up, before issuing a timeout for the
/// cookie request.
pub(crate) async fn initialized_cookie_request<T>(
&self,
request: T,
) -> Result<CookiedRequest<T>, CookieError> {
self.cookie_request_inner(request, Some(self.timeout)).await
}

async fn cookie_request_inner<T>(
&self,
request: T,
timeout: Option<Duration>,
) -> Result<CookiedRequest<T>, CookieError> {
let (resp_tx, resp_rx) = oneshot::channel();
let mut cookie_request_tx = self.cookie_request_sender_lazy.clone();
let Ok(cookie_request_tx) = cookie_request_tx.get().await.map(|s| s.to_owned()) else {
// the cookie queue is not ready and will never be ready
return Err(CookieError::Unavailable);
let mut cookie_request_sender_lazy = self.cookie_request_sender_lazy.clone();
let cookie_request_sender_lazy = cookie_request_sender_lazy
.get()
.await
.map(|s| s.to_owned())?;
cookie_request_sender_lazy.send(resp_tx).await?;
let serial = match timeout {
Some(timeout) => {
let resp_rx = tokio::time::timeout(timeout, resp_rx).await??;
resp_rx?
}
None => resp_rx.await??,
};
cookie_request_tx.send(resp_tx).await?;
let serial = resp_rx.await??;
Ok(CookiedRequest { request, serial })
}
}
Expand Down Expand Up @@ -289,6 +310,189 @@ fn handle_cookie_request(
}
}

/// a lightweight wrapper around OptionalWatch that embeds cookie ids into the
/// get call. for requests that require cookies (ie, waiting for filesystem
/// flushes) then a cookie watch is ideal
pub struct CookiedOptionalWatch<T, P: CookieReady> {
value: watch::Receiver<Option<T>>,
cookie_index: watch::Receiver<usize>,
cookie_writer: CookieWriter,
parent: P,
}

impl<T, P: CookieReady + Clone> Clone for CookiedOptionalWatch<T, P> {
fn clone(&self) -> Self {
Self {
value: self.value.clone(),
cookie_index: self.cookie_index.clone(),
cookie_writer: self.cookie_writer.clone(),
parent: self.parent.clone(),
}
}
}

pub trait CookieReady {
fn ready(&mut self, id: usize) -> impl std::future::Future<Output = ()>;
}

impl CookieReady for () {
async fn ready(&mut self, _id: usize) {}
}

impl<T, U: CookieReady> CookieReady for CookiedOptionalWatch<T, U> {
arlyon marked this conversation as resolved.
Show resolved Hide resolved
async fn ready(&mut self, id: usize) {
tracing::debug!("waiting for cookie {}", id);
self.parent.ready(id).await;
_ = self.cookie_index.wait_for(|v| v >= &id).await;
}
}

impl<T> CookiedOptionalWatch<T, ()> {
pub fn new(
update: CookieWriter,
) -> (
watch::Sender<Option<T>>,
CookieRegister,
CookiedOptionalWatch<T, ()>,
) {
let (tx, rx) = watch::channel(None);
let (cookie_tx, cookie_rx) = watch::channel(0);
tracing::debug!("starting cookied optional watch in {}", update.root());
(
tx,
CookieRegister(cookie_tx, update.root().to_owned()),
CookiedOptionalWatch {
value: rx,
cookie_index: cookie_rx,
cookie_writer: update,
parent: (),
},
)
}
}

impl<T, U: CookieReady + Clone> CookiedOptionalWatch<T, U> {
/// Create a new sibling cookie watcher that inherits the same fs source as
/// this one.
pub fn new_sibling<T2>(&self) -> (watch::Sender<Option<T2>>, CookiedOptionalWatch<T2, U>) {
let (tx, rx) = watch::channel(None);
(
tx,
CookiedOptionalWatch {
value: rx,
cookie_index: self.cookie_index.clone(),
cookie_writer: self.cookie_writer.clone(),
parent: self.parent.clone(),
},
)
}

/// Create a new child cookie watcher that inherits the same fs source as
/// this one, but also has its own cookie source. This allows you to
/// synchronize two independent cookie streams.
pub fn new_child<T2>(
&self,
) -> (
watch::Sender<Option<T2>>,
CookieRegister,
CookiedOptionalWatch<T2, Self>,
) {
let (tx, rx) = watch::channel(None);
let (cookie_tx, cookie_rx) = watch::channel(0);

(
tx,
CookieRegister(cookie_tx, self.cookie_writer.root().to_owned()),
CookiedOptionalWatch {
value: rx,
cookie_index: cookie_rx,
cookie_writer: self.cookie_writer.clone(),
parent: self.clone(),
},
)
}

#[tracing::instrument(skip(self))]
pub async fn get(&mut self) -> Result<SomeRef<'_, T>, CookieError> {
let next_id = self
.cookie_writer
.initialized_cookie_request(())
.await?
.serial;
self.ready(next_id).await;
tracing::debug!("waiting for data");
Ok(self.get_inner().await?)
}

#[tracing::instrument(skip(self))]
pub async fn get_change(&mut self) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.value.changed().await?;
self.get_inner().await
}

/// Please do not use this data from a user-facing query. It should only
/// really be used for internal state management. Equivalent to
/// `OptionalWatch::get`
///
/// For an example as to why we need this, sometimes file event processing
/// needs to access data but issuing a cookie request would deadlock.
///
/// `_reason` is purely for documentation purposes and is not used.
pub async fn get_raw(
&mut self,
_reason: &str,
) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.get_inner().await
}

/// Get the current value, if it is available.
///
/// Unlike `OptionalWatch::get_immediate`, this method will block until the
/// cookie has been seen, at which point it will call `now_or_never` on the
/// value watch.
#[tracing::instrument(skip(self))]
pub async fn get_immediate(
&mut self,
) -> Option<Result<SomeRef<'_, T>, watch::error::RecvError>> {
let next_id = self.cookie_writer.cookie_request(()).await.ok()?.serial;
self.cookie_index.wait_for(|v| v >= &next_id).await.ok()?;
self.get_inner().now_or_never()
}

/// Please do not use this data from a user-facing query. It should only
/// really be used for internal state management. Equivalent to
/// `OptionalWatch::get`
///
/// For an example as to why we need this, sometimes file event processing
/// needs to access data but issuing a cookie request would deadlock.
///
/// `_reason` is purely for documentation purposes and is not used.
pub async fn get_immediate_raw(
&mut self,
reason: &str,
) -> Option<Result<SomeRef<'_, T>, watch::error::RecvError>> {
self.get_raw(reason).now_or_never()
}

async fn get_inner(&mut self) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.value.wait_for(|f| f.is_some()).await?;
Ok(SomeRef(self.value.borrow()))
}
}

pub struct CookieRegister(watch::Sender<usize>, AbsoluteSystemPathBuf);
impl CookieRegister {
pub fn register(&self, paths: &[&AbsoluteSystemPath]) {
tracing::trace!("registering cookie for {:?}", paths);
for path in paths {
if let Some(serial) = serial_for_path(&self.1, path) {
tracing::trace!("updating cookie to {}", serial);
let _ = self.0.send(serial);
}
}
}
}

#[cfg(test)]
mod test {
use std::time::Duration;
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-filewatch/src/optional_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<T> OptionalWatch<T> {
}
}

pub struct SomeRef<'a, T>(Ref<'a, Option<T>>);
pub struct SomeRef<'a, T>(pub(crate) Ref<'a, Option<T>>);

impl<'a, T> std::ops::Deref for SomeRef<'a, T> {
type Target = T;
Expand Down