Skip to content

Commit

Permalink
core/services/relay/evm/mercury: switch to sqlutil.DataSource (#12818)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 17, 2024
1 parent 707c707 commit 6a0b4a9
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 110 deletions.
5 changes: 5 additions & 0 deletions .changeset/pretty-flies-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

cor/services/relay/evm/mercury: switch to sqlutil.DataStore #internal
4 changes: 2 additions & 2 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,10 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
var (
pipelineORM = pipeline.NewORM(sqlxDB, globalLogger, cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns())
bridgeORM = bridges.NewORM(sqlxDB)
mercuryORM = mercury.NewORM(sqlxDB, globalLogger, cfg.Database())
mercuryORM = mercury.NewORM(opts.DB)
pipelineRunner = pipeline.NewRunner(pipelineORM, bridgeORM, cfg.JobPipeline(), cfg.WebServer(), legacyEVMChains, keyStore.Eth(), keyStore.VRF(), globalLogger, restrictedHTTPClient, unrestrictedHTTPClient)
jobORM = job.NewORM(sqlxDB, pipelineORM, bridgeORM, keyStore, globalLogger, cfg.Database())
txmORM = txmgr.NewTxStore(sqlxDB, globalLogger)
txmORM = txmgr.NewTxStore(opts.DB, globalLogger)
streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner)
)

Expand Down
3 changes: 1 addition & 2 deletions core/services/ocr2/plugins/mercury/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus"
libocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/relay"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
Expand Down Expand Up @@ -279,7 +278,7 @@ var _ plugins.RegistrarConfig = (*testRegistrarConfig)(nil)
type testDataSourceORM struct{}

