Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-740: expand oneTimeJob to support multiple times #741

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,17 @@ func ExampleOneTimeJob() {
func() {},
),
)
// run job twice - once in 10 seconds and once in 55 minutes
n := time.Now()
_, _ = s.NewJob(
OneTimeJob(
OneTimeJobStartDateTimes(
n.Add(10*time.Second),
n.Add(55*time.Minute),
),
),
NewTask(func() {}),
)

s.Start()
}
Expand Down
78 changes: 63 additions & 15 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/rand"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -446,35 +447,47 @@ type oneTimeJobDefinition struct {
}

func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error {
j.jobSchedule = oneTimeJob{}
if err := o.startAt(j); err != nil {
return err
sortedTimes := o.startAt(j)
sort.Slice(sortedTimes, func(i, j int) bool {
return sortedTimes[i].Before(sortedTimes[j])
})
Comment on lines +451 to +453
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've used this in other places, Before vs. Compare. SortStable, which isn't super important here, just keeps the order if items are duplicates

slices.SortStableFunc(j.nextScheduled, func(a, b time.Time) int {
			return a.Compare(b)
		})

// keep only schedules that are in the future
idx, found := slices.BinarySearchFunc(sortedTimes, now, timeCmp())
if found {
idx++
}
// in case we are not in the `startImmediately` case, our start-date must be in
// the future according to the scheduler clock
if !j.startImmediately && (j.startTime.IsZero() || j.startTime.Before(now)) {
sortedTimes = sortedTimes[idx:]
if !j.startImmediately && len(sortedTimes) == 0 {
return ErrOneTimeJobStartDateTimePast
}
j.jobSchedule = oneTimeJob{sortedTimes: sortedTimes}
return nil
}

// OneTimeJobStartAtOption defines when the one time job is run
type OneTimeJobStartAtOption func(*internalJob) error
type OneTimeJobStartAtOption func(*internalJob) []time.Time
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to change the signature of this exported type - I tried to think about situations where this would not be considered a backwards-compatible change but I could not think of one.


// OneTimeJobStartImmediately tells the scheduler to run the one time job immediately.
func OneTimeJobStartImmediately() OneTimeJobStartAtOption {
return func(j *internalJob) error {
return func(j *internalJob) []time.Time {
j.startImmediately = true
return nil
return []time.Time{}
}
}

// OneTimeJobStartDateTime sets the date & time at which the job should run.
// This datetime must be in the future (according to the scheduler clock).
func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption {
return func(j *internalJob) error {
j.startTime = start
return nil
return func(j *internalJob) []time.Time {
return []time.Time{start}
}
}

// OneTimeJobStartDateTimes sets the date & times at which the job should run.
// At least one of the date/times must be in the future (according to the scheduler clock).
func OneTimeJobStartDateTimes(times ...time.Time) OneTimeJobStartAtOption {
return func(j *internalJob) []time.Time {
return times
}
}

Expand All @@ -486,6 +499,18 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition {
}
}

func timeCmp() func(element time.Time, target time.Time) int {
return func(element time.Time, target time.Time) int {
if element.Equal(target) {
return 0
}
if element.Before(target) {
return -1
}
return 1
}
}

// -----------------------------------------------
// -----------------------------------------------
// ----------------- Job Options -----------------
Expand Down Expand Up @@ -876,10 +901,33 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass

var _ jobSchedule = (*oneTimeJob)(nil)

type oneTimeJob struct{}
type oneTimeJob struct {
sortedTimes []time.Time
}

func (o oneTimeJob) next(_ time.Time) time.Time {
return time.Time{}
// next finds the next item in a sorted list of times using binary-search.
//
// example: sortedTimes: [2, 4, 6, 8]
//
// lastRun: 1 => [idx=0,found=false] => next is 2 - sorted[idx] idx=0
// lastRun: 2 => [idx=0,found=true] => next is 4 - sorted[idx+1] idx=1
// lastRun: 3 => [idx=1,found=false] => next is 4 - sorted[idx] idx=1
// lastRun: 4 => [idx=1,found=true] => next is 6 - sorted[idx+1] idx=2
// lastRun: 7 => [idx=3,found=false] => next is 8 - sorted[idx] idx=3
// lastRun: 8 => [idx=3,found=found] => next is none
// lastRun: 9 => [idx=3,found=found] => next is none
func (o oneTimeJob) next(lastRun time.Time) time.Time {
idx, found := slices.BinarySearchFunc(o.sortedTimes, lastRun, timeCmp())
// if found, the next run is the following index
if found {
idx++
}
// exhausted runs
if idx >= len(o.sortedTimes) {
return time.Time{}
}

return o.sortedTimes[idx]
}

// -----------------------------------------------
Expand Down
182 changes: 182 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2089,6 +2089,188 @@ func TestScheduler_OneTimeJob(t *testing.T) {
}
}

func TestScheduler_AtTimesJob(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

n := time.Now().UTC()

tests := []struct {
name string
atTimes []time.Time
fakeClock clockwork.FakeClock
assertErr require.ErrorAssertionFunc
// asserts things about schedules, advance time and perform new assertions
advanceAndAsserts []func(
t *testing.T,
j Job,
clock clockwork.FakeClock,
runs *atomic.Uint32,
)
}{
{
name: "no at times",
atTimes: []time.Time{},
fakeClock: clockwork.NewFakeClock(),
assertErr: func(t require.TestingT, err error, i ...interface{}) {
require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast)
},
},
{
name: "all in the past",
atTimes: []time.Time{n.Add(-1 * time.Second)},
fakeClock: clockwork.NewFakeClockAt(n),
assertErr: func(t require.TestingT, err error, i ...interface{}) {
require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast)
},
},
{
name: "one run 1 millisecond in the future",
atTimes: []time.Time{n.Add(1 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())

// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)

// next is now
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)

// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)

nextRunAt, err = j.NextRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, nextRunAt)
},
},
},
{
name: "one run in the past and one in the future",
atTimes: []time.Time{n.Add(-1 * time.Millisecond), n.Add(1 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())

// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)

// next is now
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)

// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)
},
},
},
{
name: "two runs in the future",
atTimes: []time.Time{n.Add(1 * time.Millisecond), n.Add(3 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())

// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)

// next is now
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)

// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)

nextRunAt, err = j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(3*time.Millisecond), nextRunAt)
},

func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(2), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(3*time.Millisecond), lastRunAt, 1*time.Millisecond)

nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, nextRunAt)
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, WithClock(tt.fakeClock))
t.Cleanup(func() {
require.NoError(t, s.Shutdown())
})

runs := atomic.Uint32{}
j, err := s.NewJob(
OneTimeJob(OneTimeJobStartDateTimes(tt.atTimes...)),
NewTask(func() {
runs.Add(1)
}),
)
if tt.assertErr != nil {
tt.assertErr(t, err)
} else {
require.NoError(t, err)
s.Start()

for _, advanceAndAssert := range tt.advanceAndAsserts {
advanceAndAssert(t, j, tt.fakeClock, &runs)
}
}
})
}
}

func TestScheduler_WithLimitedRuns(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

Expand Down