Skip to content

Commit

Permalink
refactor: make all init synchronous to clean up data initialization
Browse files Browse the repository at this point in the history
This also makes generous use of a new type called an OptionalWatch which
allows downstream data dependencies to wait for a resource to become
available. All data dependencies are wrapped in an OptionalWatch, and
all initialization is strictly synchronous.
  • Loading branch information
arlyon committed Feb 9, 2024
1 parent 1b4ae80 commit 048d5f5
Show file tree
Hide file tree
Showing 8 changed files with 919 additions and 461 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/turborepo-filewatch/Cargo.toml
Expand Up @@ -15,8 +15,11 @@ dashmap = { workspace = true }
futures = { version = "0.3.26" }
itertools = { workspace = true }
notify = "6.0.1"
serde = { version = "1.0.190", features = ["derive"] }
serde_json = "1.0.106"
thiserror = "1.0.38"
tokio = { workspace = true, features = ["full", "time"] }
tokio-stream = "0.1.14"
tracing = "0.1.37"
tracing-test = "0.2.4"
turbopath = { workspace = true }
Expand All @@ -36,6 +39,7 @@ version = "0.2.4"

[dev-dependencies]
tempfile = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt"] }
tokio-scoped = "0.2.0"

[features]
Expand Down
64 changes: 50 additions & 14 deletions crates/turborepo-filewatch/src/cookies.rs
Expand Up @@ -3,12 +3,14 @@ use std::{collections::BinaryHeap, fs::OpenOptions, time::Duration};
use notify::EventKind;
use thiserror::Error;
use tokio::{
sync::{mpsc, oneshot},
sync::{broadcast, mpsc, oneshot},
time::error::Elapsed,
};
use tracing::trace;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};

use crate::{NotifyError, OptionalWatch};

#[derive(Debug, Error)]
pub enum CookieError {
#[error("cookie timeout expired")]
Expand All @@ -22,6 +24,8 @@ pub enum CookieError {
io_err: std::io::Error,
path: AbsoluteSystemPathBuf,
},
#[error("cookie queue is not available")]
Unavailable,
}

