Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact: schedules jobs with time.AfterFunc() #99

Merged
merged 7 commits into from
Dec 30, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Job struct {
tags []string // allow the user to tag Jobs with certain labels
runConfig runConfig // configuration for how many times to run the job
runCount int // number of times the job ran
timer *time.Timer
}

type runConfig struct {
Expand All @@ -51,8 +52,8 @@ func NewJob(interval uint64) *Job {
func (j *Job) run() {
j.Lock()
defer j.Unlock()
callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc])
j.runCount++
go callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc])
}

func (j *Job) neverRan() bool {
Expand All @@ -73,6 +74,18 @@ func (j *Job) setStartsImmediately(b bool) {
j.startsImmediately = b
}

func (j *Job) getTimer() *time.Timer {
j.RLock()
defer j.RUnlock()
return j.timer
}

func (j *Job) setTimer(t *time.Timer) {
j.Lock()
defer j.Unlock()
j.timer = t
}

func (j *Job) getAtTime() time.Duration {
j.RLock()
defer j.RUnlock()
Expand Down
118 changes: 47 additions & 71 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewScheduler(loc *time.Location) *Scheduler {
jobs: make([]*Job, 0),
location: loc,
running: false,
stopChan: make(chan struct{}),
stopChan: make(chan struct{}, 1),
time: &trueTime{},
}
}
Expand All @@ -46,24 +46,30 @@ func (s *Scheduler) StartAsync() chan struct{} {
if s.IsRunning() {
return s.stopChan
}
s.start()
return s.stopChan
}

//start runs each job and schedules it's next run
func (s *Scheduler) start() {
s.setRunning(true)
s.runJobs()
}

s.scheduleAllJobs()
ticker := s.time.NewTicker(1 * time.Second)
go func() {
for {
select {
case <-ticker.C:
s.RunPending()
case <-s.stopChan:
ticker.Stop()
s.setRunning(false)
return
func (s *Scheduler) runJobs() {
for _, j := range s.Jobs() {
if j.getStartsImmediately() {
s.run(j)
j.setStartsImmediately(false)
}
if !j.shouldRun() {
if j.getRemoveAfterLastRun() { // TODO: this method seems unnecessary as we could always remove after the run cout has expired. Maybe remove this in the future?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

s.RemoveByReference(j)
}
continue
}
}()

return s.stopChan
s.scheduleNextRun(j)
}
}

func (s *Scheduler) setRunning(b bool) {
Expand Down Expand Up @@ -126,41 +132,40 @@ func (s *Scheduler) Location() *time.Location {

// scheduleNextRun Compute the instant when this Job should run next
func (s *Scheduler) scheduleNextRun(job *Job) {
now := s.time.Now(s.Location())
now := s.now()
lastRun := job.LastRun()

// job can be scheduled with .StartAt()
if job.neverRan() {
if !job.NextRun().IsZero() {
return // scheduled for future run and should skip scheduling
}
// default is for jobs to start immediately unless scheduled at a specific time or day
if job.getStartsImmediately() {
job.setNextRun(now)
return
}
lastRun = now
}

job.setLastRun(now)

durationToNextRun := s.durationToNextRun(job)
job.setNextRun(job.LastRun().Add(durationToNextRun))
durationToNextRun := s.durationToNextRun(lastRun, job)
job.setNextRun(lastRun.Add(durationToNextRun))
job.setTimer(time.AfterFunc(durationToNextRun, func() {
s.run(job)
s.scheduleNextRun(job)
}))
}

func (s *Scheduler) durationToNextRun(job *Job) time.Duration {
lastRun := job.LastRun()
func (s *Scheduler) durationToNextRun(t time.Time, job *Job) time.Duration {
var duration time.Duration
switch job.unit {
case seconds, minutes, hours:
duration = s.calculateDuration(job)
case days:
duration = s.calculateDays(job, lastRun)
duration = s.calculateDays(job, t)
case weeks:
if job.scheduledWeekday != nil { // weekday selected, Every().Monday(), for example
duration = s.calculateWeekday(job, lastRun)
duration = s.calculateWeekday(job, t)
} else {
duration = s.calculateWeeks(job, lastRun)
duration = s.calculateWeeks(job, t)
}
case months:
duration = s.calculateMonths(job, lastRun)
duration = s.calculateMonths(job, t)
}
return duration
}
Expand Down Expand Up @@ -280,18 +285,6 @@ func (s *Scheduler) roundToMidnight(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, s.Location())
}

// Get the current runnable Jobs, which shouldRun is True
func (s *Scheduler) runnableJobs() []*Job {
var runnableJobs []*Job
sort.Sort(s)
for _, job := range s.Jobs() {
if s.shouldRun(job) {
runnableJobs = append(runnableJobs, job)
}
}
return runnableJobs
}

// NextRun datetime when the next Job should run.
func (s *Scheduler) NextRun() (*Job, time.Time) {
if len(s.Jobs()) <= 0 {
Expand All @@ -310,25 +303,12 @@ func (s *Scheduler) Every(interval uint64) *Scheduler {
return s
}

// RunPending runs all the Jobs that are scheduled to run.
func (s *Scheduler) RunPending() {
for _, job := range s.runnableJobs() {
s.runAndReschedule(job) // we should handle this error somehow
func (s *Scheduler) run(job *Job) {
if !s.running {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JohnRoesler also, do you think this might help with that issue that we talked earlier on slack?

return
}
}

func (s *Scheduler) runAndReschedule(job *Job) error {
if err := s.run(job); err != nil {
return err
}
s.scheduleNextRun(job)
return nil
}

func (s *Scheduler) run(job *Job) error {
job.setLastRun(s.time.Now(s.Location()))
go job.run()
return nil
job.setLastRun(s.now())
job.run()
}

// RunAll run all Jobs regardless if they are scheduled to run or not
Expand All @@ -339,10 +319,7 @@ func (s *Scheduler) RunAll() {
// RunAllWithDelay runs all Jobs with delay seconds
func (s *Scheduler) RunAllWithDelay(d int) {
for _, job := range s.Jobs() {
err := s.run(job)
if err != nil {
continue
}
s.run(job)
s.time.Sleep(time.Duration(d) * time.Second)
}
}
Expand Down Expand Up @@ -418,11 +395,12 @@ func (s *Scheduler) Clear() {
// Stop stops the scheduler. This is a no-op if the scheduler is already stopped .
func (s *Scheduler) Stop() {
if s.IsRunning() {
s.stopScheduler()
s.stop()
}
}

func (s *Scheduler) stopScheduler() {
func (s *Scheduler) stop() {
s.setRunning(false)
s.stopChan <- struct{}{}
}

Expand Down Expand Up @@ -626,8 +604,6 @@ func (s *Scheduler) getCurrentJob() *Job {
return s.Jobs()[len(s.jobs)-1]
}

func (s *Scheduler) scheduleAllJobs() {
for _, j := range s.Jobs() {
s.scheduleNextRun(j)
}
func (s *Scheduler) now() time.Time {
return s.time.Now(s.Location())
}
Loading