Skip to content

Commit

Permalink
Merge pull request grafana#14468 from bergquist/db_lock
Browse files Browse the repository at this point in the history
Infra package for creating distributed lock to make sure functions are executed once even in HA mode.
  • Loading branch information
bergquist committed Dec 17, 2018
2 parents c0fc236 + 7aa84ae commit 7653d8a
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/grafana-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

// self registering services
_ "github.com/grafana/grafana/pkg/extensions"
_ "github.com/grafana/grafana/pkg/infra/serverlock"
_ "github.com/grafana/grafana/pkg/metrics"
_ "github.com/grafana/grafana/pkg/plugins"
_ "github.com/grafana/grafana/pkg/services/alerting"
Expand Down
8 changes: 8 additions & 0 deletions pkg/infra/serverlock/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package serverlock

type serverLock struct {
Id int64
OperationUid string
LastExecution int64
Version int64
}
116 changes: 116 additions & 0 deletions pkg/infra/serverlock/serverlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package serverlock

import (
"context"
"time"

"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/sqlstore"
)

func init() {
registry.RegisterService(&ServerLockService{})
}

// ServerLockService allows servers in HA mode to claim a lock
// and execute an function if the server was granted the lock
type ServerLockService struct {
SQLStore *sqlstore.SqlStore `inject:""`
log log.Logger
}

// Init this service
func (sl *ServerLockService) Init() error {
sl.log = log.New("infra.lockservice")
return nil
}

// LockAndExecute try to create a lock for this server and only executes the
// `fn` function when successful. This should not be used at low internal. But services
// that needs to be run once every ex 10m.
func (sl *ServerLockService) LockAndExecute(ctx context.Context, actionName string, maxInterval time.Duration, fn func()) error {
// gets or creates a lockable row
rowLock, err := sl.getOrCreate(ctx, actionName)
if err != nil {
return err
}

// avoid execution if last lock happened less than `maxInterval` ago
if rowLock.LastExecution != 0 {
lastExeuctionTime := time.Unix(rowLock.LastExecution, 0)
if lastExeuctionTime.Unix() > time.Now().Add(-maxInterval).Unix() {
return nil
}
}

// try to get lock based on rowLow version
acquiredLock, err := sl.acquireLock(ctx, rowLock)
if err != nil {
return err
}

if acquiredLock {
fn()
}

return nil
}

func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *serverLock) (bool, error) {
var result bool

err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
newVersion := serverLock.Version + 1
sql := `UPDATE server_lock SET
version = ?,
last_execution = ?
WHERE
id = ? AND version = ?`

res, err := dbSession.Exec(sql, newVersion, time.Now().Unix(), serverLock.Id, serverLock.Version)
if err != nil {
return err
}

affected, err := res.RowsAffected()
result = affected == 1

return err
})

return result, err
}

func (sl *ServerLockService) getOrCreate(ctx context.Context, actionName string) (*serverLock, error) {
var result *serverLock

err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
lockRows := []*serverLock{}
err := dbSession.Where("operation_uid = ?", actionName).Find(&lockRows)
if err != nil {
return err
}

if len(lockRows) > 0 {
result = lockRows[0]
return nil
}

lockRow := &serverLock{
OperationUid: actionName,
LastExecution: 0,
}

_, err = dbSession.Insert(lockRow)
if err != nil {
return err
}

result = lockRow

return nil
})

return result, err
}
40 changes: 40 additions & 0 deletions pkg/infra/serverlock/serverlock_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// +build integration

package serverlock

import (
"context"
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"
)

func TestServerLok(t *testing.T) {
sl := createTestableServerLock(t)

Convey("Server lock integration tests", t, func() {
counter := 0
var err error
incCounter := func() { counter++ }
atInterval := time.Second * 1
ctx := context.Background()

//this time `fn` should be executed
So(sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter), ShouldBeNil)

//this should not execute `fn`
So(sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter), ShouldBeNil)
So(sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter), ShouldBeNil)
So(sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter), ShouldBeNil)
So(sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter), ShouldBeNil)

// wait 5 second.
<-time.After(atInterval * 2)

