Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: go-co-op/gocron
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v2.16.0
Choose a base ref
...
head repository: go-co-op/gocron
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v2.16.1
Choose a head ref
  • 2 commits
  • 3 files changed
  • 2 contributors

Commits on Feb 25, 2025

  1. ExampleWithCronImplementation

    JohnRoesler committed Feb 25, 2025
    Copy the full SHA
    e1b7d52 View commit details

Commits on Feb 26, 2025

  1. Fix #835 and #837 (#836)

    apocelipes authored Feb 26, 2025
    Copy the full SHA
    63f3701 View commit details
Showing with 109 additions and 76 deletions.
  1. +29 −0 example_test.go
  2. +79 −75 executor.go
  3. +1 −1 scheduler.go
29 changes: 29 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
@@ -636,6 +636,35 @@ func ExampleWithContext() {
)
}

var _ gocron.Cron = (*customCron)(nil)

type customCron struct{}

func (c customCron) IsValid(crontab string, location *time.Location, now time.Time) error {
return nil
}

func (c customCron) Next(lastRun time.Time) time.Time {
return time.Now().Add(time.Second)
}

func ExampleWithCronImplementation() {
s, _ := gocron.NewScheduler()
defer func() { _ = s.Shutdown() }()
_, _ = s.NewJob(
gocron.CronJob(
"* * * * *",
false,
),
gocron.NewTask(
func() {},
),
gocron.WithCronImplementation(
&customCron{},
),
)
}

