Skip to content

Commit

Permalink
refactor: documentation and clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
arlyon committed Feb 9, 2024
1 parent bc21fae commit 1a90d25
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions crates/turborepo-filewatch/src/package_watcher.rs
Expand Up @@ -72,13 +72,17 @@ pub struct PackageWatcher {
_exit_tx: oneshot::Sender<()>,
_handle: tokio::task::JoinHandle<()>,

/// The current package data, if available.
package_data: OptionalWatch<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>,

/// The current package manager, if available.
manager_rx: OptionalWatch<PackageManagerState>,
}

impl PackageWatcher {
/// Creates a new package watcher whose current package data can be queried.
/// `backup_discovery` is used to perform the initial discovery of packages,
/// to populate the state before we can watch.
pub fn new<T: PackageDiscovery + Send + 'static>(
root: AbsoluteSystemPathBuf,
recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
Expand Down Expand Up @@ -176,6 +180,10 @@ struct PackageManagerState {
}

impl<T: PackageDiscovery + Send + 'static> Subscriber<T> {
/// Creates a new instance of PackageDiscovery. This will start a task that
/// performs the initial discovery using the `backup_discovery` of your
/// choice, and then listens to file system events to keep the package
/// data up to date.
fn new(
repo_root: AbsoluteSystemPathBuf,
mut recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
Expand All @@ -187,7 +195,8 @@ impl<T: PackageDiscovery + Send + 'static> Subscriber<T> {
let manager_tx = Arc::new(manager_tx);

// we create a second optional watch here so that we can ensure it is ready and
// pass it down stream after the initial discovery
// pass it down stream after the initial discovery, otherwise our package
// discovery watcher will consume events before we have our initial state
let (recv_tx, recv_rx) = OptionalWatch::new();
let recv_tx = Arc::new(recv_tx);

Expand Down Expand Up @@ -374,6 +383,7 @@ impl<T: PackageDiscovery + Send + 'static> Subscriber<T> {

match self.have_workspace_globs_changed(file_event).await {
Ok(true) => {
//
self.rediscover_packages().await;
Ok(())
}
Expand Down Expand Up @@ -497,17 +507,18 @@ impl<T: PackageDiscovery + Send + 'static> Subscriber<T> {
// a previous or subsequent event in the 'cluster' will still trigger
.unwrap_or_else(|_| state.filter.clone());

let changed = state.filter != new_filter;

if changed {
self.manager_tx.send_modify(|f| {
if let Some(state) = f {
state.filter = new_filter;
}
});
}

Ok(changed)
Ok(self.manager_tx.send_if_modified(|f| match f {
Some(state) if state.filter == new_filter => false,
Some(state) => {
tracing::debug!("workspace globs changed: {:?}", new_filter);
state.filter = new_filter;
true
}
// if we haven't got a valid manager, then it probably means
// that we are currently calcuating one, so we should just
// ignore this event
None => false,
}))
} else {
Ok(false)
}
Expand Down

0 comments on commit 1a90d25

Please sign in to comment.