Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delegate workflow spec creation #14365

Merged
merged 16 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/client_golang v1.20.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240916150342-36cb47701edf
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7
github.com/spf13/cobra v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1083,8 +1083,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240916150615-85b8aa5fa7e6 h1:cbHlV2CSphQ+ghDye21M8ym0aAO/Y649H2Mg60M2AuE=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240916150615-85b8aa5fa7e6/go.mod h1:Lv77O13ZxOdmlvnu2vaUC0Lg+t3JAL+N+9K8dRsgmDI=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce h1:qXS0aWiDFDoLRCB+kSGnzp77iYT2luflUyzE5BnNmpY=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce/go.mod h1:sjiiPwd4KsYOCf68MwL86EKphdXeT66EY7j53WH5DCc=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240916150342-36cb47701edf h1:1AlTUkT5D8HmvU9bwDoIN54/EFyOnRBl7gnXZVrYXEA=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240916150342-36cb47701edf/go.mod h1:l8NTByXUdGGJX+vyKYI6yX1/HIpM14F8Wm9BkU3Q4Qo=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc h1:tRmTlaoAt+7FakMXXgeCuRPmzzBo5jsGpeCVvcU6KMc=
Expand Down
2 changes: 1 addition & 1 deletion core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error
case job.FluxMonitor:
js, err = fluxmonitorv2.ValidatedFluxMonitorSpec(s.jobCfg, spec)
case job.Workflow:
js, err = workflows.ValidatedWorkflowJobSpec(spec)
js, err = workflows.ValidatedWorkflowJobSpec(ctx, spec)
case job.CCIP:
js, err = ccip.ValidatedCCIPSpec(spec)
default:
Expand Down
8 changes: 7 additions & 1 deletion core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
spec: &job.WorkflowSpec{
ID: 1,
Workflow: pkgworkflows.WFYamlSpec(t, "workflow01", addr1),
SpecType: job.YamlSpec,
},
before: mustInsertWFJob,
},
Expand All @@ -1881,6 +1882,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
var c job.WorkflowSpec
c.ID = s.ID
c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow99", addr1) // insert with mismatched name
c.SpecType = job.YamlSpec
return mustInsertWFJob(t, o, &c)
},
},
Expand Down Expand Up @@ -1948,18 +1950,21 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) {
wfYaml1 := pkgworkflows.WFYamlSpec(t, "workflow00", addr1)
s1 := job.WorkflowSpec{
Workflow: wfYaml1,
SpecType: job.YamlSpec,
}
wantJobID1 := mustInsertWFJob(t, o, &s1)

wfYaml2 := pkgworkflows.WFYamlSpec(t, "workflow01", addr1)
s2 := job.WorkflowSpec{
Workflow: wfYaml2,
SpecType: job.YamlSpec,
}
wantJobID2 := mustInsertWFJob(t, o, &s2)

wfYaml3 := pkgworkflows.WFYamlSpec(t, "workflow00", addr2)
s3 := job.WorkflowSpec{
Workflow: wfYaml3,
SpecType: job.YamlSpec,
}
wantJobID3 := mustInsertWFJob(t, o, &s3)

Expand All @@ -1976,13 +1981,14 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) {
assert.EqualValues(t, j.WorkflowSpec.WorkflowID, s.WorkflowID)
assert.EqualValues(t, j.WorkflowSpec.WorkflowOwner, s.WorkflowOwner)
assert.EqualValues(t, j.WorkflowSpec.WorkflowName, s.WorkflowName)
assert.Equal(t, j.WorkflowSpec.SpecType, job.YamlSpec)
}
})
}

