From 5e8ff902b136ea132c0b7f39bf47eedb0eed2d43 Mon Sep 17 00:00:00 2001 From: kris Date: Thu, 19 Jan 2023 09:05:01 -0700 Subject: [PATCH 1/6] adding options parameters to NewWatcher to allow for buffered EventChannels --- backend_fen.go | 7 ++++--- backend_inotify.go | 7 ++++--- backend_kqueue.go | 5 +++-- backend_other.go | 4 ++-- backend_windows.go | 18 +++++++++++++++--- fsnotify.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ fsnotify_test.go | 28 ++++++++++++++++++++++++++++ 7 files changed, 102 insertions(+), 13 deletions(-) diff --git a/backend_fen.go b/backend_fen.go index 500bd63f..8e2a2df0 100644 --- a/backend_fen.go +++ b/backend_fen.go @@ -131,10 +131,11 @@ type Watcher struct { watches map[string]struct{} // Explicitly watched non-directories } -// NewWatcher creates a new Watcher. -func NewWatcher() (*Watcher, error) { +// NewWatcher creates a new Watcher with an optional set of Option functions. +func NewWatcher(opts ...newOpt) (*Watcher, error) { + o := getNewOptions(opts...) w := &Watcher{ - Events: make(chan Event), + Events: o.eventChannel(), Errors: make(chan error), dirs: make(map[string]struct{}), watches: make(map[string]struct{}), diff --git a/backend_inotify.go b/backend_inotify.go index fe5033b1..6b69f27a 100644 --- a/backend_inotify.go +++ b/backend_inotify.go @@ -230,8 +230,8 @@ func (w *watches) updatePath(path string, f func(*watch) (*watch, error)) error return nil } -// NewWatcher creates a new Watcher. -func NewWatcher() (*Watcher, error) { +// NewWatcher creates a new Watcher with an optional set of Option functions. +func NewWatcher(opts ...newOpt) (*Watcher, error) { // Need to set nonblocking mode for SetDeadline to work, otherwise blocking // I/O operations won't terminate on close. fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK) @@ -239,11 +239,12 @@ func NewWatcher() (*Watcher, error) { return nil, errno } + o := getNewOptions(opts...) w := &Watcher{ fd: fd, inotifyFile: os.NewFile(uintptr(fd), ""), watches: newWatches(), - Events: make(chan Event), + Events: o.eventChannel(), Errors: make(chan error), done: make(chan struct{}), doneResp: make(chan struct{}), diff --git a/backend_kqueue.go b/backend_kqueue.go index 9d464504..df91e68c 100644 --- a/backend_kqueue.go +++ b/backend_kqueue.go @@ -142,13 +142,14 @@ type pathInfo struct { isDir bool } -// NewWatcher creates a new Watcher. +// NewWatcher creates a new Watcher with an optional set of Option functions. func NewWatcher() (*Watcher, error) { kq, closepipe, err := newKqueue() if err != nil { return nil, err } + o := getNewOptions(opts...) w := &Watcher{ kq: kq, closepipe: closepipe, @@ -158,7 +159,7 @@ func NewWatcher() (*Watcher, error) { paths: make(map[int]pathInfo), fileExists: make(map[string]struct{}), userWatches: make(map[string]struct{}), - Events: make(chan Event), + Events: o.eventChannel(), Errors: make(chan error), done: make(chan struct{}), } diff --git a/backend_other.go b/backend_other.go index 1668de90..4c3ce17a 100644 --- a/backend_other.go +++ b/backend_other.go @@ -117,8 +117,8 @@ type Watcher struct { Errors chan error } -// NewWatcher creates a new Watcher. -func NewWatcher() (*Watcher, error) { +// NewWatcher creates a new Watcher with an optional set of Option functions. +func NewWatcher(opts ...newOpt) (*Watcher, error) { return nil, errors.New("fsnotify not supported on the current platform") } diff --git a/backend_windows.go b/backend_windows.go index c0359ac8..9dc24e56 100644 --- a/backend_windows.go +++ b/backend_windows.go @@ -141,17 +141,29 @@ type Watcher struct { closed bool // Set to true when Close() is first called } -// NewWatcher creates a new Watcher. -func NewWatcher() (*Watcher, error) { +// NewWatcher creates a new Watcher with an optional set of Option functions. +func NewWatcher(opts ...newOpt) (*Watcher, error) { port, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0) if err != nil { return nil, os.NewSyscallError("CreateIoCompletionPort", err) } + + o := getNewOptions(opts...) + + // Windows doesn't have an internal kernel buffer so we default to an Event + // channel buffer of 50 if no manual buffer is set. + var ch chan Event + if o.eventbuffer > 0 { + ch = make(chan Event, o.eventbuffer) + } else { + //no manual buffer set, use the default of 50 + ch = make(chan Event, 50) + } w := &Watcher{ port: port, watches: make(watchMap), input: make(chan *input, 1), - Events: make(chan Event, 50), + Events: ch, Errors: make(chan error), quit: make(chan chan<- error, 1), } diff --git a/fsnotify.go b/fsnotify.go index c00ce762..c71eba3a 100644 --- a/fsnotify.go +++ b/fsnotify.go @@ -141,3 +141,49 @@ func recursivePath(path string) (string, bool) { } return path, false } + +type newOpt func(opt *newOpts) //option function to be passed to the NewWatcher + +// newOpts is used to configure some optional items in the Watcher structure during configuration. +// The newOpts structure is extensible so that new optional configuration items can be added later +type newOpts struct { + eventbuffer uint +} + +var defaultNewOpts = newOpts{ + eventbuffer: 0, // Default to an unbuffered channel +} + +// WithBufferedEventChannel configures the Watcher with an optionally buffered Event channel +// +// The default Event channel is unbuffered, which means that the host kernel is solely +// responsible for buffering events. A buffered Event channel allows for a userspace +// application to add additional event buffering without modifying kernel parameters. +// Adjusting the kernel buffers will always provide better performance on highly loaded +// systems, but a user space buffer can potentially provide some resiliency when it is +// not possible to change the kernel parameters +func WithBufferedEventChannel(sz uint) newOpt { + return func(opt *newOpts) { + if opt != nil { + opt.eventbuffer = sz + } + } +} + +// getNewOptions is just a little helper function simplify the usage of options in NewWatcher +func getNewOptions(opts ...newOpt) (r newOpts) { + r = defaultNewOpts + for _, o := range opts { + o(&r) + } + return +} + +// eventChannel creates a new Event channel based on eventbuffer value. +// A value of zero indicates an un buffered channel, values > 0 createa buffered channel. +func (o newOpts) eventChannel() chan Event { + if o.eventbuffer == 0 { + return make(chan Event) + } + return make(chan Event, o.eventbuffer) +} diff --git a/fsnotify_test.go b/fsnotify_test.go index b86d54d5..2ba54528 100644 --- a/fsnotify_test.go +++ b/fsnotify_test.go @@ -1526,6 +1526,34 @@ func TestWatchList(t *testing.T) { } } +func TestWatchBufferedChannel(t *testing.T) { + //create a watcher without a buffer + w, err := NewWatcher() + if err != nil { + t.Fatal(err) + } + //check that the channel exists + if w.Events == nil { + t.Fatal("nil channel") + } else if err = w.Close(); err != nil { + t.Fatal(err) + } + + //check with a buffered channel + if w, err = NewWatcher(WithBufferedEventChannel(42)); err != nil { + t.Fatal(err) + } + //check that the channel exists + if w.Events == nil { + t.Fatal("nil channel") + } else if cap(w.Events) != 42 { + t.Fatalf("event channel cap is wrong: %d != 42", cap(w.Events)) + } else if err = w.Close(); err != nil { + t.Fatal(err) + } + +} + func BenchmarkWatch(b *testing.B) { w, err := NewWatcher() if err != nil { From 5e99d85cbe185e91b0975944b76669f825348086 Mon Sep 17 00:00:00 2001 From: kris Date: Thu, 19 Jan 2023 09:20:28 -0700 Subject: [PATCH 2/6] missed one --- backend_kqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend_kqueue.go b/backend_kqueue.go index df91e68c..e56e18bd 100644 --- a/backend_kqueue.go +++ b/backend_kqueue.go @@ -143,7 +143,7 @@ type pathInfo struct { } // NewWatcher creates a new Watcher with an optional set of Option functions. -func NewWatcher() (*Watcher, error) { +func NewWatcher(opts ...newOpt) (*Watcher, error) { kq, closepipe, err := newKqueue() if err != nil { return nil, err From eafc1ded10717c05f2df1adafabb77486c1558fc Mon Sep 17 00:00:00 2001 From: kris Date: Sun, 5 Feb 2023 15:17:53 -0700 Subject: [PATCH 3/6] Addressing comments and moving to a new top level call rather than trying to update the existing NewWatcher --- backend_fen.go | 7 ++++- backend_inotify.go | 7 ++++- backend_kqueue.go | 7 ++++- backend_other.go | 5 ++++ backend_windows.go | 7 ++++- fsnotify_test.go | 70 ++++++++++++++++++++++++++++++++++++++++++++++ mkdoc.zsh | 5 ++++ 7 files changed, 104 insertions(+), 4 deletions(-) diff --git a/backend_fen.go b/backend_fen.go index 500bd63f..0d700fc8 100644 --- a/backend_fen.go +++ b/backend_fen.go @@ -133,8 +133,13 @@ type Watcher struct { // NewWatcher creates a new Watcher. func NewWatcher() (*Watcher, error) { + return NewBufferedWatcher(0) +} + +// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +func NewBufferedWatcher(sz uint) (*Watcher, error) { w := &Watcher{ - Events: make(chan Event), + Events: make(chan Event, sz), Errors: make(chan error), dirs: make(map[string]struct{}), watches: make(map[string]struct{}), diff --git a/backend_inotify.go b/backend_inotify.go index fe5033b1..8360a104 100644 --- a/backend_inotify.go +++ b/backend_inotify.go @@ -232,6 +232,11 @@ func (w *watches) updatePath(path string, f func(*watch) (*watch, error)) error // NewWatcher creates a new Watcher. func NewWatcher() (*Watcher, error) { + return NewBufferedWatcher(0) +} + +// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +func NewBufferedWatcher(sz uint) (*Watcher, error) { // Need to set nonblocking mode for SetDeadline to work, otherwise blocking // I/O operations won't terminate on close. fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK) @@ -243,7 +248,7 @@ func NewWatcher() (*Watcher, error) { fd: fd, inotifyFile: os.NewFile(uintptr(fd), ""), watches: newWatches(), - Events: make(chan Event), + Events: make(chan Event, sz), Errors: make(chan error), done: make(chan struct{}), doneResp: make(chan struct{}), diff --git a/backend_kqueue.go b/backend_kqueue.go index 9d464504..9d644b2e 100644 --- a/backend_kqueue.go +++ b/backend_kqueue.go @@ -144,6 +144,11 @@ type pathInfo struct { // NewWatcher creates a new Watcher. func NewWatcher() (*Watcher, error) { + return NewBufferedWatcher(0) +} + +// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +func NewBufferedWatcher(sz uint) (*Watcher, error) { kq, closepipe, err := newKqueue() if err != nil { return nil, err @@ -158,7 +163,7 @@ func NewWatcher() (*Watcher, error) { paths: make(map[int]pathInfo), fileExists: make(map[string]struct{}), userWatches: make(map[string]struct{}), - Events: make(chan Event), + Events: make(chan Event, sz), Errors: make(chan error), done: make(chan struct{}), } diff --git a/backend_other.go b/backend_other.go index 1668de90..a24856d9 100644 --- a/backend_other.go +++ b/backend_other.go @@ -122,6 +122,11 @@ func NewWatcher() (*Watcher, error) { return nil, errors.New("fsnotify not supported on the current platform") } +// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +func NewBufferedWatcher(sz uint) (*Watcher, error) { + return NewWatcher() //just re-use the original error response +} + // Close removes all watches and closes the events channel. func (w *Watcher) Close() error { return nil } diff --git a/backend_windows.go b/backend_windows.go index c0359ac8..3f4feefe 100644 --- a/backend_windows.go +++ b/backend_windows.go @@ -143,6 +143,11 @@ type Watcher struct { // NewWatcher creates a new Watcher. func NewWatcher() (*Watcher, error) { + return NewBufferedWatcher(50) // Windows backend defaults to a buffered channel of size 50 +} + +// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +func NewBufferedWatcher(sz uint) (*Watcher, error) { port, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0) if err != nil { return nil, os.NewSyscallError("CreateIoCompletionPort", err) @@ -151,7 +156,7 @@ func NewWatcher() (*Watcher, error) { port: port, watches: make(watchMap), input: make(chan *input, 1), - Events: make(chan Event, 50), + Events: make(chan Event, sz), Errors: make(chan error), quit: make(chan chan<- error, 1), } diff --git a/fsnotify_test.go b/fsnotify_test.go index b86d54d5..3733988a 100644 --- a/fsnotify_test.go +++ b/fsnotify_test.go @@ -1577,6 +1577,57 @@ func BenchmarkWatch(b *testing.B) { wg.Wait() } +func BenchmarkBufferedWatch(b *testing.B) { + w, err := NewBufferedWatcher(4096) + if err != nil { + b.Fatal(err) + } + + tmp := b.TempDir() + file := join(tmp, "file") + err = w.Add(tmp) + if err != nil { + b.Fatal(err) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + for { + select { + case err, ok := <-w.Errors: + if !ok { + wg.Done() + return + } + b.Error(err) + case _, ok := <-w.Events: + if !ok { + wg.Done() + return + } + } + } + }() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + fp, err := os.Create(file) + if err != nil { + b.Fatal(err) + } + err = fp.Close() + if err != nil { + b.Fatal(err) + } + } + err = w.Close() + if err != nil { + b.Fatal(err) + } + wg.Wait() +} + func BenchmarkAddRemove(b *testing.B) { w, err := NewWatcher() if err != nil { @@ -1595,3 +1646,22 @@ func BenchmarkAddRemove(b *testing.B) { } } } + +func BenchmarkBufferedAddRemove(b *testing.B) { + w, err := NewBufferedWatcher(4096) + if err != nil { + b.Fatal(err) + } + + tmp := b.TempDir() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + if err := w.Add(tmp); err != nil { + b.Fatal(err) + } + if err := w.Remove(tmp); err != nil { + b.Fatal(err) + } + } +} diff --git a/mkdoc.zsh b/mkdoc.zsh index 868d3b80..c1970501 100755 --- a/mkdoc.zsh +++ b/mkdoc.zsh @@ -72,6 +72,10 @@ EOF new=$(< Date: Sun, 5 Feb 2023 15:26:58 -0700 Subject: [PATCH 4/6] udpate comment blocks --- backend_fen.go | 9 ++++++++- backend_inotify.go | 9 ++++++++- backend_kqueue.go | 9 ++++++++- backend_other.go | 9 ++++++++- backend_windows.go | 9 ++++++++- mkdoc.zsh | 7 +++++++ 6 files changed, 47 insertions(+), 5 deletions(-) diff --git a/backend_fen.go b/backend_fen.go index 0d700fc8..1df18dcb 100644 --- a/backend_fen.go +++ b/backend_fen.go @@ -136,7 +136,14 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. +// For almost all use cases an unbuffered Watcher will perform better than buffered. +// Most kernels have de-duplication logic which allows for less activity in userspace +// and generally better performance. However there may be some cases where a very +// large buffers can enable an application to keep up with mass file rotations. +// You will always be better off increasing the kernel buffers over adding a large +// userspace buffer, but if you can't control the kernel buffer then a buffered +// watcher is a reasonable option. You probably want NewWatcher. func NewBufferedWatcher(sz uint) (*Watcher, error) { w := &Watcher{ Events: make(chan Event, sz), diff --git a/backend_inotify.go b/backend_inotify.go index 8360a104..80c44f60 100644 --- a/backend_inotify.go +++ b/backend_inotify.go @@ -235,7 +235,14 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. +// For almost all use cases an unbuffered Watcher will perform better than buffered. +// Most kernels have de-duplication logic which allows for less activity in userspace +// and generally better performance. However there may be some cases where a very +// large buffers can enable an application to keep up with mass file rotations. +// You will always be better off increasing the kernel buffers over adding a large +// userspace buffer, but if you can't control the kernel buffer then a buffered +// watcher is a reasonable option. You probably want NewWatcher. func NewBufferedWatcher(sz uint) (*Watcher, error) { // Need to set nonblocking mode for SetDeadline to work, otherwise blocking // I/O operations won't terminate on close. diff --git a/backend_kqueue.go b/backend_kqueue.go index 9d644b2e..d3b148ff 100644 --- a/backend_kqueue.go +++ b/backend_kqueue.go @@ -147,7 +147,14 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. +// For almost all use cases an unbuffered Watcher will perform better than buffered. +// Most kernels have de-duplication logic which allows for less activity in userspace +// and generally better performance. However there may be some cases where a very +// large buffers can enable an application to keep up with mass file rotations. +// You will always be better off increasing the kernel buffers over adding a large +// userspace buffer, but if you can't control the kernel buffer then a buffered +// watcher is a reasonable option. You probably want NewWatcher. func NewBufferedWatcher(sz uint) (*Watcher, error) { kq, closepipe, err := newKqueue() if err != nil { diff --git a/backend_other.go b/backend_other.go index a24856d9..60b124d2 100644 --- a/backend_other.go +++ b/backend_other.go @@ -122,7 +122,14 @@ func NewWatcher() (*Watcher, error) { return nil, errors.New("fsnotify not supported on the current platform") } -// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. +// For almost all use cases an unbuffered Watcher will perform better than buffered. +// Most kernels have de-duplication logic which allows for less activity in userspace +// and generally better performance. However there may be some cases where a very +// large buffers can enable an application to keep up with mass file rotations. +// You will always be better off increasing the kernel buffers over adding a large +// userspace buffer, but if you can't control the kernel buffer then a buffered +// watcher is a reasonable option. You probably want NewWatcher. func NewBufferedWatcher(sz uint) (*Watcher, error) { return NewWatcher() //just re-use the original error response } diff --git a/backend_windows.go b/backend_windows.go index 3f4feefe..733ab09a 100644 --- a/backend_windows.go +++ b/backend_windows.go @@ -146,7 +146,14 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(50) // Windows backend defaults to a buffered channel of size 50 } -// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. +// For almost all use cases an unbuffered Watcher will perform better than buffered. +// Most kernels have de-duplication logic which allows for less activity in userspace +// and generally better performance. However there may be some cases where a very +// large buffers can enable an application to keep up with mass file rotations. +// You will always be better off increasing the kernel buffers over adding a large +// userspace buffer, but if you can't control the kernel buffer then a buffered +// watcher is a reasonable option. You probably want NewWatcher. func NewBufferedWatcher(sz uint) (*Watcher, error) { port, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0) if err != nil { diff --git a/mkdoc.zsh b/mkdoc.zsh index c1970501..f4956525 100755 --- a/mkdoc.zsh +++ b/mkdoc.zsh @@ -75,6 +75,13 @@ EOF newbuffered=$(< Date: Wed, 12 Jul 2023 19:32:11 +0100 Subject: [PATCH 5/6] Test buffered watcher --- .circleci/config.yml | 8 ++++--- .cirrus.yml | 2 +- .github/workflows/test.yml | 18 +++++++++------- backend_fen.go | 17 ++++++++------- backend_inotify.go | 17 ++++++++------- backend_kqueue.go | 17 ++++++++------- backend_other.go | 17 ++++++++------- backend_windows.go | 17 ++++++++------- fsnotify_test.go | 43 +++++++++++++++++++++++++++++++------- helpers_test.go | 9 ++++++++ mkdoc.zsh | 18 +++++++++------- 11 files changed, 116 insertions(+), 67 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index dac3af46..4d38e420 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -22,8 +22,10 @@ jobs: name: test command: | uname -a + sysctl fs.inotify.max_user_watches fs.inotify.max_user_instances + ulimit -a go version - go test -race ./... + go test -race -parallel 1 ./... # iOS ios: @@ -48,7 +50,7 @@ jobs: export PATH=$PATH:/usr/local/Cellar/go/*/bin uname -a go version - go test -race ./... + go test -race -parallel 1 ./... # This is just Linux x86_64; also need to get a Go with GOOS=android, but # there aren't any pre-built versions of that on the Go site. Idk, disable for @@ -76,5 +78,5 @@ jobs: # uname -a # export PATH=/usr/local/go/bin:$PATH # go version - # go test -race ./... + # go test -race -parallel 1 ./... # diff --git a/.cirrus.yml b/.cirrus.yml index 89cd0225..0774c6b4 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -9,4 +9,4 @@ freebsd_task: # run tests as user "cirrus" instead of root - pw useradd cirrus -m - chown -R cirrus:cirrus . - - sudo -u cirrus go test -race ./... + - sudo -u cirrus go test -race -parallel 1 ./... diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index aeb6b5f2..8fb10945 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -29,7 +29,7 @@ jobs: - name: test run: | - go test -race ./... + go test -race -parallel 1 ./... # Test gccgo testgcc: @@ -42,7 +42,7 @@ jobs: - name: test run: | sudo apt-get -y install gccgo-12 - go-12 test ./... + go-12 test -parallel 1 ./... # Test only the latest Go version on macOS; we use the macOS builders for BSD # and illumos, and GitHub doesn't allow many of them to run concurrently. If @@ -70,7 +70,7 @@ jobs: - name: test run: | - go test -race ./... + go test -race -parallel 1 ./... # OpenBSD; no -race as the VM doesn't include the comp set. # @@ -91,7 +91,7 @@ jobs: prepare: pkg_add go run: | useradd -mG wheel action - su action -c 'go test ./...' + su action -c 'go test -parallel 1 ./...' # NetBSD testNetBSD: @@ -107,7 +107,7 @@ jobs: # TODO: no -race for the same reason as OpenBSD (the timing; it does run). run: | useradd -mG wheel action - su action -c 'go120 test ./...' + su action -c 'go120 test -parallel 1 ./...' # illumos testillumos: @@ -125,7 +125,7 @@ jobs: useradd action export GOCACHE=/tmp/go-cache export GOPATH=/tmp/go-path - su action -c '/opt/ooce/go-1.19/bin/go test ./...' + su action -c '/opt/ooce/go-1.19/bin/go test -parallel 1 ./...' # Older Debian 6, for old Linux kernels. testDebian6: @@ -159,6 +159,10 @@ jobs: go test -c -o ${p//\//-}.test $p done vagrant up + + vagrant ssh -c 'uname -a' + vagrant ssh -c 'tail /proc/sys/fs/inotify/max_user_watches /proc/sys/fs/inotify/max_user_instances' + vagrant ssh -c 'ulimit -a' for t in *.test; do - vagrant ssh -c "/vagrant/$t" + vagrant ssh -c "/vagrant/$t -test.parallel 1" done diff --git a/backend_fen.go b/backend_fen.go index 1df18dcb..2e771081 100644 --- a/backend_fen.go +++ b/backend_fen.go @@ -136,14 +136,15 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. -// For almost all use cases an unbuffered Watcher will perform better than buffered. -// Most kernels have de-duplication logic which allows for less activity in userspace -// and generally better performance. However there may be some cases where a very -// large buffers can enable an application to keep up with mass file rotations. -// You will always be better off increasing the kernel buffers over adding a large -// userspace buffer, but if you can't control the kernel buffer then a buffered -// watcher is a reasonable option. You probably want NewWatcher. +// NewBufferedWatcher creates a new Watcher with a buffered event channel. +// +// For almost all use cases an unbuffered Watcher will perform better; most +// kernels have de-duplication logic, which means less activity in userspace and +// generally better performance. However there may be some cases where a very +// large buffer can enable an application to keep up with a very large number of +// events. You will always be better off increasing the kernel buffers over +// adding a large userspace buffer, but if you can't control the kernel buffer +// then a buffered watcher is a reasonable option. func NewBufferedWatcher(sz uint) (*Watcher, error) { w := &Watcher{ Events: make(chan Event, sz), diff --git a/backend_inotify.go b/backend_inotify.go index 80c44f60..2dd6057d 100644 --- a/backend_inotify.go +++ b/backend_inotify.go @@ -235,14 +235,15 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. -// For almost all use cases an unbuffered Watcher will perform better than buffered. -// Most kernels have de-duplication logic which allows for less activity in userspace -// and generally better performance. However there may be some cases where a very -// large buffers can enable an application to keep up with mass file rotations. -// You will always be better off increasing the kernel buffers over adding a large -// userspace buffer, but if you can't control the kernel buffer then a buffered -// watcher is a reasonable option. You probably want NewWatcher. +// NewBufferedWatcher creates a new Watcher with a buffered event channel. +// +// For almost all use cases an unbuffered Watcher will perform better; most +// kernels have de-duplication logic, which means less activity in userspace and +// generally better performance. However there may be some cases where a very +// large buffer can enable an application to keep up with a very large number of +// events. You will always be better off increasing the kernel buffers over +// adding a large userspace buffer, but if you can't control the kernel buffer +// then a buffered watcher is a reasonable option. func NewBufferedWatcher(sz uint) (*Watcher, error) { // Need to set nonblocking mode for SetDeadline to work, otherwise blocking // I/O operations won't terminate on close. diff --git a/backend_kqueue.go b/backend_kqueue.go index 7d81b28f..dc0ae57e 100644 --- a/backend_kqueue.go +++ b/backend_kqueue.go @@ -147,14 +147,15 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. -// For almost all use cases an unbuffered Watcher will perform better than buffered. -// Most kernels have de-duplication logic which allows for less activity in userspace -// and generally better performance. However there may be some cases where a very -// large buffers can enable an application to keep up with mass file rotations. -// You will always be better off increasing the kernel buffers over adding a large -// userspace buffer, but if you can't control the kernel buffer then a buffered -// watcher is a reasonable option. You probably want NewWatcher. +// NewBufferedWatcher creates a new Watcher with a buffered event channel. +// +// For almost all use cases an unbuffered Watcher will perform better; most +// kernels have de-duplication logic, which means less activity in userspace and +// generally better performance. However there may be some cases where a very +// large buffer can enable an application to keep up with a very large number of +// events. You will always be better off increasing the kernel buffers over +// adding a large userspace buffer, but if you can't control the kernel buffer +// then a buffered watcher is a reasonable option. func NewBufferedWatcher(sz uint) (*Watcher, error) { kq, closepipe, err := newKqueue() if err != nil { diff --git a/backend_other.go b/backend_other.go index 60b124d2..e9ad2179 100644 --- a/backend_other.go +++ b/backend_other.go @@ -122,14 +122,15 @@ func NewWatcher() (*Watcher, error) { return nil, errors.New("fsnotify not supported on the current platform") } -// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. -// For almost all use cases an unbuffered Watcher will perform better than buffered. -// Most kernels have de-duplication logic which allows for less activity in userspace -// and generally better performance. However there may be some cases where a very -// large buffers can enable an application to keep up with mass file rotations. -// You will always be better off increasing the kernel buffers over adding a large -// userspace buffer, but if you can't control the kernel buffer then a buffered -// watcher is a reasonable option. You probably want NewWatcher. +// NewBufferedWatcher creates a new Watcher with a buffered event channel. +// +// For almost all use cases an unbuffered Watcher will perform better; most +// kernels have de-duplication logic, which means less activity in userspace and +// generally better performance. However there may be some cases where a very +// large buffer can enable an application to keep up with a very large number of +// events. You will always be better off increasing the kernel buffers over +// adding a large userspace buffer, but if you can't control the kernel buffer +// then a buffered watcher is a reasonable option. func NewBufferedWatcher(sz uint) (*Watcher, error) { return NewWatcher() //just re-use the original error response } diff --git a/backend_windows.go b/backend_windows.go index 733ab09a..c2c2cc9f 100644 --- a/backend_windows.go +++ b/backend_windows.go @@ -146,14 +146,15 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(50) // Windows backend defaults to a buffered channel of size 50 } -// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. -// For almost all use cases an unbuffered Watcher will perform better than buffered. -// Most kernels have de-duplication logic which allows for less activity in userspace -// and generally better performance. However there may be some cases where a very -// large buffers can enable an application to keep up with mass file rotations. -// You will always be better off increasing the kernel buffers over adding a large -// userspace buffer, but if you can't control the kernel buffer then a buffered -// watcher is a reasonable option. You probably want NewWatcher. +// NewBufferedWatcher creates a new Watcher with a buffered event channel. +// +// For almost all use cases an unbuffered Watcher will perform better; most +// kernels have de-duplication logic, which means less activity in userspace and +// generally better performance. However there may be some cases where a very +// large buffer can enable an application to keep up with a very large number of +// events. You will always be better off increasing the kernel buffers over +// adding a large userspace buffer, but if you can't control the kernel buffer +// then a buffered watcher is a reasonable option. func NewBufferedWatcher(sz uint) (*Watcher, error) { port, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0) if err != nil { diff --git a/fsnotify_test.go b/fsnotify_test.go index 6173c7e7..7dad8673 100644 --- a/fsnotify_test.go +++ b/fsnotify_test.go @@ -28,6 +28,25 @@ func init() { internal.SetRlimit() } +// Quick but somewhat ugly way to run tests against both the standard fsnotify +// and the buffered one. Should rewrite the tests to run more than once, but +// effort... +func TestMain(m *testing.M) { + c1 := m.Run() + os.Setenv("FSNOTIFY_BUFFER", "1") + c2 := m.Run() + os.Setenv("FSNOTIFY_BUFFER", "1024") + c3 := m.Run() + + if c1 != 0 { + os.Exit(c1) + } + if c2 != 0 { + os.Exit(c2) + } + os.Exit(c3) +} + func TestWatch(t *testing.T) { tests := []testCase{ {"multiple creates", func(t *testing.T, w *Watcher, tmp string) { @@ -948,20 +967,27 @@ func TestClose(t *testing.T) { time.Sleep(50 * time.Millisecond) } - select { - default: - t.Fatal("blocking on Events") - case _, ok := <-w.Events: - if ok { - t.Fatal("Events not closed") + tim := time.NewTimer(50 * time.Millisecond) + loop: + for { + select { + default: + t.Fatal("blocking on Events") + case <-tim.C: + t.Fatalf("Events not closed") + case _, ok := <-w.Events: + if !ok { + break loop + } } } + select { default: t.Fatal("blocking on Errors") - case _, ok := <-w.Errors: + case err, ok := <-w.Errors: if ok { - t.Fatal("Errors not closed") + t.Fatalf("Errors not closed; read:\n\t%s", err) } } } @@ -1001,6 +1027,7 @@ func TestClose(t *testing.T) { touch(t, tmp, "file") rm(t, tmp, "file") + eventSeparator() if err := w.Close(); err != nil { t.Fatal(err) } diff --git a/helpers_test.go b/helpers_test.go index 2051a94f..f8e53da6 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -7,6 +7,7 @@ import ( "path/filepath" "runtime" "sort" + "strconv" "strings" "sync" "testing" @@ -46,6 +47,14 @@ func waitForEvents() { time.Sleep(500 * time.Millisecond) } func newWatcher(t *testing.T, add ...string) *Watcher { t.Helper() w, err := NewWatcher() + if e, ok := os.LookupEnv("FSNOTIFY_BUFFER"); ok { + t.Logf("using FSNOTIFY_BUFFER=%v", e) + n, err2 := strconv.Atoi(e) + if err2 != nil { + t.Fatalf("FSNOTIFY_BUFFER: %v", err2) + } + w, err = NewBufferedWatcher(uint(n)) + } if err != nil { t.Fatalf("newWatcher: %s", err) } diff --git a/mkdoc.zsh b/mkdoc.zsh index f4956525..6f3c0dca 100755 --- a/mkdoc.zsh +++ b/mkdoc.zsh @@ -72,16 +72,18 @@ EOF new=$(< Date: Thu, 19 Oct 2023 03:06:46 +0100 Subject: [PATCH 6/6] Fix tests --- .circleci/config.yml | 11 +- .cirrus.yml | 3 +- .github/workflows/test.yml | 132 ++++++++++--------- CHANGELOG.md | 14 +- backend_fen.go | 22 ++-- backend_inotify.go | 22 ++-- backend_kqueue.go | 22 ++-- backend_other.go | 28 ++-- backend_windows.go | 24 ++-- fsnotify.go | 7 +- fsnotify_test.go | 259 +++++++++++++++++-------------------- helpers_test.go | 34 +++-- mkdoc.zsh | 24 ++-- 13 files changed, 303 insertions(+), 299 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index fa8399ee..beb2cd46 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -22,10 +22,9 @@ jobs: name: test command: | uname -a - sysctl fs.inotify.max_user_watches fs.inotify.max_user_instances - ulimit -a go version - go test -parallel 1 -race ./... + FSNOTIFY_BUFFER=4096 go test -parallel 1 -race ./... + go test -parallel 1 -race ./... # iOS ios: @@ -50,7 +49,8 @@ jobs: export PATH=$PATH:/usr/local/Cellar/go/*/bin uname -a go version - go test -parallel 1 -race ./... + FSNOTIFY_BUFFER=4096 go test -parallel 1 -race ./... + go test -parallel 1 -race ./... # This is just Linux x86_64; also need to get a Go with GOOS=android, but # there aren't any pre-built versions of that on the Go site. Idk, disable for @@ -78,5 +78,6 @@ jobs: # uname -a # export PATH=/usr/local/go/bin:$PATH # go version - # go test -parallel 1 -race ./... + # FSNOTIFY_BUFFER=4096 go test -parallel 1 -race ./... + # go test -parallel 1 -race ./... # diff --git a/.cirrus.yml b/.cirrus.yml index d72f6e8b..ffc7b992 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -9,4 +9,5 @@ freebsd_task: # run tests as user "cirrus" instead of root - pw useradd cirrus -m - chown -R cirrus:cirrus . - - sudo -u cirrus go test -parallel 1 -race ./... + - FSNOTIFY_BUFFER=4096 sudo --preserve-env=FSNOTIFY_BUFFER -u cirrus go test -parallel 1 -race ./... + - sudo --preserve-env=FSNOTIFY_BUFFER -u cirrus go test -parallel 1 -race ./... diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 28ade379..0397e4e2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,71 +6,74 @@ on: branches: ['main', 'aix'] jobs: - # Test Windows and Linux with the latest Go version and the oldest we support. - test: + linux: strategy: fail-fast: false matrix: - os: - - ubuntu-latest - - windows-latest - go: - - '1.17' - - '1.21' + os: ['ubuntu-latest'] + go: ['1.17', '1.21'] runs-on: ${{ matrix.os }} steps: - - name: checkout - uses: actions/checkout@v3 - - - name: setup Go - uses: actions/setup-go@v4 + - uses: 'actions/checkout@v3' + - uses: 'actions/setup-go@v4' with: go-version: ${{ matrix.go }} + - name: test + run: | + FSNOTIFY_BUFFER=4096 go test -parallel 1 -race ./... + go test -parallel 1 -race ./... + windows: + strategy: + fail-fast: false + matrix: + os: ['windows-latest'] + go: ['1.17', '1.21'] + runs-on: ${{ matrix.os }} + steps: + - uses: 'actions/checkout@v3' + - uses: 'actions/setup-go@v4' + with: + go-version: ${{ matrix.go }} - name: test run: | go test -parallel 1 -race ./... + set "FSNOTIFY_BUFFER=4096" + go test -parallel 1 -race ./... # Test gccgo - testgcc: - runs-on: ubuntu-22.04 - name: test (ubuntu-22.04, gccgo 12.1) + gcc: + runs-on: 'ubuntu-22.04' + name: 'test (ubuntu-22.04, gccgo 12.1)' steps: - - name: checkout - uses: actions/checkout@v3 - + - uses: 'actions/checkout@v3' - name: test run: | sudo apt-get -y install gccgo-12 - go-12 test -parallel 1 ./... + FSNOTIFY_BUFFER=4096 go-12 test -parallel 1 ./... + go-12 test -parallel 1 ./... # Test only the latest Go version on macOS; we use the macOS builders for BSD # and illumos, and GitHub doesn't allow many of them to run concurrently. If # it works on Windows and Linux with Go 1.17, then it probably does on macOS # too. - testMacOS: + macos: name: test strategy: fail-fast: false matrix: - os: - - macos-11 - - macos-13 - go: - - '1.21' + os: ['macos-11', 'macos-13'] + go: ['1.21'] runs-on: ${{ matrix.os }} steps: - - name: checkout - uses: actions/checkout@v3 - - - name: setup Go - uses: actions/setup-go@v4 + - uses: 'actions/checkout@v3' + - uses: 'actions/setup-go@v4' with: go-version: ${{ matrix.go }} - - name: test run: | - go test -parallel 1 -race ./... + FSNOTIFY_BUFFER=4096 go test -parallel 1 -race ./... + go test -parallel 1 -race ./... # OpenBSD; no -race as the VM doesn't include the comp set. # @@ -79,45 +82,50 @@ jobs: # so should probably look into that first. Go 1.19 is supposed to have a # much faster race detector, so maybe waiting until we have that is # enough. - testOpenBSD: - runs-on: macos-12 - name: test (openbsd, 1.17) + openbsd: + runs-on: 'macos-12' + timeout-minutes: 30 + name: 'test (openbsd, 1.17)' steps: - - uses: actions/checkout@v3 - - name: test (openbsd, 1.17) - id: test - uses: vmactions/openbsd-vm@v0 + - uses: 'actions/checkout@v3' + - name: 'test (openbsd, 1.17)' + id: 'openbsd' + uses: 'vmactions/openbsd-vm@v0' with: prepare: pkg_add go run: | useradd -mG wheel action - su action -c 'go test -parallel 1 ./...' + FSNOTIFY_BUFFER=4096 su action -c 'go test -parallel 1 ./...' + su action -c 'go test -parallel 1 ./...' # NetBSD - testNetBSD: + netbsd: runs-on: macos-12 + timeout-minutes: 30 name: test (netbsd, 1.20) steps: - - uses: actions/checkout@v3 - - name: test (netbsd, 1.20) - id: test - uses: vmactions/netbsd-vm@v0 + - uses: 'actions/checkout@v3' + - name: 'test (netbsd, 1.20)' + id: 'netbsd' + uses: 'vmactions/netbsd-vm@v0' with: prepare: pkg_add go # TODO: no -race for the same reason as OpenBSD (the timing; it does run). run: | useradd -mG wheel action - su action -c 'go120 test -parallel 1 ./...' + FSNOTIFY_BUFFER=4096 su action -c 'go120 test -parallel 1 ./...' + su action -c 'go120 test -parallel 1 ./...' # illumos - testillumos: + illumos: runs-on: macos-12 + timeout-minutes: 30 name: test (illumos, 1.19) steps: - - uses: actions/checkout@v3 - - name: test (illumos, 1.19) - id: test - uses: papertigers/illumos-vm@r38 + - uses: 'actions/checkout@v3' + - name: 'test (illumos, 1.19)' + id: 'illumos' + uses: 'papertigers/illumos-vm@r38' with: prepare: | pkg install go-119 @@ -125,11 +133,13 @@ jobs: useradd action export GOCACHE=/tmp/go-cache export GOPATH=/tmp/go-path - su action -c '/opt/ooce/go-1.19/bin/go test -parallel 1 ./...' + FSNOTIFY_BUFFER=4096 su action -c '/opt/ooce/go-1.19/bin/go test -parallel 1 ./...' + su action -c '/opt/ooce/go-1.19/bin/go test -parallel 1 ./...' # Older Debian 6, for old Linux kernels. - testDebian6: + debian6: runs-on: macos-12 + timeout-minutes: 30 name: test (debian6, 1.19) strategy: fail-fast: false @@ -149,20 +159,18 @@ jobs: with: go-version: '1.19' - - name: test (debian6, 1.19) - id: test + - name: 'test (debian6, 1.19)' + id: 'debian6' run: | cp -f .github/workflows/Vagrantfile.debian6 Vagrantfile export GOOS=linux export GOARCH=amd64 for p in $(go list ./...); do - go test -c -o ${p//\//-}.test $p + FSNOTIFY_BUFFER=4096 go test -c -o ${p//\//-}.test $p + go test -c -o ${p//\//-}.test $p done vagrant up - - vagrant ssh -c 'uname -a' - vagrant ssh -c 'tail /proc/sys/fs/inotify/max_user_watches /proc/sys/fs/inotify/max_user_instances' - vagrant ssh -c 'ulimit -a' for t in *.test; do - vagrant ssh -c "/vagrant/$t -test.parallel 1" + FSNOTIFY_BUFFER=4096 vagrant ssh -c "/vagrant/$t -test.parallel 1" + vagrant ssh -c "/vagrant/$t -test.parallel 1" done diff --git a/CHANGELOG.md b/CHANGELOG.md index e82581d7..1b67521d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,12 +8,17 @@ This version of fsnotify needs Go 1.17. - illumos: add FEN backend to support illumos and Solaris. ([#371]) +- all: add `NewBufferedWatcher()` to use a buffered channel, which can be useful + in cases where you can't control the kernel buffer and receive a large number + of events in bursts. ([#550], [#572]) + - all: add `AddWith()`, which is identical to `Add()` but allows passing options. ([#521]) -- windows: allow setting the buffer size with `fsnotify.WithBufferSize()`; the - default of 64K is the highest value that works on all platforms and is enough - for most purposes, but in some cases a highest buffer is needed. ([#521]) +- windows: allow setting the ReadDirectoryChangesW() buffer size with + `fsnotify.WithBufferSize()`; the default of 64K is the highest value that + works on all platforms and is enough for most purposes, but in some cases a + highest buffer is needed. ([#521]) ### Changes and fixes @@ -57,7 +62,6 @@ This version of fsnotify needs Go 1.17. Google AppEngine forbids usage of the unsafe package so the inotify backend won't compile there. - [#371]: https://github.com/fsnotify/fsnotify/pull/371 [#516]: https://github.com/fsnotify/fsnotify/pull/516 [#518]: https://github.com/fsnotify/fsnotify/pull/518 @@ -67,6 +71,8 @@ This version of fsnotify needs Go 1.17. [#526]: https://github.com/fsnotify/fsnotify/pull/526 [#528]: https://github.com/fsnotify/fsnotify/pull/528 [#537]: https://github.com/fsnotify/fsnotify/pull/537 +[#550]: https://github.com/fsnotify/fsnotify/pull/550 +[#572]: https://github.com/fsnotify/fsnotify/pull/572 1.6.0 - 2022-10-13 ------------------- diff --git a/backend_fen.go b/backend_fen.go index c33811f3..d0daea3d 100644 --- a/backend_fen.go +++ b/backend_fen.go @@ -77,10 +77,10 @@ import ( // Sometimes it will send events for all times, sometimes it will send no // events, and often only for some files. // -// The default buffer size is 64K, which is the largest value that is guaranteed -// to work with SMB filesystems. If you have many events in quick succession -// this may not be enough, and you will have to use [WithBufferSize] to increase -// the value. +// The default ReadDirectoryChangesW() buffer size is 64K, which is the largest +// value that is guaranteed to work with SMB filesystems. If you have many +// events in quick succession this may not be enough, and you will have to use +// [WithBufferSize] to increase the value. type Watcher struct { // Events sends the filesystem change events. // @@ -142,15 +142,13 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with a buffered event channel. +// NewBufferedWatcher creates a new Watcher with a buffered [Events] channel. // -// For almost all use cases an unbuffered Watcher will perform better; most -// kernels have de-duplication logic, which means less activity in userspace and -// generally better performance. However there may be some cases where a very -// large buffer can enable an application to keep up with a very large number of -// events. You will always be better off increasing the kernel buffers over -// adding a large userspace buffer, but if you can't control the kernel buffer -// then a buffered watcher is a reasonable option. +// The main use-case for this is situations with a very large number of events +// where the kernel buffer size can't be increased (e.g. due to lack of +// permissions). An unbuffered Watcher will perform better for almost all use +// cases, and whenever possible you will be better off increasing the kernel +// buffers instead of adding a large userspace buffer. func NewBufferedWatcher(sz uint) (*Watcher, error) { w := &Watcher{ Events: make(chan Event, sz), diff --git a/backend_inotify.go b/backend_inotify.go index a35398d6..3813e5f7 100644 --- a/backend_inotify.go +++ b/backend_inotify.go @@ -80,10 +80,10 @@ import ( // Sometimes it will send events for all times, sometimes it will send no // events, and often only for some files. // -// The default buffer size is 64K, which is the largest value that is guaranteed -// to work with SMB filesystems. If you have many events in quick succession -// this may not be enough, and you will have to use [WithBufferSize] to increase -// the value. +// The default ReadDirectoryChangesW() buffer size is 64K, which is the largest +// value that is guaranteed to work with SMB filesystems. If you have many +// events in quick succession this may not be enough, and you will have to use +// [WithBufferSize] to increase the value. type Watcher struct { // Events sends the filesystem change events. // @@ -241,15 +241,13 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with a buffered event channel. +// NewBufferedWatcher creates a new Watcher with a buffered [Events] channel. // -// For almost all use cases an unbuffered Watcher will perform better; most -// kernels have de-duplication logic, which means less activity in userspace and -// generally better performance. However there may be some cases where a very -// large buffer can enable an application to keep up with a very large number of -// events. You will always be better off increasing the kernel buffers over -// adding a large userspace buffer, but if you can't control the kernel buffer -// then a buffered watcher is a reasonable option. +// The main use-case for this is situations with a very large number of events +// where the kernel buffer size can't be increased (e.g. due to lack of +// permissions). An unbuffered Watcher will perform better for almost all use +// cases, and whenever possible you will be better off increasing the kernel +// buffers instead of adding a large userspace buffer. func NewBufferedWatcher(sz uint) (*Watcher, error) { // Need to set nonblocking mode for SetDeadline to work, otherwise blocking // I/O operations won't terminate on close. diff --git a/backend_kqueue.go b/backend_kqueue.go index d1c07cbe..185626c4 100644 --- a/backend_kqueue.go +++ b/backend_kqueue.go @@ -77,10 +77,10 @@ import ( // Sometimes it will send events for all times, sometimes it will send no // events, and often only for some files. // -// The default buffer size is 64K, which is the largest value that is guaranteed -// to work with SMB filesystems. If you have many events in quick succession -// this may not be enough, and you will have to use [WithBufferSize] to increase -// the value. +// The default ReadDirectoryChangesW() buffer size is 64K, which is the largest +// value that is guaranteed to work with SMB filesystems. If you have many +// events in quick succession this may not be enough, and you will have to use +// [WithBufferSize] to increase the value. type Watcher struct { // Events sends the filesystem change events. // @@ -153,15 +153,13 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with a buffered event channel. +// NewBufferedWatcher creates a new Watcher with a buffered [Events] channel. // -// For almost all use cases an unbuffered Watcher will perform better; most -// kernels have de-duplication logic, which means less activity in userspace and -// generally better performance. However there may be some cases where a very -// large buffer can enable an application to keep up with a very large number of -// events. You will always be better off increasing the kernel buffers over -// adding a large userspace buffer, but if you can't control the kernel buffer -// then a buffered watcher is a reasonable option. +// The main use-case for this is situations with a very large number of events +// where the kernel buffer size can't be increased (e.g. due to lack of +// permissions). An unbuffered Watcher will perform better for almost all use +// cases, and whenever possible you will be better off increasing the kernel +// buffers instead of adding a large userspace buffer. func NewBufferedWatcher(sz uint) (*Watcher, error) { kq, closepipe, err := newKqueue() if err != nil { diff --git a/backend_other.go b/backend_other.go index 967309fb..01d617eb 100644 --- a/backend_other.go +++ b/backend_other.go @@ -69,10 +69,10 @@ import "errors" // Sometimes it will send events for all times, sometimes it will send no // events, and often only for some files. // -// The default buffer size is 64K, which is the largest value that is guaranteed -// to work with SMB filesystems. If you have many events in quick succession -// this may not be enough, and you will have to use [WithBufferSize] to increase -// the value. +// The default ReadDirectoryChangesW() buffer size is 64K, which is the largest +// value that is guaranteed to work with SMB filesystems. If you have many +// events in quick succession this may not be enough, and you will have to use +// [WithBufferSize] to increase the value. type Watcher struct { // Events sends the filesystem change events. // @@ -128,18 +128,14 @@ func NewWatcher() (*Watcher, error) { return nil, errors.New("fsnotify not supported on the current platform") } -// NewBufferedWatcher creates a new Watcher with a buffered event channel. -// -// For almost all use cases an unbuffered Watcher will perform better; most -// kernels have de-duplication logic, which means less activity in userspace and -// generally better performance. However there may be some cases where a very -// large buffer can enable an application to keep up with a very large number of -// events. You will always be better off increasing the kernel buffers over -// adding a large userspace buffer, but if you can't control the kernel buffer -// then a buffered watcher is a reasonable option. -func NewBufferedWatcher(sz uint) (*Watcher, error) { - return NewWatcher() //just re-use the original error response -} +// NewBufferedWatcher creates a new Watcher with a buffered [Events] channel. +// +// The main use-case for this is situations with a very large number of events +// where the kernel buffer size can't be increased (e.g. due to lack of +// permissions). An unbuffered Watcher will perform better for almost all use +// cases, and whenever possible you will be better off increasing the kernel +// buffers instead of adding a large userspace buffer. +func NewBufferedWatcher(sz uint) (*Watcher, error) { return NewWatcher() } // Close removes all watches and closes the events channel. func (w *Watcher) Close() error { return nil } diff --git a/backend_windows.go b/backend_windows.go index 27c64dc4..09740612 100644 --- a/backend_windows.go +++ b/backend_windows.go @@ -85,10 +85,10 @@ import ( // Sometimes it will send events for all times, sometimes it will send no // events, and often only for some files. // -// The default buffer size is 64K, which is the largest value that is guaranteed -// to work with SMB filesystems. If you have many events in quick succession -// this may not be enough, and you will have to use [WithBufferSize] to increase -// the value. +// The default ReadDirectoryChangesW() buffer size is 64K, which is the largest +// value that is guaranteed to work with SMB filesystems. If you have many +// events in quick succession this may not be enough, and you will have to use +// [WithBufferSize] to increase the value. type Watcher struct { // Events sends the filesystem change events. // @@ -149,18 +149,16 @@ type Watcher struct { // NewWatcher creates a new Watcher. func NewWatcher() (*Watcher, error) { - return NewBufferedWatcher(50) // Windows backend defaults to a buffered channel of size 50 + return NewBufferedWatcher(50) } -// NewBufferedWatcher creates a new Watcher with a buffered event channel. +// NewBufferedWatcher creates a new Watcher with a buffered [Events] channel. // -// For almost all use cases an unbuffered Watcher will perform better; most -// kernels have de-duplication logic, which means less activity in userspace and -// generally better performance. However there may be some cases where a very -// large buffer can enable an application to keep up with a very large number of -// events. You will always be better off increasing the kernel buffers over -// adding a large userspace buffer, but if you can't control the kernel buffer -// then a buffered watcher is a reasonable option. +// The main use-case for this is situations with a very large number of events +// where the kernel buffer size can't be increased (e.g. due to lack of +// permissions). An unbuffered Watcher will perform better for almost all use +// cases, and whenever possible you will be better off increasing the kernel +// buffers instead of adding a large userspace buffer. func NewBufferedWatcher(sz uint) (*Watcher, error) { port, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0) if err != nil { diff --git a/fsnotify.go b/fsnotify.go index 31b9226a..24c99cc4 100644 --- a/fsnotify.go +++ b/fsnotify.go @@ -122,13 +122,16 @@ func getOptions(opts ...addOpt) withOpts { return with } -// WithBufferSize sets the buffer size for the Windows backend. This is a no-op -// for other backends. +// WithBufferSize sets the [ReadDirectoryChangesW] buffer size. +// +// This only has effect on Windows systems, and is a no-op for other backends. // // The default value is 64K (65536 bytes) which is the highest value that works // on all filesystems and should be enough for most applications, but if you // have a large burst of events it may not be enough. You can increase it if // you're hitting "queue or buffer overflow" errors ([ErrEventOverflow]). +// +// [ReadDirectoryChangesW]: https://learn.microsoft.com/en-gb/windows/win32/api/winbase/nf-winbase-readdirectorychangesw func WithBufferSize(bytes int) addOpt { return func(opt *withOpts) { opt.bufsize = bytes } } diff --git a/fsnotify_test.go b/fsnotify_test.go index ce6e1d1b..f6e33d88 100644 --- a/fsnotify_test.go +++ b/fsnotify_test.go @@ -28,25 +28,6 @@ func init() { internal.SetRlimit() } -// Quick but somewhat ugly way to run tests against both the standard fsnotify -// and the buffered one. Should rewrite the tests to run more than once, but -// effort... -func TestMain(m *testing.M) { - c1 := m.Run() - os.Setenv("FSNOTIFY_BUFFER", "1") - c2 := m.Run() - os.Setenv("FSNOTIFY_BUFFER", "4") - c3 := m.Run() - - if c1 != 0 { - os.Exit(c1) - } - if c2 != 0 { - os.Exit(c2) - } - os.Exit(c3) -} - func TestWatch(t *testing.T) { tests := []testCase{ {"multiple creates", func(t *testing.T, w *Watcher, tmp string) { @@ -1089,21 +1070,40 @@ func TestClose(t *testing.T) { // a good reproducible test for this, but running it 150 times seems to // reproduce it in ~75% of cases and isn't too slow (~0.06s on my system). t.Run("double close", func(t *testing.T) { - t.Parallel() - - for i := 0; i < 150; i++ { - w, err := NewWatcher() - if err != nil { - if strings.Contains(err.Error(), "too many") { // syscall.EMFILE - time.Sleep(100 * time.Millisecond) - continue + t.Run("default", func(t *testing.T) { + t.Parallel() + + for i := 0; i < 150; i++ { + w, err := NewWatcher() + if err != nil { + if strings.Contains(err.Error(), "too many") { // syscall.EMFILE + time.Sleep(100 * time.Millisecond) + continue + } + t.Fatal(err) } - t.Fatal(err) + go w.Close() + go w.Close() + go w.Close() } - go w.Close() - go w.Close() - go w.Close() - } + }) + t.Run("buffered=4096", func(t *testing.T) { + t.Parallel() + + for i := 0; i < 150; i++ { + w, err := NewBufferedWatcher(4096) + if err != nil { + if strings.Contains(err.Error(), "too many") { // syscall.EMFILE + time.Sleep(100 * time.Millisecond) + continue + } + t.Fatal(err) + } + go w.Close() + go w.Close() + go w.Close() + } + }) }) t.Run("closes channels after read", func(t *testing.T) { @@ -1618,141 +1618,122 @@ func TestOpHas(t *testing.T) { } func BenchmarkWatch(b *testing.B) { - w, err := NewWatcher() - if err != nil { - b.Fatal(err) - } - - tmp := b.TempDir() - file := join(tmp, "file") - err = w.Add(tmp) - if err != nil { - b.Fatal(err) - } + do := func(b *testing.B, w *Watcher) { + tmp := b.TempDir() + file := join(tmp, "file") + err := w.Add(tmp) + if err != nil { + b.Fatal(err) + } - var wg sync.WaitGroup - wg.Add(1) - go func() { - for { - select { - case err, ok := <-w.Errors: - if !ok { - wg.Done() - return - } - b.Error(err) - case _, ok := <-w.Events: - if !ok { - wg.Done() - return + var wg sync.WaitGroup + wg.Add(1) + go func() { + for { + select { + case err, ok := <-w.Errors: + if !ok { + wg.Done() + return + } + b.Error(err) + case _, ok := <-w.Events: + if !ok { + wg.Done() + return + } } } - } - }() + }() - b.ResetTimer() - for n := 0; n < b.N; n++ { - fp, err := os.Create(file) - if err != nil { - b.Fatal(err) + b.ResetTimer() + for n := 0; n < b.N; n++ { + fp, err := os.Create(file) + if err != nil { + b.Fatal(err) + } + err = fp.Close() + if err != nil { + b.Fatal(err) + } } - err = fp.Close() + err = w.Close() if err != nil { b.Fatal(err) } - } - err = w.Close() - if err != nil { - b.Fatal(err) - } - wg.Wait() -} - -func BenchmarkBufferedWatch(b *testing.B) { - w, err := NewBufferedWatcher(4096) - if err != nil { - b.Fatal(err) - } - - tmp := b.TempDir() - file := join(tmp, "file") - err = w.Add(tmp) - if err != nil { - b.Fatal(err) + wg.Wait() } - var wg sync.WaitGroup - wg.Add(1) - go func() { - for { - select { - case err, ok := <-w.Errors: - if !ok { - wg.Done() - return - } - b.Error(err) - case _, ok := <-w.Events: - if !ok { - wg.Done() - return - } - } + b.Run("default", func(b *testing.B) { + w, err := NewWatcher() + if err != nil { + b.Fatal(err) } - }() - - b.ResetTimer() - for n := 0; n < b.N; n++ { - fp, err := os.Create(file) + do(b, w) + }) + b.Run("buffered=1", func(b *testing.B) { + w, err := NewBufferedWatcher(1) if err != nil { b.Fatal(err) } - err = fp.Close() + do(b, w) + }) + b.Run("buffered=1024", func(b *testing.B) { + w, err := NewBufferedWatcher(1024) if err != nil { b.Fatal(err) } - } - err = w.Close() - if err != nil { - b.Fatal(err) - } - wg.Wait() + do(b, w) + }) + b.Run("buffered=4096", func(b *testing.B) { + w, err := NewBufferedWatcher(4096) + if err != nil { + b.Fatal(err) + } + do(b, w) + }) } func BenchmarkAddRemove(b *testing.B) { - w, err := NewWatcher() - if err != nil { - b.Fatal(err) + do := func(b *testing.B, w *Watcher) { + tmp := b.TempDir() + b.ResetTimer() + for n := 0; n < b.N; n++ { + if err := w.Add(tmp); err != nil { + b.Fatal(err) + } + if err := w.Remove(tmp); err != nil { + b.Fatal(err) + } + } } - tmp := b.TempDir() - - b.ResetTimer() - for n := 0; n < b.N; n++ { - if err := w.Add(tmp); err != nil { + b.Run("default", func(b *testing.B) { + w, err := NewWatcher() + if err != nil { b.Fatal(err) } - if err := w.Remove(tmp); err != nil { + do(b, w) + }) + b.Run("buffered=1", func(b *testing.B) { + w, err := NewBufferedWatcher(1) + if err != nil { b.Fatal(err) } - } -} - -func BenchmarkBufferedAddRemove(b *testing.B) { - w, err := NewBufferedWatcher(4096) - if err != nil { - b.Fatal(err) - } - - tmp := b.TempDir() - - b.ResetTimer() - for n := 0; n < b.N; n++ { - if err := w.Add(tmp); err != nil { + do(b, w) + }) + b.Run("buffered=1024", func(b *testing.B) { + w, err := NewBufferedWatcher(1024) + if err != nil { b.Fatal(err) } - if err := w.Remove(tmp); err != nil { + do(b, w) + }) + b.Run("buffered=4096", func(b *testing.B) { + w, err := NewBufferedWatcher(4096) + if err != nil { b.Fatal(err) } - } + do(b, w) + }) } diff --git a/helpers_test.go b/helpers_test.go index 98acb239..c9a7dd78 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -43,17 +43,35 @@ func (tt testCase) run(t *testing.T) { func eventSeparator() { time.Sleep(50 * time.Millisecond) } func waitForEvents() { time.Sleep(500 * time.Millisecond) } +// To test the buffered watcher we run the tests twice in the CI: once as "go +// test" and once with FSNOTIFY_BUFFER set. This is a bit hacky, but saves +// having to refactor a lot of this code. Besides, running the tests in the CI +// more than once isn't a bad thing, since it helps catch flaky tests (should +// probably run it even more). +var testBuffered = func() uint { + s, ok := os.LookupEnv("FSNOTIFY_BUFFER") + if ok { + i, err := strconv.ParseUint(s, 0, 0) + if err != nil { + panic(fmt.Sprintf("FSNOTIFY_BUFFER: %s", err)) + } + return uint(i) + } + return 0 +}() + // newWatcher initializes an fsnotify Watcher instance. func newWatcher(t *testing.T, add ...string) *Watcher { t.Helper() - w, err := NewWatcher() - if e, ok := os.LookupEnv("FSNOTIFY_BUFFER"); ok { - t.Logf("using FSNOTIFY_BUFFER=%v", e) - n, err2 := strconv.Atoi(e) - if err2 != nil { - t.Fatalf("FSNOTIFY_BUFFER: %v", err2) - } - w, err = NewBufferedWatcher(uint(n)) + + var ( + w *Watcher + err error + ) + if testBuffered > 0 { + w, err = NewBufferedWatcher(testBuffered) + } else { + w, err = NewWatcher() } if err != nil { t.Fatalf("newWatcher: %s", err) diff --git a/mkdoc.zsh b/mkdoc.zsh index 4aaecfbb..ec01e6a5 100755 --- a/mkdoc.zsh +++ b/mkdoc.zsh @@ -67,10 +67,10 @@ watcher=$(<