Skip to content

Commit

Permalink
Merge pull request #73 from 0xPolygon/agglayer-integration
Browse files Browse the repository at this point in the history
AggLayer integration
  • Loading branch information
vcastellm authored Feb 8, 2024
2 parents c3a18cd + 1fa8b47 commit fce434a
Show file tree
Hide file tree
Showing 15 changed files with 399 additions and 45 deletions.
157 changes: 137 additions & 20 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregator

import (
"context"
"crypto/ecdsa"
"encoding/json"
"errors"
"fmt"
Expand All @@ -13,6 +14,9 @@ import (
"time"
"unicode"

"github.com/0xPolygon/agglayer/client"
agglayerTypes "github.com/0xPolygon/agglayer/rpc/types"
"github.com/0xPolygon/agglayer/tx"
"github.com/0xPolygonHermez/zkevm-node/aggregator/metrics"
"github.com/0xPolygonHermez/zkevm-node/aggregator/prover"
"github.com/0xPolygonHermez/zkevm-node/config/types"
Expand Down Expand Up @@ -65,6 +69,9 @@ type Aggregator struct {
srv *grpc.Server
ctx context.Context
exit context.CancelFunc

AggLayerClient client.ClientInterface
sequencerPrivateKey *ecdsa.PrivateKey
}

// New creates a new aggregator.
Expand All @@ -73,6 +80,8 @@ func New(
stateInterface stateInterface,
ethTxManager ethTxManager,
etherman etherman,
agglayerClient client.ClientInterface,
sequencerPrivateKey *ecdsa.PrivateKey,
) (Aggregator, error) {
var profitabilityChecker aggregatorTxProfitabilityChecker
switch cfg.TxProfitabilityCheckerType {
Expand All @@ -94,6 +103,9 @@ func New(
TimeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,

finalProof: make(chan finalProofMsg),

AggLayerClient: agglayerClient,
sequencerPrivateKey: sequencerPrivateKey,
}

return a, nil
Expand Down Expand Up @@ -267,34 +279,139 @@ func (a *Aggregator) sendFinalProof() {

log.Infof("Final proof inputs: NewLocalExitRoot [%#x], NewStateRoot [%#x]", inputs.NewLocalExitRoot, inputs.NewStateRoot)

// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)
to, data, err := a.Ethman.BuildTrustedVerifyBatchesTxData(proof.BatchNumber-1, proof.BatchNumberFinal, &inputs, sender)
if err != nil {
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
}
monitoredTxID := buildMonitoredTxID(proof.BatchNumber, proof.BatchNumberFinal)
err = a.EthTxManager.Add(ctx, ethTxManagerOwner, monitoredTxID, sender, to, nil, data, a.cfg.GasOffset, nil)
if err != nil {
mTxLogger := ethtxmanager.CreateLogger(ethTxManagerOwner, monitoredTxID, sender, to)
mTxLogger.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
switch a.cfg.SettlementBackend {
case AggLayer:
if success := a.settleWithAggLayer(ctx, proof, inputs); !success {
continue
}
default:
if success := a.settleDirect(ctx, proof, inputs); !success {
continue
}
}

// process monitored batch verifications before starting a next cycle
a.EthTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
a.handleMonitoredTxResult(result)
}, nil)

a.resetVerifyProofTime()
a.endProofVerification()
}
}
}

func (a *Aggregator) settleDirect(
ctx context.Context,
proof *state.Proof,
inputs ethmanTypes.FinalProofInputs,
) (success bool) {
// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)

to, data, err := a.Ethman.BuildTrustedVerifyBatchesTxData(
proof.BatchNumber-1,
proof.BatchNumberFinal,
&inputs,
sender,
)
if err != nil {
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)

return false
}

monitoredTxID := buildMonitoredTxID(proof.BatchNumber, proof.BatchNumberFinal)
err = a.EthTxManager.Add(
ctx,
ethTxManagerOwner,
monitoredTxID,
sender,
to,
nil,
data,
a.cfg.GasOffset,
nil,
)
if err != nil {
mTxLogger := ethtxmanager.CreateLogger(ethTxManagerOwner, monitoredTxID, sender, to)
mTxLogger.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)

return false
}

// process monitored batch verifications before starting a next cycle
a.EthTxManager.ProcessPendingMonitoredTxs(
ctx,
ethTxManagerOwner,
func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
a.handleMonitoredTxResult(result)
},
nil,
)

return true
}

func (a *Aggregator) settleWithAggLayer(
ctx context.Context,
proof *state.Proof,
inputs ethmanTypes.FinalProofInputs,
) (success bool) {
proofStrNo0x := strings.TrimPrefix(inputs.FinalProof.Proof, "0x")
proofBytes := common.Hex2Bytes(proofStrNo0x)
tx := tx.Tx{
LastVerifiedBatch: agglayerTypes.ArgUint64(proof.BatchNumber - 1),
NewVerifiedBatch: agglayerTypes.ArgUint64(proof.BatchNumberFinal),
ZKP: tx.ZKP{
NewStateRoot: common.BytesToHash(inputs.NewStateRoot),
NewLocalExitRoot: common.BytesToHash(inputs.NewLocalExitRoot),
Proof: agglayerTypes.ArgBytes(proofBytes),
},
RollupID: a.Ethman.GetRollupId(),
}
signedTx, err := tx.Sign(a.sequencerPrivateKey)

if err != nil {
log.Errorf("failed to sign tx: %v", err)
a.handleFailureToSendToAggLayer(ctx, proof)

return false
}

log.Debug("final proof signedTx: ", signedTx.Tx.ZKP.Proof.Hex())
txHash, err := a.AggLayerClient.SendTx(*signedTx)
if err != nil {
log.Errorf("failed to send tx to the interop: %v", err)
a.handleFailureToSendToAggLayer(ctx, proof)

return false
}

log.Infof("tx %s sent to agglayer, waiting to be mined", txHash.Hex())
log.Debugf("Timeout set to %f seconds", a.cfg.AggLayerTxTimeout.Duration.Seconds())
waitCtx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(a.cfg.AggLayerTxTimeout.Duration))
defer cancelFunc()
if err := a.AggLayerClient.WaitTxToBeMined(txHash, waitCtx); err != nil {
log.Errorf("interop didn't mine the tx: %v", err)
a.handleFailureToSendToAggLayer(ctx, proof)

return false
}

