From 5e8ff902b136ea132c0b7f39bf47eedb0eed2d43 Mon Sep 17 00:00:00 2001 From: kris Date: Thu, 19 Jan 2023 09:05:01 -0700 Subject: [PATCH 1/4] adding options parameters to NewWatcher to allow for buffered EventChannels --- backend_fen.go | 7 ++++--- backend_inotify.go | 7 ++++--- backend_kqueue.go | 5 +++-- backend_other.go | 4 ++-- backend_windows.go | 18 +++++++++++++++--- fsnotify.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ fsnotify_test.go | 28 ++++++++++++++++++++++++++++ 7 files changed, 102 insertions(+), 13 deletions(-) diff --git a/backend_fen.go b/backend_fen.go index 500bd63f..8e2a2df0 100644 --- a/backend_fen.go +++ b/backend_fen.go @@ -131,10 +131,11 @@ type Watcher struct { watches map[string]struct{} // Explicitly watched non-directories } -// NewWatcher creates a new Watcher. -func NewWatcher() (*Watcher, error) { +// NewWatcher creates a new Watcher with an optional set of Option functions. +func NewWatcher(opts ...newOpt) (*Watcher, error) { + o := getNewOptions(opts...) w := &Watcher{ - Events: make(chan Event), + Events: o.eventChannel(), Errors: make(chan error), dirs: make(map[string]struct{}), watches: make(map[string]struct{}), diff --git a/backend_inotify.go b/backend_inotify.go index fe5033b1..6b69f27a 100644 --- a/backend_inotify.go +++ b/backend_inotify.go @@ -230,8 +230,8 @@ func (w *watches) updatePath(path string, f func(*watch) (*watch, error)) error return nil } -// NewWatcher creates a new Watcher. -func NewWatcher() (*Watcher, error) { +// NewWatcher creates a new Watcher with an optional set of Option functions. +func NewWatcher(opts ...newOpt) (*Watcher, error) { // Need to set nonblocking mode for SetDeadline to work, otherwise blocking // I/O operations won't terminate on close. fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK) @@ -239,11 +239,12 @@ func NewWatcher() (*Watcher, error) { return nil, errno } + o := getNewOptions(opts...) w := &Watcher{ fd: fd, inotifyFile: os.NewFile(uintptr(fd), ""), watches: newWatches(), - Events: make(chan Event), + Events: o.eventChannel(), Errors: make(chan error), done: make(chan struct{}), doneResp: make(chan struct{}), diff --git a/backend_kqueue.go b/backend_kqueue.go index 9d464504..df91e68c 100644 --- a/backend_kqueue.go +++ b/backend_kqueue.go @@ -142,13 +142,14 @@ type pathInfo struct { isDir bool } -// NewWatcher creates a new Watcher. +// NewWatcher creates a new Watcher with an optional set of Option functions. func NewWatcher() (*Watcher, error) { kq, closepipe, err := newKqueue() if err != nil { return nil, err } + o := getNewOptions(opts...) w := &Watcher{ kq: kq, closepipe: closepipe, @@ -158,7 +159,7 @@ func NewWatcher() (*Watcher, error) { paths: make(map[int]pathInfo), fileExists: make(map[string]struct{}), userWatches: make(map[string]struct{}), - Events: make(chan Event), + Events: o.eventChannel(), Errors: make(chan error), done: make(chan struct{}), } diff --git a/backend_other.go b/backend_other.go index 1668de90..4c3ce17a 100644 --- a/backend_other.go +++ b/backend_other.go @@ -117,8 +117,8 @@ type Watcher struct { Errors chan error } -// NewWatcher creates a new Watcher. -func NewWatcher() (*Watcher, error) { +// NewWatcher creates a new Watcher with an optional set of Option functions. +func NewWatcher(opts ...newOpt) (*Watcher, error) { return nil, errors.New("fsnotify not supported on the current platform") } diff --git a/backend_windows.go b/backend_windows.go index c0359ac8..9dc24e56 100644 --- a/backend_windows.go +++ b/backend_windows.go @@ -141,17 +141,29 @@ type Watcher struct { closed bool // Set to true when Close() is first called } -// NewWatcher creates a new Watcher. -func NewWatcher() (*Watcher, error) { +// NewWatcher creates a new Watcher with an optional set of Option functions. +func NewWatcher(opts ...newOpt) (*Watcher, error) { port, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0) if err != nil { return nil, os.NewSyscallError("CreateIoCompletionPort", err) } + + o := getNewOptions(opts...) + + // Windows doesn't have an internal kernel buffer so we default to an Event + // channel buffer of 50 if no manual buffer is set. + var ch chan Event + if o.eventbuffer > 0 { + ch = make(chan Event, o.eventbuffer) + } else { + //no manual buffer set, use the default of 50 + ch = make(chan Event, 50) + } w := &Watcher{ port: port, watches: make(watchMap), input: make(chan *input, 1), - Events: make(chan Event, 50), + Events: ch, Errors: make(chan error), quit: make(chan chan<- error, 1), } diff --git a/fsnotify.go b/fsnotify.go index c00ce762..c71eba3a 100644 --- a/fsnotify.go +++ b/fsnotify.go @@ -141,3 +141,49 @@ func recursivePath(path string) (string, bool) { } return path, false } + +type newOpt func(opt *newOpts) //option function to be passed to the NewWatcher + +// newOpts is used to configure some optional items in the Watcher structure during configuration. +// The newOpts structure is extensible so that new optional configuration items can be added later +type newOpts struct { + eventbuffer uint +} + +var defaultNewOpts = newOpts{ + eventbuffer: 0, // Default to an unbuffered channel +} + +// WithBufferedEventChannel configures the Watcher with an optionally buffered Event channel +// +// The default Event channel is unbuffered, which means that the host kernel is solely +// responsible for buffering events. A buffered Event channel allows for a userspace +// application to add additional event buffering without modifying kernel parameters. +// Adjusting the kernel buffers will always provide better performance on highly loaded +// systems, but a user space buffer can potentially provide some resiliency when it is +// not possible to change the kernel parameters +func WithBufferedEventChannel(sz uint) newOpt { + return func(opt *newOpts) { + if opt != nil { + opt.eventbuffer = sz + } + } +} + +// getNewOptions is just a little helper function simplify the usage of options in NewWatcher +func getNewOptions(opts ...newOpt) (r newOpts) { + r = defaultNewOpts + for _, o := range opts { + o(&r) + } + return +} + +// eventChannel creates a new Event channel based on eventbuffer value. +// A value of zero indicates an un buffered channel, values > 0 createa buffered channel. +func (o newOpts) eventChannel() chan Event { + if o.eventbuffer == 0 { + return make(chan Event) + } + return make(chan Event, o.eventbuffer) +} diff --git a/fsnotify_test.go b/fsnotify_test.go index b86d54d5..2ba54528 100644 --- a/fsnotify_test.go +++ b/fsnotify_test.go @@ -1526,6 +1526,34 @@ func TestWatchList(t *testing.T) { } } +func TestWatchBufferedChannel(t *testing.T) { + //create a watcher without a buffer + w, err := NewWatcher() + if err != nil { + t.Fatal(err) + } + //check that the channel exists + if w.Events == nil { + t.Fatal("nil channel") + } else if err = w.Close(); err != nil { + t.Fatal(err) + } + + //check with a buffered channel + if w, err = NewWatcher(WithBufferedEventChannel(42)); err != nil { + t.Fatal(err) + } + //check that the channel exists + if w.Events == nil { + t.Fatal("nil channel") + } else if cap(w.Events) != 42 { + t.Fatalf("event channel cap is wrong: %d != 42", cap(w.Events)) + } else if err = w.Close(); err != nil { + t.Fatal(err) + } + +} + func BenchmarkWatch(b *testing.B) { w, err := NewWatcher() if err != nil { From 5e99d85cbe185e91b0975944b76669f825348086 Mon Sep 17 00:00:00 2001 From: kris Date: Thu, 19 Jan 2023 09:20:28 -0700 Subject: [PATCH 2/4] missed one --- backend_kqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend_kqueue.go b/backend_kqueue.go index df91e68c..e56e18bd 100644 --- a/backend_kqueue.go +++ b/backend_kqueue.go @@ -143,7 +143,7 @@ type pathInfo struct { } // NewWatcher creates a new Watcher with an optional set of Option functions. -func NewWatcher() (*Watcher, error) { +func NewWatcher(opts ...newOpt) (*Watcher, error) { kq, closepipe, err := newKqueue() if err != nil { return nil, err From eafc1ded10717c05f2df1adafabb77486c1558fc Mon Sep 17 00:00:00 2001 From: kris Date: Sun, 5 Feb 2023 15:17:53 -0700 Subject: [PATCH 3/4] Addressing comments and moving to a new top level call rather than trying to update the existing NewWatcher --- backend_fen.go | 7 ++++- backend_inotify.go | 7 ++++- backend_kqueue.go | 7 ++++- backend_other.go | 5 ++++ backend_windows.go | 7 ++++- fsnotify_test.go | 70 ++++++++++++++++++++++++++++++++++++++++++++++ mkdoc.zsh | 5 ++++ 7 files changed, 104 insertions(+), 4 deletions(-) diff --git a/backend_fen.go b/backend_fen.go index 500bd63f..0d700fc8 100644 --- a/backend_fen.go +++ b/backend_fen.go @@ -133,8 +133,13 @@ type Watcher struct { // NewWatcher creates a new Watcher. func NewWatcher() (*Watcher, error) { + return NewBufferedWatcher(0) +} + +// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +func NewBufferedWatcher(sz uint) (*Watcher, error) { w := &Watcher{ - Events: make(chan Event), + Events: make(chan Event, sz), Errors: make(chan error), dirs: make(map[string]struct{}), watches: make(map[string]struct{}), diff --git a/backend_inotify.go b/backend_inotify.go index fe5033b1..8360a104 100644 --- a/backend_inotify.go +++ b/backend_inotify.go @@ -232,6 +232,11 @@ func (w *watches) updatePath(path string, f func(*watch) (*watch, error)) error // NewWatcher creates a new Watcher. func NewWatcher() (*Watcher, error) { + return NewBufferedWatcher(0) +} + +// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +func NewBufferedWatcher(sz uint) (*Watcher, error) { // Need to set nonblocking mode for SetDeadline to work, otherwise blocking // I/O operations won't terminate on close. fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK) @@ -243,7 +248,7 @@ func NewWatcher() (*Watcher, error) { fd: fd, inotifyFile: os.NewFile(uintptr(fd), ""), watches: newWatches(), - Events: make(chan Event), + Events: make(chan Event, sz), Errors: make(chan error), done: make(chan struct{}), doneResp: make(chan struct{}), diff --git a/backend_kqueue.go b/backend_kqueue.go index 9d464504..9d644b2e 100644 --- a/backend_kqueue.go +++ b/backend_kqueue.go @@ -144,6 +144,11 @@ type pathInfo struct { // NewWatcher creates a new Watcher. func NewWatcher() (*Watcher, error) { + return NewBufferedWatcher(0) +} + +// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +func NewBufferedWatcher(sz uint) (*Watcher, error) { kq, closepipe, err := newKqueue() if err != nil { return nil, err @@ -158,7 +163,7 @@ func NewWatcher() (*Watcher, error) { paths: make(map[int]pathInfo), fileExists: make(map[string]struct{}), userWatches: make(map[string]struct{}), - Events: make(chan Event), + Events: make(chan Event, sz), Errors: make(chan error), done: make(chan struct{}), } diff --git a/backend_other.go b/backend_other.go index 1668de90..a24856d9 100644 --- a/backend_other.go +++ b/backend_other.go @@ -122,6 +122,11 @@ func NewWatcher() (*Watcher, error) { return nil, errors.New("fsnotify not supported on the current platform") } +// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +func NewBufferedWatcher(sz uint) (*Watcher, error) { + return NewWatcher() //just re-use the original error response +} + // Close removes all watches and closes the events channel. func (w *Watcher) Close() error { return nil } diff --git a/backend_windows.go b/backend_windows.go index c0359ac8..3f4feefe 100644 --- a/backend_windows.go +++ b/backend_windows.go @@ -143,6 +143,11 @@ type Watcher struct { // NewWatcher creates a new Watcher. func NewWatcher() (*Watcher, error) { + return NewBufferedWatcher(50) // Windows backend defaults to a buffered channel of size 50 +} + +// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +func NewBufferedWatcher(sz uint) (*Watcher, error) { port, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0) if err != nil { return nil, os.NewSyscallError("CreateIoCompletionPort", err) @@ -151,7 +156,7 @@ func NewWatcher() (*Watcher, error) { port: port, watches: make(watchMap), input: make(chan *input, 1), - Events: make(chan Event, 50), + Events: make(chan Event, sz), Errors: make(chan error), quit: make(chan chan<- error, 1), } diff --git a/fsnotify_test.go b/fsnotify_test.go index b86d54d5..3733988a 100644 --- a/fsnotify_test.go +++ b/fsnotify_test.go @@ -1577,6 +1577,57 @@ func BenchmarkWatch(b *testing.B) { wg.Wait() } +func BenchmarkBufferedWatch(b *testing.B) { + w, err := NewBufferedWatcher(4096) + if err != nil { + b.Fatal(err) + } + + tmp := b.TempDir() + file := join(tmp, "file") + err = w.Add(tmp) + if err != nil { + b.Fatal(err) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + for { + select { + case err, ok := <-w.Errors: + if !ok { + wg.Done() + return + } + b.Error(err) + case _, ok := <-w.Events: + if !ok { + wg.Done() + return + } + } + } + }() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + fp, err := os.Create(file) + if err != nil { + b.Fatal(err) + } + err = fp.Close() + if err != nil { + b.Fatal(err) + } + } + err = w.Close() + if err != nil { + b.Fatal(err) + } + wg.Wait() +} + func BenchmarkAddRemove(b *testing.B) { w, err := NewWatcher() if err != nil { @@ -1595,3 +1646,22 @@ func BenchmarkAddRemove(b *testing.B) { } } } + +func BenchmarkBufferedAddRemove(b *testing.B) { + w, err := NewBufferedWatcher(4096) + if err != nil { + b.Fatal(err) + } + + tmp := b.TempDir() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + if err := w.Add(tmp); err != nil { + b.Fatal(err) + } + if err := w.Remove(tmp); err != nil { + b.Fatal(err) + } + } +} diff --git a/mkdoc.zsh b/mkdoc.zsh index 868d3b80..c1970501 100755 --- a/mkdoc.zsh +++ b/mkdoc.zsh @@ -72,6 +72,10 @@ EOF new=$(< Date: Sun, 5 Feb 2023 15:26:58 -0700 Subject: [PATCH 4/4] udpate comment blocks --- backend_fen.go | 9 ++++++++- backend_inotify.go | 9 ++++++++- backend_kqueue.go | 9 ++++++++- backend_other.go | 9 ++++++++- backend_windows.go | 9 ++++++++- mkdoc.zsh | 7 +++++++ 6 files changed, 47 insertions(+), 5 deletions(-) diff --git a/backend_fen.go b/backend_fen.go index 0d700fc8..1df18dcb 100644 --- a/backend_fen.go +++ b/backend_fen.go @@ -136,7 +136,14 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. +// For almost all use cases an unbuffered Watcher will perform better than buffered. +// Most kernels have de-duplication logic which allows for less activity in userspace +// and generally better performance. However there may be some cases where a very +// large buffers can enable an application to keep up with mass file rotations. +// You will always be better off increasing the kernel buffers over adding a large +// userspace buffer, but if you can't control the kernel buffer then a buffered +// watcher is a reasonable option. You probably want NewWatcher. func NewBufferedWatcher(sz uint) (*Watcher, error) { w := &Watcher{ Events: make(chan Event, sz), diff --git a/backend_inotify.go b/backend_inotify.go index 8360a104..80c44f60 100644 --- a/backend_inotify.go +++ b/backend_inotify.go @@ -235,7 +235,14 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. +// For almost all use cases an unbuffered Watcher will perform better than buffered. +// Most kernels have de-duplication logic which allows for less activity in userspace +// and generally better performance. However there may be some cases where a very +// large buffers can enable an application to keep up with mass file rotations. +// You will always be better off increasing the kernel buffers over adding a large +// userspace buffer, but if you can't control the kernel buffer then a buffered +// watcher is a reasonable option. You probably want NewWatcher. func NewBufferedWatcher(sz uint) (*Watcher, error) { // Need to set nonblocking mode for SetDeadline to work, otherwise blocking // I/O operations won't terminate on close. diff --git a/backend_kqueue.go b/backend_kqueue.go index 9d644b2e..d3b148ff 100644 --- a/backend_kqueue.go +++ b/backend_kqueue.go @@ -147,7 +147,14 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(0) } -// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. +// For almost all use cases an unbuffered Watcher will perform better than buffered. +// Most kernels have de-duplication logic which allows for less activity in userspace +// and generally better performance. However there may be some cases where a very +// large buffers can enable an application to keep up with mass file rotations. +// You will always be better off increasing the kernel buffers over adding a large +// userspace buffer, but if you can't control the kernel buffer then a buffered +// watcher is a reasonable option. You probably want NewWatcher. func NewBufferedWatcher(sz uint) (*Watcher, error) { kq, closepipe, err := newKqueue() if err != nil { diff --git a/backend_other.go b/backend_other.go index a24856d9..60b124d2 100644 --- a/backend_other.go +++ b/backend_other.go @@ -122,7 +122,14 @@ func NewWatcher() (*Watcher, error) { return nil, errors.New("fsnotify not supported on the current platform") } -// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. +// For almost all use cases an unbuffered Watcher will perform better than buffered. +// Most kernels have de-duplication logic which allows for less activity in userspace +// and generally better performance. However there may be some cases where a very +// large buffers can enable an application to keep up with mass file rotations. +// You will always be better off increasing the kernel buffers over adding a large +// userspace buffer, but if you can't control the kernel buffer then a buffered +// watcher is a reasonable option. You probably want NewWatcher. func NewBufferedWatcher(sz uint) (*Watcher, error) { return NewWatcher() //just re-use the original error response } diff --git a/backend_windows.go b/backend_windows.go index 3f4feefe..733ab09a 100644 --- a/backend_windows.go +++ b/backend_windows.go @@ -146,7 +146,14 @@ func NewWatcher() (*Watcher, error) { return NewBufferedWatcher(50) // Windows backend defaults to a buffered channel of size 50 } -// NewBufferedWatcher creates a new Watcher with an optionally buffered Event channel +// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel. +// For almost all use cases an unbuffered Watcher will perform better than buffered. +// Most kernels have de-duplication logic which allows for less activity in userspace +// and generally better performance. However there may be some cases where a very +// large buffers can enable an application to keep up with mass file rotations. +// You will always be better off increasing the kernel buffers over adding a large +// userspace buffer, but if you can't control the kernel buffer then a buffered +// watcher is a reasonable option. You probably want NewWatcher. func NewBufferedWatcher(sz uint) (*Watcher, error) { port, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0) if err != nil { diff --git a/mkdoc.zsh b/mkdoc.zsh index c1970501..f4956525 100755 --- a/mkdoc.zsh +++ b/mkdoc.zsh @@ -75,6 +75,13 @@ EOF newbuffered=$(<