// LatestReport implements types.DataSourceORM.
func (*testDataSourceORM) LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) {
func (*testDataSourceORM) LatestReport(ctx context.Context, feedID [32]byte) (report []byte, err error) {
return []byte{1, 2, 3}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R
}
lggr = lggr.Named("Relayer")

mercuryORM := mercury.NewORM(opts.DB, lggr, opts.QConfig)
mercuryORM := mercury.NewORM(opts.DS)
lloORM := llo.NewORM(opts.DS, chain.ID())
cdcFactory := llo.NewChannelDefinitionCacheFactory(lggr, lloORM, chain.LogPoller())
return &Relayer{
Expand Down
52 changes: 21 additions & 31 deletions core/services/relay/evm/mercury/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,22 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
pkgerrors "github.com/pkg/errors"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb"
)

type ORM interface {
InsertTransmitRequest(serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error
DeleteTransmitRequests(serverURL string, reqs []*pb.TransmitRequest, qopts ...pg.QOpt) error
GetTransmitRequests(serverURL string, jobID int32, qopts ...pg.QOpt) ([]*Transmission, error)
PruneTransmitRequests(serverURL string, jobID int32, maxSize int, qopts ...pg.QOpt) error
LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error)
InsertTransmitRequest(ctx context.Context, serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error
DeleteTransmitRequests(ctx context.Context, serverURL string, reqs []*pb.TransmitRequest) error
GetTransmitRequests(ctx context.Context, serverURL string, jobID int32) ([]*Transmission, error)
PruneTransmitRequests(ctx context.Context, serverURL string, jobID int32, maxSize int) error
LatestReport(ctx context.Context, feedID [32]byte) (report []byte, err error)
}

func FeedIDFromReport(report ocrtypes.Report) (feedID utils.FeedID, err error) {
Expand All @@ -36,32 +34,27 @@ func FeedIDFromReport(report ocrtypes.Report) (feedID utils.FeedID, err error) {
}

type orm struct {
q pg.Q
ds sqlutil.DataSource
}

func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) ORM {
namedLogger := lggr.Named("MercuryORM")
q := pg.NewQ(db, namedLogger, cfg)
return &orm{
q: q,
}
func NewORM(ds sqlutil.DataSource) ORM {
return &orm{ds: ds}
}

// InsertTransmitRequest inserts one transmit request if the payload does not exist already.
func (o *orm) InsertTransmitRequest(serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error {
func (o *orm) InsertTransmitRequest(ctx context.Context, serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error {
feedID, err := FeedIDFromReport(req.Payload)
if err != nil {
return err
}

q := o.q.WithOpts(qopts...)
var wg sync.WaitGroup
wg.Add(2)
var err1, err2 error

go func() {
defer wg.Done()
err1 = q.ExecQ(`
_, err1 = o.ds.ExecContext(ctx, `
INSERT INTO mercury_transmit_requests (server_url, payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (server_url, payload_hash) DO NOTHING
Expand All @@ -70,7 +63,7 @@ func (o *orm) InsertTransmitRequest(serverURL string, req *pb.TransmitRequest, j

go func() {
defer wg.Done()
err2 = q.ExecQ(`
_, err2 = o.ds.ExecContext(ctx, `
INSERT INTO feed_latest_reports (feed_id, report, epoch, round, updated_at, job_id)
VALUES ($1, $2, $3, $4, NOW(), $5)
ON CONFLICT (feed_id) DO UPDATE
Expand All @@ -83,7 +76,7 @@ func (o *orm) InsertTransmitRequest(serverURL string, req *pb.TransmitRequest, j
}

// DeleteTransmitRequest deletes the given transmit requests if they exist.
func (o *orm) DeleteTransmitRequests(serverURL string, reqs []*pb.TransmitRequest, qopts ...pg.QOpt) error {
func (o *orm) DeleteTransmitRequests(ctx context.Context, serverURL string, reqs []*pb.TransmitRequest) error {
if len(reqs) == 0 {
return nil
}
Expand All @@ -93,20 +86,18 @@ func (o *orm) DeleteTransmitRequests(serverURL string, reqs []*pb.TransmitReques
hashes = append(hashes, hashPayload(req.Payload))
}

q := o.q.WithOpts(qopts...)
err := q.ExecQ(`
_, err := o.ds.ExecContext(ctx, `
DELETE FROM mercury_transmit_requests
WHERE server_url = $1 AND payload_hash = ANY($2)
`, serverURL, hashes)
return err
}

// GetTransmitRequests returns all transmit requests in chronologically descending order.
func (o *orm) GetTransmitRequests(serverURL string, jobID int32, qopts ...pg.QOpt) ([]*Transmission, error) {
q := o.q.WithOpts(qopts...)
func (o *orm) GetTransmitRequests(ctx context.Context, serverURL string, jobID int32) ([]*Transmission, error) {
// The priority queue uses epoch and round to sort transmissions so order by
// the same fields here for optimal insertion into the pq.
rows, err := q.QueryContext(q.ParentCtx, `
rows, err := o.ds.QueryContext(ctx, `
SELECT payload, config_digest, epoch, round, extra_hash
FROM mercury_transmit_requests
WHERE job_id = $1 AND server_url = $2
Expand Down Expand Up @@ -146,10 +137,9 @@ func (o *orm) GetTransmitRequests(serverURL string, jobID int32, qopts ...pg.QOp

// PruneTransmitRequests keeps at most maxSize rows for the given job ID,
// deleting the oldest transactions.
func (o *orm) PruneTransmitRequests(serverURL string, jobID int32, maxSize int, qopts ...pg.QOpt) error {
q := o.q.WithOpts(qopts...)
func (o *orm) PruneTransmitRequests(ctx context.Context, serverURL string, jobID int32, maxSize int) error {
// Prune the oldest requests by epoch and round.
return q.ExecQ(`
_, err := o.ds.ExecContext(ctx, `
DELETE FROM mercury_transmit_requests
WHERE job_id = $1 AND server_url = $2 AND
payload_hash NOT IN (
Expand All @@ -160,11 +150,11 @@ func (o *orm) PruneTransmitRequests(serverURL string, jobID int32, maxSize int,
LIMIT $3
)
`, jobID, serverURL, maxSize)
return err
}

func (o *orm) LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) {
q := o.q.WithOpts(qopts...)
err = q.GetContext(ctx, &report, `SELECT report FROM feed_latest_reports WHERE feed_id = $1`, feedID[:])
func (o *orm) LatestReport(ctx context.Context, feedID [32]byte) (report []byte, err error) {
err = o.ds.GetContext(ctx, &report, `SELECT report FROM feed_latest_reports WHERE feed_id = $1`, feedID[:])
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
Expand Down
Loading

0 comments on commit 6a0b4a9

Please sign in to comment.