From acb6f1b33a3babef75d8359b06f0d90b023d8c4f Mon Sep 17 00:00:00 2001 From: Maximilian Langenfeld <15726643+ezdac@users.noreply.github.com> Date: Fri, 10 May 2024 16:20:06 +0200 Subject: [PATCH 1/8] chore(op): add initial op-shutter keyper --- .../docs/rolling-shutter_optimismkeyper.md | 34 +++++++++++++++++++ ...-shutter_optimismkeyper_generate-config.md | 28 +++++++++++++++ .../rolling-shutter_optimismkeyper_initdb.md | 27 +++++++++++++++ 3 files changed, 89 insertions(+) create mode 100644 rolling-shutter/docs/rolling-shutter_optimismkeyper.md create mode 100644 rolling-shutter/docs/rolling-shutter_optimismkeyper_generate-config.md create mode 100644 rolling-shutter/docs/rolling-shutter_optimismkeyper_initdb.md diff --git a/rolling-shutter/docs/rolling-shutter_optimismkeyper.md b/rolling-shutter/docs/rolling-shutter_optimismkeyper.md new file mode 100644 index 000000000..bf0a546e0 --- /dev/null +++ b/rolling-shutter/docs/rolling-shutter_optimismkeyper.md @@ -0,0 +1,34 @@ +## rolling-shutter optimismkeyper + +Run a Shutter optimism keyper node + +### Synopsis + +This command runs a keyper node. It will connect to both an Optimism and a +Shuttermint node which have to be started separately in advance. + +``` +rolling-shutter optimismkeyper [flags] +``` + +### Options + +``` + --config string config file + -h, --help help for optimismkeyper +``` + +### Options inherited from parent commands + +``` + --logformat string set log format, possible values: min, short, long, max (default "long") + --loglevel string set log level, possible values: warn, info, debug (default "info") + --no-color do not write colored logs +``` + +### SEE ALSO + +* [rolling-shutter](rolling-shutter.md) - A collection of commands to run and interact with Rolling Shutter nodes +* [rolling-shutter optimismkeyper generate-config](rolling-shutter_optimismkeyper_generate-config.md) - Generate a 'optimismkeyper' configuration file +* [rolling-shutter optimismkeyper initdb](rolling-shutter_optimismkeyper_initdb.md) - Initialize the database of the 'optimismkeyper' + diff --git a/rolling-shutter/docs/rolling-shutter_optimismkeyper_generate-config.md b/rolling-shutter/docs/rolling-shutter_optimismkeyper_generate-config.md new file mode 100644 index 000000000..55d179a08 --- /dev/null +++ b/rolling-shutter/docs/rolling-shutter_optimismkeyper_generate-config.md @@ -0,0 +1,28 @@ +## rolling-shutter optimismkeyper generate-config + +Generate a 'optimismkeyper' configuration file + +``` +rolling-shutter optimismkeyper generate-config [flags] +``` + +### Options + +``` + -h, --help help for generate-config + --output string output file +``` + +### Options inherited from parent commands + +``` + --config string config file + --logformat string set log format, possible values: min, short, long, max (default "long") + --loglevel string set log level, possible values: warn, info, debug (default "info") + --no-color do not write colored logs +``` + +### SEE ALSO + +* [rolling-shutter optimismkeyper](rolling-shutter_optimismkeyper.md) - Run a Shutter optimism keyper node + diff --git a/rolling-shutter/docs/rolling-shutter_optimismkeyper_initdb.md b/rolling-shutter/docs/rolling-shutter_optimismkeyper_initdb.md new file mode 100644 index 000000000..cec7e7bc0 --- /dev/null +++ b/rolling-shutter/docs/rolling-shutter_optimismkeyper_initdb.md @@ -0,0 +1,27 @@ +## rolling-shutter optimismkeyper initdb + +Initialize the database of the 'optimismkeyper' + +``` +rolling-shutter optimismkeyper initdb [flags] +``` + +### Options + +``` + -h, --help 
help for initdb +``` + +### Options inherited from parent commands + +``` + --config string config file + --logformat string set log format, possible values: min, short, long, max (default "long") + --loglevel string set log level, possible values: warn, info, debug (default "info") + --no-color do not write colored logs +``` + +### SEE ALSO + +* [rolling-shutter optimismkeyper](rolling-shutter_optimismkeyper.md) - Run a Shutter optimism keyper node + From 2c8842e4ae13218e47c79bfa818b22b7cf8f1a55 Mon Sep 17 00:00:00 2001 From: Maximilian Langenfeld <15726643+ezdac@users.noreply.github.com> Date: Fri, 10 May 2024 16:20:31 +0200 Subject: [PATCH 2/8] feat(chainsync): sync contract event handler with latest head --- rolling-shutter/keyperimpl/optimism/keyper.go | 3 + rolling-shutter/medley/chainsync/client.go | 4 +- .../medley/chainsync/client/client.go | 2 +- rolling-shutter/medley/chainsync/options.go | 91 +++++++---- .../medley/chainsync/syncer/eonpubkey.go | 88 +++++++--- .../medley/chainsync/syncer/keyperset.go | 50 +++++- .../medley/chainsync/syncer/shutterstate.go | 151 +++++++++++++----- .../medley/chainsync/syncer/unsafehead.go | 30 +++- .../medley/chainsync/syncer/util.go | 6 +- 9 files changed, 317 insertions(+), 108 deletions(-) diff --git a/rolling-shutter/keyperimpl/optimism/keyper.go b/rolling-shutter/keyperimpl/optimism/keyper.go index 44941848a..aac660780 100644 --- a/rolling-shutter/keyperimpl/optimism/keyper.go +++ b/rolling-shutter/keyperimpl/optimism/keyper.go @@ -144,11 +144,14 @@ func (kpr *Keyper) newEonPublicKey(ctx context.Context, pubKey keyper.EonPublicK log.Info(). Uint64("eon", pubKey.Eon). Uint64("activation-block", pubKey.ActivationBlock). + Bytes("pub-key", pubKey.PublicKey). Msg("new eon pk") // Currently all keypers call this and race to call this function first. // For now this is fine, but a keyper should only send a transaction if // the key is not set yet. // Best would be a coordinatated leader election who will broadcast the key. + // FIXME: the syncer receives an empty key byte. + // Is this already tx, err := kpr.l2Client.BroadcastEonKey(ctx, pubKey.Eon, pubKey.PublicKey) if err != nil { log.Error().Err(err).Msg("error broadcasting eon public key") diff --git a/rolling-shutter/medley/chainsync/client.go b/rolling-shutter/medley/chainsync/client.go index 7d2bd57de..bb2d3f3b0 100644 --- a/rolling-shutter/medley/chainsync/client.go +++ b/rolling-shutter/medley/chainsync/client.go @@ -24,7 +24,7 @@ var noopLogger = &logger.NoopLogger{} var ErrServiceNotInstantiated = errors.New("service is not instantiated, pass a handler function option") type Client struct { - client.Client + client.EthereumClient log log.Logger options *options @@ -136,7 +136,7 @@ func (s *Client) BroadcastEonKey(ctx context.Context, eon uint64, eonPubKey []by // This value is cached, since it is not expected to change. 
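// (A network's chain ID is effectively immutable at runtime, so a single lookup per process suffices.)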
func (s *Client) ChainID(ctx context.Context) (*big.Int, error) { if s.chainID == nil { - cid, err := s.Client.ChainID(ctx) + cid, err := s.EthereumClient.ChainID(ctx) if err != nil { return nil, err } diff --git a/rolling-shutter/medley/chainsync/client/client.go b/rolling-shutter/medley/chainsync/client/client.go index 0444ab8c9..a69e2692a 100644 --- a/rolling-shutter/medley/chainsync/client/client.go +++ b/rolling-shutter/medley/chainsync/client/client.go @@ -9,7 +9,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -type Client interface { +type EthereumClient interface { Close() ChainID(ctx context.Context) (*big.Int, error) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) diff --git a/rolling-shutter/medley/chainsync/options.go b/rolling-shutter/medley/chainsync/options.go index 580876243..36d8e3c9a 100644 --- a/rolling-shutter/medley/chainsync/options.go +++ b/rolling-shutter/medley/chainsync/options.go @@ -24,7 +24,7 @@ type options struct { keyperSetManagerAddress *common.Address keyBroadcastContractAddress *common.Address clientURL string - client syncclient.Client + ethClient syncclient.EthereumClient logger log.Logger runner service.Runner syncStart *number.BlockNumber @@ -37,11 +37,11 @@ type options struct { } func (o *options) verify() error { - if o.clientURL != "" && o.client != nil { + if o.clientURL != "" && o.ethClient != nil { // TODO: error message return errors.New("can't use client and client url") } - if o.clientURL == "" && o.client == nil { + if o.clientURL == "" && o.ethClient == nil { // TODO: error message return errors.New("have to provide either url or client") } @@ -56,24 +56,30 @@ func (o *options) verify() error { // of shutter clients background workers. func (o *options) apply(ctx context.Context, c *Client) error { var ( - client syncclient.Client + client syncclient.EthereumClient err error ) if o.clientURL != "" { - o.client, err = ethclient.DialContext(ctx, o.clientURL) + o.ethClient, err = ethclient.DialContext(ctx, o.clientURL) if err != nil { return err } } - client = o.client - c.log = o.logger + client = o.ethClient - c.Client = client + c.EthereumClient = client + if o.logger != nil { + c.log = o.logger + // NOCHECKIN: + c.log.Info("got logger in options") + } + + syncedServices := []syncer.ManualFilterHandler{} // the nil passthrough will use "latest" for each call, // but we want to harmonize and fix the sync start to a specific block. 
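// Pinning one concrete number here keeps all syncers starting at the same height; otherwise each of them would resolve "latest" on its own and could begin at different blocks.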
if o.syncStart.IsLatest() { - latestBlock, err := c.Client.BlockNumber(ctx) + latestBlock, err := c.EthereumClient.BlockNumber(ctx) if err != nil { return errors.Wrap(err, "polling latest block") } @@ -85,14 +91,16 @@ func (o *options) apply(ctx context.Context, c *Client) error { return err } c.kssync = &syncer.KeyperSetSyncer{ - Client: client, - Contract: c.KeyperSetManager, - Log: c.log, - StartBlock: o.syncStart, - Handler: o.handlerKeyperSet, + Client: client, + Contract: c.KeyperSetManager, + Log: c.log, + StartBlock: o.syncStart, + Handler: o.handlerKeyperSet, + DisableEventWatcher: true, } if o.handlerKeyperSet != nil { c.services = append(c.services, c.kssync) + syncedServices = append(syncedServices, c.kssync) } c.KeyBroadcast, err = bindings.NewKeyBroadcastContract(*o.keyBroadcastContractAddress, client) @@ -100,35 +108,52 @@ func (o *options) apply(ctx context.Context, c *Client) error { return err } c.epksync = &syncer.EonPubKeySyncer{ - Client: client, - Log: c.log, - KeyBroadcast: c.KeyBroadcast, - KeyperSetManager: c.KeyperSetManager, - Handler: o.handlerEonPublicKey, - StartBlock: o.syncStart, + Client: client, + Log: c.log, + KeyBroadcast: c.KeyBroadcast, + KeyperSetManager: c.KeyperSetManager, + Handler: o.handlerEonPublicKey, + StartBlock: o.syncStart, + DisableEventWatcher: true, } if o.handlerEonPublicKey != nil { c.services = append(c.services, c.epksync) + syncedServices = append(syncedServices, c.epksync) } c.sssync = &syncer.ShutterStateSyncer{ - Client: client, - Contract: c.KeyperSetManager, - Log: c.log, - Handler: o.handlerShutterState, - StartBlock: o.syncStart, + Client: client, + Contract: c.KeyperSetManager, + Log: c.log, + Handler: o.handlerShutterState, + StartBlock: o.syncStart, + DisableEventWatcher: true, } if o.handlerShutterState != nil { c.services = append(c.services, c.sssync) + syncedServices = append(syncedServices, c.sssync) } - if o.handlerBlock != nil { - c.uhsync = &syncer.UnsafeHeadSyncer{ - Client: client, - Log: c.log, - Handler: o.handlerBlock, + if o.handlerBlock == nil { + // NOOP - but we need to run the UnsafeHeadSyncer. + // This is to keep the inner workings consisten, + // we use the DisableEventWatcher mechanism in combination + // with the UnsafeHeadSyncer instead of the streaming + // Watch... subscription on events. 
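+ // Per-block polling preserves the relative order of contract events and the subsequent new-block event, which independent Watch... subscriptions do not guarantee.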
+ // TODO: think about allowing the streaming events, + // when guaranteed event order (event1,event2,new-block-event) + // is not required + o.handlerBlock = func(ctx context.Context, lb *event.LatestBlock) error { + return nil } } + + c.uhsync = &syncer.UnsafeHeadSyncer{ + Client: client, + Log: c.log, + Handler: o.handlerBlock, + SyncedHandler: syncedServices, + } if o.handlerBlock != nil { c.services = append(c.services, c.uhsync) } @@ -141,7 +166,7 @@ func defaultOptions() *options { keyperSetManagerAddress: &predeploy.KeyperSetManagerAddr, keyBroadcastContractAddress: &predeploy.KeyBroadcastContractAddr, clientURL: "", - client: nil, + ethClient: nil, logger: noopLogger, runner: nil, syncStart: number.NewBlockNumber(nil), @@ -193,9 +218,9 @@ func WithLogger(l log.Logger) Option { } } -func WithClient(client syncclient.Client) Option { +func WithClient(client syncclient.EthereumClient) Option { return func(o *options) error { - o.client = client + o.ethClient = client return nil } } diff --git a/rolling-shutter/medley/chainsync/syncer/eonpubkey.go b/rolling-shutter/medley/chainsync/syncer/eonpubkey.go index 1c0494845..635a10028 100644 --- a/rolling-shutter/medley/chainsync/syncer/eonpubkey.go +++ b/rolling-shutter/medley/chainsync/syncer/eonpubkey.go @@ -2,11 +2,11 @@ package syncer import ( "context" - "errors" "fmt" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/log" + "github.com/pkg/errors" "github.com/shutter-network/shop-contracts/bindings" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client" @@ -15,18 +15,54 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) +var _ ManualFilterHandler = &EonPubKeySyncer{} + type EonPubKeySyncer struct { - Client client.Client - Log log.Logger - KeyBroadcast *bindings.KeyBroadcastContract - KeyperSetManager *bindings.KeyperSetManager - StartBlock *number.BlockNumber - Handler event.EonPublicKeyHandler + Client client.EthereumClient + Log log.Logger + KeyBroadcast *bindings.KeyBroadcastContract + KeyperSetManager *bindings.KeyperSetManager + StartBlock *number.BlockNumber + Handler event.EonPublicKeyHandler + DisableEventWatcher bool keyBroadcastCh chan *bindings.KeyBroadcastContractEonKeyBroadcast } +func (s *EonPubKeySyncer) QueryAndHandle(ctx context.Context, block uint64) error { + s.Log.Info( + "pubsyncer query and handle called", + "block", + block, + ) + opts := &bind.FilterOpts{ + Start: block, + End: &block, + Context: ctx, + } + iter, err := s.KeyBroadcast.FilterEonKeyBroadcast(opts) + if err != nil { + return err + } + defer iter.Close() + + for iter.Next() { + select { + case s.keyBroadcastCh <- iter.Event: + case <-ctx.Done(): + return ctx.Err() + } + } + if err := iter.Error(); err != nil { + return errors.Wrap(err, "filter iterator error") + } + return nil +} + func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) error { + s.Log.Info( + "pubsyncer loop started", + ) if s.Handler == nil { return errors.New("no handler registered") } @@ -59,12 +95,14 @@ func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) erro runner.Defer(func() { close(s.keyBroadcastCh) }) - subs, err := s.KeyBroadcast.WatchEonKeyBroadcast(watchOpts, s.keyBroadcastCh) - // FIXME: what to do on subs.Error() - if err != nil { - return err + if !s.DisableEventWatcher { + subs, err := s.KeyBroadcast.WatchEonKeyBroadcast(watchOpts, s.keyBroadcastCh) + // FIXME: what to do on subs.Error() + if err != nil { + return err 
+ } + runner.Defer(subs.Unsubscribe) } - runner.Defer(subs.Unsubscribe) runner.Go(func() error { return s.watchNewEonPubkey(ctx) }) @@ -88,17 +126,23 @@ func (s *EonPubKeySyncer) getInitialPubKeys(ctx context.Context) ([]*event.EonPu if err != nil { return nil, err } - initialPubKeys := []*event.EonPublicKey{} + // NOTE: These are pubkeys that at the state of s.StartBlock + // are known to the contracts. + // That way we recreate older broadcast publickey events. + // We are only interested for keys that belong to keyper-set + // that are currently active or will become active in + // the future: for i := activeEon; i < numKS; i++ { e, err := s.GetEonPubKeyForEon(ctx, opts, i) - // FIXME: translate the error that there is no key - // to a continue of the loop - // (key not in mapping error, how can we catch that?) if err != nil { return nil, err } - initialPubKeys = append(initialPubKeys, e) + // if e == nil, this means the keyperset did not broadcast a + // key (yet) + if e != nil { + initialPubKeys = append(initialPubKeys, e) + } } return initialPubKeys, nil } @@ -118,11 +162,14 @@ func (s *EonPubKeySyncer) GetEonPubKeyForEon(ctx context.Context, opts *bind.Cal return nil, err } key, err := s.KeyBroadcast.GetEonKey(opts, eon) - // XXX: can the key be a null byte? - // I think we rather get a index out of bounds error. if err != nil { return nil, err } + // NOTE: Solidity returns the null value whenever + // one tries to access a key in mapping that doesn't exist + if len(key) == 0 { + return nil, nil + } return &event.EonPublicKey{ Eon: eon, Key: key, @@ -137,10 +184,11 @@ func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context) error { if !ok { return nil } + pubk := newEonKey.Key bn := newEonKey.Raw.BlockNumber ev := &event.EonPublicKey{ Eon: newEonKey.Eon, - Key: newEonKey.Key, + Key: pubk, AtBlockNumber: number.NewBlockNumber(&bn), } err := s.Handler(ctx, ev) diff --git a/rolling-shutter/medley/chainsync/syncer/keyperset.go b/rolling-shutter/medley/chainsync/syncer/keyperset.go index 91ff18810..aef0db113 100644 --- a/rolling-shutter/medley/chainsync/syncer/keyperset.go +++ b/rolling-shutter/medley/chainsync/syncer/keyperset.go @@ -21,16 +21,52 @@ func makeCallError(attrName string, err error) error { const channelSize = 10 +var _ ManualFilterHandler = &KeyperSetSyncer{} + type KeyperSetSyncer struct { - Client client.Client + Client client.EthereumClient Contract *bindings.KeyperSetManager Log log.Logger StartBlock *number.BlockNumber Handler event.KeyperSetHandler + // disable this when the QueryAndHandle manual polling should be used: + DisableEventWatcher bool keyperAddedCh chan *bindings.KeyperSetManagerKeyperSetAdded } +func (s *KeyperSetSyncer) QueryAndHandle(ctx context.Context, block uint64) error { + opts := &bind.FilterOpts{ + // FIXME: does this work, or do we need index -1 or something? + Start: block, + End: &block, + Context: ctx, + } + iter, err := s.Contract.FilterKeyperSetAdded(opts) + // TODO: what errors possible? + if err != nil { + return err + } + defer iter.Close() + + for iter.Next() { + select { + // XXX: this can be nil during the handlers startup. + // As far as I understand, a nil channel is never selected. + // Will it be selected as soon as the channel is not nil anymore? 
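+ // (A send on a nil channel blocks forever, so a nil-channel case is simply never chosen; the channel variable is re-read each time the select statement executes.)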
+ case s.keyperAddedCh <- iter.Event: + case <-ctx.Done(): + return ctx.Err() + } + } + // XXX: it looks like this is nil when the iterator is + // exhausted without failures + if err := iter.Error(); err != nil { + return errors.Wrap(err, "filter iterator error") + } + return nil +} + func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) error { if s.Handler == nil { return errors.New("no handler registered") @@ -69,12 +105,14 @@ func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) erro runner.Defer(func() { close(s.keyperAddedCh) }) - subs, err := s.Contract.WatchKeyperSetAdded(watchOpts, s.keyperAddedCh) - // FIXME: what to do on subs.Error() - if err != nil { - return err + if !s.DisableEventWatcher { + subs, err := s.Contract.WatchKeyperSetAdded(watchOpts, s.keyperAddedCh) + // FIXME: what to do on subs.Error() + if err != nil { + return err + } + runner.Defer(subs.Unsubscribe) } - runner.Defer(subs.Unsubscribe) runner.Go(func() error { return s.watchNewKeypersService(ctx) }) diff --git a/rolling-shutter/medley/chainsync/syncer/shutterstate.go b/rolling-shutter/medley/chainsync/syncer/shutterstate.go index e262d679a..a510d4650 100644 --- a/rolling-shutter/medley/chainsync/syncer/shutterstate.go +++ b/rolling-shutter/medley/chainsync/syncer/shutterstate.go @@ -14,12 +14,15 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) +var _ ManualFilterHandler = &ShutterStateSyncer{} + type ShutterStateSyncer struct { - Client client.Client - Contract *bindings.KeyperSetManager - StartBlock *number.BlockNumber - Log log.Logger - Handler event.ShutterStateHandler + Client client.EthereumClient + Contract *bindings.KeyperSetManager + StartBlock *number.BlockNumber + Log log.Logger + Handler event.ShutterStateHandler + DisableEventWatcher bool pausedCh chan *bindings.KeyperSetManagerPaused unpausedCh chan *bindings.KeyperSetManagerUnpaused @@ -40,50 +43,96 @@ func (s *ShutterStateSyncer) GetShutterState(ctx context.Context, opts *bind.Cal }, nil } +func (s *ShutterStateSyncer) QueryAndHandle(ctx context.Context, block uint64) error { + opts := &bind.FilterOpts{ + Start: block, + End: &block, + Context: ctx, + } + iterPaused, err := s.Contract.FilterPaused(opts) + if err != nil { + return err + } + defer iterPaused.Close() + + for iterPaused.Next() { + select { + case s.pausedCh <- iterPaused.Event: + case <-ctx.Done(): + return ctx.Err() + } + } + if err := iterPaused.Error(); err != nil { + return errors.Wrap(err, "filter iterator error") + } + iterUnpaused, err := s.Contract.FilterUnpaused(opts) + if err != nil { + return err + } + defer iterUnpaused.Close() + + for iterUnpaused.Next() { + select { + case s.unpausedCh <- iterUnpaused.Event: + case <-ctx.Done(): + return ctx.Err() + } + } + if err := iterUnpaused.Error(); err != nil { + return errors.Wrap(err, "filter iterator error") + } + return nil +} + func (s *ShutterStateSyncer) Start(ctx context.Context, runner service.Runner) error { if s.Handler == nil { return errors.New("no handler registered") } - watchOpts := &bind.WatchOpts{ - Start: s.StartBlock.ToUInt64Ptr(), // nil means latest + // the latest block still has to be fixed. + // otherwise we could skip some block events + // between the initial poll and the subscription. 
+ if s.StartBlock.IsLatest() { + latest, err := s.Client.BlockNumber(ctx) + if err != nil { + return err + } + s.StartBlock.SetUint64(latest) + } + + opts := &bind.WatchOpts{ + Start: s.StartBlock.ToUInt64Ptr(), Context: ctx, } s.pausedCh = make(chan *bindings.KeyperSetManagerPaused) - subs, err := s.Contract.WatchPaused(watchOpts, s.pausedCh) - // FIXME: what to do on subs.Error() - if err != nil { - return err - } - runner.Defer(subs.Unsubscribe) runner.Defer(func() { close(s.pausedCh) }) - s.unpausedCh = make(chan *bindings.KeyperSetManagerUnpaused) - subs, err = s.Contract.WatchUnpaused(watchOpts, s.unpausedCh) - // FIXME: what to do on subs.Error() - if err != nil { - return err - } - runner.Defer(subs.Unsubscribe) runner.Defer(func() { close(s.unpausedCh) }) + if !s.DisableEventWatcher { + subs, err := s.Contract.WatchPaused(opts, s.pausedCh) + // FIXME: what to do on subs.Error() + if err != nil { + return err + } + runner.Defer(subs.Unsubscribe) + subs, err = s.Contract.WatchUnpaused(opts, s.unpausedCh) + // FIXME: what to do on subs.Error() + if err != nil { + return err + } + runner.Defer(subs.Unsubscribe) + } + runner.Go(func() error { return s.watchPaused(ctx) }) return nil } -func (s *ShutterStateSyncer) pollIsActive(ctx context.Context) (bool, error) { - callOpts := bind.CallOpts{ - Context: ctx, - } - paused, err := s.Contract.Paused(&callOpts) - return !paused, err -} - func (s *ShutterStateSyncer) handle(ctx context.Context, ev *event.ShutterState) { err := s.Handler(ctx, ev) if err != nil { @@ -96,40 +145,56 @@ func (s *ShutterStateSyncer) handle(ctx context.Context, ev *event.ShutterState) } func (s *ShutterStateSyncer) watchPaused(ctx context.Context) error { - isActive, err := s.pollIsActive(ctx) + // query the initial state + // and construct a "virtual" + // event + opts := &bind.CallOpts{ + BlockNumber: s.StartBlock.Int, + Context: nil, + } + + stateAtStartBlock, err := s.GetShutterState(ctx, opts) if err != nil { // XXX: this will fail everything, do we want that? 
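// (Returning here aborts the whole service group; a call-site retry would keep the node running.)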
return err } - ev := &event.ShutterState{ - Active: isActive, - } - s.handle(ctx, ev) + s.handle(ctx, stateAtStartBlock) + lastState := stateAtStartBlock for { select { - case _, ok := <-s.unpausedCh: + case unpaused, ok := <-s.unpausedCh: if !ok { return nil } - if isActive { - s.Log.Error("state mismatch", "got", "actice", "have", "inactive") + if lastState.Active { + s.Log.Warn( + "state/event mismatch, but continue handler", + "new-event", "Unpaused", + "last-state", "active", + ) } + block := unpaused.Raw.BlockNumber ev := &event.ShutterState{ - Active: true, + Active: true, + AtBlockNumber: number.NewBlockNumber(&block), } - isActive = ev.Active s.handle(ctx, ev) - case _, ok := <-s.pausedCh: + case paused, ok := <-s.pausedCh: if !ok { return nil } - if isActive { - s.Log.Error("state mismatch", "got", "inactive", "have", "active") + if !lastState.Active { + s.Log.Warn( + "state/event mismatch, but continue handler", + "new-event", "Paused", + "last-state", "inactive", + ) } + block := paused.Raw.BlockNumber ev := &event.ShutterState{ - Active: false, + Active: false, + AtBlockNumber: number.NewBlockNumber(&block), } - isActive = ev.Active s.handle(ctx, ev) case <-ctx.Done(): return ctx.Err() diff --git a/rolling-shutter/medley/chainsync/syncer/unsafehead.go b/rolling-shutter/medley/chainsync/syncer/unsafehead.go index 86ddbd1b3..667d0d3ff 100644 --- a/rolling-shutter/medley/chainsync/syncer/unsafehead.go +++ b/rolling-shutter/medley/chainsync/syncer/unsafehead.go @@ -14,14 +14,19 @@ import ( ) type UnsafeHeadSyncer struct { - Client client.Client + Client client.EthereumClient Log log.Logger Handler event.BlockHandler + // Handler to be manually triggered + // to handle their handler function + // before the own Handler is called: + SyncedHandler []ManualFilterHandler newLatestHeadCh chan *types.Header } func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) error { + s.Log.Info("unsafe head syncer started") if s.Handler == nil { return errors.New("no handler registered") } @@ -42,6 +47,10 @@ func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) err return nil } +func parseLogs() error { + return nil +} + func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context) error { for { select { @@ -49,8 +58,25 @@ func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context) error { if !ok { return nil } + // TODO: check bloom filter for topic of all + // synced handlers and only call them if + // the bloomfilter retrieves something. + + blockNum := number.BigToBlockNumber(newHeader.Number) + for _, h := range s.SyncedHandler { + // NOTE: this has to be blocking! + // So whenever this returns, it is expected + // that the handlers Handle function + // has been called and it returned. + err := h.QueryAndHandle(ctx, blockNum.Uint64()) + if err != nil { + // XXX: return or log? 
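+ // Logging keeps the head subscription alive; a failed filter call then only skips this block's contract events instead of stopping the whole syncer.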
+ // return err + s.Log.Error("synced handler call errored, skipping", "error", err) + } + } ev := &event.LatestBlock{ - Number: number.BigToBlockNumber(newHeader.Number), + Number: blockNum, BlockHash: newHeader.Hash(), Header: newHeader, } diff --git a/rolling-shutter/medley/chainsync/syncer/util.go b/rolling-shutter/medley/chainsync/syncer/util.go index 0c44cf3dc..916e93141 100644 --- a/rolling-shutter/medley/chainsync/syncer/util.go +++ b/rolling-shutter/medley/chainsync/syncer/util.go @@ -18,6 +18,10 @@ var ( errLatestBlock = errors.New("'nil' latest block") ) +type ManualFilterHandler interface { + QueryAndHandle(ctx context.Context, block uint64) error +} + func logToCallOpts(ctx context.Context, log *types.Log) *bind.CallOpts { block := new(big.Int) block.SetUint64(log.BlockNumber) @@ -40,7 +44,7 @@ func guardCallOpts(opts *bind.CallOpts, allowLatest bool) error { return nil } -func fixCallOpts(ctx context.Context, c client.Client, opts *bind.CallOpts) (*bind.CallOpts, *uint64, error) { +func fixCallOpts(ctx context.Context, c client.EthereumClient, opts *bind.CallOpts) (*bind.CallOpts, *uint64, error) { err := guardCallOpts(opts, false) if err == nil { return opts, nil, nil From d44c6bdd5da737b3dd12830d038b7747226f6bae Mon Sep 17 00:00:00 2001 From: Maximilian Langenfeld <15726643+ezdac@users.noreply.github.com> Date: Fri, 10 May 2024 16:20:51 +0200 Subject: [PATCH 3/8] fix(op): off-by-one error in decryption-trigger on new block --- rolling-shutter/keyperimpl/optimism/keyper.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rolling-shutter/keyperimpl/optimism/keyper.go b/rolling-shutter/keyperimpl/optimism/keyper.go index aac660780..922b1e784 100644 --- a/rolling-shutter/keyperimpl/optimism/keyper.go +++ b/rolling-shutter/keyperimpl/optimism/keyper.go @@ -99,9 +99,10 @@ func (kpr *Keyper) newBlock(_ context.Context, ev *syncevent.LatestBlock) error // TODO: sanity checks - idPreimage := identitypreimage.BigToIdentityPreimage(ev.Number.Int) + latestBlockNumber := ev.Number.Uint64() + idPreimage := identitypreimage.Uint64ToIdentityPreimage(latestBlockNumber + 1) trig := &epochkghandler.DecryptionTrigger{ - BlockNumber: ev.Number.Uint64(), + BlockNumber: latestBlockNumber + 1, IdentityPreimages: []identitypreimage.IdentityPreimage{idPreimage}, } From 74d780f8f0fd7971db1c116932d908e3eeba3393 Mon Sep 17 00:00:00 2001 From: Maximilian Langenfeld <15726643+ezdac@users.noreply.github.com> Date: Fri, 10 May 2024 16:20:54 +0200 Subject: [PATCH 4/8] feat(chainsync): allow not fetching past active events --- rolling-shutter/medley/chainsync/options.go | 54 ++++++++++++------- .../medley/chainsync/syncer/eonpubkey.go | 30 ++++++----- .../medley/chainsync/syncer/keyperset.go | 28 +++++----- .../medley/chainsync/syncer/shutterstate.go | 40 +++++--------- 4 files changed, 81 insertions(+), 71 deletions(-) diff --git a/rolling-shutter/medley/chainsync/options.go b/rolling-shutter/medley/chainsync/options.go index 36d8e3c9a..77e05bc55 100644 --- a/rolling-shutter/medley/chainsync/options.go +++ b/rolling-shutter/medley/chainsync/options.go @@ -28,6 +28,7 @@ type options struct { logger log.Logger runner service.Runner syncStart *number.BlockNumber + fetchActivesAtSyncStart bool privKey *ecdsa.PrivateKey handlerShutterState event.ShutterStateHandler @@ -91,12 +92,13 @@ func (o *options) apply(ctx context.Context, c *Client) error { return err } c.kssync = &syncer.KeyperSetSyncer{ - Client: client, - Contract: c.KeyperSetManager, - Log: c.log, - StartBlock: o.syncStart, -
Handler: o.handlerKeyperSet, - DisableEventWatcher: true, + Client: client, + Contract: c.KeyperSetManager, + Log: c.log, + StartBlock: o.syncStart, + Handler: o.handlerKeyperSet, + FetchActiveAtStartBlock: o.fetchActivesAtSyncStart, + DisableEventWatcher: true, } if o.handlerKeyperSet != nil { c.services = append(c.services, c.kssync) @@ -108,13 +110,14 @@ func (o *options) apply(ctx context.Context, c *Client) error { return err } c.epksync = &syncer.EonPubKeySyncer{ - Client: client, - Log: c.log, - KeyBroadcast: c.KeyBroadcast, - KeyperSetManager: c.KeyperSetManager, - Handler: o.handlerEonPublicKey, - StartBlock: o.syncStart, - DisableEventWatcher: true, + Client: client, + Log: c.log, + KeyBroadcast: c.KeyBroadcast, + KeyperSetManager: c.KeyperSetManager, + Handler: o.handlerEonPublicKey, + StartBlock: o.syncStart, + FetchActiveAtStartBlock: o.fetchActivesAtSyncStart, + DisableEventWatcher: true, } if o.handlerEonPublicKey != nil { c.services = append(c.services, c.epksync) @@ -122,12 +125,13 @@ func (o *options) apply(ctx context.Context, c *Client) error { } c.sssync = &syncer.ShutterStateSyncer{ - Client: client, - Contract: c.KeyperSetManager, - Log: c.log, - Handler: o.handlerShutterState, - StartBlock: o.syncStart, - DisableEventWatcher: true, + Client: client, + Contract: c.KeyperSetManager, + Log: c.log, + Handler: o.handlerShutterState, + StartBlock: o.syncStart, + FetchActiveAtStartBlock: o.fetchActivesAtSyncStart, + DisableEventWatcher: true, } if o.handlerShutterState != nil { c.services = append(c.services, c.sssync) @@ -169,11 +173,21 @@ func defaultOptions() *options { ethClient: nil, logger: noopLogger, runner: nil, + fetchActivesAtSyncStart: true, syncStart: number.NewBlockNumber(nil), } } -func WithSyncStartBlock(blockNumber *number.BlockNumber) Option { +func WithNoFetchActivesBeforeStart() Option { + return func(o *options) error { + o.fetchActivesAtSyncStart = false + return nil + } +} + +func WithSyncStartBlock( + blockNumber *number.BlockNumber, +) Option { if blockNumber == nil { blockNumber = number.NewBlockNumber(nil) } diff --git a/rolling-shutter/medley/chainsync/syncer/eonpubkey.go b/rolling-shutter/medley/chainsync/syncer/eonpubkey.go index 635a10028..f973abe47 100644 --- a/rolling-shutter/medley/chainsync/syncer/eonpubkey.go +++ b/rolling-shutter/medley/chainsync/syncer/eonpubkey.go @@ -18,13 +18,14 @@ import ( var _ ManualFilterHandler = &EonPubKeySyncer{} type EonPubKeySyncer struct { - Client client.EthereumClient - Log log.Logger - KeyBroadcast *bindings.KeyBroadcastContract - KeyperSetManager *bindings.KeyperSetManager - StartBlock *number.BlockNumber - Handler event.EonPublicKeyHandler - DisableEventWatcher bool + Client client.EthereumClient + Log log.Logger + KeyBroadcast *bindings.KeyBroadcastContract + KeyperSetManager *bindings.KeyperSetManager + StartBlock *number.BlockNumber + Handler event.EonPublicKeyHandler + FetchActiveAtStartBlock bool + DisableEventWatcher bool keyBroadcastCh chan *bindings.KeyBroadcastContractEonKeyBroadcast } @@ -76,15 +77,18 @@ func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) erro } s.StartBlock.SetUint64(latest) } - pubKs, err := s.getInitialPubKeys(ctx) - if err != nil { - return err - } - for _, k := range pubKs { - err := s.Handler(ctx, k) + + if s.FetchActiveAtStartBlock { + pubKs, err := s.getInitialPubKeys(ctx) if err != nil { return err } + for _, k := range pubKs { + err := s.Handler(ctx, k) + if err != nil { + return err + } + } } watchOpts := &bind.WatchOpts{ diff --git 
a/rolling-shutter/medley/chainsync/syncer/keyperset.go b/rolling-shutter/medley/chainsync/syncer/keyperset.go index aef0db113..d07c1bd0a 100644 --- a/rolling-shutter/medley/chainsync/syncer/keyperset.go +++ b/rolling-shutter/medley/chainsync/syncer/keyperset.go @@ -30,7 +30,8 @@ type KeyperSetSyncer struct { StartBlock *number.BlockNumber Handler event.KeyperSetHandler // disable this when the QueryAndHandle manual polling should be used: - DisableEventWatcher bool + FetchActiveAtStartBlock bool + DisableEventWatcher bool keyperAddedCh chan *bindings.KeyperSetManagerKeyperSetAdded } @@ -87,18 +88,21 @@ func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) erro Start: s.StartBlock.ToUInt64Ptr(), Context: ctx, } - initial, err := s.getInitialKeyperSets(ctx) - if err != nil { - return err - } - for _, ks := range initial { - err = s.Handler(ctx, ks) + + if s.FetchActiveAtStartBlock { + initial, err := s.getInitialKeyperSets(ctx) if err != nil { - s.Log.Error( - "handler for `NewKeyperSet` errored for initial sync", - "error", - err.Error(), - ) + return err + } + for _, ks := range initial { + err = s.Handler(ctx, ks) + if err != nil { + s.Log.Error( + "handler for `NewKeyperSet` errored for initial sync", + "error", + err.Error(), + ) + } } } s.keyperAddedCh = make(chan *bindings.KeyperSetManagerKeyperSetAdded, channelSize) diff --git a/rolling-shutter/medley/chainsync/syncer/shutterstate.go b/rolling-shutter/medley/chainsync/syncer/shutterstate.go index a510d4650..058772857 100644 --- a/rolling-shutter/medley/chainsync/syncer/shutterstate.go +++ b/rolling-shutter/medley/chainsync/syncer/shutterstate.go @@ -17,12 +17,13 @@ import ( var _ ManualFilterHandler = &ShutterStateSyncer{} type ShutterStateSyncer struct { - Client client.EthereumClient - Contract *bindings.KeyperSetManager - StartBlock *number.BlockNumber - Log log.Logger - Handler event.ShutterStateHandler - DisableEventWatcher bool + Client client.EthereumClient + Contract *bindings.KeyperSetManager + StartBlock *number.BlockNumber + Log log.Logger + Handler event.ShutterStateHandler + FetchActiveAtStartBlock bool + DisableEventWatcher bool pausedCh chan *bindings.KeyperSetManagerPaused unpausedCh chan *bindings.KeyperSetManagerUnpaused @@ -153,26 +154,20 @@ func (s *ShutterStateSyncer) watchPaused(ctx context.Context) error { Context: nil, } - stateAtStartBlock, err := s.GetShutterState(ctx, opts) - if err != nil { - // XXX: this will fail everything, do we want that? - return err + if s.FetchActiveAtStartBlock { + stateAtStartBlock, err := s.GetShutterState(ctx, opts) + if err != nil { + // XXX: this will fail everything, do we want that? 
+ return err + } + s.handle(ctx, stateAtStartBlock) } - s.handle(ctx, stateAtStartBlock) - lastState := stateAtStartBlock for { select { case unpaused, ok := <-s.unpausedCh: if !ok { return nil } - if lastState.Active { - s.Log.Warn( - "state/event mismatch, but continue handler", - "new-event", "Unpaused", - "last-state", "active", - ) - } block := unpaused.Raw.BlockNumber ev := &event.ShutterState{ Active: true, @@ -183,13 +178,6 @@ func (s *ShutterStateSyncer) watchPaused(ctx context.Context) error { if !ok { return nil } - if !lastState.Active { - s.Log.Warn( - "state/event mismatch, but continue handler", - "new-event", "Paused", - "last-state", "inactive", - ) - } block := paused.Raw.BlockNumber ev := &event.ShutterState{ Active: false, From 65dcfe764bbfed06199cbd806ebed53346702725 Mon Sep 17 00:00:00 2001 From: Maximilian Langenfeld <15726643+ezdac@users.noreply.github.com> Date: Fri, 10 May 2024 16:20:57 +0200 Subject: [PATCH 5/8] feat: consider reorgs in chainsync --- .../medley/chainsync/syncer/unsafehead.go | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/rolling-shutter/medley/chainsync/syncer/unsafehead.go b/rolling-shutter/medley/chainsync/syncer/unsafehead.go index 667d0d3ff..60a0b783b 100644 --- a/rolling-shutter/medley/chainsync/syncer/unsafehead.go +++ b/rolling-shutter/medley/chainsync/syncer/unsafehead.go @@ -3,6 +3,7 @@ package syncer import ( "context" "errors" + "math/big" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" @@ -52,17 +53,45 @@ func parseLogs() error { } func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context) error { + var currentBlock *big.Int for { select { case newHeader, ok := <-s.newLatestHeadCh: if !ok { return nil } + blockNum := number.BigToBlockNumber(newHeader.Number) + if currentBlock != nil { + switch newHeader.Number.Cmp(currentBlock) { + case -1, 0: + prevNum := new(big.Int).Sub(newHeader.Number, big.NewInt(1)) + prevBlockNum := number.BigToBlockNumber(prevNum) + // Re-emit the previous block, to pre-emptively signal an + // incoming reorg. Like this a client is able to e.g. + // rewind changes first before processing the new + // events of the reorg + ev := &event.LatestBlock{ + Number: prevBlockNum, + BlockHash: newHeader.ParentHash, + } + err := s.Handler(ctx, ev) + if err != nil { + // XXX: return or log? + // return err + s.Log.Error( + "handler for `NewLatestBlock` errored", + "error", + err.Error(), + ) + } + case 1: + // expected + } + } + // TODO: check bloom filter for topic of all // synced handlers and only call them if // the bloomfilter retrieves something. - - blockNum := number.BigToBlockNumber(newHeader.Number) for _, h := range s.SyncedHandler { // NOTE: this has to be blocking! // So whenever this returns, it is expected @@ -88,6 +117,7 @@ func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context) error { err.Error(), ) } + currentBlock = newHeader.Number case <-ctx.Done(): return ctx.Err() } From cdb0a9e1a733f51b15eb8193b4758c2e988edd7c Mon Sep 17 00:00:00 2001 From: Maximilian Langenfeld <15726643+ezdac@users.noreply.github.com> Date: Fri, 10 May 2024 16:21:00 +0200 Subject: [PATCH 6/8] fix(chainsync): sync all blocks from start to latest head Before, the chainsync did not fetch all subscribed events from the provided 'WithSyncStartBlock' to the current latest head. This is now fixed with a mechanism where the chainsync is progressing a sync-head in tandem with the incoming updates for the latest head. 
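Schematically, the new loop behaves roughly like this (a sketch of the mechanism, not the literal implementation; `heads`, `syncedHandlers` and `emitBlockEvent` stand in for the subscription channel, the registered ManualFilterHandler services and the call to the block handler):

    for newHead := range heads {
        for block := syncHead + 1; block <= newHead; block++ {
            // poll and handle the contract events of this block first ...
            for _, h := range syncedHandlers {
                h.QueryAndHandle(ctx, block)
            }
            // ... then emit the block event itself
            emitBlockEvent(block)
        }
        syncHead = newHead
    }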
The syncer will catch up with polling for all older blocks until the current latest head is reached. --- rolling-shutter/medley/chainsync/options.go | 145 ++++----- .../medley/chainsync/syncer/eonpubkey.go | 123 ++----- .../medley/chainsync/syncer/keyperset.go | 148 +++------ .../medley/chainsync/syncer/shutterstate.go | 132 ++------ .../medley/chainsync/syncer/unsafehead.go | 303 ++++++++++++++---- .../medley/chainsync/syncer/util.go | 1 + 6 files changed, 409 insertions(+), 443 deletions(-) diff --git a/rolling-shutter/medley/chainsync/options.go b/rolling-shutter/medley/chainsync/options.go index 77e05bc55..076e1666c 100644 --- a/rolling-shutter/medley/chainsync/options.go +++ b/rolling-shutter/medley/chainsync/options.go @@ -39,132 +39,119 @@ type options struct { func (o *options) verify() error { if o.clientURL != "" && o.ethClient != nil { - // TODO: error message - return errors.New("can't use client and client url") + return errors.New("'WithClient' and 'WithClientURL' options are mutually exclusive") } if o.clientURL == "" && o.ethClient == nil { - // TODO: error message - return errors.New("have to provide either url or client") + return errors.New("either 'WithClient' or 'WithClientURL' options are expected") } // TODO: check for the existence of the contract addresses depending on // what handlers are not nil return nil } -// initialize the shutter client and apply the options. -// the context is only the initialisation context, -// and should not be considered to handle the lifecycle -// of shutter clients background workers. -func (o *options) apply(ctx context.Context, c *Client) error { - var ( - client syncclient.EthereumClient - err error - ) - if o.clientURL != "" { - o.ethClient, err = ethclient.DialContext(ctx, o.clientURL) - if err != nil { - return err - } - } - client = o.ethClient - - c.EthereumClient = client - - if o.logger != nil { - c.log = o.logger - // NOCHECKIN: - c.log.Info("got logger in options") - } - +func (o *options) applyHandler(c *Client) error { + var err error syncedServices := []syncer.ManualFilterHandler{} - // the nil passthrough will use "latest" for each call, - // but we want to harmonize and fix the sync start to a specific block. 
- if o.syncStart.IsLatest() { - latestBlock, err := c.EthereumClient.BlockNumber(ctx) - if err != nil { - return errors.Wrap(err, "polling latest block") - } - o.syncStart = number.NewBlockNumber(&latestBlock) - } - c.KeyperSetManager, err = bindings.NewKeyperSetManager(*o.keyperSetManagerAddress, client) + c.KeyperSetManager, err = bindings.NewKeyperSetManager(*o.keyperSetManagerAddress, o.ethClient) if err != nil { return err } c.kssync = &syncer.KeyperSetSyncer{ - Client: client, - Contract: c.KeyperSetManager, - Log: c.log, - StartBlock: o.syncStart, - Handler: o.handlerKeyperSet, - FetchActiveAtStartBlock: o.fetchActivesAtSyncStart, - DisableEventWatcher: true, + Client: o.ethClient, + Contract: c.KeyperSetManager, + Log: c.log, + Handler: o.handlerKeyperSet, } if o.handlerKeyperSet != nil { - c.services = append(c.services, c.kssync) syncedServices = append(syncedServices, c.kssync) } - c.KeyBroadcast, err = bindings.NewKeyBroadcastContract(*o.keyBroadcastContractAddress, client) + c.KeyBroadcast, err = bindings.NewKeyBroadcastContract(*o.keyBroadcastContractAddress, o.ethClient) if err != nil { return err } c.epksync = &syncer.EonPubKeySyncer{ - Client: client, - Log: c.log, - KeyBroadcast: c.KeyBroadcast, - KeyperSetManager: c.KeyperSetManager, - Handler: o.handlerEonPublicKey, - StartBlock: o.syncStart, - FetchActiveAtStartBlock: o.fetchActivesAtSyncStart, - DisableEventWatcher: true, + Client: o.ethClient, + Log: c.log, + KeyBroadcast: c.KeyBroadcast, + KeyperSetManager: c.KeyperSetManager, + Handler: o.handlerEonPublicKey, } if o.handlerEonPublicKey != nil { - c.services = append(c.services, c.epksync) syncedServices = append(syncedServices, c.epksync) } - c.sssync = &syncer.ShutterStateSyncer{ - Client: client, - Contract: c.KeyperSetManager, - Log: c.log, - Handler: o.handlerShutterState, - StartBlock: o.syncStart, - FetchActiveAtStartBlock: o.fetchActivesAtSyncStart, - DisableEventWatcher: true, + Client: o.ethClient, + Contract: c.KeyperSetManager, + Log: c.log, + Handler: o.handlerShutterState, } if o.handlerShutterState != nil { - c.services = append(c.services, c.sssync) syncedServices = append(syncedServices, c.sssync) } if o.handlerBlock == nil { - // NOOP - but we need to run the UnsafeHeadSyncer. - // This is to keep the inner workings consisten, - // we use the DisableEventWatcher mechanism in combination - // with the UnsafeHeadSyncer instead of the streaming - // Watch... subscription on events. - // TODO: think about allowing the streaming events, - // when guaranteed event order (event1,event2,new-block-event) - // is not required + // Even if the user is not interested in handling new block events, + // the streaming block handler must be running in order to + // synchronize polling of new contract events. + // Since the handler function is always called, we need to + // inject a noop-handler o.handlerBlock = func(ctx context.Context, lb *event.LatestBlock) error { return nil } } c.uhsync = &syncer.UnsafeHeadSyncer{ - Client: client, - Log: c.log, - Handler: o.handlerBlock, - SyncedHandler: syncedServices, + Client: o.ethClient, + Log: c.log, + Handler: o.handlerBlock, + SyncedHandler: syncedServices, + FetchActiveAtStart: o.fetchActivesAtSyncStart, + SyncStartBlock: o.syncStart, } if o.handlerBlock != nil { c.services = append(c.services, c.uhsync) } - c.privKey = o.privKey return nil } +// initialize the shutter client and apply the options. 
+// the context is only the initialisation context, +// and should not be considered to handle the lifecycle +// of shutter clients background workers. +func (o *options) apply(ctx context.Context, c *Client) error { + var ( + client syncclient.EthereumClient + err error + ) + if o.clientURL != "" { + o.ethClient, err = ethclient.DialContext(ctx, o.clientURL) + if err != nil { + return err + } + } + client = o.ethClient + c.EthereumClient = client + + // the nil passthrough will use "latest" for each call, + // but we want to harmonize and fix the sync start to a specific block. + if o.syncStart.IsLatest() { + latestBlock, err := c.EthereumClient.BlockNumber(ctx) + if err != nil { + return errors.Wrap(err, "polling latest block") + } + o.syncStart = number.NewBlockNumber(&latestBlock) + } + + if o.logger != nil { + c.log = o.logger + } + + c.privKey = o.privKey + return o.applyHandler(c) +} + func defaultOptions() *options { return &options{ keyperSetManagerAddress: &predeploy.KeyperSetManagerAddr, diff --git a/rolling-shutter/medley/chainsync/syncer/eonpubkey.go b/rolling-shutter/medley/chainsync/syncer/eonpubkey.go index f973abe47..7aa6b8171 100644 --- a/rolling-shutter/medley/chainsync/syncer/eonpubkey.go +++ b/rolling-shutter/medley/chainsync/syncer/eonpubkey.go @@ -12,22 +12,16 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) var _ ManualFilterHandler = &EonPubKeySyncer{} type EonPubKeySyncer struct { - Client client.EthereumClient - Log log.Logger - KeyBroadcast *bindings.KeyBroadcastContract - KeyperSetManager *bindings.KeyperSetManager - StartBlock *number.BlockNumber - Handler event.EonPublicKeyHandler - FetchActiveAtStartBlock bool - DisableEventWatcher bool - - keyBroadcastCh chan *bindings.KeyBroadcastContractEonKeyBroadcast + Client client.EthereumClient + Log log.Logger + KeyBroadcast *bindings.KeyBroadcastContract + KeyperSetManager *bindings.KeyperSetManager + Handler event.EonPublicKeyHandler } func (s *EonPubKeySyncer) QueryAndHandle(ctx context.Context, block uint64) error { @@ -48,10 +42,13 @@ func (s *EonPubKeySyncer) QueryAndHandle(ctx context.Context, block uint64) erro defer iter.Close() for iter.Next() { - select { - case s.keyBroadcastCh <- iter.Event: - case <-ctx.Done(): - return ctx.Err() + err := s.handle(ctx, iter.Event) + if err != nil { + s.Log.Error( + "handler for `NewKeyperSet` errored", + "error", + err.Error(), + ) } } if err := iter.Error(); err != nil { @@ -60,65 +57,41 @@ func (s *EonPubKeySyncer) QueryAndHandle(ctx context.Context, block uint64) erro return nil } -func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) error { - s.Log.Info( - "pubsyncer loop started", - ) - if s.Handler == nil { - return errors.New("no handler registered") - } - // the latest block still has to be fixed. - // otherwise we could skip some block events - // between the initial poll and the subscription. 
- if s.StartBlock.IsLatest() { - latest, err := s.Client.BlockNumber(ctx) - if err != nil { - return err - } - s.StartBlock.SetUint64(latest) - } - - if s.FetchActiveAtStartBlock { - pubKs, err := s.getInitialPubKeys(ctx) - if err != nil { - return err - } - for _, k := range pubKs { - err := s.Handler(ctx, k) - if err != nil { - return err - } - } +func (s *EonPubKeySyncer) handle(ctx context.Context, newEonKey *bindings.KeyBroadcastContractEonKeyBroadcast) error { + pubk := newEonKey.Key + bn := newEonKey.Raw.BlockNumber + ev := &event.EonPublicKey{ + Eon: newEonKey.Eon, + Key: pubk, + AtBlockNumber: number.NewBlockNumber(&bn), + } + err := s.Handler(ctx, ev) + if err != nil { + return err } + return nil +} - watchOpts := &bind.WatchOpts{ - Start: s.StartBlock.ToUInt64Ptr(), - Context: ctx, +func (s *EonPubKeySyncer) HandleVirtualEvent(ctx context.Context, block *number.BlockNumber) error { + pubKs, err := s.getInitialPubKeys(ctx, block) + if err != nil { + return err } - s.keyBroadcastCh = make(chan *bindings.KeyBroadcastContractEonKeyBroadcast, channelSize) - runner.Defer(func() { - close(s.keyBroadcastCh) - }) - if !s.DisableEventWatcher { - subs, err := s.KeyBroadcast.WatchEonKeyBroadcast(watchOpts, s.keyBroadcastCh) - // FIXME: what to do on subs.Error() + for _, k := range pubKs { + err := s.Handler(ctx, k) if err != nil { return err } - runner.Defer(subs.Unsubscribe) } - runner.Go(func() error { - return s.watchNewEonPubkey(ctx) - }) return nil } -func (s *EonPubKeySyncer) getInitialPubKeys(ctx context.Context) ([]*event.EonPublicKey, error) { +func (s *EonPubKeySyncer) getInitialPubKeys(ctx context.Context, block *number.BlockNumber) ([]*event.EonPublicKey, error) { // This blocknumber specifies AT what state // the contract is called opts := &bind.CallOpts{ Context: ctx, - BlockNumber: s.StartBlock.Int, + BlockNumber: block.Int, } numKS, err := s.KeyperSetManager.GetNumKeyperSets(opts) if err != nil { @@ -126,7 +99,7 @@ func (s *EonPubKeySyncer) getInitialPubKeys(ctx context.Context) ([]*event.EonPu } // this blocknumber specifies the argument to the contract // getter - activeEon, err := s.KeyperSetManager.GetKeyperSetIndexByBlock(opts, s.StartBlock.Uint64()) + activeEon, err := s.KeyperSetManager.GetKeyperSetIndexByBlock(opts, block.Uint64()) if err != nil { return nil, err } @@ -180,31 +153,3 @@ func (s *EonPubKeySyncer) GetEonPubKeyForEon(ctx context.Context, opts *bind.Cal AtBlockNumber: number.BigToBlockNumber(opts.BlockNumber), }, nil } - -func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context) error { - for { - select { - case newEonKey, ok := <-s.keyBroadcastCh: - if !ok { - return nil - } - pubk := newEonKey.Key - bn := newEonKey.Raw.BlockNumber - ev := &event.EonPublicKey{ - Eon: newEonKey.Eon, - Key: pubk, - AtBlockNumber: number.NewBlockNumber(&bn), - } - err := s.Handler(ctx, ev) - if err != nil { - s.Log.Error( - "handler for `NewKeyperSet` errored", - "error", - err.Error(), - ) - } - case <-ctx.Done(): - return ctx.Err() - } - } -} diff --git a/rolling-shutter/medley/chainsync/syncer/keyperset.go b/rolling-shutter/medley/chainsync/syncer/keyperset.go index d07c1bd0a..d0c50af19 100644 --- a/rolling-shutter/medley/chainsync/syncer/keyperset.go +++ b/rolling-shutter/medley/chainsync/syncer/keyperset.go @@ -12,7 +12,6 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event" 
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) func makeCallError(attrName string, err error) error { @@ -24,114 +23,67 @@ const channelSize = 10 var _ ManualFilterHandler = &KeyperSetSyncer{} type KeyperSetSyncer struct { - Client client.EthereumClient - Contract *bindings.KeyperSetManager - Log log.Logger - StartBlock *number.BlockNumber - Handler event.KeyperSetHandler - // disable this when the QueryAndHandle manual polling should be used: - FetchActiveAtStartBlock bool - DisableEventWatcher bool - - keyperAddedCh chan *bindings.KeyperSetManagerKeyperSetAdded + Client client.EthereumClient + Contract *bindings.KeyperSetManager + Log log.Logger + Handler event.KeyperSetHandler } func (s *KeyperSetSyncer) QueryAndHandle(ctx context.Context, block uint64) error { opts := &bind.FilterOpts{ - // FIXME: does this work, or do we need index -1 or something? Start: block, End: &block, Context: ctx, } iter, err := s.Contract.FilterKeyperSetAdded(opts) - // TODO: what errors possible? if err != nil { return err } defer iter.Close() for iter.Next() { - select { - // XXX: this can be nil during the handlers startup. - // As far as I understand, a nil channel is never selected. - // Will it be selected as soon as the channel is not nil anymore? - case s.keyperAddedCh <- iter.Event: - case <-ctx.Done(): - return ctx.Err() + err := s.handle(ctx, iter.Event) + if err != nil { + s.Log.Error( + "handler for `NewKeyperSet` errored", + "error", + err.Error(), + ) } } - // XXX: it looks like this is nil when the iterator is - // exhausted without failures if err := iter.Error(); err != nil { return errors.Wrap(err, "filter iterator error") } return nil } -func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) error { - if s.Handler == nil { - return errors.New("no handler registered") - } - - // the latest block still has to be fixed. - // otherwise we could skip some block events - // between the initial poll and the subscription. 
- if s.StartBlock.IsLatest() { - latest, err := s.Client.BlockNumber(ctx) - if err != nil { - return err - } - s.StartBlock.SetUint64(latest) - } - - watchOpts := &bind.WatchOpts{ - Start: s.StartBlock.ToUInt64Ptr(), - Context: ctx, - } - - if s.FetchActiveAtStartBlock { - initial, err := s.getInitialKeyperSets(ctx) - if err != nil { - return err - } - for _, ks := range initial { - err = s.Handler(ctx, ks) - if err != nil { - s.Log.Error( - "handler for `NewKeyperSet` errored for initial sync", - "error", - err.Error(), - ) - } - } +func (s *KeyperSetSyncer) HandleVirtualEvent(ctx context.Context, block *number.BlockNumber) error { + initial, err := s.getInitialKeyperSets(ctx, block) + if err != nil { + return err } - s.keyperAddedCh = make(chan *bindings.KeyperSetManagerKeyperSetAdded, channelSize) - runner.Defer(func() { - close(s.keyperAddedCh) - }) - if !s.DisableEventWatcher { - subs, err := s.Contract.WatchKeyperSetAdded(watchOpts, s.keyperAddedCh) - // FIXME: what to do on subs.Error() + for _, ks := range initial { + err = s.Handler(ctx, ks) if err != nil { - return err + s.Log.Error( + "handler for `NewKeyperSet` errored for initial sync", + "error", + err.Error(), + ) } - runner.Defer(subs.Unsubscribe) } - runner.Go(func() error { - return s.watchNewKeypersService(ctx) - }) return nil } -func (s *KeyperSetSyncer) getInitialKeyperSets(ctx context.Context) ([]*event.KeyperSet, error) { +func (s *KeyperSetSyncer) getInitialKeyperSets(ctx context.Context, block *number.BlockNumber) ([]*event.KeyperSet, error) { opts := &bind.CallOpts{ Context: ctx, - BlockNumber: s.StartBlock.Int, + BlockNumber: block.Int, } if err := guardCallOpts(opts, false); err != nil { return nil, err } - bn := s.StartBlock.ToUInt64Ptr() + bn := block.ToUInt64Ptr() if bn == nil { // this should not be the case return nil, errors.New("start block is 'latest'") @@ -140,7 +92,7 @@ func (s *KeyperSetSyncer) getInitialKeyperSets(ctx context.Context) ([]*event.Ke initialKeyperSets := []*event.KeyperSet{} // this blocknumber specifies the argument to the contract // getter - ks, err := s.GetKeyperSetForBlock(ctx, opts, s.StartBlock) + ks, err := s.GetKeyperSetForBlock(ctx, opts, block) if err != nil { return nil, err } @@ -250,38 +202,20 @@ func (s *KeyperSetSyncer) newEvent( }, nil } -func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context) error { - for { - select { - case newKeypers, ok := <-s.keyperAddedCh: - if !ok { - return nil - } - opts := logToCallOpts(ctx, &newKeypers.Raw) - newKeyperSet, err := s.newEvent( - ctx, - opts, - newKeypers.KeyperSetContract, - newKeypers.ActivationBlock, - ) - if err != nil { - s.Log.Error( - "error while fetching new event", - "error", - err.Error(), - ) - continue - } - err = s.Handler(ctx, newKeyperSet) - if err != nil { - s.Log.Error( - "handler for `NewKeyperSet` errored", - "error", - err.Error(), - ) - } - case <-ctx.Done(): - return ctx.Err() - } +func (s *KeyperSetSyncer) handle(ctx context.Context, ev *bindings.KeyperSetManagerKeyperSetAdded) error { + opts := logToCallOpts(ctx, &ev.Raw) + newKeyperSet, err := s.newEvent( + ctx, + opts, + ev.KeyperSetContract, + ev.ActivationBlock, + ) + if err != nil { + return errors.Wrap(err, "fetch new event") } + err = s.Handler(ctx, newKeyperSet) + if err != nil { + return errors.Wrap(err, "call handler") + } + return nil } diff --git a/rolling-shutter/medley/chainsync/syncer/shutterstate.go b/rolling-shutter/medley/chainsync/syncer/shutterstate.go index 058772857..c42eedd89 100644 --- 
a/rolling-shutter/medley/chainsync/syncer/shutterstate.go +++ b/rolling-shutter/medley/chainsync/syncer/shutterstate.go @@ -11,22 +11,15 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) var _ ManualFilterHandler = &ShutterStateSyncer{} type ShutterStateSyncer struct { - Client client.EthereumClient - Contract *bindings.KeyperSetManager - StartBlock *number.BlockNumber - Log log.Logger - Handler event.ShutterStateHandler - FetchActiveAtStartBlock bool - DisableEventWatcher bool - - pausedCh chan *bindings.KeyperSetManagerPaused - unpausedCh chan *bindings.KeyperSetManagerUnpaused + Client client.EthereumClient + Contract *bindings.KeyperSetManager + Log log.Logger + Handler event.ShutterStateHandler } func (s *ShutterStateSyncer) GetShutterState(ctx context.Context, opts *bind.CallOpts) (*event.ShutterState, error) { @@ -57,15 +50,17 @@ func (s *ShutterStateSyncer) QueryAndHandle(ctx context.Context, block uint64) e defer iterPaused.Close() for iterPaused.Next() { - select { - case s.pausedCh <- iterPaused.Event: - case <-ctx.Done(): - return ctx.Err() + block := iterPaused.Event.Raw.BlockNumber + ev := &event.ShutterState{ + Active: false, + AtBlockNumber: number.NewBlockNumber(&block), } + s.handle(ctx, ev) } if err := iterPaused.Error(); err != nil { return errors.Wrap(err, "filter iterator error") } + iterUnpaused, err := s.Contract.FilterUnpaused(opts) if err != nil { return err @@ -73,11 +68,12 @@ func (s *ShutterStateSyncer) QueryAndHandle(ctx context.Context, block uint64) e defer iterUnpaused.Close() for iterUnpaused.Next() { - select { - case s.unpausedCh <- iterUnpaused.Event: - case <-ctx.Done(): - return ctx.Err() + block := iterUnpaused.Event.Raw.BlockNumber + ev := &event.ShutterState{ + Active: true, + AtBlockNumber: number.NewBlockNumber(&block), } + s.handle(ctx, ev) } if err := iterUnpaused.Error(); err != nil { return errors.Wrap(err, "filter iterator error") @@ -85,52 +81,17 @@ func (s *ShutterStateSyncer) QueryAndHandle(ctx context.Context, block uint64) e return nil } -func (s *ShutterStateSyncer) Start(ctx context.Context, runner service.Runner) error { - if s.Handler == nil { - return errors.New("no handler registered") - } - // the latest block still has to be fixed. - // otherwise we could skip some block events - // between the initial poll and the subscription. 
-	if s.StartBlock.IsLatest() {
-		latest, err := s.Client.BlockNumber(ctx)
-		if err != nil {
-			return err
-		}
-		s.StartBlock.SetUint64(latest)
-	}
-
-	opts := &bind.WatchOpts{
-		Start:   s.StartBlock.ToUInt64Ptr(),
-		Context: ctx,
+func (s *ShutterStateSyncer) HandleVirtualEvent(ctx context.Context, block *number.BlockNumber) error {
+	// query the initial state and re-construct a "virtual" event from the contract state
+	opts := &bind.CallOpts{
+		BlockNumber: block.Int,
+		Context:     ctx,
 	}
-	s.pausedCh = make(chan *bindings.KeyperSetManagerPaused)
-	runner.Defer(func() {
-		close(s.pausedCh)
-	})
-	s.unpausedCh = make(chan *bindings.KeyperSetManagerUnpaused)
-	runner.Defer(func() {
-		close(s.unpausedCh)
-	})
-
-	if !s.DisableEventWatcher {
-		subs, err := s.Contract.WatchPaused(opts, s.pausedCh)
-		// FIXME: what to do on subs.Error()
-		if err != nil {
-			return err
-		}
-		runner.Defer(subs.Unsubscribe)
-		subs, err = s.Contract.WatchUnpaused(opts, s.unpausedCh)
-		// FIXME: what to do on subs.Error()
-		if err != nil {
-			return err
-		}
-		runner.Defer(subs.Unsubscribe)
+	stateAtBlock, err := s.GetShutterState(ctx, opts)
+	if err != nil {
+		return err
 	}
-
-	runner.Go(func() error {
-		return s.watchPaused(ctx)
-	})
+	s.handle(ctx, stateAtBlock)
 	return nil
 }
 
@@ -144,48 +105,3 @@ func (s *ShutterStateSyncer) handle(ctx context.Context, ev *event.ShutterState)
 		)
 	}
 }
-
-func (s *ShutterStateSyncer) watchPaused(ctx context.Context) error {
-	// query the initial state
-	// and construct a "virtual"
-	// event
-	opts := &bind.CallOpts{
-		BlockNumber: s.StartBlock.Int,
-		Context:     nil,
-	}
-
-	if s.FetchActiveAtStartBlock {
-		stateAtStartBlock, err := s.GetShutterState(ctx, opts)
-		if err != nil {
-			// XXX: this will fail everything, do we want that?
-			return err
-		}
-		s.handle(ctx, stateAtStartBlock)
-	}
-	for {
-		select {
-		case unpaused, ok := <-s.unpausedCh:
-			if !ok {
-				return nil
-			}
-			block := unpaused.Raw.BlockNumber
-			ev := &event.ShutterState{
-				Active:        true,
-				AtBlockNumber: number.NewBlockNumber(&block),
-			}
-			s.handle(ctx, ev)
-		case paused, ok := <-s.pausedCh:
-			if !ok {
-				return nil
-			}
-			block := paused.Raw.BlockNumber
-			ev := &event.ShutterState{
-				Active:        false,
-				AtBlockNumber: number.NewBlockNumber(&block),
-			}
-			s.handle(ctx, ev)
-		case <-ctx.Done():
-			return ctx.Err()
-		}
-	}
-}
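The `ShutterStateSyncer` now exposes the same two entry points as the other contract syncers: `QueryAndHandle` for per-block log filtering and `HandleVirtualEvent` for reconstructing the state at the sync start block. A sketch of a consuming handler, assuming `event.ShutterStateHandler` is a function type with the same shape as the `event.BlockHandler` used in the test further below (the `active` pointer is a stand-in for real client state):

```go
package syncer

import (
	"context"

	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event"
)

// makeStateHandler is illustrative only: the syncer emits one ShutterState
// event per Paused (Active=false) or Unpaused (Active=true) log, plus one
// "virtual" event at the sync start, so a consumer can simply mirror the flag.
func makeStateHandler(active *bool) event.ShutterStateHandler {
	return func(_ context.Context, ev *event.ShutterState) error {
		*active = ev.Active
		return nil
	}
}
```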
diff --git a/rolling-shutter/medley/chainsync/syncer/unsafehead.go b/rolling-shutter/medley/chainsync/syncer/unsafehead.go
index 60a0b783b..ccff7aca1 100644
--- a/rolling-shutter/medley/chainsync/syncer/unsafehead.go
+++ b/rolling-shutter/medley/chainsync/syncer/unsafehead.go
@@ -2,15 +2,17 @@ package syncer
 
 import (
 	"context"
-	"errors"
 	"math/big"
+	"time"
 
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/pkg/errors"
 
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client"
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event"
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number"
+	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/retry"
 	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
 )
 
@@ -21,9 +23,16 @@ type UnsafeHeadSyncer struct {
 	// Handlers that are triggered manually, so that
 	// their handler functions run before the
 	// syncer's own Handler is called:
-	SyncedHandler []ManualFilterHandler
+	SyncedHandler      []ManualFilterHandler
+	SyncStartBlock     *number.BlockNumber
+	FetchActiveAtStart bool
 
 	newLatestHeadCh chan *types.Header
+
+	syncHead      *types.Header
+	nextSyncBlock *big.Int
+	latestHead    *types.Header
+	headerCache   map[uint64]*types.Header
 }
 
 func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) error {
@@ -31,10 +40,35 @@ func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) err
 	if s.Handler == nil {
 		return errors.New("no handler registered")
 	}
+
+	s.headerCache = map[uint64]*types.Header{}
 	s.newLatestHeadCh = make(chan *types.Header, 1)
+	_, err := retry.FunctionCall(
+		ctx,
+		func(ctx context.Context) (bool, error) {
+			err := s.fetchInitialHeaders(ctx)
+			return err == nil, err
+		},
+	)
+	if err != nil {
+		return errors.Wrap(err, "fetch initial latest header and sync start header")
+	}
+	s.SyncStartBlock = &number.BlockNumber{Int: s.syncHead.Number}
+	if s.FetchActiveAtStart {
+		for _, h := range s.SyncedHandler {
+			err := h.HandleVirtualEvent(ctx, s.SyncStartBlock)
+			if err != nil {
+				s.Log.Error("synced handler call errored, skipping", "error", err)
+			}
+		}
+	}
+	err = s.handle(ctx, s.syncHead, false)
+	if err != nil {
+		return errors.Wrap(err, "handle initial sync block")
+	}
+	s.nextSyncBlock = new(big.Int).Add(s.syncHead.Number, big.NewInt(1))
 	subs, err := s.Client.SubscribeNewHead(ctx, s.newLatestHeadCh)
-	// FIXME: what to do on subs.Error()
 	if err != nil {
 		return err
 	}
@@ -48,78 +82,227 @@ func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) err
 	return nil
 }
 
+func (s *UnsafeHeadSyncer) fetchInitialHeaders(ctx context.Context) error {
+	latest, err := s.Client.HeaderByNumber(ctx, nil)
+	if err != nil {
+		return errors.Wrap(err, "fetch latest header")
+	}
+	s.latestHead = latest
+	if s.SyncStartBlock.IsLatest() {
+		s.syncHead = latest
+		return nil
+	}
+
+	start, err := s.Client.HeaderByNumber(ctx, s.SyncStartBlock.Int)
+	if err != nil {
+		return errors.Wrap(err, "fetch sync start header")
+	}
+	s.syncHead = start
+	return nil
+}
+
 func parseLogs() error {
 	return nil
 }
 
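The `handle` method introduced below centralizes per-head processing and, on a reorg, first re-emits the parent of the reorged-to head so that consumers can rewind before the new canonical events arrive. A consumer-side sketch of that contract (`rewindTo` and `process` are hypothetical client callbacks, not part of this patch):

```go
package syncer

import (
	"context"

	"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event"
)

// makeBlockHandler sketches a reorg-aware consumer: a non-increasing block
// number signals the reorg pre-announcement emitted by handle(), so the
// client rewinds its state past that point before processing continues.
func makeBlockHandler(rewindTo, process func(blockNum uint64) error) event.BlockHandler {
	var lastSeen uint64
	return func(_ context.Context, ev *event.LatestBlock) error {
		num := ev.Number.Uint64()
		if lastSeen != 0 && num <= lastSeen {
			// reorg announced: drop everything above `num` first
			if err := rewindTo(num); err != nil {
				return err
			}
		}
		lastSeen = num
		return process(num)
	}
}
```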
-func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context) error {
-	var currentBlock *big.Int
+func (s *UnsafeHeadSyncer) handle(ctx context.Context, newHeader *types.Header, reorg bool) error {
+	blockNum := number.BigToBlockNumber(newHeader.Number)
+	if reorg {
+		prevNum := new(big.Int).Sub(newHeader.Number, big.NewInt(1))
+		prevBlockNum := number.BigToBlockNumber(prevNum)
+		// Re-emit the previous block to pre-emptively signal an
+		// incoming reorg. This way a client can, for example,
+		// rewind its own changes before processing the events
+		// of the reorged chain.
+		ev := &event.LatestBlock{
+			Number:    prevBlockNum,
+			BlockHash: newHeader.ParentHash,
+			Header:    newHeader,
+		}
+		err := s.Handler(ctx, ev)
+		if err != nil {
+			// XXX: return or log?
+			// return err
+			s.Log.Error(
+				"handler for `NewLatestBlock` errored",
+				"error",
+				err.Error(),
+			)
+		}
+	}
+
+	// TODO: check the bloom filter for the topics of all
+	// synced handlers and only call a handler when the
+	// bloom filter indicates a match.
+	for _, h := range s.SyncedHandler {
+		// NOTE: this has to be blocking!
+		// So whenever this returns, it is expected
+		// that the handler's Handle function
+		// has been called and has returned.
+		err := h.QueryAndHandle(ctx, blockNum.Uint64())
+		if err != nil {
+			s.Log.Error("synced handler call errored, skipping", "error", err)
+		}
+	}
+	ev := &event.LatestBlock{
+		Number:    blockNum,
+		BlockHash: newHeader.Hash(),
+	}
+	err := s.Handler(ctx, ev)
+	if err != nil {
+		s.Log.Error(
+			"handler for `NewLatestBlock` errored",
+			"error",
+			err.Error(),
+		)
+	}
+	return nil
+}
+
+func (s *UnsafeHeadSyncer) fetchHeader(ctx context.Context, num *big.Int) (*types.Header, error) {
+	h, ok := s.headerCache[num.Uint64()]
+	if ok {
+		return h, nil
+	}
+	return s.Client.HeaderByNumber(ctx, num)
+}
+
+func (s *UnsafeHeadSyncer) reset(ctx context.Context) error {
+	// the latest head was reset - check whether that
+	// concerns the current sync status
+	switch s.latestHead.Number.Cmp(s.syncHead.Number) {
+	case 1:
+		// we haven't caught up to the reorg position yet, so it's safe to ignore
+		// TODO: delete the forward caches
+		return nil
+	case 0:
+		// we already processed the head we re-orged to
+		if s.latestHead.Hash().Cmp(s.syncHead.Hash()) == 0 {
+			return nil
+		}
+	}
+	// definite reorg
+	if err := s.handle(ctx, s.latestHead, true); err != nil {
+		return err
+	}
+	s.syncHead = s.latestHead
+	s.nextSyncBlock = new(big.Int).Add(s.latestHead.Number, big.NewInt(1))
+	return nil
+}
+
+func (s *UnsafeHeadSyncer) sync(ctx context.Context) (syncing bool, delay time.Duration, err error) {
+	var newHead *types.Header
+	s.Log.Info("syncing chain")
+
+	delta := new(big.Int).Sub(s.latestHead.Number, s.nextSyncBlock)
+	switch delta.Cmp(big.NewInt(0)) {
+	case 1:
+		// positive delta, we are still catching up
+		newHead, err = s.fetchHeader(ctx, s.nextSyncBlock)
+		syncing = true
+		delay = 1 * time.Second
+	case 0:
+		// the next sync block is the latest head,
+		// so use the latest head directly
+		newHead = s.latestHead
+		syncing = false
+	case -1:
+		if delta.Cmp(big.NewInt(-1)) != 0 {
+			// the next sync block is more than one block above the latest head.
+			// this could mean a reorg, but reorgs are handled where new
+			// latest-head information arrives, so we should never get here.
+			return false, delay, errors.New("unexpected reorg condition in sync")
+		}
+		// the next sync block is exactly one block above the latest head:
+		// sync was called, but we are still waiting for the next latest head.
+		return false, delay, err
+	}
+	if err != nil {
+		return true, delay, err
+	}
+
+	if handleErr := s.handle(ctx, newHead, false); handleErr != nil {
+		return true, delay, handleErr
+	}
+	s.syncHead = newHead
+	delete(s.headerCache, newHead.Number.Uint64())
+	s.nextSyncBlock = new(big.Int).Add(newHead.Number, big.NewInt(1))
+
+	s.Log.Info("chain sync",
+		"synced-head-num", s.syncHead.Number.Uint64(),
+		"synced-head-hash", s.syncHead.Hash(),
+		"latest-head-num", s.latestHead.Number.Uint64(),
+		"latest-head-hash", s.latestHead.Hash(),
+	)
+	return syncing, delay, err
+}
+
+func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context) error { //nolint: gocyclo
+	t := time.NewTimer(0)
+	sync := t.C
+work:
 	for {
 		select {
+		case <-ctx.Done():
+			if sync != nil && !t.Stop() {
+				select {
+				case <-t.C:
+					// TODO: the non-blocking select is here as a
+					// precaution against an already emptied channel.
+ // This should not be necessary + default: + } + } + return ctx.Err() + case <-sync: + syncing, delay, err := s.sync(ctx) + if err != nil { + s.Log.Error("error during unsafe head sync", "error", err) + } + if !syncing { + s.Log.Info("stop syncing from sync") + sync = nil + continue work + } + t.Reset(delay) case newHeader, ok := <-s.newLatestHeadCh: if !ok { + s.Log.Info("latest head stream closed, exiting handler loop") return nil } - blockNum := number.BigToBlockNumber(newHeader.Number) - if currentBlock != nil { - switch newHeader.Number.Cmp(currentBlock) { - case -1, 0: - prevNum := new(big.Int).Sub(newHeader.Number, big.NewInt(1)) - prevBlockNum := number.BigToBlockNumber(prevNum) - // Re-emit the previous block, to pre-emptively signal an - // incoming reorg. Like this a client is able to e.g. - // rewind changes first before processing the new - // events of the reorg - ev := &event.LatestBlock{ - Number: prevBlockNum, - BlockHash: newHeader.ParentHash, - } - err := s.Handler(ctx, ev) - if err != nil { - // XXX: return or log? - // return err - s.Log.Error( - "handler for `NewLatestBlock` errored", - "error", - err.Error(), - ) - } - case 1: - // expected + s.Log.Info("new latest head from l2 ws-stream", "block-number", newHeader.Number.Uint64()) + if newHeader.Number.Cmp(s.latestHead.Number) <= 0 { + // reorg + s.Log.Info("new latest head is re-orging", + "old-block-number", s.latestHead.Number.Uint64(), + "new-block-number", newHeader.Number.Uint64(), + ) + + s.latestHead = newHeader + if err := s.reset(ctx); err != nil { + s.Log.Error("error resetting reorg", "error", err) } + continue work } + s.headerCache[newHeader.Number.Uint64()] = newHeader + s.latestHead = newHeader - // TODO: check bloom filter for topic of all - // synced handlers and only call them if - // the bloomfilter retrieves something. - for _, h := range s.SyncedHandler { - // NOTE: this has to be blocking! - // So whenever this returns, it is expected - // that the handlers Handle function - // has been called and it returned. - err := h.QueryAndHandle(ctx, blockNum.Uint64()) - if err != nil { - // XXX: return or log? - // return err - s.Log.Error("synced handler call errored, skipping", "error", err) + if sync != nil && !t.Stop() { + // only if sync is still actively waiting, + // we need to drain the timer and reset it + select { + case <-t.C: + // TODO: the non-blocking select is here as a + // precaution against an already emptied channel. 
+ // This should not be necessary + default: } } - ev := &event.LatestBlock{ - Number: blockNum, - BlockHash: newHeader.Hash(), - Header: newHeader, - } - err := s.Handler(ctx, ev) - if err != nil { - s.Log.Error( - "handler for `NewLatestBlock` errored", - "error", - err.Error(), - ) - } - currentBlock = newHeader.Number - case <-ctx.Done(): - return ctx.Err() + t.Reset(0) + s.Log.Info("start syncing from latest head stream") + sync = t.C } } } diff --git a/rolling-shutter/medley/chainsync/syncer/util.go b/rolling-shutter/medley/chainsync/syncer/util.go index 916e93141..80da11aa2 100644 --- a/rolling-shutter/medley/chainsync/syncer/util.go +++ b/rolling-shutter/medley/chainsync/syncer/util.go @@ -20,6 +20,7 @@ var ( type ManualFilterHandler interface { QueryAndHandle(ctx context.Context, block uint64) error + HandleVirtualEvent(ctx context.Context, block *number.BlockNumber) error } func logToCallOpts(ctx context.Context, log *types.Log) *bind.CallOpts { From b1a10bd2fd4823a0da30637d1f98409ce5aa4c78 Mon Sep 17 00:00:00 2001 From: Maximilian Langenfeld <15726643+ezdac@users.noreply.github.com> Date: Fri, 10 May 2024 16:21:04 +0200 Subject: [PATCH 7/8] chore(chainsync): add unit test for reorgs --- rolling-shutter/medley/chainsync/client.go | 4 +- .../medley/chainsync/client/client.go | 15 +- .../medley/chainsync/client/test.go | 170 ++++++++++++++++++ rolling-shutter/medley/chainsync/options.go | 10 +- .../medley/chainsync/syncer/eonpubkey.go | 2 +- .../medley/chainsync/syncer/keyperset.go | 2 +- .../medley/chainsync/syncer/shutterstate.go | 2 +- .../medley/chainsync/syncer/unsafehead.go | 2 +- .../chainsync/syncer/unsafehead_test.go | 113 ++++++++++++ .../medley/chainsync/syncer/util.go | 2 +- 10 files changed, 309 insertions(+), 13 deletions(-) create mode 100644 rolling-shutter/medley/chainsync/client/test.go create mode 100644 rolling-shutter/medley/chainsync/syncer/unsafehead_test.go diff --git a/rolling-shutter/medley/chainsync/client.go b/rolling-shutter/medley/chainsync/client.go index bb2d3f3b0..6bd674905 100644 --- a/rolling-shutter/medley/chainsync/client.go +++ b/rolling-shutter/medley/chainsync/client.go @@ -24,7 +24,7 @@ var noopLogger = &logger.NoopLogger{} var ErrServiceNotInstantiated = errors.New("service is not instantiated, pass a handler function option") type Client struct { - client.EthereumClient + client.SyncEthereumClient log log.Logger options *options @@ -136,7 +136,7 @@ func (s *Client) BroadcastEonKey(ctx context.Context, eon uint64, eonPubKey []by // This value is cached, since it is not expected to change. 
func (s *Client) ChainID(ctx context.Context) (*big.Int, error) {
 	if s.chainID == nil {
-		cid, err := s.EthereumClient.ChainID(ctx)
+		cid, err := s.SyncEthereumClient.ChainID(ctx)
 		if err != nil {
 			return nil, err
 		}
diff --git a/rolling-shutter/medley/chainsync/client/client.go b/rolling-shutter/medley/chainsync/client/client.go
index a69e2692a..4924bc8c9 100644
--- a/rolling-shutter/medley/chainsync/client/client.go
+++ b/rolling-shutter/medley/chainsync/client/client.go
@@ -9,7 +9,7 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 )
 
-type EthereumClient interface {
+type FullEthereumClient interface {
 	Close()
 	ChainID(ctx context.Context) (*big.Int, error)
 	BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
@@ -45,3 +45,16 @@ type EthereumClient interface {
 	EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error)
 	SendTransaction(ctx context.Context, tx *types.Transaction) error
 }
+
+type SyncEthereumClient interface {
+	Close()
+	ChainID(ctx context.Context) (*big.Int, error)
+	BlockNumber(ctx context.Context) (uint64, error)
+	HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
+	HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
+	SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
+	FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
+	SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error)
+	CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error)
+	TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
+}
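Splitting the interface lets the chain syncer depend only on the read-and-subscribe subset, while tx-sending components keep the full surface. As a sanity check, both should be satisfied by go-ethereum's `ethclient.Client`; this is an assumption about the production client dialed via `clientURL`, and a sketch only:

```go
package client

import "github.com/ethereum/go-ethereum/ethclient"

// Compile-time assertions (sketch): the concrete geth client is assumed to
// implement both the full interface and the narrowed sync subset, just as
// the TestClient below asserts that it implements SyncEthereumClient.
var (
	_ FullEthereumClient = (*ethclient.Client)(nil)
	_ SyncEthereumClient = (*ethclient.Client)(nil)
)
```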
diff --git a/rolling-shutter/medley/chainsync/client/test.go b/rolling-shutter/medley/chainsync/client/test.go
new file mode 100644
index 000000000..d436ac884
--- /dev/null
+++ b/rolling-shutter/medley/chainsync/client/test.go
@@ -0,0 +1,170 @@
+package client
+
+import (
+	"context"
+	"errors"
+	"math/big"
+
+	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+)
+
+var ErrNotImplemented = errors.New("not implemented")
+
+var _ SyncEthereumClient = &TestClient{}
+
+type TestClient struct {
+	headerChain            []*types.Header
+	latestHeadIndex        int
+	initialProgress        bool
+	latestHeadEmitter      []chan<- *types.Header
+	latestHeadSubscription []*Subscription
+}
+
+func NewSubscription(idx int) *Subscription {
+	return &Subscription{
+		idx: idx,
+		err: make(chan error, 1),
+	}
+}
+
+type Subscription struct {
+	idx int
+	err chan error
+}
+
+func (su *Subscription) Unsubscribe() {
+	// TODO: not implemented yet, but we don't want to panic
+}
+
+func (su *Subscription) Err() <-chan error {
+	return su.err
+}
+
+type TestClientController struct {
+	c *TestClient
+}
+
+func NewTestClient() (*TestClient, *TestClientController) {
+	c := &TestClient{
+		headerChain:     []*types.Header{},
+		latestHeadIndex: 0,
+	}
+	ctrl := &TestClientController{c}
+	return c, ctrl
+}
+
+func (c *TestClientController) ProgressHead() bool {
+	if c.c.latestHeadIndex >= len(c.c.headerChain)-1 {
+		return false
+	}
+	c.c.latestHeadIndex++
+	return true
+}
+
+func (c *TestClientController) EmitEvents(ctx context.Context) error {
+	if len(c.c.latestHeadEmitter) == 0 {
+		return nil
+	}
+	h := c.c.getLatestHeader()
+	for _, em := range c.c.latestHeadEmitter {
+		select {
+		case em <- h:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	return nil
+}
+
+func (c *TestClientController) AppendNextHeaders(h ...*types.Header) {
+	c.c.headerChain = append(c.c.headerChain, h...)
+}
+
+func (t *TestClient) ChainID(_ context.Context) (*big.Int, error) {
+	return big.NewInt(42), nil
+}
+
+func (t *TestClient) Close() {
+	// TODO: cleanup
+}
+
+func (t *TestClient) getLatestHeader() *types.Header {
+	if len(t.headerChain) == 0 {
+		return nil
+	}
+	return t.headerChain[t.latestHeadIndex]
+}
+
+func (t *TestClient) searchBlock(f func(*types.Header) bool) *types.Header {
+	for i := t.latestHeadIndex; i >= 0; i-- {
+		h := t.headerChain[i]
+		if f(h) {
+			return h
+		}
+	}
+	return nil
+}
+
+func (t *TestClient) searchBlockByNumber(number *big.Int) *types.Header {
+	return t.searchBlock(
+		func(h *types.Header) bool {
+			return h.Number.Cmp(number) == 0
+		})
+}
+
+func (t *TestClient) searchBlockByHash(hash common.Hash) *types.Header {
+	return t.searchBlock(
+		func(h *types.Header) bool {
+			return hash.Cmp(h.Hash()) == 0
+		})
+}
+
+func (t *TestClient) BlockNumber(_ context.Context) (uint64, error) {
+	return t.getLatestHeader().Number.Uint64(), nil
+}
+
+func (t *TestClient) HeaderByHash(_ context.Context, hash common.Hash) (*types.Header, error) {
+	h := t.searchBlockByHash(hash)
+	if h == nil {
+		return nil, errors.New("header not found")
+	}
+	return h, nil
+}
+
+func (t *TestClient) HeaderByNumber(_ context.Context, number *big.Int) (*types.Header, error) {
+	if number == nil {
+		return t.getLatestHeader(), nil
+	}
+	h := t.searchBlockByNumber(number)
+	if h == nil {
+		return nil, errors.New("header not found")
+	}
+	return h, nil
+}
+
+func (t *TestClient) SubscribeNewHead(_ context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
+	t.latestHeadEmitter = append(t.latestHeadEmitter, ch)
+	su := NewSubscription(len(t.latestHeadSubscription))
+	t.latestHeadSubscription = append(t.latestHeadSubscription, su)
+	// TODO: implement unsubscribing and removal from the emitter array
+	// TODO: fill the subscription's error channel on failures
+	return su, nil
+}
+
+func (t *TestClient) FilterLogs(_ context.Context, _ ethereum.FilterQuery) ([]types.Log, error) {
+	panic(ErrNotImplemented)
+}
+
+func (t *TestClient) SubscribeFilterLogs(_ context.Context, _ ethereum.FilterQuery, _ chan<- types.Log) (ethereum.Subscription, error) {
+	panic(ErrNotImplemented)
+}
+
+func (t *TestClient) CodeAt(_ context.Context, _ common.Address, _ *big.Int) ([]byte, error) {
+	panic(ErrNotImplemented)
+}
+
+func (t *TestClient) TransactionReceipt(_ context.Context, _ common.Hash) (*types.Receipt, error) {
+	panic(ErrNotImplemented)
+}
diff --git a/rolling-shutter/medley/chainsync/options.go b/rolling-shutter/medley/chainsync/options.go
index 076e1666c..9a7e43473 100644
--- a/rolling-shutter/medley/chainsync/options.go
+++ b/rolling-shutter/medley/chainsync/options.go
@@ -24,7 +24,7 @@ type options struct {
 	keyperSetManagerAddress     *common.Address
 	keyBroadcastContractAddress *common.Address
 	clientURL                   string
-	ethClient                   syncclient.EthereumClient
+	ethClient                   syncclient.FullEthereumClient
 	logger                      log.Logger
 	runner                      service.Runner
 	syncStart                   *number.BlockNumber
@@ -122,7 +122,7 @@ func (o *options) applyHandler(c *Client) error {
 // of shutter clients background workers. 
func (o *options) apply(ctx context.Context, c *Client) error { var ( - client syncclient.EthereumClient + client syncclient.SyncEthereumClient err error ) if o.clientURL != "" { @@ -132,12 +132,12 @@ func (o *options) apply(ctx context.Context, c *Client) error { } } client = o.ethClient - c.EthereumClient = client + c.SyncEthereumClient = client // the nil passthrough will use "latest" for each call, // but we want to harmonize and fix the sync start to a specific block. if o.syncStart.IsLatest() { - latestBlock, err := c.EthereumClient.BlockNumber(ctx) + latestBlock, err := c.SyncEthereumClient.BlockNumber(ctx) if err != nil { return errors.Wrap(err, "polling latest block") } @@ -219,7 +219,7 @@ func WithLogger(l log.Logger) Option { } } -func WithClient(client syncclient.EthereumClient) Option { +func WithClient(client syncclient.FullEthereumClient) Option { return func(o *options) error { o.ethClient = client return nil diff --git a/rolling-shutter/medley/chainsync/syncer/eonpubkey.go b/rolling-shutter/medley/chainsync/syncer/eonpubkey.go index 7aa6b8171..f055fc118 100644 --- a/rolling-shutter/medley/chainsync/syncer/eonpubkey.go +++ b/rolling-shutter/medley/chainsync/syncer/eonpubkey.go @@ -17,7 +17,7 @@ import ( var _ ManualFilterHandler = &EonPubKeySyncer{} type EonPubKeySyncer struct { - Client client.EthereumClient + Client client.SyncEthereumClient Log log.Logger KeyBroadcast *bindings.KeyBroadcastContract KeyperSetManager *bindings.KeyperSetManager diff --git a/rolling-shutter/medley/chainsync/syncer/keyperset.go b/rolling-shutter/medley/chainsync/syncer/keyperset.go index d0c50af19..be9e3c976 100644 --- a/rolling-shutter/medley/chainsync/syncer/keyperset.go +++ b/rolling-shutter/medley/chainsync/syncer/keyperset.go @@ -23,7 +23,7 @@ const channelSize = 10 var _ ManualFilterHandler = &KeyperSetSyncer{} type KeyperSetSyncer struct { - Client client.EthereumClient + Client client.FullEthereumClient Contract *bindings.KeyperSetManager Log log.Logger Handler event.KeyperSetHandler diff --git a/rolling-shutter/medley/chainsync/syncer/shutterstate.go b/rolling-shutter/medley/chainsync/syncer/shutterstate.go index c42eedd89..fbd7773de 100644 --- a/rolling-shutter/medley/chainsync/syncer/shutterstate.go +++ b/rolling-shutter/medley/chainsync/syncer/shutterstate.go @@ -16,7 +16,7 @@ import ( var _ ManualFilterHandler = &ShutterStateSyncer{} type ShutterStateSyncer struct { - Client client.EthereumClient + Client client.SyncEthereumClient Contract *bindings.KeyperSetManager Log log.Logger Handler event.ShutterStateHandler diff --git a/rolling-shutter/medley/chainsync/syncer/unsafehead.go b/rolling-shutter/medley/chainsync/syncer/unsafehead.go index ccff7aca1..a90b9c69f 100644 --- a/rolling-shutter/medley/chainsync/syncer/unsafehead.go +++ b/rolling-shutter/medley/chainsync/syncer/unsafehead.go @@ -17,7 +17,7 @@ import ( ) type UnsafeHeadSyncer struct { - Client client.EthereumClient + Client client.SyncEthereumClient Log log.Logger Handler event.BlockHandler // Handler to be manually triggered diff --git a/rolling-shutter/medley/chainsync/syncer/unsafehead_test.go b/rolling-shutter/medley/chainsync/syncer/unsafehead_test.go new file mode 100644 index 000000000..380a944f0 --- /dev/null +++ b/rolling-shutter/medley/chainsync/syncer/unsafehead_test.go @@ -0,0 +1,113 @@ +package syncer + +import ( + "context" + "fmt" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + 
"gotest.tools/v3/assert" + + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/client" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/number" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" +) + +func MakeChain(start int64, startParent common.Hash, numHeader uint, seed int64) []*types.Header { + n := numHeader + parent := startParent + num := big.NewInt(start) + h := []*types.Header{} + + // change the hashes for different seeds + mixinh := common.BigToHash(big.NewInt(seed)) + for n > 0 { + head := &types.Header{ + ParentHash: parent, + Number: num, + MixDigest: mixinh, + } + h = append(h, head) + num = new(big.Int).Add(num, big.NewInt(1)) + parent = head.Hash() + n-- + } + return h +} + +func TestReorg(t *testing.T) { + headersBeforeReorg := MakeChain(1, common.BigToHash(big.NewInt(0)), 10, 42) + branchOff := headersBeforeReorg[5] + // block number 5 will be reorged + headersReorgBranch := MakeChain(branchOff.Number.Int64()+1, branchOff.Hash(), 10, 43) + clnt, ctl := client.NewTestClient() + ctl.AppendNextHeaders(headersBeforeReorg...) + ctl.AppendNextHeaders(headersReorgBranch...) + + handlerBlock := make(chan *event.LatestBlock, 1) + + h := &UnsafeHeadSyncer{ + Client: clnt, + Log: log.New(), + Handler: func(_ context.Context, ev *event.LatestBlock) error { + handlerBlock <- ev + return nil + }, + SyncedHandler: []ManualFilterHandler{}, + SyncStartBlock: number.NewBlockNumber(nil), + FetchActiveAtStart: false, + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + service.RunBackground(ctx, h) + + // intitial sync is independent of the subscription, + // this will get polled from the eth client + b := <-handlerBlock + assert.Assert(t, b.Number.Cmp(headersBeforeReorg[0].Number) == 0) + idx := 1 + for { + ok := ctl.ProgressHead() + assert.Assert(t, ok) + err := ctl.EmitEvents(ctx) + assert.NilError(t, err) + + b = <-handlerBlock + assert.Equal(t, b.Number.Uint64(), headersBeforeReorg[idx].Number.Uint64(), fmt.Sprintf("block number equal for idx %d", idx)) + assert.Equal(t, b.BlockHash, headersBeforeReorg[idx].Hash()) + idx++ + if idx == len(headersBeforeReorg) { + break + } + } + ok := ctl.ProgressHead() + assert.Assert(t, ok) + err := ctl.EmitEvents(ctx) + assert.NilError(t, err) + b = <-handlerBlock + // now the reorg should have happened. 
diff --git a/rolling-shutter/medley/chainsync/syncer/util.go b/rolling-shutter/medley/chainsync/syncer/util.go
index 80da11aa2..8a470f06a 100644
--- a/rolling-shutter/medley/chainsync/syncer/util.go
+++ b/rolling-shutter/medley/chainsync/syncer/util.go
@@ -45,7 +45,7 @@ func guardCallOpts(opts *bind.CallOpts, allowLatest bool) error {
 	return nil
 }
 
-func fixCallOpts(ctx context.Context, c client.EthereumClient, opts *bind.CallOpts) (*bind.CallOpts, *uint64, error) {
+func fixCallOpts(ctx context.Context, c client.SyncEthereumClient, opts *bind.CallOpts) (*bind.CallOpts, *uint64, error) {
 	err := guardCallOpts(opts, false)
 	if err == nil {
 		return opts, nil, nil

From 80f095dc7984fcf9fc948556f04abf52968418f9 Mon Sep 17 00:00:00 2001
From: Maximilian Langenfeld <15726643+ezdac@users.noreply.github.com>
Date: Fri, 31 May 2024 10:51:40 +0200
Subject: [PATCH 8/8] fix: lint

---
 rolling-shutter/p2p/params.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/rolling-shutter/p2p/params.go b/rolling-shutter/p2p/params.go
index ad385e71a..901d53817 100644
--- a/rolling-shutter/p2p/params.go
+++ b/rolling-shutter/p2p/params.go
@@ -19,7 +19,9 @@ func makePubSubParams(
 	gossipSubParams := &gsDefault
 
 	// modified defaults from ethereum consensus spec
-	// https://github.com/ethereum/consensus-specs/blob/5d80b1954a4b7a121aa36143d50b366727b66cbc/specs/phase0/p2p-interface.md#why-are-these-specific-gossip-parameters-chosen //nolint:lll
+
+	//nolint:lll
+	// https://github.com/ethereum/consensus-specs/blob/5d80b1954a4b7a121aa36143d50b366727b66cbc/specs/phase0/p2p-interface.md#why-are-these-specific-gossip-parameters-chosen
 	gossipSubParams.HeartbeatInterval = 700 * time.Millisecond
 	gossipSubParams.HistoryLength = 6