diff --git a/rolling-shutter/docs/rolling-shutter_optimismkeyper.md b/rolling-shutter/docs/rolling-shutter_optimismkeyper.md
new file mode 100644
index 000000000..bf0a546e0
--- /dev/null
+++ b/rolling-shutter/docs/rolling-shutter_optimismkeyper.md
@@ -0,0 +1,34 @@
+## rolling-shutter optimismkeyper
+
+Run a Shutter optimism keyper node
+
+### Synopsis
+
+This command runs a keyper node. It will connect to both an Optimism and a
+Shuttermint node, both of which must be started separately in advance.
+
+```
+rolling-shutter optimismkeyper [flags]
+```
+
+### Options
+
+```
+      --config string   config file
+  -h, --help            help for optimismkeyper
+```
+
+### Options inherited from parent commands
+
+```
+      --logformat string   set log format, possible values:  min, short, long, max (default "long")
+      --loglevel string    set log level, possible values:  warn, info, debug (default "info")
+      --no-color           do not write colored logs
+```
+
+### SEE ALSO
+
+* [rolling-shutter](rolling-shutter.md) - A collection of commands to run and interact with Rolling Shutter nodes
+* [rolling-shutter optimismkeyper generate-config](rolling-shutter_optimismkeyper_generate-config.md) - Generate an 'optimismkeyper' configuration file
+* [rolling-shutter optimismkeyper initdb](rolling-shutter_optimismkeyper_initdb.md) - Initialize the 'optimismkeyper' database
+
diff --git a/rolling-shutter/docs/rolling-shutter_optimismkeyper_generate-config.md b/rolling-shutter/docs/rolling-shutter_optimismkeyper_generate-config.md
new file mode 100644
index 000000000..55d179a08
--- /dev/null
+++ b/rolling-shutter/docs/rolling-shutter_optimismkeyper_generate-config.md
@@ -0,0 +1,28 @@
+## rolling-shutter optimismkeyper generate-config
+
+Generate an 'optimismkeyper' configuration file
+
+```
+rolling-shutter optimismkeyper generate-config [flags]
+```
+
+### Options
+
+```
+  -h, --help            help for generate-config
+      --output string   output file
+```
+
+### Options inherited from parent commands
+
+```
+      --config string      config file
+      --logformat string   set log format, possible values:  min, short, long, max (default "long")
+      --loglevel string    set log level, possible values:  warn, info, debug (default "info")
+      --no-color           do not write colored logs
+```
+
+### SEE ALSO
+
+* [rolling-shutter optimismkeyper](rolling-shutter_optimismkeyper.md) - Run a Shutter optimism keyper node
+
diff --git a/rolling-shutter/docs/rolling-shutter_optimismkeyper_initdb.md b/rolling-shutter/docs/rolling-shutter_optimismkeyper_initdb.md
new file mode 100644
index 000000000..cec7e7bc0
--- /dev/null
+++ b/rolling-shutter/docs/rolling-shutter_optimismkeyper_initdb.md
@@ -0,0 +1,27 @@
+## rolling-shutter optimismkeyper initdb
+
+Initialize the 'optimismkeyper' database
+
+```
+rolling-shutter optimismkeyper initdb [flags]
+```
+
+### Options
+
+```
+  -h, --help   help for initdb
+```
+
+### Options inherited from parent commands
+
+```
+      --config string      config file
+      --logformat string   set log format, possible values:  min, short, long, max (default "long")
+      --loglevel string    set log level, possible values:  warn, info, debug (default "info")
+      --no-color           do not write colored logs
+```
+
+### SEE ALSO
+
+* [rolling-shutter optimismkeyper](rolling-shutter_optimismkeyper.md) - Run a Shutter optimism keyper node
+
diff --git a/rolling-shutter/keyperimpl/optimism/keyper.go b/rolling-shutter/keyperimpl/optimism/keyper.go
index 44941848a..922b1e784 100644
--- a/rolling-shutter/keyperimpl/optimism/keyper.go
+++ b/rolling-shutter/keyperimpl/optimism/keyper.go
@@ -99,9 +99,10 @@ func (kpr *Keyper) newBlock(_ context.Context, ev *syncevent.LatestBlock) error
 	// TODO: sanity checks
-	idPreimage := identitypreimage.BigToIdentityPreimage(ev.Number.Int)
+	latestBlockNumber := ev.Number.Uint64()
+	idPreimage := identitypreimage.Uint64ToIdentityPreimage(latestBlockNumber + 1)
 	trig := &epochkghandler.DecryptionTrigger{
-		BlockNumber:       ev.Number.Uint64(),
+		BlockNumber:       latestBlockNumber + 1,
 		IdentityPreimages: []identitypreimage.IdentityPreimage{idPreimage},
 	}
@@ -144,11 +145,14 @@ func (kpr *Keyper) newEonPublicKey(ctx context.Context, pubKey keyper.EonPublicK
 	log.Info().
 		Uint64("eon", pubKey.Eon).
 		Uint64("activation-block", pubKey.ActivationBlock).
+		Bytes("pub-key", pubKey.PublicKey).
 		Msg("new eon pk")
 	// Currently all keypers call this and race to call this function first.
 	// For now this is fine, but a keyper should only send a transaction if
 	// the key is not set yet.
 	// Best would be a coordinated leader election to decide who broadcasts the key.
+	// FIXME: the syncer receives an empty key byte slice.
+	// Is this already handled by the empty-key check in the eon pubkey syncer?
 	tx, err := kpr.l2Client.BroadcastEonKey(ctx, pubKey.Eon, pubKey.PublicKey)
 	if err != nil {
 		log.Error().Err(err).Msg("error broadcasting eon public key")
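The TODO above leaves the eon-key broadcast racy. A minimal sketch of the guard it asks for — `eonKeyClient` is a hypothetical stand-in for the relevant subset of `kpr.l2Client` (not an API from this diff), and the empty-bytes convention mirrors the `GetEonPubKeyForEon` change further down:

```go
package keyper // illustrative sketch only, not part of the diff

import "context"

// eonKeyClient is a hypothetical interface; the real client's
// BroadcastEonKey also returns the submitted transaction.
type eonKeyClient interface {
	GetEonKey(ctx context.Context, eon uint64) ([]byte, error)
	BroadcastEonKey(ctx context.Context, eon uint64, key []byte) error
}

// broadcastIfUnset sends the broadcast transaction only when no key is
// registered yet; an unset key reads back as empty bytes.
func broadcastIfUnset(ctx context.Context, c eonKeyClient, eon uint64, key []byte) error {
	existing, err := c.GetEonKey(ctx, eon)
	if err != nil {
		return err
	}
	if len(existing) != 0 {
		return nil // another keyper won the race; nothing to do
	}
	return c.BroadcastEonKey(ctx, eon, key)
}
```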
diff --git a/rolling-shutter/medley/chainsync/client.go b/rolling-shutter/medley/chainsync/client.go
index 7d2bd57de..6bd674905 100644
--- a/rolling-shutter/medley/chainsync/client.go
+++ b/rolling-shutter/medley/chainsync/client.go
@@ -24,7 +24,7 @@ var noopLogger = &logger.NoopLogger{}
 var ErrServiceNotInstantiated = errors.New("service is not instantiated, pass a handler function option")
 
 type Client struct {
-	client.Client
+	client.SyncEthereumClient
 
 	log     log.Logger
 	options *options
@@ -136,7 +136,7 @@ func (s *Client) BroadcastEonKey(ctx context.Context, eon uint64, eonPubKey []by
 // This value is cached, since it is not expected to change.
 func (s *Client) ChainID(ctx context.Context) (*big.Int, error) {
 	if s.chainID == nil {
-		cid, err := s.Client.ChainID(ctx)
+		cid, err := s.SyncEthereumClient.ChainID(ctx)
 		if err != nil {
 			return nil, err
 		}
diff --git a/rolling-shutter/medley/chainsync/client/client.go b/rolling-shutter/medley/chainsync/client/client.go
index 0444ab8c9..4924bc8c9 100644
--- a/rolling-shutter/medley/chainsync/client/client.go
+++ b/rolling-shutter/medley/chainsync/client/client.go
@@ -9,7 +9,7 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 )
 
-type Client interface {
+type FullEthereumClient interface {
 	Close()
 	ChainID(ctx context.Context) (*big.Int, error)
 	BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
@@ -45,3 +45,16 @@ type Client interface {
 	EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error)
 	SendTransaction(ctx context.Context, tx *types.Transaction) error
 }
+
+type SyncEthereumClient interface {
+	Close()
+	ChainID(ctx context.Context) (*big.Int, error)
+	BlockNumber(ctx context.Context) (uint64, error)
+	HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
+	HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
+	SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
+	FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
+	SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error)
+	CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error)
+	TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
+}
diff --git a/rolling-shutter/medley/chainsync/client/test.go b/rolling-shutter/medley/chainsync/client/test.go
new file mode 100644
index 000000000..d436ac884
--- /dev/null
+++ b/rolling-shutter/medley/chainsync/client/test.go
@@ -0,0 +1,170 @@
+package client
+
+import (
+	"context"
+	"errors"
+	"math/big"
+
+	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+)
+
+var ErrNotImplemented = errors.New("not implemented")
+
+var _ SyncEthereumClient = &TestClient{}
+
+type TestClient struct {
+	headerChain            []*types.Header
+	latestHeadIndex        int
+	initialProgress        bool
+	latestHeadEmitter      []chan<- *types.Header
+	latestHeadSubscription []*Subscription
+}
+
+func NewSubscription(idx int) *Subscription {
+	return &Subscription{
+		idx: idx,
+		err: make(chan error, 1),
+	}
+}
+
+type Subscription struct {
+	idx int
+	err chan error
+}
+
+func (su *Subscription) Unsubscribe() {
+	// TODO: not implemented yet, but we don't want to panic
+}
+
+func (su *Subscription) Err() <-chan error {
+	return su.err
+}
+
+type TestClientController struct {
+	c *TestClient
+}
+
+func NewTestClient() (*TestClient, *TestClientController) {
+	c := &TestClient{
+		headerChain:     []*types.Header{},
+		latestHeadIndex: 0,
+	}
+	ctrl := &TestClientController{c}
+	return c, ctrl
+}
+
+func (c *TestClientController) ProgressHead() bool {
+	if c.c.latestHeadIndex >= len(c.c.headerChain)-1 {
+		return false
+	}
+	c.c.latestHeadIndex++
+	return true
+}
+
+func (c *TestClientController) EmitEvents(ctx context.Context) error {
+	if len(c.c.latestHeadEmitter) == 0 {
+		return nil
+	}
+	h := c.c.getLatestHeader()
+	for _, em := range c.c.latestHeadEmitter {
+		select {
+		case em <- h:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	return nil
+}
+
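For orientation, this is how a test drives the client and its controller (the pattern used by `TestReorg` later in this diff; `ctx` and `headers` are assumed to exist, and `AppendNextHeaders` is defined just below):

```go
clnt, ctl := NewTestClient()      // clnt satisfies SyncEthereumClient
ctl.AppendNextHeaders(headers...) // queue a pre-built header chain
for ctl.ProgressHead() {          // advance the canonical head by one block
	// push the new head to all SubscribeNewHead channels
	if err := ctl.EmitEvents(ctx); err != nil {
		break
	}
}
```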
+func (c *TestClientController) AppendNextHeaders(h ...*types.Header) {
+	c.c.headerChain = append(c.c.headerChain, h...)
+}
+
+func (t *TestClient) ChainID(_ context.Context) (*big.Int, error) {
+	return big.NewInt(42), nil
+}
+
+func (t *TestClient) Close() {
+	// TODO: cleanup
+}
+
+func (t *TestClient) getLatestHeader() *types.Header {
+	if len(t.headerChain) == 0 {
+		return nil
+	}
+	return t.headerChain[t.latestHeadIndex]
+}
+
+func (t *TestClient) searchBlock(f func(*types.Header) bool) *types.Header {
+	for i := t.latestHeadIndex; i >= 0; i-- {
+		h := t.headerChain[i]
+		if f(h) {
+			return h
+		}
+	}
+	return nil
+}
+
+func (t *TestClient) searchBlockByNumber(number *big.Int) *types.Header {
+	return t.searchBlock(
+		func(h *types.Header) bool {
+			return h.Number.Cmp(number) == 0
+		})
+}
+
+func (t *TestClient) searchBlockByHash(hash common.Hash) *types.Header {
+	return t.searchBlock(
+		func(h *types.Header) bool {
+			return hash.Cmp(h.Hash()) == 0
+		})
+}
+
+func (t *TestClient) BlockNumber(_ context.Context) (uint64, error) {
+	return t.getLatestHeader().Number.Uint64(), nil
+}
+
+func (t *TestClient) HeaderByHash(_ context.Context, hash common.Hash) (*types.Header, error) {
+	h := t.searchBlockByHash(hash)
+	if h == nil {
+		return nil, errors.New("header not found")
+	}
+	return h, nil
+}
+
+func (t *TestClient) HeaderByNumber(_ context.Context, number *big.Int) (*types.Header, error) {
+	if number == nil {
+		return t.getLatestHeader(), nil
+	}
+	h := t.searchBlockByNumber(number)
+	if h == nil {
+		return nil, errors.New("header not found")
+	}
+	return h, nil
+}
+
+func (t *TestClient) SubscribeNewHead(_ context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
+	t.latestHeadEmitter = append(t.latestHeadEmitter, ch)
+	su := NewSubscription(len(t.latestHeadSubscription))
+	t.latestHeadSubscription = append(t.latestHeadSubscription, su)
+	// TODO: unsubscribing and deleting from the array
+	// TODO: filling the error promise in the subscription
+	return su, nil
+}
+
+func (t *TestClient) FilterLogs(_ context.Context, _ ethereum.FilterQuery) ([]types.Log, error) {
+	panic(ErrNotImplemented)
+}
+
+func (t *TestClient) SubscribeFilterLogs(_ context.Context, _ ethereum.FilterQuery, _ chan<- types.Log) (ethereum.Subscription, error) {
+	panic(ErrNotImplemented)
+}
+
+func (t *TestClient) CodeAt(_ context.Context, _ common.Address, _ *big.Int) ([]byte, error) {
+	panic(ErrNotImplemented)
+}
+
+func (t *TestClient) TransactionReceipt(_ context.Context, _ common.Hash) (*types.Receipt, error) {
+	panic(ErrNotImplemented)
+}
diff --git a/rolling-shutter/medley/chainsync/options.go b/rolling-shutter/medley/chainsync/options.go
index 580876243..9a7e43473 100644
--- a/rolling-shutter/medley/chainsync/options.go
+++ b/rolling-shutter/medley/chainsync/options.go
@@ -24,10 +24,11 @@ type options struct {
 	keyperSetManagerAddress     *common.Address
 	keyBroadcastContractAddress *common.Address
 	clientURL                   string
-	client                      syncclient.Client
+	ethClient                   syncclient.FullEthereumClient
 	logger                      log.Logger
 	runner                      service.Runner
 	syncStart                   *number.BlockNumber
+	fetchActivesAtSyncStart     bool
 
 	privKey             *ecdsa.PrivateKey
 	handlerShutterState event.ShutterStateHandler
@@ -37,118 +38,143 @@
 }
 
 func (o *options) verify() error {
-	if o.clientURL != "" && o.client != nil {
-		// TODO: error message
-		return errors.New("can't use client and client url")
+	if o.clientURL != "" && o.ethClient != nil {
+		return errors.New("'WithClient' and 'WithClientURL' options are mutually exclusive")
 	}
-	if o.clientURL == "" && o.client == nil {
-		// TODO: error 
message - return errors.New("have to provide either url or client") + if o.clientURL == "" && o.ethClient == nil { + return errors.New("either 'WithClient' or 'WithClientURL' options are expected") } // TODO: check for the existence of the contract addresses depending on // what handlers are not nil return nil } -// initialize the shutter client and apply the options. -// the context is only the initialisation context, -// and should not be considered to handle the lifecycle -// of shutter clients background workers. -func (o *options) apply(ctx context.Context, c *Client) error { - var ( - client syncclient.Client - err error - ) - if o.clientURL != "" { - o.client, err = ethclient.DialContext(ctx, o.clientURL) - if err != nil { - return err - } - } - client = o.client - c.log = o.logger - - c.Client = client +func (o *options) applyHandler(c *Client) error { + var err error + syncedServices := []syncer.ManualFilterHandler{} - // the nil passthrough will use "latest" for each call, - // but we want to harmonize and fix the sync start to a specific block. - if o.syncStart.IsLatest() { - latestBlock, err := c.Client.BlockNumber(ctx) - if err != nil { - return errors.Wrap(err, "polling latest block") - } - o.syncStart = number.NewBlockNumber(&latestBlock) - } - - c.KeyperSetManager, err = bindings.NewKeyperSetManager(*o.keyperSetManagerAddress, client) + c.KeyperSetManager, err = bindings.NewKeyperSetManager(*o.keyperSetManagerAddress, o.ethClient) if err != nil { return err } c.kssync = &syncer.KeyperSetSyncer{ - Client: client, - Contract: c.KeyperSetManager, - Log: c.log, - StartBlock: o.syncStart, - Handler: o.handlerKeyperSet, + Client: o.ethClient, + Contract: c.KeyperSetManager, + Log: c.log, + Handler: o.handlerKeyperSet, } if o.handlerKeyperSet != nil { - c.services = append(c.services, c.kssync) + syncedServices = append(syncedServices, c.kssync) } - c.KeyBroadcast, err = bindings.NewKeyBroadcastContract(*o.keyBroadcastContractAddress, client) + c.KeyBroadcast, err = bindings.NewKeyBroadcastContract(*o.keyBroadcastContractAddress, o.ethClient) if err != nil { return err } c.epksync = &syncer.EonPubKeySyncer{ - Client: client, + Client: o.ethClient, Log: c.log, KeyBroadcast: c.KeyBroadcast, KeyperSetManager: c.KeyperSetManager, Handler: o.handlerEonPublicKey, - StartBlock: o.syncStart, } if o.handlerEonPublicKey != nil { - c.services = append(c.services, c.epksync) + syncedServices = append(syncedServices, c.epksync) } - c.sssync = &syncer.ShutterStateSyncer{ - Client: client, - Contract: c.KeyperSetManager, - Log: c.log, - Handler: o.handlerShutterState, - StartBlock: o.syncStart, + Client: o.ethClient, + Contract: c.KeyperSetManager, + Log: c.log, + Handler: o.handlerShutterState, } if o.handlerShutterState != nil { - c.services = append(c.services, c.sssync) + syncedServices = append(syncedServices, c.sssync) } - if o.handlerBlock != nil { - c.uhsync = &syncer.UnsafeHeadSyncer{ - Client: client, - Log: c.log, - Handler: o.handlerBlock, + if o.handlerBlock == nil { + // Even if the user is not interested in handling new block events, + // the streaming block handler must be running in order to + // synchronize polling of new contract events. 
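+		// (Concretely: the UnsafeHeadSyncer calls every synced handler's
+		// QueryAndHandle once per processed block, so block processing
+		// must always be running.)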
+ // Since the handler function is always called, we need to + // inject a noop-handler + o.handlerBlock = func(ctx context.Context, lb *event.LatestBlock) error { + return nil } } + + c.uhsync = &syncer.UnsafeHeadSyncer{ + Client: o.ethClient, + Log: c.log, + Handler: o.handlerBlock, + SyncedHandler: syncedServices, + FetchActiveAtStart: o.fetchActivesAtSyncStart, + SyncStartBlock: o.syncStart, + } if o.handlerBlock != nil { c.services = append(c.services, c.uhsync) } - c.privKey = o.privKey return nil } +// initialize the shutter client and apply the options. +// the context is only the initialisation context, +// and should not be considered to handle the lifecycle +// of shutter clients background workers. +func (o *options) apply(ctx context.Context, c *Client) error { + var ( + client syncclient.SyncEthereumClient + err error + ) + if o.clientURL != "" { + o.ethClient, err = ethclient.DialContext(ctx, o.clientURL) + if err != nil { + return err + } + } + client = o.ethClient + c.SyncEthereumClient = client + + // the nil passthrough will use "latest" for each call, + // but we want to harmonize and fix the sync start to a specific block. + if o.syncStart.IsLatest() { + latestBlock, err := c.SyncEthereumClient.BlockNumber(ctx) + if err != nil { + return errors.Wrap(err, "polling latest block") + } + o.syncStart = number.NewBlockNumber(&latestBlock) + } + + if o.logger != nil { + c.log = o.logger + } + + c.privKey = o.privKey + return o.applyHandler(c) +} + func defaultOptions() *options { return &options{ keyperSetManagerAddress: &predeploy.KeyperSetManagerAddr, keyBroadcastContractAddress: &predeploy.KeyBroadcastContractAddr, clientURL: "", - client: nil, + ethClient: nil, logger: noopLogger, runner: nil, + fetchActivesAtSyncStart: true, syncStart: number.NewBlockNumber(nil), } } -func WithSyncStartBlock(blockNumber *number.BlockNumber) Option { +func WithNoFetchActivesBeforeStart() Option { + return func(o *options) error { + o.fetchActivesAtSyncStart = false + return nil + } +} + +func WithSyncStartBlock( + blockNumber *number.BlockNumber, +) Option { if blockNumber == nil { blockNumber = number.NewBlockNumber(nil) } @@ -193,9 +219,9 @@ func WithLogger(l log.Logger) Option { } } -func WithClient(client syncclient.Client) Option { +func WithClient(client syncclient.FullEthereumClient) Option { return func(o *options) error { - o.client = client + o.ethClient = client return nil } } diff --git a/rolling-shutter/medley/chainsync/syncer/eonpubkey.go b/rolling-shutter/medley/chainsync/syncer/eonpubkey.go index 1c0494845..f055fc118 100644 --- a/rolling-shutter/medley/chainsync/syncer/eonpubkey.go +++ b/rolling-shutter/medley/chainsync/syncer/eonpubkey.go @@ -2,45 +2,78 @@ package syncer import ( "context" - "errors" "fmt" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/log" + "github.com/pkg/errors" "github.com/shutter-network/shop-contracts/bindings" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) +var _ ManualFilterHandler = &EonPubKeySyncer{} + type EonPubKeySyncer struct { - Client client.Client + Client client.SyncEthereumClient Log log.Logger KeyBroadcast *bindings.KeyBroadcastContract KeyperSetManager *bindings.KeyperSetManager - StartBlock *number.BlockNumber 
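+	// Handler receives both the replayed initial eon keys and
+	// newly broadcast ones.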
 	Handler          event.EonPublicKeyHandler
-
-	keyBroadcastCh chan *bindings.KeyBroadcastContractEonKeyBroadcast
 }
 
-func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) error {
-	if s.Handler == nil {
-		return errors.New("no handler registered")
+func (s *EonPubKeySyncer) QueryAndHandle(ctx context.Context, block uint64) error {
+	s.Log.Info(
+		"pubsyncer query and handle called",
+		"block",
+		block,
+	)
+	opts := &bind.FilterOpts{
+		Start:   block,
+		End:     &block,
+		Context: ctx,
 	}
-	// the latest block still has to be fixed.
-	// otherwise we could skip some block events
-	// between the initial poll and the subscription.
-	if s.StartBlock.IsLatest() {
-		latest, err := s.Client.BlockNumber(ctx)
+	iter, err := s.KeyBroadcast.FilterEonKeyBroadcast(opts)
+	if err != nil {
+		return err
+	}
+	defer iter.Close()
+
+	for iter.Next() {
+		err := s.handle(ctx, iter.Event)
 		if err != nil {
-			return err
+			s.Log.Error(
+				"handler for `EonKeyBroadcast` errored",
+				"error",
+				err.Error(),
+			)
 		}
-		s.StartBlock.SetUint64(latest)
 	}
-
-	pubKs, err := s.getInitialPubKeys(ctx)
+	if err := iter.Error(); err != nil {
+		return errors.Wrap(err, "filter iterator error")
+	}
+	return nil
+}
+
+func (s *EonPubKeySyncer) handle(ctx context.Context, newEonKey *bindings.KeyBroadcastContractEonKeyBroadcast) error {
+	pubk := newEonKey.Key
+	bn := newEonKey.Raw.BlockNumber
+	ev := &event.EonPublicKey{
+		Eon:           newEonKey.Eon,
+		Key:           pubk,
+		AtBlockNumber: number.NewBlockNumber(&bn),
+	}
+	err := s.Handler(ctx, ev)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (s *EonPubKeySyncer) HandleVirtualEvent(ctx context.Context, block *number.BlockNumber) error {
+	pubKs, err := s.getInitialPubKeys(ctx, block)
 	if err != nil {
 		return err
 	}
@@ -50,33 +83,15 @@ func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) erro
 			return err
 		}
 	}
-
-	watchOpts := &bind.WatchOpts{
-		Start:   s.StartBlock.ToUInt64Ptr(),
-		Context: ctx,
-	}
-	s.keyBroadcastCh = make(chan *bindings.KeyBroadcastContractEonKeyBroadcast, channelSize)
-	runner.Defer(func() {
-		close(s.keyBroadcastCh)
-	})
-	subs, err := s.KeyBroadcast.WatchEonKeyBroadcast(watchOpts, s.keyBroadcastCh)
-	// FIXME: what to do on subs.Error()
-	if err != nil {
-		return err
-	}
-	runner.Defer(subs.Unsubscribe)
-	runner.Go(func() error {
-		return s.watchNewEonPubkey(ctx)
-	})
 	return nil
 }
 
-func (s *EonPubKeySyncer) getInitialPubKeys(ctx context.Context) ([]*event.EonPublicKey, error) {
+func (s *EonPubKeySyncer) getInitialPubKeys(ctx context.Context, block *number.BlockNumber) ([]*event.EonPublicKey, error) {
 	// This blocknumber specifies AT what state
 	// the contract is called
 	opts := &bind.CallOpts{
 		Context:     ctx,
-		BlockNumber: s.StartBlock.Int,
+		BlockNumber: block.Int,
 	}
 	numKS, err := s.KeyperSetManager.GetNumKeyperSets(opts)
 	if err != nil {
@@ -84,21 +99,27 @@ func (s *EonPubKeySyncer) getInitialPubKeys(ctx context.Context) ([]*event.EonPu
 	}
 	// this blocknumber specifies the argument to the contract
 	// getter
-	activeEon, err := s.KeyperSetManager.GetKeyperSetIndexByBlock(opts, s.StartBlock.Uint64())
+	activeEon, err := s.KeyperSetManager.GetKeyperSetIndexByBlock(opts, block.Uint64())
 	if err != nil {
 		return nil, err
 	}
 	initialPubKeys := []*event.EonPublicKey{}
+	// NOTE: these are the pubkeys known to the contracts at the
+	// state of the given block. That way we recreate older
+	// broadcast public-key events.
+	// We are only interested in keys that belong to keyper sets
+	// that are currently active or will become active in
+	// the future:
 	for i := activeEon; i < numKS; i++ {
 		e, err := s.GetEonPubKeyForEon(ctx, opts, i)
 		if err != nil {
 			return nil, err
 		}
-		initialPubKeys = append(initialPubKeys, e)
+		// if e == nil, the keyper set did not broadcast a
+		// key (yet)
+		if e != nil {
+			initialPubKeys = append(initialPubKeys, e)
+		}
 	}
 	return initialPubKeys, nil
 }
@@ -118,41 +139,17 @@ func (s *EonPubKeySyncer) GetEonPubKeyForEon(ctx context.Context, opts *bind.Cal
 		return nil, err
 	}
 	key, err := s.KeyBroadcast.GetEonKey(opts, eon)
-	// XXX: can the key be a null byte?
-	// I think we rather get a index out of bounds error.
 	if err != nil {
 		return nil, err
 	}
+	// NOTE: Solidity returns the null value whenever
+	// one tries to access a key in a mapping that doesn't exist
+	if len(key) == 0 {
+		return nil, nil
+	}
 	return &event.EonPublicKey{
 		Eon:           eon,
 		Key:           key,
 		AtBlockNumber: number.BigToBlockNumber(opts.BlockNumber),
 	}, nil
 }
-
-func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context) error {
-	for {
-		select {
-		case newEonKey, ok := <-s.keyBroadcastCh:
-			if !ok {
-				return nil
-			}
-			bn := newEonKey.Raw.BlockNumber
-			ev := &event.EonPublicKey{
-				Eon:           newEonKey.Eon,
-				Key:           newEonKey.Key,
-				AtBlockNumber: number.NewBlockNumber(&bn),
-			}
-			err := s.Handler(ctx, ev)
-			if err != nil {
-				s.Log.Error(
-					"handler for `NewKeyperSet` errored",
-					"error",
-					err.Error(),
-				)
-			}
-		case <-ctx.Done():
-			return ctx.Err()
-		}
-	}
-}
diff --git a/rolling-shutter/medley/chainsync/syncer/keyperset.go b/rolling-shutter/medley/chainsync/syncer/keyperset.go
index 91ff18810..be9e3c976 100644
--- a/rolling-shutter/medley/chainsync/syncer/keyperset.go
+++ b/rolling-shutter/medley/chainsync/syncer/keyperset.go
@@ -12,7 +12,6 @@ import (
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client"
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event"
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number"
-	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
 )
 
 func makeCallError(attrName string, err error) error {
@@ -21,37 +20,45 @@ const channelSize = 10
 
-type KeyperSetSyncer struct {
-	Client     client.Client
-	Contract   *bindings.KeyperSetManager
-	Log        log.Logger
-	StartBlock *number.BlockNumber
-	Handler    event.KeyperSetHandler
+var _ ManualFilterHandler = &KeyperSetSyncer{}
 
-	keyperAddedCh chan *bindings.KeyperSetManagerKeyperSetAdded
+type KeyperSetSyncer struct {
+	Client   client.FullEthereumClient
+	Contract *bindings.KeyperSetManager
+	Log      log.Logger
+	Handler  event.KeyperSetHandler
 }
 
-func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) error {
-	if s.Handler == nil {
-		return errors.New("no handler registered")
+func (s *KeyperSetSyncer) QueryAndHandle(ctx context.Context, block uint64) error {
+	opts := &bind.FilterOpts{
+		Start:   block,
+		End:     &block,
+		Context: ctx,
+	}
+	iter, err := s.Contract.FilterKeyperSetAdded(opts)
+	if err != nil {
+		return err
 	}
+	defer iter.Close()
 
-	// the latest block still has to be fixed.
-	// otherwise we could skip some block events
-	// between the initial poll and the subscription.
- if s.StartBlock.IsLatest() { - latest, err := s.Client.BlockNumber(ctx) + for iter.Next() { + err := s.handle(ctx, iter.Event) if err != nil { - return err + s.Log.Error( + "handler for `NewKeyperSet` errored", + "error", + err.Error(), + ) } - s.StartBlock.SetUint64(latest) } - - watchOpts := &bind.WatchOpts{ - Start: s.StartBlock.ToUInt64Ptr(), - Context: ctx, + if err := iter.Error(); err != nil { + return errors.Wrap(err, "filter iterator error") } - initial, err := s.getInitialKeyperSets(ctx) + return nil +} + +func (s *KeyperSetSyncer) HandleVirtualEvent(ctx context.Context, block *number.BlockNumber) error { + initial, err := s.getInitialKeyperSets(ctx, block) if err != nil { return err } @@ -65,31 +72,18 @@ func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) erro ) } } - s.keyperAddedCh = make(chan *bindings.KeyperSetManagerKeyperSetAdded, channelSize) - runner.Defer(func() { - close(s.keyperAddedCh) - }) - subs, err := s.Contract.WatchKeyperSetAdded(watchOpts, s.keyperAddedCh) - // FIXME: what to do on subs.Error() - if err != nil { - return err - } - runner.Defer(subs.Unsubscribe) - runner.Go(func() error { - return s.watchNewKeypersService(ctx) - }) return nil } -func (s *KeyperSetSyncer) getInitialKeyperSets(ctx context.Context) ([]*event.KeyperSet, error) { +func (s *KeyperSetSyncer) getInitialKeyperSets(ctx context.Context, block *number.BlockNumber) ([]*event.KeyperSet, error) { opts := &bind.CallOpts{ Context: ctx, - BlockNumber: s.StartBlock.Int, + BlockNumber: block.Int, } if err := guardCallOpts(opts, false); err != nil { return nil, err } - bn := s.StartBlock.ToUInt64Ptr() + bn := block.ToUInt64Ptr() if bn == nil { // this should not be the case return nil, errors.New("start block is 'latest'") @@ -98,7 +92,7 @@ func (s *KeyperSetSyncer) getInitialKeyperSets(ctx context.Context) ([]*event.Ke initialKeyperSets := []*event.KeyperSet{} // this blocknumber specifies the argument to the contract // getter - ks, err := s.GetKeyperSetForBlock(ctx, opts, s.StartBlock) + ks, err := s.GetKeyperSetForBlock(ctx, opts, block) if err != nil { return nil, err } @@ -208,38 +202,20 @@ func (s *KeyperSetSyncer) newEvent( }, nil } -func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context) error { - for { - select { - case newKeypers, ok := <-s.keyperAddedCh: - if !ok { - return nil - } - opts := logToCallOpts(ctx, &newKeypers.Raw) - newKeyperSet, err := s.newEvent( - ctx, - opts, - newKeypers.KeyperSetContract, - newKeypers.ActivationBlock, - ) - if err != nil { - s.Log.Error( - "error while fetching new event", - "error", - err.Error(), - ) - continue - } - err = s.Handler(ctx, newKeyperSet) - if err != nil { - s.Log.Error( - "handler for `NewKeyperSet` errored", - "error", - err.Error(), - ) - } - case <-ctx.Done(): - return ctx.Err() - } +func (s *KeyperSetSyncer) handle(ctx context.Context, ev *bindings.KeyperSetManagerKeyperSetAdded) error { + opts := logToCallOpts(ctx, &ev.Raw) + newKeyperSet, err := s.newEvent( + ctx, + opts, + ev.KeyperSetContract, + ev.ActivationBlock, + ) + if err != nil { + return errors.Wrap(err, "fetch new event") } + err = s.Handler(ctx, newKeyperSet) + if err != nil { + return errors.Wrap(err, "call handler") + } + return nil } diff --git a/rolling-shutter/medley/chainsync/syncer/shutterstate.go b/rolling-shutter/medley/chainsync/syncer/shutterstate.go index e262d679a..fbd7773de 100644 --- a/rolling-shutter/medley/chainsync/syncer/shutterstate.go +++ b/rolling-shutter/medley/chainsync/syncer/shutterstate.go @@ -11,18 
+11,15 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) -type ShutterStateSyncer struct { - Client client.Client - Contract *bindings.KeyperSetManager - StartBlock *number.BlockNumber - Log log.Logger - Handler event.ShutterStateHandler +var _ ManualFilterHandler = &ShutterStateSyncer{} - pausedCh chan *bindings.KeyperSetManagerPaused - unpausedCh chan *bindings.KeyperSetManagerUnpaused +type ShutterStateSyncer struct { + Client client.SyncEthereumClient + Contract *bindings.KeyperSetManager + Log log.Logger + Handler event.ShutterStateHandler } func (s *ShutterStateSyncer) GetShutterState(ctx context.Context, opts *bind.CallOpts) (*event.ShutterState, error) { @@ -40,48 +37,62 @@ func (s *ShutterStateSyncer) GetShutterState(ctx context.Context, opts *bind.Cal }, nil } -func (s *ShutterStateSyncer) Start(ctx context.Context, runner service.Runner) error { - if s.Handler == nil { - return errors.New("no handler registered") - } - watchOpts := &bind.WatchOpts{ - Start: s.StartBlock.ToUInt64Ptr(), // nil means latest +func (s *ShutterStateSyncer) QueryAndHandle(ctx context.Context, block uint64) error { + opts := &bind.FilterOpts{ + Start: block, + End: &block, Context: ctx, } - s.pausedCh = make(chan *bindings.KeyperSetManagerPaused) - subs, err := s.Contract.WatchPaused(watchOpts, s.pausedCh) - // FIXME: what to do on subs.Error() + iterPaused, err := s.Contract.FilterPaused(opts) if err != nil { return err } - runner.Defer(subs.Unsubscribe) - runner.Defer(func() { - close(s.pausedCh) - }) + defer iterPaused.Close() + + for iterPaused.Next() { + block := iterPaused.Event.Raw.BlockNumber + ev := &event.ShutterState{ + Active: false, + AtBlockNumber: number.NewBlockNumber(&block), + } + s.handle(ctx, ev) + } + if err := iterPaused.Error(); err != nil { + return errors.Wrap(err, "filter iterator error") + } - s.unpausedCh = make(chan *bindings.KeyperSetManagerUnpaused) - subs, err = s.Contract.WatchUnpaused(watchOpts, s.unpausedCh) - // FIXME: what to do on subs.Error() + iterUnpaused, err := s.Contract.FilterUnpaused(opts) if err != nil { return err } - runner.Defer(subs.Unsubscribe) - runner.Defer(func() { - close(s.unpausedCh) - }) + defer iterUnpaused.Close() - runner.Go(func() error { - return s.watchPaused(ctx) - }) + for iterUnpaused.Next() { + block := iterUnpaused.Event.Raw.BlockNumber + ev := &event.ShutterState{ + Active: true, + AtBlockNumber: number.NewBlockNumber(&block), + } + s.handle(ctx, ev) + } + if err := iterUnpaused.Error(); err != nil { + return errors.Wrap(err, "filter iterator error") + } return nil } -func (s *ShutterStateSyncer) pollIsActive(ctx context.Context) (bool, error) { - callOpts := bind.CallOpts{ - Context: ctx, +func (s *ShutterStateSyncer) HandleVirtualEvent(ctx context.Context, block *number.BlockNumber) error { + // query the initial state and re-construct a "virtual" event from the contract state + opts := &bind.CallOpts{ + BlockNumber: block.Int, + Context: ctx, } - paused, err := s.Contract.Paused(&callOpts) - return !paused, err + stateAtBlock, err := s.GetShutterState(ctx, opts) + if err != nil { + return err + } + s.handle(ctx, stateAtBlock) + return nil } func (s *ShutterStateSyncer) handle(ctx context.Context, ev *event.ShutterState) { @@ -94,45 +105,3 
@@ func (s *ShutterStateSyncer) handle(ctx context.Context, ev *event.ShutterState)
 		)
 	}
 }
-
-func (s *ShutterStateSyncer) watchPaused(ctx context.Context) error {
-	isActive, err := s.pollIsActive(ctx)
-	if err != nil {
-		// XXX: this will fail everything, do we want that?
-		return err
-	}
-	ev := &event.ShutterState{
-		Active: isActive,
-	}
-	s.handle(ctx, ev)
-	for {
-		select {
-		case _, ok := <-s.unpausedCh:
-			if !ok {
-				return nil
-			}
-			if isActive {
-				s.Log.Error("state mismatch", "got", "actice", "have", "inactive")
-			}
-			ev := &event.ShutterState{
-				Active: true,
-			}
-			isActive = ev.Active
-			s.handle(ctx, ev)
-		case _, ok := <-s.pausedCh:
-			if !ok {
-				return nil
-			}
-			if isActive {
-				s.Log.Error("state mismatch", "got", "inactive", "have", "active")
-			}
-			ev := &event.ShutterState{
-				Active: false,
-			}
-			isActive = ev.Active
-			s.handle(ctx, ev)
-		case <-ctx.Done():
-			return ctx.Err()
-		}
-	}
-}
diff --git a/rolling-shutter/medley/chainsync/syncer/unsafehead.go b/rolling-shutter/medley/chainsync/syncer/unsafehead.go
index 86ddbd1b3..a90b9c69f 100644
--- a/rolling-shutter/medley/chainsync/syncer/unsafehead.go
+++ b/rolling-shutter/medley/chainsync/syncer/unsafehead.go
@@ -2,33 +2,73 @@ package syncer
 
 import (
 	"context"
-	"errors"
+	"math/big"
+	"time"
 
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/pkg/errors"
 
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client"
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event"
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number"
+	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/retry"
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
 )
 
 type UnsafeHeadSyncer struct {
-	Client  client.Client
+	Client  client.SyncEthereumClient
 	Log     log.Logger
 	Handler event.BlockHandler
+	// Handlers that are triggered manually, so that their
+	// handler functions run before this syncer's own
+	// Handler is called:
+	SyncedHandler      []ManualFilterHandler
+	SyncStartBlock     *number.BlockNumber
+	FetchActiveAtStart bool
 
 	newLatestHeadCh chan *types.Header
+
+	syncHead      *types.Header
+	nextSyncBlock *big.Int
+	latestHead    *types.Header
+	headerCache   map[uint64]*types.Header
 }
 
 func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) error {
+	s.Log.Info("unsafe head syncer started")
 	if s.Handler == nil {
 		return errors.New("no handler registered")
 	}
+
+	s.headerCache = map[uint64]*types.Header{}
 	s.newLatestHeadCh = make(chan *types.Header, 1)
+	_, err := retry.FunctionCall(
+		ctx,
+		func(ctx context.Context) (bool, error) {
+			err := s.fetchInitialHeaders(ctx)
+			return err == nil, err
+		},
+	)
+	if err != nil {
+		return errors.Wrap(err, "fetch initial latest header and sync start header")
+	}
+	s.SyncStartBlock = &number.BlockNumber{Int: s.syncHead.Number}
+	if s.FetchActiveAtStart {
+		for _, h := range s.SyncedHandler {
+			err := h.HandleVirtualEvent(ctx, s.SyncStartBlock)
+			if err != nil {
+				s.Log.Error("synced handler call errored, skipping", "error", err)
+			}
+		}
+	}
+	err = s.handle(ctx, s.syncHead, false)
+	if err != nil {
+		return errors.Wrap(err, "handle initial sync block")
+	}
+	s.nextSyncBlock = new(big.Int).Add(s.syncHead.Number, big.NewInt(1))
 	subs, err := s.Client.SubscribeNewHead(ctx, s.newLatestHeadCh)
-	// FIXME: what to do on subs.Error()
 	if err != nil {
 		return err
 	}
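Start wires the `ManualFilterHandler`s (the interface added in `syncer/util.go` below) into the head-sync loop. A minimal illustrative implementation of that contract — the type and its counter are made up for the example, and the `context`/`number` imports of this package are assumed:

```go
// countingHandler is an illustrative ManualFilterHandler.
type countingHandler struct{ blocks uint64 }

// QueryAndHandle filters and handles the events of exactly one block;
// the real syncers run a contract Filter* query with Start == End == block.
func (h *countingHandler) QueryAndHandle(ctx context.Context, block uint64) error {
	h.blocks++
	return nil
}

// HandleVirtualEvent replays contract state at the sync-start block as
// synthetic events, so consumers begin from a consistent snapshot.
func (h *countingHandler) HandleVirtualEvent(ctx context.Context, block *number.BlockNumber) error {
	return nil
}
```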
@@ -42,28 +82,227 @@ func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) err
 	return nil
 }
 
-func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context) error {
+func (s *UnsafeHeadSyncer) fetchInitialHeaders(ctx context.Context) error {
+	latest, err := s.Client.HeaderByNumber(ctx, nil)
+	if err != nil {
+		return errors.Wrap(err, "fetch latest header")
+	}
+	s.latestHead = latest
+	if s.SyncStartBlock.IsLatest() {
+		s.syncHead = latest
+		return nil
+	}
+
+	start, err := s.Client.HeaderByNumber(ctx, s.SyncStartBlock.Int)
+	if err != nil {
+		return errors.Wrap(err, "fetch sync start header")
+	}
+	s.syncHead = start
+	return nil
+}
+
+func (s *UnsafeHeadSyncer) handle(ctx context.Context, newHeader *types.Header, reorg bool) error {
+	blockNum := number.BigToBlockNumber(newHeader.Number)
+	if reorg {
+		prevNum := new(big.Int).Sub(newHeader.Number, big.NewInt(1))
+		prevBlockNum := number.BigToBlockNumber(prevNum)
+		// Re-emit the previous block to pre-emptively signal an
+		// incoming reorg. This way a client can, e.g., rewind its
+		// changes before processing the new events of the reorg.
+		ev := &event.LatestBlock{
+			Number:    prevBlockNum,
+			BlockHash: newHeader.ParentHash,
+			Header:    newHeader,
+		}
+		err := s.Handler(ctx, ev)
+		if err != nil {
+			// XXX: return or log?
+			// return err
+			s.Log.Error(
+				"handler for `NewLatestBlock` errored",
+				"error",
+				err.Error(),
+			)
+		}
+	}
+
+	// TODO: check the bloom filter for the topics of all
+	// synced handlers and only call them if
+	// the bloom filter yields something.
+	for _, h := range s.SyncedHandler {
+		// NOTE: this has to be blocking!
+		// Whenever this returns, it is expected
+		// that the handler's Handle function
+		// has been called and has returned.
+		err := h.QueryAndHandle(ctx, blockNum.Uint64())
+		if err != nil {
+			s.Log.Error("synced handler call errored, skipping", "error", err)
+		}
+	}
+	ev := &event.LatestBlock{
+		Number:    blockNum,
+		BlockHash: newHeader.Hash(),
+	}
+	err := s.Handler(ctx, ev)
+	if err != nil {
+		s.Log.Error(
+			"handler for `NewLatestBlock` errored",
+			"error",
+			err.Error(),
+		)
+	}
+	return nil
+}
+
+func (s *UnsafeHeadSyncer) fetchHeader(ctx context.Context, num *big.Int) (*types.Header, error) {
+	h, ok := s.headerCache[num.Uint64()]
+	if ok {
+		return h, nil
+	}
+	return s.Client.HeaderByNumber(ctx, num)
+}
+
+func (s *UnsafeHeadSyncer) reset(ctx context.Context) error {
+	// this means the latest head was reset - check whether that
+	// concerns the current sync status
+	switch s.latestHead.Number.Cmp(s.syncHead.Number) {
+	case 1:
+		// we didn't catch up to the reorg position, so it's safe to ignore
+		// TODO: delete the forward caches
+		return nil
+	case 0:
+		// we already processed the head we re-orged to
+		if s.latestHead.Hash().Cmp(s.syncHead.Hash()) == 0 {
+			return nil
+		}
+	}
+	// definite reorg
+	if err := s.handle(ctx, s.latestHead, true); err != nil {
+		return err
+	}
+	s.syncHead = s.latestHead
+	s.nextSyncBlock = new(big.Int).Add(s.latestHead.Number, big.NewInt(1))
+	return nil
+}
+
+func (s *UnsafeHeadSyncer) sync(ctx context.Context) (syncing bool, delay time.Duration, err error) {
+	var newHead *types.Header
+	s.Log.Info("syncing chain")
+
+	delta := new(big.Int).Sub(s.latestHead.Number, s.nextSyncBlock)
+	switch delta.Cmp(big.NewInt(0)) {
+	case 1:
+		// positive delta, we are still catching up
+		newHead, err = s.fetchHeader(ctx, s.nextSyncBlock)
+		syncing = true
+		delay = 1 * time.Second
+	case 0:
+		// the next block to sync is the latest head,
+		// so use the latest head
+		newHead = s.latestHead
+		syncing = false
+	case -1:
+		if 
delta.Cmp(big.NewInt(-1)) != 0 { + // next sync is more than one block further in the future. + // this could mean a reorg, but this should have been called before + // and it shouldn't come to this here + return false, delay, errors.New("unexpected reorg condition in sync") + } + // reorgs are handled outside, at the place new latest-head + // information arrives. + // next sync is 1 into the future. + // this means we called sync but are still waiting for the next latest head. + return false, delay, err + } + if err != nil { + return true, delay, err + } + + if handleErr := s.handle(ctx, newHead, false); handleErr != nil { + return true, delay, handleErr + } + s.syncHead = newHead + delete(s.headerCache, newHead.Number.Uint64()) + s.nextSyncBlock = new(big.Int).Add(newHead.Number, big.NewInt(1)) + + s.Log.Info("chain sync", + "synced-head-num", s.syncHead.Number.Uint64(), + "synced-head-hash", s.syncHead.Hash(), + "latest-head-num", s.latestHead.Number.Uint64(), + "latest-head-hash", s.latestHead.Hash(), + ) + return syncing, delay, err +} + +func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context) error { //nolint: gocyclo + t := time.NewTimer(0) + sync := t.C +work: for { select { + case <-ctx.Done(): + if sync != nil && !t.Stop() { + select { + case <-t.C: + // TODO: the non-blocking select is here as a + // precaution against an already emptied channel. + // This should not be necessary + default: + } + } + return ctx.Err() + case <-sync: + syncing, delay, err := s.sync(ctx) + if err != nil { + s.Log.Error("error during unsafe head sync", "error", err) + } + if !syncing { + s.Log.Info("stop syncing from sync") + sync = nil + continue work + } + t.Reset(delay) case newHeader, ok := <-s.newLatestHeadCh: if !ok { + s.Log.Info("latest head stream closed, exiting handler loop") return nil } - ev := &event.LatestBlock{ - Number: number.BigToBlockNumber(newHeader.Number), - BlockHash: newHeader.Hash(), - Header: newHeader, - } - err := s.Handler(ctx, ev) - if err != nil { - s.Log.Error( - "handler for `NewLatestBlock` errored", - "error", - err.Error(), + s.Log.Info("new latest head from l2 ws-stream", "block-number", newHeader.Number.Uint64()) + if newHeader.Number.Cmp(s.latestHead.Number) <= 0 { + // reorg + s.Log.Info("new latest head is re-orging", + "old-block-number", s.latestHead.Number.Uint64(), + "new-block-number", newHeader.Number.Uint64(), ) + + s.latestHead = newHeader + if err := s.reset(ctx); err != nil { + s.Log.Error("error resetting reorg", "error", err) + } + continue work } - case <-ctx.Done(): - return ctx.Err() + s.headerCache[newHeader.Number.Uint64()] = newHeader + s.latestHead = newHeader + + if sync != nil && !t.Stop() { + // only if sync is still actively waiting, + // we need to drain the timer and reset it + select { + case <-t.C: + // TODO: the non-blocking select is here as a + // precaution against an already emptied channel. 
+				// This should not be necessary
+				default:
+				}
+			}
+			t.Reset(0)
+			s.Log.Info("start syncing from latest head stream")
+			sync = t.C
 		}
 	}
 }
diff --git a/rolling-shutter/medley/chainsync/syncer/unsafehead_test.go b/rolling-shutter/medley/chainsync/syncer/unsafehead_test.go
new file mode 100644
index 000000000..380a944f0
--- /dev/null
+++ b/rolling-shutter/medley/chainsync/syncer/unsafehead_test.go
@@ -0,0 +1,113 @@
+package syncer
+
+import (
+	"context"
+	"fmt"
+	"math/big"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/log"
+	"gotest.tools/v3/assert"
+
+	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client"
+	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event"
+	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number"
+	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
+)
+
+func MakeChain(start int64, startParent common.Hash, numHeader uint, seed int64) []*types.Header {
+	n := numHeader
+	parent := startParent
+	num := big.NewInt(start)
+	h := []*types.Header{}
+
+	// change the hashes for different seeds
+	mixinh := common.BigToHash(big.NewInt(seed))
+	for n > 0 {
+		head := &types.Header{
+			ParentHash: parent,
+			Number:     num,
+			MixDigest:  mixinh,
+		}
+		h = append(h, head)
+		num = new(big.Int).Add(num, big.NewInt(1))
+		parent = head.Hash()
+		n--
+	}
+	return h
+}
+
+func TestReorg(t *testing.T) {
+	headersBeforeReorg := MakeChain(1, common.BigToHash(big.NewInt(0)), 10, 42)
+	branchOff := headersBeforeReorg[5]
+	// block number 5 will be reorged
+	headersReorgBranch := MakeChain(branchOff.Number.Int64()+1, branchOff.Hash(), 10, 43)
+	clnt, ctl := client.NewTestClient()
+	ctl.AppendNextHeaders(headersBeforeReorg...)
+	ctl.AppendNextHeaders(headersReorgBranch...)
+
+	handlerBlock := make(chan *event.LatestBlock, 1)
+
+	h := &UnsafeHeadSyncer{
+		Client: clnt,
+		Log:    log.New(),
+		Handler: func(_ context.Context, ev *event.LatestBlock) error {
+			handlerBlock <- ev
+			return nil
+		},
+		SyncedHandler:      []ManualFilterHandler{},
+		SyncStartBlock:     number.NewBlockNumber(nil),
+		FetchActiveAtStart: false,
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
+	service.RunBackground(ctx, h)
+
+	// initial sync is independent of the subscription,
+	// this will get polled from the eth client
+	b := <-handlerBlock
+	assert.Assert(t, b.Number.Cmp(headersBeforeReorg[0].Number) == 0)
+	idx := 1
+	for {
+		ok := ctl.ProgressHead()
+		assert.Assert(t, ok)
+		err := ctl.EmitEvents(ctx)
+		assert.NilError(t, err)
+
+		b = <-handlerBlock
+		assert.Equal(t, b.Number.Uint64(), headersBeforeReorg[idx].Number.Uint64(), fmt.Sprintf("block number equal for idx %d", idx))
+		assert.Equal(t, b.BlockHash, headersBeforeReorg[idx].Hash())
+		idx++
+		if idx == len(headersBeforeReorg) {
+			break
+		}
+	}
+	ok := ctl.ProgressHead()
+	assert.Assert(t, ok)
+	err := ctl.EmitEvents(ctx)
+	assert.NilError(t, err)
+	b = <-handlerBlock
+	// now the reorg should have happened.
+	// the handler should have emitted an "artificial" latest head
+	// event for the block BEFORE the re-orged block
+	assert.Equal(t, b.Number.Uint64(), headersReorgBranch[0].Number.Uint64()-1, "block number equal for reorg")
+	assert.Equal(t, b.BlockHash, headersReorgBranch[0].ParentHash)
+	idx = 0
+	for ctl.ProgressHead() {
+		err := ctl.EmitEvents(ctx)
+		assert.NilError(t, err)
+
+		b := <-handlerBlock
+		assert.Equal(t, b.Number.Uint64(), headersReorgBranch[idx].Number.Uint64(), fmt.Sprintf("block number equal for idx %d", idx))
+		assert.Equal(t, b.BlockHash, headersReorgBranch[idx].Hash())
+		idx++
+		if idx == len(headersReorgBranch) {
+			break
+		}
+	}
+}
diff --git a/rolling-shutter/medley/chainsync/syncer/util.go b/rolling-shutter/medley/chainsync/syncer/util.go
index 0c44cf3dc..8a470f06a 100644
--- a/rolling-shutter/medley/chainsync/syncer/util.go
+++ b/rolling-shutter/medley/chainsync/syncer/util.go
@@ -18,6 +18,11 @@ var (
 	errLatestBlock = errors.New("'nil' latest block")
 )
 
+type ManualFilterHandler interface {
+	QueryAndHandle(ctx context.Context, block uint64) error
+	HandleVirtualEvent(ctx context.Context, block *number.BlockNumber) error
+}
+
 func logToCallOpts(ctx context.Context, log *types.Log) *bind.CallOpts {
 	block := new(big.Int)
 	block.SetUint64(log.BlockNumber)
@@ -40,7 +45,7 @@ func guardCallOpts(opts *bind.CallOpts, allowLatest bool) error {
 	return nil
 }
 
-func fixCallOpts(ctx context.Context, c client.Client, opts *bind.CallOpts) (*bind.CallOpts, *uint64, error) {
+func fixCallOpts(ctx context.Context, c client.SyncEthereumClient, opts *bind.CallOpts) (*bind.CallOpts, *uint64, error) {
 	err := guardCallOpts(opts, false)
 	if err == nil {
 		return opts, nil, nil
diff --git a/rolling-shutter/p2p/params.go b/rolling-shutter/p2p/params.go
index ad385e71a..901d53817 100644
--- a/rolling-shutter/p2p/params.go
+++ b/rolling-shutter/p2p/params.go
@@ -19,7 +19,9 @@ func makePubSubParams(
 	gossipSubParams := &gsDefault
 
 	// modified defaults from ethereum consensus spec
-	// https://github.com/ethereum/consensus-specs/blob/5d80b1954a4b7a121aa36143d50b366727b66cbc/specs/phase0/p2p-interface.md#why-are-these-specific-gossip-parameters-chosen //nolint:lll
+
+	//nolint:lll
+	// https://github.com/ethereum/consensus-specs/blob/5d80b1954a4b7a121aa36143d50b366727b66cbc/specs/phase0/p2p-interface.md#why-are-these-specific-gossip-parameters-chosen
 	gossipSubParams.HeartbeatInterval = 700 * time.Millisecond
 	gossipSubParams.HistoryLength = 6