Skip to content

Commit

Permalink
refactor: make all init synchronous to clean up data initialization
Browse files Browse the repository at this point in the history
This also makes generous use of a new type called an OptionalWatch which
allows downstream data dependencies to wait for a resource to become
available. All data dependencies are wrapped in an OptionalWatch, and
all initialization is strictly synchronous.
  • Loading branch information
arlyon committed Feb 8, 2024
1 parent d0140a8 commit 0fea88a
Show file tree
Hide file tree
Showing 8 changed files with 785 additions and 430 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/turborepo-filewatch/Cargo.toml
Expand Up @@ -15,8 +15,11 @@ dashmap = { workspace = true }
futures = { version = "0.3.26" }
itertools = { workspace = true }
notify = "6.0.1"
serde = { version = "1.0.190", features = ["derive"] }
serde_json = "1.0.106"
thiserror = "1.0.38"
tokio = { workspace = true, features = ["full", "time"] }
tokio-stream = "0.1.14"
tracing = "0.1.37"
tracing-test = "0.2.4"
turbopath = { workspace = true }
Expand All @@ -36,6 +39,7 @@ version = "0.2.4"

[dev-dependencies]
tempfile = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt"] }
tokio-scoped = "0.2.0"

[features]
Expand Down
Empty file.
27 changes: 17 additions & 10 deletions crates/turborepo-filewatch/src/globwatcher.rs
Expand Up @@ -10,12 +10,12 @@ use notify::Event;
use thiserror::Error;
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::warn;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, RelativeUnixPath};
use turbopath::{AbsoluteSystemPathBuf, RelativeUnixPath};
use wax::{Any, Glob, Program};

use crate::{
cookies::{CookieError, CookieWatcher, CookieWriter, CookiedRequest},
NotifyError,
NotifyError, OptionalWatch,
};

type Hash = String;
Expand Down Expand Up @@ -146,16 +146,19 @@ struct GlobTracker {

impl GlobWatcher {
pub fn new(
root: &AbsoluteSystemPath,
root: AbsoluteSystemPathBuf,
cookie_jar: CookieWriter,
recv: broadcast::Receiver<Result<Event, NotifyError>>,
mut recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
) -> Self {
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel();
let (query_ch, query_recv) = mpsc::channel(256);
let cookie_root = cookie_jar.root().to_owned();
tokio::task::spawn(
GlobTracker::new(root.to_owned(), cookie_root, exit_signal, recv, query_recv).watch(),
);
tokio::task::spawn(async move {
let recv = recv.get().await.unwrap().resubscribe();
GlobTracker::new(root, cookie_root, exit_signal, recv, query_recv)
.watch()
.await
});
Self {
cookie_jar,
_exit_ch: exit_ch,
Expand Down Expand Up @@ -455,9 +458,11 @@ mod test {
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2));

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), recv.clone());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);

let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"];
let raw_excludes = ["my-pkg/.next/cache/**"];
Expand Down Expand Up @@ -539,9 +544,10 @@ mod test {
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2));

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);

let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"];
let raw_excludes: [&str; 0] = [];
Expand Down Expand Up @@ -633,9 +639,10 @@ mod test {
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2));

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);

// On windows, we expect different sanitization before the
// globs are passed in, due to alternative data streams in files.
Expand Down

0 comments on commit 0fea88a

Please sign in to comment.