diff --git a/rolling-shutter/cmd/gnosiskeyper/gnosiskeyper.go b/rolling-shutter/cmd/gnosiskeyper/gnosiskeyper.go index 2c6fa02e1..767bd70fb 100644 --- a/rolling-shutter/cmd/gnosiskeyper/gnosiskeyper.go +++ b/rolling-shutter/cmd/gnosiskeyper/gnosiskeyper.go @@ -11,6 +11,7 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/cmd/shversion" "github.com/shutter-network/rolling-shutter/rolling-shutter/gnosiskeyperwatcher" keyper "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis" + keyperconfig "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/config" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/configuration/command" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/db" @@ -38,7 +39,7 @@ Shuttermint node which have to be started separately in advance.`, return builder.Command() } -func main(config *keyper.Config) error { +func main(config *keyperconfig.Config) error { log.Info(). Str("version", shversion.Version()). Str("address", config.GetAddress().Hex()). @@ -49,7 +50,7 @@ func main(config *keyper.Config) error { return service.RunWithSighandler(context.Background(), kpr) } -func initDB(cfg *keyper.Config) error { +func initDB(cfg *keyperconfig.Config) error { ctx := context.Background() dbpool, err := pgxpool.Connect(ctx, cfg.DatabaseURL) if err != nil { @@ -59,7 +60,7 @@ func initDB(cfg *keyper.Config) error { return db.InitDB(ctx, dbpool, database.Definition.Name(), database.Definition) } -func watch(cfg *keyper.Config) error { +func watch(cfg *keyperconfig.Config) error { log.Info().Msg("starting monitor") return service.RunWithSighandler(context.Background(), gnosiskeyperwatcher.New(cfg)) } diff --git a/rolling-shutter/gnosisaccessnode/config.go b/rolling-shutter/gnosisaccessnode/config.go index de2c93731..2fa2a531a 100644 --- a/rolling-shutter/gnosisaccessnode/config.go +++ b/rolling-shutter/gnosisaccessnode/config.go @@ -3,7 +3,7 @@ package gnosisaccessnode import ( "io" - gnosiskeyper "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis" + gnosisconfig "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/config" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/configuration" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/metricsserver" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2p" @@ -13,7 +13,7 @@ type Config struct { InstanceID uint64 GnosisNode *configuration.EthnodeConfig - Contracts *gnosiskeyper.GnosisContractsConfig + Contracts *gnosisconfig.GnosisContractsConfig P2P *p2p.Config Metrics *metricsserver.MetricsConfig @@ -22,7 +22,7 @@ type Config struct { func (c *Config) Init() { c.GnosisNode = configuration.NewEthnodeConfig() - c.Contracts = gnosiskeyper.NewGnosisContractsConfig() + c.Contracts = gnosisconfig.NewGnosisContractsConfig() c.P2P = p2p.NewConfig() c.Metrics = metricsserver.NewConfig() } diff --git a/rolling-shutter/gnosisaccessnode/decryptionkeyshandler.go b/rolling-shutter/gnosisaccessnode/decryptionkeyshandler.go index 230ddecf6..4f3aed677 100644 --- a/rolling-shutter/gnosisaccessnode/decryptionkeyshandler.go +++ b/rolling-shutter/gnosisaccessnode/decryptionkeyshandler.go @@ -10,19 +10,20 @@ import ( "github.com/shutter-network/shutter/shlib/shcrypto" + "github.com/shutter-network/rolling-shutter/rolling-shutter/gnosisaccessnode/storage" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2pmsg" ) type DecryptionKeysHandler struct { config *Config - storage *Storage + storage *storage.Memory } -func NewDecryptionKeysHandler(config *Config, storage *Storage) *DecryptionKeysHandler { +func NewDecryptionKeysHandler(config *Config, store *storage.Memory) *DecryptionKeysHandler { return &DecryptionKeysHandler{ config: config, - storage: storage, + storage: store, } } diff --git a/rolling-shutter/gnosisaccessnode/node.go b/rolling-shutter/gnosisaccessnode/node.go index 277a3dfb3..adebf214d 100644 --- a/rolling-shutter/gnosisaccessnode/node.go +++ b/rolling-shutter/gnosisaccessnode/node.go @@ -9,6 +9,7 @@ import ( "github.com/shutter-network/shutter/shlib/shcrypto" obskeyperdatabase "github.com/shutter-network/rolling-shutter/rolling-shutter/chainobserver/db/keyper" + "github.com/shutter-network/rolling-shutter/rolling-shutter/gnosisaccessnode/storage" chainsync "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/legacychainsync" syncevent "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/legacychainsync/event" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/metricsserver" @@ -19,13 +20,13 @@ import ( type GnosisAccessNode struct { config *Config - storage *Storage + storage *storage.Memory } func New(config *Config) *GnosisAccessNode { return &GnosisAccessNode{ config: config, - storage: NewStorage(), + storage: storage.NewMemory(), } } diff --git a/rolling-shutter/gnosisaccessnode/storage.go b/rolling-shutter/gnosisaccessnode/storage/storage.go similarity index 63% rename from rolling-shutter/gnosisaccessnode/storage.go rename to rolling-shutter/gnosisaccessnode/storage/storage.go index c03ecf808..986f08c52 100644 --- a/rolling-shutter/gnosisaccessnode/storage.go +++ b/rolling-shutter/gnosisaccessnode/storage/storage.go @@ -1,4 +1,4 @@ -package gnosisaccessnode +package storage import ( "sync" @@ -8,28 +8,28 @@ import ( obskeyperdatabase "github.com/shutter-network/rolling-shutter/rolling-shutter/chainobserver/db/keyper" ) -type Storage struct { +type Memory struct { mu sync.Mutex eonKeys map[uint64]*shcrypto.EonPublicKey keyperSets map[uint64]*obskeyperdatabase.KeyperSet } -func NewStorage() *Storage { - return &Storage{ +func NewMemory() *Memory { + return &Memory{ mu: sync.Mutex{}, eonKeys: make(map[uint64]*shcrypto.EonPublicKey), keyperSets: make(map[uint64]*obskeyperdatabase.KeyperSet), } } -func (s *Storage) AddEonKey(keyperConfigIndex uint64, key *shcrypto.EonPublicKey) { +func (s *Memory) AddEonKey(keyperConfigIndex uint64, key *shcrypto.EonPublicKey) { s.mu.Lock() defer s.mu.Unlock() s.eonKeys[keyperConfigIndex] = key } -func (s *Storage) GetEonKey(keyperConfigIndex uint64) (*shcrypto.EonPublicKey, bool) { +func (s *Memory) GetEonKey(keyperConfigIndex uint64) (*shcrypto.EonPublicKey, bool) { s.mu.Lock() defer s.mu.Unlock() @@ -37,14 +37,14 @@ func (s *Storage) GetEonKey(keyperConfigIndex uint64) (*shcrypto.EonPublicKey, b return v, ok } -func (s *Storage) AddKeyperSet(keyperConfigIndex uint64, keyperSet *obskeyperdatabase.KeyperSet) { +func (s *Memory) AddKeyperSet(keyperConfigIndex uint64, keyperSet *obskeyperdatabase.KeyperSet) { s.mu.Lock() defer s.mu.Unlock() s.keyperSets[keyperConfigIndex] = keyperSet } -func (s *Storage) GetKeyperSet(keyperConfigIndex uint64) (*obskeyperdatabase.KeyperSet, bool) { +func (s *Memory) GetKeyperSet(keyperConfigIndex uint64) (*obskeyperdatabase.KeyperSet, bool) { s.mu.Lock() defer s.mu.Unlock() diff --git a/rolling-shutter/gnosisaccessnode/synchandler/eonkey.go b/rolling-shutter/gnosisaccessnode/synchandler/eonkey.go new file mode 100644 index 000000000..46e700392 --- /dev/null +++ b/rolling-shutter/gnosisaccessnode/synchandler/eonkey.go @@ -0,0 +1,89 @@ +package synchandler + +import ( + "context" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/rs/zerolog/log" + "github.com/shutter-network/shop-contracts/bindings" + + "github.com/shutter-network/shutter/shlib/shcrypto" + + "github.com/shutter-network/rolling-shutter/rolling-shutter/gnosisaccessnode/storage" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer" +) + +func init() { + var err error + KeyBroadcastContractContractABI, err = bindings.KeyBroadcastContractMetaData.GetAbi() + if err != nil { + panic(err) + } +} + +var KeyBroadcastContractContractABI *abi.ABI + +func NewEonKeyBroadcast( + store *storage.Memory, + contractAddress common.Address, +) (syncer.ContractEventHandler, error) { + return syncer.WrapHandler(&EonKeyBroadcast{ + evABI: KeyBroadcastContractContractABI, + keyBroadcastAddress: contractAddress, + storage: store, + }) +} + +type EonKeyBroadcast struct { + storage *storage.Memory + evABI *abi.ABI + keyBroadcastAddress common.Address +} + +func (handler *EonKeyBroadcast) Address() common.Address { + return handler.keyBroadcastAddress +} + +func (*EonKeyBroadcast) Event() string { + // TODO: look this up that his is correct + return "EonKeyBroadcast" +} + +func (handler *EonKeyBroadcast) ABI() abi.ABI { + return *handler.evABI +} + +func (handler *EonKeyBroadcast) Accept( + _ context.Context, + _ types.Header, + _ bindings.KeyBroadcastContractEonKeyBroadcast, +) (bool, error) { + return true, nil +} + +func (handler *EonKeyBroadcast) Handle( + _ context.Context, + _ syncer.ChainUpdateContext, + events []bindings.KeyBroadcastContractEonKeyBroadcast, +) error { + for _, ev := range events { + key := new(shcrypto.EonPublicKey) + err := key.Unmarshal(ev.Key) + if err != nil { + log.Error(). + Err(err). + Hex("key", ev.Key). + Int("keyper-config-index", int(ev.Eon)). + Msg("received invalid eon key") + return nil + } + log.Info(). + Int("keyper-config-index", int(ev.Eon)). + Hex("key", ev.Key). + Msg("adding eon key") + handler.storage.AddEonKey(ev.Eon, key) + } + return nil +} diff --git a/rolling-shutter/gnosisaccessnode/synchandler/keypersetmanager.go b/rolling-shutter/gnosisaccessnode/synchandler/keypersetmanager.go new file mode 100644 index 000000000..1d56f79cf --- /dev/null +++ b/rolling-shutter/gnosisaccessnode/synchandler/keypersetmanager.go @@ -0,0 +1,112 @@ +package synchandler + +import ( + "context" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/rs/zerolog/log" + "github.com/shutter-network/shop-contracts/bindings" + + obskeyperdatabase "github.com/shutter-network/rolling-shutter/rolling-shutter/chainobserver/db/keyper" + "github.com/shutter-network/rolling-shutter/rolling-shutter/gnosisaccessnode/storage" + keypersetsync "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/synchandler" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer" + "github.com/shutter-network/rolling-shutter/rolling-shutter/shdb" +) + +func init() { + var err error + KeyperSetManagerContractABI, err = bindings.KeyperSetManagerMetaData.GetAbi() + if err != nil { + panic(err) + } +} + +var KeyperSetManagerContractABI *abi.ABI + +func NewKeyperSetAdded( + ethClient client.Client, + store *storage.Memory, + contractAddress common.Address, +) (syncer.ContractEventHandler, error) { + // we need access to an additional contract here in oder to pull in some more + // required information about the keyper sets: + ksm, err := bindings.NewKeyperSetManager(contractAddress, ethClient) + if err != nil { + return nil, err + } + return syncer.WrapHandler(&KeyperSetAdded{ + storage: store, + evABI: KeyperSetManagerContractABI, + keyperSetManagerAddress: contractAddress, + keyperSetManager: ksm, + ethClient: ethClient, + }) +} + +type KeyperSetAdded struct { + storage *storage.Memory + evABI *abi.ABI + keyperSetManagerAddress common.Address + ethClient client.Client + keyperSetManager *bindings.KeyperSetManager +} + +func (handler *KeyperSetAdded) Address() common.Address { + return handler.keyperSetManagerAddress +} + +func (*KeyperSetAdded) Event() string { + return "KeyperSetAdded" +} + +func (handler *KeyperSetAdded) ABI() abi.ABI { + return *handler.evABI +} + +func (handler *KeyperSetAdded) Accept( + _ context.Context, + _ types.Header, + _ bindings.KeyperSetManagerKeyperSetAdded, +) (bool, error) { + return true, nil +} + +func (handler *KeyperSetAdded) Handle( + ctx context.Context, + update syncer.ChainUpdateContext, + events []bindings.KeyperSetManagerKeyperSetAdded, +) error { + // TODO: handle reorgs here + _ = update + + for _, ev := range events { + keyperSet, err := keypersetsync.QueryFullKeyperSetFromKeyperSetAddedEvent( + ctx, + handler.ethClient, + ev, + handler.keyperSetManager, + ) + if err != nil { + log.Error().Err(err).Msg("KeyperSetAdded event, error querying keyperset-data") + } + // FIXME: integer overflow protection + obsKeyperSet := obskeyperdatabase.KeyperSet{ + KeyperConfigIndex: int64(keyperSet.Eon), + ActivationBlockNumber: int64(keyperSet.ActivationBlock), + Keypers: shdb.EncodeAddresses(keyperSet.Members), + Threshold: int32(keyperSet.Threshold), + } + log.Info(). + Uint64("keyper-config-index", keyperSet.Eon). + Uint64("activation-block-number", keyperSet.ActivationBlock). + Int("num-keypers", len(keyperSet.Members)). + Uint64("threshold", keyperSet.Threshold). + Msg("adding keyper set") + handler.storage.AddKeyperSet(keyperSet.Eon, &obsKeyperSet) + } + return nil +} diff --git a/rolling-shutter/gnosiskeyperwatcher/blocks.go b/rolling-shutter/gnosiskeyperwatcher/blocks.go index 66ca62a41..15da65933 100644 --- a/rolling-shutter/gnosiskeyperwatcher/blocks.go +++ b/rolling-shutter/gnosiskeyperwatcher/blocks.go @@ -8,12 +8,12 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/rs/zerolog/log" - keyper "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis" + gnosisconfig "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/config" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) type BlocksWatcher struct { - config *keyper.Config + config *gnosisconfig.Config blocksChannel chan *BlockReceivedEvent } @@ -22,7 +22,7 @@ type BlockReceivedEvent struct { Time time.Time } -func NewBlocksWatcher(config *keyper.Config, blocksChannel chan *BlockReceivedEvent) *BlocksWatcher { +func NewBlocksWatcher(config *gnosisconfig.Config, blocksChannel chan *BlockReceivedEvent) *BlocksWatcher { return &BlocksWatcher{ config: config, blocksChannel: blocksChannel, diff --git a/rolling-shutter/gnosiskeyperwatcher/keys.go b/rolling-shutter/gnosiskeyperwatcher/keys.go index 0ea8eedb4..3549ab952 100644 --- a/rolling-shutter/gnosiskeyperwatcher/keys.go +++ b/rolling-shutter/gnosiskeyperwatcher/keys.go @@ -9,14 +9,14 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/rs/zerolog/log" - keyper "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis" + gnosisconfig "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/config" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2p" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2pmsg" ) type KeysWatcher struct { - config *keyper.Config + config *gnosisconfig.Config blocksChannel chan *BlockReceivedEvent recentBlocksMux sync.Mutex @@ -24,7 +24,7 @@ type KeysWatcher struct { mostRecentBlock uint64 } -func NewKeysWatcher(config *keyper.Config, blocksChannel chan *BlockReceivedEvent) *KeysWatcher { +func NewKeysWatcher(config *gnosisconfig.Config, blocksChannel chan *BlockReceivedEvent) *KeysWatcher { return &KeysWatcher{ config: config, blocksChannel: blocksChannel, diff --git a/rolling-shutter/gnosiskeyperwatcher/watcher.go b/rolling-shutter/gnosiskeyperwatcher/watcher.go index 58f09e028..afce09cda 100644 --- a/rolling-shutter/gnosiskeyperwatcher/watcher.go +++ b/rolling-shutter/gnosiskeyperwatcher/watcher.go @@ -3,15 +3,15 @@ package gnosiskeyperwatcher import ( "context" - keyper "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis" + gnosisconfig "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/config" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) type Watcher struct { - config *keyper.Config + config *gnosisconfig.Config } -func New(config *keyper.Config) *Watcher { +func New(config *gnosisconfig.Config) *Watcher { return &Watcher{ config: config, } diff --git a/rolling-shutter/keyper/synchandler/keypersetadded.go b/rolling-shutter/keyper/synchandler/keypersetadded.go new file mode 100644 index 000000000..bde7e67fd --- /dev/null +++ b/rolling-shutter/keyper/synchandler/keypersetadded.go @@ -0,0 +1,233 @@ +package synchandler + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + "github.com/shutter-network/shop-contracts/bindings" + + obskeyper "github.com/shutter-network/rolling-shutter/rolling-shutter/chainobserver/db/keyper" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number" + "github.com/shutter-network/rolling-shutter/rolling-shutter/shdb" +) + +var ErrParseKeyperSet = errors.New("can't parse KeyperSet") + +func makeCallError(attrName string, err error) error { + return fmt.Errorf("could not retrieve `%s` from contract: %w", attrName, err) +} + +func init() { + var err error + KeyperSetManagerContractABI, err = bindings.KeyperSetManagerMetaData.GetAbi() + if err != nil { + panic(err) + } +} + +var KeyperSetManagerContractABI *abi.ABI + +func NewKeyperSetAdded( + db *pgxpool.Pool, + ethClient client.Client, + contractAddress, + ethereumAddress common.Address, +) (syncer.ContractEventHandler, error) { + ksm, err := bindings.NewKeyperSetManager(contractAddress, ethClient) + if err != nil { + return nil, err + } + return syncer.WrapHandler(&KeyperSetAdded{ + evABI: KeyperSetManagerContractABI, + keyperSetManagerAddress: contractAddress, + ethereumAddress: ethereumAddress, + dbpool: db, + ethClient: ethClient, + keyperSetManager: ksm, + }) +} + +type KeyperSetAdded struct { + evABI *abi.ABI + keyperSetManagerAddress common.Address + // our address to check wether we are part of the keyper-set + ethereumAddress common.Address + dbpool *pgxpool.Pool + + // we only need this because we have to poll + // additional data from the contract + ethClient client.Client + keyperSetManager *bindings.KeyperSetManager +} + +func (handler *KeyperSetAdded) Address() common.Address { + return handler.keyperSetManagerAddress +} + +func (*KeyperSetAdded) Event() string { + return "KeyperSetAdded" +} + +func (handler *KeyperSetAdded) ABI() abi.ABI { + return *handler.evABI +} + +func (handler *KeyperSetAdded) Accept( + _ context.Context, + _ types.Header, + _ bindings.KeyperSetManagerKeyperSetAdded, +) (bool, error) { + return true, nil +} + +func (handler *KeyperSetAdded) Handle( + ctx context.Context, + _ syncer.ChainUpdateContext, + events []bindings.KeyperSetManagerKeyperSetAdded, +) error { + // TODO: we don't handle reorgs here. + // This is because we don't have a good way to deal with + // them: + // When we originally insert the event, we would have to save + // the insert block-hash in the db and upon a reorg delete the keypersets + // by insert block-hash. We can't do this in production, since we don't + // have a good database migration strategy and framework yet. + + for _, ev := range events { + ks, err := QueryFullKeyperSetFromKeyperSetAddedEvent(ctx, handler.ethClient, ev, handler.keyperSetManager) + if err != nil { + log.Error(). + Err(err). + Msg("KeyperSetAdded event, error querying keyperset-data") + } + err = handler.processNewKeyperSet(ctx, ks) + if err != nil { + log.Error().Err(err).Msg("KeyperSetAdded event, error writing to database") + } + } + return nil +} + +func (handler *KeyperSetAdded) processNewKeyperSet(ctx context.Context, ev *KeyperSet) error { + isMember := false + for _, m := range ev.Members { + if m.Cmp(handler.ethereumAddress) == 0 { + isMember = true + break + } + } + log.Info(). + Uint64("activation-block", ev.ActivationBlock). + Uint64("eon", ev.Eon). + Int("num-members", len(ev.Members)). + Uint64("threshold", ev.Threshold). + Bool("is-member", isMember). + Msg("new keyper set added") + + // TODO: before, we were notifying the SequencerTransactionSubmitted + // handler when we were part of the newly inserted KeyperSet. + // This was an optimisation measure to not let the node + // insert SequencerTransactionSubmitted events into it's DB for + // Eons that are before the point where it is part of the KeyperSet. + // This optimisation is for now omitted, since it unnecessarily coupled + // both handler. It was mainly done like this to avoid adding an + // additional field 'weAreMember' to the KeyperSet in the db. + // XXX: do we have to insert keypersets we are not part of? + // Maybe it would be easiest to just ignore those keypersets? + + return handler.dbpool.BeginFunc(ctx, func(tx pgx.Tx) error { + obskeyperdb := obskeyper.New(tx) + + keyperConfigIndex, err := medley.Uint64ToInt64Safe(ev.Eon) + if err != nil { + return errors.Wrap(err, ErrParseKeyperSet.Error()) + } + activationBlockNumber, err := medley.Uint64ToInt64Safe(ev.ActivationBlock) + if err != nil { + return errors.Wrap(err, ErrParseKeyperSet.Error()) + } + threshold, err := medley.Uint64ToInt64Safe(ev.Threshold) + if err != nil { + return errors.Wrap(err, ErrParseKeyperSet.Error()) + } + + // we insert the keyperset into the db, even though we are not member of it. + // Since there is no field to mark our keypersets, we would always + // have to iterate over all keypersets... + return obskeyperdb.InsertKeyperSet(ctx, obskeyper.InsertKeyperSetParams{ + KeyperConfigIndex: keyperConfigIndex, + ActivationBlockNumber: activationBlockNumber, + Keypers: shdb.EncodeAddresses(ev.Members), + Threshold: int32(threshold), + }) + }) +} + +type KeyperSet struct { + ActivationBlock uint64 + Members []common.Address + Threshold uint64 + Eon uint64 + + AtBlockNumber *number.BlockNumber +} + +// QueryFullKeyperSetFromKeyperSetAddedEvent polls some additional +// data from the contracts in order to construct the full set of +// information for a keyper-set. +// This has to be done because not all information relevant to +// the keyperset is included in the KeyperSetAdded event. +func QueryFullKeyperSetFromKeyperSetAddedEvent( + ctx context.Context, + ethClient client.Client, + event bindings.KeyperSetManagerKeyperSetAdded, + keyperSetManager *bindings.KeyperSetManager, +) (*KeyperSet, error) { + keyperSet, err := bindings.NewKeyperSet(event.KeyperSetContract, ethClient) + if err != nil { + return nil, fmt.Errorf("can't bind KeyperSet contract: %w", err) + } + opts := &bind.CallOpts{ + BlockHash: event.Raw.BlockHash, + Context: ctx, + } + // the manager only accepts final keyper sets, + // so we expect this to be final now. + final, err := keyperSet.IsFinalized(opts) + if err != nil { + return nil, makeCallError("IsFinalized", err) + } + if !final { + return nil, errors.New("contract did accept unfinalized keyper-sets") + } + members, err := keyperSet.GetMembers(opts) + if err != nil { + return nil, makeCallError("Members", err) + } + threshold, err := keyperSet.GetThreshold(opts) + if err != nil { + return nil, makeCallError("Threshold", err) + } + eon, err := keyperSetManager.GetKeyperSetIndexByBlock(opts, event.ActivationBlock) + if err != nil { + return nil, makeCallError("KeyperSetIndexByBlock", err) + } + return &KeyperSet{ + ActivationBlock: event.ActivationBlock, + Members: members, + Threshold: threshold, + Eon: eon, + AtBlockNumber: number.BigToBlockNumber(opts.BlockNumber), + }, nil +} diff --git a/rolling-shutter/keyperimpl/gnosis/config.go b/rolling-shutter/keyperimpl/gnosis/config/config.go similarity index 99% rename from rolling-shutter/keyperimpl/gnosis/config.go rename to rolling-shutter/keyperimpl/gnosis/config/config.go index 86c4f1863..58418840d 100644 --- a/rolling-shutter/keyperimpl/gnosis/config.go +++ b/rolling-shutter/keyperimpl/gnosis/config/config.go @@ -1,4 +1,4 @@ -package gnosis +package config import ( "io" diff --git a/rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go b/rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go index 7281102ee..77d4a0840 100644 --- a/rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go +++ b/rolling-shutter/keyperimpl/gnosis/database/gnosiskeyper.sqlc.gen.go @@ -12,6 +12,15 @@ import ( "github.com/jackc/pgconn" ) +const deleteTransactionSubmittedEventsFromBlockHash = `-- name: DeleteTransactionSubmittedEventsFromBlockHash :exec +DELETE FROM transaction_submitted_event WHERE block_hash == $1 +` + +func (q *Queries) DeleteTransactionSubmittedEventsFromBlockHash(ctx context.Context, blockHash []byte) error { + _, err := q.db.Exec(ctx, deleteTransactionSubmittedEventsFromBlockHash, blockHash) + return err +} + const deleteTransactionSubmittedEventsFromBlockNumber = `-- name: DeleteTransactionSubmittedEventsFromBlockNumber :exec DELETE FROM transaction_submitted_event WHERE block_number >= $1 ` @@ -38,6 +47,29 @@ func (q *Queries) GetCurrentDecryptionTrigger(ctx context.Context, eon int64) (C return i, err } +const getLatestTransactionSubmittedEvent = `-- name: GetLatestTransactionSubmittedEvent :one +SELECT index, block_number, block_hash, tx_index, log_index, eon, identity_prefix, sender, gas_limit FROM transaction_submitted_event +ORDER BY block_number DESC +LIMIT 1 +` + +func (q *Queries) GetLatestTransactionSubmittedEvent(ctx context.Context) (TransactionSubmittedEvent, error) { + row := q.db.QueryRow(ctx, getLatestTransactionSubmittedEvent) + var i TransactionSubmittedEvent + err := row.Scan( + &i.Index, + &i.BlockNumber, + &i.BlockHash, + &i.TxIndex, + &i.LogIndex, + &i.Eon, + &i.IdentityPrefix, + &i.Sender, + &i.GasLimit, + ) + return i, err +} + const getNumValidatorRegistrations = `-- name: GetNumValidatorRegistrations :one SELECT COUNT(*) FROM validator_registrations ` diff --git a/rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql b/rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql index 2bf2692b2..e2ffa646e 100644 --- a/rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql +++ b/rolling-shutter/keyperimpl/gnosis/database/sql/queries/gnosiskeyper.sql @@ -33,6 +33,10 @@ SET block_hash = $1, block_number = $2, slot = $3; -- name: GetTransactionSubmittedEventsSyncedUntil :one SELECT * FROM transaction_submitted_events_synced_until LIMIT 1; +-- name: GetLatestTransactionSubmittedEvent :one +SELECT * FROM transaction_submitted_event +ORDER BY block_number DESC +LIMIT 1; -- name: GetTransactionSubmittedEventCount :one SELECT max(index) + 1 FROM transaction_submitted_event @@ -41,6 +45,9 @@ WHERE eon = $1; -- name: DeleteTransactionSubmittedEventsFromBlockNumber :exec DELETE FROM transaction_submitted_event WHERE block_number >= $1; +-- name: DeleteTransactionSubmittedEventsFromBlockHash :exec +DELETE FROM transaction_submitted_event WHERE block_hash == $1; + -- name: GetTxPointer :one SELECT * FROM tx_pointer WHERE eon = $1; @@ -119,4 +126,4 @@ ORDER BY block_number DESC, tx_index DESC, log_index DESC LIMIT 1; -- name: GetNumValidatorRegistrations :one -SELECT COUNT(*) FROM validator_registrations; \ No newline at end of file +SELECT COUNT(*) FROM validator_registrations; diff --git a/rolling-shutter/keyperimpl/gnosis/handlers.go b/rolling-shutter/keyperimpl/gnosis/handlers.go index 5485687d5..b2f68af02 100644 --- a/rolling-shutter/keyperimpl/gnosis/handlers.go +++ b/rolling-shutter/keyperimpl/gnosis/handlers.go @@ -16,6 +16,7 @@ import ( corekeyperdatabase "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/gnosisssztypes" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/metrics" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/identitypreimage" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2pmsg" "github.com/shutter-network/rolling-shutter/rolling-shutter/shdb" @@ -338,7 +339,7 @@ func (h *DecryptionKeysHandler) HandleMessage(ctx context.Context, msg p2pmsg.Me } eonString := fmt.Sprint(keys.Eon) - metricsTxPointer.WithLabelValues(eonString).Set(float64(newTxPointer)) - metricsTxPointerAge.WithLabelValues(eonString).Set(0) + metrics.TxPointer.WithLabelValues(eonString).Set(float64(newTxPointer)) + metrics.TxPointerAge.WithLabelValues(eonString).Set(0) return []p2pmsg.Message{}, nil } diff --git a/rolling-shutter/keyperimpl/gnosis/keyper.go b/rolling-shutter/keyperimpl/gnosis/keyper.go index 134bc67eb..2d056097b 100644 --- a/rolling-shutter/keyperimpl/gnosis/keyper.go +++ b/rolling-shutter/keyperimpl/gnosis/keyper.go @@ -17,6 +17,7 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/epochkghandler" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/kprconfig" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/config" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/broker" @@ -40,7 +41,7 @@ const ( type Keyper struct { core *keyper.KeyperCore - config *Config + config *config.Config dbpool *pgxpool.Pool beaconAPIClient *beaconapiclient.Client @@ -60,7 +61,7 @@ type Keyper struct { decryptionTriggerChannel chan *broker.Event[*epochkghandler.DecryptionTrigger] } -func New(c *Config) *Keyper { +func New(c *config.Config) *Keyper { return &Keyper{ config: c, } diff --git a/rolling-shutter/keyperimpl/gnosis/messagingmiddleware.go b/rolling-shutter/keyperimpl/gnosis/messagingmiddleware.go index b5d9cd014..3072358ba 100644 --- a/rolling-shutter/keyperimpl/gnosis/messagingmiddleware.go +++ b/rolling-shutter/keyperimpl/gnosis/messagingmiddleware.go @@ -15,8 +15,10 @@ import ( "google.golang.org/protobuf/proto" obskeyperdatabase "github.com/shutter-network/rolling-shutter/rolling-shutter/chainobserver/db/keyper" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/config" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/gnosisssztypes" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/metrics" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/identitypreimage" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/retry" @@ -26,7 +28,7 @@ import ( ) type MessagingMiddleware struct { - config *Config + config *config.Config messaging p2p.Messaging dbpool *pgxpool.Pool } @@ -62,8 +64,8 @@ func (h *WrappedMessageHandler) HandleMessage(ctx context.Context, msg p2pmsg.Me return replacedMsgs, nil } -func NewMessagingMiddleware(messaging p2p.Messaging, dbpool *pgxpool.Pool, config *Config) *MessagingMiddleware { - return &MessagingMiddleware{messaging: messaging, dbpool: dbpool, config: config} +func NewMessagingMiddleware(messaging p2p.Messaging, dbpool *pgxpool.Pool, cfg *config.Config) *MessagingMiddleware { + return &MessagingMiddleware{messaging: messaging, dbpool: dbpool, config: cfg} } func (i *MessagingMiddleware) Start(_ context.Context, runner service.Runner) error { @@ -188,7 +190,7 @@ func (i *MessagingMiddleware) interceptDecryptionKeyShares( ) slotStartTime := time.Unix(int64(slotStartTimestamp), 0) delta := time.Since(slotStartTime) - metricsKeySharesSentTimeDelta.WithLabelValues(fmt.Sprint(originalMsg.Eon)).Observe(delta.Seconds()) + metrics.KeySharesSentTimeDelta.WithLabelValues(fmt.Sprint(originalMsg.Eon)).Observe(delta.Seconds()) return msg, nil } @@ -278,7 +280,7 @@ func (i *MessagingMiddleware) interceptDecryptionKeys( ) slotStartTime := time.Unix(int64(slotStartTimestamp), 0) delta := time.Since(slotStartTime) - metricsKeysSentTimeDelta.WithLabelValues(fmt.Sprint(originalMsg.Eon)).Observe(delta.Seconds()) + metrics.KeysSentTimeDelta.WithLabelValues(fmt.Sprint(originalMsg.Eon)).Observe(delta.Seconds()) return msg, nil } @@ -308,7 +310,7 @@ func (i *MessagingMiddleware) advanceTxPointer(ctx context.Context, msg *p2pmsg. return errors.Wrap(err, "failed to set tx pointer") } eonString := fmt.Sprint(msg.Eon) - metricsTxPointer.WithLabelValues(eonString).Set(float64(newTxPointer)) - metricsTxPointerAge.WithLabelValues(eonString).Set(0) + metrics.TxPointer.WithLabelValues(eonString).Set(float64(newTxPointer)) + metrics.TxPointerAge.WithLabelValues(eonString).Set(0) return nil } diff --git a/rolling-shutter/keyperimpl/gnosis/metrics.go b/rolling-shutter/keyperimpl/gnosis/metrics/metrics.go similarity index 66% rename from rolling-shutter/keyperimpl/gnosis/metrics.go rename to rolling-shutter/keyperimpl/gnosis/metrics/metrics.go index 5c1ad6f39..d9e023aaa 100644 --- a/rolling-shutter/keyperimpl/gnosis/metrics.go +++ b/rolling-shutter/keyperimpl/gnosis/metrics/metrics.go @@ -1,8 +1,8 @@ -package gnosis +package metrics import "github.com/prometheus/client_golang/prometheus" -var metricsTxPointer = prometheus.NewGaugeVec( +var TxPointer = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "shutter", Subsystem: "gnosis", @@ -12,7 +12,7 @@ var metricsTxPointer = prometheus.NewGaugeVec( []string{"eon"}, ) -var metricsTxPointerAge = prometheus.NewGaugeVec( +var TxPointerAge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "shutter", Subsystem: "gnosis", @@ -22,7 +22,7 @@ var metricsTxPointerAge = prometheus.NewGaugeVec( []string{"eon"}, ) -var metricsLatestTxSubmittedEventIndex = prometheus.NewGaugeVec( +var LatestTxSubmittedEventIndex = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "shutter", Subsystem: "gnosis", @@ -32,7 +32,7 @@ var metricsLatestTxSubmittedEventIndex = prometheus.NewGaugeVec( []string{"eon"}, ) -var metricsTxSubmittedEventsSyncedUntil = prometheus.NewGauge( +var TxSubmittedEventsSyncedUntil = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "shutter", Subsystem: "gnosis", @@ -41,7 +41,7 @@ var metricsTxSubmittedEventsSyncedUntil = prometheus.NewGauge( }, ) -var metricsValidatorRegistrationsSyncedUntil = prometheus.NewGauge( +var ValidatorRegistrationsSyncedUntil = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "shutter", Subsystem: "gnosis", @@ -50,7 +50,7 @@ var metricsValidatorRegistrationsSyncedUntil = prometheus.NewGauge( }, ) -var metricsNumValidatorRegistrations = prometheus.NewGauge( +var NumValidatorRegistrations = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "shutter", Subsystem: "gnosis", @@ -61,7 +61,7 @@ var metricsNumValidatorRegistrations = prometheus.NewGauge( var slotTimeDeltaBuckets = []float64{-5, -4.5, -4.0, -3.5, -3.0, -2.5, -2.0, -1.5, -1.0, -0.5, -0, 1.0, 100} -var metricsKeysSentTimeDelta = prometheus.NewHistogramVec( +var KeysSentTimeDelta = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "shutter", Subsystem: "gnosis", @@ -72,7 +72,7 @@ var metricsKeysSentTimeDelta = prometheus.NewHistogramVec( []string{"eon"}, ) -var metricsKeySharesSentTimeDelta = prometheus.NewHistogramVec( +var KeySharesSentTimeDelta = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "shutter", Subsystem: "gnosis", @@ -84,12 +84,12 @@ var metricsKeySharesSentTimeDelta = prometheus.NewHistogramVec( ) func init() { - prometheus.MustRegister(metricsTxPointer) - prometheus.MustRegister(metricsTxPointerAge) - prometheus.MustRegister(metricsLatestTxSubmittedEventIndex) - prometheus.MustRegister(metricsTxSubmittedEventsSyncedUntil) - prometheus.MustRegister(metricsValidatorRegistrationsSyncedUntil) - prometheus.MustRegister(metricsNumValidatorRegistrations) - prometheus.MustRegister(metricsKeysSentTimeDelta) - prometheus.MustRegister(metricsKeySharesSentTimeDelta) + prometheus.MustRegister(TxPointer) + prometheus.MustRegister(TxPointerAge) + prometheus.MustRegister(LatestTxSubmittedEventIndex) + prometheus.MustRegister(TxSubmittedEventsSyncedUntil) + prometheus.MustRegister(ValidatorRegistrationsSyncedUntil) + prometheus.MustRegister(NumValidatorRegistrations) + prometheus.MustRegister(KeysSentTimeDelta) + prometheus.MustRegister(KeySharesSentTimeDelta) } diff --git a/rolling-shutter/keyperimpl/gnosis/newslot.go b/rolling-shutter/keyperimpl/gnosis/newslot.go index 0b65188eb..fafd7d77f 100644 --- a/rolling-shutter/keyperimpl/gnosis/newslot.go +++ b/rolling-shutter/keyperimpl/gnosis/newslot.go @@ -19,6 +19,7 @@ import ( corekeyperdatabase "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/epochkghandler" gnosisdatabase "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/metrics" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/broker" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/identitypreimage" @@ -217,8 +218,8 @@ func getTxPointer(ctx context.Context, db *pgxpool.Pool, eon int64, maxTxPointer return 0, errors.Wrap(err, "failed to query transaction submitted event count from db") } } - metricsTxPointer.WithLabelValues(eonString).Set(float64(txPointer)) - metricsTxPointerAge.WithLabelValues(eonString).Set(float64(txPointerAge)) + metrics.TxPointer.WithLabelValues(eonString).Set(float64(txPointer)) + metrics.TxPointerAge.WithLabelValues(eonString).Set(float64(txPointerAge)) return txPointer, nil } diff --git a/rolling-shutter/keyperimpl/gnosis/sequencersyncer.go b/rolling-shutter/keyperimpl/gnosis/sequencersyncer.go index c9c3313c1..15778e4e8 100644 --- a/rolling-shutter/keyperimpl/gnosis/sequencersyncer.go +++ b/rolling-shutter/keyperimpl/gnosis/sequencersyncer.go @@ -17,6 +17,7 @@ import ( sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/metrics" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" "github.com/shutter-network/rolling-shutter/rolling-shutter/shdb" ) @@ -182,7 +183,7 @@ func (s *SequencerSyncer) syncRange( Int("num-inserted-events", len(filteredEvents)). Int("num-discarded-events", len(events)-len(filteredEvents)). Msg("synced sequencer contract") - metricsTxSubmittedEventsSyncedUntil.Set(float64(end)) + metrics.TxSubmittedEventsSyncedUntil.Set(float64(end)) return nil } @@ -254,7 +255,7 @@ func (s *SequencerSyncer) insertTransactionSubmittedEvents( if err != nil { return errors.Wrap(err, "failed to insert transaction submitted event into db") } - metricsLatestTxSubmittedEventIndex.WithLabelValues(fmt.Sprint(event.Eon)).Set(float64(event.TxIndex)) + metrics.LatestTxSubmittedEventIndex.WithLabelValues(fmt.Sprint(event.Eon)).Set(float64(event.TxIndex)) log.Debug(). Uint64("index", event.TxIndex). Uint64("block", event.Raw.BlockNumber). diff --git a/rolling-shutter/keyperimpl/gnosis/synchandler/chainupdate.go b/rolling-shutter/keyperimpl/gnosis/synchandler/chainupdate.go new file mode 100644 index 000000000..87d8af69c --- /dev/null +++ b/rolling-shutter/keyperimpl/gnosis/synchandler/chainupdate.go @@ -0,0 +1,51 @@ +package synchandler + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/hashicorp/go-multierror" + + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer" +) + +var _ syncer.ChainUpdateHandler = &DecryptOnChainUpdate{} + +type DecryptionFunction = func(context.Context, *types.Header) error + +func NewDecryptOnChainUpdate(fn DecryptionFunction) *DecryptOnChainUpdate { + return &DecryptOnChainUpdate{ + decrypt: fn, + } +} + +type DecryptOnChainUpdate struct { + decrypt DecryptionFunction +} + +func (cu *DecryptOnChainUpdate) Handle( + ctx context.Context, + update syncer.ChainUpdateContext, +) (result error) { + // in case of a reorg (non-nil update.Remove segment) we can't roll back any + // changes, since the keys have been release already publicly. + if update.Append != nil { + for _, header := range update.Append.Get() { + // We can call the decrypt function with all updated headers, + // even if this was a reorg. + // This is because the downstream function is expected to keep track of + // what slots have already been sent out and decide on itself wether + // to re-release keys. + err := cu.decrypt(ctx, header) + if err != nil { + result = multierror.Append(result, + fmt.Errorf("failed to decrypt for block %s (num=%d): %w", + header.Hash().String(), + header.Number.Uint64(), + err)) + } + } + } + return nil +} diff --git a/rolling-shutter/keyperimpl/gnosis/synchandler/sequencer.go b/rolling-shutter/keyperimpl/gnosis/synchandler/sequencer.go new file mode 100644 index 000000000..f9c27bbda --- /dev/null +++ b/rolling-shutter/keyperimpl/gnosis/synchandler/sequencer.go @@ -0,0 +1,160 @@ +package synchandler + +import ( + "context" + "fmt" + "math" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + bindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" + + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/metrics" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer" + "github.com/shutter-network/rolling-shutter/rolling-shutter/shdb" +) + +func init() { + var err error + SequencerContractABI, err = bindings.SequencerMetaData.GetAbi() + if err != nil { + panic(err) + } +} + +var SequencerContractABI *abi.ABI + +func NewSequencerTransactionSubmitted(dbPool *pgxpool.Pool, address common.Address) (syncer.ContractEventHandler, error) { + return syncer.WrapHandler( + &SequencerTransactionSubmitted{ + evABI: SequencerContractABI, + address: address, + dbPool: dbPool, + }) +} + +type SequencerTransactionSubmitted struct { + evABI *abi.ABI + address common.Address + + dbPool *pgxpool.Pool +} + +func (sts *SequencerTransactionSubmitted) Address() common.Address { + return sts.address +} + +func (*SequencerTransactionSubmitted) Event() string { + return "TransactionSubmitted" +} + +func (sts *SequencerTransactionSubmitted) ABI() abi.ABI { + return *sts.evABI +} + +func (sts *SequencerTransactionSubmitted) Accept( + _ context.Context, + _ types.Header, + _ bindings.SequencerTransactionSubmitted, +) (bool, error) { + return true, nil +} + +func (sts *SequencerTransactionSubmitted) Handle( + ctx context.Context, + update syncer.ChainUpdateContext, + events []bindings.SequencerTransactionSubmitted, +) error { + numInsertedEvents := 0 + numDiscardedEvents := 0 + err := sts.dbPool.BeginFunc(ctx, func(tx pgx.Tx) error { + db := database.New(tx) + if update.Remove != nil { + for _, header := range update.Remove.Get() { + if err := db.DeleteTransactionSubmittedEventsFromBlockHash(ctx, header.Hash().Bytes()); err != nil { + return errors.Wrap(err, "failed to delete transaction submitted events from db") + } + } + log.Info(). + Int("depth", update.Remove.Len()). + Int64("previous-synced-until", update.Remove.Latest().Number.Int64()). + Int64("new-synced-until", update.Append.Latest().Number.Int64()). + Msg("sync status reset due to reorg") + } + filteredEvents := sts.filterEvents(events) + numDiscardedEvents = len(events) - len(filteredEvents) + for _, event := range filteredEvents { + _, err := db.InsertTransactionSubmittedEvent(ctx, database.InsertTransactionSubmittedEventParams{ + Index: int64(event.TxIndex), + BlockNumber: int64(event.Raw.BlockNumber), + BlockHash: event.Raw.BlockHash[:], + TxIndex: int64(event.Raw.TxIndex), + LogIndex: int64(event.Raw.Index), + Eon: int64(event.Eon), + IdentityPrefix: event.IdentityPrefix[:], + Sender: shdb.EncodeAddress(event.Sender), + GasLimit: event.GasLimit.Int64(), + }) + if err != nil { + return errors.Wrap(err, "failed to insert transaction submitted event into db") + } + numInsertedEvents++ + metrics.LatestTxSubmittedEventIndex.WithLabelValues(fmt.Sprint(event.Eon)).Set(float64(event.TxIndex)) + log.Debug(). + Uint64("index", event.TxIndex). + Uint64("block", event.Raw.BlockNumber). + Uint64("eon", event.Eon). + Hex("identityPrefix", event.IdentityPrefix[:]). + Hex("sender", event.Sender.Bytes()). + Uint64("gasLimit", event.GasLimit.Uint64()). + Msg("synced new transaction submitted event") + } + return nil + }) + log.Info(). + Uint64("start-block", update.Append.Earliest().Number.Uint64()). + Uint64("end-block", update.Append.Latest().Number.Uint64()). + Int("num-inserted-events", numInsertedEvents). + Int("num-discarded-events", numDiscardedEvents). + Msg("synced sequencer contract") + metrics.TxSubmittedEventsSyncedUntil.Set(float64(update.Append.Latest().Number.Uint64())) + return err +} + +func (sts *SequencerTransactionSubmitted) filterEvents( + events []bindings.SequencerTransactionSubmitted, +) []bindings.SequencerTransactionSubmitted { + filteredEvents := []bindings.SequencerTransactionSubmitted{} + // TODO: before the refactoring, a mux'ed field was read here + // in order to filter out Eons that are before "our" first + // keyper-sets eon. + // Do this by direcly reading "our" keyper-sets from the DB + // without manually iterating over all keyper-sets / addresses. + // This requires persisting this information upon insertion + // in the KeyperSetAdded event handler and thus a database migration. + // + // For now, insert all events into the DB. + // We could clean up this from time to time as a temporary measure + // to not make the DB grow unnecessarily. + for _, event := range events { + if event.Eon > math.MaxInt64 || + !event.GasLimit.IsInt64() { + log.Debug(). + Uint64("eon", event.Eon). + Uint64("block-number", event.Raw.BlockNumber). + Str("block-hash", event.Raw.BlockHash.Hex()). + Uint("tx-index", event.Raw.TxIndex). + Uint("log-index", event.Raw.Index). + Msg("ignoring transaction submitted event") + continue + } + filteredEvents = append(filteredEvents, event) + } + return filteredEvents +} diff --git a/rolling-shutter/keyperimpl/gnosis/synchandler/validatorupdated.go b/rolling-shutter/keyperimpl/gnosis/synchandler/validatorupdated.go new file mode 100644 index 000000000..81eb01689 --- /dev/null +++ b/rolling-shutter/keyperimpl/gnosis/synchandler/validatorupdated.go @@ -0,0 +1,261 @@ +package synchandler + +import ( + "context" + "math" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + bindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" + blst "github.com/supranational/blst/bindings/go" + + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/metrics" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/syncer" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/validatorregistry" +) + +const ValidatorRegistrationMessageVersion = 0 + +func init() { + var err error + ValidatorRegistryContractABI, err = bindings.ValidatorregistryMetaData.GetAbi() + if err != nil { + panic(err) + } +} + +var ValidatorRegistryContractABI *abi.ABI + +func NewValidatorUpdated( + dbPool *pgxpool.Pool, + ethClient *ethclient.Client, + beaconClient *beaconapiclient.Client, + contractAddress common.Address, + chainID uint64, +) (syncer.ContractEventHandler, error) { + contract, err := bindings.NewValidatorregistry(contractAddress, ethClient) + if err != nil { + return nil, err + } + return syncer.WrapHandler( + &ValidatorUpdated{ + evABI: ValidatorRegistryContractABI, + address: contractAddress, + valRegistryContract: contract, + dbPool: dbPool, + beaconClient: beaconClient, + chainID: chainID, + }) +} + +type ValidatorUpdated struct { + evABI *abi.ABI + address common.Address + + valRegistryContract *bindings.Validatorregistry + dbPool *pgxpool.Pool + beaconClient *beaconapiclient.Client + + chainID uint64 +} + +func (vu *ValidatorUpdated) Address() common.Address { + return vu.address +} + +func (*ValidatorUpdated) Event() string { + return "Updated" +} + +func (vu *ValidatorUpdated) ABI() abi.ABI { + return *vu.evABI +} + +func (vu *ValidatorUpdated) Accept( + _ context.Context, + _ types.Header, + _ bindings.ValidatorregistryUpdated, +) (bool, error) { + return true, nil +} + +func (vu *ValidatorUpdated) Handle( + ctx context.Context, + update syncer.ChainUpdateContext, + events []bindings.ValidatorregistryUpdated, +) error { + db := database.New(vu.dbPool) + filteredEvents, err := vu.filterEvents(ctx, events) + if err != nil { + return err + } + err = vu.dbPool.BeginFunc(ctx, func(tx pgx.Tx) error { + dtbs := database.New(tx) + for _, event := range filteredEvents { + msg := new(validatorregistry.RegistrationMessage) + err := msg.Unmarshal(event.Message) + if err != nil { + return errors.Wrap(err, "failed to unmarshal registration message") + } + err = dtbs.InsertValidatorRegistration(ctx, database.InsertValidatorRegistrationParams{ + BlockNumber: int64(event.Raw.BlockNumber), + BlockHash: event.Raw.BlockHash.Bytes(), + TxIndex: int64(event.Raw.TxIndex), + LogIndex: int64(event.Raw.Index), + ValidatorIndex: int64(msg.ValidatorIndex), + Nonce: int64(msg.Nonce), + IsRegistration: msg.IsRegistration, + }) + if err != nil { + return errors.Wrap(err, "failed to insert validator registration into db") + } + } + err = db.SetValidatorRegistrationsSyncedUntil(ctx, database.SetValidatorRegistrationsSyncedUntilParams{ + // TODO: check int64 overflow + BlockNumber: update.Append.Latest().Number.Int64(), + BlockHash: update.Append.Latest().Hash().Bytes(), + }) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + numRegistrations, err := db.GetNumValidatorRegistrations(ctx) + if err != nil { + return errors.Wrap(err, "failed to get number of validator registrations") + } + log.Info(). + Int64("start-block", update.Append.Earliest().Number.Int64()). + Int64("end-block", update.Append.Latest().Number.Int64()). + Int("num-inserted-events", len(filteredEvents)). + Int("num-discarded-events", len(events)-len(filteredEvents)). + Int64("num-registrations", numRegistrations). + Msg("synced validator registry") + + metrics.NumValidatorRegistrations.Set(float64(numRegistrations)) + metrics.ValidatorRegistrationsSyncedUntil.Set(float64(update.Append.Latest().Number.Uint64())) + return nil +} + +func (vu *ValidatorUpdated) filterEvents( + ctx context.Context, + events []bindings.ValidatorregistryUpdated, +) ([]bindings.ValidatorregistryUpdated, error) { + db := database.New(vu.dbPool) + filteredEvents := []bindings.ValidatorregistryUpdated{} + for _, event := range events { + logger := log.With(). + Hex("block-hash", event.Raw.BlockHash.Bytes()). + Uint64("block-number", event.Raw.BlockNumber). + Uint("tx-index", event.Raw.TxIndex). + Uint("log-index", event.Raw.Index). + Logger() + + msg := new(validatorregistry.RegistrationMessage) + err := msg.Unmarshal(event.Message) + if err != nil { + logger.Warn(). + Err(err). + Msg("failed to unmarshal registration message") + continue + } + + if !vu.checkStaticRegistrationMessageFields(msg, event.Raw.Address, logger) { + continue + } + + latestNonce, err := db.GetValidatorRegistrationNonceBefore(ctx, database.GetValidatorRegistrationNonceBeforeParams{ + ValidatorIndex: int64(msg.ValidatorIndex), + BlockNumber: int64(event.Raw.BlockNumber), + TxIndex: int64(event.Raw.TxIndex), + LogIndex: int64(event.Raw.Index), + }) + if err != nil && err != pgx.ErrNoRows { + return nil, errors.Wrapf(err, "failed to query latest nonce for validator %d", msg.ValidatorIndex) + } + if err == pgx.ErrNoRows { + latestNonce = -1 + } + if msg.Nonce > math.MaxInt64 || int64(msg.Nonce) <= latestNonce { + logger.Warn(). + Uint64("nonce", msg.Nonce). + Int64("latest-nonce", latestNonce). + Msg("ignoring registration message with invalid nonce") + continue + } + + validator, err := vu.beaconClient.GetValidatorByIndex(ctx, "head", msg.ValidatorIndex) + if err != nil { + return nil, errors.Wrapf(err, "failed to get validator %d", msg.ValidatorIndex) + } + if validator == nil { + logger.Warn().Msg("ignoring registration message for unknown validator") + continue + } + pubkey, err := validator.Data.Validator.GetPubkey() + if err != nil { + return nil, errors.Wrapf(err, "failed to get pubkey of validator %d", msg.ValidatorIndex) + } + sig := new(blst.P2Affine).Uncompress(event.Signature) + if sig == nil { + logger.Warn().Msg("ignoring registration message with undecodable signature") + continue + } + validSignature := validatorregistry.VerifySignature(sig, pubkey, msg) + if !validSignature { + logger.Warn().Msg("ignoring registration message with invalid signature") + continue + } + + filteredEvents = append(filteredEvents, event) + } + return filteredEvents, nil +} + +func (vu *ValidatorUpdated) checkStaticRegistrationMessageFields( + msg *validatorregistry.RegistrationMessage, + validatorRegistryAddress common.Address, + logger zerolog.Logger, +) bool { + logger = logger.With().Uint64("validator-index", msg.ValidatorIndex).Logger() + if msg.Version != ValidatorRegistrationMessageVersion { + logger.Warn(). + Uint8("version", msg.Version). + Uint8("expected-version", ValidatorRegistrationMessageVersion). + Msg("ignoring registration message with invalid version") + return false + } + if msg.ChainID != vu.chainID { + logger.Warn(). + Uint64("chain-id", msg.ChainID). + Uint64("expected-chain-id", vu.chainID). + Uint64("validator-index", msg.ValidatorIndex). + Msg("ignoring registration message with invalid chain ID") + return false + } + if msg.ValidatorRegistryAddress != validatorRegistryAddress { + logger.Warn(). + Hex("validator-registry-address", msg.ValidatorRegistryAddress.Bytes()). + Hex("expected-validator-registry-address", validatorRegistryAddress.Bytes()). + Msg("ignoring registration message with invalid validator registry address") + return false + } + if msg.ValidatorIndex > math.MaxInt64 { + logger.Warn(). + Msg("ignoring registration message with invalid validator index") + return false + } + return true +} diff --git a/rolling-shutter/keyperimpl/gnosis/validatorsyncer.go b/rolling-shutter/keyperimpl/gnosis/validatorsyncer.go index 56d70c984..1f0b7ff77 100644 --- a/rolling-shutter/keyperimpl/gnosis/validatorsyncer.go +++ b/rolling-shutter/keyperimpl/gnosis/validatorsyncer.go @@ -18,6 +18,7 @@ import ( blst "github.com/supranational/blst/bindings/go" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/metrics" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/validatorregistry" @@ -107,8 +108,8 @@ func (v *ValidatorSyncer) syncRange(ctx context.Context, start, end uint64) erro Int("num-discarded-events", len(events)-len(filteredEvents)). Int64("num-registrations", numRegistrations). Msg("synced validator registry") - metricsNumValidatorRegistrations.Set(float64(numRegistrations)) - metricsValidatorRegistrationsSyncedUntil.Set(float64(end)) + metrics.NumValidatorRegistrations.Set(float64(numRegistrations)) + metrics.ValidatorRegistrationsSyncedUntil.Set(float64(end)) return nil }