Skip to content

Commit

Permalink
Fix panic on mercury server error (#13231)
Browse files Browse the repository at this point in the history
* Fix panic on mercury server error

* Changelog

* Remove newline

* Fix changelog

* changelog
  • Loading branch information
samsondav committed May 17, 2024
1 parent 49f1bf3 commit c4ef6c6
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 49 deletions.
5 changes: 5 additions & 0 deletions .changeset/eighty-hotels-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix panic when the Mercury server returns an error #bugfix
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@
You may disable if this results in excessive log volume. Disable like so:

```
[Pipeline]
[JobPipeline]
VerboseLogging = false
```

Expand Down Expand Up @@ -219,7 +219,7 @@

- [#12404](https://github.com/smartcontractkit/chainlink/pull/12404) [`b74079b672`](https://github.com/smartcontractkit/chainlink/commit/b74079b672f36fb0c241f90ea1e875ea3a9524da) Thanks [@HenryNguyen5](https://github.com/HenryNguyen5)! - Add OCR3 capability contract wrapper

- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option Pipeline.VerboseLogging
- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option JobPipeline.VerboseLogging

VerboseLogging enables detailed logging of pipeline execution steps. This is
disabled by default because it increases log volume for pipeline runs, but can
Expand All @@ -230,7 +230,7 @@
Set it like the following example:

```
[Pipeline]
[JobPipeline]
VerboseLogging = true
```

Expand Down
43 changes: 26 additions & 17 deletions core/services/relay/evm/mercury/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type asyncDeleter interface {
AsyncDelete(req *pb.TransmitRequest)
}

var _ services.Service = (*TransmitQueue)(nil)
var _ services.Service = (*transmitQueue)(nil)

var transmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "mercury_transmit_queue_load",
Expand All @@ -40,7 +40,7 @@ const promInterval = 6500 * time.Millisecond

// TransmitQueue is the high-level type that everything outside of this file should be using.
// It stores pending transmissions, yielding the latest (highest priority) first to the caller.
type TransmitQueue struct {
type transmitQueue struct {
services.StateMachine

cond sync.Cond
Expand All @@ -62,11 +62,20 @@ type Transmission struct {
ReportCtx ocrtypes.ReportContext // contains priority information (latest epoch/round wins)
}

// TransmitQueue describes the operations of a transmit queue: a
// services.Service that additionally supports pushing, popping and
// (re)initialising pending transmissions.
type TransmitQueue interface {
services.Service

// BlockingPop blocks until at least one item is in the queue and then
// returns it. If the queue is closed, it returns nil immediately.
BlockingPop() (t *Transmission)
// Push adds a transmit request together with its report context.
// NOTE(review): ok presumably reports whether the request was accepted —
// confirm against the implementation.
Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool)
// Init replaces the queue's contents with the given transmissions.
Init(transmissions []*Transmission)
// IsEmpty reports whether the queue currently holds no items.
IsEmpty() bool
}

// maxlen controls how many items will be stored in the queue
// 0 means unlimited - be careful, this can cause memory leaks
func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) *TransmitQueue {
func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) TransmitQueue {
mu := new(sync.RWMutex)
return &TransmitQueue{
return &transmitQueue{
services.StateMachine{},
sync.Cond{L: mu},
lggr.Named("TransmitQueue"),
Expand All @@ -80,13 +89,13 @@ func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int,
}
}

func (tq *TransmitQueue) Init(transmissions []*Transmission) {
// Init replaces the queue's contents with the given transmissions, building a
// heap-ordered priority queue from them.
// NOTE(review): no lock is taken here — presumably Init must be called before
// the queue is shared between goroutines; confirm against callers.
func (tq *transmitQueue) Init(transmissions []*Transmission) {
pq := priorityQueue(transmissions)
heap.Init(&pq) // ensure the heap is ordered
tq.pq = &pq
}

func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) {
func (tq *transmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) {
tq.cond.L.Lock()
defer tq.cond.L.Unlock()

Expand All @@ -111,7 +120,7 @@ func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.Report

// BlockingPop will block until at least one item is in the heap, and then return it
// If the queue is closed, it will immediately return nil
func (tq *TransmitQueue) BlockingPop() (t *Transmission) {
func (tq *transmitQueue) BlockingPop() (t *Transmission) {
tq.cond.L.Lock()
defer tq.cond.L.Unlock()
if tq.closed {
Expand All @@ -126,13 +135,13 @@ func (tq *TransmitQueue) BlockingPop() (t *Transmission) {
return t
}

func (tq *TransmitQueue) IsEmpty() bool {
// IsEmpty reports whether the priority queue currently holds no transmissions.
// The length is read while holding the read lock.
func (tq *transmitQueue) IsEmpty() bool {
tq.mu.RLock()
defer tq.mu.RUnlock()
return tq.pq.Len() == 0
}

func (tq *TransmitQueue) Start(context.Context) error {
func (tq *transmitQueue) Start(context.Context) error {
return tq.StartOnce("TransmitQueue", func() error {
t := time.NewTicker(utils.WithJitter(promInterval))
wg := new(sync.WaitGroup)
Expand All @@ -148,7 +157,7 @@ func (tq *TransmitQueue) Start(context.Context) error {
})
}

func (tq *TransmitQueue) Close() error {
func (tq *transmitQueue) Close() error {
return tq.StopOnce("TransmitQueue", func() error {
tq.cond.L.Lock()
tq.closed = true
Expand All @@ -159,7 +168,7 @@ func (tq *TransmitQueue) Close() error {
})
}

func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) {
func (tq *transmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()

for {
Expand All @@ -172,25 +181,25 @@ func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{},
}
}

func (tq *TransmitQueue) report() {
// report sets tq.transmitQueueLoad to the current queue length.
func (tq *transmitQueue) report() {
// Read the length under the read lock, and release it before updating the
// metric so the lock is not held during the Set call.
tq.mu.RLock()
length := tq.pq.Len()
tq.mu.RUnlock()
tq.transmitQueueLoad.Set(float64(length))
}

func (tq *TransmitQueue) Ready() error {
// Ready always returns nil; the queue reports no readiness error.
func (tq *transmitQueue) Ready() error {
return nil
}
func (tq *TransmitQueue) Name() string { return tq.lggr.Name() }
func (tq *TransmitQueue) HealthReport() map[string]error {
// Name returns the name of the queue's logger.
func (tq *transmitQueue) Name() string { return tq.lggr.Name() }
// HealthReport returns a single-entry map keyed by Name() whose value is the
// error produced by status().
func (tq *transmitQueue) HealthReport() map[string]error {
// errors.Join returns nil when every argument is nil, so a nil status
// stays a nil map value.
report := map[string]error{tq.Name(): errors.Join(
tq.status(),
)}
return report
}

func (tq *TransmitQueue) status() (merr error) {
func (tq *transmitQueue) status() (merr error) {
tq.mu.RLock()
length := tq.pq.Len()
closed := tq.closed
Expand All @@ -206,7 +215,7 @@ func (tq *TransmitQueue) status() (merr error) {

// pop latest Transmission from the heap
// Not thread-safe
func (tq *TransmitQueue) pop() *Transmission {
func (tq *transmitQueue) pop() *Transmission {
if tq.pq.Len() == 0 {
return nil
}
Expand Down
39 changes: 23 additions & 16 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,12 @@ type server struct {

c wsrpc.Client
pm *PersistenceManager
q *TransmitQueue
q TransmitQueue

deleteQueue chan *pb.TransmitRequest

url string

transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
Expand Down Expand Up @@ -268,7 +270,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
s.transmitDuplicateCount.Inc()
s.lggr.Debugw("Transmit report success; duplicate report", "payload", hexutil.Encode(t.Req.Payload), "response", res, "repts", t.ReportCtx.ReportTimestamp)
default:
transmitServerErrorCount.WithLabelValues(feedIDHex, fmt.Sprintf("%d", res.Code)).Inc()
transmitServerErrorCount.WithLabelValues(feedIDHex, s.url, fmt.Sprintf("%d", res.Code)).Inc()
s.lggr.Errorw("Transmit report failed; mercury server returned error", "response", res, "reportCtx", t.ReportCtx, "err", res.Error, "code", res.Code)
}
}
Expand All @@ -281,26 +283,31 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
}
}

// newServer builds the server for a single server URL.
//
// lggr is used both as the server's logger and as the parent logger passed to
// NewTransmitQueue. The transmit queue and the delete channel are both sized
// by cfg.TransmitQueueMaxSize(), and pm is also handed to the queue as its
// asyncDeleter. Every per-server counter is labelled with feedIDHex and
// serverURL.
//
// The struct literal is positional, so the values below must stay in the same
// order as the fields of the server struct.
func newServer(lggr logger.Logger, cfg TransmitterConfig, client wsrpc.Client, pm *PersistenceManager, serverURL, feedIDHex string) *server {
return &server{
lggr,
cfg.TransmitTimeout().Duration(),
client,
pm,
NewTransmitQueue(lggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm),
make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())),
serverURL,
transmitSuccessCount.WithLabelValues(feedIDHex, serverURL),
transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL),
transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueueInsertErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueuePushErrorCount.WithLabelValues(feedIDHex, serverURL),
}
}

func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder, triggerCapability *triggers.MercuryTriggerService) *mercuryTransmitter {
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
servers := make(map[string]*server, len(clients))
for serverURL, client := range clients {
cLggr := lggr.Named(serverURL).With("serverURL", serverURL)
pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency)
servers[serverURL] = &server{
cLggr,
cfg.TransmitTimeout().Duration(),
client,
pm,
NewTransmitQueue(cLggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm),
make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())),
transmitSuccessCount.WithLabelValues(feedIDHex, serverURL),
transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL),
transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueueInsertErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueuePushErrorCount.WithLabelValues(feedIDHex, serverURL),
}
servers[serverURL] = newServer(cLggr, cfg, client, pm, serverURL, feedIDHex)
}
return &mercuryTransmitter{
services.StateMachine{},
Expand Down
Loading

0 comments on commit c4ef6c6

Please sign in to comment.