Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optional Buffered Events channel. #550

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 13 additions & 1 deletion backend_fen.go
Expand Up @@ -133,8 +133,20 @@ type Watcher struct {

// NewWatcher creates a new Watcher.
func NewWatcher() (*Watcher, error) {
return NewBufferedWatcher(0)
}

// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel.
// For almost all use cases an unbuffered Watcher will perform better than buffered.
// Most kernels have de-duplication logic which allows for less activity in userspace
// and generally better performance. However there may be some cases where a very
// large buffers can enable an application to keep up with mass file rotations.
// You will always be better off increasing the kernel buffers over adding a large
// userspace buffer, but if you can't control the kernel buffer then a buffered
// watcher is a reasonable option. You probably want NewWatcher.
func NewBufferedWatcher(sz uint) (*Watcher, error) {
w := &Watcher{
Events: make(chan Event),
Events: make(chan Event, sz),
Errors: make(chan error),
dirs: make(map[string]struct{}),
watches: make(map[string]struct{}),
Expand Down
14 changes: 13 additions & 1 deletion backend_inotify.go
Expand Up @@ -232,6 +232,18 @@ func (w *watches) updatePath(path string, f func(*watch) (*watch, error)) error

// NewWatcher creates a new Watcher.
func NewWatcher() (*Watcher, error) {
return NewBufferedWatcher(0)
}

// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel.
// For almost all use cases an unbuffered Watcher will perform better than buffered.
// Most kernels have de-duplication logic which allows for less activity in userspace
// and generally better performance. However there may be some cases where a very
// large buffers can enable an application to keep up with mass file rotations.
// You will always be better off increasing the kernel buffers over adding a large
// userspace buffer, but if you can't control the kernel buffer then a buffered
// watcher is a reasonable option. You probably want NewWatcher.
func NewBufferedWatcher(sz uint) (*Watcher, error) {
// Need to set nonblocking mode for SetDeadline to work, otherwise blocking
// I/O operations won't terminate on close.
fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK)
Expand All @@ -243,7 +255,7 @@ func NewWatcher() (*Watcher, error) {
fd: fd,
inotifyFile: os.NewFile(uintptr(fd), ""),
watches: newWatches(),
Events: make(chan Event),
Events: make(chan Event, sz),
Errors: make(chan error),
done: make(chan struct{}),
doneResp: make(chan struct{}),
Expand Down
14 changes: 13 additions & 1 deletion backend_kqueue.go
Expand Up @@ -144,6 +144,18 @@ type pathInfo struct {

// NewWatcher creates a new Watcher.
func NewWatcher() (*Watcher, error) {
return NewBufferedWatcher(0)
}

// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel.
// For almost all use cases an unbuffered Watcher will perform better than buffered.
// Most kernels have de-duplication logic which allows for less activity in userspace
// and generally better performance. However there may be some cases where a very
// large buffers can enable an application to keep up with mass file rotations.
// You will always be better off increasing the kernel buffers over adding a large
// userspace buffer, but if you can't control the kernel buffer then a buffered
// watcher is a reasonable option. You probably want NewWatcher.
func NewBufferedWatcher(sz uint) (*Watcher, error) {
kq, closepipe, err := newKqueue()
if err != nil {
return nil, err
Expand All @@ -158,7 +170,7 @@ func NewWatcher() (*Watcher, error) {
paths: make(map[int]pathInfo),
fileExists: make(map[string]struct{}),
userWatches: make(map[string]struct{}),
Events: make(chan Event),
Events: make(chan Event, sz),
Errors: make(chan error),
done: make(chan struct{}),
}
Expand Down
12 changes: 12 additions & 0 deletions backend_other.go
Expand Up @@ -122,6 +122,18 @@ func NewWatcher() (*Watcher, error) {
return nil, errors.New("fsnotify not supported on the current platform")
}

// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel.
// For almost all use cases an unbuffered Watcher will perform better than buffered.
// Most kernels have de-duplication logic which allows for less activity in userspace
// and generally better performance. However there may be some cases where a very
// large buffers can enable an application to keep up with mass file rotations.
// You will always be better off increasing the kernel buffers over adding a large
// userspace buffer, but if you can't control the kernel buffer then a buffered
// watcher is a reasonable option. You probably want NewWatcher.
func NewBufferedWatcher(sz uint) (*Watcher, error) {
return NewWatcher() //just re-use the original error response
}

// Close removes all watches and closes the events channel.
func (w *Watcher) Close() error { return nil }

Expand Down
14 changes: 13 additions & 1 deletion backend_windows.go
Expand Up @@ -143,6 +143,18 @@ type Watcher struct {

// NewWatcher creates a new Watcher.
func NewWatcher() (*Watcher, error) {
return NewBufferedWatcher(50) // Windows backend defaults to a buffered channel of size 50
}

// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel.
// For almost all use cases an unbuffered Watcher will perform better than buffered.
// Most kernels have de-duplication logic which allows for less activity in userspace
// and generally better performance. However there may be some cases where a very
// large buffers can enable an application to keep up with mass file rotations.
// You will always be better off increasing the kernel buffers over adding a large
// userspace buffer, but if you can't control the kernel buffer then a buffered
// watcher is a reasonable option. You probably want NewWatcher.
func NewBufferedWatcher(sz uint) (*Watcher, error) {
port, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0)
if err != nil {
return nil, os.NewSyscallError("CreateIoCompletionPort", err)
Expand All @@ -151,7 +163,7 @@ func NewWatcher() (*Watcher, error) {
port: port,
watches: make(watchMap),
input: make(chan *input, 1),
Events: make(chan Event, 50),
Events: make(chan Event, sz),
Errors: make(chan error),
quit: make(chan chan<- error, 1),
}
Expand Down
70 changes: 70 additions & 0 deletions fsnotify_test.go
Expand Up @@ -1577,6 +1577,57 @@ func BenchmarkWatch(b *testing.B) {
wg.Wait()
}

func BenchmarkBufferedWatch(b *testing.B) {
w, err := NewBufferedWatcher(4096)
if err != nil {
b.Fatal(err)
}

tmp := b.TempDir()
file := join(tmp, "file")
err = w.Add(tmp)
if err != nil {
b.Fatal(err)
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
select {
case err, ok := <-w.Errors:
if !ok {
wg.Done()
return
}
b.Error(err)
case _, ok := <-w.Events:
if !ok {
wg.Done()
return
}
}
}
}()

b.ResetTimer()
for n := 0; n < b.N; n++ {
fp, err := os.Create(file)
if err != nil {
b.Fatal(err)
}
err = fp.Close()
if err != nil {
b.Fatal(err)
}
}
err = w.Close()
if err != nil {
b.Fatal(err)
}
wg.Wait()
}

func BenchmarkAddRemove(b *testing.B) {
w, err := NewWatcher()
if err != nil {
Expand All @@ -1595,3 +1646,22 @@ func BenchmarkAddRemove(b *testing.B) {
}
}
}

func BenchmarkBufferedAddRemove(b *testing.B) {
w, err := NewBufferedWatcher(4096)
if err != nil {
b.Fatal(err)
}

tmp := b.TempDir()

b.ResetTimer()
for n := 0; n < b.N; n++ {
if err := w.Add(tmp); err != nil {
b.Fatal(err)
}
if err := w.Remove(tmp); err != nil {
b.Fatal(err)
}
}
}
12 changes: 12 additions & 0 deletions mkdoc.zsh
Expand Up @@ -72,6 +72,17 @@ EOF
new=$(<<EOF
// NewWatcher creates a new Watcher.
EOF

newbuffered=$(<<EOF
// NewBufferedWatcher creates a new Watcher with an optionally buffered event channel.
// For almost all use cases an unbuffered Watcher will perform better than buffered.
// Most kernels have de-duplication logic which allows for less activity in userspace
// and generally better performance. However there may be some cases where a very
// large buffers can enable an application to keep up with mass file rotations.
// You will always be better off increasing the kernel buffers over adding a large
// userspace buffer, but if you can't control the kernel buffer then a buffered
// watcher is a reasonable option. You probably want NewWatcher.
EOF
)

add=$(<<EOF
Expand Down Expand Up @@ -236,6 +247,7 @@ set-cmt() {

set-cmt '^type Watcher struct ' $watcher
set-cmt '^func NewWatcher(' $new
set-cmt '^func NewBufferedWatcher(' $newbuffered
set-cmt '^func (w \*Watcher) Add(' $add
set-cmt '^func (w \*Watcher) AddWith(' $addwith
set-cmt '^func (w \*Watcher) Remove(' $remove
Expand Down