Skip to content

Commit

Permalink
inotify: support filtering events, add unsupported events
Browse files Browse the repository at this point in the history
This adds the ability to only listen for some event types. If you're
only interested in Created events and you're getting a lot of Write
events then you're just wasting CPU cycles

This also adds the ability listen on extra unportable event types; since
this is so related I figured I might as well do both. Ideally we want to
1) make it very very obvious you're doing something unportable, and 2)
make it reasonably easy "fallback" for platforms where this isn't
supported.

Unportable events start with "Unportable", which should document their
unportabilitiness. Also add a new Supports(Op) method, which should make
adding fallback logic relatively painless.

For example, to use CloseWrite where supported, but falling back to
Write when it's not:

	var op fsnotify.Op
	if w.Supports(fsnotify.UnportableCloseWrite) {
		op |= fsnotify.UnportableCloseWrite
	} else {
		op |= fsnotify.Create | fsnotify.Write
	}
	w.AddWith("/tmp", fsnotify.WithEvents(op))

And then you can deal with this in the write loop. There's a full
example in cmd/fsnotify/closewrite.go

All of this is unexported for now, until support for other platforms has
been added.

Updates 7
Updates 519
  • Loading branch information
arp242 committed May 1, 2024
1 parent 9ca3e9f commit 91388e3
Show file tree
Hide file tree
Showing 16 changed files with 491 additions and 28 deletions.
50 changes: 34 additions & 16 deletions backend_fen.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ type Watcher struct {

mu sync.Mutex
port *unix.EventPort
done chan struct{} // Channel for sending a "quit message" to the reader goroutine
dirs map[string]struct{} // Explicitly watched directories
watches map[string]struct{} // Explicitly watched non-directories
done chan struct{} // Channel for sending a "quit message" to the reader goroutine
dirs map[string]Op // Explicitly watched directories
watches map[string]Op // Explicitly watched non-directories
}

// NewWatcher creates a new Watcher.
Expand Down Expand Up @@ -269,7 +269,10 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
time.Now().Format("15:04:05.000000000"), name)
}

_ = getOptions(opts...)
with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}

// Currently we resolve symlinks that were explicitly requested to be
// watched. Otherwise we would use LStat here.
Expand All @@ -286,7 +289,7 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

w.mu.Lock()
w.dirs[name] = struct{}{}
w.dirs[name] = op
w.mu.Unlock()
return nil
}
Expand All @@ -297,7 +300,7 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

w.mu.Lock()
w.watches[name] = struct{}{}
w.watches[name] = op
w.mu.Unlock()
return nil
}
Expand Down Expand Up @@ -594,20 +597,24 @@ func (w *Watcher) associateFile(path string, stat os.FileInfo, follow bool) erro
// cleared up that discrepancy. The most likely cause is that the event
// has fired but we haven't processed it yet.
err := w.port.DissociatePath(path)
if err != nil && err != unix.ENOENT {
if err != nil && !errors.Is(err, unix.ENOENT) {
return err
}
}
// FILE_NOFOLLOW means we watch symlinks themselves rather than their
// targets.
events := unix.FILE_MODIFIED | unix.FILE_ATTRIB | unix.FILE_NOFOLLOW
if follow {
// We *DO* follow symlinks for explicitly watched entries.
events = unix.FILE_MODIFIED | unix.FILE_ATTRIB

var events int
if !follow {
// Watch symlinks themselves rather than their targets unless this entry
// is explicitly watched.
events |= unix.FILE_NOFOLLOW
}
if true { // XXX
events |= unix.FILE_MODIFIED
}
return w.port.AssociatePath(path, stat,
events,
stat.Mode())
if true {
events |= unix.FILE_ATTRIB
}
return w.port.AssociatePath(path, stat, events, stat.Mode())
}

func (w *Watcher) dissociateFile(path string, stat os.FileInfo, unused bool) error {
Expand Down Expand Up @@ -639,3 +646,14 @@ func (w *Watcher) WatchList() []string {

return entries
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
if op.Has(xUnportableCloseWrite) {
return false
}
return true
}
38 changes: 34 additions & 4 deletions backend_inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,30 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

name = filepath.Clean(name)
_ = getOptions(opts...)
with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}

