Skip to content

Commit

Permalink
better workflow spec validation
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann committed May 20, 2024
1 parent a84bc42 commit 03d1da3
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 30 deletions.
31 changes: 8 additions & 23 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr"
ocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"
)

Expand Down Expand Up @@ -613,7 +614,7 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
return 0, err
}
// auto approve workflow specs
if isWFSpec(args.Spec) {
if isWFSpec(logger, args.Spec) {
promWorkflowRequests.Inc()
err = s.ApproveSpec(ctx, id, true)
if err != nil {
Expand All @@ -635,13 +636,14 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
return id, nil
}

func isWFSpec(spec string) bool {
var s job.WorkflowSpec
err := toml.Unmarshal([]byte(spec), &s)
func isWFSpec(lggr logger.Logger, spec string) bool {
jobType, err := job.ValidateSpec(spec)
if err != nil {
// this should not happen in practice
lggr.Errorw("Failed to validate spec while checking for workflow", "err", err)
return false
}
return s.Validate() == nil
return jobType == job.Workflow
}

// GetJobProposal gets a job proposal by id.
Expand Down Expand Up @@ -799,7 +801,6 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error {
}
}
case job.Workflow:
// do nothing. assume there is no existing job
existingJobID, txerr = findExistingWorkflowJob(ctx, *j.WorkflowSpec, tx.jobORM)
if txerr != nil {
// Return an error if the repository errors. If there is a not found
Expand Down Expand Up @@ -1181,23 +1182,7 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error
case job.FluxMonitor:
js, err = fluxmonitorv2.ValidatedFluxMonitorSpec(s.jobCfg, spec)
case job.Workflow:
// TODO what the right way to validate a workflow spec?
var s job.WorkflowSpec
err = toml.Unmarshal([]byte(spec), &s)
if err != nil {
return nil, fmt.Errorf("failed to parse workflow spec TOML: %w", err)
}
err = s.Validate()
if err != nil {
return nil, fmt.Errorf("failed to validate workflow spec: %w", err)
}
js = job.Job{
Type: job.Workflow,
SchemaVersion: 1,
ExternalJobID: uuid.New(), // is this right?
WorkflowSpec: &s,
}

js, err = workflows.ValidatedWorkflowSpec(spec)
default:
return nil, errors.Errorf("unknown job type: %s", jobType)
}
Expand Down
29 changes: 26 additions & 3 deletions core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,32 @@ func Test_Service_ProposeJob(t *testing.T) {
httpTimeout = *commonconfig.MustNewDuration(1 * time.Second)

// variables for workflow spec
wfID = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef"
wfOwner = "00000000000000000000000000000000000000aa"
wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, "spec details").Toml()
wfID = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef"
wfOwner = "00000000000000000000000000000000000000aa"
specYaml = `
triggers:
- id: "a-trigger"
actions:
- id: "an-action"
ref: "an-action"
inputs:
trigger_output: $(trigger.outputs)
consensus:
- id: "a-consensus"
ref: "a-consensus"
inputs:
trigger_output: $(trigger.outputs)
an-action_output: $(an-action.outputs)
targets:
- id: "a-target"
ref: "a-target"
inputs:
consensus_output: $(a-consensus.outputs)
`
wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, specYaml).Toml()
proposalIDWF = int64(11)
remoteUUIDWF = uuid.New()
argsWF = &feeds.ProposeJobArgs{
Expand Down
14 changes: 10 additions & 4 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,27 @@ func ValidatedWorkflowSpec(tomlString string) (job.Job, error) {
if err != nil {
return jb, fmt.Errorf("toml unmarshal error on spec: %w", err)
}
if jb.Type != job.Workflow {
return jb, fmt.Errorf("unsupported type %s, expected %s", jb.Type, job.Workflow)
}

var spec job.WorkflowSpec
err = tree.Unmarshal(&spec)
if err != nil {
return jb, fmt.Errorf("toml unmarshal error on job: %w", err)
return jb, fmt.Errorf("toml unmarshal error on workflow spec: %w", err)
}

if err := spec.Validate(); err != nil {

Check failure on line 135 in core/services/workflows/delegate.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 116 (govet)
return jb, err
}

jb.WorkflowSpec = &spec
if jb.Type != job.Workflow {
return jb, fmt.Errorf("unsupported type %s", jb.Type)
// ensure the embedded workflow graph is valid
_, err = Parse(spec.Workflow)
if err != nil {
return jb, fmt.Errorf("failed to parse workflow graph: %w", err)
}
jb.WorkflowSpec = &spec
jb.WorkflowSpecID = &spec.ID

return jb, nil
}

0 comments on commit 03d1da3

Please sign in to comment.