Skip to content

Feat/db migration #586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rolling-shutter/chainobserver/db/collator/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var files embed.FS
var Definition db.Definition

func init() {
def, err := db.NewSQLCDefinition(files, "sql/", "chainobscollator", 1)
def, err := db.NewSQLCDefinition(files, "sql/", "chainobscollator")
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize DB metadata")
}
Expand Down
2 changes: 1 addition & 1 deletion rolling-shutter/chainobserver/db/keyper/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var files embed.FS
var Definition db.Definition

func init() {
def, err := db.NewSQLCDefinition(files, "sql/", "chainobskeyper", 1)
def, err := db.NewSQLCDefinition(files, "sql/", "chainobskeyper")
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize DB metadata")
}
Expand Down
2 changes: 1 addition & 1 deletion rolling-shutter/chainobserver/db/sync/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var Definition db.Definition

func init() {
var err error
Definition, err = db.NewSQLCDefinition(files, "sql/", "chainobssync", 1)
Definition, err = db.NewSQLCDefinition(files, "sql/", "chainobssync")
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize DB metadata")
}
Expand Down
2 changes: 1 addition & 1 deletion rolling-shutter/keyper/database/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var files embed.FS
var Definition db.Definition

func init() {
sqlcDB, err := db.NewSQLCDefinition(files, "sql/", "keyper", 1)
sqlcDB, err := db.NewSQLCDefinition(files, "sql/", "keyper")
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize DB")
}
Expand Down
2 changes: 1 addition & 1 deletion rolling-shutter/keyperimpl/gnosis/database/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var files embed.FS
var Definition db.Definition

func init() {
def, err := db.NewSQLCDefinition(files, "sql/", "gnosiskeyper", 1)
def, err := db.NewSQLCDefinition(files, "sql/", "gnosiskeyper")
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize DB metadata")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- schema-version: gnosiskeyper-2 --
-- migrations need to start from V2... as file name, as the V1 was initial schema

ALTER TABLE validator_registrations
DROP CONSTRAINT validator_registrations_pkey,
ADD PRIMARY KEY (block_number, tx_index, log_index, validator_index);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation for this change? Currently, the primary key is (block_number, tx_index, log_index). The validator registry emits one log per registration (which contains the signature), so adding the validator index does neither constrain it more or less.

Copy link
Contributor Author

@blockchainluffy blockchainluffy May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to integrate AggregateRegistrationMessage, in which more than one validators can submit signatures in a single transaction.
The above constraint was failing as it only allowed one validator index to be added in a single transaction.

Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,5 @@ CREATE TABLE validator_registrations_synced_until(
enforce_one_row bool PRIMARY KEY DEFAULT true,
block_hash bytea NOT NULL,
block_number bigint NOT NULL CHECK (block_number >= 0)
);
);

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var files embed.FS
var Definition db.Definition

func init() {
def, err := db.NewSQLCDefinition(files, "sql/", "shutterservicekeyper", 1)
def, err := db.NewSQLCDefinition(files, "sql/", "shutterservicekeyper")
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize DB metadata")
}
Expand Down
2 changes: 1 addition & 1 deletion rolling-shutter/medley/db/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var metaDefinition Definition

