Skip to content

Commit

Permalink
Cloud product: Allow for different port allocators between cloud prod…
Browse files Browse the repository at this point in the history
…ucts

In the Agones on GKE Autopilot implementation, we have no need for the
port allocator - the informer/etc. is an unnecessary moving piece.
This PR allows for cloud products to provide their own port allocation
implementation. We do this by:

* Splitting portallocator off to its own package. It was basically
self-sufficient anyways, except it was a little too friendly with
controller_test.go. I solved that by introducing a TestInterface for
controller_test.go to upcast to.

* Allow cloud product implementations to define their own port
allocator.

A couple of notes:

* This doesn't yet do anything different in GKE Autopilot, it just
provides the place to do it.

* This change is layered on top of #2782 - on review, you can skip
those changes.
  • Loading branch information
zmerlynn committed Nov 2, 2022
1 parent e7499e8 commit 031a2df
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 58 deletions.
6 changes: 6 additions & 0 deletions pkg/cloudproduct/cloudproduct.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
"fmt"

agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
"agones.dev/agones/pkg/client/informers/externalversions"
"agones.dev/agones/pkg/cloudproduct/generic"
"agones.dev/agones/pkg/cloudproduct/gke"
"agones.dev/agones/pkg/portallocator"
"agones.dev/agones/pkg/util/runtime"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)

Expand All @@ -33,6 +36,9 @@ type CloudProduct interface {
// SyncPodPortsToGameServer runs after a Pod has been assigned to a Node and before we sync
// Pod host ports to the GameServer status.
SyncPodPortsToGameServer(*agonesv1.GameServer, *corev1.Pod) error

// PortAllocatorFactory creates a PortAllocator. c.f. gameservers.NewPortAllocator for parameters.
PortAllocatorFactory(int32, int32, informers.SharedInformerFactory, externalversions.SharedInformerFactory) portallocator.Interface
}

var (
Expand Down
9 changes: 9 additions & 0 deletions pkg/cloudproduct/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,20 @@ package generic

import (
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
"agones.dev/agones/pkg/client/informers/externalversions"
"agones.dev/agones/pkg/portallocator"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
)

func New() (*generic, error) { return &generic{}, nil }

type generic struct{}

func (*generic) SyncPodPortsToGameServer(*agonesv1.GameServer, *corev1.Pod) error { return nil }

func (*generic) PortAllocatorFactory(minPort, maxPort int32,
kubeInformerFactory informers.SharedInformerFactory,
agonesInformerFactory externalversions.SharedInformerFactory) portallocator.Interface {
return portallocator.New(minPort, maxPort, kubeInformerFactory, agonesInformerFactory)
}
9 changes: 9 additions & 0 deletions pkg/cloudproduct/gke/gke.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import (
"encoding/json"

agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
"agones.dev/agones/pkg/client/informers/externalversions"
"agones.dev/agones/pkg/portallocator"
"agones.dev/agones/pkg/util/runtime"
"cloud.google.com/go/compute/metadata"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -82,3 +85,9 @@ func (*gkeAutopilot) SyncPodPortsToGameServer(gs *agonesv1.GameServer, pod *core
}
return nil
}

func (*gkeAutopilot) PortAllocatorFactory(minPort, maxPort int32,
kubeInformerFactory informers.SharedInformerFactory,
agonesInformerFactory externalversions.SharedInformerFactory) portallocator.Interface {
return portallocator.New(minPort, maxPort, kubeInformerFactory, agonesInformerFactory)
}
5 changes: 3 additions & 2 deletions pkg/gameservers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"agones.dev/agones/pkg/client/informers/externalversions"
listerv1 "agones.dev/agones/pkg/client/listers/agones/v1"
"agones.dev/agones/pkg/cloudproduct"
"agones.dev/agones/pkg/portallocator"
"agones.dev/agones/pkg/util/crd"
"agones.dev/agones/pkg/util/logfields"
"agones.dev/agones/pkg/util/runtime"
Expand Down Expand Up @@ -81,7 +82,7 @@ type Controller struct {
gameServerSynced cache.InformerSynced
nodeLister corelisterv1.NodeLister
nodeSynced cache.InformerSynced
portAllocator *PortAllocator
portAllocator portallocator.Interface
healthController *HealthController
migrationController *MigrationController
missingPodController *MissingPodController
Expand Down Expand Up @@ -133,7 +134,7 @@ func NewController(
gameServerSynced: gsInformer.HasSynced,
nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(),
nodeSynced: kubeInformerFactory.Core().V1().Nodes().Informer().HasSynced,
portAllocator: NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory),
portAllocator: cloudProduct.PortAllocatorFactory(minPort, maxPort, kubeInformerFactory, agonesInformerFactory),
healthController: NewHealthController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory),
migrationController: NewMigrationController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory, cloudProduct),
missingPodController: NewMissingPodController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory),
Expand Down
37 changes: 27 additions & 10 deletions pkg/gameservers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"agones.dev/agones/pkg/apis/agones"
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
"agones.dev/agones/pkg/cloudproduct"
"agones.dev/agones/pkg/portallocator"
agtesting "agones.dev/agones/pkg/testing"
"agones.dev/agones/pkg/util/webhooks"
"github.com/heptiolabs/healthcheck"
Expand Down Expand Up @@ -126,10 +127,10 @@ func TestControllerSyncGameServer(t *testing.T) {
return true, gs, nil
})

ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.nodeSynced)
ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, portAllocatorNodeSynced(t, c.portAllocator))
defer cancel()

