Make Allocator batchWaitTime configurable
Valentin Torikian committed May 19, 2022
1 parent 9ad852f commit d1e747c
Showing 8 changed files with 73 additions and 52 deletions.
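In short: the change replaces the hard-coded batchWaitTime constant (500 * time.Millisecond in pkg/gameserverallocations/allocator.go) with an allocationBatchWaitTime value that both the controller and the allocator service thread through to the Allocator. It is exposed as the allocation-batch-wait-time flag, surfaced by the Helm charts through the ALLOCATION_BATCH_WAIT_TIME environment variable, and defaults to 500ms via the agones.controller.allocationBatchWaitTime and agones.allocator.allocationBatchWaitTime values.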
7 changes: 4 additions & 3 deletions cmd/allocator/main.go
@@ -114,7 +114,7 @@ func main() {
         return err
     })
 
-    h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout)
+    h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime)
 
     if !h.tlsDisabled {
         watcherTLS, err := fsnotify.NewWatcher()
@@ -280,7 +280,7 @@ func runGRPC(h *serviceHandler, grpcPort int) {
     }()
 }
 
-func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration) *serviceHandler {
+func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *serviceHandler {
     defaultResync := 30 * time.Second
     agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
     kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
@@ -293,7 +293,8 @@ func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.I
         kubeClient,
         gameserverallocations.NewAllocationCache(agonesInformerFactory.Agones().V1().GameServers(), gsCounter, health),
         remoteAllocationTimeout,
-        totalRemoteAllocationTimeout)
+        totalRemoteAllocationTimeout,
+        allocationBatchWaitTime)
 
     ctx := signals.NewSigKillContext()
     h := serviceHandler{
5 changes: 5 additions & 0 deletions cmd/allocator/metrics.go
@@ -42,6 +42,7 @@ const (
     apiServerSustainedQPSFlag        = "api-server-qps"
     apiServerBurstQPSFlag            = "api-server-qps-burst"
     logLevelFlag                     = "log-level"
+    allocationBatchWaitTime          = "allocation-batch-wait-time"
 )
 
 func init() {
@@ -62,6 +63,7 @@ type config struct {
     LogLevel                     string
     totalRemoteAllocationTimeout time.Duration
     remoteAllocationTimeout      time.Duration
+    allocationBatchWaitTime      time.Duration
 }
 
 func parseEnvFlags() config {
@@ -79,6 +81,7 @@ func parseEnvFlags() config {
     viper.SetDefault(remoteAllocationTimeoutFlag, 10*time.Second)
     viper.SetDefault(totalRemoteAllocationTimeoutFlag, 30*time.Second)
     viper.SetDefault(logLevelFlag, "Info")
+    viper.SetDefault(allocationBatchWaitTime, 500*time.Millisecond)
 
     pflag.Int32(httpPortFlag, viper.GetInt32(httpPortFlag), "Port to listen on for REST requests")
     pflag.Int32(grpcPortFlag, viper.GetInt32(grpcPortFlag), "Port to listen on for gRPC requests")
@@ -93,6 +96,7 @@ func parseEnvFlags() config {
     pflag.Duration(remoteAllocationTimeoutFlag, viper.GetDuration(remoteAllocationTimeoutFlag), "Flag to set remote allocation call timeout.")
     pflag.Duration(totalRemoteAllocationTimeoutFlag, viper.GetDuration(totalRemoteAllocationTimeoutFlag), "Flag to set total remote allocation timeout including retries.")
     pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Agones Log level")
+    pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches")
     runtime.FeaturesBindFlags()
     pflag.Parse()
 
@@ -127,6 +131,7 @@ func parseEnvFlags() config {
         LogLevel:                     viper.GetString(logLevelFlag),
         remoteAllocationTimeout:      viper.GetDuration(remoteAllocationTimeoutFlag),
         totalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag),
+        allocationBatchWaitTime:      viper.GetDuration(allocationBatchWaitTime),
     }
 }
 
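As a minimal sketch of the flag wiring above (illustrative only, not the Agones code; the standalone main and flag constant here are assumptions for the example), the viper default, the pflag definition, and viper's automatic env binding combine so that the 500ms default can be overridden either with --allocation-batch-wait-time or with the ALLOCATION_BATCH_WAIT_TIME environment variable that the Helm templates below set:

package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	const flag = "allocation-batch-wait-time"

	// viper supplies the default; pflag exposes the CLI flag seeded from it.
	viper.SetDefault(flag, 500*time.Millisecond)
	pflag.Duration(flag, viper.GetDuration(flag), "waiting period between allocation batches")
	pflag.Parse()

	// Map "allocation-batch-wait-time" to ALLOCATION_BATCH_WAIT_TIME and
	// let environment variables override the default.
	viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
	viper.AutomaticEnv()
	if err := viper.BindPFlags(pflag.CommandLine); err != nil {
		panic(err)
	}

	// Prints 500ms unless the flag or ALLOCATION_BATCH_WAIT_TIME overrides it.
	fmt.Println("batch wait time:", viper.GetDuration(flag))
}

Running this sketch with ALLOCATION_BATCH_WAIT_TIME=100ms in the environment should print 100ms, while an explicitly passed flag takes precedence over the environment.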
96 changes: 51 additions & 45 deletions cmd/controller/main.go
@@ -77,6 +77,7 @@ const (
     logLevelFlag            = "log-level"
     logSizeLimitMBFlag      = "log-size-limit-mb"
     kubeconfigFlag          = "kubeconfig"
+    allocationBatchWaitTime = "allocation-batch-wait-time"
     defaultResync           = 30 * time.Second
 )
 
@@ -210,7 +211,8 @@ func main() {
     gsSetController := gameserversets.NewController(wh, health, gsCounter,
         kubeClient, extClient, agonesClient, agonesInformerFactory)
     fleetController := fleets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory)
-    gasController := gameserverallocations.NewController(api, health, gsCounter, kubeClient, kubeInformerFactory, agonesClient, agonesInformerFactory, 10*time.Second, 30*time.Second)
+    gasController := gameserverallocations.NewController(api, health, gsCounter, kubeClient, kubeInformerFactory,
+        agonesClient, agonesInformerFactory, 10*time.Second, 30*time.Second, ctlConf.AllocationBatchWaitTime)
     fasController := fleetautoscalers.NewController(wh, health,
         kubeClient, extClient, agonesClient, agonesInformerFactory)
 
@@ -253,6 +255,7 @@ func parseEnvFlags() config {
     viper.SetDefault(enablePrometheusMetricsFlag, true)
     viper.SetDefault(enableStackdriverMetricsFlag, false)
     viper.SetDefault(stackdriverLabels, "")
+    viper.SetDefault(allocationBatchWaitTime, 500*time.Millisecond)
 
     viper.SetDefault(projectIDFlag, "")
     viper.SetDefault(numWorkersFlag, 64)
@@ -284,6 +287,7 @@ func parseEnvFlags() config {
     pflag.String(logDirFlag, viper.GetString(logDirFlag), "If set, store logs in a given directory.")
     pflag.Int32(logSizeLimitMBFlag, 1000, "Log file size limit in MB")
     pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Agones Log level")
+    pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches")
     runtime.FeaturesBindFlags()
     pflag.Parse()
 
@@ -336,55 +340,57 @@ func parseEnvFlags() config {
     }
 
     return config{
-        MinPort:               int32(viper.GetInt64(minPortFlag)),
-        MaxPort:               int32(viper.GetInt64(maxPortFlag)),
-        SidecarImage:          viper.GetString(sidecarImageFlag),
-        SidecarCPURequest:     requestCPU,
-        SidecarCPULimit:       limitCPU,
-        SidecarMemoryRequest:  requestMemory,
-        SidecarMemoryLimit:    limitMemory,
-        SdkServiceAccount:     viper.GetString(sdkServerAccountFlag),
-        AlwaysPullSidecar:     viper.GetBool(pullSidecarFlag),
-        KeyFile:               viper.GetString(keyFileFlag),
-        CertFile:              viper.GetString(certFileFlag),
-        KubeConfig:            viper.GetString(kubeconfigFlag),
-        PrometheusMetrics:     viper.GetBool(enablePrometheusMetricsFlag),
-        Stackdriver:           viper.GetBool(enableStackdriverMetricsFlag),
-        GCPProjectID:          viper.GetString(projectIDFlag),
-        NumWorkers:            int(viper.GetInt32(numWorkersFlag)),
-        APIServerSustainedQPS: int(viper.GetInt32(apiServerSustainedQPSFlag)),
-        APIServerBurstQPS:     int(viper.GetInt32(apiServerBurstQPSFlag)),
-        LogDir:                viper.GetString(logDirFlag),
-        LogLevel:              viper.GetString(logLevelFlag),
-        LogSizeLimitMB:        int(viper.GetInt32(logSizeLimitMBFlag)),
-        StackdriverLabels:     viper.GetString(stackdriverLabels),
+        MinPort:                 int32(viper.GetInt64(minPortFlag)),
+        MaxPort:                 int32(viper.GetInt64(maxPortFlag)),
+        SidecarImage:            viper.GetString(sidecarImageFlag),
+        SidecarCPURequest:       requestCPU,
+        SidecarCPULimit:         limitCPU,
+        SidecarMemoryRequest:    requestMemory,
+        SidecarMemoryLimit:      limitMemory,
+        SdkServiceAccount:       viper.GetString(sdkServerAccountFlag),
+        AlwaysPullSidecar:       viper.GetBool(pullSidecarFlag),
+        KeyFile:                 viper.GetString(keyFileFlag),
+        CertFile:                viper.GetString(certFileFlag),
+        KubeConfig:              viper.GetString(kubeconfigFlag),
+        PrometheusMetrics:       viper.GetBool(enablePrometheusMetricsFlag),
+        Stackdriver:             viper.GetBool(enableStackdriverMetricsFlag),
+        GCPProjectID:            viper.GetString(projectIDFlag),
+        NumWorkers:              int(viper.GetInt32(numWorkersFlag)),
+        APIServerSustainedQPS:   int(viper.GetInt32(apiServerSustainedQPSFlag)),
+        APIServerBurstQPS:       int(viper.GetInt32(apiServerBurstQPSFlag)),
+        LogDir:                  viper.GetString(logDirFlag),
+        LogLevel:                viper.GetString(logLevelFlag),
+        LogSizeLimitMB:          int(viper.GetInt32(logSizeLimitMBFlag)),
+        StackdriverLabels:       viper.GetString(stackdriverLabels),
+        AllocationBatchWaitTime: viper.GetDuration(allocationBatchWaitTime),
     }
 }
 
 // config stores all required configuration to create a game server controller.
 type config struct {
-    MinPort               int32
-    MaxPort               int32
-    SidecarImage          string
-    SidecarCPURequest     resource.Quantity
-    SidecarCPULimit       resource.Quantity
-    SidecarMemoryRequest  resource.Quantity
-    SidecarMemoryLimit    resource.Quantity
-    SdkServiceAccount     string
-    AlwaysPullSidecar     bool
-    PrometheusMetrics     bool
-    Stackdriver           bool
-    StackdriverLabels     string
-    KeyFile               string
-    CertFile              string
-    KubeConfig            string
-    GCPProjectID          string
-    NumWorkers            int
-    APIServerSustainedQPS int
-    APIServerBurstQPS     int
-    LogDir                string
-    LogLevel              string
-    LogSizeLimitMB        int
+    MinPort                 int32
+    MaxPort                 int32
+    SidecarImage            string
+    SidecarCPURequest       resource.Quantity
+    SidecarCPULimit         resource.Quantity
+    SidecarMemoryRequest    resource.Quantity
+    SidecarMemoryLimit      resource.Quantity
+    SdkServiceAccount       string
+    AlwaysPullSidecar       bool
+    PrometheusMetrics       bool
+    Stackdriver             bool
+    StackdriverLabels       string
+    KeyFile                 string
+    CertFile                string
+    KubeConfig              string
+    GCPProjectID            string
+    NumWorkers              int
+    APIServerSustainedQPS   int
+    APIServerBurstQPS       int
+    LogDir                  string
+    LogLevel                string
+    LogSizeLimitMB          int
+    AllocationBatchWaitTime time.Duration
 }
 
 // validate ensures the ctlConfig data is valid.
2 changes: 2 additions & 0 deletions install/helm/agones/templates/controller.yaml
@@ -113,6 +113,8 @@ spec:
           value: {{ .Values.agones.controller.logLevel | quote }}
         - name: FEATURE_GATES
           value: {{ .Values.agones.featureGates | quote }}
+        - name: ALLOCATION_BATCH_WAIT_TIME
+          value: {{ .Values.agones.controller.allocationBatchWaitTime | quote }}
         {{- if .Values.agones.controller.persistentLogs }}
         - name: LOG_DIR
           value: "/home/agones/logs"
2 changes: 2 additions & 0 deletions install/helm/agones/templates/service/allocation.yaml
@@ -241,6 +241,8 @@ spec:
           value: {{ .Values.agones.allocator.logLevel | quote }}
         - name: FEATURE_GATES
           value: {{ .Values.agones.featureGates | quote }}
+        - name: ALLOCATION_BATCH_WAIT_TIME
+          value: {{ .Values.agones.allocator.allocationBatchWaitTime | quote }}
         ports:
         {{- if .Values.agones.allocator.service.http.enabled }}
         - name: {{ .Values.agones.allocator.service.http.portName }}
2 changes: 2 additions & 0 deletions install/helm/agones/values.yaml
@@ -92,6 +92,7 @@ agones:
       periodSeconds: 3
       failureThreshold: 3
       timeoutSeconds: 1
+    allocationBatchWaitTime: 500ms
   ping:
     install: true
     resources: {}
@@ -199,6 +200,7 @@ agones:
     disableTLS: false
    remoteAllocationTimeout: 10s
     totalRemoteAllocationTimeout: 30s
+    allocationBatchWaitTime: 500ms
   image:
     registry: gcr.io/agones-images
     tag: 1.24.0-dev
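Either default can be overridden at install time; for example (the release name and chart reference are illustrative, assuming the standard Agones Helm repository): helm upgrade --install agones agones/agones --set agones.allocator.allocationBatchWaitTime=100ms --set agones.controller.allocationBatchWaitTime=100ms. Per the comment in allocator.go below, a shorter wait trades extra CPU churn for lower allocation latency, while a longer wait lets more requests accumulate into each batch.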
7 changes: 4 additions & 3 deletions pkg/gameserverallocations/allocator.go
@@ -78,7 +78,6 @@ const (
     allocatorPort         = "443"
     maxBatchQueue         = 100
     maxBatchBeforeRefresh = 100
-    batchWaitTime         = 500 * time.Millisecond
 )
 
 var allocationRetry = wait.Backoff{
@@ -108,6 +107,7 @@ type Allocator struct {
     remoteAllocationCallback     func(context.Context, string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
     remoteAllocationTimeout      time.Duration
     totalRemoteAllocationTimeout time.Duration
+    allocationBatchWaitTime      time.Duration
 }
 
 // request is an async request for allocation
@@ -125,7 +125,7 @@ type response struct {
 
 // NewAllocator creates an instance of Allocator
 func NewAllocator(policyInformer multiclusterinformerv1.GameServerAllocationPolicyInformer, secretInformer informercorev1.SecretInformer, gameServerGetter getterv1.GameServersGetter,
-    kubeClient kubernetes.Interface, allocationCache *AllocationCache, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration) *Allocator {
+    kubeClient kubernetes.Interface, allocationCache *AllocationCache, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *Allocator {
     ah := &Allocator{
         pendingRequests:        make(chan request, maxBatchQueue),
         allocationPolicyLister: policyInformer.Lister(),
@@ -134,6 +134,7 @@ func NewAllocator(policyInformer multiclusterinformerv1.GameServerAllocationPoli
         secretSynced:            secretInformer.Informer().HasSynced,
         gameServerGetter:        gameServerGetter,
         allocationCache:         allocationCache,
+        allocationBatchWaitTime: allocationBatchWaitTime,
         remoteAllocationTimeout: remoteAllocationTimeout,
         totalRemoteAllocationTimeout: totalRemoteAllocationTimeout,
         remoteAllocationCallback: func(ctx context.Context, endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
@@ -531,7 +532,7 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int
             list = nil
             requestCount = 0
             // slow down cpu churn, and allow items to batch
-            time.Sleep(batchWaitTime)
+            time.Sleep(c.allocationBatchWaitTime)
         }
     }
 }
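To make the role of allocationBatchWaitTime concrete, here is a simplified, self-contained sketch of the batching loop pattern (the names and structure are illustrative assumptions, not the Agones implementation; the real ListenAndAllocate dispatches batches to update workers rather than handling them inline): requests drain from a buffered channel into a batch, and once the channel is empty the loop sleeps for the configured wait time so further requests can accumulate instead of the loop spinning on the channel.

package main

import (
	"context"
	"fmt"
	"time"
)

type request struct{ id int }

// listenAndAllocate drains pending requests into a batch; when the queue is
// empty it "allocates" the batch, then sleeps for batchWaitTime so new
// requests can pile up for the next pass.
func listenAndAllocate(ctx context.Context, pending chan request, batchWaitTime time.Duration) {
	var batch []request
	for {
		select {
		case <-ctx.Done():
			return
		case r := <-pending:
			batch = append(batch, r)
		default:
			if len(batch) > 0 {
				fmt.Printf("allocating batch of %d requests\n", len(batch))
				batch = nil
			}
			// slow down cpu churn, and allow items to batch
			time.Sleep(batchWaitTime)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	pending := make(chan request, 100)
	for i := 0; i < 5; i++ {
		pending <- request{id: i}
	}
	// All five queued requests land in a single batch on the first pass.
	listenAndAllocate(ctx, pending, 500*time.Millisecond)
}

A longer wait means the loop wakes less often and each batch grows larger; a shorter wait reduces per-request latency. The commit makes exactly this trade-off tunable instead of fixing it at 500ms.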
4 changes: 3 additions & 1 deletion pkg/gameserverallocations/controller.go
@@ -61,6 +61,7 @@ func NewController(apiServer *apiserver.APIServer,
     agonesInformerFactory externalversions.SharedInformerFactory,
     remoteAllocationTimeout time.Duration,
     totalAllocationTimeout time.Duration,
+    allocationBatchWaitTime time.Duration,
 ) *Controller {
     c := &Controller{
         api: apiServer,
@@ -71,7 +72,8 @@ func NewController(apiServer *apiserver.APIServer,
             kubeClient,
             NewAllocationCache(agonesInformerFactory.Agones().V1().GameServers(), counter, health),
             remoteAllocationTimeout,
-            totalAllocationTimeout),
+            totalAllocationTimeout,
+            allocationBatchWaitTime),
     }
     c.baseLogger = runtime.NewLoggerWithType(c)
 