var flags uint32 = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF
var flags uint32
if with.op.Has(Create) {
flags |= unix.IN_CREATE
}
if with.op.Has(Write) {
flags |= unix.IN_MODIFY
}
if with.op.Has(Remove) {
flags |= unix.IN_DELETE | unix.IN_DELETE_SELF
}
if with.op.Has(Rename) {
flags |= unix.IN_MOVED_TO | unix.IN_MOVED_FROM | unix.IN_MOVE_SELF
}
if with.op.Has(Chmod) {
flags |= unix.IN_ATTRIB
}
if with.op.Has(xUnportableCloseWrite) {
flags |= unix.IN_CLOSE_WRITE
}

return w.watches.updatePath(name, func(existing *watch) (*watch, error) {
if existing != nil {
Expand Down Expand Up @@ -624,6 +643,9 @@ func (w *Watcher) newEvent(name string, mask, cookie uint32) Event {
if mask&unix.IN_MODIFY == unix.IN_MODIFY {
e.Op |= Write
}
if mask&unix.IN_CLOSE_WRITE == unix.IN_CLOSE_WRITE {
e.Op |= xUnportableCloseWrite
}
if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF || mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM {
e.Op |= Rename
}
Expand Down Expand Up @@ -655,3 +677,11 @@ func (w *Watcher) newEvent(name string, mask, cookie uint32) Event {
}
return e
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
return true // Supports everything.
}
19 changes: 18 additions & 1 deletion backend_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,10 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
time.Now().Format("15:04:05.000000000"), name)
}

_ = getOptions(opts...)
with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}

w.watches.addUserWatch(name)
_, err := w.addWatch(name, noteAllEvents)
Expand Down Expand Up @@ -886,3 +889,17 @@ func (w *Watcher) read(events []unix.Kevent_t) ([]unix.Kevent_t, error) {
}
return events[0:n], nil
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
if runtime.GOOS == "freebsd" {
return true // Supports everything.
}
if op.Has(xUnportableCloseWrite) {
return false
}
return true
}
6 changes: 6 additions & 0 deletions backend_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,9 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error { return nil }
//
// Returns nil if [Watcher.Close] was called.
func (w *Watcher) Remove(name string) error { return nil }

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool { return false }
14 changes: 14 additions & 0 deletions backend_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}
if with.bufsize < 4096 {
return fmt.Errorf("fsnotify.WithBufferSize: buffer size cannot be smaller than 4096 bytes")
}
Expand Down Expand Up @@ -841,3 +844,14 @@ func (w *Watcher) toFSnotifyFlags(action uint32) uint64 {
}
return 0
}

// Supports reports if all the listed operations are supported by this platform.
//
// Create, Write, Remove, Rename, and Chmod are always supported. It can only
// return false for an Op starting with Unportable.
func (w *Watcher) xSupports(op Op) bool {
if op.Has(xUnportableCloseWrite) {
return false
}
return true
}
89 changes: 89 additions & 0 deletions cmd/fsnotify/closewrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

