diff --git a/rolling-shutter/keyperimpl/shutterservice/syncmonitor.go b/rolling-shutter/keyper/syncmonitor/syncmonitor.go similarity index 65% rename from rolling-shutter/keyperimpl/shutterservice/syncmonitor.go rename to rolling-shutter/keyper/syncmonitor/syncmonitor.go index 9dd5694a..85324cf6 100644 --- a/rolling-shutter/keyperimpl/shutterservice/syncmonitor.go +++ b/rolling-shutter/keyper/syncmonitor/syncmonitor.go @@ -1,23 +1,28 @@ -package shutterservice +package syncmonitor import ( "context" + "errors" "fmt" "time" "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" - "github.com/pkg/errors" "github.com/rs/zerolog/log" - keyperDB "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database" - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) +// BlockSyncState is an interface that different keyper implementations +// can implement to provide their own block sync state logic. +type BlockSyncState interface { + // GetSyncedBlockNumber retrieves the current synced block number. + GetSyncedBlockNumber(ctx context.Context) (int64, error) +} + +// SyncMonitor monitors the sync state of the keyper. type SyncMonitor struct { - DBPool *pgxpool.Pool CheckInterval time.Duration + SyncState BlockSyncState } func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error { @@ -30,15 +35,13 @@ func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error { func (s *SyncMonitor) runMonitor(ctx context.Context) error { var lastBlockNumber int64 - db := database.New(s.DBPool) - keyperdb := keyperDB.New(s.DBPool) log.Debug().Msg("starting the sync monitor") for { select { case <-time.After(s.CheckInterval): - if err := s.runCheck(ctx, db, keyperdb, &lastBlockNumber); err != nil { + if err := s.runCheck(ctx, &lastBlockNumber); err != nil { if errors.Is(err, ErrBlockNotIncreasing) { return err } @@ -55,21 +58,18 @@ var ErrBlockNotIncreasing = errors.New("block number has not increased between c func (s *SyncMonitor) runCheck( ctx context.Context, - db *database.Queries, - keyperdb *keyperDB.Queries, lastBlockNumber *int64, ) error { - record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) + currentBlockNumber, err := s.SyncState.GetSyncedBlockNumber(ctx) if err != nil { if errors.Is(err, pgx.ErrNoRows) { - log.Warn().Err(err).Msg("no rows found in table identity_registered_events_synced_until") + log.Warn().Err(err).Msg("no rows found in sync state table") return nil // This is not an error condition that should stop monitoring } - return fmt.Errorf("error getting identity_registered_events_synced_until: %w", err) + return fmt.Errorf("error getting synced block number: %w", err) } - currentBlockNumber := record.BlockNumber - log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number") + log.Debug().Int64("current-block-number", currentBlockNumber).Int64("last-block-number", *lastBlockNumber).Msg("current block number") // if the current block number < last block number, this means a reorg is detected, so we do not throw error // if the current block number > last block number, then syncing is working as expected diff --git a/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go b/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go new file mode 100644 index 00000000..a8c413ff --- /dev/null +++ b/rolling-shutter/keyper/syncmonitor/syncmonitor_test.go @@ -0,0 +1,222 @@ +package syncmonitor + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/jackc/pgx/v4" + "gotest.tools/assert" + + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" +) + +// MockSyncState is a mock implementation of BlockSyncState for testing. +type MockSyncState struct { + mu sync.Mutex + blockNumber int64 + err error +} + +func (m *MockSyncState) GetSyncedBlockNumber(_ context.Context) (int64, error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.blockNumber, m.err +} + +func (m *MockSyncState) SetBlockNumber(n int64) { + m.mu.Lock() + defer m.mu.Unlock() + m.blockNumber = n +} + +func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + initialBlockNumber := int64(100) + mockSyncState := &MockSyncState{ + blockNumber: initialBlockNumber, + } + + monitor := &SyncMonitor{ + CheckInterval: 5 * time.Second, + SyncState: mockSyncState, + } + + errCh := make(chan error, 1) + go func() { + err := service.RunWithSighandler(ctx, monitor) + if err != nil { + errCh <- err + } + }() + + time.Sleep(12 * time.Second) + + select { + case err := <-errCh: + assert.ErrorContains(t, err, ErrBlockNotIncreasing.Error()) + case <-time.After(5 * time.Second): + t.Fatal("expected an error, but none was returned") + } + + // Verify final state + finalBlockNumber, err := mockSyncState.GetSyncedBlockNumber(ctx) + assert.NilError(t, err) + assert.Equal(t, initialBlockNumber, finalBlockNumber) +} + +func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + initialBlockNumber := int64(100) + mockSyncState := &MockSyncState{ + blockNumber: initialBlockNumber, + } + + monitor := &SyncMonitor{ + CheckInterval: 200 * time.Millisecond, + SyncState: mockSyncState, + } + + monitorCtx, cancelMonitor := context.WithCancel(ctx) + errCh := make(chan error, 1) + go func() { + if err := service.RunWithSighandler(monitorCtx, monitor); err != nil { + errCh <- err + } + }() + + // Update block numbers more quickly + for i := 0; i < 5; i++ { + time.Sleep(200 * time.Millisecond) + mockSyncState.SetBlockNumber(initialBlockNumber + int64(i+1)) + } + + cancelMonitor() + + // Verify final state + finalBlockNumber, err := mockSyncState.GetSyncedBlockNumber(ctx) + assert.NilError(t, err) + assert.Equal(t, initialBlockNumber+5, finalBlockNumber, "block number should have been incremented correctly") +} + +func TestSyncMonitor_RunsNormallyWhenNoEons(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + initialBlockNumber := int64(100) + mockSyncState := &MockSyncState{ + blockNumber: initialBlockNumber, + } + + monitor := &SyncMonitor{ + CheckInterval: 5 * time.Second, + SyncState: mockSyncState, + } + + monitorCtx, cancelMonitor := context.WithCancel(ctx) + defer cancelMonitor() + + errCh := make(chan error, 1) + go func() { + err := service.RunWithSighandler(monitorCtx, monitor) + if err != nil { + errCh <- err + } + }() + + // Let it run for a while without incrementing the block number + time.Sleep(15 * time.Second) + cancelMonitor() + + select { + case err := <-errCh: + assert.ErrorContains(t, err, ErrBlockNotIncreasing.Error()) + case <-time.After(1 * time.Second): + t.Fatalf("expected monitor to throw error, but no error returned") + } +} + +func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up mock sync state that returns no rows error + mockSyncState := &MockSyncState{ + err: pgx.ErrNoRows, + } + mockSyncState.SetBlockNumber(0) // Initialize block number + + monitor := &SyncMonitor{ + CheckInterval: 5 * time.Second, + SyncState: mockSyncState, + } + + monitorCtx, cancelMonitor := context.WithCancel(ctx) + defer cancelMonitor() + + errCh := make(chan error, 1) + go func() { + err := service.RunWithSighandler(monitorCtx, monitor) + if err != nil { + errCh <- err + } + }() + + time.Sleep(15 * time.Second) + cancelMonitor() + + select { + case err := <-errCh: + t.Fatalf("expected monitor to continue without error, but got: %v", err) + case <-time.After(1 * time.Second): + // Test passes if no error is received + } +} + +func TestSyncMonitor_HandlesReorg(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up mock sync state that returns no rows error + mockSyncState := &MockSyncState{} + mockSyncState.SetBlockNumber(0) // Initialize block number + + monitor := &SyncMonitor{ + CheckInterval: 5 * time.Second, + SyncState: mockSyncState, + } + + monitorCtx, cancelMonitor := context.WithCancel(ctx) + defer cancelMonitor() + + errCh := make(chan error, 1) + go func() { + err := service.RunWithSighandler(monitorCtx, monitor) + if err != nil { + errCh <- err + } + }() + + // Decrease the block number + decreasedBlockNumber := int64(50) + mockSyncState.SetBlockNumber(decreasedBlockNumber) + + time.Sleep(4 * time.Second) + cancelMonitor() + + select { + case err := <-errCh: + t.Fatalf("expected monitor to continue without error, but got: %v", err) + case <-time.After(1 * time.Second): + } + + // Verify the block number was updated to the latest value + syncedData, err := mockSyncState.GetSyncedBlockNumber(ctx) + assert.NilError(t, err) + assert.Equal(t, decreasedBlockNumber, syncedData, "block number should be updated to the decreased value") +} diff --git a/rolling-shutter/keyperimpl/gnosis/keyper.go b/rolling-shutter/keyperimpl/gnosis/keyper.go index bff1dfb7..897c3657 100644 --- a/rolling-shutter/keyperimpl/gnosis/keyper.go +++ b/rolling-shutter/keyperimpl/gnosis/keyper.go @@ -17,6 +17,7 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/epochkghandler" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/kprconfig" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/syncmonitor" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/broker" @@ -49,7 +50,7 @@ type Keyper struct { validatorSyncer *ValidatorSyncer eonKeyPublisher *eonkeypublisher.EonKeyPublisher latestTriggeredSlot *uint64 - syncMonitor *SyncMonitor + syncMonitor *syncmonitor.SyncMonitor // input events newBlocks chan *syncevent.LatestBlock @@ -63,8 +64,7 @@ type Keyper struct { func New(c *Config) *Keyper { return &Keyper{ - config: c, - syncMonitor: &SyncMonitor{}, + config: c, } } @@ -156,9 +156,11 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error { return errors.Wrap(err, "failed to reset transaction pointer age") } - kpr.syncMonitor = &SyncMonitor{ - DBPool: kpr.dbpool, + kpr.syncMonitor = &syncmonitor.SyncMonitor{ CheckInterval: time.Duration(kpr.config.Gnosis.SyncMonitorCheckInterval) * time.Second, + SyncState: &GnosisSyncState{ + kpr.dbpool, + }, } runner.Go(func() error { return kpr.processInputs(ctx) }) diff --git a/rolling-shutter/keyperimpl/gnosis/syncmonitor.go b/rolling-shutter/keyperimpl/gnosis/syncmonitor.go deleted file mode 100644 index 5f47cab0..00000000 --- a/rolling-shutter/keyperimpl/gnosis/syncmonitor.go +++ /dev/null @@ -1,64 +0,0 @@ -package gnosis - -import ( - "context" - "time" - - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" - - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" -) - -type SyncMonitor struct { - DBPool *pgxpool.Pool - CheckInterval time.Duration -} - -func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error { - runner.Go(func() error { - return s.runMonitor(ctx) - }) - - return nil -} - -func (s *SyncMonitor) runMonitor(ctx context.Context) error { - var lastBlockNumber int64 - db := database.New(s.DBPool) - - log.Debug().Msg("starting the sync monitor") - - for { - select { - case <-time.After(s.CheckInterval): - record, err := db.GetTransactionSubmittedEventsSyncedUntil(ctx) - if err != nil { - if errors.Is(err, pgx.ErrNoRows) { - log.Warn().Err(err).Msg("no rows found in table transaction_submitted_events_synced_until") - continue - } - return errors.Wrap(err, "error getting transaction_submitted_events_synced_until") - } - - currentBlockNumber := record.BlockNumber - log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number") - - if currentBlockNumber > lastBlockNumber { - lastBlockNumber = currentBlockNumber - } else { - log.Error(). - Int64("last-block-number", lastBlockNumber). - Int64("current-block-number", currentBlockNumber). - Msg("block number has not increased between checks") - return errors.New("block number has not increased between checks") - } - case <-ctx.Done(): - log.Info().Msg("stopping syncMonitor due to context cancellation") - return ctx.Err() - } - } -} diff --git a/rolling-shutter/keyperimpl/gnosis/syncmonitor_test.go b/rolling-shutter/keyperimpl/gnosis/syncmonitor_test.go deleted file mode 100644 index 154d13e2..00000000 --- a/rolling-shutter/keyperimpl/gnosis/syncmonitor_test.go +++ /dev/null @@ -1,146 +0,0 @@ -package gnosis_test - -import ( - "context" - "testing" - "time" - - "gotest.tools/assert" - - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis" - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup" -) - -func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, dbclose := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer dbclose() - db := database.New(dbpool) - - initialBlockNumber := int64(100) - - err := db.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: initialBlockNumber, - Slot: 1, - }) - if err != nil { - t.Fatalf("failed to set initial synced data: %v", err) - } - - monitor := &gnosis.SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - } - - errCh := make(chan error, 1) - - go func() { - err := service.RunWithSighandler(ctx, monitor) - if err != nil { - errCh <- err - } - }() - - time.Sleep(12 * time.Second) - - select { - case err := <-errCh: - assert.ErrorContains(t, err, "block number has not increased between checks") - case <-time.After(5 * time.Second): - t.Fatal("expected an error, but none was returned") - } -} - -func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - db := database.New(dbpool) - - initialBlockNumber := int64(100) - err := db.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: initialBlockNumber, - Slot: 1, - }) - if err != nil { - t.Fatalf("failed to set initial synced data: %v", err) - } - - monitor := &gnosis.SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - } - - _, deferFn := service.RunBackground(ctx, monitor) - defer deferFn() - - doneCh := make(chan struct{}) - go func() { - for i := 0; i < 5; i++ { - newBlockNumber := initialBlockNumber + int64(i+1) - err := db.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: newBlockNumber, - Slot: 1, - }) - if err != nil { - t.Errorf("failed to update block number: %v", err) - return - } - - time.Sleep(5 * time.Second) - } - - doneCh <- struct{}{} - }() - - <-doneCh - syncedData, err := db.GetTransactionSubmittedEventsSyncedUntil(ctx) - if err != nil { - t.Fatalf("failed to retrieve final block number: %v", err) - } - - assert.Equal(t, initialBlockNumber+5, syncedData.BlockNumber, "block number should have been incremented correctly") -} - -func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - _ = database.New(dbpool) - - monitor := &gnosis.SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - } - - monitorCtx, cancelMonitor := context.WithCancel(ctx) - defer cancelMonitor() - - errCh := make(chan error, 1) - go func() { - err := service.RunWithSighandler(monitorCtx, monitor) - if err != nil { - errCh <- err - } - }() - - time.Sleep(15 * time.Second) - cancelMonitor() - - select { - case err := <-errCh: - t.Fatalf("expected monitor to continue without error, but got: %v", err) - case <-time.After(1 * time.Second): - } -} diff --git a/rolling-shutter/keyperimpl/gnosis/syncstate.go b/rolling-shutter/keyperimpl/gnosis/syncstate.go new file mode 100644 index 00000000..5fcbb8a7 --- /dev/null +++ b/rolling-shutter/keyperimpl/gnosis/syncstate.go @@ -0,0 +1,24 @@ +package gnosis + +import ( + "context" + + "github.com/jackc/pgx/v4/pgxpool" + + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database" +) + +// GnosisSyncState implements the BlockSyncState interface for the gnosis keyper. +type GnosisSyncState struct { + DBPool *pgxpool.Pool +} + +// GetSyncedBlockNumber retrieves the current synced block number from transaction submitted events. +func (s *GnosisSyncState) GetSyncedBlockNumber(ctx context.Context) (int64, error) { + db := database.New(s.DBPool) + record, err := db.GetTransactionSubmittedEventsSyncedUntil(ctx) + if err != nil { + return 0, err + } + return record.BlockNumber, nil +} diff --git a/rolling-shutter/keyperimpl/shutterservice/keyper.go b/rolling-shutter/keyperimpl/shutterservice/keyper.go index 5d847e5b..95115e92 100644 --- a/rolling-shutter/keyperimpl/shutterservice/keyper.go +++ b/rolling-shutter/keyperimpl/shutterservice/keyper.go @@ -16,6 +16,7 @@ import ( "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/epochkghandler" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/kprconfig" + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/syncmonitor" "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/broker" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/chainsync" @@ -36,7 +37,7 @@ type Keyper struct { registrySyncer *RegistrySyncer eonKeyPublisher *eonkeypublisher.EonKeyPublisher latestTriggeredTime *uint64 - syncMonitor *SyncMonitor + syncMonitor *syncmonitor.SyncMonitor // input events newBlocks chan *syncevent.LatestBlock @@ -113,10 +114,13 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error { return err } - kpr.syncMonitor = &SyncMonitor{ - DBPool: kpr.dbpool, + kpr.syncMonitor = &syncmonitor.SyncMonitor{ CheckInterval: time.Duration(kpr.config.Chain.SyncMonitorCheckInterval) * time.Second, + SyncState: &ShutterServiceSyncState{ + kpr.dbpool, + }, } + runner.Go(func() error { return kpr.processInputs(ctx) }) return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.eonKeyPublisher, kpr.syncMonitor) } diff --git a/rolling-shutter/keyperimpl/shutterservice/syncmonitor_test.go b/rolling-shutter/keyperimpl/shutterservice/syncmonitor_test.go deleted file mode 100644 index 5a65d9c5..00000000 --- a/rolling-shutter/keyperimpl/shutterservice/syncmonitor_test.go +++ /dev/null @@ -1,191 +0,0 @@ -package shutterservice_test - -import ( - "context" - "testing" - "time" - - "github.com/jackc/pgx/v4/pgxpool" - "gotest.tools/assert" - - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice" - "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup" -) - -func setupTestData(ctx context.Context, t *testing.T, dbpool *pgxpool.Pool, blockNumber int64) { - t.Helper() - db := database.New(dbpool) - - // Set up initial block - err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: blockNumber, - }) - assert.NilError(t, err) -} - -func TestAPISyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, dbclose := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer dbclose() - - initialBlockNumber := int64(100) - setupTestData(ctx, t, dbpool, initialBlockNumber) - - monitor := &shutterservice.SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - } - - errCh := make(chan error, 1) - go func() { - err := service.RunWithSighandler(ctx, monitor) - if err != nil { - errCh <- err - } - }() - - time.Sleep(12 * time.Second) - - select { - case err := <-errCh: - assert.ErrorContains(t, err, shutterservice.ErrBlockNotIncreasing.Error()) - case <-time.After(5 * time.Second): - t.Fatal("expected an error, but none was returned") - } -} - -func TestAPISyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - db := database.New(dbpool) - - initialBlockNumber := int64(100) - setupTestData(ctx, t, dbpool, initialBlockNumber) - - monitor := &shutterservice.SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - } - - _, deferFn := service.RunBackground(ctx, monitor) - defer deferFn() - - doneCh := make(chan struct{}) - go func() { - for i := 0; i < 5; i++ { - newBlockNumber := initialBlockNumber + int64(i+1) - err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: newBlockNumber, - }) - if err != nil { - t.Errorf("failed to update block number: %v", err) - return - } - - time.Sleep(5 * time.Second) - } - - doneCh <- struct{}{} - }() - - <-doneCh - syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) - if err != nil { - t.Fatalf("failed to retrieve final block number: %v", err) - } - - assert.Equal(t, initialBlockNumber+5, syncedData.BlockNumber, "block number should have been incremented correctly") -} - -func TestAPISyncMonitor_ContinuesWhenNoRows(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - - monitor := &shutterservice.SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - } - - monitorCtx, cancelMonitor := context.WithCancel(ctx) - defer cancelMonitor() - - errCh := make(chan error, 1) - go func() { - err := service.RunWithSighandler(monitorCtx, monitor) - if err != nil { - errCh <- err - } - }() - - time.Sleep(15 * time.Second) - cancelMonitor() - - select { - case err := <-errCh: - t.Fatalf("expected monitor to continue without error, but got: %v", err) - case <-time.After(1 * time.Second): - } -} - -func TestAPISyncMonitor_HandlesReorg(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition) - defer closeDB() - db := database.New(dbpool) - - // Set up initial block at a higher number - initialBlockNumber := int64(100) - setupTestData(ctx, t, dbpool, initialBlockNumber) - - monitor := &shutterservice.SyncMonitor{ - DBPool: dbpool, - CheckInterval: 5 * time.Second, - } - - monitorCtx, cancelMonitor := context.WithCancel(ctx) - defer cancelMonitor() - - errCh := make(chan error, 1) - go func() { - err := service.RunWithSighandler(monitorCtx, monitor) - if err != nil { - errCh <- err - } - }() - - // Decrease the block number - decreasedBlockNumber := int64(50) - err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{ - BlockHash: []byte{0x01, 0x02, 0x03}, - BlockNumber: decreasedBlockNumber, - }) - assert.NilError(t, err) - - time.Sleep(4 * time.Second) - cancelMonitor() - - select { - case err := <-errCh: - t.Fatalf("expected monitor to continue without error, but got: %v", err) - case <-time.After(1 * time.Second): - } - - // Verify the block number was updated to the latest value - syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) - assert.NilError(t, err) - assert.Equal(t, decreasedBlockNumber, syncedData.BlockNumber, "block number should be updated to the decreased value") -} diff --git a/rolling-shutter/keyperimpl/shutterservice/syncstate.go b/rolling-shutter/keyperimpl/shutterservice/syncstate.go new file mode 100644 index 00000000..17f24b4f --- /dev/null +++ b/rolling-shutter/keyperimpl/shutterservice/syncstate.go @@ -0,0 +1,24 @@ +package shutterservice + +import ( + "context" + + "github.com/jackc/pgx/v4/pgxpool" + + "github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database" +) + +// ShutterServiceSyncState implements the BlockSyncState interface for the shutter service. +type ShutterServiceSyncState struct { + DBPool *pgxpool.Pool +} + +// GetSyncedBlockNumber retrieves the current synced block number from identity events. +func (s *ShutterServiceSyncState) GetSyncedBlockNumber(ctx context.Context) (int64, error) { + db := database.New(s.DBPool) + record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx) + if err != nil { + return 0, err + } + return record.BlockNumber, nil +}