From 61391260340ba74f3510e6ded4fdace6829630b7 Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Wed, 22 May 2024 12:54:24 -0500 Subject: [PATCH] Pipeline Data Corruption (#13286) * Pipeline Data Corruption The unit test `TestDivide_Example` was acting flakey in the CI pipeline which suggested a flaw in the divide and multiply operations. When running the test, the expected result would be one of the input values or the division result in failure cases. This implied that results were either received out of order or were being sorted incorrectly. The pipeline runner does a final sort on the results, so that ruled out the received out of order possibility. On inspection of the sorting index on each task, every index was the zero value. This resulted in occasional correct and incorrect sorting, causing the test flake. To correct the problem, the test was updated such that the expected result has an index of `1`, leaving all other tasks with a `0` index. * fix test * updated changeset --- .changeset/blue-camels-begin.md | 5 +++ core/services/ocr/validate_test.go | 52 ++++++++++++++++++++++ core/services/pipeline/common.go | 19 ++++++++ core/services/pipeline/graph.go | 11 +++++ core/services/pipeline/graph_test.go | 12 ++--- core/services/pipeline/task.divide_test.go | 19 ++++---- 6 files changed, 103 insertions(+), 15 deletions(-) create mode 100644 .changeset/blue-camels-begin.md diff --git a/.changeset/blue-camels-begin.md b/.changeset/blue-camels-begin.md new file mode 100644 index 0000000000..3ad57286e9 --- /dev/null +++ b/.changeset/blue-camels-begin.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +enforce proper result indexing on pipeline results #breaking_change diff --git a/core/services/ocr/validate_test.go b/core/services/ocr/validate_test.go index 6e68559d09..59213c7168 100644 --- a/core/services/ocr/validate_test.go +++ b/core/services/ocr/validate_test.go @@ -27,6 +27,58 @@ func TestValidateOracleSpec(t *testing.T) { overrides func(c *chainlink.Config, s *chainlink.Secrets) assertion func(t *testing.T, os job.Job, err error) }{ + { + name: "invalid result sorting index", + toml: ` +ds1 [type=memo value=10000.1234]; +ds2 [type=memo value=100]; + +div_by_ds2 [type=divide divisor="$(ds2)"]; + +ds1 -> div_by_ds2 -> answer1; + +answer1 [type=multiply times=10000 index=-1]; +`, + assertion: func(t *testing.T, os job.Job, err error) { + require.Error(t, err) + }, + }, + { + name: "duplicate sorting indexes not allowed", + toml: ` +ds1 [type=memo value=10000.1234]; +ds2 [type=memo value=100]; + +div_by_ds2 [type=divide divisor="$(ds2)"]; + +ds1 -> div_by_ds2 -> answer1; +ds1 -> div_by_ds2 -> answer2; + +answer1 [type=multiply times=10000 index=0]; +answer2 [type=multiply times=10000 index=0]; +`, + assertion: func(t *testing.T, os job.Job, err error) { + require.Error(t, err) + }, + }, + { + name: "invalid result sorting index", + toml: ` +type = "offchainreporting" +schemaVersion = 1 +contractAddress = "0x613a38AC1659769640aaE063C651F48E0250454C" +isBootstrapPeer = false +observationSource = """ +ds1 [type=bridge name=voter_turnout]; +ds1_parse [type=jsonparse path="one,two"]; +ds1_multiply [type=multiply times=1.23]; +ds1 -> ds1_parse -> ds1_multiply -> answer1; +answer1 [type=median index=-1]; +"""`, + assertion: func(t *testing.T, os job.Job, err error) { + require.Error(t, err) + }, + }, { name: "minimal non-bootstrap oracle spec", toml: ` diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index a0fc28c686..5d843b8b91 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -415,9 +415,11 @@ func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, ID int, dotID return nil, pkgerrors.Errorf(`unknown task type: "%v"`, taskType) } + metadata := mapstructure.Metadata{} decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ Result: task, WeaklyTypedInput: true, + Metadata: &metadata, DecodeHook: mapstructure.ComposeDecodeHookFunc( mapstructure.StringToTimeDurationHookFunc(), func(from reflect.Type, to reflect.Type, data interface{}) (interface{}, error) { @@ -441,6 +443,23 @@ func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, ID int, dotID if err != nil { return nil, err } + + // valid explicit index values are 0-based + for _, key := range metadata.Keys { + if key == "index" { + if task.OutputIndex() < 0 { + return nil, errors.New("result sorting indexes should start with 0") + } + } + } + + // the 'unset' value should be -1 to allow explicit indexes to be 0-based + for _, key := range metadata.Unset { + if key == "index" { + task.Base().Index = -1 + } + } + return task, nil } diff --git a/core/services/pipeline/graph.go b/core/services/pipeline/graph.go index c3914e698c..12bec2bc8b 100644 --- a/core/services/pipeline/graph.go +++ b/core/services/pipeline/graph.go @@ -235,6 +235,8 @@ func Parse(text string) (*Pipeline, error) { // we need a temporary mapping of graph.IDs to positional ids after toposort ids := make(map[int64]int) + resultIdxs := make(map[int32]struct{}) + // use the new ordering as the id so that we can easily reproduce the original toposort for id, node := range nodes { node, is := node.(*GraphNode) @@ -251,6 +253,15 @@ func Parse(text string) (*Pipeline, error) { return nil, err } + if task.OutputIndex() > 0 { + _, exists := resultIdxs[task.OutputIndex()] + if exists { + return nil, errors.New("duplicate sorting indexes detected") + } + + resultIdxs[task.OutputIndex()] = struct{}{} + } + // re-link the edges for inputs := g.To(node.ID()); inputs.Next(); { isImplicitEdge := g.IsImplicitEdge(inputs.Node().ID(), node.ID()) diff --git a/core/services/pipeline/graph_test.go b/core/services/pipeline/graph_test.go index b3960bb1f4..c6248a38e2 100644 --- a/core/services/pipeline/graph_test.go +++ b/core/services/pipeline/graph_test.go @@ -171,27 +171,27 @@ func TestGraph_TasksInDependencyOrder(t *testing.T) { "ds1_multiply", []pipeline.TaskDependency{{PropagateResult: true, InputTask: pipeline.Task(ds1_parse)}}, []pipeline.Task{answer1}, - 0) + -1) ds2_multiply.BaseTask = pipeline.NewBaseTask( 5, "ds2_multiply", []pipeline.TaskDependency{{PropagateResult: true, InputTask: pipeline.Task(ds2_parse)}}, []pipeline.Task{answer1}, - 0) + -1) ds1_parse.BaseTask = pipeline.NewBaseTask( 1, "ds1_parse", []pipeline.TaskDependency{{PropagateResult: true, InputTask: pipeline.Task(ds1)}}, []pipeline.Task{ds1_multiply}, - 0) + -1) ds2_parse.BaseTask = pipeline.NewBaseTask( 4, "ds2_parse", []pipeline.TaskDependency{{PropagateResult: true, InputTask: pipeline.Task(ds2)}}, []pipeline.Task{ds2_multiply}, - 0) - ds1.BaseTask = pipeline.NewBaseTask(0, "ds1", nil, []pipeline.Task{ds1_parse}, 0) - ds2.BaseTask = pipeline.NewBaseTask(3, "ds2", nil, []pipeline.Task{ds2_parse}, 0) + -1) + ds1.BaseTask = pipeline.NewBaseTask(0, "ds1", nil, []pipeline.Task{ds1_parse}, -1) + ds2.BaseTask = pipeline.NewBaseTask(3, "ds2", nil, []pipeline.Task{ds2_parse}, -1) for i, task := range p.Tasks { // Make sure inputs appear before the task, and outputs don't diff --git a/core/services/pipeline/task.divide_test.go b/core/services/pipeline/task.divide_test.go index 3c2f57c7a0..8eb8e4de06 100644 --- a/core/services/pipeline/task.divide_test.go +++ b/core/services/pipeline/task.divide_test.go @@ -3,6 +3,7 @@ package pipeline_test import ( "fmt" "math" + "reflect" "testing" "github.com/pkg/errors" @@ -198,19 +199,17 @@ func TestDivideTask_Overflow(t *testing.T) { } func TestDivide_Example(t *testing.T) { - testutils.SkipFlakey(t, "BCF-3236") t.Parallel() dag := ` -ds1 [type=memo value=10000.1234] +ds1 [type=memo value=10000.1234]; +ds2 [type=memo value=100]; -ds2 [type=memo value=100] +div_by_ds2 [type=divide divisor="$(ds2)"]; +multiply [type=multiply times=10000 index=0]; -div_by_ds2 [type=divide divisor="$(ds2)"] +ds1 -> div_by_ds2 -> multiply; -multiply [type=multiply times=10000 index=0] - -ds1->div_by_ds2->multiply; ` db := pgtest.NewSqlxDB(t) @@ -223,12 +222,14 @@ ds1->div_by_ds2->multiply; lggr := logger.TestLogger(t) _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars, lggr) - require.NoError(t, err) + require.NoError(t, err) require.Len(t, trrs, 4) finalResult := trrs[3] - assert.Nil(t, finalResult.Result.Error) + require.NoError(t, finalResult.Result.Error) + require.Equal(t, reflect.TypeOf(decimal.Decimal{}), reflect.TypeOf(finalResult.Result.Value)) + assert.Equal(t, "1000012.34", finalResult.Result.Value.(decimal.Decimal).String()) }