Skip to content

Commit

Permalink
Parse workflow earlier and store spec type in the DB
Browse files Browse the repository at this point in the history
  • Loading branch information
nolag committed Sep 16, 2024
1 parent e2d3a10 commit aeed755
Show file tree
Hide file tree
Showing 20 changed files with 223 additions and 163 deletions.
2 changes: 1 addition & 1 deletion core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error
case job.FluxMonitor:
js, err = fluxmonitorv2.ValidatedFluxMonitorSpec(s.jobCfg, spec)
case job.Workflow:
js, err = workflows.ValidatedWorkflowJobSpec(spec)
js, err = workflows.ValidatedWorkflowJobSpec(ctx, spec)
case job.CCIP:
js, err = ccip.ValidatedCCIPSpec(spec)
default:
Expand Down
12 changes: 7 additions & 5 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package job_test

import (
"context"
"crypto/sha256"
"database/sql"
"fmt"
"testing"
Expand Down Expand Up @@ -1862,6 +1861,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
spec: &job.WorkflowSpec{
ID: 1,
Workflow: pkgworkflows.WFYamlSpec(t, "workflow01", addr1),
SpecType: job.YamlSpec,
},
before: mustInsertWFJob,
},
Expand All @@ -1882,6 +1882,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
var c job.WorkflowSpec
c.ID = s.ID
c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow99", addr1) // insert with mismatched name
c.SpecType = job.YamlSpec
return mustInsertWFJob(t, o, &c)
},
},
Expand Down Expand Up @@ -1949,18 +1950,21 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) {
wfYaml1 := pkgworkflows.WFYamlSpec(t, "workflow00", addr1)
s1 := job.WorkflowSpec{
Workflow: wfYaml1,
SpecType: job.YamlSpec,
}
wantJobID1 := mustInsertWFJob(t, o, &s1)

wfYaml2 := pkgworkflows.WFYamlSpec(t, "workflow01", addr1)
s2 := job.WorkflowSpec{
Workflow: wfYaml2,
SpecType: job.YamlSpec,
}
wantJobID2 := mustInsertWFJob(t, o, &s2)

wfYaml3 := pkgworkflows.WFYamlSpec(t, "workflow00", addr2)
s3 := job.WorkflowSpec{
Workflow: wfYaml3,
SpecType: job.YamlSpec,
}
wantJobID3 := mustInsertWFJob(t, o, &s3)

Expand All @@ -1977,16 +1981,14 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) {
assert.EqualValues(t, j.WorkflowSpec.WorkflowID, s.WorkflowID)
assert.EqualValues(t, j.WorkflowSpec.WorkflowOwner, s.WorkflowOwner)
assert.EqualValues(t, j.WorkflowSpec.WorkflowName, s.WorkflowName)
assert.Equal(t, j.WorkflowSpec.SpecType, job.YamlSpec)
}
})
}

