Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KS-198: Workflow Spec Approval #13181

Merged
merged 9 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/polite-yaks-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added workflow spec auto-approval via CLO
80 changes: 75 additions & 5 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ var (
Help: "Metric to track job proposal requests",
})

promWorkflowRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "feeds_workflow_requests",
bolekk marked this conversation as resolved.
Show resolved Hide resolved
Help: "Metric to track workflow requests",
})

promWorkflowApprovals = promauto.NewCounter(prometheus.CounterOpts{
Name: "feeds_workflow_approvals",
Help: "Metric to track workflow successful auto approvals",
})

promWorkflowFailures = promauto.NewCounter(prometheus.CounterOpts{
Name: "feeds_workflow_rejections",
Help: "Metric to track workflow failed auto approvals",
})

promJobProposalCounts = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "feeds_job_proposal_count",
Help: "Number of job proposals for the node partitioned by status.",
Expand Down Expand Up @@ -553,6 +568,7 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
}

if exists {
// note: CLO auto-increments the version number on re-proposal, so this should never happen
return 0, errors.New("proposed job spec version already exists")
}
}
Expand Down Expand Up @@ -596,9 +612,21 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
if err != nil {
return 0, err
}

// Track the given job proposal request
promJobProposalRequest.Inc()
// auto approve workflow specs
if isWFSpec(args.Spec) {
promWorkflowRequests.Inc()
err = s.ApproveSpec(ctx, id, true)
bolekk marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
promWorkflowFailures.Inc()
logger.Errorw("Failed to auto approve workflow spec", "id", id, "err", err)
return 0, fmt.Errorf("failed to approve workflow spec %d: %w", id, err)
}
logger.Infow("Successful workflow spec auto approval", "id", id)
promWorkflowApprovals.Inc()
} else {
// Track the given job proposal request
promJobProposalRequest.Inc()
}

if err = s.observeJobProposalCounts(ctx); err != nil {
logger.Errorw("Failed to push metrics for propose job", err)
Expand All @@ -607,6 +635,15 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
return id, nil
}

func isWFSpec(spec string) bool {
krehermann marked this conversation as resolved.
Show resolved Hide resolved
var s job.WorkflowSpec
err := toml.Unmarshal([]byte(spec), &s)
if err != nil {
return false
}
return s.Validate() == nil
}

// GetJobProposal gets a job proposal by id.
func (s *service) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) {
return s.orm.GetJobProposal(ctx, id)
Expand Down Expand Up @@ -761,6 +798,16 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error {
return errors.Wrap(txerr, "FindOCR2JobIDByAddress failed")
}
}
case job.Workflow:
// do nothing. assume there is no existing job
existingJobID, txerr = findExistingWorkflowJob(ctx, *j.WorkflowSpec, tx.jobORM)
if txerr != nil {
// Return an error if the repository errors. If there is a not found
// error we want to continue with approving the job.
if !errors.Is(txerr, sql.ErrNoRows) {
return fmt.Errorf("failed while checking for existing workflow job: %w", txerr)
}
}
default:
return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type)
}
Expand Down Expand Up @@ -1058,6 +1105,11 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error {
return nil
}

// TODO KS-205 implement this. Need to figure out how exactly how we want to handle this.
func findExistingWorkflowJob(ctx context.Context, wfSpec job.WorkflowSpec, tx job.ORM) (int32, error) {
return 0, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to highlight that without this being implemented, what this means is, if the job has always been managed through CLO then it won't be applicable, as the externalJobID should be able to find the existing job. However if the workflow was added outside CLO then that job needs to be manually removed first. If workflow specs are not being manually added by nops then technically the latter should also not be a problem. However, if we do hide the workflow job from the UI it would also mean that they can't remove the job either.

I don't think it's a problem for now but perhaps we should revisit so it's full proof. The query here is usually through a unique identifier that ties the job to the spec.

}

// findExistingJobForOCR2 looks for existing job for OCR2
func findExistingJobForOCR2(ctx context.Context, j *job.Job, tx job.ORM) (int32, error) {
var contractID string
Expand All @@ -1073,7 +1125,7 @@ func findExistingJobForOCR2(ctx context.Context, j *job.Job, tx job.ORM) (int32,
feedID = j.BootstrapSpec.FeedID
}
case job.FluxMonitor, job.OffchainReporting:
return 0, errors.Errorf("contradID and feedID not applicable for job type: %s", j.Type)
return 0, errors.Errorf("contractID and feedID not applicable for job type: %s", j.Type)
default:
return 0, errors.Errorf("unsupported job type: %s", j.Type)
}
Expand Down Expand Up @@ -1106,7 +1158,7 @@ func findExistingJobForOCRFlux(ctx context.Context, j *job.Job, tx job.ORM) (int
func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error) {
jobType, err := job.ValidateSpec(spec)
if err != nil {
return nil, errors.Wrap(err, "failed to parse job spec TOML")
return nil, fmt.Errorf("failed to parse job spec TOML'%s': %w", spec, err)
}

var js job.Job
Expand All @@ -1128,6 +1180,24 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error
js, err = ocrbootstrap.ValidatedBootstrapSpecToml(spec)
case job.FluxMonitor:
js, err = fluxmonitorv2.ValidatedFluxMonitorSpec(s.jobCfg, spec)
case job.Workflow:
// TODO what the right way to validate a workflow spec?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HenryNguyen5 to confirm

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be merged, handling it after CRIB #13125

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the eta for that? this feature is largely independent of validation details and needs to be merged for the CLO timeline of EOM. said differently, we can update the validation logic after this if we need to change it

var s job.WorkflowSpec
err = toml.Unmarshal([]byte(spec), &s)
if err != nil {
return nil, fmt.Errorf("failed to parse workflow spec TOML: %w", err)
}
err = s.Validate()
if err != nil {
return nil, fmt.Errorf("failed to validate workflow spec: %w", err)
}
js = job.Job{
Type: job.Workflow,
SchemaVersion: 1,
ExternalJobID: uuid.New(), // is this right?
krehermann marked this conversation as resolved.
Show resolved Hide resolved
WorkflowSpec: &s,
}

default:
return nil, errors.Errorf("unknown job type: %s", jobType)
}
Expand Down
110 changes: 110 additions & 0 deletions core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"
)

