Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent access for SQL storage. #1370

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion storage/sql/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,16 @@ func (c *conn) UpdateAuthRequest(id string, updater func(a storage.AuthRequest)
return err
}

err = c.flavor.lockForUpdate(tx, "auth_request", "id", r.ID)
if err != nil {
return fmt.Errorf("update auth request: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 All these fmt.Errorf make me nervous -- they're hard to deal with in call sites -- but this is the style this package is written in, so it's fine at add a few. 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can switch to github.com/pkg/errors in another PR :)

}

a, err := updater(r)
if err != nil {
return err
}

_, err = tx.Exec(`
update auth_request
set
Expand Down Expand Up @@ -380,13 +386,18 @@ func (c *conn) UpdateKeys(updater func(old storage.Keys) (storage.Keys, error))
firstUpdate := false
// TODO(ericchiang): errors may cause a transaction be rolled back by the SQL
// server. Test this, and consider adding a COUNT() command beforehand.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 Is that comment still accurate now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember. Ha

err := c.flavor.lockForUpdate(tx, "keys", "id", keysRowID)
if err != nil {
return fmt.Errorf("get keys: %v", err)
}

old, err := getKeys(tx)
if err != nil {
if err != storage.ErrNotFound {
return fmt.Errorf("get keys: %v", err)
}

firstUpdate = true
old = storage.Keys{}
}

nk, err := updater(old)
Expand Down Expand Up @@ -457,6 +468,12 @@ func (c *conn) UpdateClient(id string, updater func(old storage.Client) (storage
if err != nil {
return err
}

err = c.flavor.lockForUpdate(tx, "client", "id", id)
if err != nil {
return fmt.Errorf("update client: %v", err)
}

nc, err := updater(cli)
if err != nil {
return err
Expand Down Expand Up @@ -577,6 +594,11 @@ func (c *conn) UpdatePassword(email string, updater func(p storage.Password) (st
return err
}

err = c.flavor.lockForUpdate(tx, "password", "email", p.Email)
if err != nil {
return fmt.Errorf("update password: %v", err)
}

np, err := updater(p)
if err != nil {
return err
Expand Down
41 changes: 12 additions & 29 deletions storage/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
type flavor struct {
queryReplacers []replacer

// Optional function to create and finish a transaction.
executeTx func(db *sql.DB, fn func(*sql.Tx) error) error
// Function for locking specific rows in table, which column has specified value. It must
// return error if locking failed. If underlying database doesn't support locking (sqlite3)
// just make an empty implementation.
lockForUpdate func(tx *trans, table, column, value string) error

// Does the flavor support timezones?
supportsTimezones bool
Expand All @@ -43,27 +45,9 @@ var (
// The "github.com/lib/pq" driver is the default flavor. All others are
// translations of this.
flavorPostgres = flavor{
// The default behavior for Postgres transactions is consistent reads, not consistent writes.
// For each transaction opened, ensure it has the correct isolation level.
//
// See: https://www.postgresql.org/docs/9.3/static/sql-set-transaction.html
//
// NOTE(ericchiang): For some reason using `SET SESSION CHARACTERISTICS AS TRANSACTION` at a
// session level didn't work for some edge cases. Might be something worth exploring.
executeTx: func(db *sql.DB, fn func(sqlTx *sql.Tx) error) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does anything use this anymore? can we nuke it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nuked

tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()

if _, err := tx.Exec(`SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;`); err != nil {
return err
}
if err := fn(tx); err != nil {
return err
}
return tx.Commit()
lockForUpdate: func(tx *trans, table, column, value string) error {
_, err := tx.Exec("SELECT 1 FROM "+table+" WHERE "+column+" = $1 FOR UPDATE NOWAIT;", value)
Copy link
Contributor

@vito vito Dec 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(For my own understanding,) NOWAIT is used here because the intent of this PR is still to fail one transaction in the event of concurrent updates? i.e. same goal as before, just a different (less heavy-handed) method. And so the existing concurrency tests still pass with no changes necessary.

So this doesn't attempt to address #1341 (and associated PR #1342) as-is; that could be done in a separate PR and would also involve changes to the tests. I suppose that could either be done by removing the NOWAIT or adding explicit retry logic, similar to #1342.

I'm happy to submit that PR after this is merged if no one else has plans to already, not trying to increase scope of your PR. Just making sure I understand. 🙂

Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better to add an inline comment :)

This works, but the lockForUpdate API seems a bit risky. Ideally it could be baked into ExecTx or something, but nothing's coming to mind at the moment.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes NOWAIT here is only because of tests. And yes, I think tests need to be redesigned a little bit, as changing the same auth_request concurrently wouldn't actually harm a login flow, and deadlock which current tests produce is not possible in real life situations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably just update/remove the tests then, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't come to idea how to test concurrency better yet. Maybe later.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the issue with interleaving connectors in the concurrent login flow mentioned in #1356 (comment) ?

Failing one of the transactions seems to be the only way to handle it in current implementation of handler/storage.

return err
},

supportsTimezones: true,
Expand All @@ -82,6 +66,11 @@ var (
// SQLite doesn't have a "now()" method, replace with "date('now')"
{regexp.MustCompile(`\bnow\(\)`), "date('now')"},
},

// There is no requirement for concurrent access for SQLite3
lockForUpdate: func(tx *trans, table, column, value string) error {
return nil
},
}
)

Expand Down Expand Up @@ -140,12 +129,6 @@ func (c *conn) QueryRow(query string, args ...interface{}) *sql.Row {

// ExecTx runs a method which operates on a transaction.
func (c *conn) ExecTx(fn func(tx *trans) error) error {
if c.flavor.executeTx != nil {
return c.flavor.executeTx(c.db, func(sqlTx *sql.Tx) error {
return fn(&trans{sqlTx, c})
})
}

sqlTx, err := c.db.Begin()
if err != nil {
return err
Expand Down