From 615ecde5ad724d1b9ab173849ccd67a9d75a865f Mon Sep 17 00:00:00 2001 From: Rodrigo Broggi Date: Fri, 21 Jun 2024 12:53:15 +0200 Subject: [PATCH 1/2] issue-740: `AtTimes` job type --- errors.go | 1 + example_test.go | 19 +++++ job.go | 70 ++++++++++++++++++ scheduler_test.go | 184 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 274 insertions(+) diff --git a/errors.go b/errors.go index 53df01b1..be2b6e3f 100644 --- a/errors.go +++ b/errors.go @@ -25,6 +25,7 @@ var ( ErrNewJobWrongNumberOfParameters = fmt.Errorf("gocron: NewJob: Number of provided parameters does not match expected") ErrNewJobWrongTypeOfParameters = fmt.Errorf("gocron: NewJob: Type of provided parameters does not match expected") ErrOneTimeJobStartDateTimePast = fmt.Errorf("gocron: OneTimeJob: start must not be in the past") + ErrAtTimesJobAtLeastOneInFuture = fmt.Errorf("gocron: AtTimesJob: at least one point in time must be in the future") ErrStopExecutorTimedOut = fmt.Errorf("gocron: timed out waiting for executor to stop") ErrStopJobsTimedOut = fmt.Errorf("gocron: timed out waiting for jobs to finish") ErrStopSchedulerTimedOut = fmt.Errorf("gocron: timed out waiting for scheduler to stop") diff --git a/example_test.go b/example_test.go index a4baa8f2..2607232c 100644 --- a/example_test.go +++ b/example_test.go @@ -358,6 +358,25 @@ func ExampleOneTimeJob() { s.Start() } +func ExampleAtTimesJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + // run job in 10 seconds and in 55 minutes from now + n := time.Now() + _, _ = s.NewJob( + AtTimesJob( + n.Add(10*time.Second), + n.Add(55*time.Minute), + ), + NewTask( + func() {}, + ), + ) + + s.Start() +} + func ExampleScheduler_jobs() { s, _ := NewScheduler() defer func() { _ = s.Shutdown() }() diff --git a/job.go b/job.go index 1f45bf9f..3547049c 100644 --- a/job.go +++ b/job.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math/rand" + "sort" "strings" "time" @@ -486,6 +487,46 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition { } } +var _JobDefinition = (*atTimesJobDefinition)(nil) + +type atTimesJobDefinition struct { + atTimes []time.Time +} + +func (a atTimesJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error { + sortedTimes := a.atTimes + sort.Slice(a.atTimes, func(i, j int) bool { + return a.atTimes[i].Before(a.atTimes[j]) + }) + // keep only schedules that are in the future + idx, found := slices.BinarySearchFunc(sortedTimes, now, timeCmp()) + if found { + idx++ + } + sortedTimes = sortedTimes[idx:] + if len(sortedTimes) == 0 { + return ErrAtTimesJobAtLeastOneInFuture + } + j.jobSchedule = atTimesJob{sortedTimes: sortedTimes} + return nil +} + +func timeCmp() func(element time.Time, target time.Time) int { + return func(element time.Time, target time.Time) int { + if element.Equal(target) { + return 0 + } + if element.Before(target) { + return -1 + } + return 1 + } +} + +func AtTimesJob(atTimes ...time.Time) JobDefinition { + return atTimesJobDefinition{atTimes: atTimes} +} + // ----------------------------------------------- // ----------------------------------------------- // ----------------- Job Options ----------------- @@ -882,6 +923,35 @@ func (o oneTimeJob) next(_ time.Time) time.Time { return time.Time{} } +type atTimesJob struct { + sortedTimes []time.Time +} + +// next finds the next item in a sorted list of times using binary-search. +// +// example: sortedTimes: [2, 4, 6, 8] +// +// lastRun: 1 => [idx=0,found=false] => next is 2 - sorted[idx] idx=0 +// lastRun: 2 => [idx=0,found=true] => next is 4 - sorted[idx+1] idx=1 +// lastRun: 3 => [idx=1,found=false] => next is 4 - sorted[idx] idx=1 +// lastRun: 4 => [idx=1,found=true] => next is 6 - sorted[idx+1] idx=2 +// lastRun: 7 => [idx=3,found=false] => next is 8 - sorted[idx] idx=3 +// lastRun: 8 => [idx=3,found=found] => next is none +// lastRun: 9 => [idx=3,found=found] => next is none +func (a atTimesJob) next(lastRun time.Time) time.Time { + idx, found := slices.BinarySearchFunc(a.sortedTimes, lastRun, timeCmp()) + // if found, the next run is the following index + if found { + idx++ + } + // exhausted runs + if idx >= len(a.sortedTimes) { + return time.Time{} + } + + return a.sortedTimes[idx] +} + // ----------------------------------------------- // ----------------------------------------------- // ---------------- Job Interface ---------------- diff --git a/scheduler_test.go b/scheduler_test.go index a59b59bd..58565c8a 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -6,10 +6,12 @@ import ( "fmt" "os" "sync" + "sync/atomic" "testing" "time" "github.com/google/uuid" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -2007,6 +2009,188 @@ func TestScheduler_OneTimeJob(t *testing.T) { } } +func TestScheduler_AtTimesJob(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + n := time.Now().UTC() + + tests := []struct { + name string + atTimes []time.Time + fakeClock clockwork.FakeClock + assertErr require.ErrorAssertionFunc + // asserts things about schedules, advance time and perform new assertions + advanceAndAsserts []func( + t *testing.T, + j Job, + clock clockwork.FakeClock, + runs *atomic.Uint32, + ) + }{ + { + name: "no at times", + atTimes: []time.Time{}, + fakeClock: clockwork.NewFakeClock(), + assertErr: func(t require.TestingT, err error, i ...interface{}) { + require.ErrorIs(t, err, ErrAtTimesJobAtLeastOneInFuture) + }, + }, + { + name: "all in the past", + atTimes: []time.Time{n.Add(-1 * time.Second)}, + fakeClock: clockwork.NewFakeClockAt(n), + assertErr: func(t require.TestingT, err error, i ...interface{}) { + require.ErrorIs(t, err, ErrAtTimesJobAtLeastOneInFuture) + }, + }, + { + name: "one run 1 millisecond in the future", + atTimes: []time.Time{n.Add(1 * time.Millisecond)}, + fakeClock: clockwork.NewFakeClockAt(n), + advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){ + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + require.Equal(t, uint32(0), runs.Load()) + + // last not initialized + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, lastRunAt) + + // next is now + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, n.Add(1*time.Millisecond), nextRunAt) + + // advance and eventually run + clock.Advance(2 * time.Millisecond) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(1), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err = j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond) + + nextRunAt, err = j.NextRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, nextRunAt) + }, + }, + }, + { + name: "one run in the past and one in the future", + atTimes: []time.Time{n.Add(-1 * time.Millisecond), n.Add(1 * time.Millisecond)}, + fakeClock: clockwork.NewFakeClockAt(n), + advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){ + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + require.Equal(t, uint32(0), runs.Load()) + + // last not initialized + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, lastRunAt) + + // next is now + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, n.Add(1*time.Millisecond), nextRunAt) + + // advance and eventually run + clock.Advance(2 * time.Millisecond) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(1), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err = j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond) + }, + }, + }, + { + name: "two runs in the future", + atTimes: []time.Time{n.Add(1 * time.Millisecond), n.Add(3 * time.Millisecond)}, + fakeClock: clockwork.NewFakeClockAt(n), + advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){ + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + require.Equal(t, uint32(0), runs.Load()) + + // last not initialized + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, lastRunAt) + + // next is now + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, n.Add(1*time.Millisecond), nextRunAt) + + // advance and eventually run + clock.Advance(2 * time.Millisecond) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(1), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err = j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond) + + nextRunAt, err = j.NextRun() + require.NoError(t, err) + require.Equal(t, n.Add(3*time.Millisecond), nextRunAt) + }, + + func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) { + // advance and eventually run + clock.Advance(2 * time.Millisecond) + require.Eventually(t, func() bool { + return assert.Equal(t, uint32(2), runs.Load()) + }, 3*time.Second, 100*time.Millisecond) + + // last was run + lastRunAt, err := j.LastRun() + require.NoError(t, err) + require.WithinDuration(t, n.Add(3*time.Millisecond), lastRunAt, 1*time.Millisecond) + + nextRunAt, err := j.NextRun() + require.NoError(t, err) + require.Equal(t, time.Time{}, nextRunAt) + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t, WithClock(tt.fakeClock)) + t.Cleanup(func() { + require.NoError(t, s.Shutdown()) + }) + + runs := atomic.Uint32{} + j, err := s.NewJob( + AtTimesJob(tt.atTimes...), + NewTask(func() { + runs.Add(1) + }), + ) + if tt.assertErr != nil { + tt.assertErr(t, err) + } else { + require.NoError(t, err) + s.Start() + + for _, advanceAndAssert := range tt.advanceAndAsserts { + advanceAndAssert(t, j, tt.fakeClock, &runs) + } + } + }) + } +} + func TestScheduler_WithLimitedRuns(t *testing.T) { defer verifyNoGoroutineLeaks(t) From 6c9084639b2f0e2e0fbaf66089ac823ec6d6f18e Mon Sep 17 00:00:00 2001 From: Rodrigo Broggi Date: Fri, 21 Jun 2024 16:48:04 +0200 Subject: [PATCH 2/2] issue-742: followup comments and reuse oneTimeJob --- errors.go | 1 - example_test.go | 22 +++++-------- job.go | 80 +++++++++++++++++------------------------------ scheduler_test.go | 6 ++-- 4 files changed, 39 insertions(+), 70 deletions(-) diff --git a/errors.go b/errors.go index be2b6e3f..53df01b1 100644 --- a/errors.go +++ b/errors.go @@ -25,7 +25,6 @@ var ( ErrNewJobWrongNumberOfParameters = fmt.Errorf("gocron: NewJob: Number of provided parameters does not match expected") ErrNewJobWrongTypeOfParameters = fmt.Errorf("gocron: NewJob: Type of provided parameters does not match expected") ErrOneTimeJobStartDateTimePast = fmt.Errorf("gocron: OneTimeJob: start must not be in the past") - ErrAtTimesJobAtLeastOneInFuture = fmt.Errorf("gocron: AtTimesJob: at least one point in time must be in the future") ErrStopExecutorTimedOut = fmt.Errorf("gocron: timed out waiting for executor to stop") ErrStopJobsTimedOut = fmt.Errorf("gocron: timed out waiting for jobs to finish") ErrStopSchedulerTimedOut = fmt.Errorf("gocron: timed out waiting for scheduler to stop") diff --git a/example_test.go b/example_test.go index 2607232c..478af1f2 100644 --- a/example_test.go +++ b/example_test.go @@ -354,24 +354,16 @@ func ExampleOneTimeJob() { func() {}, ), ) - - s.Start() -} - -func ExampleAtTimesJob() { - s, _ := NewScheduler() - defer func() { _ = s.Shutdown() }() - - // run job in 10 seconds and in 55 minutes from now + // run job twice - once in 10 seconds and once in 55 minutes n := time.Now() _, _ = s.NewJob( - AtTimesJob( - n.Add(10*time.Second), - n.Add(55*time.Minute), - ), - NewTask( - func() {}, + OneTimeJob( + OneTimeJobStartDateTimes( + n.Add(10*time.Second), + n.Add(55*time.Minute), + ), ), + NewTask(func() {}), ) s.Start() diff --git a/job.go b/job.go index 3547049c..7f1f2c40 100644 --- a/job.go +++ b/job.go @@ -447,35 +447,47 @@ type oneTimeJobDefinition struct { } func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error { - j.jobSchedule = oneTimeJob{} - if err := o.startAt(j); err != nil { - return err + sortedTimes := o.startAt(j) + sort.Slice(sortedTimes, func(i, j int) bool { + return sortedTimes[i].Before(sortedTimes[j]) + }) + // keep only schedules that are in the future + idx, found := slices.BinarySearchFunc(sortedTimes, now, timeCmp()) + if found { + idx++ } - // in case we are not in the `startImmediately` case, our start-date must be in - // the future according to the scheduler clock - if !j.startImmediately && (j.startTime.IsZero() || j.startTime.Before(now)) { + sortedTimes = sortedTimes[idx:] + if !j.startImmediately && len(sortedTimes) == 0 { return ErrOneTimeJobStartDateTimePast } + j.jobSchedule = oneTimeJob{sortedTimes: sortedTimes} return nil } // OneTimeJobStartAtOption defines when the one time job is run -type OneTimeJobStartAtOption func(*internalJob) error +type OneTimeJobStartAtOption func(*internalJob) []time.Time // OneTimeJobStartImmediately tells the scheduler to run the one time job immediately. func OneTimeJobStartImmediately() OneTimeJobStartAtOption { - return func(j *internalJob) error { + return func(j *internalJob) []time.Time { j.startImmediately = true - return nil + return []time.Time{} } } // OneTimeJobStartDateTime sets the date & time at which the job should run. // This datetime must be in the future (according to the scheduler clock). func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption { - return func(j *internalJob) error { - j.startTime = start - return nil + return func(j *internalJob) []time.Time { + return []time.Time{start} + } +} + +// OneTimeJobStartDateTimes sets the date & times at which the job should run. +// At least one of the date/times must be in the future (according to the scheduler clock). +func OneTimeJobStartDateTimes(times ...time.Time) OneTimeJobStartAtOption { + return func(j *internalJob) []time.Time { + return times } } @@ -487,30 +499,6 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition { } } -var _JobDefinition = (*atTimesJobDefinition)(nil) - -type atTimesJobDefinition struct { - atTimes []time.Time -} - -func (a atTimesJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error { - sortedTimes := a.atTimes - sort.Slice(a.atTimes, func(i, j int) bool { - return a.atTimes[i].Before(a.atTimes[j]) - }) - // keep only schedules that are in the future - idx, found := slices.BinarySearchFunc(sortedTimes, now, timeCmp()) - if found { - idx++ - } - sortedTimes = sortedTimes[idx:] - if len(sortedTimes) == 0 { - return ErrAtTimesJobAtLeastOneInFuture - } - j.jobSchedule = atTimesJob{sortedTimes: sortedTimes} - return nil -} - func timeCmp() func(element time.Time, target time.Time) int { return func(element time.Time, target time.Time) int { if element.Equal(target) { @@ -523,10 +511,6 @@ func timeCmp() func(element time.Time, target time.Time) int { } } -func AtTimesJob(atTimes ...time.Time) JobDefinition { - return atTimesJobDefinition{atTimes: atTimes} -} - // ----------------------------------------------- // ----------------------------------------------- // ----------------- Job Options ----------------- @@ -917,13 +901,7 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass var _ jobSchedule = (*oneTimeJob)(nil) -type oneTimeJob struct{} - -func (o oneTimeJob) next(_ time.Time) time.Time { - return time.Time{} -} - -type atTimesJob struct { +type oneTimeJob struct { sortedTimes []time.Time } @@ -938,18 +916,18 @@ type atTimesJob struct { // lastRun: 7 => [idx=3,found=false] => next is 8 - sorted[idx] idx=3 // lastRun: 8 => [idx=3,found=found] => next is none // lastRun: 9 => [idx=3,found=found] => next is none -func (a atTimesJob) next(lastRun time.Time) time.Time { - idx, found := slices.BinarySearchFunc(a.sortedTimes, lastRun, timeCmp()) +func (o oneTimeJob) next(lastRun time.Time) time.Time { + idx, found := slices.BinarySearchFunc(o.sortedTimes, lastRun, timeCmp()) // if found, the next run is the following index if found { idx++ } // exhausted runs - if idx >= len(a.sortedTimes) { + if idx >= len(o.sortedTimes) { return time.Time{} } - return a.sortedTimes[idx] + return o.sortedTimes[idx] } // ----------------------------------------------- diff --git a/scheduler_test.go b/scheduler_test.go index 61ec9fef..78044680 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -2112,7 +2112,7 @@ func TestScheduler_AtTimesJob(t *testing.T) { atTimes: []time.Time{}, fakeClock: clockwork.NewFakeClock(), assertErr: func(t require.TestingT, err error, i ...interface{}) { - require.ErrorIs(t, err, ErrAtTimesJobAtLeastOneInFuture) + require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast) }, }, { @@ -2120,7 +2120,7 @@ func TestScheduler_AtTimesJob(t *testing.T) { atTimes: []time.Time{n.Add(-1 * time.Second)}, fakeClock: clockwork.NewFakeClockAt(n), assertErr: func(t require.TestingT, err error, i ...interface{}) { - require.ErrorIs(t, err, ErrAtTimesJobAtLeastOneInFuture) + require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast) }, }, { @@ -2252,7 +2252,7 @@ func TestScheduler_AtTimesJob(t *testing.T) { runs := atomic.Uint32{} j, err := s.NewJob( - AtTimesJob(tt.atTimes...), + OneTimeJob(OneTimeJobStartDateTimes(tt.atTimes...)), NewTask(func() { runs.Add(1) }),