Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

after job listener #734

Merged
merged 2 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gocron_test

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -10,6 +12,14 @@ import (
"github.com/jonboulle/clockwork"
)

var _ Locker = new(errorLocker)

type errorLocker struct{}

func (e errorLocker) Lock(_ context.Context, _ string) (Lock, error) {
return nil, errors.New("locked")
}

func ExampleAfterJobRuns() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
Expand Down Expand Up @@ -52,6 +62,28 @@ func ExampleAfterJobRunsWithError() {
)
}

func ExampleAfterLockError() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()

_, _ = s.NewJob(
DurationJob(
time.Second,
),
NewTask(
func() {},
),
WithDistributedJobLocker(&errorLocker{}),
WithEventListeners(
AfterLockError(
func(jobID uuid.UUID, jobName string, err error) {
// do something immediately before the job is run
},
),
),
)
}

func ExampleBeforeJobRuns() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
Expand Down
2 changes: 2 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
} else if j.locker != nil {
lock, err := j.locker.Lock(j.ctx, j.name)
if err != nil {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
e.sendOutForRescheduling(&jIn)
e.incrementJobCounter(j, Skip)
return
Expand All @@ -351,6 +352,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
e.sendOutForRescheduling(&jIn)
e.incrementJobCounter(j, Skip)
return
Expand Down
13 changes: 13 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type internalJob struct {
afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string)
afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error)
afterLockError func(jobID uuid.UUID, jobName string, err error)

locker Locker
}
Expand Down Expand Up @@ -639,6 +640,18 @@ func BeforeJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) Even
}
}

// AfterLockError is used to when the distributed locker returns an error and
// then run the provided function.
func AfterLockError(eventListenerFunc func(jobID uuid.UUID, jobName string, err error)) EventListener {
return func(j *internalJob) error {
if eventListenerFunc == nil {
return ErrEventListenerFuncNil
}
j.afterLockError = eventListenerFunc
return nil
}
}

// -----------------------------------------------
// -----------------------------------------------
// ---------------- Job Schedules ----------------
Expand Down
11 changes: 11 additions & 0 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,12 +437,20 @@ func TestWithEventListeners(t *testing.T) {
},
nil,
},
{
"afterLockError",
[]EventListener{
AfterLockError(func(_ uuid.UUID, _ string, _ error) {}),
},
nil,
},
{
"multiple event listeners",
[]EventListener{
AfterJobRuns(func(_ uuid.UUID, _ string) {}),
AfterJobRunsWithError(func(_ uuid.UUID, _ string, _ error) {}),
BeforeJobRuns(func(_ uuid.UUID, _ string) {}),
AfterLockError(func(_ uuid.UUID, _ string, _ error) {}),
},
nil,
},
Expand Down Expand Up @@ -488,6 +496,9 @@ func TestWithEventListeners(t *testing.T) {
if ij.beforeJobRuns != nil {
count++
}
if ij.afterLockError != nil {
count++
}
assert.Equal(t, len(tt.eventListeners), count)
})
}
Expand Down
69 changes: 69 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gocron

import (
"context"
"errors"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -49,6 +50,14 @@ func newTestScheduler(t *testing.T, options ...SchedulerOption) Scheduler {
return s
}

var _ Locker = new(errorLocker)

type errorLocker struct{}

func (e errorLocker) Lock(_ context.Context, _ string) (Lock, error) {
return nil, errors.New("locked")
}

func TestScheduler_OneSecond_NoOptions(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
cronNoOptionsCh := make(chan struct{}, 10)
Expand Down Expand Up @@ -1631,6 +1640,66 @@ func TestScheduler_WithEventListeners(t *testing.T) {
}
}

func TestScheduler_WithLocker_WithEventListeners(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

listenerRunCh := make(chan error, 1)
tests := []struct {
name string
locker Locker
tsk Task
el EventListener
expectRun bool
expectErr error
}{
{
"AfterLockError",
errorLocker{},
NewTask(func() {}),
AfterLockError(func(_ uuid.UUID, _ string, err error) {
listenerRunCh <- nil
}),
true,
nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t)
_, err := s.NewJob(
DurationJob(time.Minute*10),
tt.tsk,
WithStartAt(
WithStartImmediately(),
),
WithDistributedJobLocker(tt.locker),
WithEventListeners(tt.el),
WithLimitedRuns(1),
)
require.NoError(t, err)

s.Start()
if tt.expectRun {
select {
case err = <-listenerRunCh:
assert.ErrorIs(t, err, tt.expectErr)
case <-time.After(time.Second):
t.Fatal("timed out waiting for listener to run")
}
} else {
select {
case <-listenerRunCh:
t.Fatal("listener ran when it shouldn't have")
case <-time.After(time.Millisecond * 100):
}
}

require.NoError(t, s.Shutdown())
})
}
}

func TestScheduler_ManyJobs(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

Expand Down