From f242eade69be7d223317911bdb4024a2d04cbef3 Mon Sep 17 00:00:00 2001 From: Zach Loafman Date: Tue, 1 Nov 2022 23:52:26 +0000 Subject: [PATCH] Cloud product: Split port allocators, implement Autopilot port allocation/policies In the Agones on GKE Autopilot implementation, we have no need for the port allocator - the informer/etc. is an unnecessary moving piece. This PR allows for cloud products to provide their own port allocation implementation, and implements the GKE Autopilot "allocator". We do this by: * Splitting portallocator off to its own package. It was basically self-sufficient anyways, except it was a little too friendly with controller_test.go. I solved that by introducing a TestInterface for controller_test.go to upcast to. * Allow cloud product implementations to define their own port allocator. * Defining a new port allocator for GKE that does a simple per-port HostPort allocation, and adds the host-port-assignment annotation to the pod template. * Extend cloudproduct again to add a GameServer validator * And in Autopilot, reject if the PortPolicy is not `Dynamic` --- pkg/cloudproduct/cloudproduct.go | 10 + pkg/cloudproduct/generic/generic.go | 14 ++ pkg/cloudproduct/gke/gke.go | 77 +++++++ pkg/cloudproduct/gke/gke_test.go | 198 +++++++++++++++++- pkg/gameservers/controller.go | 10 +- pkg/gameservers/controller_test.go | 25 ++- pkg/portallocator/doc.go | 16 ++ .../portallocator.go | 63 ++++-- .../portallocator_test.go | 48 ++--- 9 files changed, 396 insertions(+), 65 deletions(-) create mode 100644 pkg/portallocator/doc.go rename pkg/{gameservers => portallocator}/portallocator.go (86%) rename pkg/{gameservers => portallocator}/portallocator_test.go (94%) diff --git a/pkg/cloudproduct/cloudproduct.go b/pkg/cloudproduct/cloudproduct.go index a965632ee2..68f536f801 100644 --- a/pkg/cloudproduct/cloudproduct.go +++ b/pkg/cloudproduct/cloudproduct.go @@ -19,10 +19,14 @@ import ( "fmt" agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + "agones.dev/agones/pkg/client/informers/externalversions" "agones.dev/agones/pkg/cloudproduct/generic" "agones.dev/agones/pkg/cloudproduct/gke" + "agones.dev/agones/pkg/portallocator" "agones.dev/agones/pkg/util/runtime" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" ) @@ -33,6 +37,12 @@ type CloudProduct interface { // SyncPodPortsToGameServer runs after a Pod has been assigned to a Node and before we sync // Pod host ports to the GameServer status. SyncPodPortsToGameServer(*agonesv1.GameServer, *corev1.Pod) error + + // ValidateGameServer is called by GameServer.Validate to allow for product specific validation. + ValidateGameServer(*agonesv1.GameServer) []metav1.StatusCause + + // NewPortAllocator creates a PortAllocator. c.f. gameservers.NewPortAllocator for parameters. + NewPortAllocator(int32, int32, informers.SharedInformerFactory, externalversions.SharedInformerFactory) portallocator.Interface } const ( diff --git a/pkg/cloudproduct/generic/generic.go b/pkg/cloudproduct/generic/generic.go index c593ea16f5..4c7827d655 100644 --- a/pkg/cloudproduct/generic/generic.go +++ b/pkg/cloudproduct/generic/generic.go @@ -15,7 +15,11 @@ package generic import ( agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + "agones.dev/agones/pkg/client/informers/externalversions" + "agones.dev/agones/pkg/portallocator" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" ) func New() (*generic, error) { return &generic{}, nil } @@ -23,3 +27,13 @@ func New() (*generic, error) { return &generic{}, nil } type generic struct{} func (*generic) SyncPodPortsToGameServer(*agonesv1.GameServer, *corev1.Pod) error { return nil } + +func (*generic) NewPortAllocator(minPort, maxPort int32, + kubeInformerFactory informers.SharedInformerFactory, + agonesInformerFactory externalversions.SharedInformerFactory) portallocator.Interface { + return portallocator.New(minPort, maxPort, kubeInformerFactory, agonesInformerFactory) +} + +func (*generic) ValidateGameServer(*agonesv1.GameServer) []metav1.StatusCause { + return nil +} diff --git a/pkg/cloudproduct/gke/gke.go b/pkg/cloudproduct/gke/gke.go index f2afda1f00..d4e85090a9 100644 --- a/pkg/cloudproduct/gke/gke.go +++ b/pkg/cloudproduct/gke/gke.go @@ -16,13 +16,17 @@ package gke import ( "context" "encoding/json" + "fmt" agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + "agones.dev/agones/pkg/client/informers/externalversions" + "agones.dev/agones/pkg/portallocator" "agones.dev/agones/pkg/util/runtime" "cloud.google.com/go/compute/metadata" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" ) @@ -30,6 +34,8 @@ const ( workloadDefaulterWebhook = "workload-defaulter.config.common-webhooks.networking.gke.io" noWorkloadDefaulter = "failed to get MutatingWebhookConfigurations/workload-defaulter.config.common-webhooks.networking.gke.io (error expected if not on GKE Autopilot)" hostPortAssignmentAnnotation = "autopilot.gke.io/host-port-assignment" + + errPortPolicyMustBeDynamic = "PortPolicy must be Dynamic on GKE Autopilot" ) var logger = runtime.NewLoggerWithSource("gke") @@ -82,3 +88,74 @@ func (*gkeAutopilot) SyncPodPortsToGameServer(gs *agonesv1.GameServer, pod *core } return nil } + +func (*gkeAutopilot) NewPortAllocator(minPort, maxPort int32, + _ informers.SharedInformerFactory, + _ externalversions.SharedInformerFactory) portallocator.Interface { + return &autopilotPortAllocator{minPort: minPort, maxPort: maxPort} +} + +func (*gkeAutopilot) ValidateGameServer(gs *agonesv1.GameServer) []metav1.StatusCause { + var causes []metav1.StatusCause + for _, p := range gs.Spec.Ports { + if p.PortPolicy != agonesv1.Dynamic { + causes = append(causes, metav1.StatusCause{ + Type: metav1.CauseTypeFieldValueInvalid, + Field: fmt.Sprintf("%s.portPolicy", p.Name), + Message: errPortPolicyMustBeDynamic, + }) + } + } + return causes +} + +type autopilotPortAllocator struct { + minPort int32 + maxPort int32 +} + +func (*autopilotPortAllocator) Run(_ context.Context) error { return nil } +func (*autopilotPortAllocator) DeAllocate(gs *agonesv1.GameServer) {} + +func (apa *autopilotPortAllocator) Allocate(gs *agonesv1.GameServer) *agonesv1.GameServer { + if len(gs.Spec.Ports) == 0 { + return gs // Nothing to do. + } + + var ports []agonesv1.GameServerPort + for i, p := range gs.Spec.Ports { + if p.PortPolicy != agonesv1.Dynamic { + logger.Errorf("GameServer %s port %v has PortPolicy %q - this should have been rejected by webhooks, refusing to assign ports", + gs.Name, p, p.PortPolicy) + return gs + } + p.HostPort = int32(i + 1) // Autopilot expects _some_ host port - use a value unique to this GameServer Port. + + if p.Protocol == agonesv1.ProtocolTCPUDP { + tcp := p + tcp.Name = p.Name + "-tcp" + tcp.Protocol = corev1.ProtocolTCP + ports = append(ports, tcp) + + p.Name += "-udp" + p.Protocol = corev1.ProtocolUDP + } + ports = append(ports, p) + } + + hpa := hostPortAssignment{Min: apa.minPort, Max: apa.maxPort} + hpaJSON, err := json.Marshal(hpa) + if err != nil { + logger.Errorf("Internal error marshalling hostPortAssignment %v for GameServer %s: %v", hpa, gs.Name, err) + // In error cases, return the original gs - on Autopilot this will result in a policy failure. + return gs + } + + // No errors past here. + gs.Spec.Ports = ports + if gs.Spec.Template.ObjectMeta.Annotations == nil { + gs.Spec.Template.ObjectMeta.Annotations = make(map[string]string) + } + gs.Spec.Template.ObjectMeta.Annotations[hostPortAssignmentAnnotation] = string(hpaJSON) + return gs +} diff --git a/pkg/cloudproduct/gke/gke_test.go b/pkg/cloudproduct/gke/gke_test.go index d833f06cff..416dbccf80 100644 --- a/pkg/cloudproduct/gke/gke_test.go +++ b/pkg/cloudproduct/gke/gke_test.go @@ -17,8 +17,8 @@ import ( "testing" agonesv1 "agones.dev/agones/pkg/apis/agones/v1" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -66,17 +66,201 @@ func TestSyncPodPortsToGameServer(t *testing.T) { return } if assert.NoError(t, err) { - if diff := cmp.Diff(tc.wantGS, tc.gs); diff != "" { - t.Errorf("GameServer diff (-want +got):\n%s", diff) - } - if diff := cmp.Diff(oldPod, tc.pod); diff != "" { - t.Errorf("Pod was modified (-old +new):\n%s", diff) - } + require.Equal(t, tc.wantGS, tc.gs) + require.Equal(t, oldPod, tc.pod) } }) } } +func TestValidateGameServer(t *testing.T) { + for name, tc := range map[string]struct { + gs *agonesv1.GameServer + want []metav1.StatusCause + }{ + "no ports => validated": { + gs: &agonesv1.GameServer{}, + }, + "good ports => validated": { + gs: &agonesv1.GameServer{ + Spec: agonesv1.GameServerSpec{ + Ports: []agonesv1.GameServerPort{ + { + Name: "some-tcpudp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 4321, + Protocol: agonesv1.ProtocolTCPUDP, + }, + { + Name: "awesome-udp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 1234, + Protocol: corev1.ProtocolUDP, + }, + { + Name: "awesome-tcp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 1234, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + }, + "bad policy => fails validation": { + gs: &agonesv1.GameServer{ + Spec: agonesv1.GameServerSpec{ + Ports: []agonesv1.GameServerPort{ + { + Name: "best-tcpudp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 4321, + Protocol: agonesv1.ProtocolTCPUDP, + }, + { + Name: "bad-udp", + PortPolicy: agonesv1.Static, + ContainerPort: 1234, + Protocol: corev1.ProtocolUDP, + }, + { + Name: "another-bad-udp", + PortPolicy: agonesv1.Static, + ContainerPort: 1234, + Protocol: corev1.ProtocolUDP, + }, + }, + }, + }, + want: []metav1.StatusCause{ + { + Type: "FieldValueInvalid", + Message: "PortPolicy must be Dynamic on GKE Autopilot", + Field: "bad-udp.portPolicy", + }, + { + Type: "FieldValueInvalid", + Message: "PortPolicy must be Dynamic on GKE Autopilot", + Field: "another-bad-udp.portPolicy", + }, + }, + }, + } { + t.Run(name, func(t *testing.T) { + causes := (&gkeAutopilot{}).ValidateGameServer(tc.gs) + require.Equal(t, tc.want, causes) + }) + } +} + +func TestAutopilotPortAllocator(t *testing.T) { + for name, tc := range map[string]struct { + gs *agonesv1.GameServer + wantGS *agonesv1.GameServer + }{ + "no ports => no change": { + gs: &agonesv1.GameServer{}, + wantGS: &agonesv1.GameServer{}, + }, + "ports => assigned and annotated": { + gs: &agonesv1.GameServer{ + Spec: agonesv1.GameServerSpec{ + Ports: []agonesv1.GameServerPort{ + { + Name: "some-tcpudp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 4321, + Protocol: agonesv1.ProtocolTCPUDP, + }, + { + Name: "awesome-udp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 1234, + Protocol: corev1.ProtocolUDP, + }, + { + Name: "awesome-tcp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 1234, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + wantGS: &agonesv1.GameServer{ + Spec: agonesv1.GameServerSpec{ + Ports: []agonesv1.GameServerPort{ + { + Name: "some-tcpudp-tcp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 4321, + HostPort: 1, + Protocol: corev1.ProtocolTCP, + }, + { + Name: "some-tcpudp-udp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 4321, + HostPort: 1, + Protocol: corev1.ProtocolUDP, + }, + { + Name: "awesome-udp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 1234, + HostPort: 2, + Protocol: corev1.ProtocolUDP, + }, + { + Name: "awesome-tcp", + PortPolicy: agonesv1.Dynamic, + ContainerPort: 1234, + HostPort: 3, + Protocol: corev1.ProtocolTCP, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"autopilot.gke.io/host-port-assignment": `{"min":8000,"max":9000}`}, + }, + }, + }, + }, + }, + "bad policy => no change (should be rejected by webhooks previously)": { + gs: &agonesv1.GameServer{ + Spec: agonesv1.GameServerSpec{ + Ports: []agonesv1.GameServerPort{ + { + Name: "awesome-udp", + PortPolicy: agonesv1.Static, + ContainerPort: 1234, + Protocol: corev1.ProtocolUDP, + }, + }, + }, + }, + wantGS: &agonesv1.GameServer{ + Spec: agonesv1.GameServerSpec{ + Ports: []agonesv1.GameServerPort{ + { + Name: "awesome-udp", + PortPolicy: agonesv1.Static, + ContainerPort: 1234, + Protocol: corev1.ProtocolUDP, + }, + }, + }, + }, + }, + } { + t.Run(name, func(t *testing.T) { + gs := (&autopilotPortAllocator{minPort: 8000, maxPort: 9000}).Allocate(tc.gs) + require.Equal(t, tc.wantGS, gs) + }) + } +} + func testPod(annotations map[string]string) *corev1.Pod { return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index 057e9b3db5..284293f81e 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -29,6 +29,7 @@ import ( "agones.dev/agones/pkg/client/informers/externalversions" listerv1 "agones.dev/agones/pkg/client/listers/agones/v1" "agones.dev/agones/pkg/cloudproduct" + "agones.dev/agones/pkg/portallocator" "agones.dev/agones/pkg/util/crd" "agones.dev/agones/pkg/util/logfields" "agones.dev/agones/pkg/util/runtime" @@ -81,7 +82,7 @@ type Controller struct { gameServerSynced cache.InformerSynced nodeLister corelisterv1.NodeLister nodeSynced cache.InformerSynced - portAllocator *PortAllocator + portAllocator portallocator.Interface healthController *HealthController migrationController *MigrationController missingPodController *MissingPodController @@ -133,7 +134,7 @@ func NewController( gameServerSynced: gsInformer.HasSynced, nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(), nodeSynced: kubeInformerFactory.Core().V1().Nodes().Informer().HasSynced, - portAllocator: NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory), + portAllocator: cloudProduct.NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory), healthController: NewHealthController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory), migrationController: NewMigrationController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory, cloudProduct), missingPodController: NewMissingPodController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory), @@ -285,6 +286,11 @@ func (c *Controller) creationValidationHandler(review admissionv1.AdmissionRevie c.loggerForGameServer(gs).WithField("review", review).Debug("creationValidationHandler") causes, ok := gs.Validate() + // Merge product-specific validation - we handle it here to avoid introducing cloudproduct to the api packages. + if productCauses := c.cloudProduct.ValidateGameServer(gs); len(productCauses) > 0 { + ok = false + causes = append(causes, productCauses...) + } if !ok { review.Response.Allowed = false details := metav1.StatusDetails{ diff --git a/pkg/gameservers/controller_test.go b/pkg/gameservers/controller_test.go index 8e6939c879..ea215edb9f 100644 --- a/pkg/gameservers/controller_test.go +++ b/pkg/gameservers/controller_test.go @@ -56,6 +56,11 @@ const ( var GameServerKind = metav1.GroupVersionKind(agonesv1.SchemeGroupVersion.WithKind("GameServer")) +type portAllocInternal interface { + NodeSynced() bool + SyncAll() error +} + func TestControllerSyncGameServer(t *testing.T) { t.Parallel() @@ -126,10 +131,10 @@ func TestControllerSyncGameServer(t *testing.T) { return true, gs, nil }) - ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.nodeSynced) + ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.(portAllocInternal).NodeSynced) defer cancel() - err := c.portAllocator.syncAll() + err := c.portAllocator.(portAllocInternal).SyncAll() assert.Nil(t, err) err = c.syncGameServer(ctx, "default/test") @@ -213,10 +218,10 @@ func TestControllerSyncGameServerWithDevIP(t *testing.T) { return true, gs, nil }) - ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.nodeSynced) + ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.(portAllocInternal).NodeSynced) defer cancel() - err := c.portAllocator.syncAll() + err := c.portAllocator.(portAllocInternal).SyncAll() assert.Nil(t, err) err = c.syncGameServer(ctx, "default/test") @@ -241,10 +246,10 @@ func TestControllerSyncGameServerWithDevIP(t *testing.T) { return true, nil, nil }) - ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.nodeSynced) + ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.(portAllocInternal).NodeSynced) defer cancel() - err := c.portAllocator.syncAll() + err := c.portAllocator.(portAllocInternal).SyncAll() require.NoError(t, err) err = c.syncGameServer(ctx, "default/test") @@ -695,9 +700,9 @@ func TestControllerSyncGameServerPortAllocationState(t *testing.T) { return true, gs, nil }) - ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.nodeSynced) + ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.(portAllocInternal).NodeSynced) defer cancel() - err := c.portAllocator.syncAll() + err := c.portAllocator.(portAllocInternal).SyncAll() require.NoError(t, err) result, err := c.syncGameServerPortAllocationState(ctx, fixture) @@ -734,9 +739,9 @@ func TestControllerSyncGameServerPortAllocationState(t *testing.T) { return true, gs, errors.New("update-err") }) - ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.nodeSynced) + ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.(portAllocInternal).NodeSynced) defer cancel() - err := c.portAllocator.syncAll() + err := c.portAllocator.(portAllocInternal).SyncAll() require.NoError(t, err) _, err = c.syncGameServerPortAllocationState(ctx, fixture) diff --git a/pkg/portallocator/doc.go b/pkg/portallocator/doc.go new file mode 100644 index 0000000000..c1688eb111 --- /dev/null +++ b/pkg/portallocator/doc.go @@ -0,0 +1,16 @@ +// Copyright 2018 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package portallocator defines a generic port allocator for game servers +package portallocator diff --git a/pkg/gameservers/portallocator.go b/pkg/portallocator/portallocator.go similarity index 86% rename from pkg/gameservers/portallocator.go rename to pkg/portallocator/portallocator.go index 7c8eedd4ea..34193987f0 100644 --- a/pkg/gameservers/portallocator.go +++ b/pkg/portallocator/portallocator.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package gameservers +package portallocator import ( "context" @@ -36,12 +36,7 @@ import ( // A set of port allocations for a node type portAllocation map[int32]bool -// PortAllocator manages the dynamic port -// allocation strategy. Only use exposed methods to ensure -// appropriate locking is taken. -// The PortAllocator does not currently support mixing static portAllocations (or any pods with defined HostPort) -// within the dynamic port range other than the ones it coordinates. -type PortAllocator struct { +type portAllocator struct { logger *logrus.Entry mutex sync.RWMutex portAllocations []portAllocation @@ -56,18 +51,34 @@ type PortAllocator struct { nodeInformer cache.SharedIndexInformer } -// NewPortAllocator returns a new dynamic port +// portallocator.Interface manages the dynamic port allocation strategy. +// +// The portallocator does not currently support mixing static portAllocations (or any pods with defined HostPort) +// within the dynamic port range other than the ones it coordinates. +type Interface interface { + // Run sets up the current state of port allocations and + // starts tracking Pod and Node changes + Run(ctx context.Context) error + + // Allocate assigns a port to the GameServer and returns it. + Allocate(gs *agonesv1.GameServer) *agonesv1.GameServer + + // DeAllocate marks the given ports as no longer allocated + DeAllocate(gs *agonesv1.GameServer) +} + +// New returns a new dynamic port // allocator. minPort and maxPort are the top and bottom portAllocations that can be allocated in the range for // the game servers -func NewPortAllocator(minPort, maxPort int32, +func New(minPort, maxPort int32, kubeInformerFactory informers.SharedInformerFactory, - agonesInformerFactory externalversions.SharedInformerFactory) *PortAllocator { + agonesInformerFactory externalversions.SharedInformerFactory) Interface { v1 := kubeInformerFactory.Core().V1() nodes := v1.Nodes() gameServers := agonesInformerFactory.Agones().V1().GameServers() - pa := &PortAllocator{ + pa := &portAllocator{ mutex: sync.RWMutex{}, minPort: minPort, maxPort: maxPort, @@ -91,7 +102,7 @@ func NewPortAllocator(minPort, maxPort int32, // Run sets up the current state of port allocations and // starts tracking Pod and Node changes -func (pa *PortAllocator) Run(ctx context.Context) error { +func (pa *portAllocator) Run(ctx context.Context) error { pa.logger.Debug("Running") if !cache.WaitForCacheSync(ctx.Done(), pa.gameServerSynced, pa.nodeSynced) { @@ -99,7 +110,7 @@ func (pa *PortAllocator) Run(ctx context.Context) error { } // on run, let's make sure we start with a perfect slate straight away - if err := pa.syncAll(); err != nil { + if err := pa.SyncAll(); err != nil { return errors.Wrap(err, "error performing initial sync") } @@ -107,7 +118,7 @@ func (pa *PortAllocator) Run(ctx context.Context) error { } // Allocate assigns a port to the GameServer and returns it. -func (pa *PortAllocator) Allocate(gs *agonesv1.GameServer) *agonesv1.GameServer { +func (pa *portAllocator) Allocate(gs *agonesv1.GameServer) *agonesv1.GameServer { pa.mutex.Lock() defer pa.mutex.Unlock() @@ -203,8 +214,8 @@ func (pa *PortAllocator) Allocate(gs *agonesv1.GameServer) *agonesv1.GameServer return allocate(gs) } -// DeAllocate marks the given port as no longer allocated -func (pa *PortAllocator) DeAllocate(gs *agonesv1.GameServer) { +// DeAllocate marks the given ports as no longer allocated +func (pa *portAllocator) DeAllocate(gs *agonesv1.GameServer) { // skip if it wasn't previously allocated found := func() bool { @@ -236,20 +247,22 @@ func (pa *PortAllocator) DeAllocate(gs *agonesv1.GameServer) { // syncDeleteGameServer when a GameServer Pod is deleted // make the HostPort available -func (pa *PortAllocator) syncDeleteGameServer(object interface{}) { +func (pa *portAllocator) syncDeleteGameServer(object interface{}) { if gs, ok := object.(*agonesv1.GameServer); ok { pa.logger.WithField("gs", gs).Debug("Syncing deleted GameServer") pa.DeAllocate(gs) } } -// syncAll syncs the pod, node and gameserver caches then +// SyncAll syncs the pod, node and gameserver caches then // traverses all Nodes in the cluster and all looks at GameServers // and Terminating Pods values make sure those // portAllocations are marked as taken. // Locks the mutex while doing this. // This is basically a stop the world Garbage Collection on port allocations, but it only happens on startup. -func (pa *PortAllocator) syncAll() error { +// +// SyncAll is exposed on the TestInterface only. +func (pa *portAllocator) SyncAll() error { pa.mutex.Lock() defer pa.mutex.Unlock() @@ -284,10 +297,16 @@ func (pa *PortAllocator) syncAll() error { return nil } +// NodeSynced returns whether the node informer is synced. +// NodeSynced is exposed on the TestInterface only. +func (pa *portAllocator) NodeSynced() bool { + return pa.nodeSynced() +} + // registerExistingGameServerPorts registers the gameservers against gsRegistry and the ports against nodePorts. // and returns an ordered list of portAllocations per cluster nodes, and an array of // any GameServers allocated a port, but not yet assigned a Node will returned as an array of port values. -func (pa *PortAllocator) registerExistingGameServerPorts(gameservers []*agonesv1.GameServer, nodes []*corev1.Node, gsRegistry map[types.UID]bool) ([]portAllocation, []int32) { +func (pa *portAllocator) registerExistingGameServerPorts(gameservers []*agonesv1.GameServer, nodes []*corev1.Node, gsRegistry map[types.UID]bool) ([]portAllocation, []int32) { // setup blank port values nodePortAllocation := pa.nodePortAllocation(nodes) nodePortCount := make(map[string]int64, len(nodes)) @@ -339,7 +358,7 @@ func (pa *PortAllocator) registerExistingGameServerPorts(gameservers []*agonesv1 // nodePortAllocation returns a map of port allocations all set to being available // with a map key for each node, as well as the node registry record (since we're already looping) -func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]portAllocation { +func (pa *portAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]portAllocation { nodePorts := map[string]portAllocation{} for _, n := range nodes { @@ -352,7 +371,7 @@ func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]por return nodePorts } -func (pa *PortAllocator) newPortAllocation() portAllocation { +func (pa *portAllocator) newPortAllocation() portAllocation { p := make(portAllocation, (pa.maxPort-pa.minPort)+1) for i := pa.minPort; i <= pa.maxPort; i++ { p[i] = false diff --git a/pkg/gameservers/portallocator_test.go b/pkg/portallocator/portallocator_test.go similarity index 94% rename from pkg/gameservers/portallocator_test.go rename to pkg/portallocator/portallocator_test.go index d138d1ee7a..63cf8debff 100644 --- a/pkg/gameservers/portallocator_test.go +++ b/pkg/portallocator/portallocator_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package gameservers +package portallocator import ( "fmt" @@ -48,7 +48,7 @@ func TestPortAllocatorAllocate(t *testing.T) { t.Run("test allocated port counts", func(t *testing.T) { m := agtesting.NewMocks() - pa := NewPortAllocator(10, 50, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(10, 50, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) nodeWatch := watch.NewFake() m.KubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil)) @@ -61,7 +61,7 @@ func TestPortAllocatorAllocate(t *testing.T) { nodeWatch.Add(&n2) assert.True(t, cache.WaitForCacheSync(ctx.Done(), pa.nodeSynced)) - err := pa.syncAll() + err := pa.SyncAll() require.NoError(t, err) // single port dynamic @@ -137,7 +137,7 @@ func TestPortAllocatorAllocate(t *testing.T) { t.Run("ports are all allocated", func(t *testing.T) { m := agtesting.NewMocks() - pa := NewPortAllocator(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) nodeWatch := watch.NewFake() m.KubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil)) @@ -149,7 +149,7 @@ func TestPortAllocatorAllocate(t *testing.T) { nodeWatch.Add(&n2) assert.True(t, cache.WaitForCacheSync(ctx.Done(), pa.nodeSynced)) - err := pa.syncAll() + err := pa.SyncAll() require.NoError(t, err) // two nodes @@ -173,7 +173,7 @@ func TestPortAllocatorAllocate(t *testing.T) { m := agtesting.NewMocks() minPort := int32(10) maxPort := int32(20) - pa := NewPortAllocator(minPort, maxPort, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(minPort, maxPort, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) nodeWatch := watch.NewFake() m.KubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil)) @@ -191,7 +191,7 @@ func TestPortAllocatorAllocate(t *testing.T) { nodeWatch.Add(&n2) assert.True(t, cache.WaitForCacheSync(ctx.Done(), pa.nodeSynced)) - err := pa.syncAll() + err := pa.SyncAll() require.NoError(t, err) // two nodes @@ -241,7 +241,7 @@ func TestPortAllocatorAllocate(t *testing.T) { t.Run("ports are unique in a node", func(t *testing.T) { fixture := dynamicGameServerFixture() m := agtesting.NewMocks() - pa := NewPortAllocator(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) m.KubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { nl := &corev1.NodeList{Items: []corev1.Node{n1}} @@ -250,7 +250,7 @@ func TestPortAllocatorAllocate(t *testing.T) { _, cancel := agtesting.StartInformers(m, pa.nodeSynced) defer cancel() - err := pa.syncAll() + err := pa.SyncAll() require.NoError(t, err) var ports []int32 @@ -264,7 +264,7 @@ func TestPortAllocatorAllocate(t *testing.T) { t.Run("portPolicy as an empty string", func(t *testing.T) { m := agtesting.NewMocks() - pa := NewPortAllocator(10, 50, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(10, 50, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) nodeWatch := watch.NewFake() m.KubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil)) @@ -277,7 +277,7 @@ func TestPortAllocatorAllocate(t *testing.T) { nodeWatch.Add(&n2) assert.True(t, cache.WaitForCacheSync(ctx.Done(), pa.nodeSynced)) - err := pa.syncAll() + err := pa.SyncAll() require.NoError(t, err) // single port empty @@ -292,7 +292,7 @@ func TestPortAllocatorAllocate(t *testing.T) { func TestPortAllocatorMultithreadAllocate(t *testing.T) { fixture := dynamicGameServerFixture() m := agtesting.NewMocks() - pa := NewPortAllocator(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) m.KubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { nl := &corev1.NodeList{Items: []corev1.Node{n1, n2}} @@ -301,7 +301,7 @@ func TestPortAllocatorMultithreadAllocate(t *testing.T) { _, cancel := agtesting.StartInformers(m, pa.nodeSynced) defer cancel() - err := pa.syncAll() + err := pa.SyncAll() require.NoError(t, err) wg := sync.WaitGroup{} @@ -331,7 +331,7 @@ func TestPortAllocatorDeAllocate(t *testing.T) { fixture := dynamicGameServerFixture() m := agtesting.NewMocks() - pa := NewPortAllocator(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) nodes := []corev1.Node{n1, n2, n3} m.KubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { nl := &corev1.NodeList{Items: nodes} @@ -339,7 +339,7 @@ func TestPortAllocatorDeAllocate(t *testing.T) { }) _, cancel := agtesting.StartInformers(m, pa.nodeSynced) defer cancel() - err := pa.syncAll() + err := pa.SyncAll() require.NoError(t, err) // gate @@ -372,7 +372,7 @@ func TestPortAllocatorSyncPortAllocations(t *testing.T) { t.Parallel() m := agtesting.NewMocks() - pa := NewPortAllocator(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) m.KubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { nl := &corev1.NodeList{Items: []corev1.Node{n1, n2, n3}} @@ -417,7 +417,7 @@ func TestPortAllocatorSyncPortAllocations(t *testing.T) { _, cancel := agtesting.StartInformers(m, pa.gameServerSynced, pa.nodeSynced) defer cancel() - err := pa.syncAll() + err := pa.SyncAll() require.NoError(t, err) assert.Len(t, pa.portAllocations, 3) @@ -463,7 +463,7 @@ func TestPortAllocatorSyncDeleteGameServer(t *testing.T) { }, Status: agonesv1.GameServerStatus{State: agonesv1.GameServerStateReady, Ports: []agonesv1.GameServerStatusPort{{Port: 10}}, NodeName: n2.ObjectMeta.Name}} - pa := NewPortAllocator(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) m.KubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { nl := &corev1.NodeList{Items: []corev1.Node{n1, n2, n3}} @@ -482,7 +482,7 @@ func TestPortAllocatorSyncDeleteGameServer(t *testing.T) { return len(list) == 3 }, 5*time.Second, time.Second) - err := pa.syncAll() + err := pa.SyncAll() require.NoError(t, err) // gate @@ -522,7 +522,7 @@ func TestNodePortAllocation(t *testing.T) { t.Parallel() m := agtesting.NewMocks() - pa := NewPortAllocator(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(10, 20, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) nodes := []corev1.Node{n1, n2, n3} m.KubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { nl := &corev1.NodeList{Items: nodes} @@ -559,7 +559,7 @@ func TestTakePortAllocation(t *testing.T) { func TestPortAllocatorRegisterExistingGameServerPorts(t *testing.T) { t.Parallel() m := agtesting.NewMocks() - pa := NewPortAllocator(10, 13, m.KubeInformerFactory, m.AgonesInformerFactory) + pa := New(10, 13, m.KubeInformerFactory, m.AgonesInformerFactory).(*portAllocator) gs1 := &agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", UID: "1"}, Spec: agonesv1.GameServerSpec{ @@ -603,7 +603,7 @@ func dynamicGameServerFixture() *agonesv1.GameServer { // countAllocatedPorts counts how many of a given port have been // allocated across nodes -func countAllocatedPorts(pa *PortAllocator, p int32) int { +func countAllocatedPorts(pa *portAllocator, p int32) int { count := 0 for _, node := range pa.portAllocations { if node[p] { @@ -614,7 +614,7 @@ func countAllocatedPorts(pa *PortAllocator, p int32) int { } // countTotalAllocatedPorts counts the total number of allocated ports -func countTotalAllocatedPorts(pa *PortAllocator) int { +func countTotalAllocatedPorts(pa *portAllocator) int { count := 0 for _, node := range pa.portAllocations { for _, alloc := range node { @@ -626,7 +626,7 @@ func countTotalAllocatedPorts(pa *PortAllocator) int { return count } -func countTotalPorts(pa *PortAllocator) int { +func countTotalPorts(pa *portAllocator) int { count := 0 for _, node := range pa.portAllocations { count += len(node)