Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug in scheduler, rescheduling jobs immediately #23

Merged
merged 5 commits into from
Apr 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Job struct {
fparams map[string][]interface{} // Map for function and params of function
lock bool // lock the Job from running at same time form multiple instances
tags []string // allow the user to tag Jobs with certain labels
time timeHelper // an instance of timeHelper to interact with the time package
}

// NewJob creates a new Job with the provided interval
Expand All @@ -34,19 +33,12 @@ func NewJob(interval uint64) *Job {
funcs: make(map[string]interface{}),
fparams: make(map[string][]interface{}),
tags: []string{},
time: th,
}
}

// shouldRun returns true if the Job should be run now
func (j *Job) shouldRun() bool {
return j.time.Now().Unix() >= j.nextRun.Unix()
}

// Run the Job and immediately reschedule it
func (j *Job) run() {
j.lastRun = j.time.Now()
go callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc])
callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc])
}

// Err returns an error if one ocurred while creating the Job
Expand Down
60 changes: 40 additions & 20 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,7 @@ func (s *Scheduler) SetLocation(newLocation *time.Location) {

// scheduleNextRun Compute the instant when this Job should run next
func (s *Scheduler) scheduleNextRun(j *Job) error {
now := s.time.Now().In(s.loc)
if j.lastRun == s.time.Unix(0, 0) {
j.lastRun = now
}

if j.nextRun.After(now) {
return nil
}
now := s.time.Now(s.loc)

periodDuration, err := j.periodDuration()
if err != nil {
Expand All @@ -89,7 +82,7 @@ func (s *Scheduler) scheduleNextRun(j *Job) error {
j.nextRun = j.lastRun.Add(periodDuration)
case days:
j.nextRun = s.roundToMidnight(j.lastRun)
j.nextRun = j.nextRun.Add(j.atTime)
j.nextRun = j.nextRun.Add(j.atTime).Add(periodDuration)
case weeks:
j.nextRun = s.roundToMidnight(j.lastRun)
dayDiff := int(j.startDay)
Expand Down Expand Up @@ -118,7 +111,7 @@ func (s *Scheduler) getRunnableJobs() []*Job {
var runnableJobs []*Job
sort.Sort(s)
for _, job := range s.jobs {
if job.shouldRun() {
if s.shouldRun(job) {
runnableJobs = append(runnableJobs, job)
} else {
break
Expand All @@ -130,7 +123,7 @@ func (s *Scheduler) getRunnableJobs() []*Job {
// NextRun datetime when the next Job should run.
func (s *Scheduler) NextRun() (*Job, time.Time) {
if len(s.jobs) <= 0 {
return nil, s.time.Now()
return nil, s.time.Now(s.loc)
}
sort.Sort(s)
return s.jobs[0], s.jobs[0].nextRun
Expand All @@ -147,11 +140,21 @@ func (s *Scheduler) Every(interval uint64) *Scheduler {
func (s *Scheduler) RunPending() {
runnableJobs := s.getRunnableJobs()
for _, job := range runnableJobs {
s.runJob(job)
s.runAndReschedule(job) // we should handle this error somehow
}
}

func (s *Scheduler) runJob(job *Job) error {
func (s *Scheduler) runAndReschedule(job *Job) error {
if err := s.run(job); err != nil {
return err
}
if err := s.scheduleNextRun(job); err != nil {
return err
}
return nil
}

func (s *Scheduler) run(job *Job) error {
if job.lock {
if locker == nil {
return fmt.Errorf("trying to lock %s with nil locker", job.jobFunc)
Expand All @@ -161,11 +164,10 @@ func (s *Scheduler) runJob(job *Job) error {
locker.Lock(key)
defer locker.Unlock(key)
}
job.run()
err := s.scheduleNextRun(job)
if err != nil {
return err
}

job.lastRun = s.time.Now(s.loc)
go job.run()

return nil
}

Expand All @@ -177,7 +179,7 @@ func (s *Scheduler) RunAll() {
// RunAllWithDelay runs all Jobs with delay seconds
func (s *Scheduler) RunAllWithDelay(d int) {
for _, job := range s.jobs {
err := s.runJob(job)
err := s.run(job)
if err != nil {
continue
}
Expand Down Expand Up @@ -253,6 +255,19 @@ func (s *Scheduler) Do(jobFun interface{}, params ...interface{}) (*Job, error)
j.jobFunc = fname

if !j.startsImmediately {
periodDuration, err := j.periodDuration()
if err != nil {
return nil, err
}

if j.lastRun == s.time.Unix(0, 0) {
j.lastRun = s.time.Now(s.loc)

if j.atTime != 0 {
j.lastRun = j.lastRun.Add(-periodDuration)
}
}

if err := s.scheduleNextRun(j); err != nil {
return nil, err
}
Expand Down Expand Up @@ -283,11 +298,16 @@ func (s *Scheduler) StartAt(t time.Time) *Scheduler {
// StartImmediately sets the Jobs next run as soon as the scheduler starts
func (s *Scheduler) StartImmediately() *Scheduler {
job := s.getCurrentJob()
job.nextRun = s.time.Now().In(s.loc)
job.nextRun = s.time.Now(s.loc)
job.startsImmediately = true
return s
}

// shouldRun returns true if the Job should be run now
func (s *Scheduler) shouldRun(j *Job) bool {
Streppel marked this conversation as resolved.
Show resolved Hide resolved
return s.time.Now(s.loc).Unix() >= j.nextRun.Unix()
}

// setUnit sets the unit type
func (s *Scheduler) setUnit(unit timeUnit) {
currentJob := s.getCurrentJob()
Expand Down
8 changes: 4 additions & 4 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestExecutionSecond(t *testing.T) {
func TestExecutionSeconds(t *testing.T) {
sched := NewScheduler(time.UTC)
jobDone := make(chan bool)
executionTimes := make([]int64, 0, 2)
var executionTimes []int64
numberOfIterations := 2

sched.Every(2).Seconds().Do(func() {
Expand Down Expand Up @@ -75,18 +75,18 @@ func TestStartImmediately(t *testing.T) {
func TestAt(t *testing.T) {
s := NewScheduler(time.UTC)

// Schedule to run in next minute
// Schedule to run in next 2 seconds
now := time.Now().UTC()
dayJobDone := make(chan bool, 1)

// Schedule every day At
startAt := fmt.Sprintf("%02d:%02d", now.Hour(), now.Add(time.Minute).Minute())
startAt := fmt.Sprintf("%02d:%02d:%02d", now.Hour(), now.Minute(), now.Add(time.Second*2).Second())
dayJob, _ := s.Every(1).Day().At(startAt).Do(func() {
dayJobDone <- true
})

// Expected start time
expectedStartTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Add(time.Minute).Minute(), 0, 0, time.UTC)
expectedStartTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Add(time.Second*2).Second(), 0, time.UTC)
nextRun := dayJob.NextScheduledTime()
assert.Equal(t, expectedStartTime, nextRun)

Expand Down
7 changes: 4 additions & 3 deletions timeHelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package gocron
import "time"

type timeHelper interface {
Now() time.Time
Now(*time.Location) time.Time
Unix(int64, int64) time.Time
Sleep(time.Duration)
Date(int, time.Month, int, int, int, int, int, *time.Location) time.Time
Expand All @@ -16,8 +16,9 @@ func newTimeHelper() timeHelper {

type trueTime struct{}

func (t *trueTime) Now() time.Time {
return time.Now()
func (t *trueTime) Now(location *time.Location) time.Time {
n := time.Now().In(location)
return t.Date(n.Year(), n.Month(), n.Day(), n.Hour(), n.Minute(), n.Second(), 0, location)
}

func (t *trueTime) Unix(sec int64, nsec int64) time.Time {
Expand Down