From 79dbc5a27b1b4c3e13189991c9a7d898557e81f2 Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Fri, 11 Apr 2025 12:23:00 +0530 Subject: [PATCH 1/7] feat: created generic syncmonitor package in medley and updated gnosis and api keyper services --- rolling-shutter/keyperimpl/gnosis/keyper.go | 11 +- .../keyperimpl/gnosis/syncmonitor.go | 64 ------- .../keyperimpl/gnosis/syncmonitor_test.go | 146 --------------- .../keyperimpl/gnosis/syncstate.go | 24 +++ .../keyperimpl/shutterservice/keyper.go | 9 +- .../keyperimpl/shutterservice/syncstate.go | 24 +++ .../syncmonitor}/syncmonitor.go | 35 ++-- .../syncmonitor}/syncmonitor_test.go | 167 ++++++++---------- 8 files changed, 156 insertions(+), 324 deletions(-) delete mode 100644 rolling-shutter/keyperimpl/gnosis/syncmonitor.go delete mode 100644 rolling-shutter/keyperimpl/gnosis/syncmonitor_test.go create mode 100644 rolling-shutter/keyperimpl/gnosis/syncstate.go create mode 100644 rolling-shutter/keyperimpl/shutterservice/syncstate.go rename rolling-shutter/{keyperimpl/shutterservice => medley/syncmonitor}/syncmonitor.go (75%) rename rolling-shutter/{keyperimpl/shutterservice => medley/syncmonitor}/syncmonitor_test.go (53%) diff --git a/rolling-shutter/keyperimpl/gnosis/keyper.go b/rolling-shutter/keyperimpl/gnosis/keyper.go index c6d12141..2aaa1be2 100644 --- a/rolling-shutter/keyperimpl/gnosis/keyper.go +++ b/rolling-shutter/keyperimpl/gnosis/keyper.go @@ -25,6 +25,7 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/db" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/slotticker" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/syncmonitor" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2p" ) @@ -49,7 +50,7 @@ type Keyper struct { validatorSyncer *ValidatorSyncer eonKeyPublisher *eonkeypublisher.EonKeyPublisher latestTriggeredSlot *uint64 - syncMonitor *SyncMonitor + syncMonitor *syncmonitor.SyncMonitor // input events newBlocks chan *syncevent.LatestBlock @@ -63,8 +64,7 @@ type Keyper struct { func New(c *Config) *Keyper { return &Keyper{ - config: c, - syncMonitor: &SyncMonitor{}, + config: c, } } @@ -156,9 +156,12 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error { return errors.Wrap(err, "failed to reset transaction pointer age") } - kpr.syncMonitor = &SyncMonitor{ + kpr.syncMonitor = &syncmonitor.SyncMonitor{ DBPool: kpr.dbpool, CheckInterval: time.Duration(kpr.config.Gnosis.SyncMonitorCheckInterval) * time.Second, + SyncState: &GnosisSyncState{ + kpr.dbpool, + }, } runner.Go(func() error { return kpr.processInputs(ctx) }) diff --git a/rolling-shutter/keyperimpl/gnosis/syncmonitor.go b/rolling-shutter/keyperimpl/gnosis/syncmonitor.go deleted file mode 100644 index 5f47cab0..00000000 --- a/rolling-shutter/keyperimpl/gnosis/syncmonitor.go +++ /dev/null @@ -1,64 +0,0 @@ -package gnosis - -import ( - "context" - "time" - - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" - - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" -) - -type SyncMonitor struct { - DBPool *pgxpool.Pool - CheckInterval time.Duration -} - -func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error { - runner.Go(func() error { - return s.runMonitor(ctx) - }) - - return nil -} - -func (s *SyncMonitor) runMonitor(ctx context.Context) error { - var lastBlockNumber int64 - db := database.New(s.DBPool) - - log.Debug().Msg("starting the sync monitor") - - for { - select { - case <-time.After(s.CheckInterval): - record, err := db.GetTransactionSubmittedEventsSyncedUntil(ctx) - if err != nil { - if errors.Is(err, pgx.ErrNoRows) { - log.Warn().Err(err).Msg("no rows found in table transaction_submitted_events_synced_until") - continue - } - return errors.Wrap(err, "error getting transaction_submitted_events_synced_until") - } - - currentBlockNumber := record.BlockNumber - log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number") - - if currentBlockNumber > lastBlockNumber { - lastBlockNumber = currentBlockNumber - } else { - log.Error(). - Int64("last-block-number", lastBlockNumber). - Int64("current-block-number", currentBlockNumber). - Msg("block number has not increased between checks") - return errors.New("block number has not increased between checks") - } - case <-ctx.Done(): - log.Info().Msg("stopping syncMonitor due to context cancellation") - return ctx.Err() - } - } -} diff --git a/rolling-shutter/keyperimpl/gnosis/syncmonitor_test.go b/rolling-shutter/keyperimpl/gnosis/syncmonitor_test.go deleted file mode 100644 index 154d13e2..00000000 --- a/rolling-shutter/keyperimpl/gnosis/syncmonitor_test.go +++ /dev/null @@ -1,146 +0,0 @@ -package gnosis_test - -import ( - "context" - "testing" - "time" - - "gotest.tools/assert" - - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis" - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup" -) - -func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, dbclose := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer dbclose() - db := database.New(dbpool) - - initialBlockNumber := int64(100) - - err := db.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: initialBlockNumber, - Slot: 1, - }) - if err != nil { - t.Fatalf("failed to set initial synced data: %v", err) - } - - monitor := &gnosis.SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - } - - errCh := make(chan error, 1) - - go func() { - err := service.RunWithSighandler(ctx, monitor) - if err != nil { - errCh <- err - } - }() - - time.Sleep(12 * time.Second) - - select { - case err := <-errCh: - assert.ErrorContains(t, err, "block number has not increased between checks") - case <-time.After(5 * time.Second): - t.Fatal("expected an error, but none was returned") - } -} - -func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - db := database.New(dbpool) - - initialBlockNumber := int64(100) - err := db.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: initialBlockNumber, - Slot: 1, - }) - if err != nil { - t.Fatalf("failed to set initial synced data: %v", err) - } - - monitor := &gnosis.SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - } - - _, deferFn := service.RunBackground(ctx, monitor) - defer deferFn() - - doneCh := make(chan struct{}) - go func() { - for i := 0; i < 5; i++ { - newBlockNumber := initialBlockNumber + int64(i+1) - err := db.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: newBlockNumber, - Slot: 1, - }) - if err != nil { - t.Errorf("failed to update block number: %v", err) - return - } - - time.Sleep(5 * time.Second) - } - - doneCh <- struct{}{} - }() - - <-doneCh - syncedData, err := db.GetTransactionSubmittedEventsSyncedUntil(ctx) - if err != nil { - t.Fatalf("failed to retrieve final block number: %v", err) - } - - assert.Equal(t, initialBlockNumber+5, syncedData.BlockNumber, "block number should have been incremented correctly") -} - -func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - _ = database.New(dbpool) - - monitor := &gnosis.SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - } - - monitorCtx, cancelMonitor := context.WithCancel(ctx) - defer cancelMonitor() - - errCh := make(chan error, 1) - go func() { - err := service.RunWithSighandler(monitorCtx, monitor) - if err != nil { - errCh <- err - } - }() - - time.Sleep(15 * time.Second) - cancelMonitor() - - select { - case err := <-errCh: - t.Fatalf("expected monitor to continue without error, but got: %v", err) - case <-time.After(1 * time.Second): - } -} diff --git a/rolling-shutter/keyperimpl/gnosis/syncstate.go b/rolling-shutter/keyperimpl/gnosis/syncstate.go new file mode 100644 index 00000000..0549db90 --- /dev/null +++ b/rolling-shutter/keyperimpl/gnosis/syncstate.go @@ -0,0 +1,24 @@ +package gnosis + +import ( + "context" + + "github.com/jackc/pgx/v4/pgxpool" + + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" +) + +// GnosisSyncState implements the BlockSyncState interface for the gnosis keyper +type GnosisSyncState struct { + DBPool *pgxpool.Pool +} + +// GetSyncedBlockNumber retrieves the current synced block number from transaction submitted events +func (s *GnosisSyncState) GetSyncedBlockNumber(ctx context.Context) (int64, error) { + db := database.New(s.DBPool) + record, err := db.GetTransactionSubmittedEventsSyncedUntil(ctx) + if err != nil { + return 0, err + } + return record.BlockNumber, nil +} diff --git a/rolling-shutter/keyperimpl/shutterservice/keyper.go b/rolling-shutter/keyperimpl/shutterservice/keyper.go index f5ecefbc..85c0b0b3 100644 --- a/rolling-shutter/keyperimpl/shutterservice/keyper.go +++ b/rolling-shutter/keyperimpl/shutterservice/keyper.go @@ -22,6 +22,7 @@ import ( syncevent "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/db" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/syncmonitor" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2p" ) @@ -36,7 +37,7 @@ type Keyper struct { registrySyncer *RegistrySyncer eonKeyPublisher *eonkeypublisher.EonKeyPublisher latestTriggeredTime *uint64 - syncMonitor *SyncMonitor + syncMonitor *syncmonitor.SyncMonitor // input events newBlocks chan *syncevent.LatestBlock @@ -113,10 +114,14 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error { return err } - kpr.syncMonitor = &SyncMonitor{ + kpr.syncMonitor = &syncmonitor.SyncMonitor{ DBPool: kpr.dbpool, CheckInterval: time.Duration(kpr.config.Chain.SyncMonitorCheckInterval) * time.Second, + SyncState: &ShutterServiceSyncState{ + kpr.dbpool, + }, } + runner.Go(func() error { return kpr.processInputs(ctx) }) return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.eonKeyPublisher, kpr.syncMonitor) } diff --git a/rolling-shutter/keyperimpl/shutterservice/syncstate.go b/rolling-shutter/keyperimpl/shutterservice/syncstate.go new file mode 100644 index 00000000..934edfc8 --- /dev/null +++ b/rolling-shutter/keyperimpl/shutterservice/syncstate.go @@ -0,0 +1,24 @@ +package shutterservice + +import ( + "context" + + "github.com/jackc/pgx/v4/pgxpool" + + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database" +) + +// ShutterServiceSyncState implements the BlockSyncState interface for the shutter service +type ShutterServiceSyncState struct { + DBPool *pgxpool.Pool +} + +// GetSyncedBlockNumber retrieves the current synced block number from identity events +func (s *ShutterServiceSyncState) GetSyncedBlockNumber(ctx context.Context) (int64, error) { + db := database.New(s.DBPool) + record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) + if err != nil { + return 0, err + } + return record.BlockNumber, nil +} diff --git a/rolling-shutter/keyperimpl/shutterservice/syncmonitor.go b/rolling-shutter/medley/syncmonitor/syncmonitor.go similarity index 75% rename from rolling-shutter/keyperimpl/shutterservice/syncmonitor.go rename to rolling-shutter/medley/syncmonitor/syncmonitor.go index cbd13315..8334e9dd 100644 --- a/rolling-shutter/keyperimpl/shutterservice/syncmonitor.go +++ b/rolling-shutter/medley/syncmonitor/syncmonitor.go @@ -1,23 +1,31 @@ -package shutterservice +package syncmonitor import ( "context" + "errors" "fmt" "time" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" - "github.com/pkg/errors" "github.com/rs/zerolog/log" - keyperDB "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) +// BlockSyncState is an interface that different keyper implementations +// can implement to provide their own block sync state logic +type BlockSyncState interface { + // GetSyncedBlockNumber retrieves the current synced block number + GetSyncedBlockNumber(ctx context.Context) (int64, error) +} + +// SyncMonitor monitors the sync state of the keyper type SyncMonitor struct { DBPool *pgxpool.Pool CheckInterval time.Duration + SyncState BlockSyncState } func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error { @@ -30,15 +38,14 @@ func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error { func (s *SyncMonitor) runMonitor(ctx context.Context) error { var lastBlockNumber int64 - db := database.New(s.DBPool) - keyperdb := keyperDB.New(s.DBPool) + keyperdb := database.New(s.DBPool) log.Debug().Msg("starting the sync monitor") for { select { case <-time.After(s.CheckInterval): - if err := s.runCheck(ctx, db, keyperdb, &lastBlockNumber); err != nil { + if err := s.runCheck(ctx, keyperdb, &lastBlockNumber); err != nil { if errors.Is(err, ErrBlockNotIncreasing) { return err } @@ -55,29 +62,27 @@ var ErrBlockNotIncreasing = errors.New("block number has not increased between c func (s *SyncMonitor) runCheck( ctx context.Context, - db *database.Queries, - keyperdb *keyperDB.Queries, + keyperdb *database.Queries, lastBlockNumber *int64, ) error { isRunning, err := s.isDKGRunning(ctx, keyperdb) if err != nil { - return fmt.Errorf("syncMonitor | error in dkgIsRunning: %w", err) + return fmt.Errorf("syncMonitor | error in isDKGRunning: %w", err) } if isRunning { log.Debug().Msg("dkg is running, skipping sync monitor checks") return nil } - record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) + currentBlockNumber, err := s.SyncState.GetSyncedBlockNumber(ctx) if err != nil { if errors.Is(err, pgx.ErrNoRows) { - log.Warn().Err(err).Msg("no rows found in table identity_registered_events_synced_until") + log.Warn().Err(err).Msg("no rows found in sync state table") return nil // This is not an error condition that should stop monitoring } - return fmt.Errorf("error getting identity_registered_events_synced_until: %w", err) + return fmt.Errorf("error getting synced block number: %w", err) } - currentBlockNumber := record.BlockNumber log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number") if currentBlockNumber > *lastBlockNumber { @@ -92,7 +97,7 @@ func (s *SyncMonitor) runCheck( return ErrBlockNotIncreasing } -func (s *SyncMonitor) isDKGRunning(ctx context.Context, keyperdb *keyperDB.Queries) (bool, error) { +func (s *SyncMonitor) isDKGRunning(ctx context.Context, keyperdb *database.Queries) (bool, error) { // if latest eon is registered then EonStarted event has triggered, which means the dkg can start eons, err := keyperdb.GetAllEons(ctx) if errors.Is(err, pgx.ErrNoRows) { diff --git a/rolling-shutter/keyperimpl/shutterservice/syncmonitor_test.go b/rolling-shutter/medley/syncmonitor/syncmonitor_test.go similarity index 53% rename from rolling-shutter/keyperimpl/shutterservice/syncmonitor_test.go rename to rolling-shutter/medley/syncmonitor/syncmonitor_test.go index 47a36d96..1d780a82 100644 --- a/rolling-shutter/keyperimpl/shutterservice/syncmonitor_test.go +++ b/rolling-shutter/medley/syncmonitor/syncmonitor_test.go @@ -1,47 +1,48 @@ -package shutterservice_test +package syncmonitor import ( "context" "testing" "time" + "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" "gotest.tools/assert" - keyperDB "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice" - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup" ) -func setupTestData(ctx context.Context, t *testing.T, dbpool *pgxpool.Pool, blockNumber int64) { +// MockSyncState is a mock implementation of BlockSyncState for testing +type MockSyncState struct { + blockNumber int64 + err error +} + +func (m *MockSyncState) GetSyncedBlockNumber(ctx context.Context) (int64, error) { + return m.blockNumber, m.err +} + +func setupTestData(ctx context.Context, t *testing.T, dbpool *pgxpool.Pool) { t.Helper() - db := database.New(dbpool) - keyperdb := keyperDB.New(dbpool) + keyperdb := database.New(dbpool) // Set up eon - err := keyperdb.InsertEon(ctx, keyperDB.InsertEonParams{ + err := keyperdb.InsertEon(ctx, database.InsertEonParams{ Eon: 1, }) assert.NilError(t, err) // Set up DKG result - err = keyperdb.InsertDKGResult(ctx, keyperDB.InsertDKGResultParams{ + err = keyperdb.InsertDKGResult(ctx, database.InsertDKGResultParams{ Eon: 1, Success: true, }) assert.NilError(t, err) - - // Set up initial block - err = db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: blockNumber, - }) - assert.NilError(t, err) } -func TestAPISyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { +func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -49,11 +50,16 @@ func TestAPISyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { defer dbclose() initialBlockNumber := int64(100) - setupTestData(ctx, t, dbpool, initialBlockNumber) + setupTestData(ctx, t, dbpool) + + mockSyncState := &MockSyncState{ + blockNumber: initialBlockNumber, + } - monitor := &shutterservice.SyncMonitor{ + monitor := &SyncMonitor{ DBPool: dbpool, CheckInterval: 5 * time.Second, + SyncState: mockSyncState, } errCh := make(chan error, 1) @@ -68,26 +74,30 @@ func TestAPISyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { select { case err := <-errCh: - assert.ErrorContains(t, err, shutterservice.ErrBlockNotIncreasing.Error()) + assert.ErrorContains(t, err, ErrBlockNotIncreasing.Error()) case <-time.After(5 * time.Second): t.Fatal("expected an error, but none was returned") } } -func TestAPISyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { +func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) defer closeDB() - db := database.New(dbpool) initialBlockNumber := int64(100) - setupTestData(ctx, t, dbpool, initialBlockNumber) + setupTestData(ctx, t, dbpool) - monitor := &shutterservice.SyncMonitor{ + mockSyncState := &MockSyncState{ + blockNumber: initialBlockNumber, + } + + monitor := &SyncMonitor{ DBPool: dbpool, CheckInterval: 5 * time.Second, + SyncState: mockSyncState, } _, deferFn := service.RunBackground(ctx, monitor) @@ -96,57 +106,40 @@ func TestAPISyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { doneCh := make(chan struct{}) go func() { for i := 0; i < 5; i++ { - newBlockNumber := initialBlockNumber + int64(i+1) - err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: newBlockNumber, - }) - if err != nil { - t.Errorf("failed to update block number: %v", err) - return - } - time.Sleep(5 * time.Second) + mockSyncState.blockNumber = initialBlockNumber + int64(i+1) } doneCh <- struct{}{} }() <-doneCh - syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) - if err != nil { - t.Fatalf("failed to retrieve final block number: %v", err) - } - - assert.Equal(t, initialBlockNumber+5, syncedData.BlockNumber, "block number should have been incremented correctly") + assert.Equal(t, initialBlockNumber+5, mockSyncState.blockNumber, "block number should have been incremented correctly") } -func TestAPISyncMonitor_SkipsWhenDKGIsRunning(t *testing.T) { +func TestSyncMonitor_SkipsWhenDKGIsRunning(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) defer closeDB() - db := database.New(dbpool) - keyperdb := keyperDB.New(dbpool) + keyperdb := database.New(dbpool) // Set up eon but no DKG result to simulate DKG running - err := keyperdb.InsertEon(ctx, keyperDB.InsertEonParams{ + err := keyperdb.InsertEon(ctx, database.InsertEonParams{ Eon: 1, }) assert.NilError(t, err) - // Set up initial block data initialBlockNumber := int64(100) - err = db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: initialBlockNumber, - }) - assert.NilError(t, err) + mockSyncState := &MockSyncState{ + blockNumber: initialBlockNumber, + } - monitor := &shutterservice.SyncMonitor{ + monitor := &SyncMonitor{ DBPool: dbpool, CheckInterval: 5 * time.Second, + SyncState: mockSyncState, } monitorCtx, cancelMonitor := context.WithCancel(ctx) @@ -172,30 +165,25 @@ func TestAPISyncMonitor_SkipsWhenDKGIsRunning(t *testing.T) { } // Verify the block number hasn't changed - syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) - assert.NilError(t, err) - assert.Equal(t, initialBlockNumber, syncedData.BlockNumber, "block number should remain unchanged") + assert.Equal(t, initialBlockNumber, mockSyncState.blockNumber, "block number should remain unchanged") } -func TestAPISyncMonitor_RunsNormallyWhenNoEons(t *testing.T) { +func TestSyncMonitor_RunsNormallyWhenNoEons(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) defer closeDB() - db := database.New(dbpool) - // Only set up initial block data, no eon entries initialBlockNumber := int64(100) - err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: initialBlockNumber, - }) - assert.NilError(t, err) + mockSyncState := &MockSyncState{ + blockNumber: initialBlockNumber, + } - monitor := &shutterservice.SyncMonitor{ + monitor := &SyncMonitor{ DBPool: dbpool, CheckInterval: 5 * time.Second, + SyncState: mockSyncState, } monitorCtx, cancelMonitor := context.WithCancel(ctx) @@ -215,41 +203,41 @@ func TestAPISyncMonitor_RunsNormallyWhenNoEons(t *testing.T) { select { case err := <-errCh: - assert.ErrorContains(t, err, shutterservice.ErrBlockNotIncreasing.Error()) + assert.ErrorContains(t, err, ErrBlockNotIncreasing.Error()) case <-time.After(1 * time.Second): t.Fatalf("expected monitor to throw error, but no error returned") } - - // Verify the block number hasn't changed - syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) - assert.NilError(t, err) - assert.Equal(t, initialBlockNumber, syncedData.BlockNumber, "block number should remain unchanged") } -func TestAPISyncMonitor_ContinuesWhenNoRows(t *testing.T) { +func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) defer closeDB() - // Set up eon and DKG result, but no block data - keyperdb := keyperDB.New(dbpool) - - err := keyperdb.InsertEon(ctx, keyperDB.InsertEonParams{ + // Set up eon and DKG result + keyperdb := database.New(dbpool) + err := keyperdb.InsertEon(ctx, database.InsertEonParams{ Eon: 1, }) assert.NilError(t, err) - err = keyperdb.InsertDKGResult(ctx, keyperDB.InsertDKGResultParams{ + err = keyperdb.InsertDKGResult(ctx, database.InsertDKGResultParams{ Eon: 1, Success: true, }) assert.NilError(t, err) - monitor := &shutterservice.SyncMonitor{ + // Set up mock sync state that returns no rows error + mockSyncState := &MockSyncState{ + err: pgx.ErrNoRows, + } + + monitor := &SyncMonitor{ DBPool: dbpool, CheckInterval: 5 * time.Second, + SyncState: mockSyncState, } monitorCtx, cancelMonitor := context.WithCancel(ctx) @@ -270,30 +258,28 @@ func TestAPISyncMonitor_ContinuesWhenNoRows(t *testing.T) { case err := <-errCh: t.Fatalf("expected monitor to continue without error, but got: %v", err) case <-time.After(1 * time.Second): + // Test passes if no error is received } } -func TestAPISyncMonitor_RunsNormallyWithCompletedDKG(t *testing.T) { +func TestSyncMonitor_RunsNormallyWithCompletedDKG(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) defer closeDB() - db := database.New(dbpool) - initialBlockNumber := int64(100) - setupTestData(ctx, t, dbpool, initialBlockNumber) + setupTestData(ctx, t, dbpool) - // Set up initial block data - err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: initialBlockNumber, - }) - assert.NilError(t, err) + initialBlockNumber := int64(100) + mockSyncState := &MockSyncState{ + blockNumber: initialBlockNumber, + } - monitor := &shutterservice.SyncMonitor{ + monitor := &SyncMonitor{ DBPool: dbpool, CheckInterval: 5 * time.Second, + SyncState: mockSyncState, } monitorCtx, cancelMonitor := context.WithCancel(ctx) @@ -313,13 +299,8 @@ func TestAPISyncMonitor_RunsNormallyWithCompletedDKG(t *testing.T) { select { case err := <-errCh: - assert.ErrorContains(t, err, shutterservice.ErrBlockNotIncreasing.Error()) + assert.ErrorContains(t, err, ErrBlockNotIncreasing.Error()) case <-time.After(1 * time.Second): t.Fatalf("expected monitor to throw error, but no error returned") } - - // Verify the block number hasn't changed - syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) - assert.NilError(t, err) - assert.Equal(t, initialBlockNumber, syncedData.BlockNumber, "block number should remain unchanged") } From 97f317d0e643dd7880090acdd946cb3ce84acc52 Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Fri, 11 Apr 2025 12:34:30 +0530 Subject: [PATCH 2/7] fix: generic sync monitor | lint fixes --- rolling-shutter/keyperimpl/gnosis/syncstate.go | 4 ++-- rolling-shutter/keyperimpl/shutterservice/syncstate.go | 4 ++-- rolling-shutter/medley/syncmonitor/syncmonitor.go | 6 +++--- rolling-shutter/medley/syncmonitor/syncmonitor_test.go | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/rolling-shutter/keyperimpl/gnosis/syncstate.go b/rolling-shutter/keyperimpl/gnosis/syncstate.go index 0549db90..5fcbb8a7 100644 --- a/rolling-shutter/keyperimpl/gnosis/syncstate.go +++ b/rolling-shutter/keyperimpl/gnosis/syncstate.go @@ -8,12 +8,12 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" ) -// GnosisSyncState implements the BlockSyncState interface for the gnosis keyper +// GnosisSyncState implements the BlockSyncState interface for the gnosis keyper. type GnosisSyncState struct { DBPool *pgxpool.Pool } -// GetSyncedBlockNumber retrieves the current synced block number from transaction submitted events +// GetSyncedBlockNumber retrieves the current synced block number from transaction submitted events. func (s *GnosisSyncState) GetSyncedBlockNumber(ctx context.Context) (int64, error) { db := database.New(s.DBPool) record, err := db.GetTransactionSubmittedEventsSyncedUntil(ctx) diff --git a/rolling-shutter/keyperimpl/shutterservice/syncstate.go b/rolling-shutter/keyperimpl/shutterservice/syncstate.go index 934edfc8..17f24b4f 100644 --- a/rolling-shutter/keyperimpl/shutterservice/syncstate.go +++ b/rolling-shutter/keyperimpl/shutterservice/syncstate.go @@ -8,12 +8,12 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database" ) -// ShutterServiceSyncState implements the BlockSyncState interface for the shutter service +// ShutterServiceSyncState implements the BlockSyncState interface for the shutter service. type ShutterServiceSyncState struct { DBPool *pgxpool.Pool } -// GetSyncedBlockNumber retrieves the current synced block number from identity events +// GetSyncedBlockNumber retrieves the current synced block number from identity events. func (s *ShutterServiceSyncState) GetSyncedBlockNumber(ctx context.Context) (int64, error) { db := database.New(s.DBPool) record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) diff --git a/rolling-shutter/medley/syncmonitor/syncmonitor.go b/rolling-shutter/medley/syncmonitor/syncmonitor.go index 8334e9dd..fdfa536a 100644 --- a/rolling-shutter/medley/syncmonitor/syncmonitor.go +++ b/rolling-shutter/medley/syncmonitor/syncmonitor.go @@ -15,13 +15,13 @@ import ( ) // BlockSyncState is an interface that different keyper implementations -// can implement to provide their own block sync state logic +// can implement to provide their own block sync state logic. type BlockSyncState interface { - // GetSyncedBlockNumber retrieves the current synced block number + // GetSyncedBlockNumber retrieves the current synced block number. GetSyncedBlockNumber(ctx context.Context) (int64, error) } -// SyncMonitor monitors the sync state of the keyper +// SyncMonitor monitors the sync state of the keyper. type SyncMonitor struct { DBPool *pgxpool.Pool CheckInterval time.Duration diff --git a/rolling-shutter/medley/syncmonitor/syncmonitor_test.go b/rolling-shutter/medley/syncmonitor/syncmonitor_test.go index 1d780a82..d3f1f0be 100644 --- a/rolling-shutter/medley/syncmonitor/syncmonitor_test.go +++ b/rolling-shutter/medley/syncmonitor/syncmonitor_test.go @@ -14,13 +14,13 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup" ) -// MockSyncState is a mock implementation of BlockSyncState for testing +// MockSyncState is a mock implementation of BlockSyncState for testing. type MockSyncState struct { blockNumber int64 err error } -func (m *MockSyncState) GetSyncedBlockNumber(ctx context.Context) (int64, error) { +func (m *MockSyncState) GetSyncedBlockNumber(_ context.Context) (int64, error) { return m.blockNumber, m.err } From c15341ac62bad282e42661d8e13ec3461a7fffee Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Tue, 15 Apr 2025 13:34:51 +0530 Subject: [PATCH 3/7] moved sync monitor to keyper core from medley --- .../syncmonitor/syncmonitor.go | 0 .../syncmonitor/syncmonitor_test.go | 54 ++++++++++++++----- rolling-shutter/keyperimpl/gnosis/keyper.go | 2 +- .../keyperimpl/shutterservice/keyper.go | 2 +- 4 files changed, 43 insertions(+), 15 deletions(-) rename rolling-shutter/{medley => keyper}/syncmonitor/syncmonitor.go (100%) rename rolling-shutter/{medley => keyper}/syncmonitor/syncmonitor_test.go (83%) diff --git a/rolling-shutter/medley/syncmonitor/syncmonitor.go b/rolling-shutter/keyper/syncmonitor/syncmonitor.go similarity index 100% rename from rolling-shutter/medley/syncmonitor/syncmonitor.go rename to rolling-shutter/keyper/syncmonitor/syncmonitor.go diff --git a/rolling-shutter/medley/syncmonitor/syncmonitor_test.go b/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go similarity index 83% rename from rolling-shutter/medley/syncmonitor/syncmonitor_test.go rename to rolling-shutter/keyper/syncmonitor/syncmonitor_test.go index d3f1f0be..d09586f8 100644 --- a/rolling-shutter/medley/syncmonitor/syncmonitor_test.go +++ b/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go @@ -2,6 +2,7 @@ package syncmonitor import ( "context" + "sync" "testing" "time" @@ -16,14 +17,23 @@ import ( // MockSyncState is a mock implementation of BlockSyncState for testing. type MockSyncState struct { + mu sync.Mutex blockNumber int64 err error } func (m *MockSyncState) GetSyncedBlockNumber(_ context.Context) (int64, error) { + m.mu.Lock() + defer m.mu.Unlock() return m.blockNumber, m.err } +func (m *MockSyncState) SetBlockNumber(n int64) { + m.mu.Lock() + defer m.mu.Unlock() + m.blockNumber = n +} + func setupTestData(ctx context.Context, t *testing.T, dbpool *pgxpool.Pool) { t.Helper() keyperdb := database.New(dbpool) @@ -78,6 +88,11 @@ func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { case <-time.After(5 * time.Second): t.Fatal("expected an error, but none was returned") } + + // Verify final state + finalBlockNumber, err := mockSyncState.GetSyncedBlockNumber(ctx) + assert.NilError(t, err) + assert.Equal(t, initialBlockNumber, finalBlockNumber) } func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { @@ -96,25 +111,30 @@ func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { monitor := &SyncMonitor{ DBPool: dbpool, - CheckInterval: 5 * time.Second, + CheckInterval: 200 * time.Millisecond, SyncState: mockSyncState, } - _, deferFn := service.RunBackground(ctx, monitor) - defer deferFn() - - doneCh := make(chan struct{}) + monitorCtx, cancelMonitor := context.WithCancel(ctx) + errCh := make(chan error, 1) go func() { - for i := 0; i < 5; i++ { - time.Sleep(5 * time.Second) - mockSyncState.blockNumber = initialBlockNumber + int64(i+1) + if err := service.RunWithSighandler(monitorCtx, monitor); err != nil { + errCh <- err } - - doneCh <- struct{}{} }() - <-doneCh - assert.Equal(t, initialBlockNumber+5, mockSyncState.blockNumber, "block number should have been incremented correctly") + // Update block numbers more quickly + for i := 0; i < 5; i++ { + time.Sleep(200 * time.Millisecond) + mockSyncState.SetBlockNumber(initialBlockNumber + int64(i+1)) + } + + cancelMonitor() + + // Verify final state + finalBlockNumber, err := mockSyncState.GetSyncedBlockNumber(ctx) + assert.NilError(t, err) + assert.Equal(t, initialBlockNumber+5, finalBlockNumber, "block number should have been incremented correctly") } func TestSyncMonitor_SkipsWhenDKGIsRunning(t *testing.T) { @@ -165,7 +185,9 @@ func TestSyncMonitor_SkipsWhenDKGIsRunning(t *testing.T) { } // Verify the block number hasn't changed - assert.Equal(t, initialBlockNumber, mockSyncState.blockNumber, "block number should remain unchanged") + finalBlockNumber, err := mockSyncState.GetSyncedBlockNumber(ctx) + assert.NilError(t, err) + assert.Equal(t, initialBlockNumber, finalBlockNumber, "block number should remain unchanged") } func TestSyncMonitor_RunsNormallyWhenNoEons(t *testing.T) { @@ -233,6 +255,7 @@ func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) { mockSyncState := &MockSyncState{ err: pgx.ErrNoRows, } + mockSyncState.SetBlockNumber(0) // Initialize block number monitor := &SyncMonitor{ DBPool: dbpool, @@ -303,4 +326,9 @@ func TestSyncMonitor_RunsNormallyWithCompletedDKG(t *testing.T) { case <-time.After(1 * time.Second): t.Fatalf("expected monitor to throw error, but no error returned") } + + // Verify final state if needed + finalBlockNumber, err := mockSyncState.GetSyncedBlockNumber(ctx) + assert.NilError(t, err) + assert.Equal(t, initialBlockNumber, finalBlockNumber) } diff --git a/rolling-shutter/keyperimpl/gnosis/keyper.go b/rolling-shutter/keyperimpl/gnosis/keyper.go index 2aaa1be2..39126135 100644 --- a/rolling-shutter/keyperimpl/gnosis/keyper.go +++ b/rolling-shutter/keyperimpl/gnosis/keyper.go @@ -17,6 +17,7 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/epochkghandler" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/kprconfig" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/syncmonitor" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/broker" @@ -25,7 +26,6 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/db" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/slotticker" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/syncmonitor" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2p" ) diff --git a/rolling-shutter/keyperimpl/shutterservice/keyper.go b/rolling-shutter/keyperimpl/shutterservice/keyper.go index 85c0b0b3..dfaa2704 100644 --- a/rolling-shutter/keyperimpl/shutterservice/keyper.go +++ b/rolling-shutter/keyperimpl/shutterservice/keyper.go @@ -16,13 +16,13 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/epochkghandler" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/kprconfig" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/syncmonitor" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/broker" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync" syncevent "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync/event" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/db" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/syncmonitor" "github.com/shutter-network/rolling-shutter/rolling-shutter/p2p" ) From 02802f917bc368de615fcf10631ab26f984a750d Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Fri, 18 Apr 2025 18:46:38 +0530 Subject: [PATCH 4/7] update sync monitor to not check for dkg --- .../keyper/syncmonitor/syncmonitor.go | 37 ----- .../keyper/syncmonitor/syncmonitor_test.go | 137 ------------------ 2 files changed, 174 deletions(-) diff --git a/rolling-shutter/keyper/syncmonitor/syncmonitor.go b/rolling-shutter/keyper/syncmonitor/syncmonitor.go index fdfa536a..5c743587 100644 --- a/rolling-shutter/keyper/syncmonitor/syncmonitor.go +++ b/rolling-shutter/keyper/syncmonitor/syncmonitor.go @@ -65,15 +65,6 @@ func (s *SyncMonitor) runCheck( keyperdb *database.Queries, lastBlockNumber *int64, ) error { - isRunning, err := s.isDKGRunning(ctx, keyperdb) - if err != nil { - return fmt.Errorf("syncMonitor | error in isDKGRunning: %w", err) - } - if isRunning { - log.Debug().Msg("dkg is running, skipping sync monitor checks") - return nil - } - currentBlockNumber, err := s.SyncState.GetSyncedBlockNumber(ctx) if err != nil { if errors.Is(err, pgx.ErrNoRows) { @@ -96,31 +87,3 @@ func (s *SyncMonitor) runCheck( Msg("block number has not increased between checks") return ErrBlockNotIncreasing } - -func (s *SyncMonitor) isDKGRunning(ctx context.Context, keyperdb *database.Queries) (bool, error) { - // if latest eon is registered then EonStarted event has triggered, which means the dkg can start - eons, err := keyperdb.GetAllEons(ctx) - if errors.Is(err, pgx.ErrNoRows) { - return false, nil - } - if err != nil { - log.Error(). - Err(err). - Msg("syncMonitor | error getting all eons") - return false, err - } - - if len(eons) == 0 { - return false, nil - } - - // if we get no rows in getting dkg result then dkg is not completed for that eon - _, err = keyperdb.GetDKGResult(ctx, eons[len(eons)-1].Eon) - if errors.Is(err, pgx.ErrNoRows) { - return true, nil - } else if err != nil { - log.Error().Err(err).Msg("syncMonitor | error getting dkg result") - return false, err - } - return false, nil -} diff --git a/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go b/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go index d09586f8..c44a0161 100644 --- a/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go +++ b/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" "gotest.tools/assert" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" @@ -34,24 +33,6 @@ func (m *MockSyncState) SetBlockNumber(n int64) { m.blockNumber = n } -func setupTestData(ctx context.Context, t *testing.T, dbpool *pgxpool.Pool) { - t.Helper() - keyperdb := database.New(dbpool) - - // Set up eon - err := keyperdb.InsertEon(ctx, database.InsertEonParams{ - Eon: 1, - }) - assert.NilError(t, err) - - // Set up DKG result - err = keyperdb.InsertDKGResult(ctx, database.InsertDKGResultParams{ - Eon: 1, - Success: true, - }) - assert.NilError(t, err) -} - func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -60,8 +41,6 @@ func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { defer dbclose() initialBlockNumber := int64(100) - setupTestData(ctx, t, dbpool) - mockSyncState := &MockSyncState{ blockNumber: initialBlockNumber, } @@ -103,8 +82,6 @@ func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { defer closeDB() initialBlockNumber := int64(100) - setupTestData(ctx, t, dbpool) - mockSyncState := &MockSyncState{ blockNumber: initialBlockNumber, } @@ -137,59 +114,6 @@ func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { assert.Equal(t, initialBlockNumber+5, finalBlockNumber, "block number should have been incremented correctly") } -func TestSyncMonitor_SkipsWhenDKGIsRunning(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - keyperdb := database.New(dbpool) - - // Set up eon but no DKG result to simulate DKG running - err := keyperdb.InsertEon(ctx, database.InsertEonParams{ - Eon: 1, - }) - assert.NilError(t, err) - - initialBlockNumber := int64(100) - mockSyncState := &MockSyncState{ - blockNumber: initialBlockNumber, - } - - monitor := &SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - SyncState: mockSyncState, - } - - monitorCtx, cancelMonitor := context.WithCancel(ctx) - defer cancelMonitor() - - errCh := make(chan error, 1) - go func() { - err := service.RunWithSighandler(monitorCtx, monitor) - if err != nil { - errCh <- err - } - }() - - // Let it run for a while without incrementing the block number - time.Sleep(15 * time.Second) - cancelMonitor() - - select { - case err := <-errCh: - t.Fatalf("expected monitor to continue without error, but got: %v", err) - case <-time.After(1 * time.Second): - // Test passes if no error is received - } - - // Verify the block number hasn't changed - finalBlockNumber, err := mockSyncState.GetSyncedBlockNumber(ctx) - assert.NilError(t, err) - assert.Equal(t, initialBlockNumber, finalBlockNumber, "block number should remain unchanged") -} - func TestSyncMonitor_RunsNormallyWhenNoEons(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -238,19 +162,6 @@ func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) { dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) defer closeDB() - // Set up eon and DKG result - keyperdb := database.New(dbpool) - err := keyperdb.InsertEon(ctx, database.InsertEonParams{ - Eon: 1, - }) - assert.NilError(t, err) - - err = keyperdb.InsertDKGResult(ctx, database.InsertDKGResultParams{ - Eon: 1, - Success: true, - }) - assert.NilError(t, err) - // Set up mock sync state that returns no rows error mockSyncState := &MockSyncState{ err: pgx.ErrNoRows, @@ -284,51 +195,3 @@ func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) { // Test passes if no error is received } } - -func TestSyncMonitor_RunsNormallyWithCompletedDKG(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - - setupTestData(ctx, t, dbpool) - - initialBlockNumber := int64(100) - mockSyncState := &MockSyncState{ - blockNumber: initialBlockNumber, - } - - monitor := &SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - SyncState: mockSyncState, - } - - monitorCtx, cancelMonitor := context.WithCancel(ctx) - defer cancelMonitor() - - errCh := make(chan error, 1) - go func() { - err := service.RunWithSighandler(monitorCtx, monitor) - if err != nil { - errCh <- err - } - }() - - // Let it run for a while without incrementing the block number - time.Sleep(15 * time.Second) - cancelMonitor() - - select { - case err := <-errCh: - assert.ErrorContains(t, err, ErrBlockNotIncreasing.Error()) - case <-time.After(1 * time.Second): - t.Fatalf("expected monitor to throw error, but no error returned") - } - - // Verify final state if needed - finalBlockNumber, err := mockSyncState.GetSyncedBlockNumber(ctx) - assert.NilError(t, err) - assert.Equal(t, initialBlockNumber, finalBlockNumber) -} From e85a52bde1d6c095253411d3b7b88b24acf8f7e8 Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Fri, 18 Apr 2025 18:59:37 +0530 Subject: [PATCH 5/7] remove dbpool from generic sync monitor --- .../keyper/syncmonitor/syncmonitor.go | 7 +------ .../keyper/syncmonitor/syncmonitor_test.go | 18 ------------------ rolling-shutter/keyperimpl/gnosis/keyper.go | 1 - .../keyperimpl/shutterservice/keyper.go | 1 - 4 files changed, 1 insertion(+), 26 deletions(-) diff --git a/rolling-shutter/keyper/syncmonitor/syncmonitor.go b/rolling-shutter/keyper/syncmonitor/syncmonitor.go index 5c743587..36b97ecd 100644 --- a/rolling-shutter/keyper/syncmonitor/syncmonitor.go +++ b/rolling-shutter/keyper/syncmonitor/syncmonitor.go @@ -7,10 +7,8 @@ import ( "time" "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" "github.com/rs/zerolog/log" - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) @@ -23,7 +21,6 @@ type BlockSyncState interface { // SyncMonitor monitors the sync state of the keyper. type SyncMonitor struct { - DBPool *pgxpool.Pool CheckInterval time.Duration SyncState BlockSyncState } @@ -38,14 +35,13 @@ func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error { func (s *SyncMonitor) runMonitor(ctx context.Context) error { var lastBlockNumber int64 - keyperdb := database.New(s.DBPool) log.Debug().Msg("starting the sync monitor") for { select { case <-time.After(s.CheckInterval): - if err := s.runCheck(ctx, keyperdb, &lastBlockNumber); err != nil { + if err := s.runCheck(ctx, &lastBlockNumber); err != nil { if errors.Is(err, ErrBlockNotIncreasing) { return err } @@ -62,7 +58,6 @@ var ErrBlockNotIncreasing = errors.New("block number has not increased between c func (s *SyncMonitor) runCheck( ctx context.Context, - keyperdb *database.Queries, lastBlockNumber *int64, ) error { currentBlockNumber, err := s.SyncState.GetSyncedBlockNumber(ctx) diff --git a/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go b/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go index c44a0161..7556b0e4 100644 --- a/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go +++ b/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go @@ -9,9 +9,7 @@ import ( "github.com/jackc/pgx/v4" "gotest.tools/assert" - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup" ) // MockSyncState is a mock implementation of BlockSyncState for testing. @@ -37,16 +35,12 @@ func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dbpool, dbclose := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer dbclose() - initialBlockNumber := int64(100) mockSyncState := &MockSyncState{ blockNumber: initialBlockNumber, } monitor := &SyncMonitor{ - DBPool: dbpool, CheckInterval: 5 * time.Second, SyncState: mockSyncState, } @@ -78,16 +72,12 @@ func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - initialBlockNumber := int64(100) mockSyncState := &MockSyncState{ blockNumber: initialBlockNumber, } monitor := &SyncMonitor{ - DBPool: dbpool, CheckInterval: 200 * time.Millisecond, SyncState: mockSyncState, } @@ -118,16 +108,12 @@ func TestSyncMonitor_RunsNormallyWhenNoEons(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - initialBlockNumber := int64(100) mockSyncState := &MockSyncState{ blockNumber: initialBlockNumber, } monitor := &SyncMonitor{ - DBPool: dbpool, CheckInterval: 5 * time.Second, SyncState: mockSyncState, } @@ -159,9 +145,6 @@ func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - // Set up mock sync state that returns no rows error mockSyncState := &MockSyncState{ err: pgx.ErrNoRows, @@ -169,7 +152,6 @@ func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) { mockSyncState.SetBlockNumber(0) // Initialize block number monitor := &SyncMonitor{ - DBPool: dbpool, CheckInterval: 5 * time.Second, SyncState: mockSyncState, } diff --git a/rolling-shutter/keyperimpl/gnosis/keyper.go b/rolling-shutter/keyperimpl/gnosis/keyper.go index 39126135..1b65667d 100644 --- a/rolling-shutter/keyperimpl/gnosis/keyper.go +++ b/rolling-shutter/keyperimpl/gnosis/keyper.go @@ -157,7 +157,6 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error { } kpr.syncMonitor = &syncmonitor.SyncMonitor{ - DBPool: kpr.dbpool, CheckInterval: time.Duration(kpr.config.Gnosis.SyncMonitorCheckInterval) * time.Second, SyncState: &GnosisSyncState{ kpr.dbpool, diff --git a/rolling-shutter/keyperimpl/shutterservice/keyper.go b/rolling-shutter/keyperimpl/shutterservice/keyper.go index dfaa2704..37cbeed1 100644 --- a/rolling-shutter/keyperimpl/shutterservice/keyper.go +++ b/rolling-shutter/keyperimpl/shutterservice/keyper.go @@ -115,7 +115,6 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error { } kpr.syncMonitor = &syncmonitor.SyncMonitor{ - DBPool: kpr.dbpool, CheckInterval: time.Duration(kpr.config.Chain.SyncMonitorCheckInterval) * time.Second, SyncState: &ShutterServiceSyncState{ kpr.dbpool, From 96d4a9f6963842f7aaabc7cfa8e9ff912d7e983b Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Tue, 6 May 2025 11:54:08 +0530 Subject: [PATCH 6/7] generic sync monitor to handle reorgs --- .../keyper/syncmonitor/syncmonitor.go | 4 +- .../keyper/syncmonitor/syncmonitor_test.go | 43 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/rolling-shutter/keyper/syncmonitor/syncmonitor.go b/rolling-shutter/keyper/syncmonitor/syncmonitor.go index 36b97ecd..40f86dca 100644 --- a/rolling-shutter/keyper/syncmonitor/syncmonitor.go +++ b/rolling-shutter/keyper/syncmonitor/syncmonitor.go @@ -71,7 +71,9 @@ func (s *SyncMonitor) runCheck( log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number") - if currentBlockNumber > *lastBlockNumber { + // if the current block number < last block number, this means a reorg is detected, so we do not throw error + // if the current block number > last block number, then syncing is working as expected + if currentBlockNumber != *lastBlockNumber { *lastBlockNumber = currentBlockNumber return nil } diff --git a/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go b/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go index 7556b0e4..a8c413ff 100644 --- a/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go +++ b/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go @@ -177,3 +177,46 @@ func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) { // Test passes if no error is received } } + +func TestSyncMonitor_HandlesReorg(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up mock sync state that returns no rows error + mockSyncState := &MockSyncState{} + mockSyncState.SetBlockNumber(0) // Initialize block number + + monitor := &SyncMonitor{ + CheckInterval: 5 * time.Second, + SyncState: mockSyncState, + } + + monitorCtx, cancelMonitor := context.WithCancel(ctx) + defer cancelMonitor() + + errCh := make(chan error, 1) + go func() { + err := service.RunWithSighandler(monitorCtx, monitor) + if err != nil { + errCh <- err + } + }() + + // Decrease the block number + decreasedBlockNumber := int64(50) + mockSyncState.SetBlockNumber(decreasedBlockNumber) + + time.Sleep(4 * time.Second) + cancelMonitor() + + select { + case err := <-errCh: + t.Fatalf("expected monitor to continue without error, but got: %v", err) + case <-time.After(1 * time.Second): + } + + // Verify the block number was updated to the latest value + syncedData, err := mockSyncState.GetSyncedBlockNumber(ctx) + assert.NilError(t, err) + assert.Equal(t, decreasedBlockNumber, syncedData, "block number should be updated to the decreased value") +} From 242a638668e186211f2902619c9df3b1fc598af5 Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Tue, 3 Jun 2025 12:02:49 +0530 Subject: [PATCH 7/7] update debug log for sync monitor to include last block number --- rolling-shutter/keyper/syncmonitor/syncmonitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rolling-shutter/keyper/syncmonitor/syncmonitor.go b/rolling-shutter/keyper/syncmonitor/syncmonitor.go index 40f86dca..85324cf6 100644 --- a/rolling-shutter/keyper/syncmonitor/syncmonitor.go +++ b/rolling-shutter/keyper/syncmonitor/syncmonitor.go @@ -69,7 +69,7 @@ func (s *SyncMonitor) runCheck( return fmt.Errorf("error getting synced block number: %w", err) } - log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number") + log.Debug().Int64("current-block-number", currentBlockNumber).Int64("last-block-number", *lastBlockNumber).Msg("current block number") // if the current block number < last block number, this means a reorg is detected, so we do not throw error // if the current block number > last block number, then syncing is working as expected