diff --git a/.changeset/polite-yaks-smell.md b/.changeset/polite-yaks-smell.md new file mode 100644 index 00000000000..1d189e5a3e2 --- /dev/null +++ b/.changeset/polite-yaks-smell.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#added workflow spec auto-approval via CLO diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index e185fbc8c39..701cff03c81 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -32,6 +32,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr" ocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate" "github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" ) @@ -50,6 +51,21 @@ var ( Help: "Metric to track job proposal requests", }) + promWorkflowRequests = promauto.NewCounter(prometheus.CounterOpts{ + Name: "feeds_workflow_requests", + Help: "Metric to track workflow requests", + }) + + promWorkflowApprovals = promauto.NewCounter(prometheus.CounterOpts{ + Name: "feeds_workflow_approvals", + Help: "Metric to track workflow successful auto approvals", + }) + + promWorkflowFailures = promauto.NewCounter(prometheus.CounterOpts{ + Name: "feeds_workflow_rejections", + Help: "Metric to track workflow failed auto approvals", + }) + promJobProposalCounts = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "feeds_job_proposal_count", Help: "Number of job proposals for the node partitioned by status.", @@ -553,6 +569,7 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, } if exists { + // note: CLO auto-increments the version number on re-proposal, so this should never happen return 0, errors.New("proposed job spec version already exists") } } @@ -596,9 +613,21 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, if err != nil { return 0, err } - - // Track the given job proposal request - promJobProposalRequest.Inc() + // auto approve workflow specs + if isWFSpec(logger, args.Spec) { + promWorkflowRequests.Inc() + err = s.ApproveSpec(ctx, id, true) + if err != nil { + promWorkflowFailures.Inc() + logger.Errorw("Failed to auto approve workflow spec", "id", id, "err", err) + return 0, fmt.Errorf("failed to approve workflow spec %d: %w", id, err) + } + logger.Infow("Successful workflow spec auto approval", "id", id) + promWorkflowApprovals.Inc() + } else { + // Track the given job proposal request + promJobProposalRequest.Inc() + } if err = s.observeJobProposalCounts(ctx); err != nil { logger.Errorw("Failed to push metrics for propose job", err) @@ -607,6 +636,16 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, return id, nil } +func isWFSpec(lggr logger.Logger, spec string) bool { + jobType, err := job.ValidateSpec(spec) + if err != nil { + // this should not happen in practice + lggr.Errorw("Failed to validate spec while checking for workflow", "err", err) + return false + } + return jobType == job.Workflow +} + // GetJobProposal gets a job proposal by id. func (s *service) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) { return s.orm.GetJobProposal(ctx, id) @@ -761,6 +800,15 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error { return errors.Wrap(txerr, "FindOCR2JobIDByAddress failed") } } + case job.Workflow: + existingJobID, txerr = findExistingWorkflowJob(ctx, *j.WorkflowSpec, tx.jobORM) + if txerr != nil { + // Return an error if the repository errors. If there is a not found + // error we want to continue with approving the job. + if !errors.Is(txerr, sql.ErrNoRows) { + return fmt.Errorf("failed while checking for existing workflow job: %w", txerr) + } + } default: return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type) } @@ -1058,6 +1106,11 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error { return nil } +// TODO KS-205 implement this. Need to figure out how exactly how we want to handle this. +func findExistingWorkflowJob(ctx context.Context, wfSpec job.WorkflowSpec, tx job.ORM) (int32, error) { + return 0, nil +} + // findExistingJobForOCR2 looks for existing job for OCR2 func findExistingJobForOCR2(ctx context.Context, j *job.Job, tx job.ORM) (int32, error) { var contractID string @@ -1073,7 +1126,7 @@ func findExistingJobForOCR2(ctx context.Context, j *job.Job, tx job.ORM) (int32, feedID = j.BootstrapSpec.FeedID } case job.FluxMonitor, job.OffchainReporting: - return 0, errors.Errorf("contradID and feedID not applicable for job type: %s", j.Type) + return 0, errors.Errorf("contractID and feedID not applicable for job type: %s", j.Type) default: return 0, errors.Errorf("unsupported job type: %s", j.Type) } @@ -1106,7 +1159,7 @@ func findExistingJobForOCRFlux(ctx context.Context, j *job.Job, tx job.ORM) (int func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error) { jobType, err := job.ValidateSpec(spec) if err != nil { - return nil, errors.Wrap(err, "failed to parse job spec TOML") + return nil, fmt.Errorf("failed to parse job spec TOML'%s': %w", spec, err) } var js job.Job @@ -1128,6 +1181,8 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error js, err = ocrbootstrap.ValidatedBootstrapSpecToml(spec) case job.FluxMonitor: js, err = fluxmonitorv2.ValidatedFluxMonitorSpec(s.jobCfg, spec) + case job.Workflow: + js, err = workflows.ValidatedWorkflowSpec(spec) default: return nil, errors.Errorf("unknown job type: %s", jobType) } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index af656618f78..43d75f712a0 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -42,6 +42,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" "github.com/smartcontractkit/chainlink/v2/core/services/versioning" + "github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" ) @@ -638,6 +639,54 @@ func Test_Service_ProposeJob(t *testing.T) { } httpTimeout = *commonconfig.MustNewDuration(1 * time.Second) + + // variables for workflow spec + wfID = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef" + wfOwner = "00000000000000000000000000000000000000aa" + specYaml = ` +triggers: + - id: "a-trigger" + +actions: + - id: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + +consensus: + - id: "a-consensus" + ref: "a-consensus" + inputs: + trigger_output: $(trigger.outputs) + an-action_output: $(an-action.outputs) + +targets: + - id: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +` + wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, specYaml).Toml() + proposalIDWF = int64(11) + remoteUUIDWF = uuid.New() + argsWF = &feeds.ProposeJobArgs{ + FeedsManagerID: 1, + RemoteUUID: remoteUUIDWF, + Spec: wfSpec, + Version: 1, + } + jpWF = feeds.JobProposal{ + FeedsManagerID: 1, + Name: null.StringFrom("test-spec"), + RemoteUUID: remoteUUIDWF, + Status: feeds.JobProposalStatusPending, + } + proposalSpecWF = feeds.JobProposalSpec{ + Definition: wfSpec, + Status: feeds.SpecStatusPending, + Version: 1, + JobProposalID: proposalIDWF, + } ) testCases := []struct { @@ -647,6 +696,90 @@ func Test_Service_ProposeJob(t *testing.T) { wantID int64 wantErr string }{ + { + name: "Auto approve WF spec", + before: func(svc *TestService) { + svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows) + svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil) + svc.orm.On("CreateSpec", mock.Anything, proposalSpecWF).Return(int64(100), nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything) + transactCall.Run(func(args mock.Arguments) { + fn := args[1].(func(orm feeds.ORM) error) + transactCall.ReturnArguments = mock.Arguments{fn(svc.orm)} + }) + // Auto approve is really a call to ApproveJobProposal and so we have to mock that as well + svc.connMgr.On("GetClient", argsWF.FeedsManagerID).Return(svc.fmsClient, nil) + svc.orm.EXPECT().GetSpec(mock.Anything, proposalIDWF).Return(&proposalSpecWF, nil) + svc.orm.EXPECT().GetJobProposal(mock.Anything, proposalSpecWF.JobProposalID).Return(&jpWF, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows) // TODO fix the external job id in wf spec generation + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + svc.spawner. + On("CreateJob", + mock.Anything, + mock.Anything, + mock.MatchedBy(func(j *job.Job) bool { + return j.WorkflowSpec.WorkflowOwner == wfOwner + }), + ). + Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). + Return(nil) + svc.orm.On("ApproveSpec", + mock.Anything, + proposalSpecWF.JobProposalID, + mock.IsType(uuid.UUID{}), + ).Return(nil) + svc.fmsClient.On("ApprovedJob", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + &proto.ApprovedJobRequest{ + Uuid: jpWF.RemoteUUID.String(), + Version: int64(proposalSpecWF.Version), + }, + ).Return(&proto.ApprovedJobResponse{}, nil) + }, + args: argsWF, + wantID: proposalIDWF, + }, + { + name: "Auto approve WF spec: error creating job", + before: func(svc *TestService) { + svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows) + svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil) + svc.orm.On("CreateSpec", mock.Anything, proposalSpecWF).Return(int64(100), nil) + // svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything) + transactCall.Run(func(args mock.Arguments) { + fn := args[1].(func(orm feeds.ORM) error) + transactCall.ReturnArguments = mock.Arguments{fn(svc.orm)} + }) + // Auto approve is really a call to ApproveJobProposal and so we have to mock that as well + svc.connMgr.On("GetClient", argsWF.FeedsManagerID).Return(svc.fmsClient, nil) + svc.orm.EXPECT().GetSpec(mock.Anything, proposalIDWF).Return(&proposalSpecWF, nil) + svc.orm.EXPECT().GetJobProposal(mock.Anything, proposalSpecWF.JobProposalID).Return(&jpWF, nil) + svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil) + + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows) // TODO fix the external job id in wf spec generation + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + svc.spawner. + On("CreateJob", + mock.Anything, + mock.Anything, + mock.MatchedBy(func(j *job.Job) bool { + return j.WorkflowSpec.WorkflowOwner == wfOwner + }), + ). + Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }). + Return(fmt.Errorf("error creating job")) + }, + args: argsWF, + wantID: 0, + wantErr: "error creating job", + }, + { name: "Create success (Flux Monitor)", before: func(svc *TestService) { diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 9db802f9a2f..95d2f0ca29d 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -122,21 +122,28 @@ func ValidatedWorkflowSpec(tomlString string) (job.Job, error) { if err != nil { return jb, fmt.Errorf("toml unmarshal error on spec: %w", err) } + if jb.Type != job.Workflow { + return jb, fmt.Errorf("unsupported type %s, expected %s", jb.Type, job.Workflow) + } var spec job.WorkflowSpec err = tree.Unmarshal(&spec) if err != nil { - return jb, fmt.Errorf("toml unmarshal error on job: %w", err) + return jb, fmt.Errorf("toml unmarshal error on workflow spec: %w", err) } - if err := spec.Validate(); err != nil { - return jb, err + err = spec.Validate() + if err != nil { + return jb, fmt.Errorf("invalid WorkflowSpec: %w", err) } - jb.WorkflowSpec = &spec - if jb.Type != job.Workflow { - return jb, fmt.Errorf("unsupported type %s", jb.Type) + // ensure the embedded workflow graph is valid + _, err = Parse(spec.Workflow) + if err != nil { + return jb, fmt.Errorf("failed to parse workflow graph: %w", err) } + jb.WorkflowSpec = &spec + jb.WorkflowSpecID = &spec.ID return jb, nil }