/*
func closeWrite(paths ...string) {
if len(paths) < 1 {
exit("must specify at least one path to watch")
}
w, err := fsnotify.NewWatcher()
if err != nil {
exit("creating a new watcher: %s", err)
}
defer w.Close()
var (
op fsnotify.Op
cw = w.Supports(fsnotify.UnportableCloseWrite)
)
if cw {
op |= fsnotify.UnportableCloseWrite
} else {
op |= fsnotify.Create | fsnotify.Write
}
go closeWriteLoop(w, cw)
for _, p := range paths {
err := w.AddWith(p, fsnotify.WithOps(op))
if err != nil {
exit("%q: %s", p, err)
}
}
printTime("ready; press ^C to exit")
<-make(chan struct{})
}
func closeWriteLoop(w *fsnotify.Watcher, cw bool) {
var (
waitFor = 100 * time.Millisecond
mu sync.Mutex
timers = make(map[string]*time.Timer)
)
for {
select {
case err, ok := <-w.Errors:
if !ok {
return
}
panic(err)
case e, ok := <-w.Events:
if !ok {
return
}
// CloseWrite is supported: easy case.
if cw {
if e.Has(fsnotify.UnportableCloseWrite) {
printTime(e.String())
}
continue
}
// Get timer.
mu.Lock()
t, ok := timers[e.Name]
mu.Unlock()
// No timer yet, so create one.
if !ok {
t = time.AfterFunc(math.MaxInt64, func() {
printTime(e.String())
mu.Lock()
delete(timers, e.Name)
mu.Unlock()
})
t.Stop()
mu.Lock()
timers[e.Name] = t
mu.Unlock()
}
// Reset the timer for this path, so it will start from 100ms again.
t.Reset(waitFor)
}
}
}
*/
4 changes: 4 additions & 0 deletions cmd/fsnotify/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Commands:
dedup [paths] Watch the paths for changes, suppressing duplicate events.
`[1:]

// cw [paths] CloseWrite example.

func exit(format string, a ...interface{}) {
fmt.Fprintf(os.Stderr, filepath.Base(os.Args[0])+": "+format+"\n", a...)
fmt.Print("\n" + usage)
Expand Down Expand Up @@ -61,5 +63,7 @@ func main() {
file(args...)
case "dedup":
dedup(args...)
//case "cw":
// closeWrite(args...)
}
}
33 changes: 33 additions & 0 deletions fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ const (
// get triggered very frequently by some software. For example, Spotlight
// indexing on macOS, anti-virus software, backup software, etc.
Chmod

// File opened for writing was closed.
//
// Only works on Linux and FreeBSD.
//
// The advantage of using this over Write is that it's more reliable than
// waiting for Write events to stop. It's also faster (if you're not
// listening to Write events): copying a file of a few GB can easily
// generate tens of thousands of Write events.
xUnportableCloseWrite
)

var (
Expand All @@ -105,6 +115,10 @@ var (
// - windows: The buffer size is too small; WithBufferSize() can be used to increase it.
// - kqueue, fen: Not used.
ErrEventOverflow = errors.New("fsnotify: queue or buffer overflow")

// ErrUnsupported is returned by AddWith() when WithOps() specified an
// Unportable event that's not supported on this platform.
xErrUnsupported = errors.New("fsnotify: not supported with this backend")
)

func (o Op) String() string {
Expand All @@ -118,6 +132,9 @@ func (o Op) String() string {
if o.Has(Write) {
b.WriteString("|WRITE")
}
if o.Has(xUnportableCloseWrite) {
b.WriteString("|CLOSE_WRITE")
}
if o.Has(Rename) {
b.WriteString("|RENAME")
}
Expand Down Expand Up @@ -148,6 +165,7 @@ type (
addOpt func(opt *withOpts)
withOpts struct {
bufsize int
op Op
}
)

Expand All @@ -160,6 +178,7 @@ var debug = func() bool {

var defaultOpts = withOpts{
bufsize: 65536, // 64K
op: Create | Write | Remove | Rename | Chmod,
}

func getOptions(opts ...addOpt) withOpts {
Expand All @@ -184,6 +203,20 @@ func WithBufferSize(bytes int) addOpt {
return func(opt *withOpts) { opt.bufsize = bytes }
}

// WithOps sets which operations to listen for.
//
// Excluding operations can save quite a bit of CPU time.
//
// Default is Create | Write | Remove | Rename | Chmod.
//
// This can also be used to add unportable operations not supported by all
// platforms.
//
// Using an unsupported operation returns an error.
func withOps(op Op) addOpt {
return func(opt *withOpts) { opt.op = op }
}

var enableRecurse = false

// Check if this path is recursive (ends with "/..." or "\..."), and return the
Expand Down

0 comments on commit 91388e3

Please sign in to comment.