Skip to content

Commit

Permalink
kqueue: don't immediately remove watches for all files in a directory…
Browse files Browse the repository at this point in the history
… on Delete event (#526)

The problem was that that on Delete events for directories it would call
Watcher.Remove() for all files in that directory too. This is fine if you call
w.Remove() from the application, but if you rm -rf a directory the directory
itself tends to be the first remove event (on FreeBSD at least):

	DELETE WRITE  /tmp/xxx
	DELETE        /tmp/xxx/a
	DELETE        /tmp/xxx/b
	DELETE        /tmp/xxx/c
	DELETE        /tmp/xxx/d
	DELETE        /tmp/xxx/e
	DELETE        /tmp/xxx/f
	DELETE        /tmp/xxx/g

So what would happen is that the internal state for /tmp/xxx/a etc. would get
removed, and when the event gets processed we no longer have access to it.

Move Remove() to remove() with a flag to control removing files. If a watched
directory is removed then the files will emit their own delete event, so we
don't really need to do this.

Fixes #514

Also:

- Don't send Remove|Write events; that's never really useful: if it's
  gone, then it's gone. No other backends do this.

- Save a stat call by checking the readdir error instead.

- Don't stat every directory; this isn't really needed, and getting a
  write event before a delete is fine (and this didn't suppress the
  write even anyway).

- Add test/kqueue.c; just a simple implementation to test things.
  • Loading branch information
arp242 committed Oct 15, 2022
1 parent c62f582 commit 8f6708d
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 58 deletions.
48 changes: 21 additions & 27 deletions backend_kqueue.go
Expand Up @@ -290,6 +290,10 @@ func (w *Watcher) Add(name string) error {
//
// Removing a path that has not yet been added returns [ErrNonExistentWatch].
func (w *Watcher) Remove(name string) error {
return w.remove(name, true)
}

func (w *Watcher) remove(name string, unwatchFiles bool) error {
name = filepath.Clean(name)
w.mu.Lock()
if w.isClosed {
Expand Down Expand Up @@ -327,7 +331,7 @@ func (w *Watcher) Remove(name string) error {
w.mu.Unlock()

// Find all watched paths that are in this directory that are not external.
if isDir {
if unwatchFiles && isDir {
var pathsToRemove []string
w.mu.Lock()
for fd := range w.watchesByDir[name] {
Expand All @@ -338,13 +342,12 @@ func (w *Watcher) Remove(name string) error {
}
w.mu.Unlock()
for _, name := range pathsToRemove {
// Since these are internal, not much sense in propagating error
// to the user, as that will just confuse them with an error about
// a path they did not explicitly watch themselves.
// Since these are internal, not much sense in propagating error to
// the user, as that will just confuse them with an error about a
// path they did not explicitly watch themselves.
w.Remove(name)
}
}

return nil
}

Expand Down Expand Up @@ -529,25 +532,15 @@ func (w *Watcher) readEvents() {

event := w.newEvent(path.name, mask)

if path.isDir && !event.Has(Remove) {
// Double check to make sure the directory exists. This can
// happen when we do a rm -fr on a recursively watched folders
// and we receive a modification event first but the folder has
// been deleted and later receive the delete event.
if _, err := os.Lstat(event.Name); os.IsNotExist(err) {
event.Op |= Remove
}
}

if event.Has(Rename) || event.Has(Remove) {
w.Remove(event.Name)
w.remove(event.Name, false)
w.mu.Lock()
delete(w.fileExists, event.Name)
w.mu.Unlock()
}

if path.isDir && event.Has(Write) && !event.Has(Remove) {
w.sendDirectoryChangeEvents(event.Name)
w.sendDirectoryChangeEvents(event.Name, false)
} else {
if !w.sendEvent(event) {
closed = true
Expand All @@ -564,13 +557,7 @@ func (w *Watcher) readEvents() {
_, found := w.watches[fileDir]
w.mu.Unlock()
if found {
// make sure the directory exists before we watch for changes. When we
// do a recursive watch and perform rm -fr, the parent directory might
// have gone missing, ignore the missing directory and let the
// upcoming delete event remove the watch from the parent directory.
if _, err := os.Lstat(fileDir); err == nil {
w.sendDirectoryChangeEvents(fileDir)
}
w.sendDirectoryChangeEvents(fileDir, true)
}
} else {
filePath := filepath.Clean(event.Name)
Expand Down Expand Up @@ -598,6 +585,11 @@ func (w *Watcher) newEvent(name string, mask uint32) Event {
if mask&unix.NOTE_ATTRIB == unix.NOTE_ATTRIB {
e.Op |= Chmod
}
// No point sending a write and delete event at the same time: if it's gone,
// then it's gone.
if e.Op.Has(Write) && e.Op.Has(Remove) {
e.Op &^= Write
}
return e
}

Expand Down Expand Up @@ -638,16 +630,18 @@ func (w *Watcher) watchDirectoryFiles(dirPath string) error {
//
// This functionality is to have the BSD watcher match the inotify, which sends
// a create event for files created in a watched directory.
func (w *Watcher) sendDirectoryChangeEvents(dir string) {
// Get all files
func (w *Watcher) sendDirectoryChangeEvents(dir string, ignoreNotExists bool) {
files, err := ioutil.ReadDir(dir)
if err != nil {
// Directory could have been deleted already; just ignore that.
if ignoreNotExists && errors.Is(err, os.ErrNotExist) {
return
}
if !w.sendError(fmt.Errorf("fsnotify.sendDirectoryChangeEvents: %w", err)) {
return
}
}

// Search for new files
for _, fi := range files {
err := w.sendFileCreatedEventIfNew(filepath.Join(dir, fi.Name()), fi)
if err != nil {
Expand Down
88 changes: 57 additions & 31 deletions fsnotify_test.go
Expand Up @@ -437,7 +437,7 @@ func TestWatchRename(t *testing.T) {
# TODO: this is broken.
dragonfly:
REMOVE|WRITE "/"
REMOVE "/"
`},

{"rename watched directory", func(t *testing.T, w *Watcher, tmp string) {
Expand All @@ -450,9 +450,6 @@ func TestWatchRename(t *testing.T) {
}, `
rename /dir
kqueue:
remove|rename /dir
# TODO(v2): Windows should behave the same by default. See #518
windows:
create /dir/file
Expand Down Expand Up @@ -687,37 +684,64 @@ func TestWatchRm(t *testing.T) {
`},

{"remove watched directory", func(t *testing.T, w *Watcher, tmp string) {
if runtime.GOOS == "openbsd" || runtime.GOOS == "netbsd" {
t.Skip("behaviour is inconsistent on OpenBSD and NetBSD, and this test is flaky")
}

file := join(tmp, "file")

touch(t, file)
touch(t, tmp, "a")
touch(t, tmp, "b")
touch(t, tmp, "c")
touch(t, tmp, "d")
touch(t, tmp, "e")
touch(t, tmp, "f")
touch(t, tmp, "g")

mkdir(t, tmp, "h")
mkdir(t, tmp, "h", "a")
mkdir(t, tmp, "i")
mkdir(t, tmp, "i", "a")
mkdir(t, tmp, "j")
mkdir(t, tmp, "j", "a")
addWatch(t, w, tmp)
rmAll(t, tmp)
}, `
# OpenBSD, NetBSD
remove /file
remove|write /
freebsd:
remove|write "/"
remove ""
create "."
darwin:
remove /file
remove|write /
linux:
remove /file
remove /
fen:
remove /
remove /file
remove /
remove /a
remove /b
remove /c
remove /d
remove /e
remove /f
remove /g
remove /h
remove /i
remove /j
# TODO: this is broken; I've also seen (/i and /j missing):
# REMOVE "/"
# REMOVE "/a"
# REMOVE "/b"
# REMOVE "/c"
# REMOVE "/d"
# REMOVE "/e"
# REMOVE "/f"
# REMOVE "/g"
# WRITE "/h"
# WRITE "/h"
windows:
remove /file
remove /
REMOVE "/"
REMOVE "/a"
REMOVE "/b"
REMOVE "/c"
REMOVE "/d"
REMOVE "/e"
REMOVE "/f"
REMOVE "/g"
REMOVE "/h"
REMOVE "/i"
REMOVE "/j"
WRITE "/h"
WRITE "/h"
WRITE "/i"
WRITE "/i"
WRITE "/j"
WRITE "/j"
`},
}

Expand All @@ -727,6 +751,8 @@ func TestWatchRm(t *testing.T) {
}
}

// TODO: this fails reguarly in the CI; not sure if it's a bug with the test or
// code; need to look in to it.
func TestClose(t *testing.T) {
chanClosed := func(t *testing.T, w *Watcher) {
t.Helper()
Expand Down
123 changes: 123 additions & 0 deletions test/kqueue.c
@@ -0,0 +1,123 @@
// This is an example kqueue program which watches a directory and all paths in
// it with the same flags as those fsnotify uses. This is useful sometimes to
// test what events kqueue sends with as little abstraction as possible.
//
// Note this does *not* set up monitoring on new files as they're created.
//
// Usage:
// cc kqueue.c -o kqueue
// ./kqueue /path/to/dir

#include <sys/event.h>
#include <sys/time.h>
#include <dirent.h>
#include <fcntl.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

void die(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);

if (fmt[0] && fmt[strlen(fmt)-1] == ':') {
fputc(' ', stderr);
perror(NULL);
}
else
fputc('\n', stderr);

exit(1);
}

int main(int argc, char* argv[]) {
if (argc < 2) {
fprintf(stderr, "usage: %s path/to/dir\n", argv[0]);
return 1;
}
char *dir = argv[1];

int kq = kqueue();
if (kq == -1)
die("kqueue:");

int fp = open(dir, O_RDONLY);
if (fp == -1)
die("open: %s:", dir);
DIR *dp = fdopendir(fp);
if (dp == NULL)
die("fdopendir:");

int fds[1024] = {fp};
char *names[1024] = {dir};
int n_fds = 0;
struct dirent *ls;
while ((ls = readdir(dp)) != NULL) {
if (ls->d_name[0] == '.')
continue;

char *path = malloc(strlen(dir) + strlen(ls->d_name) + 2);
sprintf(path, "%s/%s", dir, ls->d_name);

int fp = open(path, O_RDONLY);
if (fp == -1)
die("open: %s:", path);
fds[++n_fds] = fp;
names[n_fds] = path;
}

for (int i=0; i<=n_fds; i++) {
struct kevent changes;
EV_SET(&changes, fds[i], EVFILT_VNODE,
EV_ADD | EV_CLEAR | EV_ENABLE,
NOTE_DELETE | NOTE_WRITE | NOTE_ATTRIB | NOTE_RENAME,
0, 0);

int n = kevent(kq, &changes, 1, NULL, 0, NULL);
if (n == -1)
die("register kevent changes:");
}

printf("Ready; press ^C to exit\n");
for (;;) {
struct kevent event;
int n = kevent(kq, NULL, 0, &event, 1, NULL);
if (n == -1)
die("kevent:");
if (n == 0)
continue;

char *ev_name = malloc(128);
if (event.fflags & NOTE_WRITE)
strncat(ev_name, "WRITE ", 6);
if (event.fflags & NOTE_RENAME)
strncat(ev_name, "RENAME ", 6);
if (event.fflags & NOTE_ATTRIB)
strncat(ev_name, "CHMOD ", 5);
if (event.fflags & NOTE_DELETE) {
strncat(ev_name, "DELETE ", 7);
struct kevent changes;
EV_SET(&changes, event.ident, EVFILT_VNODE,
EV_DELETE,
NOTE_DELETE | NOTE_WRITE | NOTE_ATTRIB | NOTE_RENAME,
0, 0);
int n = kevent(kq, &changes, 1, NULL, 0, NULL);
if (n == -1)
die("remove kevent on delete:");
}

char *name;
for (int i=0; i<=n_fds; i++)
if (fds[i] == event.ident) {
name = names[i];
break;
}

printf("%-13s %s\n", ev_name, name);
}
return 0;
}

0 comments on commit 8f6708d

Please sign in to comment.