diff --git a/core/local/eventloop_test.go b/core/local/eventloop_test.go new file mode 100644 index 000000000000..b16a8c13eeda --- /dev/null +++ b/core/local/eventloop_test.go @@ -0,0 +1,125 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package local + +import ( + "io/ioutil" + "net/url" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "go.k6.io/k6/js" + "go.k6.io/k6/lib" + "go.k6.io/k6/lib/metrics" + "go.k6.io/k6/lib/testutils" + "go.k6.io/k6/lib/types" + "go.k6.io/k6/loader" +) + +func TestEventLoop(t *testing.T) { + t.Parallel() + script := []byte(` + setTimeout(()=> {console.log("initcontext setTimeout")}, 200) + console.log("initcontext"); + export default function() { + setTimeout(()=> {console.log("default setTimeout")}, 200) + console.log("default"); + }; + export function setup() { + setTimeout(()=> {console.log("setup setTimeout")}, 200) + console.log("setup"); + }; + export function teardown() { + setTimeout(()=> {console.log("teardown setTimeout")}, 200) + console.log("teardown"); + }; + export function handleSummary() { + setTimeout(()=> {console.log("handleSummary setTimeout")}, 200) + console.log("handleSummary"); + }; +`) + + logger := logrus.New() + logger.SetOutput(ioutil.Discard) + logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel}} + logger.AddHook(&logHook) + + registry := metrics.NewRegistry() + builtinMetrics := metrics.RegisterBuiltinMetrics(registry) + runner, err := js.New( + logger, + &loader.SourceData{ + URL: &url.URL{Path: "/script.js"}, + Data: script, + }, + nil, + lib.RuntimeOptions{}, + builtinMetrics, + registry, + ) + require.NoError(t, err) + + ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, + lib.Options{ + TeardownTimeout: types.NullDurationFrom(time.Second), + SetupTimeout: types.NullDurationFrom(time.Second), + }) + defer cancel() + + errCh := make(chan error, 1) + go func() { errCh <- execScheduler.Run(ctx, ctx, samples, builtinMetrics) }() + + select { + case err := <-errCh: + require.NoError(t, err) + _, err = runner.HandleSummary(ctx, &lib.Summary{RootGroup: &lib.Group{}}) + require.NoError(t, err) + entries := logHook.Drain() + msgs := make([]string, len(entries)) + for i, entry := range entries { + msgs[i] = entry.Message + } + require.Equal(t, []string{ + "initcontext", // first initialization + "initcontext setTimeout", + "initcontext", // for vu + "initcontext setTimeout", + "initcontext", // for setup + "initcontext setTimeout", + "setup", // setup + "setup setTimeout", + "default", // one iteration + "default setTimeout", + "initcontext", // for teardown + "initcontext setTimeout", + "teardown", // teardown + "teardown setTimeout", + "initcontext", // for handleSummary + "initcontext setTimeout", + "handleSummary", // handleSummary + "handleSummary setTimeout", + }, msgs) + case <-time.After(10 * time.Second): + t.Fatal("timed out") + } +} diff --git a/js/bundle.go b/js/bundle.go index 39bf259f6ffc..d578812b900a 100644 --- a/js/bundle.go +++ b/js/bundle.go @@ -28,6 +28,7 @@ import ( "fmt" "net/url" "runtime" + "time" "github.com/dop251/goja" "github.com/dop251/goja/parser" @@ -69,6 +70,7 @@ type BundleInstance struct { env map[string]string exports map[string]goja.Callable + loop *eventLoop } // NewBundle creates a new bundle from a source file and a filesystem. @@ -261,6 +263,7 @@ func (b *Bundle) Instantiate(logger logrus.FieldLogger, vuID uint64) (bi *Bundle Context: ctxPtr, exports: make(map[string]goja.Callable), env: b.RuntimeOptions.Env, + loop: init.loop, } // Grab any exported functions that could be executed. These were @@ -307,6 +310,16 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init * } rt.Set("__ENV", env) rt.Set("__VU", vuID) + _ = rt.Set("setTimeout", func(f func(), t float64) { + // TODO checks and fixes + // TODO maybe really return something to use with `clearTimeout + // TODO support arguments ... maybe + runOnLoop := init.loop.Reserve() + go func() { + time.Sleep(time.Duration(t * float64(time.Millisecond))) + runOnLoop(f) + }() + }) rt.Set("console", common.Bind(rt, newConsole(logger), init.ctxPtr)) if init.compatibilityMode == lib.CompatibilityModeExtended { @@ -324,7 +337,10 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init * ctx := common.WithInitEnv(context.Background(), initenv) *init.ctxPtr = common.WithRuntime(ctx, rt) unbindInit := common.BindToGlobal(rt, common.Bind(rt, init, init.ctxPtr)) - if _, err := rt.RunProgram(b.Program); err != nil { + var err error + init.loop.RunOnLoop(func() { _, err = rt.RunProgram(b.Program) }) + init.loop.Start(*init.ctxPtr) + if err != nil { var exception *goja.Exception if errors.As(err, &exception) { err = &scriptException{inner: exception} diff --git a/js/eventloop.go b/js/eventloop.go new file mode 100644 index 000000000000..fddd489d896f --- /dev/null +++ b/js/eventloop.go @@ -0,0 +1,116 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package js + +import ( + "context" + "sync" +) + +// an event loop +// TODO: DO NOT USE AS IT'S NOT DONE +type eventLoop struct { + queueLock sync.Mutex + queue []func() + wakeupCh chan struct{} // maybe use sync.Cond ? + reservedCount int +} + +func newEventLoop() *eventLoop { + return &eventLoop{ + wakeupCh: make(chan struct{}, 1), + } +} + +// RunOnLoop queues the function to be called from/on the loop +// This needs to be called before calling `Start` +// TODO maybe have only Reserve as this is equal to `e.Reserve()(f)` +func (e *eventLoop) RunOnLoop(f func()) { + e.queueLock.Lock() + e.queue = append(e.queue, f) + e.queueLock.Unlock() + select { + case e.wakeupCh <- struct{}{}: + default: + } +} + +// Reserve "reserves" a spot on the loop, preventing it from returning/finishing. The returning function will queue it's +// argument and wakeup the loop if needed and also unreserve the spot so that the loop can exit. +// this should be used instead of MakeHandledPromise if a promise will not be returned +// TODO better name +func (e *eventLoop) Reserve() func(func()) { + e.queueLock.Lock() + e.reservedCount++ + e.queueLock.Unlock() + + return func(f func()) { + e.queueLock.Lock() + e.queue = append(e.queue, f) + e.reservedCount-- + e.queueLock.Unlock() + select { + case e.wakeupCh <- struct{}{}: + default: + } + } +} + +// Start will run the event loop until it's empty and there are no reserved spots +// or the context is done +func (e *eventLoop) Start(ctx context.Context) { + done := ctx.Done() + for { + select { // check if done + case <-done: + return + default: + } + + // acquire the queue + e.queueLock.Lock() + queue := e.queue + e.queue = make([]func(), 0, len(queue)) + reserved := e.reservedCount != 0 + e.queueLock.Unlock() + + if len(queue) == 0 { + if !reserved { // we have empty queue and nothing that reserved a spot + return + } + select { // wait until the reserved is done + case <-done: + return + case <-e.wakeupCh: + } + } + + for _, f := range queue { + // run each function in the queue if not done + select { + case <-done: + return + default: + f() + } + } + } +} diff --git a/js/eventloop_test.go b/js/eventloop_test.go new file mode 100644 index 000000000000..26487fc3e6c7 --- /dev/null +++ b/js/eventloop_test.go @@ -0,0 +1,71 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package js + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBasicEventLoop(t *testing.T) { + t.Parallel() + loop := newEventLoop() + var ran int + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + loop.RunOnLoop(func() { ran++ }) + loop.Start(ctx) + require.Equal(t, ran, 1) + loop.RunOnLoop(func() { ran++ }) + loop.RunOnLoop(func() { ran++ }) + loop.Start(ctx) + require.Equal(t, ran, 3) + loop.RunOnLoop(func() { ran++; cancel() }) + loop.RunOnLoop(func() { ran++ }) + loop.Start(ctx) + require.Equal(t, ran, 4) +} + +func TestEventLoopReserve(t *testing.T) { + t.Parallel() + loop := newEventLoop() + var ran int + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + loop.RunOnLoop(func() { + ran++ + r := loop.Reserve() + go func() { + time.Sleep(time.Second) + r(func() { + ran++ + }) + }() + }) + start := time.Now() + loop.Start(ctx) + took := time.Since(start) + require.Equal(t, ran, 2) + require.Greater(t, took, time.Second) +} diff --git a/js/initcontext.go b/js/initcontext.go index 4580ce9df5dc..5d9d660fa5d6 100644 --- a/js/initcontext.go +++ b/js/initcontext.go @@ -67,6 +67,7 @@ type InitContext struct { // Bound runtime; used to instantiate objects. runtime *goja.Runtime compiler *compiler.Compiler + loop *eventLoop // Pointer to a context that bridged modules are invoked with. ctxPtr *context.Context @@ -100,6 +101,7 @@ func NewInitContext( compatibilityMode: compatMode, logger: logger, modules: getJSModules(), + loop: newEventLoop(), } } @@ -126,6 +128,7 @@ func newBoundInitContext(base *InitContext, ctxPtr *context.Context, rt *goja.Ru compatibilityMode: base.compatibilityMode, logger: base.logger, modules: base.modules, + loop: newEventLoop(), } } @@ -153,6 +156,8 @@ func (i *InitContext) Require(arg string) goja.Value { type moduleInstanceCoreImpl struct { ctxPtr *context.Context + rt *goja.Runtime + loop *eventLoop // we can technically put lib.State here as well as anything else } @@ -169,7 +174,26 @@ func (m *moduleInstanceCoreImpl) GetState() *lib.State { } func (m *moduleInstanceCoreImpl) GetRuntime() *goja.Runtime { - return common.GetRuntime(*m.ctxPtr) // TODO thread it correctly instead + return m.rt +} + +func (m *moduleInstanceCoreImpl) AddToEventLoop(f func()) { + m.loop.RunOnLoop(f) +} + +// MakeHandledPromise will create and promise and return it's resolve, reject methods as well wrapped in such a way that +// it will block the eventloop from exiting before they are called even if the promise isn't resolved by the time the +// current script ends executing +func (m *moduleInstanceCoreImpl) MakeHandledPromise() (*goja.Promise, func(interface{}), func(interface{})) { + reserved := m.loop.Reserve() + p, resolve, reject := m.rt.NewPromise() + return p, func(i interface{}) { + // more stuff + reserved(func() { resolve(i) }) + }, func(i interface{}) { + // more stuff + reserved(func() { reject(i) }) + } } func toESModuleExports(exp modules.Exports) interface{} { @@ -201,7 +225,7 @@ func (i *InitContext) requireModule(name string) (goja.Value, error) { return nil, fmt.Errorf("unknown module: %s", name) } if modV2, ok := mod.(modules.IsModuleV2); ok { - instance := modV2.NewModuleInstance(&moduleInstanceCoreImpl{ctxPtr: i.ctxPtr}) + instance := modV2.NewModuleInstance(&moduleInstanceCoreImpl{ctxPtr: i.ctxPtr, rt: i.runtime, loop: i.loop}) return i.runtime.ToValue(toESModuleExports(instance.GetExports())), nil } if perInstance, ok := mod.(modules.HasModuleInstancePerVU); ok { diff --git a/js/modules/modules.go b/js/modules/modules.go index 5c940b07e17b..d784851dc61e 100644 --- a/js/modules/modules.go +++ b/js/modules/modules.go @@ -123,6 +123,21 @@ type InstanceCore interface { // sealing field will help probably with pointing users that they just need to embed this in their Instance // implementations + + // MakeHandledPromise needs a better name + // because of the way promises work and the fact that we probably don't want promises from one iteration to live + // till the next, this method lets a piece of module code say that they will be returning a promise that needs to be + // resolved/rejected within this iteration. K6 will not continue with a next iteration until either `resolve` or + // `reject` are called at which point the Promise usual handling of those will trigger. + // Caveats: this likely won't work if the Promise is rejected from within the js code + // This also will likely have problems with context canceling so both of those will need extra care + // TODO maybe export eventloop.Reserve and implement this in the js/common + MakeHandledPromise() (p *goja.Promise, resolve func(interface{}), reject func(interface{})) + + // AddToEventLoop needs a better name + // MUST only be called while absolutely certain that something will not let the iteration end between the start and + // end of the call + AddToEventLoop(func()) } // Exports is representation of ESM exports of a module diff --git a/js/modulestest/modulestest.go b/js/modulestest/modulestest.go index e00cae7c7dfe..7e36f1fadf16 100644 --- a/js/modulestest/modulestest.go +++ b/js/modulestest/modulestest.go @@ -58,3 +58,13 @@ func (m *InstanceCore) GetState() *lib.State { func (m *InstanceCore) GetRuntime() *goja.Runtime { return m.Runtime } + +// MakeHandledPromise is not really implemented +func (m *InstanceCore) MakeHandledPromise() (p *goja.Promise, resolve func(interface{}), reject func(interface{})) { + return m.Runtime.NewPromise() // TODO fix +} + +// AddToEventLoop is not really implemented +func (m *InstanceCore) AddToEventLoop(f func()) { + // TODO Implement +} diff --git a/js/runner.go b/js/runner.go index ae0a690af696..685f54f39ef4 100644 --- a/js/runner.go +++ b/js/runner.go @@ -231,6 +231,7 @@ func (r *Runner) newVU(idLocal, idGlobal uint64, samplesOut chan<- stats.SampleC BPool: bpool.NewBufferPool(100), Samples: samplesOut, scenarioIter: make(map[string]uint64), + loop: bi.loop, } vu.state = &lib.State{ @@ -541,6 +542,7 @@ func (r *Runner) getTimeoutFor(stage string) time.Duration { type VU struct { BundleInstance + loop *eventLoop Runner *Runner Transport *http.Transport Dialer *netext.Dialer @@ -734,24 +736,27 @@ func (u *VU) runFn( u.state.Tags.Set("iter", strconv.FormatInt(u.state.Iteration, 10)) } - defer func() { - if r := recover(); r != nil { - gojaStack := u.Runtime.CaptureCallStack(20, nil) - err = fmt.Errorf("a panic occurred in VU code but was caught: %s", r) - // TODO figure out how to use PanicLevel without panicing .. this might require changing - // the logger we use see - // https://github.com/sirupsen/logrus/issues/1028 - // https://github.com/sirupsen/logrus/issues/993 - b := new(bytes.Buffer) - for _, s := range gojaStack { - s.Write(b) + u.loop.RunOnLoop(func() { + defer func() { + if r := recover(); r != nil { + gojaStack := u.Runtime.CaptureCallStack(20, nil) + err = fmt.Errorf("a panic occurred in VU code but was caught: %s", r) + // TODO figure out how to use PanicLevel without panicing .. this might require changing + // the logger we use see + // https://github.com/sirupsen/logrus/issues/1028 + // https://github.com/sirupsen/logrus/issues/993 + b := new(bytes.Buffer) + for _, s := range gojaStack { + s.Write(b) + } + u.state.Logger.Log(logrus.ErrorLevel, "panic: ", r, "\n", string(debug.Stack()), "\nGoja stack:\n", b.String()) } - u.state.Logger.Log(logrus.ErrorLevel, "panic: ", r, "\n", string(debug.Stack()), "\nGoja stack:\n", b.String()) - } - }() + }() + v, err = fn(goja.Undefined(), args...) // Actually run the JS script + }) startTime := time.Now() - v, err = fn(goja.Undefined(), args...) // Actually run the JS script + u.loop.Start(ctx) endTime := time.Now() var exception *goja.Exception if errors.As(err, &exception) {