Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Rename daemon code #7440

Merged
merged 1 commit into from Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 10 additions & 9 deletions crates/turborepo-filewatch/src/cookies.rs
Expand Up @@ -7,17 +7,17 @@
//! won't get any stale events.
//!
//! Here's the `CookieWriter` flow:
//! - `CookieWriter` spins up a `watch_cookies` task and creates a
//! - `CookieWriter` spins up a `watch_for_cookie_requests` task and creates a
//! `cookie_requests` mpsc channel to send a cookie request to that task. The
//! cookie request consists of a oneshot `Sender` that the task can use to
//! send back the serial number.
//! - The `watch_cookies` task watches for cookie requests on
//! - The `watch_for_cookie_requests` task watches for cookie requests on
//! `cookie_requests_rx`. When one occurs, it creates the cookie file and
//! bumps the serial. It then sends the serial back using the `Sender`
//! - When `CookieWriter::cookie_request` is called, it sends the cookie request
//! to the `watch_cookies` channel and then waits for the serial as a response
//! (with a timeout). Upon getting the serial, a `CookiedRequest` gets
//! returned with the serial number attached.
//! to the `watch_for_cookie_request` channel and then waits for the serial as
//! a response (with a timeout). Upon getting the serial, a `CookiedRequest`
//! gets returned with the serial number attached.
//!
//! And here's the `CookieWatcher` flow:
//! - `GlobWatcher` creates a `CookieWatcher`.
Expand Down Expand Up @@ -208,7 +208,8 @@ impl CookieWriter {
tracing::debug!("nobody listening for cookie requests, exiting");
return;
};
watch_cookies(root.to_owned(), cookie_requests_rx, exit_signal).await;
watch_for_cookie_file_requests(root.to_owned(), cookie_requests_rx, exit_signal)
.await;
}
});
Self {
Expand Down Expand Up @@ -268,7 +269,7 @@ impl CookieWriter {
}
}

async fn watch_cookies(
async fn watch_for_cookie_file_requests(
root: AbsoluteSystemPathBuf,
mut cookie_requests: mpsc::Receiver<oneshot::Sender<Result<usize, CookieError>>>,
mut exit_signal: mpsc::Receiver<()>,
Expand All @@ -278,12 +279,12 @@ async fn watch_cookies(
tokio::select! {
biased;
_ = exit_signal.recv() => return,
req = cookie_requests.recv() => handle_cookie_request(&root, &mut serial, req),
req = cookie_requests.recv() => handle_cookie_file_request(&root, &mut serial, req),
}
}
}

fn handle_cookie_request(
fn handle_cookie_file_request(
root: &AbsoluteSystemPath,
serial: &mut usize,
req: Option<oneshot::Sender<Result<usize, CookieError>>>,
Expand Down
23 changes: 12 additions & 11 deletions crates/turborepo-filewatch/src/globwatcher.rs
Expand Up @@ -118,7 +118,7 @@ impl From<oneshot::error::RecvError> for Error {
}

pub struct GlobWatcher {
cookie_jar: CookieWriter,
cookie_writer: CookieWriter,
// _exit_ch exists to trigger a close on the receiver when an instance
// of this struct is dropped. The task that is receiving events will exit,
// dropping the other sender for the broadcast channel, causing all receivers
Expand Down Expand Up @@ -163,12 +163,12 @@ struct GlobTracker {
impl GlobWatcher {
pub fn new(
root: AbsoluteSystemPathBuf,
cookie_jar: CookieWriter,
cookie_writer: CookieWriter,
mut recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
) -> Self {
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel();
let (query_ch_tx, query_ch_lazy) = OptionalWatch::new();
let cookie_root = cookie_jar.root().to_owned();
let cookie_root = cookie_writer.root().to_owned();
tokio::task::spawn(async move {
let Ok(recv) = recv.get().await.map(|r| r.resubscribe()) else {
// if this fails, it means that the filewatcher is not available
Expand All @@ -189,7 +189,7 @@ impl GlobWatcher {
.await
});
Self {
cookie_jar,
cookie_writer,
_exit_ch: exit_ch,
query_ch_lazy,
}
Expand Down Expand Up @@ -231,12 +231,13 @@ impl GlobWatcher {
candidates,
resp: tx,
};

self.send_request(req).await?;
tokio::time::timeout(timeout, rx).await??
}

async fn send_request(&self, req: Query) -> Result<(), Error> {
let cookied_request = self.cookie_jar.cookie_request(req).await?;
let cookied_request = self.cookie_writer.cookie_request(req).await?;
let mut query_ch = self.query_ch_lazy.clone();
let query_ch = query_ch
.get_immediate()
Expand Down Expand Up @@ -507,8 +508,8 @@ mod test {

let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);
let cookie_writer = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_writer, recv);

let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"];
let raw_excludes = ["my-pkg/.next/cache/**"];
Expand Down Expand Up @@ -590,9 +591,9 @@ mod test {

let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());
let cookie_writer = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());

let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_writer, recv);

let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"];
let raw_excludes: [&str; 0] = [];
Expand Down Expand Up @@ -685,9 +686,9 @@ mod test {

let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());
let cookie_writer = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());

let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_writer, recv);

// On windows, we expect different sanitization before the
// globs are passed in, due to alternative data streams in files.
Expand Down