// now `fn` should be executed again
err = sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter)
So(err, ShouldBeNil)
So(counter, ShouldEqual, 2)
})
}
55 changes: 55 additions & 0 deletions pkg/infra/serverlock/serverlock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package serverlock

import (
"context"
"testing"

"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/services/sqlstore"
. "github.com/smartystreets/goconvey/convey"
)

func createTestableServerLock(t *testing.T) *ServerLockService {
t.Helper()

sqlstore := sqlstore.InitTestDB(t)

return &ServerLockService{
SQLStore: sqlstore,
log: log.New("test-logger"),
}
}

func TestServerLock(t *testing.T) {
Convey("Server lock", t, func() {
sl := createTestableServerLock(t)
operationUID := "test-operation"

first, err := sl.getOrCreate(context.Background(), operationUID)
So(err, ShouldBeNil)

lastExecution := first.LastExecution
Convey("trying to create three new row locks", func() {
for i := 0; i < 3; i++ {
first, err = sl.getOrCreate(context.Background(), operationUID)
So(err, ShouldBeNil)
So(first.OperationUid, ShouldEqual, operationUID)
So(first.Id, ShouldEqual, 1)
}

Convey("Should not create new since lock already exist", func() {
So(lastExecution, ShouldEqual, first.LastExecution)
})
})

Convey("Should be able to create lock on first row", func() {
gotLock, err := sl.acquireLock(context.Background(), first)
So(err, ShouldBeNil)
So(gotLock, ShouldBeTrue)

gotLock, err = sl.acquireLock(context.Background(), first)
So(err, ShouldBeNil)
So(gotLock, ShouldBeFalse)
})
})
}
11 changes: 8 additions & 3 deletions pkg/services/cleanup/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (
"time"

"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/infra/serverlock"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/setting"
)

type CleanUpService struct {
log log.Logger
Cfg *setting.Cfg `inject:""`
log log.Logger
Cfg *setting.Cfg `inject:""`
ServerLockService *serverlock.ServerLockService `inject:""`
}

func init() {
Expand All @@ -38,7 +40,10 @@ func (srv *CleanUpService) Run(ctx context.Context) error {
srv.cleanUpTmpFiles()
srv.deleteExpiredSnapshots()
srv.deleteExpiredDashboardVersions()
srv.deleteOldLoginAttempts()
srv.ServerLockService.LockAndExecute(ctx, "delete old login attempts", time.Minute*10, func() {
srv.deleteOldLoginAttempts()
})

case <-ctx.Done():
return ctx.Err()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/services/sqlstore/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func AddMigrations(mg *Migrator) {
addTagMigration(mg)
addLoginAttemptMigrations(mg)
addUserAuthMigrations(mg)
addServerlockMigrations(mg)
}

func addMigrationLogMigrations(mg *Migrator) {
Expand Down
22 changes: 22 additions & 0 deletions pkg/services/sqlstore/migrations/serverlock_migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package migrations

import "github.com/grafana/grafana/pkg/services/sqlstore/migrator"

func addServerlockMigrations(mg *migrator.Migrator) {
serverLock := migrator.Table{
Name: "server_lock",
Columns: []*migrator.Column{
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "operation_uid", Type: migrator.DB_NVarchar, Length: 100},
{Name: "version", Type: migrator.DB_BigInt},
{Name: "last_execution", Type: migrator.DB_BigInt, Nullable: false},
},
Indices: []*migrator.Index{
{Cols: []string{"operation_uid"}, Type: migrator.UniqueIndex},
},
}

mg.AddMigration("create server_lock table", migrator.NewAddTableMigration(serverLock))

mg.AddMigration("add index server_lock.operation_uid", migrator.NewAddIndexMigration(serverLock, serverLock.Indices[0]))
}
2 changes: 1 addition & 1 deletion scripts/circle-test-backend.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ exit_if_fail time go install ./pkg/cmd/grafana-server
echo "running go test"
set -e
time for d in $(go list ./pkg/...); do
exit_if_fail go test -covermode=atomic $d
exit_if_fail go test -tags=integration -covermode=atomic $d
done

0 comments on commit 7653d8a

Please sign in to comment.