Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dustin decker/max concurrent jobs #126

Merged
9 changes: 9 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,15 @@ func ExampleScheduler_Seconds() {

}

func ExampleScheduler_SetMaxConcurrentJobs() {
s := gocron.NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(1, gocron.RescheduleMode)
_, _ = s.Every(1).Seconds().Do(func() {
fmt.Println("This will run once every 5 seconds even though it is scheduled every second because maximum concurrent job limit is set.")
time.Sleep(5 * time.Second)
})
}

func ExampleScheduler_SingletonMode() {
s := gocron.NewScheduler(time.UTC)
_, _ = s.Every(1).Second().SingletonMode().Do(task)
Expand Down
46 changes: 43 additions & 3 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,32 @@ package gocron

import (
"sync"

"golang.org/x/sync/semaphore"
)

const (
// default is that if a limit on maximum concurrent jobs is set
// and the limit is reached, a job will skip it's run and try
// again on the next occurrence in the schedule
RescheduleMode limitMode = iota

// in wait mode if a limit on maximum concurrent jobs is set
// and the limit is reached, a job will wait to try and run
// until a spot in the limit is freed up.
//
// Note: this mode can produce unpredictable results as
// job execution order isn't guaranteed. For example, a job that
// executes frequently may pile up in the wait queue and be executed
// many times back to back when the queue opens.
WaitMode
)

type executor struct {
jobFunctions chan jobFunction
stop chan struct{}
jobFunctions chan jobFunction
stop chan struct{}
limitMode limitMode
maxRunningJobs *semaphore.Weighted
}

func newExecutor() executor {
Expand All @@ -25,6 +46,26 @@ func (e *executor) start() {
go func() {
defer wg.Done()

if e.maxRunningJobs != nil {
if !e.maxRunningJobs.TryAcquire(1) {

switch e.limitMode {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really liked this approach

case RescheduleMode:
return
case WaitMode:
for {
JohnRoesler marked this conversation as resolved.
Show resolved Hide resolved
if !e.maxRunningJobs.TryAcquire(1) {
continue
} else {
break
}
JohnRoesler marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

defer e.maxRunningJobs.Release(1)
}

switch f.runConfig.mode {
case defaultMode:
callJobFuncWithParams(f.functions[f.name], f.params[f.name])
Expand All @@ -34,7 +75,6 @@ func (e *executor) start() {
return nil, nil
})
}

}()
case <-e.stop:
wg.Wait()
Expand Down
16 changes: 13 additions & 3 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ import (
"strings"
"sync"
"time"

"golang.org/x/sync/semaphore"
)

type limitMode int8

// Scheduler struct stores a list of Jobs and the location of time Scheduler
// Scheduler implements the sort.Interface{} for sorting Jobs, by the time of nextRun
type Scheduler struct {
Expand All @@ -17,9 +21,8 @@ type Scheduler struct {

locationMutex sync.RWMutex
location *time.Location

runningMutex sync.RWMutex
running bool // represents if the scheduler is running at the moment or not
runningMutex sync.RWMutex
running bool // represents if the scheduler is running at the moment or not

time timeWrapper // wrapper around time.Time
executor *executor // executes jobs passed via chan
Expand All @@ -38,6 +41,13 @@ func NewScheduler(loc *time.Location) *Scheduler {
}
}

// SetMaxConcurrentJobs limits how many jobs can be running at the same time.
// This is useful when running resource intensive jobs and a precise start time is not critical.
func (s *Scheduler) SetMaxConcurrentJobs(n int, mode limitMode) {
s.executor.maxRunningJobs = semaphore.NewWeighted(int64(n))
s.executor.limitMode = mode
}

// StartBlocking starts all jobs and blocks the current thread
func (s *Scheduler) StartBlocking() {
s.StartAsync()
Expand Down
80 changes: 80 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,86 @@ func TestScheduler_LimitRunsTo(t *testing.T) {
})
}

func TestScheduler_SetMaxConcurrentJobs(t *testing.T) {
t.Run("reschedule mode", func(t *testing.T) {
semaphore := make(chan bool)

s := NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(2, RescheduleMode)

f := func() {
semaphore <- true
time.Sleep(2 * time.Second)
}

_, err := s.Every(1).Second().Do(f)
require.NoError(t, err)

_, err = s.Every(2).Second().Do(f)
require.NoError(t, err)

_, err = s.Every(3).Second().Do(f)
require.NoError(t, err)

s.StartAsync()

var counter int

now := time.Now()
for time.Now().Before(now.Add(4 * time.Second)) {
if <-semaphore {
counter++
}
}

// Expecting a total of 5 job runs:
// 0s - jobs 1 & 2 run, job 3 hits the limit and is skipped
// 1s - job 1 hits the limit as skipped
// 2s - jobs 1 & 2 hit the limit and are skipped
// 3s - jobs 1 & 3 run
assert.Equal(t, 5, counter)
})

t.Run("wait mode", func(t *testing.T) {
semaphore := make(chan bool)

s := NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(2, WaitMode)

f := func() {
semaphore <- true
time.Sleep(1 * time.Second)
}

_, err := s.Every(1).Second().Do(f)
require.NoError(t, err)

_, err = s.Every(2).Second().Do(f)
require.NoError(t, err)

_, err = s.Every(3).Second().Do(f)
require.NoError(t, err)

s.StartAsync()

var counter int

now := time.Now()
for time.Now().Before(now.Add(4 * time.Second)) {
if <-semaphore {
counter++
}
}

// Expecting a total of 9 job runs. The exact order of jobs may vary, for example:
// 0s - jobs 2 & 3 run, job 1 hits the limit and waits
// 1s - job 1 runs twice, the blocked run and the regularly scheduled run
// 2s - jobs 1 & 3 run
// 3s - jobs 2 & 3 run, job 1 hits the limit and waits
assert.Equal(t, 9, counter)
})
}

func TestScheduler_RemoveAfterLastRun(t *testing.T) {
t.Run("job removed after the last run", func(t *testing.T) {
semaphore := make(chan bool)
Expand Down