diff --git a/rolling-shutter/chainobserver/db/collator/definition.go b/rolling-shutter/chainobserver/db/collator/definition.go index 07f06a1d8..35d31392d 100644 --- a/rolling-shutter/chainobserver/db/collator/definition.go +++ b/rolling-shutter/chainobserver/db/collator/definition.go @@ -16,7 +16,7 @@ var files embed.FS var Definition db.Definition func init() { - def, err := db.NewSQLCDefinition(files, "sql/", "chainobscollator", 1) + def, err := db.NewSQLCDefinition(files, "sql/", "chainobscollator") if err != nil { log.Fatal().Err(err).Msg("failed to initialize DB metadata") } diff --git a/rolling-shutter/chainobserver/db/keyper/definition.go b/rolling-shutter/chainobserver/db/keyper/definition.go index 26e7ddf8e..790a4ac45 100644 --- a/rolling-shutter/chainobserver/db/keyper/definition.go +++ b/rolling-shutter/chainobserver/db/keyper/definition.go @@ -16,7 +16,7 @@ var files embed.FS var Definition db.Definition func init() { - def, err := db.NewSQLCDefinition(files, "sql/", "chainobskeyper", 1) + def, err := db.NewSQLCDefinition(files, "sql/", "chainobskeyper") if err != nil { log.Fatal().Err(err).Msg("failed to initialize DB metadata") } diff --git a/rolling-shutter/chainobserver/db/sync/definition.go b/rolling-shutter/chainobserver/db/sync/definition.go index 196dc3c08..6cd6eaadf 100644 --- a/rolling-shutter/chainobserver/db/sync/definition.go +++ b/rolling-shutter/chainobserver/db/sync/definition.go @@ -16,7 +16,7 @@ var Definition db.Definition func init() { var err error - Definition, err = db.NewSQLCDefinition(files, "sql/", "chainobssync", 1) + Definition, err = db.NewSQLCDefinition(files, "sql/", "chainobssync") if err != nil { log.Fatal().Err(err).Msg("failed to initialize DB metadata") } diff --git a/rolling-shutter/keyper/database/definition.go b/rolling-shutter/keyper/database/definition.go index 0e69d1c8f..1594f20ef 100644 --- a/rolling-shutter/keyper/database/definition.go +++ b/rolling-shutter/keyper/database/definition.go @@ -21,7 +21,7 @@ var files embed.FS var Definition db.Definition func init() { - sqlcDB, err := db.NewSQLCDefinition(files, "sql/", "keyper", 1) + sqlcDB, err := db.NewSQLCDefinition(files, "sql/", "keyper") if err != nil { log.Fatal().Err(err).Msg("failed to initialize DB") } diff --git a/rolling-shutter/keyperimpl/gnosis/database/definition.go b/rolling-shutter/keyperimpl/gnosis/database/definition.go index e83d8021f..4e8b1ec8d 100644 --- a/rolling-shutter/keyperimpl/gnosis/database/definition.go +++ b/rolling-shutter/keyperimpl/gnosis/database/definition.go @@ -17,7 +17,7 @@ var files embed.FS var Definition db.Definition func init() { - def, err := db.NewSQLCDefinition(files, "sql/", "gnosiskeyper", 1) + def, err := db.NewSQLCDefinition(files, "sql/", "gnosiskeyper") if err != nil { log.Fatal().Err(err).Msg("failed to initialize DB metadata") } diff --git a/rolling-shutter/keyperimpl/gnosis/database/sql/migrations/V2_validatorRegistrations.sql b/rolling-shutter/keyperimpl/gnosis/database/sql/migrations/V2_validatorRegistrations.sql new file mode 100644 index 000000000..9aac91159 --- /dev/null +++ b/rolling-shutter/keyperimpl/gnosis/database/sql/migrations/V2_validatorRegistrations.sql @@ -0,0 +1,6 @@ +-- schema-version: gnosiskeyper-2 -- +-- migrations need to start from V2... as file name, as the V1 was initial schema + +ALTER TABLE validator_registrations + DROP CONSTRAINT validator_registrations_pkey, + ADD PRIMARY KEY (block_number, tx_index, log_index, validator_index); \ No newline at end of file diff --git a/rolling-shutter/keyperimpl/gnosis/database/sql/schemas/gnosiskeyper.sql b/rolling-shutter/keyperimpl/gnosis/database/sql/schemas/gnosiskeyper.sql index e0928c5b6..cd04083be 100644 --- a/rolling-shutter/keyperimpl/gnosis/database/sql/schemas/gnosiskeyper.sql +++ b/rolling-shutter/keyperimpl/gnosis/database/sql/schemas/gnosiskeyper.sql @@ -66,4 +66,5 @@ CREATE TABLE validator_registrations_synced_until( enforce_one_row bool PRIMARY KEY DEFAULT true, block_hash bytea NOT NULL, block_number bigint NOT NULL CHECK (block_number >= 0) -); \ No newline at end of file +); + diff --git a/rolling-shutter/keyperimpl/shutterservice/database/definition.go b/rolling-shutter/keyperimpl/shutterservice/database/definition.go index 8f9cd7b3c..374a1fbe4 100644 --- a/rolling-shutter/keyperimpl/shutterservice/database/definition.go +++ b/rolling-shutter/keyperimpl/shutterservice/database/definition.go @@ -17,7 +17,7 @@ var files embed.FS var Definition db.Definition func init() { - def, err := db.NewSQLCDefinition(files, "sql/", "shutterservicekeyper", 1) + def, err := db.NewSQLCDefinition(files, "sql/", "shutterservicekeyper") if err != nil { log.Fatal().Err(err).Msg("failed to initialize DB metadata") } diff --git a/rolling-shutter/medley/db/definition.go b/rolling-shutter/medley/db/definition.go index 1b50ef56f..a4b095ee3 100644 --- a/rolling-shutter/medley/db/definition.go +++ b/rolling-shutter/medley/db/definition.go @@ -14,7 +14,7 @@ var metaDefinition Definition func init() { var err error - metaDefinition, err = NewSQLCDefinition(files, "sql/", "meta", 1) + metaDefinition, err = NewSQLCDefinition(files, "sql/", "meta") if err != nil { log.Fatal().Err(err).Msg("failed to initialize DB metadata") } diff --git a/rolling-shutter/medley/db/definitions.go b/rolling-shutter/medley/db/definitions.go index 021a28ca7..f94f36b4b 100644 --- a/rolling-shutter/medley/db/definitions.go +++ b/rolling-shutter/medley/db/definitions.go @@ -17,14 +17,23 @@ import ( // pins the database to a specific role, e.g. "keyper-test" or "snapshot-keyper-production" // in order to prevent later usage of the database with commands that fulfill a different role. func InitDB(ctx context.Context, dbpool *pgxpool.Pool, role string, definition Definition) error { + // First check if schema exists and is valid err := dbpool.BeginFunc(WrapContext(ctx, definition.Validate)) if err == nil { shdb.AddConnectionInfo(log.Info(), dbpool).Msg("database already exists") return nil + } else if errors.Is(err, ErrNeedsMigration) { + // Schema exists, just run migrations + shdb.AddConnectionInfo(log.Info(), dbpool).Msg("database exists, checking for migrations") + err = dbpool.BeginFunc(WrapContext(ctx, definition.Migrate)) + if err != nil { + return errors.Wrap(err, "failed to apply migrations") + } + return nil } else if errors.Is(err, ErrValueMismatch) { return err } - + // Schema doesn't exist or is invalid, create it err = dbpool.BeginFunc(WrapContext(ctx, definition.Create)) if err != nil { return err @@ -35,19 +44,23 @@ func InitDB(ctx context.Context, dbpool *pgxpool.Pool, role string, definition D return err } + // Run any migrations after initial creation + err = dbpool.BeginFunc(WrapContext(ctx, definition.Migrate)) + if err != nil { + return errors.Wrap(err, "failed to apply migrations") + } + // For the outer DB initialisation, also set the database version to // the overall "role", so that e.g. a snapshot keyper database won't be // used by another keyper implementation, no matter if the schemas // are compatible - err = dbpool.BeginFunc( - ctx, - func(tx pgx.Tx) error { - return InsertDBVersion(ctx, tx, role) - }, - ) + err = dbpool.BeginFunc(ctx, func(tx pgx.Tx) error { + return InsertDBVersion(ctx, tx, role) + }) if err != nil { return err } + shdb.AddConnectionInfo(log.Info(), dbpool).Msg("database initialized") return nil } @@ -120,17 +133,27 @@ func (d AggregateDefinition) Validate(ctx context.Context, tx pgx.Tx) error { return nil } +func (d AggregateDefinition) Migrate(ctx context.Context, tx pgx.Tx) error { + for def := range d.defs { + err := def.Migrate(ctx, tx) + if err != nil { + return errors.Wrapf(err, "migration failed for definition '%s'", def.Name()) + } + } + return nil +} + type Definition interface { Name() string Create(context.Context, pgx.Tx) error Init(context.Context, pgx.Tx) error Validate(context.Context, pgx.Tx) error + Migrate(context.Context, pgx.Tx) error } type Schema struct { - Version int - Name string - Path string + Name string + Path string } type Migration struct { diff --git a/rolling-shutter/medley/db/meta.sqlc.gen.go b/rolling-shutter/medley/db/meta.sqlc.gen.go index b8ee2936b..1035b6fea 100644 --- a/rolling-shutter/medley/db/meta.sqlc.gen.go +++ b/rolling-shutter/medley/db/meta.sqlc.gen.go @@ -33,3 +33,17 @@ func (q *Queries) InsertMeta(ctx context.Context, arg InsertMetaParams) error { _, err := q.db.Exec(ctx, insertMeta, arg.Key, arg.Value) return err } + +const updateMeta = `-- name: UpdateMeta :exec +UPDATE meta_inf SET value = $1 WHERE key = $2 +` + +type UpdateMetaParams struct { + Value string + Key string +} + +func (q *Queries) UpdateMeta(ctx context.Context, arg UpdateMetaParams) error { + _, err := q.db.Exec(ctx, updateMeta, arg.Value, arg.Key) + return err +} diff --git a/rolling-shutter/medley/db/metadb.go b/rolling-shutter/medley/db/metadb.go index e139cb2cb..b76f6008f 100644 --- a/rolling-shutter/medley/db/metadb.go +++ b/rolling-shutter/medley/db/metadb.go @@ -3,6 +3,7 @@ package db import ( "context" "fmt" + "strconv" "github.com/jackc/pgx/v4" "github.com/pkg/errors" @@ -10,8 +11,9 @@ import ( ) var ( - ErrValueMismatch = errors.New("database has unexpected value") - ErrKeyNotFound = errors.New("key does not exist") + ErrValueMismatch = errors.New("database has unexpected value") + ErrKeyNotFound = errors.New("key does not exist") + ErrNeedsMigration = errors.New("needs migration") ) var DatabaseVersionKey string = "database-version" @@ -29,8 +31,8 @@ func InsertDBVersion(ctx context.Context, tx pgx.Tx, version string) error { return insertMetaInf(ctx, tx, DatabaseVersionKey, version) } -func InsertSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema) error { - return insertMetaInf(ctx, tx, MakeSchemaVersionKey(definitionName, schema.Name), fmt.Sprint(schema.Version)) +func InsertSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema, version int) error { + return insertMetaInf(ctx, tx, MakeSchemaVersionKey(definitionName, schema.Name), fmt.Sprint(version)) } func insertMetaInf(ctx context.Context, tx pgx.Tx, key, val string) error { @@ -42,9 +44,26 @@ func insertMetaInf(ctx context.Context, tx pgx.Tx, key, val string) error { }) } +func UpdateSchemaVersion(ctx context.Context, tx pgx.Tx, defName string, schema Schema, version int) error { + return New(tx).UpdateMeta(ctx, UpdateMetaParams{ + Key: MakeSchemaVersionKey(defName, schema.Name), + Value: fmt.Sprint(version), + }) +} + // ValidateSchemaVersion checks that the database schema is compatible. -func ValidateSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema) error { - return expectMetaKeyVal(ctx, tx, MakeSchemaVersionKey(definitionName, schema.Name), fmt.Sprint(schema.Version)) +func ValidateSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema, version int) error { + haveVersion, err := GetSchemaVersion(ctx, tx, definitionName, schema) + if err != nil { + return err + } + if haveVersion < version { + return errors.Wrapf(ErrNeedsMigration, "expected version %d, have %d", version, haveVersion) + } + if haveVersion != version { + return errors.Wrapf(ErrValueMismatch, "expected version %d, have %d", version, haveVersion) + } + return nil } func expectMetaKeyVal(ctx context.Context, tx pgx.Tx, key, val string) error { @@ -67,3 +86,18 @@ func expectMetaKeyVal(ctx context.Context, tx pgx.Tx, key, val string) error { func ValidateDatabaseVersion(ctx context.Context, tx pgx.Tx, version string) error { return expectMetaKeyVal(ctx, tx, DatabaseVersionKey, version) } + +func GetSchemaVersion(ctx context.Context, tx pgx.Tx, definitionName string, schema Schema) (int, error) { + key := MakeSchemaVersionKey(definitionName, schema.Name) + haveVal, err := New(tx).GetMeta(ctx, key) + if err == pgx.ErrNoRows { + return 0, errors.Wrapf(ErrKeyNotFound, "key: %s", key) + } else if err != nil { + return 0, errors.Wrapf(err, "failed to get key '%s' from meta_inf table", key) + } + version, err := strconv.ParseInt(haveVal, 10, 0) + if err != nil { + return 0, errors.Wrapf(err, "failed to convert version '%s' from meta_inf table", key) + } + return int(version), nil +} diff --git a/rolling-shutter/medley/db/sql/queries/meta.sql b/rolling-shutter/medley/db/sql/queries/meta.sql index a1111c571..b1496e7d4 100644 --- a/rolling-shutter/medley/db/sql/queries/meta.sql +++ b/rolling-shutter/medley/db/sql/queries/meta.sql @@ -3,3 +3,6 @@ INSERT INTO meta_inf (key, value) VALUES ($1, $2); -- name: GetMeta :one SELECT value FROM meta_inf WHERE key = $1; + +-- name: UpdateMeta :exec +UPDATE meta_inf SET value = $1 WHERE key = $2; \ No newline at end of file diff --git a/rolling-shutter/medley/db/sqlc.go b/rolling-shutter/medley/db/sqlc.go index fe769d8bb..3144af393 100644 --- a/rolling-shutter/medley/db/sqlc.go +++ b/rolling-shutter/medley/db/sqlc.go @@ -2,12 +2,15 @@ package db import ( "context" + "fmt" "io/fs" "path" + "sort" "strings" "github.com/jackc/pgx/v4" "github.com/pkg/errors" + "github.com/rs/zerolog/log" "gopkg.in/yaml.v3" ) @@ -30,7 +33,7 @@ type entry struct { // paths of all the schema definition directories. // The schema file will then be stored in the object's state // together with the passed in `version`. -func ParseSQLC(filesystem fs.FS, sqlcPath string, version int) ([]Schema, error) { +func ParseSQLC(filesystem fs.FS, sqlcPath string) ([]Schema, error) { b, err := fs.ReadFile(filesystem, sqlcPath) if err != nil { return nil, err @@ -62,17 +65,13 @@ func ParseSQLC(filesystem fs.FS, sqlcPath string, version int) ([]Schema, error) if i.IsDir() { continue } - // TODO: allow database migrations. - // For now assume only schemas, no migrations. - // However sqlc does recognize migration files from different tools. base, isSQL := strings.CutSuffix(i.Name(), ".sql") if !isSQL { continue } schema := Schema{ - Version: version, - Name: base, - Path: path.Join(schemaDirPath, i.Name()), + Name: base, + Path: path.Join(schemaDirPath, i.Name()), } schemas = append(schemas, schema) } @@ -80,7 +79,7 @@ func ParseSQLC(filesystem fs.FS, sqlcPath string, version int) ([]Schema, error) return schemas, nil } -func NewSQLCDefinition(filesystem fs.FS, sqlcPath string, name string, version int) (*SQLC, error) { +func NewSQLCDefinition(filesystem fs.FS, sqlcPath string, name string) (*SQLC, error) { p := path.Clean(sqlcPath) des, err := fs.ReadDir(filesystem, p) if errors.Is(err, ErrNotADirectory) { @@ -99,15 +98,22 @@ func NewSQLCDefinition(filesystem fs.FS, sqlcPath string, name string, version i if foundPath == "" { return nil, errors.Errorf("SQLC file '%s' does not exists", p) } - schemas, err := ParseSQLC(filesystem, foundPath, version) + schemas, err := ParseSQLC(filesystem, foundPath) if err != nil { return nil, err } - return &SQLC{ + + sqlcdef := &SQLC{ schemas: schemas, filesystem: filesystem, name: name, - }, nil + sqlcPath: sqlcPath, + } + sqlcdef.version, err = sqlcdef.GetLatestMigrationVersion() + if err != nil { + return nil, err + } + return sqlcdef, err } // SQLC implements the `Definition` interface and keeps @@ -116,6 +122,8 @@ type SQLC struct { schemas []Schema filesystem fs.FS name string + sqlcPath string + version int } func (d *SQLC) Name() string { @@ -138,8 +146,10 @@ func (d *SQLC) Create(ctx context.Context, tx pgx.Tx) error { return errors.Wrapf(err, "failed to execute SQL statements for definition '%s'", d.Name()) } } + for _, schema := range d.schemas { - err := InsertSchemaVersion(ctx, tx, d.Name(), schema) + // this is initial creation of db, so create version as one here + err := InsertSchemaVersion(ctx, tx, d.Name(), schema, 1) if err != nil { return err } @@ -151,7 +161,7 @@ func (d *SQLC) Create(ctx context.Context, tx pgx.Tx) error { // with the schema versions of it's schema definitions. func (d *SQLC) Validate(ctx context.Context, tx pgx.Tx) error { for _, schema := range d.schemas { - err := ValidateSchemaVersion(ctx, tx, d.Name(), schema) + err := ValidateSchemaVersion(ctx, tx, d.Name(), schema, d.version) if err != nil { return err } @@ -170,3 +180,115 @@ func (d *SQLC) sqlCreateStatements() []string { } return sqlStatements } + +func (d *SQLC) LoadMigrations() ([]Migration, error) { + migrationsPath := path.Join(path.Dir(d.sqlcPath), "migrations") + entries, err := fs.ReadDir(d.filesystem, migrationsPath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil, nil + } + return nil, errors.Wrap(err, "error reading migrations directory") + } + + var migrations []Migration + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + if !strings.HasSuffix(name, ".sql") { + continue + } + + var fileversion int + _, err := fmt.Sscanf(name, "V%d_", &fileversion) + if err != nil { + continue + } + + // We don't support migrations before version 2 + if fileversion < 2 { + return nil, errors.Errorf("migration file %s has version %d, which is less than 2", name, fileversion) + } + + _, err = fs.ReadFile(d.filesystem, path.Join(migrationsPath, name)) + if err != nil { + return nil, errors.Wrapf(err, "failed to read migration %s", name) + } + + migrations = append(migrations, Migration{ + Version: fileversion, + Path: path.Join(migrationsPath, name), + Up: true, + }) + } + + sort.Slice(migrations, func(i, j int) bool { + return migrations[i].Version < migrations[j].Version + }) + + return migrations, nil +} + +func (d *SQLC) Migrate(ctx context.Context, tx pgx.Tx) error { + for _, schema := range d.schemas { + // Get current version from meta-inf + version, err := GetSchemaVersion(ctx, tx, d.Name(), schema) + if err != nil { + return err + } + + migrations, err := d.LoadMigrations() + if err != nil { + return errors.Wrap(err, "failed to load migrations") + } + + // Apply only migrations that are newer than current version + for _, migration := range migrations { + if migration.Version <= version { + continue + } + + content, err := fs.ReadFile(d.filesystem, migration.Path) + if err != nil { + return errors.Wrapf(err, "failed to read migration file %s", migration.Path) + } + + log.Info(). + Str("definition", d.Name()). + Int("from_version", version). + Int("to_version", migration.Version). + Str("path", migration.Path). + Msg("applying migration") + + _, err = tx.Exec(ctx, string(content)) + if err != nil { + return errors.Wrapf(err, "failed to apply migration %d", migration.Version) + } + + // Update version after each successful migration + err = UpdateSchemaVersion(ctx, tx, d.Name(), schema, migration.Version) + if err != nil { + return errors.Wrapf(err, "failed to update schema version to %d", migration.Version) + } + } + } + + return nil +} + +func (d *SQLC) GetLatestMigrationVersion() (int, error) { + migrations, err := d.LoadMigrations() + if err != nil { + return 0, err + } + + if len(migrations) == 0 { + return 1, nil // Return 1 if no migrations exist + } + + // Since migrations are already sorted in LoadMigrations(), + // we can just return the version of the last migration + return migrations[len(migrations)-1].Version, nil +} diff --git a/rolling-shutter/snapshot/database/definition.go b/rolling-shutter/snapshot/database/definition.go index 6ff3c3b04..c2a7c9cd0 100644 --- a/rolling-shutter/snapshot/database/definition.go +++ b/rolling-shutter/snapshot/database/definition.go @@ -15,7 +15,7 @@ var files embed.FS var Definition db.Definition func init() { - def, err := db.NewSQLCDefinition(files, "sql/", "snapshot", 22) + def, err := db.NewSQLCDefinition(files, "sql/", "snapshot") if err != nil { log.Fatal().Err(err).Msg("failed to initialize DB metadata") }