func init() {
var err error
metaDefinition, err = NewSQLCDefinition(files, "sql/", "meta", 1)
metaDefinition, err = NewSQLCDefinition(files, "sql/", "meta")
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize DB metadata")
}
Expand Down
43 changes: 33 additions & 10 deletions rolling-shutter/medley/db/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,23 @@ import (
// pins the database to a specific role, e.g. "keyper-test" or "snapshot-keyper-production"
// in order to prevent later usage of the database with commands that fulfill a different role.
func InitDB(ctx context.Context, dbpool *pgxpool.Pool, role string, definition Definition) error {
// First check if schema exists and is valid
err := dbpool.BeginFunc(WrapContext(ctx, definition.Validate))
if err == nil {
shdb.AddConnectionInfo(log.Info(), dbpool).Msg("database already exists")
return nil
} else if errors.Is(err, ErrNeedsMigration) {
// Schema exists, just run migrations
shdb.AddConnectionInfo(log.Info(), dbpool).Msg("database exists, checking for migrations")
err = dbpool.BeginFunc(WrapContext(ctx, definition.Migrate))
if err != nil {
return errors.Wrap(err, "failed to apply migrations")
}
return nil
} else if errors.Is(err, ErrValueMismatch) {
return err
}

// Schema doesn't exist or is invalid, create it
err = dbpool.BeginFunc(WrapContext(ctx, definition.Create))
if err != nil {
return err
Expand All @@ -35,19 +44,23 @@ func InitDB(ctx context.Context, dbpool *pgxpool.Pool, role string, definition D
return err
}

// Run any migrations after initial creation
err = dbpool.BeginFunc(WrapContext(ctx, definition.Migrate))
if err != nil {
return errors.Wrap(err, "failed to apply migrations")
}

// For the outer DB initialisation, also set the database version to
// the overall "role", so that e.g. a snapshot keyper database won't be
// used by another keyper implementation, no matter if the schemas
// are compatible
err = dbpool.BeginFunc(
ctx,
func(tx pgx.Tx) error {
return InsertDBVersion(ctx, tx, role)
},
)
err = dbpool.BeginFunc(ctx, func(tx pgx.Tx) error {
return InsertDBVersion(ctx, tx, role)
})
if err != nil {
return err
}

shdb.AddConnectionInfo(log.Info(), dbpool).Msg("database initialized")
return nil
}
Expand Down Expand Up @@ -120,17 +133,27 @@ func (d AggregateDefinition) Validate(ctx context.Context, tx pgx.Tx) error {
return nil
}

func (d AggregateDefinition) Migrate(ctx context.Context, tx pgx.Tx) error {
for def := range d.defs {
err := def.Migrate(ctx, tx)
if err != nil {
return errors.Wrapf(err, "migration failed for definition '%s'", def.Name())
}
}
return nil
}

type Definition interface {
Name() string
Create(context.Context, pgx.Tx) error
Init(context.Context, pgx.Tx) error
Validate(context.Context, pgx.Tx) error
Migrate(context.Context, pgx.Tx) error
}

type Schema struct {
Version int
Name string
Path string
Name string
Path string
}

type Migration struct {
Expand Down
14 changes: 14 additions & 0 deletions rolling-shutter/medley/db/meta.sqlc.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 40 additions & 6 deletions rolling-shutter/medley/db/metadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ package db
import (
"context"
"fmt"
"strconv"

"github.com/jackc/pgx/v4"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)

var (
ErrValueMismatch = errors.New("database has unexpected value")
ErrKeyNotFound = errors.New("key does not exist")
ErrValueMismatch = errors.New("database has unexpected value")
ErrKeyNotFound = errors.New("key does not exist")
ErrNeedsMigration = errors.New("needs migration")
)

var DatabaseVersionKey string = "database-version"
Expand All @@ -29,8 +31,8 @@ func InsertDBVersion(ctx context.Context, tx pgx.Tx, version string) error {
return insertMetaInf(ctx, tx, DatabaseVersionKey, version)
}

func InsertSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema) error {
return insertMetaInf(ctx, tx, MakeSchemaVersionKey(definitionName, schema.Name), fmt.Sprint(schema.Version))
func InsertSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema, version int) error {
return insertMetaInf(ctx, tx, MakeSchemaVersionKey(definitionName, schema.Name), fmt.Sprint(version))
}

func insertMetaInf(ctx context.Context, tx pgx.Tx, key, val string) error {
Expand All @@ -42,9 +44,26 @@ func insertMetaInf(ctx context.Context, tx pgx.Tx, key, val string) error {
})
}

func UpdateSchemaVersion(ctx context.Context, tx pgx.Tx, defName string, schema Schema, version int) error {
return New(tx).UpdateMeta(ctx, UpdateMetaParams{
Key: MakeSchemaVersionKey(defName, schema.Name),
Value: fmt.Sprint(version),
})
}

// ValidateSchemaVersion checks that the database schema is compatible.
func ValidateSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema) error {
return expectMetaKeyVal(ctx, tx, MakeSchemaVersionKey(definitionName, schema.Name), fmt.Sprint(schema.Version))
func ValidateSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema, version int) error {
haveVersion, err := GetSchemaVersion(ctx, tx, definitionName, schema)
if err != nil {
return err
}
if haveVersion < version {
return errors.Wrapf(ErrNeedsMigration, "expected version %d, have %d", version, haveVersion)
}
if haveVersion != version {
return errors.Wrapf(ErrValueMismatch, "expected version %d, have %d", version, haveVersion)
}
return nil
}

func expectMetaKeyVal(ctx context.Context, tx pgx.Tx, key, val string) error {
Expand All @@ -67,3 +86,18 @@ func expectMetaKeyVal(ctx context.Context, tx pgx.Tx, key, val string) error {
func ValidateDatabaseVersion(ctx context.Context, tx pgx.Tx, version string) error {
return expectMetaKeyVal(ctx, tx, DatabaseVersionKey, version)
}

func GetSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema) (int, error) {
key := MakeSchemaVersionKey(definitionName, schema.Name)
haveVal, err := New(tx).GetMeta(ctx, key)
if err == pgx.ErrNoRows {
return 0, errors.Wrapf(ErrKeyNotFound, "key: %s", key)
} else if err != nil {
return 0, errors.Wrapf(err, "failed to get key '%s' from meta_inf table", key)
}
version, err := strconv.ParseInt(haveVal, 10, 0)
if err != nil {
return 0, errors.Wrapf(err, "failed to convert version '%s' from meta_inf table", key)
}
return int(version), nil
}
3 changes: 3 additions & 0 deletions rolling-shutter/medley/db/sql/queries/meta.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ INSERT INTO meta_inf (key, value) VALUES ($1, $2);

-- name: GetMeta :one
SELECT value FROM meta_inf WHERE key = $1;

-- name: UpdateMeta :exec
UPDATE meta_inf SET value = $1 WHERE key = $2;
Loading