Skip to content

Commit

Permalink
Add error tracking and step metrics to streams lookup and error handl…
Browse files Browse the repository at this point in the history
…er (#12387)

* Add error tracking metrics to streams lookup

* Address feedback and add streams retries and responses

* Minor lints

* Log status code label as int string

* Rebase and add changeset
  • Loading branch information
ogtownsend committed Apr 1, 2024
1 parent f9d02e3 commit 42e72d2
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 7 deletions.
5 changes: 5 additions & 0 deletions .changeset/large-oranges-warn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Adds prometheus metrics for automation streams error handling
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury"
v02 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v02"
v03 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/v03"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -121,6 +122,7 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper
lookupLggr := s.lggr.With("where", "StreamsLookup")
if checkResult.IneligibilityReason != uint8(encoding.UpkeepFailureReasonTargetCheckReverted) {
// Streams Lookup only works when upkeep target check reverts
prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorReasonNotReverted).Inc()
return
}

Expand All @@ -134,11 +136,13 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper
if err != nil {
lookupLggr.Debugf("at block %d upkeep %s DecodeStreamsLookupRequest failed: %v", block, upkeepId, err)
// user contract did not revert with StreamsLookup error
prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorDecodeRequestFailed).Inc()
return
}
streamsLookupResponse := &mercury.StreamsLookup{StreamsLookupError: streamsLookupErr}
if s.mercuryConfig.Credentials() == nil {
lookupLggr.Errorf("at block %d upkeep %s tries to access mercury server but mercury credential is not configured", block, upkeepId)
prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorCredentialsNotConfigured).Inc()
return
}

Expand Down Expand Up @@ -179,6 +183,7 @@ func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *merc
values, errCode, err := s.DoMercuryRequest(ctx, lookup, checkResults, i)
if err != nil {
s.lggr.Errorf("at block %d upkeep %s requested time %s DoMercuryRequest err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error())
prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorDoMercuryRequest).Inc()
return
}

Expand All @@ -187,17 +192,20 @@ func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *merc
if err != nil {
s.lggr.Errorf("at block %d upkeep %s requested time %s CheckErrorHandler err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error())
}
prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorCodeNotNil).Inc()
return
}

// Mercury request returned values or user's checkErrorhandler didn't return error, call checkCallback
err = s.CheckCallback(ctx, values, lookup, checkResults, i)
if err != nil {
s.lggr.Errorf("at block %d upkeep %s requested time %s CheckCallback err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error())
prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorCheckCallback).Inc()
}
}