func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 {
t.Helper()
err := s.Validate()
err := s.Validate(testutils.Context(t))
require.NoError(t, err, "failed to validate spec %v", s)
ctx := testutils.Context(t)
_, err = toml.Marshal(s.Workflow)
Expand Down
48 changes: 40 additions & 8 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package job

import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
Expand All @@ -14,9 +15,12 @@ import (
"github.com/pkg/errors"
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"

commonassets "github.com/smartcontractkit/chainlink-common/pkg/assets"
"github.com/smartcontractkit/chainlink-common/pkg/types"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"

"github.com/smartcontractkit/chainlink/v2/core/services/relay"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand Down Expand Up @@ -857,16 +861,26 @@ type LiquidityBalancerSpec struct {
LiquidityBalancerConfig string `toml:"liquidityBalancerConfig" db:"liquidity_balancer_config"`
}

type WorkflowSpecType string

const (
YamlSpec WorkflowSpecType = "yaml"
DefaultSpecType = YamlSpec
)

type WorkflowSpec struct {
ID int32 `toml:"-"`
Workflow string `toml:"workflow"` // the yaml representation of the workflow
Workflow string `toml:"workflow"` // the raw representation of the workflow
Config string `toml:"config"` // the raw representation of the config
// fields derived from the yaml spec, used for indexing the database
// note: i tried to make these private, but translating them to the database seems to require them to be public
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `db:"spec_type"`
sdkWorkflow *sdk.WorkflowSpec
}

var (
Expand All @@ -879,14 +893,18 @@ const (
)

// Validate checks the workflow spec for correctness
func (w *WorkflowSpec) Validate() error {
func (w *WorkflowSpec) Validate(ctx context.Context) error {
s, err := pkgworkflows.ParseWorkflowSpecYaml(w.Workflow)
if err != nil {
return fmt.Errorf("%w: failed to parse workflow spec %s: %w", ErrInvalidWorkflowYAMLSpec, w.Workflow, err)
}

if _, err = w.SdkWorkflowSpec(ctx); err != nil {
return err
}

w.WorkflowOwner = strings.TrimPrefix(s.Owner, "0x") // the json schema validation ensures it is a hex string with 0x prefix, but the database does not store the prefix
w.WorkflowName = s.Name
w.WorkflowID = s.CID()
nolag marked this conversation as resolved.
Show resolved Hide resolved

if len(w.WorkflowID) != workflowIDLen {
return fmt.Errorf("%w: incorrect length for id %s: expected %d, got %d", ErrInvalidWorkflowID, w.WorkflowID, workflowIDLen, len(w.WorkflowID))
Expand All @@ -895,6 +913,20 @@ func (w *WorkflowSpec) Validate() error {
return nil
}

func (w *WorkflowSpec) SdkWorkflowSpec(ctx context.Context) (sdk.WorkflowSpec, error) {
nolag marked this conversation as resolved.
Show resolved Hide resolved
if w.sdkWorkflow != nil {
return *w.sdkWorkflow, nil
}

spec, cid, err := workflowSpecFactory.Spec(ctx, w.Workflow, []byte(w.Config), w.SpecType)
if err != nil {
return sdk.WorkflowSpec{}, err
}
w.sdkWorkflow = &spec
w.WorkflowID = cid
return spec, nil
}

type StandardCapabilitiesSpec struct {
ID int32
CreatedAt time.Time `toml:"-"`
Expand Down
4 changes: 3 additions & 1 deletion core/services/job/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/codec"
"github.com/smartcontractkit/chainlink-common/pkg/types"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -322,7 +324,7 @@ func TestWorkflowSpec_Validate(t *testing.T) {
w := &WorkflowSpec{
Workflow: tt.fields.Workflow,
}
err := w.Validate()
err := w.Validate(testutils.Context(t))
require.Equal(t, tt.wantError, err != nil)
if !tt.wantError {
assert.NotEmpty(t, w.WorkflowID)
Expand Down
6 changes: 3 additions & 3 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
case Stream:
// 'stream' type has no associated spec, nothing to do here
case Workflow:
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW())
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type)
RETURNING id;`
specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec)
if err != nil {
Expand Down Expand Up @@ -961,7 +961,7 @@ func (o *orm) FindJob(ctx context.Context, id int32) (jb Job, err error) {
return
}

// FindJobWithoutSpecErrors returns a job by ID, without loading Spec Errors preloaded
// FindJobWithoutSpecErrors returns a job by ID, without loading SpecVal Errors preloaded
func (o *orm) FindJobWithoutSpecErrors(ctx context.Context, id int32) (jb Job, err error) {
err = o.transact(ctx, true, func(tx *orm) error {
stmt := "SELECT jobs.*, job_pipeline_specs.pipeline_spec_id as pipeline_spec_id FROM jobs JOIN job_pipeline_specs ON (jobs.id = job_pipeline_specs.job_id) WHERE jobs.id = $1 LIMIT 1"
Expand Down
51 changes: 51 additions & 0 deletions core/services/job/workflow_spec_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package job

import (
"context"
"crypto/sha256"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
)

var ErrInvalidWorkflowType = errors.New("invalid workflow type")

type SDKWorkflowSpecFactory interface {
Spec(ctx context.Context, rawSpec, config []byte) (sdk.WorkflowSpec, error)
RawSpec(ctx context.Context, wf string) ([]byte, error)
}

type WorkflowSpecFactory map[WorkflowSpecType]SDKWorkflowSpecFactory

func (wsf WorkflowSpecFactory) Spec(
ctx context.Context, workflow string, config []byte, tpe WorkflowSpecType) (sdk.WorkflowSpec, string, error) {
if tpe == "" {
tpe = DefaultSpecType
}

factory, ok := wsf[tpe]
if !ok {
return sdk.WorkflowSpec{}, "", ErrInvalidWorkflowType
}

rawSpec, err := factory.RawSpec(ctx, workflow)
if err != nil {
return sdk.WorkflowSpec{}, "", err
}

spec, err := factory.Spec(ctx, rawSpec, config)
if err != nil {
return sdk.WorkflowSpec{}, "", err
}

sum := sha256.New()
sum.Write(rawSpec)
sum.Write(config)

return spec, fmt.Sprintf("%x", sum.Sum(nil)), nil
}

var workflowSpecFactory = WorkflowSpecFactory{
YamlSpec: YAMLSpecFactory{},
}
106 changes: 106 additions & 0 deletions core/services/job/workflow_spec_factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package job_test

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

func TestWorkflowSpecFactory_ToSpec(t *testing.T) {
t.Parallel()

anyData := "any data"
anyConfig := []byte("any config")
anySpec := sdk.WorkflowSpec{Name: "name", Owner: "owner"}

t.Run("delegates to factory and calculates CID", func(t *testing.T) {
runYamlSpecTest(t, anySpec, anyData, anyConfig, job.YamlSpec)
})

t.Run("delegates default", func(t *testing.T) {
runYamlSpecTest(t, anySpec, anyData, anyConfig, "")
})

t.Run("CID without config matches", func(t *testing.T) {
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, noConfig: true, SpecVal: anySpec},
}
results, cid, err := factory.Spec(testutils.Context(t), anyData, nil, job.YamlSpec)
require.NoError(t, err)

assert.Equal(t, anySpec, results)

sha256Hash := sha256.New()
sha256Hash.Write([]byte(anyData))
expectedCid := fmt.Sprintf("%x", sha256Hash.Sum(nil))
assert.Equal(t, expectedCid, cid)
})

t.Run("returns errors from sdk factory", func(t *testing.T) {
anyErr := errors.New("nope")
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, Err: anyErr},
}

_, _, err := factory.Spec(testutils.Context(t), anyData, anyConfig, job.YamlSpec)
assert.Equal(t, anyErr, err)
})

t.Run("returns an error if the type is not supported", func(t *testing.T) {
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, SpecVal: anySpec},
}

_, _, err := factory.Spec(testutils.Context(t), anyData, anyConfig, "unsupported")
assert.Error(t, err)
})
}

func runYamlSpecTest(t *testing.T, anySpec sdk.WorkflowSpec, anyData string, anyConfig []byte, specType job.WorkflowSpecType) {
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, SpecVal: anySpec},
}

results, cid, err := factory.Spec(testutils.Context(t), anyData, anyConfig, specType)

require.NoError(t, err)
assert.Equal(t, anySpec, results)

sha256Hash := sha256.New()
sha256Hash.Write([]byte(anyData))
sha256Hash.Write(anyConfig)
expectedCid := fmt.Sprintf("%x", sha256Hash.Sum(nil))
assert.Equal(t, expectedCid, cid)
}

type mockSdkSpecFactory struct {
t *testing.T
noConfig bool
SpecVal sdk.WorkflowSpec
Err error
}

func (f mockSdkSpecFactory) RawSpec(_ context.Context, wf string) ([]byte, error) {
return []byte(wf), nil
}

func (f mockSdkSpecFactory) Spec(_ context.Context, rawSpec, config []byte) (sdk.WorkflowSpec, error) {
assert.ElementsMatch(f.t, rawSpec, []byte("any data"))
if f.noConfig {
assert.Nil(f.t, config)
} else {
assert.ElementsMatch(f.t, config, []byte("any config"))
}

return f.SpecVal, f.Err
}
20 changes: 20 additions & 0 deletions core/services/job/yaml_spec_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package job

import (
"context"

"github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
)

type YAMLSpecFactory struct{}

var _ SDKWorkflowSpecFactory = (*YAMLSpecFactory)(nil)

func (y YAMLSpecFactory) Spec(_ context.Context, rawSpec, _ []byte) (sdk.WorkflowSpec, error) {
return workflows.ParseWorkflowSpecYaml(string(rawSpec))
}

func (y YAMLSpecFactory) RawSpec(_ context.Context, wf string) ([]byte, error) {
return []byte(wf), nil
}
Loading
Loading