Skip to content

Commit

Permalink
Remove the core.Engine, finally 🎉
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Jan 16, 2023
1 parent cc38a6d commit 8324c42
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 955 deletions.
26 changes: 14 additions & 12 deletions api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/loader"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/output"
)

func TestSetupData(t *testing.T) {
Expand Down Expand Up @@ -138,31 +140,30 @@ func TestSetupData(t *testing.T) {

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
t.Cleanup(globalCancel)
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger)
defer runAbort(fmt.Errorf("unexpected abort"))

require.NoError(t, engine.OutputManager.StartOutputs())
defer engine.OutputManager.StopOutputs(nil)
outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, runAbort)
samples := make(chan metrics.SampleContainer, 1000)
_, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
defer stopOutputs(nil)

cs := &ControlSurface{
RunCtx: runCtx,
Samples: engine.Samples,
MetricsEngine: engine.MetricsEngine,
Samples: samples,
MetricsEngine: metricsEngine,
Scheduler: execScheduler,
RunState: testState,
}
run, wait, err := engine.Init(globalCtx, runCtx)
require.NoError(t, err)

defer wait()

errC := make(chan error)
go func() { errC <- run() }()
go func() { errC <- execScheduler.Run(globalCtx, runCtx, samples) }()

handler := NewHandler(cs)

Expand Down Expand Up @@ -194,6 +195,7 @@ func TestSetupData(t *testing.T) {
case <-time.After(10 * time.Second):
t.Fatal("Test timed out")
case err := <-errC:
close(samples)
require.NoError(t, err)
}
})
Expand Down
31 changes: 17 additions & 14 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/output"
)

func TestGetStatus(t *testing.T) {
Expand Down Expand Up @@ -111,39 +113,40 @@ func TestPatchStatus(t *testing.T) {
testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{})
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)

require.NoError(t, engine.OutputManager.StartOutputs())
defer engine.OutputManager.StopOutputs(nil)
metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
t.Cleanup(globalCancel)
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger)
defer runAbort(fmt.Errorf("unexpected abort"))
engine.AbortFn = runAbort

outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, runAbort)
samples := make(chan metrics.SampleContainer, 1000)
waitMetricsFlushed, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
defer stopOutputs(nil)

cs := &ControlSurface{
RunCtx: runCtx,
Samples: engine.Samples,
MetricsEngine: engine.MetricsEngine,
Samples: samples,
MetricsEngine: metricsEngine,
Scheduler: execScheduler,
RunState: testState,
}

run, wait, err := engine.Init(globalCtx, runCtx)
require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(1)
defer func() {
runAbort(fmt.Errorf("custom cancel signal"))
wait()
waitMetricsFlushed()
wg.Wait()
}()

go func() {
assert.ErrorContains(t, run(), "custom cancel signal")
assert.ErrorContains(t, execScheduler.Run(globalCtx, runCtx, samples), "custom cancel signal")
close(samples)
wg.Done()
}()
// wait for the executor to initialize to avoid a potential data race below
Expand Down
11 changes: 5 additions & 6 deletions cmd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,8 @@ func TestThresholdsFailed(t *testing.T) {
)
newRootCommand(ts.globalState).execute()

assert.True(t, testutils.LogContains(ts.loggerHook.Drain(), logrus.ErrorLevel, `some thresholds have failed`))
expErr := "thresholds on metrics 'iterations{scenario:sc1}, iterations{scenario:sc2}' have been breached"
assert.True(t, testutils.LogContains(ts.loggerHook.Drain(), logrus.ErrorLevel, expErr))
stdOut := ts.stdOut.String()
t.Log(stdOut)
assert.Contains(t, stdOut, ` ✓ iterations...........: 3`)
Expand Down Expand Up @@ -655,7 +656,6 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) {
newRootCommand(ts.globalState).execute()

logs := ts.loggerHook.Drain()
assert.False(t, testutils.LogContains(logs, logrus.ErrorLevel, `some thresholds have failed`))
assert.True(t, testutils.LogContains(logs, logrus.ErrorLevel, `test run was aborted because k6 received a 'interrupt' signal`))
stdOut := ts.stdOut.String()
t.Log(stdOut)
Expand Down Expand Up @@ -848,7 +848,7 @@ func runTestWithNoLinger(t *testing.T, ts *globalTestState) {

func runTestWithLinger(t *testing.T, ts *globalTestState) {
ts.args = append(ts.args, "--linger")
asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "Linger set; waiting for Ctrl+C")
asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "waiting for Ctrl+C to continue")
newRootCommand(ts.globalState).execute()
}

Expand Down Expand Up @@ -939,7 +939,7 @@ func testAbortedByScriptError(t *testing.T, script string, runTest func(*testing
t.Log(stdOut)
assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`)
assert.Contains(t, stdOut, `level=debug msg="Everything has finished, exiting k6!"`)
assert.Contains(t, stdOut, `level=debug msg="Everything has finished, exiting k6 with error`)
assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=7 tainted=false`)
return ts
}
Expand Down Expand Up @@ -1050,7 +1050,6 @@ func TestAbortedByScriptAbortInSetup(t *testing.T) {
t.Parallel()
testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
})

t.Run("withLinger", func(t *testing.T) {
t.Parallel()
testAbortedByScriptTestAbort(t, script, runTestWithLinger)
Expand Down Expand Up @@ -1391,7 +1390,7 @@ func TestActiveVUsCount(t *testing.T) {
if i < 3 {
assert.Equal(t, "Insufficient VUs, reached 10 active VUs and cannot initialize more", logEntry.Message)
} else {
assert.Equal(t, "No script iterations finished, consider making the test duration longer", logEntry.Message)
assert.Equal(t, "No script iterations fully finished, consider making the test duration longer", logEntry.Message)
}
}
}
Expand Down
Loading

0 comments on commit 8324c42

Please sign in to comment.