func (s *streams) CheckCallback(ctx context.Context, values [][]byte, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) error {
prommetrics.AutomationStreamsLookupStep.WithLabelValues(prommetrics.StreamsLookupStepCheckCallback).Inc()
payload, err := s.abi.Pack("checkCallback", lookup.UpkeepId, values, lookup.ExtraData)
if err != nil {
checkResults[i].Retryable = false
Expand Down Expand Up @@ -243,6 +251,7 @@ func (s *streams) makeCallbackEthCall(ctx context.Context, payload []byte, looku
// Does the mercury request for the checkResult. Returns either the looked up values or an error code if something is wrong with mercury
// In case of any pipeline processing issues, returns an error and also sets approriate state on the checkResult itself
func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) ([][]byte, encoding.ErrCode, error) {
prommetrics.AutomationStreamsLookupStep.WithLabelValues(prommetrics.StreamsLookupStepDoMercuryRequest).Inc()
var state, values, errCode, retryable, retryInterval = encoding.NoPipelineError, [][]byte{}, encoding.ErrCodeNil, false, 0 * time.Second
var err error
pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block)
Expand Down Expand Up @@ -276,18 +285,21 @@ func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsL

func (s *streams) CheckErrorHandler(ctx context.Context, errCode encoding.ErrCode, lookup *mercury.StreamsLookup, checkResults []ocr2keepers.CheckResult, i int) error {
s.lggr.Debugf("at block %d upkeep %s requested time %s CheckErrorHandler error code: %d", lookup.Block, lookup.UpkeepId, lookup.Time, errCode)
prommetrics.AutomationStreamsLookupStep.WithLabelValues(prommetrics.StreamsLookupStepCheckErrorHandler).Inc()

userPayload, err := s.packer.PackUserCheckErrorHandler(errCode, lookup.ExtraData)
if err != nil {
checkResults[i].Retryable = false
checkResults[i].PipelineExecutionState = uint8(encoding.PackUnpackDecodeFailed)
prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorPackUserCheckErrorHandler).Inc()
return err
}

payload, err := s.abi.Pack("executeCallback", lookup.UpkeepId, userPayload)
if err != nil {
checkResults[i].Retryable = false
checkResults[i].PipelineExecutionState = uint8(encoding.PackUnpackDecodeFailed)
prommetrics.AutomationStreamsLookupError.WithLabelValues(prommetrics.StreamsLookupErrorPackExecuteCallback).Inc()
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -172,6 +173,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
sent := false
retryErr := retry.Do(
func() error {
prommetrics.AutomationStreamsRetries.WithLabelValues(prommetrics.StreamsVersion02).Inc()
var httpResponse *http.Response
var responseBody []byte
var blobBytes []byte
Expand Down Expand Up @@ -206,6 +208,7 @@ func (c *client) singleFeedRequest(ctx context.Context, ch chan<- mercury.Mercur
return nil
}

prommetrics.AutomationStreamsResponses.WithLabelValues(prommetrics.StreamsVersion02, fmt.Sprintf("%d", httpResponse.StatusCode)).Inc()
switch httpResponse.StatusCode {
case http.StatusNotFound, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
// Considered as pipeline error, but if retry attempts go over threshold, is changed upstream to ErrCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -149,6 +150,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur
defer cancel()
retryErr := retry.Do(
func() error {
prommetrics.AutomationStreamsRetries.WithLabelValues(prommetrics.StreamsVersion03).Inc()
retryable = false
resp, err := c.httpClient.Do(req)
if err != nil {
Expand Down Expand Up @@ -180,6 +182,7 @@ func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.Mercur
}

c.lggr.Infof("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode)
prommetrics.AutomationStreamsResponses.WithLabelValues(prommetrics.StreamsVersion03, fmt.Sprintf("%d", resp.StatusCode)).Inc()
switch resp.StatusCode {
case http.StatusUnauthorized:
c.lggr.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.Time.String(), sl.UpkeepId.String(), resp.StatusCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,36 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

// AutomationNamespace is the namespace for all Automation related metrics
const AutomationLogTriggerNamespace = "automation_log_trigger"
// Namespaces
const (
NamespaceAutomationLogTrigger = "automation_log_trigger"
NamespaceAutomationStreams = "automation_streams"
)

// Streams steps
const (
StreamsLookupStepDoMercuryRequest = "do_mercury_request"
StreamsLookupStepCheckErrorHandler = "check_error_handler"
StreamsLookupStepCheckCallback = "check_callback"
)

// Streams error labels
const (
StreamsLookupErrorReasonNotReverted = "reason_not_target_check_reverted"
StreamsLookupErrorDecodeRequestFailed = "decode_request_failed"
StreamsLookupErrorCredentialsNotConfigured = "credentials_not_configured"
StreamsLookupErrorDoMercuryRequest = "do_mercury_request"
StreamsLookupErrorCodeNotNil = "err_code_not_nil"
StreamsLookupErrorCheckCallback = "check_callback"
StreamsLookupErrorPackUserCheckErrorHandler = "pack_user_check_error_handler"
StreamsLookupErrorPackExecuteCallback = "pack_execute_callback"
)

// Streams versions
const (
StreamsVersion02 = "v02"
StreamsVersion03 = "v03"
)

// Metric labels
const (
Expand All @@ -17,31 +45,63 @@ const (

// Automation metrics
var (
// Log Trigger metrics
AutomationLogBufferFlow = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: AutomationLogTriggerNamespace,
Namespace: NamespaceAutomationLogTrigger,
Name: "num_logs_in_log_buffer",
Help: "The total number of logs currently being stored in the log buffer",
}, []string{
"direction",
})
AutomationRecovererMissedLogs = promauto.NewCounter(prometheus.CounterOpts{
Namespace: AutomationLogTriggerNamespace,
Namespace: NamespaceAutomationLogTrigger,
Name: "num_recoverer_missed_logs",
Help: "How many valid log triggers were identified as being missed by the recoverer",
})
AutomationRecovererPendingPayloads = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: AutomationLogTriggerNamespace,
Namespace: NamespaceAutomationLogTrigger,
Name: "num_recoverer_pending_payloads",
Help: "How many log trigger payloads are currently pending in the recoverer",
})
AutomationActiveUpkeeps = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: AutomationLogTriggerNamespace,
Namespace: NamespaceAutomationLogTrigger,
Name: "num_active_upkeeps",
Help: "How many log trigger upkeeps are currently active",
})
AutomationLogProviderLatestBlock = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: AutomationLogTriggerNamespace,
Namespace: NamespaceAutomationLogTrigger,
Name: "log_provider_latest_block",
Help: "The latest block number the log provider has seen",
})

// Streams metrics
AutomationStreamsLookupStep = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: NamespaceAutomationStreams,
Name: "streams_lookup_step_count",
Help: "How many times individual steps of the streams lookup process run",
}, []string{
"step",
})
AutomationStreamsLookupError = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: NamespaceAutomationStreams,
Name: "streams_lookup_error_count",
Help: "Errors occurred during a streams lookup attempt",
}, []string{
"error",
})
AutomationStreamsRetries = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: NamespaceAutomationStreams,
Name: "streams_retries",
Help: "Count of the times a streams lookup was retried",
}, []string{
"version",
})
AutomationStreamsResponses = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: NamespaceAutomationStreams,
Name: "streams_responses",
Help: "Count of individual response codes from streams lookup",
}, []string{
"version",
"status",
})
)

0 comments on commit 42e72d2

Please sign in to comment.