err := c.portAllocator.syncAll()
err := portAllocatorSyncAll(t, c.portAllocator)
assert.Nil(t, err)

err = c.syncGameServer(ctx, "default/test")
Expand Down Expand Up @@ -213,10 +214,10 @@ func TestControllerSyncGameServerWithDevIP(t *testing.T) {
return true, gs, nil
})

ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.nodeSynced)
ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, portAllocatorNodeSynced(t, c.portAllocator))
defer cancel()

err := c.portAllocator.syncAll()
err := portAllocatorSyncAll(t, c.portAllocator)
assert.Nil(t, err)

err = c.syncGameServer(ctx, "default/test")
Expand All @@ -241,10 +242,10 @@ func TestControllerSyncGameServerWithDevIP(t *testing.T) {
return true, nil, nil
})

ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.nodeSynced)
ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, portAllocatorNodeSynced(t, c.portAllocator))
defer cancel()

err := c.portAllocator.syncAll()
err := portAllocatorSyncAll(t, c.portAllocator)
require.NoError(t, err)

err = c.syncGameServer(ctx, "default/test")
Expand Down Expand Up @@ -695,9 +696,9 @@ func TestControllerSyncGameServerPortAllocationState(t *testing.T) {
return true, gs, nil
})

ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.nodeSynced)
ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, portAllocatorNodeSynced(t, c.portAllocator))
defer cancel()
err := c.portAllocator.syncAll()
err := portAllocatorSyncAll(t, c.portAllocator)
require.NoError(t, err)

result, err := c.syncGameServerPortAllocationState(ctx, fixture)
Expand Down Expand Up @@ -734,9 +735,9 @@ func TestControllerSyncGameServerPortAllocationState(t *testing.T) {
return true, gs, errors.New("update-err")
})

ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, c.portAllocator.nodeSynced)
ctx, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, portAllocatorNodeSynced(t, c.portAllocator))
defer cancel()
err := c.portAllocator.syncAll()
err := portAllocatorSyncAll(t, c.portAllocator)
require.NoError(t, err)

_, err = c.syncGameServerPortAllocationState(ctx, fixture)
Expand Down Expand Up @@ -1962,3 +1963,19 @@ func newSingleContainerSpec() agonesv1.GameServerSpec {
},
}
}

