Skip to content

Commit

Permalink
Add AddWith() to pass options, allow controlling Windows buffer size
Browse files Browse the repository at this point in the history
This is similar to Add(), except that you can pass options. Ideally this
should just be Add(), but that's not a compatible change, so we're stuck
with this until we do a v2.

There are quite a few enhancements that depend on *some* way to pass
options; as an example I added WithBufferSize() to control the buffer
size for (see #72) for the Windows backend, because that one is fairly
trivial to implement:

	w, err := fsnotify.NewWatcher()
	err = w.AddWith("/path", fsnotify.WithBufferSize(65536*4))

Some other options we might want to add:

	err = w.AddWith("/path",
		fsnotify.WithEvents(fsnotify.Open | fsnotify.Close),  // Filter events
		fsnotify.WithPoll(time.Second),                       // Use poll watcher
		fsnotify.WithFanotify(),                              // Prefer fanotify on Linux
		fsnotify.WithFollowLinks(true),                       // Control symlink follow behaviour
		fsnotify.WithDebounce(100*time.Milliseconds),         // Automatically debounce duplicate events
		fsnotify.WithRetry(1*time.Second, 1*time.Minute),     // Retry every second if the path disappears for a minute
	)

These are just some ideas, nothing fixed here yet. Some of these options
are likely to change once I get around to actually working on it.

This uses "functional options" so we can add more later. Options are
passed to Add() rather than the Watcher itself, so the behaviour can be
modified for every watch, rather than being global. This way you can do
things like watch /nfs-drive with a poll backend, and use the regular OS
backend for ~/dir, without having to create two watchers.

This upgrades fairly nicely to v2 where we rename AddWith() to Add():

	err = w.Add("/path",
		fsnotify.WithBufferSize(65536*4),
		fsnotify.WithEvents(fsnotify.Open | fsnotify.Close))

Folks will just have to s/fsnotify.AddWith/fsnotify.Add/, without having
to change all the option names. Plus having a consistent prefix
autocompletes nicely in editors.

Fixes #72
  • Loading branch information
arp242 committed Oct 14, 2022
1 parent 51c3806 commit c2a2940
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 24 deletions.
15 changes: 14 additions & 1 deletion backend_fen.go
Expand Up @@ -198,6 +198,8 @@ func (w *Watcher) Close() error {
//
// Returns [ErrClosed] if [Watcher.Close] was called.
//
// See [AddWith] for a version that allows adding options.
//
// # Watching directories
//
// All files in a directory are monitored, including new files that are created
Expand All @@ -215,14 +217,25 @@ func (w *Watcher) Close() error {
//
// Instead, watch the parent directory and use Event.Name to filter out files
// you're not interested in. There is an example of this in [cmd/fsnotify/file.go].
func (w *Watcher) Add(name string) error {
func (w *Watcher) Add(name string) error { return w.AddWith(name) }

// AddWith is like [Add], but has the possibility to add options. When using
// Add() the defaults described below are used.
//
// Possible options are:
//
// - [WithBufferSize] sets the buffer size for the Windows backend; no-op on
// other platforms. The default is 64K (65536 bytes).
func (w *Watcher) AddWith(name string, opts ...addOpt) error {
if w.isClosed() {
return ErrClosed
}
if w.port.PathIsWatched(name) {
return nil
}

_ = getOptions(opts...)

// Currently we resolve symlinks that were explicitly requested to be
// watched. Otherwise we would use LStat here.
stat, err := os.Stat(name)
Expand Down
17 changes: 15 additions & 2 deletions backend_inotify.go
Expand Up @@ -220,6 +220,8 @@ func (w *Watcher) Close() error {
//
// Returns [ErrClosed] if [Watcher.Close] was called.
//
// See [AddWith] for a version that allows adding options.
//
// # Watching directories
//
// All files in a directory are monitored, including new files that are created
Expand All @@ -237,12 +239,23 @@ func (w *Watcher) Close() error {
//
// Instead, watch the parent directory and use Event.Name to filter out files
// you're not interested in. There is an example of this in [cmd/fsnotify/file.go].
func (w *Watcher) Add(name string) error {
name = filepath.Clean(name)
func (w *Watcher) Add(name string) error { return w.AddWith(name) }

// AddWith is like [Add], but has the possibility to add options. When using
// Add() the defaults described below are used.
//
// Possible options are:
//
// - [WithBufferSize] sets the buffer size for the Windows backend; no-op on
// other platforms. The default is 64K (65536 bytes).
func (w *Watcher) AddWith(name string, opts ...addOpt) error {
if w.isClosed() {
return ErrClosed
}

name = filepath.Clean(name)
_ = getOptions(opts...)

var flags uint32 = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF
Expand Down
15 changes: 14 additions & 1 deletion backend_kqueue.go
Expand Up @@ -252,6 +252,8 @@ func (w *Watcher) Close() error {
//
// Returns [ErrClosed] if [Watcher.Close] was called.
//
// See [AddWith] for a version that allows adding options.
//
// # Watching directories
//
// All files in a directory are monitored, including new files that are created
Expand All @@ -269,7 +271,18 @@ func (w *Watcher) Close() error {
//
// Instead, watch the parent directory and use Event.Name to filter out files
// you're not interested in. There is an example of this in [cmd/fsnotify/file.go].
func (w *Watcher) Add(name string) error {
func (w *Watcher) Add(name string) error { return w.AddWith(name) }

// AddWith is like [Add], but has the possibility to add options. When using
// Add() the defaults described below are used.
//
// Possible options are:
//
// - [WithBufferSize] sets the buffer size for the Windows backend; no-op on
// other platforms. The default is 64K (65536 bytes).
func (w *Watcher) AddWith(name string, opts ...addOpt) error {
_ = getOptions(opts...)

w.mu.Lock()
w.userWatches[name] = struct{}{}
w.mu.Unlock()
Expand Down
13 changes: 13 additions & 0 deletions backend_other.go
Expand Up @@ -36,6 +36,8 @@ func (w *Watcher) Close() error {
//
// Returns [ErrClosed] if [Watcher.Close] was called.
//
// See [AddWith] for a version that allows adding options.
//
// # Watching directories
//
// All files in a directory are monitored, including new files that are created
Expand All @@ -57,6 +59,17 @@ func (w *Watcher) Add(name string) error {
return nil
}

// AddWith is like [Add], but has the possibility to add options. When using
// Add() the defaults described below are used.
//
// Possible options are:
//
// - [WithBufferSize] sets the buffer size for the Windows backend; no-op on
// other platforms. The default is 64K (65536 bytes).
func (w *Watcher) AddWith(name string, opts ...addOpt) error {
return nil
}

// Remove stops monitoring the path for changes.
//
// Directories are always removed non-recursively. For example, if you added
Expand Down
50 changes: 36 additions & 14 deletions backend_windows.go
Expand Up @@ -207,6 +207,8 @@ func (w *Watcher) Close() error {
//
// Returns [ErrClosed] if [Watcher.Close] was called.
//
// See [AddWith] for a version that allows adding options.
//
// # Watching directories
//
// All files in a directory are monitored, including new files that are created
Expand All @@ -224,16 +226,31 @@ func (w *Watcher) Close() error {
//
// Instead, watch the parent directory and use Event.Name to filter out files
// you're not interested in. There is an example of this in [cmd/fsnotify/file.go].
func (w *Watcher) Add(name string) error {
func (w *Watcher) Add(name string) error { return w.AddWith(name) }

// AddWith is like [Add], but has the possibility to add options. When using
// Add() the defaults described below are used.
//
// Possible options are:
//
// - [WithBufferSize] sets the buffer size for the Windows backend; no-op on
// other platforms. The default is 64K (65536 bytes).
func (w *Watcher) AddWith(name string, opts ...addOpt) error {
if w.isClosed() {
return ErrClosed
}

with := getOptions(opts...)
if with.bufsize < 4096 {
return fmt.Errorf("fsnotify.WithBufferSize: buffer size cannot be smaller than 4096 bytes")
}

in := &input{
op: opAddWatch,
path: filepath.Clean(name),
flags: sysFSALLEVENTS,
reply: make(chan error),
op: opAddWatch,
path: filepath.Clean(name),
flags: sysFSALLEVENTS,
reply: make(chan error),
bufsize: with.bufsize,
}
w.input <- in
if err := w.wakeupReader(); err != nil {
Expand Down Expand Up @@ -332,10 +349,11 @@ const (
)

type input struct {
op int
path string
flags uint32
reply chan error
op int
path string
flags uint32
bufsize int
reply chan error
}

type inode struct {
Expand All @@ -351,7 +369,7 @@ type watch struct {
mask uint64 // Directory itself is being watched with these notify flags
names map[string]uint64 // Map of names being watched and their notify flags
rename string // Remembers the old name while renaming a file
buf [65536]byte // 64K buffer
buf []byte // buffer, allocated later
}

type (
Expand Down Expand Up @@ -424,7 +442,7 @@ func (m watchMap) set(ino *inode, watch *watch) {
}

// Must run within the I/O thread.
func (w *Watcher) addWatch(pathname string, flags uint64) error {
func (w *Watcher) addWatch(pathname string, flags uint64, bufsize int) error {
dir, err := w.getDir(pathname)
if err != nil {
return err
Expand All @@ -447,6 +465,7 @@ func (w *Watcher) addWatch(pathname string, flags uint64) error {
ino: ino,
path: dir,
names: make(map[string]uint64),
buf: make([]byte, bufsize),
}
w.mu.Lock()
w.watches.set(ino, watchEntry)
Expand Down Expand Up @@ -546,8 +565,11 @@ func (w *Watcher) startRead(watch *watch) error {
return nil
}

rdErr := windows.ReadDirectoryChanges(watch.ino.handle, &watch.buf[0],
uint32(unsafe.Sizeof(watch.buf)), false, mask, nil, &watch.ov, 0)
// We need to pass the array, rather than the slice.
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&watch.buf))
rdErr := windows.ReadDirectoryChanges(watch.ino.handle,
(*byte)(unsafe.Pointer(hdr.Data)), uint32(hdr.Len),
false, mask, nil, &watch.ov, 0)
if rdErr != nil {
err := os.NewSyscallError("ReadDirectoryChanges", rdErr)
if rdErr == windows.ERROR_ACCESS_DENIED && watch.mask&provisional == 0 {
Expand Down Expand Up @@ -606,7 +628,7 @@ func (w *Watcher) readEvents() {
case in := <-w.input:
switch in.op {
case opAddWatch:
in.reply <- w.addWatch(in.path, uint64(in.flags))
in.reply <- w.addWatch(in.path, uint64(in.flags), in.bufsize)
case opRemoveWatch:
in.reply <- w.remWatch(in.path)
}
Expand Down
46 changes: 46 additions & 0 deletions backend_windows_test.go
Expand Up @@ -65,3 +65,49 @@ func TestWindowsNoAttributeChanges(t *testing.T) {
t.Fatalf("should not have received any events, received:\n%s", have)
}
}

// TODO: write test which makes sure the buffer size is set correctly.
func TestWindowsWithBufferSize(t *testing.T) {
getWatch := func(opts ...addOpt) (*watch, error) {
w, err := NewWatcher()
if err != nil {
return nil, err
}
if err := w.AddWith(t.TempDir(), opts...); err != nil {
return nil, err
}

// Hackery to get first (and only) map value.
var v indexMap
for _, v = range w.watches {
}
if len(v) != 1 {
t.Fatal()
}
var watch *watch
for _, watch = range v {
}
return watch, nil
}

check := func(w *watch, want int) {
if len(w.buf) != want || cap(w.buf) != want {
t.Fatalf("want = %d; len = %d; cap = %d", want, len(w.buf), cap(w.buf))
}
}

if w, err := getWatch(); err != nil {
t.Fatal(err)
} else {
check(w, 65536)
}
if w, err := getWatch(WithBufferSize(4096)); err != nil {
t.Fatal(err)
} else {
check(w, 4096)
}

if _, err := getWatch(WithBufferSize(1024)); err == nil || !strings.Contains(err.Error(), "cannot be smaller") {
t.Fatal(err)
}
}
31 changes: 28 additions & 3 deletions fsnotify.go
@@ -1,6 +1,3 @@
//go:build !plan9
// +build !plan9

// Package fsnotify provides a cross-platform interface for file system
// notifications.
package fsnotify
Expand Down Expand Up @@ -80,3 +77,31 @@ func (e Event) Has(op Op) bool { return e.Op.Has(op) }
func (e Event) String() string {
return fmt.Sprintf("%-13s %q", e.Op.String(), e.Name)
}

type (
addOpt func(opt *withOpts)
withOpts struct {
bufsize int
}
)

var defaultOpts = withOpts{
bufsize: 65536, // 64K
}

func getOptions(opts ...addOpt) withOpts {
with := defaultOpts
for _, o := range opts {
o(&with)
}
return with
}

// WithBufferSize sets the buffer size for the Windows backend. This is a no-op
// for other backends.
//
// The default value is 64K (65536 bytes) which should be enough for most
// applications, but you can increase it if you're hitting "short read" errors.
func WithBufferSize(bytes int) addOpt {
return func(opt *withOpts) { opt.bufsize = bytes }
}
3 changes: 0 additions & 3 deletions fsnotify_test.go
@@ -1,6 +1,3 @@
//go:build !plan9
// +build !plan9

package fsnotify

import (
Expand Down
14 changes: 14 additions & 0 deletions mkdoc.zsh
Expand Up @@ -89,6 +89,8 @@ add=$(<<EOF
//
// Returns [ErrClosed] if [Watcher.Close] was called.
//
// See [AddWith] for a version that allows adding options.
//
// # Watching directories
//
// All files in a directory are monitored, including new files that are created
Expand All @@ -109,6 +111,17 @@ add=$(<<EOF
EOF
)

addwith=$(<<EOF
// AddWith is like [Add], but has the possibility to add options. When using
// Add() the defaults described below are used.
//
// Possible options are:
//
// - [WithBufferSize] sets the buffer size for the Windows backend; no-op on
// other platforms. The default is 64K (65536 bytes).
EOF
)

remove=$(<<EOF
// Remove stops monitoring the path for changes.
//
Expand Down Expand Up @@ -205,6 +218,7 @@ set-cmt() {
set-cmt '^type Watcher struct ' $watcher
set-cmt '^func NewWatcher(' $new
set-cmt '^func (w \*Watcher) Add(' $add
set-cmt '^func (w \*Watcher) AddWith(' $addwith
set-cmt '^func (w \*Watcher) Remove(' $remove
set-cmt '^func (w \*Watcher) Close(' $close
set-cmt '^func (w \*Watcher) WatchList(' $watchlist
Expand Down

0 comments on commit c2a2940

Please sign in to comment.