Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support filtering events, add unsupported events #629

Merged
merged 1 commit into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ End-of-line escapes with `\` are not supported.

### Supported commands

watch path # Watch the path, reporting events for it. Nothing is
# watched by default.
watch path [ops] # Watch the path, reporting events for it. Nothing is
# watched by default. Optionally a list of ops can be
# given, as with AddWith(path, WithOps(...)).
unwatch path # Stop watching the path.
watchlist n # Assert watchlist length.

Expand All @@ -87,6 +88,7 @@ End-of-line escapes with `\` are not supported.
chmod mode path # Octal only
sleep time-in-ms

cat path # Read path (does nothing with the data; just reads it).
echo str >>path # Append "str" to "path".
echo str >path # Truncate "path" and write "str".

Expand Down
55 changes: 37 additions & 18 deletions backend_fen.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ type Watcher struct {

mu sync.Mutex
port *unix.EventPort
done chan struct{} // Channel for sending a "quit message" to the reader goroutine
dirs map[string]struct{} // Explicitly watched directories
watches map[string]struct{} // Explicitly watched non-directories
done chan struct{} // Channel for sending a "quit message" to the reader goroutine
dirs map[string]Op // Explicitly watched directories
watches map[string]Op // Explicitly watched non-directories
}

// NewWatcher creates a new Watcher.
Expand All @@ -153,8 +153,8 @@ func NewBufferedWatcher(sz uint) (*Watcher, error) {
w := &Watcher{
Events: make(chan Event, sz),
Errors: make(chan error),
dirs: make(map[string]struct{}),
watches: make(map[string]struct{}),
dirs: make(map[string]Op),
watches: make(map[string]Op),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -269,7 +269,10 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
time.Now().Format("15:04:05.000000000"), name)
}

_ = getOptions(opts...)
with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}

// Currently we resolve symlinks that were explicitly requested to be
// watched. Otherwise we would use LStat here.
Expand All @@ -286,7 +289,7 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

w.mu.Lock()
w.dirs[name] = struct{}{}
w.dirs[name] = with.op
w.mu.Unlock()
return nil
}
Expand All @@ -297,7 +300,7 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

w.mu.Lock()
w.watches[name] = struct{}{}
w.watches[name] = with.op
w.mu.Unlock()
return nil
}
Expand Down Expand Up @@ -594,20 +597,24 @@ func (w *Watcher) associateFile(path string, stat os.FileInfo, follow bool) erro
// cleared up that discrepancy. The most likely cause is that the event
// has fired but we haven't processed it yet.
err := w.port.DissociatePath(path)
if err != nil && err != unix.ENOENT {
if err != nil && !errors.Is(err, unix.ENOENT) {
return err
}
}
// FILE_NOFOLLOW means we watch symlinks themselves rather than their
// targets.
events := unix.FILE_MODIFIED | unix.FILE_ATTRIB | unix.FILE_NOFOLLOW
if follow {
// We *DO* follow symlinks for explicitly watched entries.
events = unix.FILE_MODIFIED | unix.FILE_ATTRIB

var events int
if !follow {
// Watch symlinks themselves rather than their targets unless this entry
// is explicitly watched.
events |= unix.FILE_NOFOLLOW
}
if true { // TODO: implement withOps()
events |= unix.FILE_MODIFIED
}
return w.port.AssociatePath(path, stat,
events,
stat.Mode())
if true {
events |= unix.FILE_ATTRIB
}
return w.port.AssociatePath(path, stat, events, stat.Mode())
}

func (w *Watcher) dissociateFile(path string, stat os.FileInfo, unused bool) error {
Expand Down Expand Up @@ -639,3 +646,15 @@ func (w *Watcher) WatchList() []string {

return entries
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
if op.Has(xUnportableOpen) || op.Has(xUnportableRead) ||
op.Has(xUnportableCloseWrite) || op.Has(xUnportableCloseRead) {
return false
}
return true
}
58 changes: 53 additions & 5 deletions backend_inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,41 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
time.Now().Format("15:04:05.000000000"), name)
}

name = filepath.Clean(name)
_ = getOptions(opts...)
with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}

var flags uint32 = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF
var flags uint32
if with.op.Has(Create) {
flags |= unix.IN_CREATE
}
if with.op.Has(Write) {
flags |= unix.IN_MODIFY
}
if with.op.Has(Remove) {
flags |= unix.IN_DELETE | unix.IN_DELETE_SELF
}
if with.op.Has(Rename) {
flags |= unix.IN_MOVED_TO | unix.IN_MOVED_FROM | unix.IN_MOVE_SELF
}
if with.op.Has(Chmod) {
flags |= unix.IN_ATTRIB
}
if with.op.Has(xUnportableOpen) {
flags |= unix.IN_OPEN
}
if with.op.Has(xUnportableRead) {
flags |= unix.IN_ACCESS
}
if with.op.Has(xUnportableCloseWrite) {
flags |= unix.IN_CLOSE_WRITE
}
if with.op.Has(xUnportableCloseRead) {
flags |= unix.IN_CLOSE_NOWRITE
}

name = filepath.Clean(name)
return w.watches.updatePath(name, func(existing *watch) (*watch, error) {
if existing != nil {
flags |= existing.flags | unix.IN_MASK_ADD
Expand Down Expand Up @@ -624,6 +652,18 @@ func (w *Watcher) newEvent(name string, mask, cookie uint32) Event {
if mask&unix.IN_MODIFY == unix.IN_MODIFY {
e.Op |= Write
}
if mask&unix.IN_OPEN == unix.IN_OPEN {
e.Op |= xUnportableOpen
}
if mask&unix.IN_ACCESS == unix.IN_ACCESS {
e.Op |= xUnportableRead
}
if mask&unix.IN_CLOSE_WRITE == unix.IN_CLOSE_WRITE {
e.Op |= xUnportableCloseWrite
}
if mask&unix.IN_CLOSE_NOWRITE == unix.IN_CLOSE_NOWRITE {
e.Op |= xUnportableCloseRead
}
if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF || mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM {
e.Op |= Rename
}
Expand Down Expand Up @@ -655,3 +695,11 @@ func (w *Watcher) newEvent(name string, mask, cookie uint32) Event {
}
return e
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
return true // Supports everything.
}
20 changes: 19 additions & 1 deletion backend_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,10 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
time.Now().Format("15:04:05.000000000"), name)
}

_ = getOptions(opts...)
with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}

w.watches.addUserWatch(name)
_, err := w.addWatch(name, noteAllEvents)
Expand Down Expand Up @@ -886,3 +889,18 @@ func (w *Watcher) read(events []unix.Kevent_t) ([]unix.Kevent_t, error) {
}
return events[0:n], nil
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
if runtime.GOOS == "freebsd" {
//return true // Supports everything.
}
if op.Has(xUnportableOpen) || op.Has(xUnportableRead) ||
op.Has(xUnportableCloseWrite) || op.Has(xUnportableCloseRead) {
return false
}
return true
}
6 changes: 6 additions & 0 deletions backend_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,9 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error { return nil }
//
// Returns nil if [Watcher.Close] was called.
func (w *Watcher) Remove(name string) error { return nil }

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool { return false }
15 changes: 15 additions & 0 deletions backend_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}
if with.bufsize < 4096 {
return fmt.Errorf("fsnotify.WithBufferSize: buffer size cannot be smaller than 4096 bytes")
}
Expand Down Expand Up @@ -841,3 +844,15 @@ func (w *Watcher) toFSnotifyFlags(action uint32) uint64 {
}
return 0
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
if op.Has(xUnportableOpen) || op.Has(xUnportableRead) ||
op.Has(xUnportableCloseWrite) || op.Has(xUnportableCloseRead) {
return false
}
return true
}
89 changes: 89 additions & 0 deletions cmd/fsnotify/closewrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

/*
func closeWrite(paths ...string) {
if len(paths) < 1 {
exit("must specify at least one path to watch")
}

w, err := fsnotify.NewWatcher()
if err != nil {
exit("creating a new watcher: %s", err)
}
defer w.Close()

var (
op fsnotify.Op
cw = w.Supports(fsnotify.UnportableCloseWrite)
)
if cw {
op |= fsnotify.UnportableCloseWrite
} else {
op |= fsnotify.Create | fsnotify.Write
}

go closeWriteLoop(w, cw)

for _, p := range paths {
err := w.AddWith(p, fsnotify.WithOps(op))
if err != nil {
exit("%q: %s", p, err)
}
}

printTime("ready; press ^C to exit")
<-make(chan struct{})
}

func closeWriteLoop(w *fsnotify.Watcher, cw bool) {
var (
waitFor = 100 * time.Millisecond
mu sync.Mutex
timers = make(map[string]*time.Timer)
)
for {
select {
case err, ok := <-w.Errors:
if !ok {
return
}
panic(err)
case e, ok := <-w.Events:
if !ok {
return
}

// CloseWrite is supported: easy case.
if cw {
if e.Has(fsnotify.UnportableCloseWrite) {
printTime(e.String())
}
continue
}

// Get timer.
mu.Lock()
t, ok := timers[e.Name]
mu.Unlock()

// No timer yet, so create one.
if !ok {
t = time.AfterFunc(math.MaxInt64, func() {
printTime(e.String())
mu.Lock()
delete(timers, e.Name)
mu.Unlock()
})
t.Stop()

mu.Lock()
timers[e.Name] = t
mu.Unlock()
}

// Reset the timer for this path, so it will start from 100ms again.
t.Reset(waitFor)
}
}
}
*/