Skip to content

Commit

Permalink
inotify: support filtering events, add unsupported events
Browse files Browse the repository at this point in the history
This adds the ability to only listen for some event types. If you're
only interested in Created events and you're getting a lot of Write
events then you're just wasting CPU cycles

This also adds the ability listen on extra unportable event types; since
this is so related I figured I might as well do both. Ideally we want to
1) make it very very obvious you're doing something unportable, and 2)
make it reasonably easy "fallback" for platforms where this isn't
supported.

Unportable events start with "Unportable", which should document their
unportabilitiness. Also add a new Supports(Op) method, which should make
adding fallback logic relatively painless.

For example, to use CloseWrite where supported, but falling back to
Write when it's not:

	var op fsnotify.Op
	if w.Supports(fsnotify.UnportableCloseWrite) {
		op |= fsnotify.UnportableCloseWrite
	} else {
		op |= fsnotify.Create | fsnotify.Write
	}
	w.AddWith("/tmp", fsnotify.WithEvents(op))

And then you can deal with this in the write loop. There's a full
example in cmd/fsnotify/closewrite.go

All of this is unexported for now, until support for other platforms has
been added.

Updates 7
Updates 519
  • Loading branch information
arp242 committed May 1, 2024
1 parent b528cd2 commit 0a0b082
Show file tree
Hide file tree
Showing 27 changed files with 750 additions and 41 deletions.
6 changes: 4 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ End-of-line escapes with `\` are not supported.

### Supported commands

watch path # Watch the path, reporting events for it. Nothing is
# watched by default.
watch path [ops] # Watch the path, reporting events for it. Nothing is
# watched by default. Optionally a list of ops can be
# given, as with AddWith(path, WithOps(...)).
unwatch path # Stop watching the path.
watchlist n # Assert watchlist length.

Expand All @@ -87,6 +88,7 @@ End-of-line escapes with `\` are not supported.
chmod mode path # Octal only
sleep time-in-ms

cat path # Read path (does nothing with the data; just reads it).
echo str >>path # Append "str" to "path".
echo str >path # Truncate "path" and write "str".

Expand Down
55 changes: 37 additions & 18 deletions backend_fen.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ type Watcher struct {

mu sync.Mutex
port *unix.EventPort
done chan struct{} // Channel for sending a "quit message" to the reader goroutine
dirs map[string]struct{} // Explicitly watched directories
watches map[string]struct{} // Explicitly watched non-directories
done chan struct{} // Channel for sending a "quit message" to the reader goroutine
dirs map[string]Op // Explicitly watched directories
watches map[string]Op // Explicitly watched non-directories
}

// NewWatcher creates a new Watcher.
Expand All @@ -153,8 +153,8 @@ func NewBufferedWatcher(sz uint) (*Watcher, error) {
w := &Watcher{
Events: make(chan Event, sz),
Errors: make(chan error),
dirs: make(map[string]struct{}),
watches: make(map[string]struct{}),
dirs: make(map[string]Op),
watches: make(map[string]Op),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -269,7 +269,10 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
time.Now().Format("15:04:05.000000000"), name)
}

_ = getOptions(opts...)
with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}

// Currently we resolve symlinks that were explicitly requested to be
// watched. Otherwise we would use LStat here.
Expand All @@ -286,7 +289,7 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

w.mu.Lock()
w.dirs[name] = struct{}{}
w.dirs[name] = with.op
w.mu.Unlock()
return nil
}
Expand All @@ -297,7 +300,7 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

w.mu.Lock()
w.watches[name] = struct{}{}
w.watches[name] = with.op
w.mu.Unlock()
return nil
}
Expand Down Expand Up @@ -594,20 +597,24 @@ func (w *Watcher) associateFile(path string, stat os.FileInfo, follow bool) erro
// cleared up that discrepancy. The most likely cause is that the event
// has fired but we haven't processed it yet.
err := w.port.DissociatePath(path)
if err != nil && err != unix.ENOENT {
if err != nil && !errors.Is(err, unix.ENOENT) {
return err
}
}
// FILE_NOFOLLOW means we watch symlinks themselves rather than their
// targets.
events := unix.FILE_MODIFIED | unix.FILE_ATTRIB | unix.FILE_NOFOLLOW
if follow {
// We *DO* follow symlinks for explicitly watched entries.
events = unix.FILE_MODIFIED | unix.FILE_ATTRIB

var events int
if !follow {
// Watch symlinks themselves rather than their targets unless this entry
// is explicitly watched.
events |= unix.FILE_NOFOLLOW
}
if true { // TODO: implement withOps()
events |= unix.FILE_MODIFIED
}
return w.port.AssociatePath(path, stat,
events,
stat.Mode())
if true {
events |= unix.FILE_ATTRIB
}
return w.port.AssociatePath(path, stat, events, stat.Mode())
}

func (w *Watcher) dissociateFile(path string, stat os.FileInfo, unused bool) error {
Expand Down Expand Up @@ -639,3 +646,15 @@ func (w *Watcher) WatchList() []string {

return entries
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
if op.Has(xUnportableOpen) || op.Has(xUnportableRead) ||
op.Has(xUnportableCloseWrite) || op.Has(xUnportableCloseRead) {
return false
}
return true
}
58 changes: 53 additions & 5 deletions backend_inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,41 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
time.Now().Format("15:04:05.000000000"), name)
}

name = filepath.Clean(name)
_ = getOptions(opts...)
with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}

var flags uint32 = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF
var flags uint32
if with.op.Has(Create) {
flags |= unix.IN_CREATE
}
if with.op.Has(Write) {
flags |= unix.IN_MODIFY
}
if with.op.Has(Remove) {
flags |= unix.IN_DELETE | unix.IN_DELETE_SELF
}
if with.op.Has(Rename) {
flags |= unix.IN_MOVED_TO | unix.IN_MOVED_FROM | unix.IN_MOVE_SELF
}
if with.op.Has(Chmod) {
flags |= unix.IN_ATTRIB
}
if with.op.Has(xUnportableOpen) {
flags |= unix.IN_OPEN
}
if with.op.Has(xUnportableRead) {
flags |= unix.IN_ACCESS
}
if with.op.Has(xUnportableCloseWrite) {
flags |= unix.IN_CLOSE_WRITE
}
if with.op.Has(xUnportableCloseRead) {
flags |= unix.IN_CLOSE_NOWRITE
}

name = filepath.Clean(name)
return w.watches.updatePath(name, func(existing *watch) (*watch, error) {
if existing != nil {
flags |= existing.flags | unix.IN_MASK_ADD
Expand Down Expand Up @@ -624,6 +652,18 @@ func (w *Watcher) newEvent(name string, mask, cookie uint32) Event {
if mask&unix.IN_MODIFY == unix.IN_MODIFY {
e.Op |= Write
}
if mask&unix.IN_OPEN == unix.IN_OPEN {
e.Op |= xUnportableOpen
}
if mask&unix.IN_ACCESS == unix.IN_ACCESS {
e.Op |= xUnportableRead
}
if mask&unix.IN_CLOSE_WRITE == unix.IN_CLOSE_WRITE {
e.Op |= xUnportableCloseWrite
}
if mask&unix.IN_CLOSE_NOWRITE == unix.IN_CLOSE_NOWRITE {
e.Op |= xUnportableCloseRead
}
if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF || mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM {
e.Op |= Rename
}
Expand Down Expand Up @@ -655,3 +695,11 @@ func (w *Watcher) newEvent(name string, mask, cookie uint32) Event {
}
return e
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
return true // Supports everything.
}
20 changes: 19 additions & 1 deletion backend_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,10 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
time.Now().Format("15:04:05.000000000"), name)
}

_ = getOptions(opts...)
with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}

w.watches.addUserWatch(name)
_, err := w.addWatch(name, noteAllEvents)
Expand Down Expand Up @@ -886,3 +889,18 @@ func (w *Watcher) read(events []unix.Kevent_t) ([]unix.Kevent_t, error) {
}
return events[0:n], nil
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
if runtime.GOOS == "freebsd" {
//return true // Supports everything.
}
if op.Has(xUnportableOpen) || op.Has(xUnportableRead) ||
op.Has(xUnportableCloseWrite) || op.Has(xUnportableCloseRead) {
return false
}
return true
}
6 changes: 6 additions & 0 deletions backend_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,9 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error { return nil }
//
// Returns nil if [Watcher.Close] was called.
func (w *Watcher) Remove(name string) error { return nil }

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool { return false }
15 changes: 15 additions & 0 deletions backend_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}
if with.bufsize < 4096 {
return fmt.Errorf("fsnotify.WithBufferSize: buffer size cannot be smaller than 4096 bytes")
}
Expand Down Expand Up @@ -841,3 +844,15 @@ func (w *Watcher) toFSnotifyFlags(action uint32) uint64 {
}
return 0
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
if op.Has(xUnportableOpen) || op.Has(xUnportableRead) ||
op.Has(xUnportableCloseWrite) || op.Has(xUnportableCloseRead) {
return false
}
return true
}
89 changes: 89 additions & 0 deletions cmd/fsnotify/closewrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

/*
func closeWrite(paths ...string) {
if len(paths) < 1 {
exit("must specify at least one path to watch")
}
w, err := fsnotify.NewWatcher()
if err != nil {
exit("creating a new watcher: %s", err)
}
defer w.Close()
var (
op fsnotify.Op
cw = w.Supports(fsnotify.UnportableCloseWrite)
)
if cw {
op |= fsnotify.UnportableCloseWrite
} else {
op |= fsnotify.Create | fsnotify.Write
}
go closeWriteLoop(w, cw)
for _, p := range paths {
err := w.AddWith(p, fsnotify.WithOps(op))
if err != nil {
exit("%q: %s", p, err)
}
}
printTime("ready; press ^C to exit")
<-make(chan struct{})
}
func closeWriteLoop(w *fsnotify.Watcher, cw bool) {
var (
waitFor = 100 * time.Millisecond
mu sync.Mutex
timers = make(map[string]*time.Timer)
)
for {
select {
case err, ok := <-w.Errors:
if !ok {
return
}
panic(err)
case e, ok := <-w.Events:
if !ok {
return
}
// CloseWrite is supported: easy case.
if cw {
if e.Has(fsnotify.UnportableCloseWrite) {
printTime(e.String())
}
continue
}
// Get timer.
mu.Lock()
t, ok := timers[e.Name]
mu.Unlock()
// No timer yet, so create one.
if !ok {
t = time.AfterFunc(math.MaxInt64, func() {
printTime(e.String())
mu.Lock()
delete(timers, e.Name)
mu.Unlock()
})
t.Stop()
mu.Lock()
timers[e.Name] = t
mu.Unlock()
}
// Reset the timer for this path, so it will start from 100ms again.
t.Reset(waitFor)
}
}
}
*/

0 comments on commit 0a0b082

Please sign in to comment.