Skip to content

Commit

Permalink
Support listening events, add unsupported events
Browse files Browse the repository at this point in the history
This adds the ability to only listen for some event types. If you're
only interested in Created events and you're getting a lot of Write
events then you're just wasting CPU cycles

This also adds the ability listen on extra unportable event types; since
this is so related I figured I might as well do both. Ideally we want to
1) make it very very obvious you're doing something unportable, and 2)
make it reasonably easy "fallback" for platforms where this isn't
supported.

Unportable events start with "Unportable", which should document their
unportabilitiness. Also add a new Supports(Op) method, which should make
adding fallback logic relatively painless.

For example, to use CloseWrite where supported, but falling back to
Write when it's not:

	var op fsnotify.Op
	if w.Supports(fsnotify.UnportableCloseWrite) {
		op |= fsnotify.UnportableCloseWrite
	} else {
		op |= fsnotify.Create | fsnotify.Write
	}
	w.AddWith("/tmp", fsnotify.WithEvents(op))

And then you can deal with this in the write loop. There's a full
example in cmd/fsnotify/closewrite.go

TODO: need to write tests for this, update all platforms.

Fixes 7
Updates 519
  • Loading branch information
arp242 committed Apr 28, 2024
1 parent f04cd68 commit ad859c6
Show file tree
Hide file tree
Showing 14 changed files with 243 additions and 12 deletions.
13 changes: 12 additions & 1 deletion backend_fen.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,10 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
time.Now().Format("15:04:05.000000000"), name)
}

_ = getOptions(opts...)
with := getOptions(opts...)
if !w.Supports(with.op) {
return fmt.Errorf("%w: %s", ErrUnsupported, with.op)
}

// Currently we resolve symlinks that were explicitly requested to be
// watched. Otherwise we would use LStat here.
Expand Down Expand Up @@ -639,3 +642,11 @@ func (w *Watcher) WatchList() []string {

return entries
}

// Supports reports if all listed events are supported by this watcher backend.
func (w *Watcher) Supports(op Op) bool {
if op.Has(UnportableCloseWrite) {
return false
}
return true
}
35 changes: 31 additions & 4 deletions backend_inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,30 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

name = filepath.Clean(name)
_ = getOptions(opts...)
with := getOptions(opts...)
if !w.Supports(with.op) {
return fmt.Errorf("%w: %s", ErrUnsupported, with.op)
}

var flags uint32 = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF
var flags uint32
if with.op.Has(Create) {
flags |= unix.IN_CREATE
}
if with.op.Has(Write) {
flags |= unix.IN_MODIFY
}
if with.op.Has(Remove) {
flags |= unix.IN_DELETE | unix.IN_DELETE_SELF
}
if with.op.Has(Rename) {
flags |= unix.IN_MOVED_TO | unix.IN_MOVED_FROM | unix.IN_MOVE_SELF
}
if with.op.Has(Chmod) {
flags |= unix.IN_ATTRIB
}
if with.op.Has(UnportableCloseWrite) {
flags |= unix.IN_CLOSE_WRITE
}

