Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pause job execution on the scheduler #518

Merged
merged 3 commits into from
Jun 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,16 @@ func ExampleScheduler_NextRun() {
// 10:30
}

func ExampleScheduler_PauseJobExecution() {
s := gocron.NewScheduler(time.UTC)
_, _ = s.Every("1s").Do(task)
s.StartAsync()
s.PauseJobExecution(true)
// jobs don't run
s.PauseJobExecution(false)
// jobs run again
}

func ExampleScheduler_RegisterEventListeners() {
s := gocron.NewScheduler(time.UTC)

Expand Down
16 changes: 9 additions & 7 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ const (
)

type executor struct {
jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler
ctx context.Context // used to tell the executor to stop
cancel context.CancelFunc // used to tell the executor to stop
wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop
jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish
singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete
jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler
ctx context.Context // used to tell the executor to stop
cancel context.CancelFunc // used to tell the executor to stop
wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop
jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish
singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete
skipExecution *atomic.Bool // used to pause the execution of jobs

limitMode limitMode // when SetMaxConcurrentJobs() is set upon the scheduler
limitModeMaxRunningJobs int // stores the maximum number of concurrently running jobs
Expand Down Expand Up @@ -134,6 +135,7 @@ func (e *executor) start() {
e.jobsWg = &sync.WaitGroup{}

e.stopped = atomic.NewBool(false)
e.skipExecution = atomic.NewBool(false)

e.limitModeQueueMu.Lock()
e.limitModeQueue = make(chan jobFunction, 1000)
Expand Down Expand Up @@ -187,7 +189,7 @@ func (e *executor) run() {
for {
select {
case f := <-e.jobFunctions:
if e.stopped.Load() {
if e.stopped.Load() || e.skipExecution.Load() {
continue
}

Expand Down
4 changes: 4 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,3 +1441,7 @@ func (s *Scheduler) RegisterEventListeners(eventListeners ...EventListener) {
job.RegisterEventListeners(eventListeners...)
}
}

func (s *Scheduler) PauseJobExecution(shouldPause bool) {
s.executor.skipExecution.Store(shouldPause)
}
25 changes: 25 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2707,3 +2707,28 @@ func TestScheduler_WithDistributedLocker_With_Name(t *testing.T) {
})
}
}

func TestScheduler_PauseJobExecution(t *testing.T) {
s := NewScheduler(time.UTC)
var counter int
var mu sync.Mutex

_, err := s.Every("100ms").Do(func() {
mu.Lock()
counter++
mu.Unlock()
})
require.NoError(t, err)

s.StartAsync()
time.Sleep(50 * time.Millisecond)

s.PauseJobExecution(true)
time.Sleep(200 * time.Millisecond)

s.PauseJobExecution(false)
time.Sleep(100 * time.Millisecond)

JohnRoesler marked this conversation as resolved.
Show resolved Hide resolved
assert.GreaterOrEqual(t, counter, 1)
assert.LessOrEqual(t, counter, 2)
}