Skip to content

Commit

Permalink
Fix sync of scenario iteration increments
Browse files Browse the repository at this point in the history
This moves the iteration state from lib.State to js.VU and js.ActiveVU,
and adds synchronization between scenario iteration increments to ensure
the information returned by k6/execution within a single VU iteration is
stable.

Resolves grafana/k6#1863 (comment)
  • Loading branch information
Ivan Mirić committed Jun 21, 2021
1 parent 3c88561 commit 58946f2
Show file tree
Hide file tree
Showing 19 changed files with 303 additions and 232 deletions.
86 changes: 72 additions & 14 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (r *Runner) newVU(id uint64, samplesOut chan<- stats.SampleContainer) (*VU,

vu := &VU***REMOVED***
ID: id,
iteration: int64(-1),
BundleInstance: *bi,
Runner: r,
Transport: transport,
Expand All @@ -217,6 +218,8 @@ func (r *Runner) newVU(id uint64, samplesOut chan<- stats.SampleContainer) (*VU,
Console: r.console,
BPool: bpool.NewBufferPool(100),
Samples: samplesOut,
scenarioID: make(map[string]uint64),
scenarioIter: make(map[string]int64),
***REMOVED***

vu.state = &lib.State***REMOVED***
Expand All @@ -233,7 +236,6 @@ func (r *Runner) newVU(id uint64, samplesOut chan<- stats.SampleContainer) (*VU,
Tags: vu.Runner.Bundle.Options.RunTags.CloneTags(),
Group: r.defaultGroup,
***REMOVED***
vu.state.Init()
vu.Runtime.Set("console", common.Bind(vu.Runtime, vu.Console, vu.Context))

// This is here mostly so if someone tries they get a nice message
Expand Down Expand Up @@ -532,6 +534,7 @@ type VU struct ***REMOVED***
CookieJar *cookiejar.Jar
TLSConfig *tls.Config
ID uint64
iteration int64

Console *console
BPool *bpool.BufferPool
Expand All @@ -541,6 +544,10 @@ type VU struct ***REMOVED***
setupData goja.Value

state *lib.State
// ID of this VU in each scenario
scenarioID map[string]uint64
// count of iterations executed by this VU in each scenario
scenarioIter map[string]int64
***REMOVED***

// Verify that interfaces are implemented
Expand All @@ -554,6 +561,18 @@ type ActiveVU struct ***REMOVED***
*VU
*lib.VUActivationParams
busy chan struct***REMOVED******REMOVED***

scenarioName string
// Used to synchronize iteration increments for scenarios between VUs.
iterSync chan struct***REMOVED******REMOVED***
// Returns the iteration number across all VUs in the current scenario
// unique to this single k6 instance.
getNextScLocalIter func() uint64
// Returns the iteration number across all VUs in the current scenario
// unique globally across k6 instances (taking into account execution
// segments).
getNextScGlobalIter func() uint64
scIterLocal, scIterGlobal uint64
***REMOVED***

// GetID returns the unique VU ID.
Expand Down Expand Up @@ -589,7 +608,7 @@ func (u *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU ***REMOVED***
u.state.Tags["vu"] = strconv.FormatUint(u.ID, 10)
***REMOVED***
if opts.SystemTags.Has(stats.TagIter) ***REMOVED***
u.state.Tags["iter"] = strconv.FormatInt(u.state.GetIteration(), 10)
u.state.Tags["iter"] = strconv.FormatInt(u.iteration, 10)
***REMOVED***
if opts.SystemTags.Has(stats.TagGroup) ***REMOVED***
u.state.Tags["group"] = u.state.Group.Path
Expand All @@ -602,20 +621,34 @@ func (u *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU ***REMOVED***
ctx = lib.WithState(ctx, u.state)
params.RunContext = ctx
*u.Context = ctx
u.state.ScenarioName = params.Scenario
if params.GetScenarioVUID != nil ***REMOVED***
if _, ok := u.state.GetScenarioVUID(); !ok ***REMOVED***
u.state.SetScenarioVUID(params.GetScenarioVUID())
if params.GetNextScVUID != nil ***REMOVED***
if _, ok := u.scenarioID[params.Scenario]; !ok ***REMOVED***
u.state.VUIDScenario = params.GetNextScVUID()
u.scenarioID[params.Scenario] = u.state.VUIDScenario
***REMOVED***
***REMOVED***

u.state.IncrScIter = params.IncrScIter
u.state.IncrScIterGlobal = params.IncrScIterGlobal
u.state.GetScenarioVUIter = func() int64 ***REMOVED***
return u.scenarioIter[params.Scenario]
***REMOVED***

avu := &ActiveVU***REMOVED***
VU: u,
VUActivationParams: params,
busy: make(chan struct***REMOVED******REMOVED***, 1),
VU: u,
VUActivationParams: params,
busy: make(chan struct***REMOVED******REMOVED***, 1),
scenarioName: params.Scenario,
iterSync: params.IterSync,
scIterLocal: int64(-1),
scIterGlobal: int64(-1),
getNextScLocalIter: params.GetNextScLocalIter,
getNextScGlobalIter: params.GetNextScGlobalIter,
***REMOVED***

u.state.GetScenarioLocalVUIter = func() int64 ***REMOVED***
return avu.scIterLocal
***REMOVED***
u.state.GetScenarioGlobalVUIter = func() int64 ***REMOVED***
return avu.scIterGlobal
***REMOVED***

go func() ***REMOVED***
Expand Down Expand Up @@ -667,8 +700,8 @@ func (u *ActiveVU) RunOnce() error ***REMOVED***
panic(fmt.Sprintf("function '%s' not found in exports", u.Exec))
***REMOVED***

u.state.IncrIteration()
if err := u.Runtime.Set("__ITER", u.state.GetIteration()); err != nil ***REMOVED***
u.incrIteration()
if err := u.Runtime.Set("__ITER", u.iteration); err != nil ***REMOVED***
panic(fmt.Errorf("error setting __ITER in goja runtime: %w", err))
***REMOVED***

Expand Down Expand Up @@ -702,7 +735,7 @@ func (u *VU) runFn(

opts := &u.Runner.Bundle.Options
if opts.SystemTags.Has(stats.TagIter) ***REMOVED***
u.state.Tags["iter"] = strconv.FormatInt(u.state.GetIteration(), 10)
u.state.Tags["iter"] = strconv.FormatInt(u.state.Iteration, 10)
***REMOVED***

defer func() ***REMOVED***
Expand Down Expand Up @@ -745,6 +778,31 @@ func (u *VU) runFn(
return v, isFullIteration, endTime.Sub(startTime), err
***REMOVED***

func (u *ActiveVU) incrIteration() ***REMOVED***
u.iteration++
u.state.Iteration = u.iteration

if u.iterSync != nil ***REMOVED***
// block other VUs from incrementing scenario iterations
u.iterSync <- struct***REMOVED******REMOVED******REMOVED******REMOVED***
defer func() ***REMOVED***
<-u.iterSync // unlock
***REMOVED***()
***REMOVED***

if _, ok := u.scenarioIter[u.scenarioName]; ok ***REMOVED***
u.scenarioIter[u.scenarioName]++
***REMOVED*** else ***REMOVED***
u.scenarioIter[u.scenarioName] = 0
***REMOVED***
if u.getNextScLocalIter != nil ***REMOVED***
u.scIterLocal = u.getNextScLocalIter()
***REMOVED***
if u.getNextScGlobalIter != nil ***REMOVED***
u.scIterGlobal = u.getNextScGlobalIter()
***REMOVED***
***REMOVED***

type scriptException struct ***REMOVED***
inner *goja.Exception
***REMOVED***
Expand Down
34 changes: 18 additions & 16 deletions lib/executor/base_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,23 @@ import (
type BaseExecutor struct ***REMOVED***
config lib.ExecutorConfig
executionState *lib.ExecutionState
localVUID *uint64 // counter for assigning executor-specific VU IDs
localIter *int64 // counter for keeping track of all VU iterations completed by this executor
logger *logrus.Entry
progress *pb.ProgressBar
VUIDLocal *uint64 // counter for assigning executor-specific VU IDs
// Counter for keeping track of all VU iterations completed by this executor
// in the current (local) k6 instance.
iterLocal *int64
logger *logrus.Entry
progress *pb.ProgressBar
***REMOVED***

// NewBaseExecutor returns an initialized BaseExecutor
func NewBaseExecutor(config lib.ExecutorConfig, es *lib.ExecutionState, logger *logrus.Entry) *BaseExecutor ***REMOVED***
// Start at -1 so that the first iteration can be 0
startLocalIter := int64(-1)
startIterLocal := int64(-1)
return &BaseExecutor***REMOVED***
config: config,
executionState: es,
localVUID: new(uint64),
localIter: &startLocalIter,
VUIDLocal: new(uint64),
iterLocal: &startIterLocal,
logger: logger,
progress: pb.New(
pb.WithLeft(config.GetName),
Expand All @@ -73,10 +75,16 @@ func (bs BaseExecutor) GetConfig() lib.ExecutorConfig ***REMOVED***
return bs.config
***REMOVED***

// GetNextLocalVUID increments and returns the next VU ID that's specific for
// getNextLocalVUID increments and returns the next VU ID that's specific for
// this executor (i.e. not global like __VU).
func (bs BaseExecutor) GetNextLocalVUID() uint64 ***REMOVED***
return atomic.AddUint64(bs.localVUID, 1)
func (bs BaseExecutor) getNextLocalVUID() uint64 ***REMOVED***
return atomic.AddUint64(bs.VUIDLocal, 1)
***REMOVED***

// getNextLocalIter increments and returns the next local iteration number, for
// keeping track of total iterations executed by this scenario/executor.
func (bs *BaseExecutor) getNextLocalIter() int64 ***REMOVED***
return atomic.AddInt64(bs.iterLocal, 1)
***REMOVED***

// GetLogger returns the executor logger entry.
Expand All @@ -101,9 +109,3 @@ func (bs BaseExecutor) getMetricTags(vuID *uint64) *stats.SampleTags ***REMOVED*
***REMOVED***
return stats.IntoSampleTags(&tags)
***REMOVED***

// incrScenarioIter increments the counter of completed iterations by all VUs
// for this executor.
func (bs *BaseExecutor) incrScenarioIter() int64 ***REMOVED***
return atomic.AddInt64(bs.localIter, 1)
***REMOVED***
35 changes: 17 additions & 18 deletions lib/executor/constant_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,10 @@ func (carc ConstantArrivalRateConfig) GetExecutionRequirements(et *lib.Execution
func (carc ConstantArrivalRateConfig) NewExecutor(
es *lib.ExecutionState, logger *logrus.Entry,
) (lib.Executor, error) ***REMOVED***
startGlobalIter := int64(-1)
return &ConstantArrivalRate***REMOVED***
BaseExecutor: NewBaseExecutor(&carc, es, logger),
config: carc,
globalIter: &startGlobalIter,
iterMx: &sync.Mutex***REMOVED******REMOVED***,
***REMOVED***, nil
***REMOVED***

Expand All @@ -185,11 +184,10 @@ func (carc ConstantArrivalRateConfig) HasWork(et *lib.ExecutionTuple) bool ***RE
// specific period.
type ConstantArrivalRate struct ***REMOVED***
*BaseExecutor
config ConstantArrivalRateConfig
et *lib.ExecutionTuple
segIdx *lib.SegmentedIndex
iterMx sync.Mutex
globalIter *int64
config ConstantArrivalRateConfig
et *lib.ExecutionTuple
iterMx *sync.Mutex
segIdx *lib.SegmentedIndex
***REMOVED***

// Make sure we implement the lib.Executor interface.
Expand All @@ -207,19 +205,16 @@ func (car *ConstantArrivalRate) Init(ctx context.Context) error ***REMOVED***
return err
***REMOVED***

// incrGlobalIter increments the global iteration count for this executor,
// taking into account the configured execution segment.
func (car *ConstantArrivalRate) incrGlobalIter() int64 ***REMOVED***
// getNextGlobalIter advances and returns the next global iteration number for
// this executor, taking into account the configured execution segment.
// Unlike the local iteration number returned by getNextLocalIter(), this
// iteration number will be unique across k6 instances.
func (car *ConstantArrivalRate) getNextGlobalIter() int64 ***REMOVED***
car.iterMx.Lock()
defer car.iterMx.Unlock()
car.segIdx.Next()
atomic.StoreInt64(car.globalIter, car.segIdx.GetUnscaled()-1)
return atomic.LoadInt64(car.globalIter)
***REMOVED***

// getGlobalIter returns the global iteration count for this executor.
func (car *ConstantArrivalRate) getGlobalIter() int64 ***REMOVED***
return atomic.LoadInt64(car.globalIter)
// iterations are 0-based
return car.segIdx.GetUnscaled() - 1
***REMOVED***

// Run executes a constant number of iterations per second.
Expand Down Expand Up @@ -302,12 +297,16 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
activeVUsWg.Done()
***REMOVED***

// Channel for synchronizing scenario-specific iteration increments
iterSync := make(chan struct***REMOVED******REMOVED***, 1)
runIterationBasic := getIterationRunner(car.executionState, car.logger)
activateVU := func(initVU lib.InitializedVU) lib.ActiveVU ***REMOVED***
activeVUsWg.Add(1)
activeVU := initVU.Activate(getVUActivationParams(
maxDurationCtx, car.config.BaseConfig, returnVU,
car.GetNextLocalVUID, car.incrScenarioIter, car.incrGlobalIter))
car.getNextLocalVUID, car.getNextLocalIter, car.getNextGlobalIter,
iterSync,
))
car.executionState.ModCurrentlyActiveVUsCount(+1)
atomic.AddUint64(&activeVUsCount, 1)
vusPool.AddVU(maxDurationCtx, activeVU, runIterationBasic)
Expand Down
3 changes: 2 additions & 1 deletion lib/executor/constant_arrival_rate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,9 @@ func TestConstantArrivalRateGlobalIters(t *testing.T) ***REMOVED***
gotIters := []int64***REMOVED******REMOVED***
var mx sync.Mutex
runner.Fn = func(ctx context.Context, _ chan<- stats.SampleContainer) error ***REMOVED***
state := lib.GetState(ctx)
mx.Lock()
gotIters = append(gotIters, executor.(*ConstantArrivalRate).getGlobalIter())
gotIters = append(gotIters, state.GetScenarioGlobalVUIter())
mx.Unlock()
return nil
***REMOVED***
Expand Down
2 changes: 1 addition & 1 deletion lib/executor/constant_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (clv ConstantVUs) Run(parentCtx context.Context, out chan<- stats.SampleCon

activeVU := initVU.Activate(
getVUActivationParams(ctx, clv.config.BaseConfig, returnVU,
clv.GetNextLocalVUID, clv.incrScenarioIter, nil))
clv.getNextLocalVUID, clv.getNextLocalIter, nil, iterSync))

for ***REMOVED***
select ***REMOVED***
Expand Down
8 changes: 7 additions & 1 deletion lib/executor/externally_controlled.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ func (rs *externallyControlledRunState) newManualVUHandle(
ctx, cancel := context.WithCancel(rs.ctx)
return &manualVUHandle***REMOVED***
vuHandle: newStoppedVUHandle(ctx, getVU, returnVU,
rs.executor.GetNextLocalVUID, &rs.executor.config.BaseConfig, logger),
rs.executor.getNextLocalVUID, rs.executor.getNextLocalIter,
rs.iterSync, &rs.executor.config.BaseConfig, logger),
initVU: initVU,
wg: &wg,
cancelVU: cancel,
Expand All @@ -383,6 +384,8 @@ type externallyControlledRunState struct ***REMOVED***
currentlyPaused bool // whether the executor is currently paused

runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration
// channel for synchronizing scenario-specific iteration increments
iterSync chan struct***REMOVED******REMOVED***
***REMOVED***

// retrieveStartMaxVUs gets and initializes the (scaled) number of MaxVUs
Expand Down Expand Up @@ -521,6 +524,8 @@ func (mex *ExternallyControlled) Run(parentCtx context.Context, out chan<- stats
).Debug("Starting executor run...")

startMaxVUs := mex.executionState.Options.ExecutionSegment.Scale(mex.config.MaxVUs.Int64)
// Channel for synchronizing scenario-specific iteration increments
iterSync := make(chan struct***REMOVED******REMOVED***, 1)
runState := &externallyControlledRunState***REMOVED***
ctx: ctx,
executor: mex,
Expand All @@ -531,6 +536,7 @@ func (mex *ExternallyControlled) Run(parentCtx context.Context, out chan<- stats
activeVUsCount: new(int64),
maxVUs: new(int64),
runIteration: getIterationRunner(mex.executionState, mex.logger),
iterSync: iterSync,
***REMOVED***
*runState.maxVUs = startMaxVUs
if err = runState.retrieveStartMaxVUs(); err != nil ***REMOVED***
Expand Down
22 changes: 12 additions & 10 deletions lib/executor/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,19 @@ func getArrivalRatePerSec(scaledArrivalRate *big.Rat) *big.Rat ***REMOVED***
// TODO: Refactor this, maybe move all scenario things to an embedded struct?
func getVUActivationParams(
ctx context.Context, conf BaseConfig, deactivateCallback func(lib.InitializedVU),
getScenarioVUID func() uint64, incrScIter func() int64, incrScIterGlobal func() int64,
getNextScVUID func() uint64, getNextScLocalIter, getNextScGlobalIter func() int64,
iterSync chan struct***REMOVED******REMOVED***,
) *lib.VUActivationParams ***REMOVED***
return &lib.VUActivationParams***REMOVED***
RunContext: ctx,
Scenario: conf.Name,
Exec: conf.GetExec(),
Env: conf.GetEnv(),
Tags: conf.GetTags(),
DeactivateCallback: deactivateCallback,
GetScenarioVUID: getScenarioVUID,
IncrScIter: incrScIter,
IncrScIterGlobal: incrScIterGlobal,
RunContext: ctx,
Scenario: conf.Name,
Exec: conf.GetExec(),
Env: conf.GetEnv(),
Tags: conf.GetTags(),
DeactivateCallback: deactivateCallback,
GetNextScVUID: getNextScVUID,
IterSync: iterSync,
GetNextScLocalIter: getNextScLocalIter,
GetNextScGlobalIter: getNextScGlobalIter,
***REMOVED***
***REMOVED***
2 changes: 1 addition & 1 deletion lib/executor/per_vu_iterations.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (pvi PerVUIterations) Run(parentCtx context.Context, out chan<- stats.Sampl
vuID := initVU.GetID()
activeVU := initVU.Activate(
getVUActivationParams(ctx, pvi.config.BaseConfig, returnVU,
pvi.GetNextLocalVUID, pvi.incrScenarioIter, nil))
pvi.getNextLocalVUID, pvi.getNextLocalIter, nil, iterSync))

for i := int64(0); i < iterations; i++ ***REMOVED***
select ***REMOVED***
Expand Down
Loading

0 comments on commit 58946f2

Please sign in to comment.