Skip to content

Commit

Permalink
fix race occurrence with single queue in executor (#508)
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler committed Jun 2, 2023
1 parent 2c3b43d commit 7390192
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 0 deletions.
4 changes: 4 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (jf *jobFunction) singletonRunner() {
case <-jf.ctx.Done():
jf.singletonWg.Done()
jf.singletonRunnerOn.Store(false)
jf.singletonQueueMu.Lock()
jf.singletonQueue = make(chan struct{}, 1000)
jf.singletonQueueMu.Unlock()
jf.stopped.Store(false)
return
case <-jf.singletonQueue:
Expand Down Expand Up @@ -166,7 +168,9 @@ func (e *executor) runJob(f jobFunction) {
if !f.singletonRunnerOn.Load() {
go f.singletonRunner()
}
f.singletonQueueMu.Lock()
f.singletonQueue <- struct{}{}
f.singletonQueueMu.Unlock()
}
}

Expand Down
5 changes: 5 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type jobFunction struct {
jobName string // key of the distributed lock
funcName string // the name of the function - e.g. main.func1
runConfig runConfig // configuration for how many times to run the job
singletonQueueMu *sync.Mutex // mutex for singletonQueue
singletonQueue chan struct{} // queues jobs for the singleton runner to handle
singletonRunnerOn *atomic.Bool // whether the runner function for singleton is running
ctx context.Context // for cancellation
Expand Down Expand Up @@ -80,6 +81,7 @@ func (jf *jobFunction) copy() jobFunction {
jobName: jf.jobName,
runConfig: jf.runConfig,
singletonQueue: jf.singletonQueue,
singletonQueueMu: jf.singletonQueueMu,
ctx: jf.ctx,
cancel: jf.cancel,
isRunning: jf.isRunning,
Expand Down Expand Up @@ -421,6 +423,9 @@ func (j *Job) SingletonMode() {
defer j.mu.Unlock()
j.runConfig.mode = singletonMode
j.jobFunction.singletonWg = &sync.WaitGroup{}
j.singletonQueueMu = &sync.Mutex{}
j.singletonQueueMu.Lock()
defer j.singletonQueueMu.Unlock()
j.jobFunction.singletonQueue = make(chan struct{}, 100)
}

Expand Down

0 comments on commit 7390192

Please sign in to comment.