Expand Down Expand Up @@ -638,6 +639,31 @@ func Test_Service_ProposeJob(t *testing.T) {
}

httpTimeout = *commonconfig.MustNewDuration(1 * time.Second)

// variables for workflow spec
wfID = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef"
wfOwner = "00000000000000000000000000000000000000aa"
wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, "spec details").Toml()
proposalIDWF = int64(11)
remoteUUIDWF = uuid.New()
argsWF = &feeds.ProposeJobArgs{
FeedsManagerID: 1,
RemoteUUID: remoteUUIDWF,
Spec: wfSpec,
Version: 1,
}
jpWF = feeds.JobProposal{
FeedsManagerID: 1,
Name: null.StringFrom("test-spec"),
RemoteUUID: remoteUUIDWF,
Status: feeds.JobProposalStatusPending,
}
proposalSpecWF = feeds.JobProposalSpec{
Definition: wfSpec,
Status: feeds.SpecStatusPending,
Version: 1,
JobProposalID: proposalIDWF,
}
)

testCases := []struct {
Expand All @@ -647,6 +673,90 @@ func Test_Service_ProposeJob(t *testing.T) {
wantID int64
wantErr string
}{
{
name: "Auto approve WF spec",
before: func(svc *TestService) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows)
svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil)
svc.orm.On("CreateSpec", mock.Anything, proposalSpecWF).Return(int64(100), nil)
svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything)
transactCall.Run(func(args mock.Arguments) {
fn := args[1].(func(orm feeds.ORM) error)
transactCall.ReturnArguments = mock.Arguments{fn(svc.orm)}
})
// Auto approve is really a call to ApproveJobProposal and so we have to mock that as well
svc.connMgr.On("GetClient", argsWF.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.EXPECT().GetSpec(mock.Anything, proposalIDWF).Return(&proposalSpecWF, nil)
svc.orm.EXPECT().GetJobProposal(mock.Anything, proposalSpecWF.JobProposalID).Return(&jpWF, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows) // TODO fix the external job id in wf spec generation
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
svc.spawner.
On("CreateJob",
mock.Anything,
mock.Anything,
mock.MatchedBy(func(j *job.Job) bool {
return j.WorkflowSpec.WorkflowOwner == wfOwner
}),
).
Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }).
Return(nil)
svc.orm.On("ApproveSpec",
mock.Anything,
proposalSpecWF.JobProposalID,
mock.IsType(uuid.UUID{}),
).Return(nil)
svc.fmsClient.On("ApprovedJob",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
&proto.ApprovedJobRequest{
Uuid: jpWF.RemoteUUID.String(),
Version: int64(proposalSpecWF.Version),
},
).Return(&proto.ApprovedJobResponse{}, nil)
},
args: argsWF,
wantID: proposalIDWF,
},
{
name: "Auto approve WF spec: error creating job",
before: func(svc *TestService) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows)
svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil)
svc.orm.On("CreateSpec", mock.Anything, proposalSpecWF).Return(int64(100), nil)
// svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything)
transactCall.Run(func(args mock.Arguments) {
fn := args[1].(func(orm feeds.ORM) error)
transactCall.ReturnArguments = mock.Arguments{fn(svc.orm)}
})
// Auto approve is really a call to ApproveJobProposal and so we have to mock that as well
svc.connMgr.On("GetClient", argsWF.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.EXPECT().GetSpec(mock.Anything, proposalIDWF).Return(&proposalSpecWF, nil)
svc.orm.EXPECT().GetJobProposal(mock.Anything, proposalSpecWF.JobProposalID).Return(&jpWF, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows) // TODO fix the external job id in wf spec generation
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
svc.spawner.
On("CreateJob",
mock.Anything,
mock.Anything,
mock.MatchedBy(func(j *job.Job) bool {
return j.WorkflowSpec.WorkflowOwner == wfOwner
}),
).
Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }).
Return(fmt.Errorf("error creating job"))
},
args: argsWF,
wantID: 0,
wantErr: "error creating job",
},

{
name: "Create success (Flux Monitor)",
before: func(svc *TestService) {
Expand Down
Loading