return w.watches.updatePath(name, func(existing *watch) (*watch, error) {
if existing != nil {
Expand Down Expand Up @@ -602,6 +621,9 @@ func (w *Watcher) newEvent(name string, mask uint32) Event {
if mask&unix.IN_MODIFY == unix.IN_MODIFY {
e.Op |= Write
}
if mask&unix.IN_CLOSE_WRITE == unix.IN_CLOSE_WRITE {
e.Op |= UnportableCloseWrite
}
if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF || mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM {
e.Op |= Rename
}
Expand All @@ -610,3 +632,8 @@ func (w *Watcher) newEvent(name string, mask uint32) Event {
}
return e
}

// Supports reports if all listed events are supported by this watcher backend.
func (w *Watcher) Supports(op Op) bool {
return true // Supports everything.
}
41 changes: 39 additions & 2 deletions backend_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,30 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
time.Now().Format("15:04:05.000000000"), name)
}

_ = getOptions(opts...)
with := getOptions(opts...)
if !w.Supports(with.op) {
return fmt.Errorf("%w: %s", ErrUnsupported, with.op)
}

var notes int
if with.op.Has(Create) {
// TODO: no way to disable this? Meh.
}
if with.op.Has(Write) {
notes |= unix.NOTE_WRITE
}
if with.op.Has(Remove) {
notes |= unix.NOTE_DELETE
}
if with.op.Has(Rename) {
notes |= unix.NOTE_RENAME
}
if with.op.Has(Chmod) {
notes |= unix.NOTE_ATTRIB
}
if with.op.Has(UnportableCloseWrite) {
notes |= internal.NOTE_CLOSE_WRITE
}

w.watches.addUserWatch(name)
_, err := w.addWatch(name, noteAllEvents)
Expand Down Expand Up @@ -755,6 +778,9 @@ func (w *Watcher) newEvent(name, linkName string, mask uint32) Event {
if mask&unix.NOTE_ATTRIB == unix.NOTE_ATTRIB {
e.Op |= Chmod
}
if internal.NOTE_CLOSE_WRITE > 0 && mask&internal.NOTE_CLOSE_WRITE == internal.NOTE_CLOSE_WRITE {
e.Op |= UnportableCloseWrite
}
// No point sending a write and delete event at the same time: if it's gone,
// then it's gone.
if e.Op.Has(Write) && e.Op.Has(Remove) {
Expand Down Expand Up @@ -858,7 +884,7 @@ func (w *Watcher) internalWatch(name string, fi os.FileInfo) (string, error) {
}

// watch file to mimic Linux inotify
return w.addWatch(name, noteAllEvents)
return w.addWatch(name, noteAllEvents) // XXX: remember flags from AddWith()
}

// Register events with the queue.
Expand Down Expand Up @@ -886,3 +912,14 @@ func (w *Watcher) read(events []unix.Kevent_t) ([]unix.Kevent_t, error) {
}
return events[0:n], nil
}

// Supports reports if all listed events are supported by this watcher backend.
func (w *Watcher) Supports(op Op) bool {
if runtime.GOOS == "freebsd" {
return true // Supports everything.
}
if op.Has(UnportableCloseWrite) {
return false
}
return true
}
3 changes: 3 additions & 0 deletions backend_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,6 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error { return nil }
//
// Returns nil if [Watcher.Close] was called.
func (w *Watcher) Remove(name string) error { return nil }

// Supports reports if all listed events are supported by this watcher backend.
func (w *Watcher) Supports(op Op) bool { return false }
11 changes: 11 additions & 0 deletions backend_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
}

with := getOptions(opts...)
if !w.Supports(with.op) {
return fmt.Errorf("%w: %s", ErrUnsupported, with.op)
}
if with.bufsize < 4096 {
return fmt.Errorf("fsnotify.WithBufferSize: buffer size cannot be smaller than 4096 bytes")
}
Expand Down Expand Up @@ -840,3 +843,11 @@ func (w *Watcher) toFSnotifyFlags(action uint32) uint64 {
}
return 0
}

// Supports reports if all listed events are supported by this watcher backend.
func (w *Watcher) Supports(op Op) bool {
if op.Has(UnportableCloseWrite) {
return false
}
return true
}
95 changes: 95 additions & 0 deletions cmd/fsnotify/closewrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

import (
"math"
"sync"
"time"

"github.com/fsnotify/fsnotify"
)

func closeWrite(paths ...string) {
if len(paths) < 1 {
exit("must specify at least one path to watch")
}

w, err := fsnotify.NewWatcher()
if err != nil {
exit("creating a new watcher: %s", err)
}
defer w.Close()

var (
op fsnotify.Op
cw = w.Supports(fsnotify.UnportableCloseWrite)
)
if cw {
op |= fsnotify.UnportableCloseWrite
} else {
op |= fsnotify.Create | fsnotify.Write
}

go closeWriteLoop(w, cw)

for _, p := range paths {
err := w.AddWith(p, fsnotify.WithEvents(op))
if err != nil {
exit("%q: %s", p, err)
}
}

printTime("ready; press ^C to exit")
<-make(chan struct{})
}

