From 0be520a1e91bbe24a587d38a4f362bb181f19dbe Mon Sep 17 00:00:00 2001
From: Nedyalko Andreev
Date: Thu, 8 Dec 2022 14:53:42 +0200
Subject: [PATCH] Remove the unnecessary execution.Scheduler.Init()

There is no need to have 2 separate methods; Run() can start the VU
initialization itself. As a bonus, this immediately brings the error
handling of init errors in line with our other error handling, allowing
us to respect --linger and to try to execute handleSummary() if there
were problems. The exception is the first init, the one used to get the
exported options; that one is still special.
---
 cmd/tests/cmd_test.go           | 54 +++++++++-----------
 core/engine.go                  |  5 --
 core/engine_test.go             | 51 -------------------
 execution/scheduler.go          | 87 ++++++++++++++++-----------------
 execution/scheduler_ext_test.go | 22 ++-------
 5 files changed, 69 insertions(+), 150 deletions(-)

diff --git a/cmd/tests/cmd_test.go b/cmd/tests/cmd_test.go
index a49db5debd1..8159933ac93 100644
--- a/cmd/tests/cmd_test.go
+++ b/cmd/tests/cmd_test.go
@@ -978,11 +978,18 @@ func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) {
 
 	export default function () {};
 
-	// Should not be called, since error is in the init context
 	export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};}
 	`
 
-	testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger)
+	t.Run("noLinger", func(t *testing.T) {
+		t.Parallel()
+		testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
+	})
+
+	t.Run("withLinger", func(t *testing.T) {
+		t.Parallel()
+		testAbortedByScriptTestAbort(t, script, runTestWithLinger)
+	})
 }
 
 func TestAbortedByScriptAbortInVUCode(t *testing.T) {
@@ -997,12 +1004,12 @@ func TestAbortedByScriptAbortInVUCode(t *testing.T) {
 	t.Run("noLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
 	})
 
 	t.Run("withLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithLinger)
 	})
 }
 
 func TestAbortedByScriptAbortInVUCodeInGroup(t *testing.T) {
@@ -1021,12 +1028,12 @@ func TestAbortedByScriptAbortInVUCodeInGroup(t *testing.T) {
 	t.Run("noLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
 	})
 
 	t.Run("withLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithLinger)
 	})
 }
 
 func TestAbortedByScriptAbortInSetup(t *testing.T) {
@@ -1043,12 +1050,12 @@ func TestAbortedByScriptAbortInSetup(t *testing.T) {
 	t.Run("noLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
 	})
 
 	t.Run("withLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithLinger)
 	})
 }
 
 func TestAbortedByScriptAbortInTeardown(t *testing.T) {
@@ -1065,18 +1072,16 @@ func TestAbortedByScriptAbortInTeardown(t *testing.T) {
 
 	t.Run("noLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithNoLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
 	})
 
 	t.Run("withLinger", func(t *testing.T) {
 		t.Parallel()
-		testAbortedByScriptTestAbort(t, true, script, runTestWithLinger)
+		testAbortedByScriptTestAbort(t, script, runTestWithLinger)
 	})
 }
 
-func testAbortedByScriptTestAbort(
-	t *testing.T, shouldHaveMetrics bool, script string, runTest func(*testing.T, *GlobalTestState),
-) *GlobalTestState { //nolint:unparam
+func testAbortedByScriptTestAbort(t *testing.T, script string, runTest func(*testing.T, *GlobalTestState)) {
 	ts := getSimpleCloudOutputTestState(
 		t, script, nil, cloudapi.RunStatusAbortedUser, cloudapi.ResultStatusPassed, exitcodes.ScriptAborted,
 	)
@@ -1087,13 +1092,8 @@ func testAbortedByScriptTestAbort(
 	assert.Contains(t, stdout, "test aborted: foo")
 	assert.Contains(t, stdout, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
 	assert.Contains(t, stdout, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
-	if shouldHaveMetrics {
-		assert.Contains(t, stdout, `level=debug msg="Metrics processing finished!"`)
-		assert.Contains(t, stdout, "bogus summary")
-	} else {
-		assert.NotContains(t, stdout, "bogus summary")
-	}
-	return ts
+	assert.Contains(t, stdout, `level=debug msg="Metrics processing finished!"`)
+	assert.Contains(t, stdout, "bogus summary")
 }
 
 func TestAbortedByInterruptDuringVUInit(t *testing.T) {
@@ -1112,14 +1112,8 @@ func TestAbortedByInterruptDuringVUInit(t *testing.T) {
 
 	export default function () {};
 	`
-
-	// TODO: fix this to exect lib.RunStatusAbortedUser and
-	// exitcodes.ExternalAbort
-	//
-	// This is testing the current behavior, which is expected, but it's not
-	// actually the desired one! See https://github.com/grafana/k6/issues/2804
 	ts := getSimpleCloudOutputTestState(
-		t, script, nil, cloudapi.RunStatusAbortedSystem, cloudapi.ResultStatusPassed, exitcodes.GenericEngine,
+		t, script, nil, cloudapi.RunStatusAbortedUser, cloudapi.ResultStatusPassed, exitcodes.ExternalAbort,
 	)
 	asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "VU init sleeping for a while")
 	cmd.ExecuteWithGlobalState(ts.GlobalState)
@@ -1129,10 +1123,8 @@ func TestAbortedByInterruptDuringVUInit(t *testing.T) {
 
 	assert.Contains(t, stdOut, `level=debug msg="Stopping k6 in response to signal..." sig=interrupt`)
 	assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
-
-	// TODO: same as above, fix expected error message and run_status to 5
-	assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=6 tainted=false`)
-	assert.Contains(t, stdOut, `level=error msg="context canceled`)
+	assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
+	assert.Contains(t, stdOut, `level=error msg="test run was aborted because k6 received a 'interrupt' signal"`)
 }
 
 func TestAbortedByScriptInitError(t *testing.T) {
diff --git a/core/engine.go b/core/engine.go
index 5e7e1e75c57..ef8366f12c6 100644
--- a/core/engine.go
+++ b/core/engine.go
@@ -95,11 +95,6 @@ func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []o
 // - The second returned lambda can be used to wait for that process to finish.
 func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait func(), err error) {
 	e.logger.Debug("Initialization starting...")
-	// TODO: if we ever need metrics processing in the init context, we can move
-	// this below the other components... or even start them concurrently?
-	if err := e.ExecutionScheduler.Init(runCtx, e.Samples); err != nil {
-		return nil, nil, err
-	}
 
 	// TODO: move all of this in a separate struct? see main TODO above
 	processMetricsAfterRun := make(chan struct{})
diff --git a/core/engine_test.go b/core/engine_test.go
index 2152a8c1aec..5dfa333b999 100644
--- a/core/engine_test.go
+++ b/core/engine_test.go
@@ -893,57 +893,6 @@ func TestSetupException(t *testing.T) {
 	}
 }
 
-// TODO: delete when implementing https://github.com/grafana/k6/issues/1889, the
-// test functionality was duplicated in cmd/integration_test.go
-func TestVuInitException(t *testing.T) {
-	t.Parallel()
-
-	script := []byte(`
-		export let options = {
-			vus: 3,
-			iterations: 5,
-		};
-
-		export default function() {};
-
-		if (__VU == 2) {
-			throw new Error('oops in ' + __VU);
-		}
-	`)
-
-	piState := getTestPreInitState(t)
-	runner, err := js.New(
-		piState,
-		&loader.SourceData{URL: &url.URL{Scheme: "file", Path: "/script.js"}, Data: script},
-		nil,
-	)
-	require.NoError(t, err)
-
-	opts, err := executor.DeriveScenariosFromShortcuts(runner.GetOptions(), nil)
-	require.NoError(t, err)
-
-	testState := getTestRunState(t, piState, opts, runner)
-
-	execScheduler, err := execution.NewScheduler(testState)
-	require.NoError(t, err)
-	engine, err := NewEngine(testState, execScheduler, nil)
-	require.NoError(t, err)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	_, _, err = engine.Init(ctx, ctx) // no need for 2 different contexts
-
-	require.Error(t, err)
-
-	var exception errext.Exception
-	require.ErrorAs(t, err, &exception)
-	assert.Equal(t, "Error: oops in 2\n\tat file:///script.js:10:9(29)\n", err.Error())
-
-	var errWithHint errext.HasHint
-	require.ErrorAs(t, err, &errWithHint)
-	assert.Equal(t, "error while initializing VU #2 (script exception)", errWithHint.Hint())
-}
-
 func TestEmittedMetricsWhenScalingDown(t *testing.T) {
 	t.Parallel()
 	tb := httpmultibin.NewHTTPMultiBin(t)
diff --git a/execution/scheduler.go b/execution/scheduler.go
index 2ebb442bc50..32507e938e8 100644
--- a/execution/scheduler.go
+++ b/execution/scheduler.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"runtime"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -26,10 +27,6 @@ type Scheduler struct {
 	maxDuration    time.Duration // cached value derived from the execution plan
 	maxPossibleVUs uint64        // cached value derived from the execution plan
 	state          *lib.ExecutionState
-
-	// TODO: remove these when we don't have separate Init() and Run() methods
-	// and can use a context + a WaitGroup (or something like that)
-	stopVUsEmission, vusEmissionStopped chan struct{}
 }
 
 // NewScheduler creates and returns a new Scheduler instance, without
@@ -84,9 +81,6 @@ func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
 		maxDuration:    maxDuration,
 		maxPossibleVUs: maxPossibleVUs,
 		state:          executionState,
-
-		stopVUsEmission:    make(chan struct{}),
-		vusEmissionStopped: make(chan struct{}),
 	}, nil
 }
 
@@ -199,9 +193,11 @@ func (e *Scheduler) initVUsConcurrently(
 	return doneInits
 }
 
-func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) {
+func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) func() {
 	e.state.Test.Logger.Debug("Starting emission of VUs and VUsMax metrics...")
 	tags := e.state.Test.RunTags
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
 
 	emitMetrics := func() {
 		t := time.Now()
@@ -234,7 +230,7 @@ func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.Sam
 		defer func() {
 			ticker.Stop()
 			e.state.Test.Logger.Debug("Metrics emission of VUs and VUsMax metrics stopped")
-			close(e.vusEmissionStopped)
+			wg.Done()
 		}()
 
 		for {
@@ -243,23 +239,17 @@ func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.Sam
 				emitMetrics()
 			case <-ctx.Done():
 				return
-			case <-e.stopVUsEmission:
-				return
 			}
 		}
 	}()
+
+	return wg.Wait
 }
 
-// Init concurrently initializes all of the planned VUs and then sequentially
-// initializes all of the configured executors.
-func (e *Scheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
-	e.emitVUsAndVUsMax(ctx, samplesOut)
-	defer func() {
-		if err != nil {
-			close(e.stopVUsEmission)
-			<-e.vusEmissionStopped
-		}
-	}()
+// initVusAndExecutors concurrently initializes all of the planned VUs and then
+// sequentially initializes all of the configured executors.
+func (e *Scheduler) initVusAndExecutors(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
+	e.initProgress.Modify(pb.WithConstProgress(0, "Init VUs..."))
 
 	logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")
 	vusToInitialize := lib.GetMaxPlannedVUs(e.executionPlan)
@@ -386,23 +376,27 @@ func (e *Scheduler) runExecutor(
 // out channel.
 //
 //nolint:funlen
-func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metrics.SampleContainer) error {
-	defer func() {
-		close(e.stopVUsEmission)
-		<-e.vusEmissionStopped
-	}()
-
-	executorsCount := len(e.executors)
+func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
 	logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")
-	e.initProgress.Modify(pb.WithConstLeft("Run"))
 
-	var interrupted bool
+	execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
+	waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
+	defer waitForVUsMetricPush()
+	defer execSchedRunCancel()
+
 	defer func() {
-		e.state.MarkEnded()
-		if interrupted {
+		if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
+			logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err)
 			e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
+			err = interruptErr
 		}
 	}()
 
+	if err := e.initVusAndExecutors(execSchedRunCtx, samplesOut); err != nil {
+		return err
+	}
+
+	e.initProgress.Modify(pb.WithConstLeft("Run"))
 	if e.state.IsPaused() {
 		logger.Debug("Execution is paused, waiting for resume or interrupt...")
 		e.state.SetExecutionStatus(lib.ExecutionStatusPausedBeforeRun)
@@ -410,27 +404,31 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
 		select {
 		case <-e.state.ResumeNotify():
 			// continue
-		case <-runCtx.Done():
+		case <-execSchedRunCtx.Done():
 			return nil
 		}
 	}
 
+	e.initProgress.Modify(pb.WithConstProgress(1, "Starting test..."))
 	e.state.MarkStarted()
+	defer e.state.MarkEnded()
 	e.initProgress.Modify(pb.WithConstProgress(1, "running"))
 
+	executorsCount := len(e.executors)
 	logger.WithFields(logrus.Fields{"executorsCount": executorsCount}).Debugf("Start of test run")
 
 	runResults := make(chan error, executorsCount) // nil values are successful runs
 
-	runCtx = lib.WithExecutionState(runCtx, e.state)
-	runSubCtx, cancel := context.WithCancel(runCtx)
-	defer cancel() // just in case, and to shut up go vet...
+	// TODO: get rid of this context, pass the e.state directly to VUs when they
+	// are initialized by e.initVusAndExecutors(). This will also give access to
+	// its properties in their init context executions.
+	withExecStateCtx := lib.WithExecutionState(execSchedRunCtx, e.state)
 
 	// Run setup() before any executors, if it's not disabled
 	if !e.state.Test.Options.NoSetup.Bool {
 		e.state.SetExecutionStatus(lib.ExecutionStatusSetup)
 		e.initProgress.Modify(pb.WithConstProgress(1, "setup()"))
-		if err := e.state.Test.Runner.Setup(runSubCtx, engineOut); err != nil {
+		if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
 			logger.WithField("error", err).Debug("setup() aborted by error")
 			return err
 		}
@@ -441,8 +439,10 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
 
 	logger.Debug("Start all executors...")
 	e.state.SetExecutionStatus(lib.ExecutionStatusRunning)
+	executorsRunCtx, executorsRunCancel := context.WithCancel(withExecStateCtx)
+	defer executorsRunCancel()
 	for _, exec := range e.executors {
-		go e.runExecutor(runSubCtx, runResults, engineOut, exec)
+		go e.runExecutor(executorsRunCtx, runResults, samplesOut, exec)
 	}
 
 	// Wait for all executors to finish
@@ -452,7 +452,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
 		if err != nil && firstErr == nil {
 			logger.WithError(err).Debug("Executor returned with an error, cancelling test run...")
 			firstErr = err
-			cancel()
+			executorsRunCancel()
 		}
 	}
 
@@ -462,16 +462,13 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
 		e.initProgress.Modify(pb.WithConstProgress(1, "teardown()"))
 
 		// We run teardown() with the global context, so it isn't interrupted by
-		// aborts caused by thresholds or even Ctrl+C (unless used twice).
-		if err := e.state.Test.Runner.Teardown(globalCtx, engineOut); err != nil {
+		// thresholds or test.abort() or even Ctrl+C (unless used twice).
+		if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
 			logger.WithField("error", err).Debug("teardown() aborted by error")
 			return err
 		}
 	}
 
-	if err := GetCancelReasonIfTestAborted(runSubCtx); err != nil {
-		interrupted = true
-		return err
-	}
+
 	return firstErr
 }
diff --git a/execution/scheduler_ext_test.go b/execution/scheduler_ext_test.go
index 43dfc9b69ac..6e9f37c048c 100644
--- a/execution/scheduler_ext_test.go
+++ b/execution/scheduler_ext_test.go
@@ -88,8 +88,6 @@ func newTestScheduler(
 		}
 	}()
 
-	require.NoError(t, execScheduler.Init(ctx, samples))
-
 	return ctx, cancel, execScheduler, samples
 }
 
@@ -107,9 +105,9 @@ func TestSchedulerRunNonDefault(t *testing.T) {
 	t.Parallel()
 
 	testCases := []struct {
-		name, script, expErr string
+		name, script string
 	}{
-		{"defaultOK", `export default function () {}`, ""},
+		{"defaultOK", `export default function () {}`},
 		{"nonDefaultOK", `
 	export let options = {
 		scenarios: {
@@ -121,7 +119,7 @@ func TestSchedulerRunNonDefault(t *testing.T) {
 			},
 		}
 	}
-	export function nonDefault() {}`, ""},
+	export function nonDefault() {}`},
 	}
 
 	for _, tc := range testCases {
@@ -146,13 +144,7 @@ func TestSchedulerRunNonDefault(t *testing.T) {
 			done := make(chan struct{})
 			samples := make(chan metrics.SampleContainer)
 			go func() {
-				err := execScheduler.Init(ctx, samples)
-				if tc.expErr != "" {
-					assert.EqualError(t, err, tc.expErr)
-				} else {
-					assert.NoError(t, err)
-					assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
-				}
+				assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
 				close(done)
 			}()
 			for {
@@ -263,7 +255,6 @@ func TestSchedulerRunEnv(t *testing.T) {
 	done := make(chan struct{})
 	samples := make(chan metrics.SampleContainer)
 	go func() {
-		assert.NoError(t, execScheduler.Init(ctx, samples))
 		assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
 		close(done)
 	}()
@@ -333,7 +324,6 @@ func TestSchedulerSystemTags(t *testing.T) {
 	done := make(chan struct{})
 	go func() {
 		defer close(done)
-		require.NoError(t, execScheduler.Init(ctx, samples))
 		require.NoError(t, execScheduler.Run(ctx, ctx, samples))
 	}()
 
@@ -464,7 +454,6 @@ func TestSchedulerRunCustomTags(t *testing.T) {
 	samples := make(chan metrics.SampleContainer)
 	go func() {
 		defer close(done)
-		require.NoError(t, execScheduler.Init(ctx, samples))
 		require.NoError(t, execScheduler.Run(ctx, ctx, samples))
 	}()
 	var gotTrailTag, gotNetTrailTag bool
@@ -626,7 +615,6 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) {
 
 	samples := make(chan metrics.SampleContainer)
 	go func() {
-		assert.NoError(t, execScheduler.Init(ctx, samples))
 		assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
 		close(samples)
 	}()
@@ -959,7 +947,6 @@ func TestSchedulerEndIterations(t *testing.T) {
 	require.NoError(t, err)
 
 	samples := make(chan metrics.SampleContainer, 300)
-	require.NoError(t, execScheduler.Init(ctx, samples))
 	require.NoError(t, execScheduler.Run(ctx, ctx, samples))
 
 	assert.Equal(t, uint64(100), execScheduler.GetState().GetFullIterationCount())
@@ -1170,7 +1157,6 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
 	done := make(chan struct{})
 	sampleContainers := make(chan metrics.SampleContainer)
 	go func() {
-		require.NoError(t, execScheduler.Init(ctx, sampleContainers))
 		assert.NoError(t, execScheduler.Run(ctx, ctx, sampleContainers))
 		close(done)
 	}()
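
Illustration (not part of the patch): a minimal, hypothetical sketch of how a
caller drives the scheduler after this change. Only execution.NewScheduler()
and Scheduler.Run() are taken from the diff above; the testState value, the
samples channel handling, and the error handling here are simplified
assumptions, not k6's actual cmd/ wiring.

    // Hypothetical driver code, for illustration only.
    execScheduler, err := execution.NewScheduler(testState)
    if err != nil {
        return err
    }

    samples := make(chan metrics.SampleContainer)
    go func() { // drain the channel so metric emission never blocks
        for range samples {
        }
    }()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Before this patch, VU initialization was a separate step:
    //     if err := execScheduler.Init(ctx, samples); err != nil { ... }
    //     err = execScheduler.Run(globalCtx, runCtx, samples)
    // Now Run() initializes the VUs and executors itself, so an init
    // failure comes back through the same path as any other run error,
    // which is what lets --linger and handleSummary() still kick in:
    if err := execScheduler.Run(ctx, ctx, samples); err != nil {
        return err
    }

The single-call flow is also why the tests above could drop their separate
execScheduler.Init(ctx, samples) lines and assert only on Run().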