func ExampleWithDisabledDistributedJobLocker() {
// var _ gocron.Locker = (*myLocker)(nil)
//
154 changes: 79 additions & 75 deletions executor.go
Original file line number Diff line number Diff line change
@@ -36,6 +36,8 @@ type executor struct {

// used by the executor to receive a stop signal from the scheduler
stopCh chan struct{}
// ensure that stop runs before the next call to start and only runs once
stopOnce *sync.Once
// the timeout value when stopping
stopTimeout time.Duration
// used to signal that the executor has completed shutdown
@@ -88,6 +90,7 @@ func (e *executor) start() {
// any other uses within the executor should create a context
// using the executor context as parent.
e.ctx, e.cancel = context.WithCancel(context.Background())
e.stopOnce = &sync.Once{}

// the standardJobsWg tracks
standardJobsWg := &waitGroupWithMutex{}
@@ -131,7 +134,7 @@ func (e *executor) start() {

// spin off into a goroutine to unblock the executor and
// allow for processing for more work
go func() {
go func(executorCtx context.Context) {
// make sure to cancel the above context per the docs
// // Canceling this context releases resources associated with it, so code should
// // call cancel as soon as the operations running in this Context complete.
@@ -211,8 +214,7 @@ func (e *executor) start() {
}
} else {
select {
case <-e.stopCh:
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
case <-executorCtx.Done():
return
default:
}
@@ -228,7 +230,7 @@ func (e *executor) start() {
}(*j)
}
}
}()
}(e.ctx)
case <-e.stopCh:
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
return
@@ -473,86 +475,88 @@ func (e *executor) incrementJobCounter(j internalJob, status JobStatus) {
}

func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
e.logger.Debug("gocron: stopping executor")
// we've been asked to stop. This is either because the scheduler has been told
// to stop all jobs or the scheduler has been asked to completely shutdown.
//
// cancel tells all the functions to stop their work and send in a done response
e.cancel()

// the wait for job channels are used to report back whether we successfully waited
// for all jobs to complete or if we hit the configured timeout.
waitForJobs := make(chan struct{}, 1)
waitForSingletons := make(chan struct{}, 1)
waitForLimitMode := make(chan struct{}, 1)

// the waiter context is used to cancel the functions waiting on jobs.
// this is done to avoid goroutine leaks.
waiterCtx, waiterCancel := context.WithCancel(context.Background())

// wait for standard jobs to complete
go func() {
e.logger.Debug("gocron: waiting for standard jobs to complete")
e.stopOnce.Do(func() {
e.logger.Debug("gocron: stopping executor")
// we've been asked to stop. This is either because the scheduler has been told
// to stop all jobs or the scheduler has been asked to completely shutdown.
//
// cancel tells all the functions to stop their work and send in a done response
e.cancel()

// the wait for job channels are used to report back whether we successfully waited
// for all jobs to complete or if we hit the configured timeout.
waitForJobs := make(chan struct{}, 1)
waitForSingletons := make(chan struct{}, 1)
waitForLimitMode := make(chan struct{}, 1)

// the waiter context is used to cancel the functions waiting on jobs.
// this is done to avoid goroutine leaks.
waiterCtx, waiterCancel := context.WithCancel(context.Background())

// wait for standard jobs to complete
go func() {
// this is done in a separate goroutine, so we aren't
// blocked by the WaitGroup's Wait call in the event
// that the waiter context is cancelled.
// This particular goroutine could leak in the event that
// some long-running standard job doesn't complete.
standardJobsWg.Wait()
e.logger.Debug("gocron: standard jobs completed")
waitForJobs <- struct{}{}
e.logger.Debug("gocron: waiting for standard jobs to complete")
go func() {
// this is done in a separate goroutine, so we aren't
// blocked by the WaitGroup's Wait call in the event
// that the waiter context is cancelled.
// This particular goroutine could leak in the event that
// some long-running standard job doesn't complete.
standardJobsWg.Wait()
e.logger.Debug("gocron: standard jobs completed")
waitForJobs <- struct{}{}
}()
<-waiterCtx.Done()
}()
<-waiterCtx.Done()
}()

// wait for per job singleton limit mode runner jobs to complete
go func() {
e.logger.Debug("gocron: waiting for singleton jobs to complete")
// wait for per job singleton limit mode runner jobs to complete
go func() {
singletonJobsWg.Wait()
e.logger.Debug("gocron: singleton jobs completed")
waitForSingletons <- struct{}{}
e.logger.Debug("gocron: waiting for singleton jobs to complete")
go func() {
singletonJobsWg.Wait()
e.logger.Debug("gocron: singleton jobs completed")
waitForSingletons <- struct{}{}
}()
<-waiterCtx.Done()
}()
<-waiterCtx.Done()
}()

// wait for limit mode runners to complete
go func() {
e.logger.Debug("gocron: waiting for limit mode jobs to complete")
// wait for limit mode runners to complete
go func() {
limitModeJobsWg.Wait()
e.logger.Debug("gocron: limitMode jobs completed")
waitForLimitMode <- struct{}{}
e.logger.Debug("gocron: waiting for limit mode jobs to complete")
go func() {
limitModeJobsWg.Wait()
e.logger.Debug("gocron: limitMode jobs completed")
waitForLimitMode <- struct{}{}
}()
<-waiterCtx.Done()
}()
<-waiterCtx.Done()
}()

// now either wait for all the jobs to complete,
// or hit the timeout.
var count int
timeout := time.Now().Add(e.stopTimeout)
for time.Now().Before(timeout) && count < 3 {
select {
case <-waitForJobs:
count++
case <-waitForSingletons:
count++
case <-waitForLimitMode:
count++
default:
// now either wait for all the jobs to complete,
// or hit the timeout.
var count int
timeout := time.Now().Add(e.stopTimeout)
for time.Now().Before(timeout) && count < 3 {
select {
case <-waitForJobs:
count++
case <-waitForSingletons:
count++
case <-waitForLimitMode:
count++
default:
}
}
}
if count < 3 {
e.done <- ErrStopJobsTimedOut
e.logger.Debug("gocron: executor stopped - timed out")
} else {
e.done <- nil
e.logger.Debug("gocron: executor stopped")
}
waiterCancel()
if count < 3 {
e.done <- ErrStopJobsTimedOut
e.logger.Debug("gocron: executor stopped - timed out")
} else {
e.done <- nil
e.logger.Debug("gocron: executor stopped")
}
waiterCancel()

if e.limitMode != nil {
e.limitMode.started = false
}
if e.limitMode != nil {
e.limitMode.started = false
}
})
}
2 changes: 1 addition & 1 deletion scheduler.go
Original file line number Diff line number Diff line change
@@ -141,7 +141,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
jobUpdateNextRuns: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
done: make(chan error, 1),
}

s := &scheduler{