Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add new RegisterEventListeners using new EventListener type #517

Merged
merged 3 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 61 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,33 @@ func ExampleJob_PreviousRun() {
fmt.Println("Previous run:", job.PreviousRun())
}

func ExampleJob_RegisterEventListeners() {
s := gocron.NewScheduler(time.UTC)

job, _ := s.Every("1s").Name("my_func").Do(func() error { return fmt.Errorf("error") })
job.RegisterEventListeners(
gocron.AfterJobRuns(func(jobName string) {
fmt.Printf("afterJobRuns: %s\n", jobName)
}),
gocron.BeforeJobRuns(func(jobName string) {
fmt.Printf("beforeJobRuns: %s\n", jobName)
}),
gocron.WhenJobReturnsError(func(jobName string, err error) {
fmt.Printf("whenJobReturnsError: %s, %v\n", jobName, err)
}),
gocron.WhenJobReturnsNoError(func(jobName string) {
fmt.Printf("whenJobReturnsNoError: %s\n", jobName)
}),
)
s.StartAsync()
time.Sleep(100 * time.Millisecond)
s.Stop()
// Output:
// beforeJobRuns: my_func
// whenJobReturnsError: my_func, error
// afterJobRuns: my_func
}

func ExampleJob_RunCount() {
s := gocron.NewScheduler(time.UTC)
job, _ := s.Every(1).Second().Do(task)
Expand Down Expand Up @@ -596,6 +623,40 @@ func ExampleScheduler_NextRun() {
// 10:30
}

func ExampleScheduler_RegisterEventListeners() {
s := gocron.NewScheduler(time.UTC)

s.Every("1s").Name("my_func_1").Do(func() error { return fmt.Errorf("error_1") })
s.Every("1s").Name("my_func_2").
StartAt(time.Now().UTC().Add(50 * time.Millisecond)).
Do(func() error { return fmt.Errorf("error_2") })

s.RegisterEventListeners(
gocron.AfterJobRuns(func(jobName string) {
fmt.Printf("afterJobRuns: %s\n", jobName)
}),
gocron.BeforeJobRuns(func(jobName string) {
fmt.Printf("beforeJobRuns: %s\n", jobName)
}),
gocron.WhenJobReturnsError(func(jobName string, err error) {
fmt.Printf("whenJobReturnsError: %s, %v\n", jobName, err)
}),
gocron.WhenJobReturnsNoError(func(jobName string) {
fmt.Printf("whenJobReturnsNoError: %s\n", jobName)
}),
)
s.StartAsync()
time.Sleep(120 * time.Millisecond)
s.Stop()
// Output:
// beforeJobRuns: my_func_1
// whenJobReturnsError: my_func_1, error_1
// afterJobRuns: my_func_1
// beforeJobRuns: my_func_2
// whenJobReturnsError: my_func_2, error_2
// afterJobRuns: my_func_2
}

func ExampleScheduler_Remove() {
s := gocron.NewScheduler(time.UTC)
_, _ = s.Every(1).Week().Do(task)
Expand Down
9 changes: 8 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ func runJob(f jobFunction) {
f.runStartCount.Add(1)
f.isRunning.Store(true)
callJobFunc(f.eventListeners.onBeforeJobExecution)
callJobFuncWithParams(f.function, f.parameters)
_ = callJobFuncWithParams(f.eventListeners.beforeJobRuns, []interface{}{f.getName()})
err := callJobFuncWithParams(f.function, f.parameters)
if err != nil {
_ = callJobFuncWithParams(f.eventListeners.onError, []interface{}{f.getName(), err})
} else {
_ = callJobFuncWithParams(f.eventListeners.noError, []interface{}{f.getName()})
}
_ = callJobFuncWithParams(f.eventListeners.afterJobRuns, []interface{}{f.getName()})
callJobFunc(f.eventListeners.onAfterJobExecution)
f.isRunning.Store(false)
f.runFinishCount.Add(1)
Expand Down
27 changes: 22 additions & 5 deletions gocron.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,38 @@ const (
)

func callJobFunc(jobFunc interface{}) {
if jobFunc != nil {
reflect.ValueOf(jobFunc).Call([]reflect.Value{})
if jobFunc == nil {
return
}
f := reflect.ValueOf(jobFunc)
if !f.IsZero() {
f.Call([]reflect.Value{})
}
}

func callJobFuncWithParams(jobFunc interface{}, params []interface{}) {
func callJobFuncWithParams(jobFunc interface{}, params []interface{}) error {
if jobFunc == nil {
return nil
}
f := reflect.ValueOf(jobFunc)
if f.IsZero() {
return nil
}
if len(params) != f.Type().NumIn() {
return
return nil
}
in := make([]reflect.Value, len(params))
for k, param := range params {
in[k] = reflect.ValueOf(param)
}
f.Call(in)
vals := f.Call(in)
for _, val := range vals {
i := val.Interface()
if err, ok := i.(error); ok {
return err
}
}
return nil
}

func getFunctionName(fn interface{}) string {
Expand Down
67 changes: 64 additions & 3 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ type jobFunction struct {
}

type eventListeners struct {
onBeforeJobExecution interface{} // performs before job executing
onAfterJobExecution interface{} // performs after job executing
onAfterJobExecution interface{} // deprecated
onBeforeJobExecution interface{} // deprecated
beforeJobRuns func(jobName string) // called before the job executes
afterJobRuns func(jobName string) // called after the job executes
onError func(jobName string, err error) // called when the job returns an error
noError func(jobName string) // called when no error is returned
}

type jobMutex struct {
Expand Down Expand Up @@ -98,6 +102,13 @@ func (jf *jobFunction) copy() jobFunction {
return cp
}

func (jf *jobFunction) getName() string {
if jf.jobName != "" {
return jf.jobName
}
return jf.funcName
}

type runConfig struct {
finiteRuns bool
maxRuns int
Expand Down Expand Up @@ -344,7 +355,57 @@ func (j *Job) Tags() []string {
return j.tags
}

// SetEventListeners accepts two functions that will be called, one before and one after the job is run
// EventListener functions utilize the job's name and are triggered
// by or in the condition that the name suggests
type EventListener func(j *Job)

// BeforeJobRuns is called before the job is run
func BeforeJobRuns(eventListenerFunc func(jobName string)) EventListener {
return func(j *Job) {
j.mu.Lock()
defer j.mu.Unlock()
j.eventListeners.beforeJobRuns = eventListenerFunc
}
}

// AfterJobRuns is called after the job is run
// This is called even when an error is returned
func AfterJobRuns(eventListenerFunc func(jobName string)) EventListener {
return func(j *Job) {
j.mu.Lock()
defer j.mu.Unlock()
j.eventListeners.afterJobRuns = eventListenerFunc
}
}

// WhenJobReturnsError is called when the job returns an error
func WhenJobReturnsError(eventListenerFunc func(jobName string, err error)) EventListener {
return func(j *Job) {
j.mu.Lock()
defer j.mu.Unlock()
j.eventListeners.onError = eventListenerFunc
}
}

// WhenJobReturnsNoError is called when the job does not return an error
// the function must accept a single parameter, which is an error
func WhenJobReturnsNoError(eventListenerFunc func(jobName string)) EventListener {
return func(j *Job) {
j.mu.Lock()
defer j.mu.Unlock()
j.eventListeners.noError = eventListenerFunc
}
}

// RegisterEventListeners accepts EventListeners and registers them for the job
// The event listeners are then called at the times described by each listener.
func (j *Job) RegisterEventListeners(eventListeners ...EventListener) {
for _, el := range eventListeners {
el(j)
}
}

// Deprecated: SetEventListeners accepts two functions that will be called, one before and one after the job is run
func (j *Job) SetEventListeners(onBeforeJobExecution interface{}, onAfterJobExecution interface{}) {
j.eventListeners = eventListeners{
onBeforeJobExecution: onBeforeJobExecution,
Expand Down
42 changes: 39 additions & 3 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gocron
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -234,23 +235,54 @@ func TestJob_CommonExports(t *testing.T) {
func TestJob_SetEventListeners(t *testing.T) {
t.Run("run event listeners callbacks for a job", func(t *testing.T) {
var (
jobRanPassed = false
beforeCallbackPassed = false
afterCallbackPassed = false
jobRanPassed bool
beforeCallbackPassed bool
afterCallbackPassed bool
beforeJobCallback bool
afterJobCallback bool
onErrorCallback bool
noErrorCallback bool
wg = &sync.WaitGroup{}
)
wg.Add(1)
s := NewScheduler(time.UTC)
job, err := s.Tag("tag1").Every("100ms").Do(func() {
jobRanPassed = true
})
require.NoError(t, err)
job.SetEventListeners(func() {
beforeCallbackPassed = true
}, func() {
defer wg.Done()
afterCallbackPassed = true
})

job2, err := s.Every("100ms").Do(func() error { return fmt.Errorf("failed") })
require.NoError(t, err)
wg.Add(1)
job2.RegisterEventListeners(
AfterJobRuns(func(_ string) {
afterJobCallback = true
wg.Done()
}),
BeforeJobRuns(func(_ string) {
beforeJobCallback = true
}),
WhenJobReturnsError(func(_ string, _ error) {
onErrorCallback = true
}),
)

job3, err := s.Every("100ms").Do(func() {})
require.NoError(t, err)
wg.Add(1)
job3.RegisterEventListeners(
WhenJobReturnsNoError(func(_ string) {
noErrorCallback = true
wg.Done()
}),
)

s.StartAsync()
wg.Wait()
s.Stop()
Expand All @@ -259,6 +291,10 @@ func TestJob_SetEventListeners(t *testing.T) {
assert.True(t, jobRanPassed)
assert.True(t, beforeCallbackPassed)
assert.True(t, afterCallbackPassed)
assert.True(t, beforeJobCallback)
assert.True(t, afterJobCallback)
assert.True(t, onErrorCallback)
assert.True(t, noErrorCallback)
})
}

Expand Down
14 changes: 11 additions & 3 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,6 @@ func (s *Scheduler) runContinuous(job *Job) {
if !job.getStartsImmediately() {
job.setStartsImmediately(true)
} else {
//if job.neverRan() {
// job.setLastRun(s.now())
//}
s.run(job)
}
nr := next.dateTime.Sub(s.now())
Expand Down Expand Up @@ -1433,3 +1430,14 @@ func (s *Scheduler) StopBlockingChan() {
func (s *Scheduler) WithDistributedLocker(l Locker) {
s.executor.distributedLocker = l
}

// RegisterEventListeners accepts EventListeners and registers them for all jobs
// in the scheduler at the time this function is called.
// The event listeners are then called at the times described by each listener.
// If a new job is added, an additional call to this method, or the job specific
// version must be executed in order for the new job to trigger event listeners.
func (s *Scheduler) RegisterEventListeners(eventListeners ...EventListener) {
for _, job := range s.Jobs() {
job.RegisterEventListeners(eventListeners...)
}
}