Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Farwydi master single flight mode #123

Merged
merged 3 commits into from
Feb 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,32 @@ If you want to chat, you can find us at Slack! [<img src="https://img.shields.io

## Examples

Take a look in our [go docs](https://pkg.go.dev/github.com/go-co-op/gocron#pkg-examples)
```golang
s := gocron.NewScheduler(time.UTC)

s.Every(5).Seconds().Do(func(){ ... })
```

For more examples, take a look in our [go docs](https://pkg.go.dev/github.com/go-co-op/gocron#pkg-examples)

## FAQ

* Q: I'm running multiple pods on a distributed environment. How can I make a job not run once per pod causing duplication?
* Q: I'm running multiple pods on a distributed environment. How can I make a job not run once per pod causing duplication?
* A: We recommend using your own lock solution within the jobs themselves (you could use [Redis](https://redis.io/topics/distlock), for example)

* Q: I've removed my job from the scheduler, but how can I stop a long-running job that has already been triggered?
* A: We recommend using a means of canceling your job, e.g. a `context.WithCancel()`.

---
Looking to contribute? Try to follow these guidelines:
* Use issues for everything
* For a small change, just send a PR!
* For bigger changes, please open an issue for discussion before sending a PR.
* PRs should have: tests, documentation and examples (if it makes sense)
* You can also contribute by:
* Reporting issues
* Suggesting new features or enhancements
* Improving/fixing documentation
* Use issues for everything
* For a small change, just send a PR!
* For bigger changes, please open an issue for discussion before sending a PR.
* PRs should have: tests, documentation and examples (if it makes sense)
* You can also contribute by:
* Reporting issues
* Suggesting new features or enhancements
* Improving/fixing documentation
---

[Jetbrains](https://www.jetbrains.com/?from=gocron) supports this project with GoLand licenses. We appreciate their support for free and open source software!
16 changes: 16 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ func ExampleJob_ScheduledTime() {
fmt.Println(job.ScheduledTime())
}

func ExampleJob_SingletonMode() {
s := gocron.NewScheduler(time.UTC)
job, _ := s.Every(1).Second().Do(task)
job.SingletonMode()
}

func ExampleJob_Tag() {
s := gocron.NewScheduler(time.UTC)
job, _ := s.Every("1s").Do(task)
Expand Down Expand Up @@ -427,6 +433,8 @@ func ExampleScheduler_Second() {
_, _ = s.Every(1).Do(task)
_, _ = s.Every(1).Second().Do(task)
_, _ = s.Every(1).Seconds().Do(task)
_, _ = s.Every("1s").Seconds().Do(task)
_, _ = s.Every(time.Second).Seconds().Do(task)
}

func ExampleScheduler_Seconds() {
Expand All @@ -437,6 +445,14 @@ func ExampleScheduler_Seconds() {
_, _ = s.Every(1).Do(task)
_, _ = s.Every(1).Second().Do(task)
_, _ = s.Every(1).Seconds().Do(task)
_, _ = s.Every("1s").Seconds().Do(task)
_, _ = s.Every(time.Second).Seconds().Do(task)

}

func ExampleScheduler_SingletonMode() {
s := gocron.NewScheduler(time.UTC)
_, _ = s.Every(1).Second().SingletonMode().Do(task)
}

func ExampleScheduler_StartBlocking() {
Expand Down
12 changes: 11 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@ func (e *executor) start() {
wg.Add(1)
go func() {
defer wg.Done()
callJobFuncWithParams(f.functions[f.name], f.params[f.name])

switch f.runConfig.mode {
case defaultMode:
callJobFuncWithParams(f.functions[f.name], f.params[f.name])
case singletonMode:
_, _, _ = f.limiter.Do("main", func() (interface{}, error) {
callJobFuncWithParams(f.functions[f.name], f.params[f.name])
return nil, nil
})
}

}()
case <-e.stop:
wg.Wait()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ go 1.15
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
39 changes: 33 additions & 6 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"sync"
"time"

"golang.org/x/sync/singleflight"
)

// Job struct stores the information necessary to run a Job
Expand All @@ -22,7 +24,6 @@ type Job struct {
scheduledWeekday *time.Weekday // Specific day of the week to start on
dayOfTheMonth int // Specific day of the month to run the job
tags []string // allow the user to tag Jobs with certain labels
runConfig runConfig // configuration for how many times to run the job
runCount int // number of times the job ran
timer *time.Timer
}
Expand All @@ -31,14 +32,28 @@ type jobFunction struct {
functions map[string]interface{} // Map for the function task store
params map[string][]interface{} // Map for function and params of function
name string // the Job name to run, func[jobFunc]
runConfig runConfig // configuration for how many times to run the job
limiter *singleflight.Group // limits inflight runs of job to one
}

type runConfig struct {
finiteRuns bool
maxRuns int
mode mode
removeAfterLastRun bool
}

// mode is the Job's running mode
type mode int8

const (
// defaultMode disable any mode
defaultMode mode = iota

// singletonMode switch to single job mode
singletonMode
)

// NewJob creates a new Job with the provided interval
func NewJob(interval int) *Job {
return &Job{
Expand Down Expand Up @@ -137,18 +152,30 @@ func (j *Job) Weekday() (time.Weekday, error) {

// LimitRunsTo limits the number of executions of this job to n.
// The job will remain in the scheduler.
// Note: If a job is added to a running scheduler and this method is used
// Note: If a job is added to a running scheduler and this method is then used
// you may see the job run more than the set limit as job is scheduled immediately
// by default upon being added to the scheduler. It is recommended to use the
// LimitRunsTo() func on the scheduler chain when scheduling the job.
// For example: scheduler.LimitRunsTo(1).Do()
func (j *Job) LimitRunsTo(n int) {
j.Lock()
defer j.Unlock()
j.runConfig = runConfig{
finiteRuns: true,
maxRuns: n,
}
j.runConfig.finiteRuns = true
j.runConfig.maxRuns = n
}

// SingletonMode prevents a new job from starting if the prior job has not yet
// completed it's run
// Note: If a job is added to a running scheduler and this method is then used
// you may see the job run overrun itself as job is scheduled immediately
// by default upon being added to the scheduler. It is recommended to use the
// SingletonMode() func on the scheduler chain when scheduling the job.
func (j *Job) SingletonMode() {
j.Lock()
defer j.Unlock()
j.runConfig.mode = singletonMode
j.jobFunction.limiter = &singleflight.Group{}

}

// RemoveAfterLastRun sets the job to be removed after it's last run (when limited)
Expand Down
6 changes: 4 additions & 2 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ func TestJob_shouldRunAgain(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
j := &Job{
runConfig: tt.runConfig,
runCount: tt.runCount,
jobFunction: jobFunction{
runConfig: tt.runConfig,
},
runCount: tt.runCount,
}
if got := j.shouldRun(); got != tt.want {
t.Errorf("Job.shouldRunAgain() = %v, want %v", got, tt.want)
Expand Down
10 changes: 10 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ func (s *Scheduler) run(job *Job) {
return
}

job.Lock()
defer job.Unlock()
job.setLastRun(s.now())
job.runCount++
s.executor.jobFunctions <- job.jobFunction
Expand Down Expand Up @@ -436,6 +438,14 @@ func (s *Scheduler) LimitRunsTo(i int) *Scheduler {
return s
}

// SingletonMode prevents a new job from starting if the prior job has not yet
// completed it's run
func (s *Scheduler) SingletonMode() *Scheduler {
job := s.getCurrentJob()
job.SingletonMode()
return s
}

// RemoveAfterLastRun sets the job to be removed after it's last run (when limited)
func (s *Scheduler) RemoveAfterLastRun() *Scheduler {
job := s.getCurrentJob()
Expand Down
27 changes: 25 additions & 2 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var _ timeWrapper = (*fakeTime)(nil)
Expand Down Expand Up @@ -1144,6 +1144,29 @@ func TestCalculateMonths(t *testing.T) {
assert.Equal(t, s.time.Now(s.location).AddDate(0, 1, 0).Month(), job.nextRun.Month())
}

func TestScheduler_SingletonMode(t *testing.T) {
t.Run("next run of long running job doesn't overrun", func(t *testing.T) {
//semaphore := make(chan bool)

s := NewScheduler(time.UTC)
var trigger int32

_, err := s.Every(1).Second().SingletonMode().Do(func() {
if atomic.LoadInt32(&trigger) == 1 {
t.Fatal("Restart should not occur")
}
atomic.AddInt32(&trigger, 1)
fmt.Println("I am a long task")
time.Sleep(3 * time.Second)
})
require.NoError(t, err)

s.StartAsync()
time.Sleep(2 * time.Second)
s.Stop()
})
}

func TestScheduler_LimitRunsTo(t *testing.T) {
t.Run("job added before starting scheduler", func(t *testing.T) {
semaphore := make(chan bool)
Expand Down