Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sync.EagerGroup #425

Merged
merged 9 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/labeler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ name: "Pull request labeler"
on:
- pull_request

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
jobs:
triage:
permissions:
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/pr-description-enforcer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ on:
- edited
- reopened

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
jobs:
enforce:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/semantic-pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ on:
- ready_for_review
- synchronize

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
jobs:
main:
name: title
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ on:
- main
- "release/*"
pull_request:
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
jobs:
unit:
name: unit
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ on:
- master
- main
pull_request:
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
jobs:
generate:
name: generated files
Expand Down
99 changes: 99 additions & 0 deletions sync/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package sync

import (
"context"
"sync"
)

// A EagerGroup is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// Use NewEagerGroup to create a new group.
type EagerGroup struct {
ctx context.Context
cancel context.CancelCauseFunc
wg sync.WaitGroup
sem chan struct{}
errOnce sync.Once
err error
}

// NewEagerGroup returns a new eager group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
//
// limit < 1 means no limit on the number of active goroutines.
func NewEagerGroup(ctx context.Context, limit int) (*EagerGroup, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
g := &EagerGroup{
ctx: ctx,
cancel: cancel,
}
if limit > 0 {
g.sem = make(chan struct{}, limit)
}
return g, ctx
}

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context.
// The error will be returned by Wait.
//
// If the group was created by calling NewEagerGroup with limit < 1, there is no
// limit on the number of active goroutines.
//
// If the group's context is canceled, routines that have not executed yet due to the limit won't be executed.
// Additionally, there is a best effort not to execute `f()` once the context is canceled
// and that happens whether or not a limit has been specified.
func (g *EagerGroup) Go(f func() error) {
if err := g.ctx.Err(); err != nil {
g.errOnce.Do(func() {
g.err = g.ctx.Err()
g.cancel(g.err)
})
return
}
fracasula marked this conversation as resolved.
Show resolved Hide resolved

if g.sem != nil {
select {
case <-g.ctx.Done():
g.errOnce.Do(func() {
g.err = g.ctx.Err()
g.cancel(g.err)
})
return
case g.sem <- struct{}{}:
}
}

g.wg.Add(1)
go func() {
err := g.ctx.Err()
if err == nil {
err = f()
}
if err != nil {
g.errOnce.Do(func() {
g.err = err
g.cancel(g.err)
})
}
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}()
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *EagerGroup) Wait() error {
g.wg.Wait()
g.cancel(g.err)
return g.err
}
91 changes: 91 additions & 0 deletions sync/group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package sync

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestEagerGroupWithLimit(t *testing.T) {
g, ctx := NewEagerGroup(context.Background(), 2)
var count atomic.Int64
// One of the following three goroutines should DEFINITELY NOT be executed due to the limit of 2 and the context being cancelled.
// The context should get cancelled automatically because the first two routines returned an error.
g.Go(func() error {
t.Log("one")
count.Add(1)
return fmt.Errorf("one")
})
g.Go(func() error {
t.Log("two")
count.Add(1)
return fmt.Errorf("two")
})
g.Go(func() error {
t.Log("three")
count.Add(1)
return fmt.Errorf("three")
})
require.Error(t, g.Wait(), "We expect group.Wait() to return an error")
ok := true
select {
case <-ctx.Done():
_, ok = <-ctx.Done()
case <-time.After(time.Second):
}
require.False(t, ok, "We expect the context to be cancelled")
require.True(t, 1 <= count.Load() && count.Load() <= 2, "We expect count to be between 1 and 2")
}

func TestEagerGroupWithNoLimit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := NewEagerGroup(ctx, 0)
funcCounter := &atomic.Int64{}

go func() {
for {
if funcCounter.Load() > 10 {
cancel()
return
}
}
}()

for i := 0; i < 10000; i++ {
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
funcCounter.Add(1)
return nil
})
}
require.ErrorIs(t, g.Wait(), ctx.Err(), "We expect group.Wait() to return the context error")
_, ok := <-ctx.Done()
require.False(t, ok, "We expect the context to be cancelled")
t.Log(funcCounter.Load(), "funcs executed")
// We expect between 10 and 10000 funcs to be executed
// because group tries to return early if context is cancelled
require.Less(
t,
funcCounter.Load(),
int64(10000),
"Expected less than 1000 funcs to be executed",
)
}

func TestNoInitEagerGroup(t *testing.T) {
g := &EagerGroup{}
f := func() error { return nil }
require.Panics(
t,
func() { g.Go(f) },
"We expect a panic when calling Go on a group that has not been initialized with NewEagerGroup",
)
}