func portAllocatorNodeSynced(t *testing.T, pai portallocator.Interface) func() bool {
pat, ok := pai.(portallocator.TestInterface)
if !ok {
t.Fatalf("portAllocatorNodeSynced called on portallocator.Interface of type %t", pai)
}
return pat.NodeSynced
}

func portAllocatorSyncAll(t *testing.T, pai portallocator.Interface) error {
pat, ok := pai.(portallocator.TestInterface)
if !ok {
t.Fatalf("portAllocatorNodeSynced called on portallocator.Interface of type %t", pai)
}
return pat.SyncAll()
}
16 changes: 16 additions & 0 deletions pkg/portallocator/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2018 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package portallocator defines a generic port allocator for game servers
package portallocator
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package gameservers
package portallocator

import (
"context"
Expand All @@ -36,12 +36,7 @@ import (
// A set of port allocations for a node
type portAllocation map[int32]bool

// PortAllocator manages the dynamic port
// allocation strategy. Only use exposed methods to ensure
// appropriate locking is taken.
// The PortAllocator does not currently support mixing static portAllocations (or any pods with defined HostPort)
// within the dynamic port range other than the ones it coordinates.
type PortAllocator struct {
type portAllocator struct {
logger *logrus.Entry
mutex sync.RWMutex
portAllocations []portAllocation
Expand All @@ -56,18 +51,42 @@ type PortAllocator struct {
nodeInformer cache.SharedIndexInformer
}

// NewPortAllocator returns a new dynamic port
// portallocator.Interface manages the dynamic port allocation strategy. Use portallocator.Interface
// in production code, or portallocator.TestInterface in tests.
//
// The portallocator does not currently support mixing static portAllocations (or any pods with defined HostPort)
// within the dynamic port range other than the ones it coordinates.
type Interface interface {
// Run sets up the current state of port allocations and
// starts tracking Pod and Node changes
Run(ctx context.Context) error

// Allocate assigns a port to the GameServer and returns it.
Allocate(gs *agonesv1.GameServer) *agonesv1.GameServer

// DeAllocate marks the given ports as no longer allocated
DeAllocate(gs *agonesv1.GameServer)
}

// Use only in test code, by upcasting from Interface to TestInterface
type TestInterface interface {
Interface
NodeSynced() bool
SyncAll() error
}

// New returns a new dynamic port
// allocator. minPort and maxPort are the top and bottom portAllocations that can be allocated in the range for
// the game servers
func NewPortAllocator(minPort, maxPort int32,
func New(minPort, maxPort int32,
kubeInformerFactory informers.SharedInformerFactory,
agonesInformerFactory externalversions.SharedInformerFactory) *PortAllocator {
agonesInformerFactory externalversions.SharedInformerFactory) Interface {

v1 := kubeInformerFactory.Core().V1()
nodes := v1.Nodes()
gameServers := agonesInformerFactory.Agones().V1().GameServers()

pa := &PortAllocator{
pa := &portAllocator{
mutex: sync.RWMutex{},
minPort: minPort,
maxPort: maxPort,
Expand All @@ -91,23 +110,23 @@ func NewPortAllocator(minPort, maxPort int32,

// Run sets up the current state of port allocations and
// starts tracking Pod and Node changes
func (pa *PortAllocator) Run(ctx context.Context) error {
func (pa *portAllocator) Run(ctx context.Context) error {
pa.logger.Debug("Running")

if !cache.WaitForCacheSync(ctx.Done(), pa.gameServerSynced, pa.nodeSynced) {
return errors.New("failed to wait for caches to sync")
}

// on run, let's make sure we start with a perfect slate straight away
if err := pa.syncAll(); err != nil {
if err := pa.SyncAll(); err != nil {
return errors.Wrap(err, "error performing initial sync")
}

return nil
}

// Allocate assigns a port to the GameServer and returns it.
func (pa *PortAllocator) Allocate(gs *agonesv1.GameServer) *agonesv1.GameServer {
func (pa *portAllocator) Allocate(gs *agonesv1.GameServer) *agonesv1.GameServer {
pa.mutex.Lock()
defer pa.mutex.Unlock()

Expand Down Expand Up @@ -203,8 +222,8 @@ func (pa *PortAllocator) Allocate(gs *agonesv1.GameServer) *agonesv1.GameServer
return allocate(gs)
}

// DeAllocate marks the given port as no longer allocated
func (pa *PortAllocator) DeAllocate(gs *agonesv1.GameServer) {
// DeAllocate marks the given ports as no longer allocated
func (pa *portAllocator) DeAllocate(gs *agonesv1.GameServer) {
// skip if it wasn't previously allocated

found := func() bool {
Expand Down Expand Up @@ -236,20 +255,22 @@ func (pa *PortAllocator) DeAllocate(gs *agonesv1.GameServer) {

// syncDeleteGameServer when a GameServer Pod is deleted
// make the HostPort available
func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
func (pa *portAllocator) syncDeleteGameServer(object interface{}) {
if gs, ok := object.(*agonesv1.GameServer); ok {
pa.logger.WithField("gs", gs).Debug("Syncing deleted GameServer")
pa.DeAllocate(gs)
}
}

// syncAll syncs the pod, node and gameserver caches then
// SyncAll syncs the pod, node and gameserver caches then
// traverses all Nodes in the cluster and all looks at GameServers
// and Terminating Pods values make sure those
// portAllocations are marked as taken.
// Locks the mutex while doing this.
// This is basically a stop the world Garbage Collection on port allocations, but it only happens on startup.
func (pa *PortAllocator) syncAll() error {
//
// SyncAll is exposed on the TestInterface only.
func (pa *portAllocator) SyncAll() error {
pa.mutex.Lock()
defer pa.mutex.Unlock()

Expand Down Expand Up @@ -284,10 +305,16 @@ func (pa *PortAllocator) syncAll() error {
return nil
}

// NodeSynced returns whether the node informer is synced.
// NodeSynced is exposed on the TestInterface only.
func (pa *portAllocator) NodeSynced() bool {
return pa.nodeSynced()
}

// registerExistingGameServerPorts registers the gameservers against gsRegistry and the ports against nodePorts.
// and returns an ordered list of portAllocations per cluster nodes, and an array of
// any GameServers allocated a port, but not yet assigned a Node will returned as an array of port values.
func (pa *PortAllocator) registerExistingGameServerPorts(gameservers []*agonesv1.GameServer, nodes []*corev1.Node, gsRegistry map[types.UID]bool) ([]portAllocation, []int32) {
func (pa *portAllocator) registerExistingGameServerPorts(gameservers []*agonesv1.GameServer, nodes []*corev1.Node, gsRegistry map[types.UID]bool) ([]portAllocation, []int32) {
// setup blank port values
nodePortAllocation := pa.nodePortAllocation(nodes)
nodePortCount := make(map[string]int64, len(nodes))
Expand Down Expand Up @@ -339,7 +366,7 @@ func (pa *PortAllocator) registerExistingGameServerPorts(gameservers []*agonesv1

// nodePortAllocation returns a map of port allocations all set to being available
// with a map key for each node, as well as the node registry record (since we're already looping)
func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]portAllocation {
func (pa *portAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]portAllocation {
nodePorts := map[string]portAllocation{}

for _, n := range nodes {
Expand All @@ -352,7 +379,7 @@ func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]por
return nodePorts
}

func (pa *PortAllocator) newPortAllocation() portAllocation {
func (pa *portAllocator) newPortAllocation() portAllocation {
p := make(portAllocation, (pa.maxPort-pa.minPort)+1)
for i := pa.minPort; i <= pa.maxPort; i++ {
p[i] = false
Expand Down
Loading

0 comments on commit 031a2df

Please sign in to comment.