/// CookieWriter is responsible for assigning filesystem cookies to a request
Expand All @@ -30,7 +34,7 @@ pub enum CookieError {
pub struct CookieWriter {
root: AbsoluteSystemPathBuf,
timeout: Duration,
cookie_request_tx: mpsc::Sender<oneshot::Sender<Result<usize, CookieError>>>,
cookie_request_tx: OptionalWatch<mpsc::Sender<oneshot::Sender<Result<usize, CookieError>>>>,
// _exit_ch exists to trigger a close on the receiver when all instances
// of this struct are dropped. The task that is receiving events will exit,
// dropping the other sender for the broadcast channel, causing all receivers
Expand Down Expand Up @@ -138,18 +142,40 @@ impl<T> CookieWatcher<T> {
}

impl CookieWriter {
pub fn new(root: &AbsoluteSystemPath, timeout: Duration) -> Self {
pub fn new(
root: &AbsoluteSystemPath,
timeout: Duration,
mut recv: OptionalWatch<broadcast::Receiver<Result<notify::Event, NotifyError>>>,
) -> Self {
let (cookie_request_sender_tx, cookie_request_sender_rx) = OptionalWatch::new();
let (exit_ch, exit_signal) = mpsc::channel(16);
let (cookie_requests_tx, cookie_requests_rx) = mpsc::channel(16);
tokio::spawn(watch_cookies(
root.to_owned(),
cookie_requests_rx,
exit_signal,
));
tokio::spawn({
let root = root.to_owned();
async move {
if recv.get().await.is_err() {
// here we need to wait for confirmation that the watching end is ready
// before we start sending requests. this has the side effect of not
// enabling the cookie writing mechanism until the watcher is ready
return;
}

let (cookie_requests_tx, cookie_requests_rx) = mpsc::channel(16);

if cookie_request_sender_tx
.send(Some(cookie_requests_tx))
.is_err()
{
// the receiver has already been dropped
tracing::debug!("nobody listening for cookie requests, exiting");
return;
};
watch_cookies(root.to_owned(), cookie_requests_rx, exit_signal).await;
}
});
Self {
root: root.to_owned(),
timeout,
cookie_request_tx: cookie_requests_tx,
cookie_request_tx: cookie_request_sender_rx,
_exit_ch: exit_ch,
}
}
Expand All @@ -164,7 +190,15 @@ impl CookieWriter {
) -> Result<CookiedRequest<T>, CookieError> {
// we need to write the cookie from a single task so as to serialize them
let (resp_tx, resp_rx) = oneshot::channel();
self.cookie_request_tx.clone().send(resp_tx).await?;

// make sure the cookie writer is ready
let mut cookie_request_tx = self.cookie_request_tx.clone();
let Ok(cookie_request_tx) = cookie_request_tx.get().await.map(|s| s.to_owned()) else {
// the cookie queue is not ready and will never be ready
return Err(CookieError::Unavailable);
};

cookie_request_tx.send(resp_tx).await?;
let serial = tokio::time::timeout(self.timeout, resp_rx).await???;
Ok(CookiedRequest { request, serial })
}
Expand Down Expand Up @@ -224,7 +258,7 @@ mod test {
use turbopath::AbsoluteSystemPathBuf;

use super::{CookieWatcher, CookiedRequest};
use crate::{cookies::CookieWriter, NotifyError};
use crate::{cookies::CookieWriter, NotifyError, OptionalWatch};

struct TestQuery {
resp: oneshot::Sender<()>,
Expand Down Expand Up @@ -288,8 +322,9 @@ mod test {
.unwrap();

let (send_file_events, file_events) = broadcast::channel(16);
let recv = OptionalWatch::once(file_events.resubscribe());
let (reqs_tx, reqs_rx) = mpsc::channel(16);
let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2));
let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2), recv);
let (exit_tx, exit_rx) = oneshot::channel();

let service = TestService {
Expand Down Expand Up @@ -351,8 +386,9 @@ mod test {
.unwrap();

let (send_file_events, file_events) = broadcast::channel(16);
let recv = OptionalWatch::once(file_events.resubscribe());
let (reqs_tx, reqs_rx) = mpsc::channel(16);
let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2));
let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2), recv);
let (exit_tx, exit_rx) = oneshot::channel();

let service = TestService {
Expand Down
50 changes: 27 additions & 23 deletions crates/turborepo-filewatch/src/globwatcher.rs
Expand Up @@ -10,12 +10,12 @@ use notify::Event;
use thiserror::Error;
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::warn;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, RelativeUnixPath};
use turbopath::{AbsoluteSystemPathBuf, RelativeUnixPath};
use wax::{Any, Glob, Program};

use crate::{
cookies::{CookieError, CookieWatcher, CookieWriter, CookiedRequest},
NotifyError,
NotifyError, OptionalWatch,
};

type Hash = String;
Expand Down Expand Up @@ -146,16 +146,24 @@ struct GlobTracker {

impl GlobWatcher {
pub fn new(
root: &AbsoluteSystemPath,
root: AbsoluteSystemPathBuf,
cookie_jar: CookieWriter,
recv: broadcast::Receiver<Result<Event, NotifyError>>,
mut recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
) -> Self {
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel();
let (query_ch, query_recv) = mpsc::channel(256);
let cookie_root = cookie_jar.root().to_owned();
tokio::task::spawn(
GlobTracker::new(root.to_owned(), cookie_root, exit_signal, recv, query_recv).watch(),
);
tokio::task::spawn(async move {
let Ok(recv) = recv.get().await.map(|r| r.resubscribe()) else {
// if this fails, it means that the filewatcher is not available
// so starting the glob tracker is pointless
return;
};

GlobTracker::new(root, cookie_root, exit_signal, recv, query_recv)
.watch()
.await
});
Self {
cookie_jar,
_exit_ch: exit_ch,
Expand Down Expand Up @@ -452,12 +460,10 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2));

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);

let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"];
let raw_excludes = ["my-pkg/.next/cache/**"];
Expand Down Expand Up @@ -536,12 +542,11 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2));
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);

let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"];
let raw_excludes: [&str; 0] = [];
Expand Down Expand Up @@ -630,12 +635,11 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2));
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);

// On windows, we expect different sanitization before the
// globs are passed in, due to alternative data streams in files.
Expand Down

0 comments on commit 048d5f5

Please sign in to comment.