func closeWriteLoop(w *fsnotify.Watcher, cw bool) {
var (
waitFor = 100 * time.Millisecond
mu sync.Mutex
timers = make(map[string]*time.Timer)
)
for {
select {
case err, ok := <-w.Errors:
if !ok {
return
}
panic(err)
case e, ok := <-w.Events:
if !ok {
return
}

// CloseWrite is supported: easy case.
if cw {
if e.Has(fsnotify.UnportableCloseWrite) {
printTime(e.String())
}
continue
}

// Get timer.
mu.Lock()
t, ok := timers[e.Name]
mu.Unlock()

// No timer yet, so create one.
if !ok {
t = time.AfterFunc(math.MaxInt64, func() {
printTime(e.String())
mu.Lock()
delete(timers, e.Name)
mu.Unlock()
})
t.Stop()

mu.Lock()
timers[e.Name] = t
mu.Unlock()
}

// Reset the timer for this path, so it will start from 100ms again.
t.Reset(waitFor)
}
}
}
3 changes: 3 additions & 0 deletions cmd/fsnotify/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Commands:
watch [paths] Watch the paths for changes and print the events.
file [file] Watch a single file for changes.
dedup [paths] Watch the paths for changes, suppressing duplicate events.
cw [paths] CloseWrite example.
`[1:]

func exit(format string, a ...interface{}) {
Expand Down Expand Up @@ -61,5 +62,7 @@ func main() {
file(args...)
case "dedup":
dedup(args...)
case "cw":
closeWrite(args...)
}
}
29 changes: 29 additions & 0 deletions fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ const (
// get triggered very frequently by some software. For example, Spotlight
// indexing on macOS, anti-virus software, backup software, etc.
Chmod

// File opened for writing was closed.
//
// Only works on Linux and FreeBSD.
//
// The advantage of using this over Write is that it's more reliable than
// waiting for Write events to stop. It's also faster (if you're not
// listening to Write events): copying a file of a few GB can easily
// generate tens of thousands of Write events.
UnportableCloseWrite
)

var (
Expand All @@ -95,6 +105,8 @@ var (
// - windows: The buffer size is too small; WithBufferSize() can be used to increase it.
// - kqueue, fen: Not used.
ErrEventOverflow = errors.New("fsnotify: queue or buffer overflow")

ErrUnsupported = errors.New("fsnotify: not supported with this backend")
)

func (o Op) String() string {
Expand All @@ -108,6 +120,9 @@ func (o Op) String() string {
if o.Has(Write) {
b.WriteString("|WRITE")
}
if o.Has(UnportableCloseWrite) {
b.WriteString("|CLOSE_WRITE")
}
if o.Has(Rename) {
b.WriteString("|RENAME")
}
Expand Down Expand Up @@ -135,6 +150,7 @@ type (
addOpt func(opt *withOpts)
withOpts struct {
bufsize int
op Op
}
)

Expand All @@ -147,6 +163,7 @@ var debug = func() bool {

var defaultOpts = withOpts{
bufsize: 65536, // 64K
op: Create | Write | Remove | Rename | Chmod,
}

func getOptions(opts ...addOpt) withOpts {
Expand All @@ -171,6 +188,18 @@ func WithBufferSize(bytes int) addOpt {
return func(opt *withOpts) { opt.bufsize = bytes }
}

// WithEvents sets which events to listen for.
//
// Default is Create | Write | Remove | Rename | Chmod.
//
// Unportable events are not supported by all platforms; see the documentation
// for details.
//
// Using an Unportable evnet returns an error.
func WithEvents(op Op) addOpt {
return func(opt *withOpts) { opt.op = op }
}

var enableRecurse = false

// Check if this path is recursive (ends with "/..." or "\..."), and return the
Expand Down
5 changes: 0 additions & 5 deletions helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,6 @@ func indent(s fmt.Stringer) string {

var join = filepath.Join

func isCI() bool {
_, ok := os.LookupEnv("CI")
return ok
}

func isKqueue() bool {
switch runtime.GOOS {
case "darwin", "freebsd", "openbsd", "netbsd", "dragonfly":
Expand Down
4 changes: 4 additions & 0 deletions internal/debug_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package internal

import "golang.org/x/sys/unix"

const (
NOTE_CLOSE_WRITE = 0
)

var names = []struct {
n string
m uint32
Expand Down
4 changes: 4 additions & 0 deletions internal/debug_dragonfly.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package internal

import "golang.org/x/sys/unix"

const (
NOTE_CLOSE_WRITE = 0
)

var names = []struct {
n string
m uint32
Expand Down

0 comments on commit ad859c6

Please sign in to comment.