// TODO: wait for synchronizer to catch up
return true
}

func (a *Aggregator) handleFailureToSendToAggLayer(ctx context.Context, proof *state.Proof) {
log := log.WithFields("proofId", proof.ProofID, "batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal))
proof.GeneratingSince = nil

err := a.State.UpdateGeneratedProof(ctx, proof, nil)
if err != nil {
log.Errorf("Failed updating proof state (false): %v", err)
}

a.endProofVerification()
}

func (a *Aggregator) handleFailureToAddVerifyBatchToBeMonitored(ctx context.Context, proof *state.Proof) {
log := log.WithFields("proofId", proof.ProofID, "batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal))
proof.GeneratingSince = nil
Expand Down
10 changes: 5 additions & 5 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestSendFinalProof(t *testing.T) {
stateMock := mocks.NewStateMock(t)
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
a.ctx, a.exit = context.WithCancel(context.Background())
m := mox{
Expand Down Expand Up @@ -685,7 +685,7 @@ func TestTryAggregateProofs(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1023,7 +1023,7 @@ func TestTryGenerateBatchProof(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1300,7 +1300,7 @@ func TestTryBuildFinalProof(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1430,7 +1430,7 @@ func TestIsSynced(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down
23 changes: 23 additions & 0 deletions aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ import (
"github.com/0xPolygonHermez/zkevm-node/encoding"
)

// SettlementBackend is the type of the settlement backend
type SettlementBackend string

const (
// AggLayer settlement backend
AggLayer SettlementBackend = "agglayer"

// L1 settlement backend
L1 SettlementBackend = "l1"
)

// TokenAmountWithDecimals is a wrapper type that parses token amount with decimals to big int
type TokenAmountWithDecimals struct {
*big.Int `validate:"required"`
Expand Down Expand Up @@ -88,4 +99,16 @@ type Config struct {

// UpgradeEtrogBatchNumber is the number of the first batch after upgrading to etrog
UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`

// SettlementBackend configuration defines how a final ZKP should be settled. Directly to L1 or over the Beethoven service.
SettlementBackend SettlementBackend `mapstructure:"SettlementBackend"`

// AggLayerTxTimeout is the interval time to wait for a tx to be mined from the agglayer
AggLayerTxTimeout types.Duration `mapstructure:"AggLayerTxTimeout"`

// AggLayerURL url of the agglayer service
AggLayerURL string `mapstructure:"AggLayerURL"`

// SequencerPrivateKey Private key of the trusted sequencer
SequencerPrivateKey types.KeystoreFileConfig `mapstructure:"SequencerPrivateKey"`
}
1 change: 1 addition & 0 deletions aggregator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ethTxManager interface {

// etherman contains the methods required to interact with ethereum
type etherman interface {
GetRollupId() uint32
GetLatestVerifiedBatchNum() (uint64, error)
BuildTrustedVerifyBatchesTxData(lastVerifiedBatch, newVerifiedBatch uint64, inputs *ethmanTypes.FinalProofInputs, beneficiary common.Address) (to *common.Address, data []byte, err error)
}
Expand Down
20 changes: 19 additions & 1 deletion aggregator/mocks/mock_etherman.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"runtime"
"time"

agglayerClient "github.com/0xPolygon/agglayer/client"
dataCommitteeClient "github.com/0xPolygon/cdk-data-availability/client"
datastreamerlog "github.com/0xPolygonHermez/zkevm-data-streamer/log"
"github.com/0xPolygonHermez/zkevm-node"
Expand Down Expand Up @@ -506,7 +507,23 @@ func createSequenceSender(cfg config.Config, pool *pool.Pool, etmStorage *ethtxm
}

func runAggregator(ctx context.Context, c aggregator.Config, etherman *etherman.Client, ethTxManager *ethtxmanager.Client, st *state.State) {
agg, err := aggregator.New(c, st, ethTxManager, etherman)
var (
aggCli *agglayerClient.Client
pk *ecdsa.PrivateKey
err error
)

if c.SettlementBackend == aggregator.AggLayer {
aggCli = agglayerClient.New(c.AggLayerURL)

// Load private key
pk, err = config.NewKeyFromKeystore(c.SequencerPrivateKey)
if err != nil {
log.Fatal(err)
}
}

agg, err := aggregator.New(c, st, ethTxManager, etherman, aggCli, pk)
if err != nil {
log.Fatal(err)
}
Expand Down
Loading

0 comments on commit fce434a

Please sign in to comment.