func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 {
t.Helper()
sum := sha256.New()
sum.Write([]byte(s.Workflow))
s.WorkflowID = fmt.Sprintf("%x", sum.Sum(nil))
err := s.Validate()
err := s.Validate(testutils.Context(t))
require.NoError(t, err, "failed to validate spec %v", s)
ctx := testutils.Context(t)
_, err = toml.Marshal(s.Workflow)
Expand Down
27 changes: 25 additions & 2 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package job

import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
"gopkg.in/guregu/null.v4"

commonassets "github.com/smartcontractkit/chainlink-common/pkg/assets"
Expand Down Expand Up @@ -876,7 +878,9 @@ type WorkflowSpec struct {
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `json:"spec_type"`
SpecType WorkflowSpecType `db:"spec_type"`
sdkWorkflow *sdk.WorkflowSpec
workflowCid *string

Check failure on line 883 in core/services/job/models.go

View workflow job for this annotation

GitHub Actions / lint

field `workflowCid` is unused (unused)
}

var (
Expand All @@ -889,11 +893,16 @@ const (
)

// Validate checks the workflow spec for correctness
func (w *WorkflowSpec) Validate() error {
func (w *WorkflowSpec) Validate(ctx context.Context) error {
s, err := pkgworkflows.ParseWorkflowSpecYaml(w.Workflow)
if err != nil {
return fmt.Errorf("%w: failed to parse workflow spec %s: %w", ErrInvalidWorkflowYAMLSpec, w.Workflow, err)
}

if _, err = w.SdkWorkflowSpec(ctx); err != nil {
return err
}

w.WorkflowOwner = strings.TrimPrefix(s.Owner, "0x") // the json schema validation ensures it is a hex string with 0x prefix, but the database does not store the prefix
w.WorkflowName = s.Name

Expand All @@ -904,6 +913,20 @@ func (w *WorkflowSpec) Validate() error {
return nil
}

func (w *WorkflowSpec) SdkWorkflowSpec(ctx context.Context) (sdk.WorkflowSpec, error) {
if w.sdkWorkflow != nil {
return *w.sdkWorkflow, nil
}

spec, cid, err := workflowSpecFactory.Spec(ctx, w.Workflow, []byte(w.Config), w.SpecType)
if err != nil {
return sdk.WorkflowSpec{}, err
}
w.sdkWorkflow = &spec
w.WorkflowID = cid
return spec, nil
}

type StandardCapabilitiesSpec struct {
ID int32
CreatedAt time.Time `toml:"-"`
Expand Down
9 changes: 3 additions & 6 deletions core/services/job/models_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package job

import (
"crypto/sha256"
_ "embed"
"fmt"
"reflect"
"testing"
"time"
Expand All @@ -13,6 +11,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/codec"
"github.com/smartcontractkit/chainlink-common/pkg/types"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -324,10 +324,7 @@ func TestWorkflowSpec_Validate(t *testing.T) {
w := &WorkflowSpec{
Workflow: tt.fields.Workflow,
}
sum := sha256.New()
sum.Write([]byte("test"))
w.WorkflowID = fmt.Sprintf("%x", sum.Sum(nil))
err := w.Validate()
err := w.Validate(testutils.Context(t))
require.Equal(t, tt.wantError, err != nil)
if !tt.wantError {
assert.NotEmpty(t, w.WorkflowID)
Expand Down
6 changes: 3 additions & 3 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
case Stream:
// 'stream' type has no associated spec, nothing to do here
case Workflow:
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW())
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type)
RETURNING id;`
specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec)
if err != nil {
Expand Down Expand Up @@ -961,7 +961,7 @@ func (o *orm) FindJob(ctx context.Context, id int32) (jb Job, err error) {
return
}

// FindJobWithoutSpecErrors returns a job by ID, without loading Spec Errors preloaded
// FindJobWithoutSpecErrors returns a job by ID, without loading SpecVal Errors preloaded
func (o *orm) FindJobWithoutSpecErrors(ctx context.Context, id int32) (jb Job, err error) {
err = o.transact(ctx, true, func(tx *orm) error {
stmt := "SELECT jobs.*, job_pipeline_specs.pipeline_spec_id as pipeline_spec_id FROM jobs JOIN job_pipeline_specs ON (jobs.id = job_pipeline_specs.job_id) WHERE jobs.id = $1 LIMIT 1"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
package workflows
package job

import (
"context"
"crypto/sha256"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"

"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

var ErrInvalidWorkflowType = errors.New("invalid workflow type")

type SDKWorkflowSpecFactory interface {
GetSpec(rawSpec, config []byte) (sdk.WorkflowSpec, error)
GetRawSpec(wf string) ([]byte, error)
Spec(ctx context.Context, rawSpec, config []byte) (sdk.WorkflowSpec, error)
RawSpec(ctx context.Context, wf string) ([]byte, error)
}

type WorkflowSpecFactory map[job.WorkflowSpecType]SDKWorkflowSpecFactory
type WorkflowSpecFactory map[WorkflowSpecType]SDKWorkflowSpecFactory

func (wsf WorkflowSpecFactory) ToSpec(workflow string, config []byte, tpe job.WorkflowSpecType) (sdk.WorkflowSpec, string, error) {
func (wsf WorkflowSpecFactory) Spec(
ctx context.Context, workflow string, config []byte, tpe WorkflowSpecType) (sdk.WorkflowSpec, string, error) {
if tpe == "" {
tpe = job.DefaultSpecType
tpe = DefaultSpecType
}

factory, ok := wsf[tpe]
if !ok {
return sdk.WorkflowSpec{}, "", ErrInvalidWorkflowType
}

rawSpec, err := factory.GetRawSpec(workflow)
rawSpec, err := factory.RawSpec(ctx, workflow)
if err != nil {
return sdk.WorkflowSpec{}, "", err
}

spec, err := factory.GetSpec(rawSpec, config)
spec, err := factory.Spec(ctx, rawSpec, config)
if err != nil {
return sdk.WorkflowSpec{}, "", err
}
Expand All @@ -47,5 +47,5 @@ func (wsf WorkflowSpecFactory) ToSpec(workflow string, config []byte, tpe job.Wo
}

var workflowSpecFactory = WorkflowSpecFactory{
job.YamlSpec: YAMLSpecFactory{},
YamlSpec: YAMLSpecFactory{},
}
105 changes: 105 additions & 0 deletions core/services/job/workflow_spec_factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package job_test

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"testing"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

func TestWorkflowSpecFactory_ToSpec(t *testing.T) {
t.Parallel()

anyData := "any data"
anyConfig := []byte("any config")
anySpec := sdk.WorkflowSpec{Name: "name", Owner: "owner"}

t.Run("delegates to factory and calculates CID", func(t *testing.T) {
runYamlSpecTest(t, anySpec, anyData, anyConfig, job.YamlSpec)
})

t.Run("delegates default", func(t *testing.T) {
runYamlSpecTest(t, anySpec, anyData, anyConfig, "")
})

t.Run("CID without config matches", func(t *testing.T) {
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, noConfig: true, SpecVal: anySpec},
}
results, cid, err := factory.Spec(testutils.Context(t), anyData, nil, job.YamlSpec)
require.NoError(t, err)

assert.Equal(t, anySpec, results)

sha256Hash := sha256.New()
sha256Hash.Write([]byte(anyData))
expectedCid := fmt.Sprintf("%x", sha256Hash.Sum(nil))
assert.Equal(t, expectedCid, cid)
})

t.Run("returns errors from sdk factory", func(t *testing.T) {
anyErr := errors.New("nope")
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, Err: anyErr},
}

_, _, err := factory.Spec(testutils.Context(t), anyData, anyConfig, job.YamlSpec)
assert.Equal(t, anyErr, err)
})

t.Run("returns an error if the type is not supported", func(t *testing.T) {
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, SpecVal: anySpec},
}

_, _, err := factory.Spec(testutils.Context(t), anyData, anyConfig, "unsupported")
assert.Error(t, err)
})
}

func runYamlSpecTest(t *testing.T, anySpec sdk.WorkflowSpec, anyData string, anyConfig []byte, specType job.WorkflowSpecType) {
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, SpecVal: anySpec},
}

results, cid, err := factory.Spec(testutils.Context(t), anyData, anyConfig, specType)

require.NoError(t, err)
assert.Equal(t, anySpec, results)

sha256Hash := sha256.New()
sha256Hash.Write([]byte(anyData))
sha256Hash.Write(anyConfig)
expectedCid := fmt.Sprintf("%x", sha256Hash.Sum(nil))
assert.Equal(t, expectedCid, cid)
}

type mockSdkSpecFactory struct {
t *testing.T
noConfig bool
SpecVal sdk.WorkflowSpec
Err error
}

func (f mockSdkSpecFactory) RawSpec(_ context.Context, wf string) ([]byte, error) {
return []byte(wf), nil
}

func (f mockSdkSpecFactory) Spec(_ context.Context, rawSpec, config []byte) (sdk.WorkflowSpec, error) {
assert.ElementsMatch(f.t, rawSpec, []byte("any data"))
if f.noConfig {
assert.Nil(f.t, config)
} else {
assert.ElementsMatch(f.t, config, []byte("any config"))
}

return f.SpecVal, f.Err
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package workflows
package job

import (
"context"

"github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
)
Expand All @@ -9,10 +11,10 @@ type YAMLSpecFactory struct{}

var _ SDKWorkflowSpecFactory = (*YAMLSpecFactory)(nil)

func (y YAMLSpecFactory) GetSpec(rawSpec, _ []byte) (sdk.WorkflowSpec, error) {
func (y YAMLSpecFactory) Spec(_ context.Context, rawSpec, _ []byte) (sdk.WorkflowSpec, error) {
return workflows.ParseWorkflowSpecYaml(string(rawSpec))
}

func (y YAMLSpecFactory) GetRawSpec(wf string) ([]byte, error) {
func (y YAMLSpecFactory) RawSpec(_ context.Context, wf string) ([]byte, error) {
return []byte(wf), nil
}
Loading

0 comments on commit aeed755

Please sign in to comment.