Release/v0.0.4 (#385)
* fix retry synchronization after an error (#371)

* fix retry synchronization after an error

* synchronizer test

* create index before delete duplicates (#379)

* create index before delete duplicates

* fix tests

* fix gasprice (#384)

* fix gasprice

* fix e2e tests

* log

* release --clean

---------

Co-authored-by: Cool Developer <125276287+C001-developer@users.noreply.github.com>
ARR552 and jrs-engineer authored Apr 28, 2023
1 parent 3bd8389 commit 32a528a
Showing 14 changed files with 195 additions and 212 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -29,6 +29,6 @@ jobs:
uses: goreleaser/goreleaser-action@v2
with:
version: latest
args: release --rm-dist
args: release --clean
env:
GITHUB_TOKEN: ${{ secrets.TOKEN_RELEASE }}
13 changes: 11 additions & 2 deletions claimtxman/claimtxman.go
@@ -101,6 +101,7 @@ func (tm *ClaimTxManager) updateDepositsStatus(ger *etherman.GlobalExitRoot) err
}
if ger.BlockID != 0 { // L2 exit root is updated
log.Infof("Rollup exitroot %v is updated", ger.ExitRoots[1])
// TODO Include networkID as input in UpdateL2DepositsStatus to handle multiple networkIDs in the query
if err := tm.storage.UpdateL2DepositsStatus(tm.ctx, ger.ExitRoots[1][:], dbTx); err != nil {
return err
}
@@ -300,7 +301,7 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error {
// tx infinitely
if allHistoryTxMined && len(mTx.History) >= maxHistorySize {
mTx.Status = ctmtypes.MonitoredTxStatusFailed
mTxLog.Infof("marked as failed because reached the history size limit")
mTxLog.Infof("marked as failed because reached the history size limit (%d)", maxHistorySize)
// update monitored tx changes into storage
err = tm.storage.UpdateClaimTx(ctx, mTx, dbTx)
if err != nil {
@@ -325,6 +326,14 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error {
}
}

// GasPrice is set here so that the most accurate value is used right before sending the tx to L2
gasPrice, err := tm.l2Node.SuggestGasPrice(ctx)
if err != nil {
mTxLog.Errorf("failed to get suggested gasPrice. Error: %v", err)
continue
}
mTx.GasPrice = gasPrice

var signedTx *types.Transaction
// rebuild transaction
tx := mTx.Tx()
@@ -353,7 +362,7 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error {
err := tm.l2Node.SendTransaction(ctx, signedTx)
if err != nil {
if strings.Contains(err.Error(), "nonce") {
mTxLog.Infof("nonce error detected, resetting nonce cache")
mTxLog.Infof("nonce error detected, resetting nonce cache. Nonce used: %d", signedTx.Nonce())
tm.nonceCache.Remove(mTx.From.Hex())
}
mTx.RemoveHistory(signedTx)
14 changes: 9 additions & 5 deletions claimtxman/types/monitoredtx.go
@@ -60,6 +60,9 @@ type MonitoredTx struct {
// Gas is a tx gas
Gas uint64

// GasPrice is the tx gas price
GasPrice *big.Int

// Status of this monitoring
Status MonitoredTxStatus

@@ -83,11 +86,12 @@ type MonitoredTx struct {
// Tx uses the current information to build a tx
func (mTx MonitoredTx) Tx() *types.Transaction {
tx := types.NewTx(&types.LegacyTx{
To: mTx.To,
Nonce: mTx.Nonce,
Value: mTx.Value,
Data: mTx.Data,
Gas: mTx.Gas,
To: mTx.To,
Nonce: mTx.Nonce,
Value: mTx.Value,
Data: mTx.Data,
Gas: mTx.Gas,
GasPrice: mTx.GasPrice,
})

return tx
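Note on the gasprice fix (#384): right before each monitored claim tx is rebuilt and sent, claimtxman now asks the L2 node for a suggested gas price and stores it on MonitoredTx, and Tx() above passes that value into the legacy transaction. A minimal standalone sketch of the same pattern with go-ethereum's ethclient follows; the RPC URL, recipient address, nonce and gas limit are placeholders for illustration (not values from this repository), and signing/sending is omitted.

package main

import (
	"context"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	ctx := context.Background()

	// Placeholder L2 RPC endpoint, not taken from this repository.
	client, err := ethclient.Dial("http://localhost:8545")
	if err != nil {
		log.Fatalf("failed to dial L2 node: %v", err)
	}

	// Ask the node for a gas price right before building the tx,
	// mirroring the claimtxman change above.
	gasPrice, err := client.SuggestGasPrice(ctx)
	if err != nil {
		log.Fatalf("failed to get suggested gasPrice: %v", err)
	}

	to := common.HexToAddress("0x0000000000000000000000000000000000000001") // placeholder recipient
	tx := types.NewTx(&types.LegacyTx{
		To:       &to,
		Nonce:    0, // claimtxman takes the nonce from its nonce cache
		Value:    big.NewInt(0),
		Data:     nil,
		Gas:      300000,   // placeholder gas limit
		GasPrice: gasPrice, // freshly suggested value, as stored in MonitoredTx.GasPrice
	})

	log.Printf("built legacy tx %s with gasPrice %s", tx.Hash().Hex(), gasPrice.String())
	// Signing and l2Node.SendTransaction(ctx, signedTx) are omitted in this sketch.
}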
13 changes: 9 additions & 4 deletions db/pgstorage/migrations/0003.sql
@@ -37,6 +37,8 @@ $$ LANGUAGE plpgsql;

UPDATE mt.root SET deposit_cnt = deposit_cnt + 1;

DROP INDEX IF EXISTS mt.rht_key_idx;

-- +migrate Up

ALTER TABLE mt.rht DROP COLUMN IF EXISTS root_id;
@@ -54,10 +56,13 @@ ALTER TABLE mt.rht ALTER COLUMN deposit_id DROP DEFAULT;

UPDATE mt.root SET deposit_cnt = deposit_cnt - 1;

DELETE FROM mt.rht a
WHERE a.ctid <> (SELECT min(b.ctid)
FROM mt.rht b
WHERE a.key = b.key);
-- Create indexes
CREATE INDEX IF NOT EXISTS rht_key_idx ON mt.rht(key);

-- Delete duplicates
CREATE TABLE mt.rht_temp AS (SELECT key, min(value), max(deposit_id) FROM mt.rht GROUP BY key HAVING count(key) > 1);
DELETE FROM mt.rht where key in (select key FROM mt.rht_temp);
INSERT INTO mt.rht(key, value, deposit_id) (SELECT b.key, b.min, b.max FROM mt.rht_temp b);

-- +migrate StatementBegin
DO $$
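Note on PR #379: migration 0003 now creates rht_key_idx before the duplicate cleanup (the index creation was moved out of 0004, see that diff below), and the old correlated-subquery DELETE on ctid is replaced by a temp-table pass over duplicated keys. The Go sketch below simply replays those statements in that order through database/sql to make the ordering explicit; the DSN and the lib/pq driver are assumptions for illustration, since the repository ships this as a plain SQL migration rather than Go code.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed Postgres driver for this sketch
)

func main() {
	// Placeholder DSN; adjust for a real environment.
	db, err := sql.Open("postgres", "postgres://user:pass@localhost:5432/bridge?sslmode=disable")
	if err != nil {
		log.Fatalf("open: %v", err)
	}
	defer db.Close()

	stmts := []string{
		// 1. Create the index first so the duplicate scan below can use it.
		`CREATE INDEX IF NOT EXISTS rht_key_idx ON mt.rht(key)`,
		// 2. Keep one surviving row per duplicated key.
		`CREATE TABLE mt.rht_temp AS (
			SELECT key, min(value), max(deposit_id)
			FROM mt.rht GROUP BY key HAVING count(key) > 1)`,
		// 3. Remove the duplicated rows, then re-insert the survivors.
		`DELETE FROM mt.rht WHERE key IN (SELECT key FROM mt.rht_temp)`,
		`INSERT INTO mt.rht(key, value, deposit_id)
			(SELECT b.key, b.min, b.max FROM mt.rht_temp b)`,
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatalf("migration statement failed: %v", err)
		}
	}
	log.Println("rht index created and duplicates removed")
}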
23 changes: 23 additions & 0 deletions db/pgstorage/migrations/0003_test.go
@@ -84,6 +84,18 @@ func (m migrationTest0003) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB)
assert.NoError(t, err)
assert.Equal(t, maxDepositCnt, 1)
assert.Equal(t, rootCount, 2)

indexes := []string{"rht_key_idx"}

// Check indexes adding
for _, idx := range indexes {
// getIndex
const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;`
row := db.QueryRow(getIndex, idx)
var result int
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 1, result)
}
}

func (m migrationTest0003) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
@@ -94,6 +106,17 @@ func (m migrationTest0003) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB
assert.NoError(t, err)
assert.Equal(t, maxDepositCnt, 2)
assert.Equal(t, rootCount, 2)

indexes := []string{"rht_key_idx"}
// Check indexes removing
for _, idx := range indexes {
// getIndex
const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;`
row := db.QueryRow(getIndex, idx)
var result int
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 0, result)
}
}

func TestMigration0003(t *testing.T) {
5 changes: 0 additions & 5 deletions db/pgstorage/migrations/0004.sql
@@ -6,8 +6,6 @@ ALTER TABLE sync.deposit DROP COLUMN ready_for_claim;

ALTER TABLE sync.block DROP CONSTRAINT block_hash_unique;

DROP INDEX IF EXISTS mt.rht_key_idx;

-- +migrate Up

ALTER TABLE
@@ -88,6 +86,3 @@ WHERE
AND network = 1
)
AND network_id != 0;

-- Create indexes
CREATE INDEX IF NOT EXISTS rht_key_idx ON mt.rht(key);
31 changes: 5 additions & 26 deletions db/pgstorage/migrations/0004_test.go
@@ -44,10 +44,6 @@ func (m migrationTest0004) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB)
common.FromHex("0xa4bfa0908dc7b06d98da4309f859023d6947561bc19bc00d77f763dea1a0b9f5"),
[]byte{}).Scan(&depositID)
assert.NoError(t, err)
// Insert a new root
const addRoot = "INSERT INTO mt.root (root, deposit_cnt, network, deposit_id) VALUES ($1, $2, $3, $4)"
_, err = db.Exec(addRoot, common.FromHex("0x5a2dce0a8a7f68bb74560f8f71837c2c2ebbcbf7fffb42ae1896f13f7c7479a0"), 1, 0, depositID)
assert.NoError(t, err)
// Insert a new node to the rht table
const addNode = "INSERT INTO mt.rht (key, value, deposit_id) VALUES ($1, $2, $3)"
_, err = db.Exec(addNode, common.FromHex("0x5a2dce0a8a7f68bb74560f8f71837c2c2ebbcbf7fffb42ae1896f13f7c7479a0"), [][]byte{
@@ -85,31 +81,14 @@ func (m migrationTest0004) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB)
common.FromHex("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5"), 0, time.Now()).Scan(&blockID)
assert.NoError(t, err)
assert.Equal(t, blockID, uint64(1))

indexes := []string{"rht_key_idx"}

// Check indexes adding
for _, idx := range indexes {
// getIndex
const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;`
row := db.QueryRow(getIndex, idx)
var result int
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 1, result)
}
}

func (m migrationTest0004) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
indexes := []string{"rht_key_idx"}
// Check indexes removing
for _, idx := range indexes {
// getIndex
const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;`
row := db.QueryRow(getIndex, idx)
var result int
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 0, result)
}
// Insert a monitored tx
_, err := db.Exec(`INSERT INTO sync.monitored_txs
(id, block_id, from_addr, to_addr, nonce, value, data, gas, status, history, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`, 0, 1, common.FromHex("0x6B175474E89094C44Da98b954EedeAC495271d0F"), common.FromHex("0x6B175474E89094C44Da98b954EedeAC495271d0F"), 1, "10000", []byte{}, 5000000, "crerated", nil, time.Now(), time.Now())
assert.Error(t, err)
}

func TestMigration0004(t *testing.T) {
Expand Down
10 changes: 8 additions & 2 deletions synchronizer/mock_storage.go

Some generated files are not rendered by default.

23 changes: 5 additions & 18 deletions synchronizer/synchronizer.go
@@ -95,11 +95,7 @@ func (s *ClientSynchronizer) Sync() error {
// If there is no lastEthereumBlock means that sync from the beginning is necessary. If not, it continues from the retrieved ethereum block
// Get the latest synced block. If there is no block on db, use genesis block
log.Infof("NetworkID: %d, Synchronization started", s.networkID)
dbTx, err := s.storage.BeginDBTransaction(s.ctx)
if err != nil {
log.Fatalf("networkID: %d, error creating db transaction to get latest block", s.networkID)
}
lastBlockSynced, err := s.storage.GetLastBlock(s.ctx, s.networkID, dbTx)
lastBlockSynced, err := s.storage.GetLastBlock(s.ctx, s.networkID, nil)
if err != nil {
if err == gerror.ErrStorageNotFound {
log.Warnf("networkID: %d, error getting the latest ethereum block. No data stored. Setting genesis block. Error: %w", s.networkID, err)
@@ -113,16 +109,7 @@ func (s *ClientSynchronizer) Sync() error {
log.Fatalf("networkID: %d, unexpected error getting the latest block. Error: %s", s.networkID, err.Error())
}
}
err = s.storage.Commit(s.ctx, dbTx)
if err != nil {
log.Errorf("networkID: %d, error committing dbTx, err: %s", s.networkID, err.Error())
rollbackErr := s.storage.Rollback(s.ctx, dbTx)
if rollbackErr != nil {
log.Fatalf("networkID: %d, error rolling back state. RollbackErr: %s, err: %s",
s.networkID, rollbackErr.Error(), err.Error())
}
log.Fatalf("networkID: %d, error committing dbTx, err: %s", s.networkID, err.Error())
}
log.Debugf("NetworkID: %d, initial lastBlockSynced: %+v", s.networkID, lastBlockSynced)
for {
select {
case <-s.ctx.Done():
@@ -132,7 +119,7 @@ func (s *ClientSynchronizer) Sync() error {
//Sync L1Blocks
if lastBlockSynced, err = s.syncBlocks(lastBlockSynced); err != nil {
log.Warn("error syncing blocks: ", err)
lastBlockSynced, err = s.storage.GetLastBlock(s.ctx, s.networkID, dbTx)
lastBlockSynced, err = s.storage.GetLastBlock(s.ctx, s.networkID, nil)
if err != nil {
log.Fatal("error getting lastBlockSynced to resume the synchronization... Error: ", err)
}
@@ -310,7 +297,7 @@ func (s *ClientSynchronizer) processBlockRange(blocks []etherman.Block, order ma
}
// Add block information
blocks[i].NetworkID = s.networkID
log.Infof("NetworkID: %d. Syncing block: %d", s.networkID, &blocks[i].BlockNumber)
log.Infof("NetworkID: %d. Syncing block: %d", s.networkID, blocks[i].BlockNumber)
blockID, err := s.storage.AddBlock(s.ctx, &blocks[i], dbTx)
if err != nil {
log.Errorf("networkID: %d, error storing block. BlockNumber: %d, error: %v", s.networkID, blocks[i].BlockNumber, err)
@@ -524,7 +511,7 @@ func (s *ClientSynchronizer) checkTrustedState(batch etherman.Batch, dbTx pgx.Tx
batch.Coinbase == tBatch.Coinbase {
return false, nil
}
log.Errorf("networkID: %d, TRUSTED REORG DETECTED! Batch: ", s.networkID, batch.BatchNumber)
log.Errorf("networkID: %d, TRUSTED REORG DETECTED! Batch: %d", s.networkID, batch.BatchNumber)
log.Warnf("networkID: %d, BatchL2Data. Virtual: %s, Trusted: %s", s.networkID, hex.EncodeToString(batch.BatchL2Data), hex.EncodeToString(tBatch.BatchL2Data))
log.Warnf("networkID: %d, GlobalExitRoot. Virtual: %s, Trusted: %s", s.networkID, batch.GlobalExitRoot.String(), tBatch.GlobalExitRoot.String())
log.Warnf("networkID: %d, Timestamp. Virtual: %d, Trusted: %d", s.networkID, batch.Timestamp.Unix(), tBatch.Timestamp.Unix())
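Note on the synchronization fix (#371): Sync() no longer opens a DB transaction just to read the last synced block, and the retry branch after a syncBlocks error no longer reuses that dbTx handle, which had already been committed earlier in Sync(); both reads now pass nil and run outside any transaction. The sketch below compiles as a standalone package and shows that retry shape; the Storage interface and Block type are simplified stand-ins for the repository's storage and etherman types, and the 2-second ticker is an arbitrary choice for the example.

package sketch

import (
	"context"
	"errors"
	"log"
	"time"
)

// Block is a simplified stand-in for etherman.Block.
type Block struct{ BlockNumber uint64 }

// Storage is a simplified stand-in for the synchronizer's storage dependency;
// a nil tx argument means the read runs outside any DB transaction.
type Storage interface {
	GetLastBlock(ctx context.Context, networkID uint, tx interface{}) (*Block, error)
}

// Sync mirrors the retry shape of ClientSynchronizer.Sync after this commit:
// the last synced block is read with a nil tx both at startup and whenever a
// sync iteration fails, instead of reusing an already committed dbTx.
func Sync(ctx context.Context, st Storage, networkID uint, syncBlocks func(*Block) (*Block, error)) error {
	lastBlockSynced, err := st.GetLastBlock(ctx, networkID, nil)
	if err != nil {
		return err
	}
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if lastBlockSynced, err = syncBlocks(lastBlockSynced); err != nil {
				log.Println("error syncing blocks:", err)
				// Retry: re-read the last synced block outside any transaction.
				if lastBlockSynced, err = st.GetLastBlock(ctx, networkID, nil); err != nil {
					return errors.New("error getting lastBlockSynced to resume the synchronization: " + err.Error())
				}
			}
		}
	}
}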
