From ae123b47155188f8d78eaa3a57668e863a72410d Mon Sep 17 00:00:00 2001 From: Dimitris-Ilias Gkanatsios Date: Thu, 17 Mar 2022 09:47:04 -0700 Subject: [PATCH] PortRegistry V2 (#183) * PortRegistry V2 * adding label filter for node pools/groups * simplified algorithm in portRegistry --- .github/workflows/main.yml | 6 +- cmd/nodeagent/nodeagentmanager_test.go | 12 +- docs/howtos/configureportrange.md | 2 +- pkg/operator/Makefile | 2 +- .../controllers/gameserver_controller.go | 10 +- pkg/operator/controllers/port_registry.go | 255 +++++++---- .../controllers/port_registry_test.go | 416 ++++++++++++++---- pkg/operator/controllers/suite_test.go | 15 +- pkg/operator/controllers/utilities.go | 21 + pkg/operator/main.go | 62 ++- 10 files changed, 595 insertions(+), 206 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 559bfc2f..7117278b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -36,11 +36,11 @@ jobs: - name: build Docker images run: make builddockerlocal - name: GameServer API service unit tests - run: cd cmd/gameserverapi && GIN_MODE=release go test + run: cd cmd/gameserverapi && GIN_MODE=release go test -race - name: initcontainer unit tests - run: cd cmd/initcontainer && go test + run: cd cmd/initcontainer && go test -race - name: nodeagent unit tests - run: cd cmd/nodeagent && go test + run: cd cmd/nodeagent && go test -race - name: operator unit tests run: IMAGE_NAME_SAMPLE=thundernetes-netcore IMAGE_NAME_INIT_CONTAINER=thundernetes-initcontainer TAG=$(git rev-list HEAD --max-count=1 --abbrev-commit) make -C pkg/operator test - name: install kind binaries diff --git a/cmd/nodeagent/nodeagentmanager_test.go b/cmd/nodeagent/nodeagentmanager_test.go index 9dca608a..47b57911 100644 --- a/cmd/nodeagent/nodeagentmanager_test.go +++ b/cmd/nodeagent/nodeagentmanager_test.go @@ -122,7 +122,10 @@ var _ = Describe("nodeagent tests", func() { if !ok { return false } - return tempgs.(*GameServerDetails).IsActive && tempgs.(*GameServerDetails).PreviousGameState == GameStateStandingBy + tempgs.(*GameServerDetails).Mutex.RLock() + gsd := *tempgs.(*GameServerDetails) + tempgs.(*GameServerDetails).Mutex.RUnlock() + return gsd.IsActive && gsd.PreviousGameState == GameStateStandingBy }).Should(BeTrue()) // heartbeat from the game is still StandingBy @@ -196,7 +199,7 @@ var _ = Describe("nodeagent tests", func() { // update GameServer CR to active gs.Object["status"].(map[string]interface{})["state"] = "Active" gs.Object["status"].(map[string]interface{})["health"] = "Healthy" - gs.Object["status"].(map[string]interface{})["sessiocCookie"] = "cookie123" + gs.Object["status"].(map[string]interface{})["sessionCookie"] = "cookie123" _, err = dynamicClient.Resource(gameserverGVR).Namespace(testGameServerNamespace).Update(context.Background(), gs, metav1.UpdateOptions{}) Expect(err).ToNot(HaveOccurred()) @@ -206,7 +209,10 @@ var _ = Describe("nodeagent tests", func() { if !ok { return false } - return tempgs.(*GameServerDetails).IsActive && tempgs.(*GameServerDetails).PreviousGameState == GameStateStandingBy + tempgs.(*GameServerDetails).Mutex.RLock() + gsd := *tempgs.(*GameServerDetails) + tempgs.(*GameServerDetails).Mutex.RUnlock() + return gsd.IsActive && gsd.PreviousGameState == GameStateStandingBy }).Should(BeTrue()) // heartbeat from the game is still StandingBy diff --git a/docs/howtos/configureportrange.md b/docs/howtos/configureportrange.md index 56cd655c..efc50595 
100644 --- a/docs/howtos/configureportrange.md +++ b/docs/howtos/configureportrange.md @@ -7,4 +7,4 @@ nav_order: 7 # Configure Thundernetes port range -By default, Thundernetes will allocate ports in the range 10000-12000 to your GameServers. These ports are allocated to the entire set of VMs in the cluster, meaning that the max number of GameServers is 2000. If you need more or just a different port range, you can configure it via changing the `MIN_PORT` and the `MAX_PORT` environment variables in the controller deployment YAML file. \ No newline at end of file +By default, Thundernetes will allocate ports in the range 10000-12000 to your GameServers. These ports are allocated to the entire set of VMs in the cluster and are open for each and every VM. If you need more ports or just a different port range, you can configure it by changing the `MIN_PORT` and the `MAX_PORT` environment variables in the controller deployment YAML file. However, do not modify the port range while there are game servers running on the cluster, since this will probably corrupt the port registry, especially if the new and the old ranges are different. \ No newline at end of file diff --git a/pkg/operator/Makefile b/pkg/operator/Makefile index 5b4e65ef..e637455e 100644 --- a/pkg/operator/Makefile +++ b/pkg/operator/Makefile @@ -62,7 +62,7 @@ vet: ## Run go vet against code. ENVTEST_ASSETS_DIR=$(shell pwd)/testbin .PHONY: test test: manifests generate fmt vet envtest ## Run tests. - KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" THUNDERNETES_SAMPLE_IMAGE=$(IMAGE_NAME_SAMPLE):$(NETCORE_SAMPLE_TAG) THUNDERNETES_INIT_CONTAINER_IMAGE=$(IMAGE_NAME_INIT_CONTAINER):$(IMAGE_TAG) go test ./... -coverprofile cover.out + KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" THUNDERNETES_SAMPLE_IMAGE=$(IMAGE_NAME_SAMPLE):$(NETCORE_SAMPLE_TAG) THUNDERNETES_INIT_CONTAINER_IMAGE=$(IMAGE_NAME_INIT_CONTAINER):$(IMAGE_TAG) go test -race ./... -v -coverprofile cover.out ##@ Build diff --git a/pkg/operator/controllers/gameserver_controller.go b/pkg/operator/controllers/gameserver_controller.go index 6dc242b7..4f41a3d0 100644 --- a/pkg/operator/controllers/gameserver_controller.go +++ b/pkg/operator/controllers/gameserver_controller.go @@ -101,7 +101,11 @@ func (r *GameServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) if containsString(gs.GetFinalizers(), finalizerName) { patch := client.MergeFrom(gs.DeepCopy()) // our finalizer is present, so lets handle any external dependency - r.unassignPorts(&gs) + err := r.unassignPorts(&gs) + if err != nil { + // we log the error but take no further action + log.Error(err, "unable to unassign ports. PortRegistry might be corrupt", "GameServer", gs.Name) + } // remove our finalizer from the list and update it. 
controllerutil.RemoveFinalizer(&gs, finalizerName) if err := r.Patch(ctx, &gs, patch); err != nil { @@ -223,7 +227,7 @@ func (r *GameServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // unassignPorts will remove any ports that are used by this GameServer from the port registry -func (r *GameServerReconciler) unassignPorts(gs *mpsv1alpha1.GameServer) { +func (r *GameServerReconciler) unassignPorts(gs *mpsv1alpha1.GameServer) error { hostPorts := make([]int32, 0) for i := 0; i < len(gs.Spec.Template.Spec.Containers); i++ { container := gs.Spec.Template.Spec.Containers[i] @@ -233,7 +237,7 @@ func (r *GameServerReconciler) unassignPorts(gs *mpsv1alpha1.GameServer) { } } } - r.PortRegistry.DeregisterServerPorts(hostPorts) + return r.PortRegistry.DeregisterServerPorts(hostPorts) } // SetupWithManager sets up the controller with the Manager. diff --git a/pkg/operator/controllers/port_registry.go b/pkg/operator/controllers/port_registry.go index fce2a94f..e3645fb1 100644 --- a/pkg/operator/controllers/port_registry.go +++ b/pkg/operator/controllers/port_registry.go @@ -1,33 +1,62 @@ package controllers import ( + "context" "errors" "fmt" + "sync" "github.com/go-logr/logr" mpsv1alpha1 "github.com/playfab/thundernetes/pkg/operator/api/v1alpha1" + v1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" ) // PortRegistry implements a custom map for the port registry type PortRegistry struct { - HostPorts map[int32]bool - Indexes []int32 - NextFreePortIndex int32 - Min int32 // Minimum Port - Max int32 // Maximum Port - portRequests chan struct{} // buffered channel to store port requests (request is the containerPort) - portResponses chan int32 // buffered channel to store port responses (system returns the HostPort) + client client.Client // used to get the list of nodes + NodeCount int // the number of Ready and Schedulable nodes in the cluster + HostPortsPerNode map[int32]int // a map with an entry for every port in the range; the counter increases by 1 each time the port is registered on some Node + Min int32 // Minimum Port + Max int32 // Maximum Port + lockMutex sync.Mutex // lock for the map + useSpecificNodePool bool // if true, we only take into account Nodes that have the Label "mps.playfab.com/gameservernode"=true + nextPortNumber int32 // the next port to be assigned } -// NewPortRegistry initializes the IndexedDictionary that holds the port registry. -func NewPortRegistry(gameServers mpsv1alpha1.GameServerList, min, max int32, setupLog logr.Logger) (*PortRegistry, error) { +// NewPortRegistry initializes the map[port]counter that holds the port registry +// The way that this works is the following: +// we keep a map (HostPortsPerNode) with an entry for every port number in the range +// every time a new port is requested, we check if the counter for this port is less than the number of Nodes +// if it is, we increase it by one. If not, we check the next port. +// the nextPortNumber field makes the registry hand out the next port (port+1) instead of the same port again, +// since handing out the same port twice in a row would force Kubernetes to place the second GameServer Pod on a different Node to avoid a HostPort collision. +// This would have a negative impact in cases where we want as many GameServers as possible on the same Node. 
+// We also set up a Kubernetes Watch for the Nodes +// When a new Node is added or removed to the cluster, we modify the NodeCount variable +func NewPortRegistry(client client.Client, gameServers *mpsv1alpha1.GameServerList, min, max int32, nodeCount int, useSpecificNodePool bool, setupLog logr.Logger) (*PortRegistry, error) { + if min > max { + return nil, errors.New("min port cannot be greater than max port") + } + pr := &PortRegistry{ - HostPorts: make(map[int32]bool, max-min+1), - Indexes: make([]int32, max-min+1), - Min: min, - Max: max, - portRequests: make(chan struct{}, 100), - portResponses: make(chan int32, 100), + client: client, + Min: min, + Max: max, + lockMutex: sync.Mutex{}, + useSpecificNodePool: useSpecificNodePool, + nextPortNumber: min, + NodeCount: nodeCount, + } + + // initialize the ports + pr.HostPortsPerNode = make(map[int32]int) + for port := pr.Min; port <= pr.Max; port++ { + pr.HostPortsPerNode[port] = 0 } // gather ports for existing game servers @@ -39,7 +68,6 @@ func NewPortRegistry(gameServers mpsv1alpha1.GameServerList, min, max int32, set } for _, container := range gs.Spec.Template.Spec.Containers { - portsExposed := make([]int32, len(container.Ports)) portsExposedIndex := 0 @@ -51,115 +79,152 @@ func NewPortRegistry(gameServers mpsv1alpha1.GameServerList, min, max int32, set portsExposed[portsExposedIndex] = portInfo.HostPort portsExposedIndex++ } - + // and register them pr.assignRegisteredPorts(portsExposed) } - } } - - pr.assignUnregisteredPorts() - - go pr.portProducer() - return pr, nil - } -func (pr *PortRegistry) displayRegistry() { - fmt.Printf("-------------------------------------\n") - fmt.Printf("Ports: %v\n", pr.HostPorts) - fmt.Printf("Indexes: %v\n", pr.Indexes) - fmt.Printf("NextIndex: %d\n", pr.NextFreePortIndex) - fmt.Printf("-------------------------------------\n") -} +// Reconcile runs when a Node is created/deleted or the node status changes +func (pr *PortRegistry) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := log.FromContext(ctx) + var nodeList v1.NodeList + var err error + + // if we have a specific node pool/group for game servers (with mps.playfab.com/gameservernode=true Label) + if pr.useSpecificNodePool { + err = pr.client.List(ctx, &nodeList, client.MatchingLabels{LabelGameServerNode: "true"}) + } else { // get all the Nodes + err = pr.client.List(ctx, &nodeList) + } -// GetNewPort returns and registers a new port for the designated game server. Locks a mutex -func (pr *PortRegistry) GetNewPort() (int32, error) { - pr.portRequests <- struct{}{} + if err != nil { + return ctrl.Result{}, err + } - port := <-pr.portResponses + // calculate how many nodes are ready and schedulable + schedulableNodesCount := 0 + for i := 0; i < len(nodeList.Items); i++ { + if !nodeList.Items[i].Spec.Unschedulable { + schedulableNodesCount++ + } + } + log.Info("Reconciling Nodes", "schedulableNodesCount", schedulableNodesCount, "currentNodesCount", pr.NodeCount) - if port == -1 { - return -1, errors.New("cannot register a new port. 
No available ports") + // most probably it will just be a single node added or removed, but just in case + if pr.NodeCount > schedulableNodesCount { + for i := pr.NodeCount - 1; i >= schedulableNodesCount; i-- { + log.Info("Node was removed") + pr.onNodeRemoved() + } + } else if pr.NodeCount < schedulableNodesCount { + for i := pr.NodeCount; i < schedulableNodesCount; i++ { + log.Info("Node was added") + pr.onNodeAdded() + } } - return port, nil + return ctrl.Result{}, nil } -func (pr *PortRegistry) portProducer() { - for range pr.portRequests { //wait till a new request comes - - initialIndex := pr.NextFreePortIndex - for { - if !pr.HostPorts[pr.Indexes[pr.NextFreePortIndex]] { - //we found a port - port := pr.Indexes[pr.NextFreePortIndex] - pr.HostPorts[port] = true - - pr.increaseNextFreePortIndex() - - //port is set - pr.portResponses <- port - break - } +// onNodeAdded is called when a Node is added to the cluster +func (pr *PortRegistry) onNodeAdded() { + defer pr.lockMutex.Unlock() + pr.lockMutex.Lock() + pr.NodeCount++ +} - pr.increaseNextFreePortIndex() +// onNodeRemoved is called when a Node is removed from the cluster +// it removes one set of port ranges from the map +// we don't need to know which one was removed, we just need to move around the registered (set to true) ports +func (pr *PortRegistry) onNodeRemoved() { + defer pr.lockMutex.Unlock() + pr.lockMutex.Lock() + pr.NodeCount-- +} - if initialIndex == pr.NextFreePortIndex { - //we did a full loop - no empty ports - pr.portResponses <- -1 - break +// GetNewPort returns and registers a new port for the designated game server +// One may wonder what happens if two GameServer Pods get assigned the same HostPort +// The answer is that we will not have a collision, since Kubernetes is pretty smart and will place the Pod on a different Node, to prevent it +func (pr *PortRegistry) GetNewPort() (int32, error) { + defer pr.lockMutex.Unlock() + pr.lockMutex.Lock() + + for port := pr.nextPortNumber; port <= pr.Max; port++ { + // this port is used less than maximum times (where maximum is the number of nodes) + if pr.HostPortsPerNode[port] < pr.NodeCount { + pr.HostPortsPerNode[port]++ + pr.nextPortNumber = port + 1 + // we did a full cycle on the map + if pr.nextPortNumber > pr.Max { + pr.nextPortNumber = pr.Min } + return port, nil } } -} -// Stop stops port registry mechanism by closing requests and responses channels -func (pr *PortRegistry) Stop() { - close(pr.portRequests) - close(pr.portResponses) + return -1, errors.New("cannot register a new port. 
No available ports") } // DeregisterServerPorts deregisters all host ports so they can be re-used by additional game servers -func (pr *PortRegistry) DeregisterServerPorts(ports []int32) { +func (pr *PortRegistry) DeregisterServerPorts(ports []int32) error { + defer pr.lockMutex.Unlock() + pr.lockMutex.Lock() for i := 0; i < len(ports); i++ { - pr.HostPorts[ports[i]] = false + if pr.HostPortsPerNode[ports[i]] > 0 { + pr.HostPortsPerNode[ports[i]]-- + } else { + return fmt.Errorf("cannot deregister port %d, it is not registered", ports[i]) + } } + return nil } +// assignRegisteredPorts assigns ports that are already registered +// used for existing game servers and when the controller is updated/crashed and started again func (pr *PortRegistry) assignRegisteredPorts(ports []int32) { + defer pr.lockMutex.Unlock() + pr.lockMutex.Lock() for i := 0; i < len(ports); i++ { - pr.HostPorts[ports[i]] = true - pr.Indexes[i] = ports[i] - pr.increaseNextFreePortIndex() - } -} - -func (pr *PortRegistry) assignUnregisteredPorts() { - i := pr.NextFreePortIndex - for _, port := range pr.getPorts() { - if _, ok := pr.HostPorts[port]; !ok { - pr.HostPorts[port] = false - pr.Indexes[i] = port - i++ - } + pr.HostPortsPerNode[ports[i]]++ } } -func (pr *PortRegistry) increaseNextFreePortIndex() { - pr.NextFreePortIndex++ - //reset the index if needed - if pr.NextFreePortIndex == pr.Max-pr.Min+1 { - pr.NextFreePortIndex = 0 - } - -} - -func (pr *PortRegistry) getPorts() []int32 { - ports := make([]int32, pr.Max-pr.Min+1) - for i := 0; i < len(ports); i++ { - ports[i] = int32(pr.Min) + int32(i) - } - return ports +// SetupWithManager registers the PortRegistry controller with the manager +// we care to watch for changes in the Node objects, only if they are "Ready" and "Schedulable" +func (pr *PortRegistry) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1.Node{}). + WithEventFilter(predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + node := e.Object.(*v1.Node) + if useSpecificNodePoolAndNodeNotGameServer(pr.useSpecificNodePool, node) { + return false + } + return IsNodeReadyAndSchedulable(node) + }, + DeleteFunc: func(e event.DeleteEvent) bool { + node := e.Object.(*v1.Node) + // ignore this Node if we have a specific node pool for game servers (with mps.playfab.com/gameservernode=true Label) + // and the current Node does not have this Label + if useSpecificNodePoolAndNodeNotGameServer(pr.useSpecificNodePool, node) { + return false + } + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldNode := e.ObjectOld.(*v1.Node) + newNode := e.ObjectNew.(*v1.Node) + if useSpecificNodePoolAndNodeNotGameServer(pr.useSpecificNodePool, newNode) { + return false + } + return IsNodeReadyAndSchedulable(oldNode) != IsNodeReadyAndSchedulable(newNode) + }, + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + }). + Complete(pr) } diff --git a/pkg/operator/controllers/port_registry_test.go b/pkg/operator/controllers/port_registry_test.go index e1857fd0..0359da24 100644 --- a/pkg/operator/controllers/port_registry_test.go +++ b/pkg/operator/controllers/port_registry_test.go @@ -2,13 +2,21 @@ package controllers import ( "context" - "errors" "fmt" + "math/rand" + "reflect" + "sync" "time" "github.com/go-logr/logr" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" mpsv1alpha1 "github.com/playfab/thundernetes/pkg/operator/api/v1alpha1" ) @@ -22,138 +30,376 @@ const ( ) var _ = Describe("Port registry tests", func() { - log := logr.FromContextOrDiscard(context.Background()) - portRegistry, err := NewPortRegistry(mpsv1alpha1.GameServerList{}, 20000, 20010, log) - Expect(err).ToNot(HaveOccurred()) - registeredPorts := make([]int32, 7) - assignedPorts := make(map[int32]bool) - + const testMinPort = 20000 + const testMaxPort = 20009 It("should allocate hostPorts when creating game servers", func() { + portRegistry, _ := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + Expect(portRegistry.Min).To(Equal(int32(testMinPort))) + Expect(portRegistry.Max).To(Equal(int32(testMaxPort))) + assignedPorts := make(map[int32]int) // get 4 ports for i := 0; i < 4; i++ { port, err := portRegistry.GetNewPort() Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + Expect(port).To(Equal(int32(testMinPort + i))) if _, ok := assignedPorts[port]; ok { Fail(fmt.Sprintf("Port %d should not be in the assignedPorts map", port)) } - assignedPorts[port] = true + assignedPorts[port] = assignedPorts[port] + 1 } - verifyAssignedHostPorts(portRegistry, assignedPorts) - verifyUnassignedHostPorts(portRegistry, assignedPorts) - - if displayPortRegistryVariablesDuringTesting { - portRegistry.displayRegistry() + verifyExpectedHostPorts(portRegistry, assignedPorts, 4) + }) + It("should fail to allocate more ports than the maximum", func() { + portRegistry, _ := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + assignedPorts := make(map[int32]int) + for i := testMinPort; i <= testMaxPort; i++ { + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + if _, ok := assignedPorts[port]; ok { + Fail(fmt.Sprintf("Port %d should not be in the assignedPorts map", port)) + } + assignedPorts[port] = assignedPorts[port] + 1 } + verifyExpectedHostPorts(portRegistry, assignedPorts, 10) - go portRegistry.portProducer() - // end of initialization + // this one should fail + _, err := portRegistry.GetNewPort() + Expect(err).To(HaveOccurred()) }) - It("should allocate more ports", func() { - for i := 0; i < 7; i++ { - peekPort, err := peekNextPort(portRegistry) - actualPort, _ := portRegistry.GetNewPort() - Expect(actualPort).To(BeIdenticalTo(peekPort), fmt.Sprintf("Wrong port returned, peekPort:%d, actualPort:%d", peekPort, actualPort)) - Expect(err).ToNot(HaveOccurred()) - registeredPorts[i] = actualPort + It("should increase/decrease NodeCount when we add/delete Nodes from the cluster", func() { + portRegistry, kubeClient := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + } + err := kubeClient.Create(context.Background(), node) + Expect(err).ToNot(HaveOccurred()) + // do a manual reconcile since we haven't added the controller to the manager + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 2) + }).Should(Succeed()) - assignedPorts[actualPort] = true + err = kubeClient.Delete(context.Background(), node) + 
Expect(err).ToNot(HaveOccurred()) + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 1) + }).Should(Succeed()) + }) - verifyAssignedHostPorts(portRegistry, assignedPorts) - verifyUnassignedHostPorts(portRegistry, assignedPorts) + It("should successfully allocate ports on two Nodes", func() { + portRegistry, kubeClient := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, } + err := kubeClient.Create(context.Background(), node) + Expect(err).ToNot(HaveOccurred()) + // do a manual reconcile since we haven't added the controller to the manager + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 2) + }).Should(Succeed()) - if displayPortRegistryVariablesDuringTesting { - portRegistry.displayRegistry() + assignedPorts := make(map[int32]int) + // get 15 ports + for i := 0; i < 15; i++ { + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + assignedPorts[port] = assignedPorts[port] + 1 } + + verifyExpectedHostPorts(portRegistry, assignedPorts, 15) }) - It("should return an error when we have exceeded the number of allocated ports", func() { - _, err := peekNextPort(portRegistry) - if err == nil { - Expect(err).To(HaveOccurred()) + It("should successfully deallocate ports - one Node scenario", func() { + portRegistry, _ := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + Expect(portRegistry.Min).To(Equal(int32(testMinPort))) + Expect(portRegistry.Max).To(Equal(int32(testMaxPort))) + assignedPorts := make(map[int32]int) + // get 10 ports + for i := 0; i < 10; i++ { + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + if _, ok := assignedPorts[port]; ok { + Fail(fmt.Sprintf("Port %d should not be in the assignedPorts map", port)) + } + assignedPorts[port] = assignedPorts[port] + 1 } - _, err = portRegistry.GetNewPort() - if err == nil { - Expect(err).To(HaveOccurred()) + verifyExpectedHostPorts(portRegistry, assignedPorts, 10) + // deallocate two ports + err := portRegistry.DeregisterServerPorts([]int32{testMinPort + 1, testMinPort + 3}) + Expect(err).ToNot(HaveOccurred()) + delete(assignedPorts, testMinPort+1) + delete(assignedPorts, testMinPort+3) + verifyExpectedHostPorts(portRegistry, assignedPorts, 8) + }) + + It("should successfully deallocate ports - two Nodes to one scenario", func() { + portRegistry, kubeClient := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + Expect(portRegistry.Min).To(Equal(int32(testMinPort))) + Expect(portRegistry.Max).To(Equal(int32(testMaxPort))) + assignedPorts := make(map[int32]int) + // get 10 ports + for i := 0; i < 10; i++ { + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + if _, ok := assignedPorts[port]; ok { + Fail(fmt.Sprintf("Port %d should not be in the assignedPorts map", port)) + } + assignedPorts[port] = assignedPorts[port] + 1 } + verifyExpectedHostPorts(portRegistry, assignedPorts, 10) + // deallocate two ports + err := portRegistry.DeregisterServerPorts([]int32{testMinPort + 1, testMinPort + 3}) + Expect(err).ToNot(HaveOccurred()) + delete(assignedPorts, testMinPort+1) + delete(assignedPorts, testMinPort+3) + 
verifyExpectedHostPorts(portRegistry, assignedPorts, 8) - if displayPortRegistryVariablesDuringTesting { - portRegistry.displayRegistry() + // add a second Node + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, } - }) - It("should successfully deallocate ports", func() { - portRegistry.DeregisterServerPorts(registeredPorts) + err = kubeClient.Create(context.Background(), node) + Expect(err).ToNot(HaveOccurred()) + // do a manual reconcile since we haven't added the controller to the manager + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 2) + }).Should(Succeed()) - for _, val := range registeredPorts { - delete(assignedPorts, val) + // get 8 ports, we have 16 in total + for i := 0; i < 8; i++ { + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + assignedPorts[port] = assignedPorts[port] + 1 } + verifyExpectedHostPorts(portRegistry, assignedPorts, 16) - if displayPortRegistryVariablesDuringTesting { - portRegistry.displayRegistry() + // deallocate six ports that exist on the second Node + deletedPortsCount := 0 + for port := portRegistry.Min; port <= portRegistry.Max; port++ { + if portRegistry.HostPortsPerNode[port] == 2 { + err := portRegistry.DeregisterServerPorts([]int32{port}) + assignedPorts[port] = assignedPorts[port] - 1 + Expect(err).ToNot(HaveOccurred()) + deletedPortsCount++ + } } + Expect(deletedPortsCount).To(Equal(6)) + verifyExpectedHostPorts(portRegistry, assignedPorts, 10) + + // now delete the second node + err = kubeClient.Delete(context.Background(), node) + Expect(err).ToNot(HaveOccurred()) + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 1) + }).Should(Succeed()) + verifyExpectedHostPorts(portRegistry, assignedPorts, 10) - verifyAssignedHostPorts(portRegistry, assignedPorts) - verifyUnassignedHostPorts(portRegistry, assignedPorts) + // deallocate three ports + err = portRegistry.DeregisterServerPorts([]int32{testMinPort + 1, testMinPort + 2, testMinPort + 3}) + Expect(err).ToNot(HaveOccurred()) + delete(assignedPorts, testMinPort+1) + delete(assignedPorts, testMinPort+2) + delete(assignedPorts, testMinPort+3) + verifyExpectedHostPorts(portRegistry, assignedPorts, 7) }) - It("should return another port", func() { - peekPort, err := peekNextPort(portRegistry) - actualPort, _ := portRegistry.GetNewPort() - Expect(actualPort).To(BeNumerically("==", peekPort), fmt.Sprintf("Wrong port returned, peekPort:%d,actualPort:%d", peekPort, actualPort)) +}) + +var _ = Describe("Port registry with two thousand ports, five hundred on four nodes", func() { + rand.Seed(time.Now().UnixNano()) + min := int32(20000) + max := int32(20499) + + portRegistry, kubeClient := getPortRegistryKubeClientForTesting(min, max) + Expect(portRegistry.Min).To(Equal(min)) + Expect(portRegistry.Max).To(Equal(max)) + assignedPorts := sync.Map{} + + // add three nodes + for i := 0; i < 3; i++ { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("node%d", i+2), + }, + } + err := kubeClient.Create(context.Background(), node) Expect(err).ToNot(HaveOccurred()) + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + } - assignedPorts[actualPort] = true + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 4) + }).Should(Succeed()) + + It("should work with allocating and 
deallocating ports", func() { + // allocate all 2000 ports + var wg sync.WaitGroup + for i := 0; i < int(max-min+1)*4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + n := rand.Intn(200) + 50 // n will be between 50 and 250 + time.Sleep(time.Duration(n) * time.Millisecond) + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + val, ok := assignedPorts.Load(port) + if !ok { + assignedPorts.Store(port, 1) + } else { + assignedPorts.Store(port, val.(int)+1) + } + }() + } + wg.Wait() + m := syncMapToMapInt32Int(&assignedPorts) + verifyExpectedHostPorts(portRegistry, m, 2000) - verifyAssignedHostPorts(portRegistry, assignedPorts) - verifyUnassignedHostPorts(portRegistry, assignedPorts) + // trying to get another port should fail, since we've allocated every available port + _, err := portRegistry.GetNewPort() + Expect(err).To(HaveOccurred()) - if displayPortRegistryVariablesDuringTesting { - portRegistry.displayRegistry() + //deallocate 1000 ports + for i := 0; i < int(max-min+1)*2; i++ { + wg.Add(1) + go func(portToDeallocate int32) { + defer wg.Done() + n := rand.Intn(200) + 50 // n will be between 50 and 250 + time.Sleep(time.Duration(n) * time.Millisecond) + err := portRegistry.DeregisterServerPorts([]int32{portToDeallocate}) + Expect(err).ToNot(HaveOccurred()) + val, ok := assignedPorts.Load(portToDeallocate) + if !ok { + Fail(fmt.Sprintf("port %d was not found in the map", portToDeallocate)) + } + assignedPorts.Store(portToDeallocate, val.(int)-1) + }(int32((i / 2) + int(min))) // , this outputs 20000 2 times, 20001 2 times, 20002 2 times etc. } + wg.Wait() + + m = syncMapToMapInt32Int(&assignedPorts) + verifyExpectedHostPorts(portRegistry, m, 1000) + + // allocate 500 ports + for i := 0; i < int(max-min+1); i++ { + wg.Add(1) + go func() { + defer wg.Done() + n := rand.Intn(200) + 50 // n will be between 50 and 250 + time.Sleep(time.Duration(n) * time.Millisecond) + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + val, ok := assignedPorts.Load(port) + if !ok { + assignedPorts.Store(port, 1) + } else { + assignedPorts.Store(port, val.(int)+1) + } + }() + } + wg.Wait() + + m = syncMapToMapInt32Int(&assignedPorts) + verifyExpectedHostPorts(portRegistry, m, 1500) - portRegistry.Stop() + // allocate another 500 ports + for i := 0; i < int(max-min+1); i++ { + wg.Add(1) + go func() { + defer wg.Done() + n := rand.Intn(200) + 50 // n will be between 50 and 250 + time.Sleep(time.Duration(n) * time.Millisecond) + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + val, ok := assignedPorts.Load(port) + if !ok { + assignedPorts.Store(port, 1) + } else { + assignedPorts.Store(port, val.(int)+1) + } + }() + } + wg.Wait() + + m = syncMapToMapInt32Int(&assignedPorts) + verifyExpectedHostPorts(portRegistry, m, 2000) + + // trying to get another port should fail, since we've allocated every available port + _, err = portRegistry.GetNewPort() + Expect(err).To(HaveOccurred()) }) }) -func verifyAssignedHostPorts(portRegistry *PortRegistry, assignedHostPorts map[int32]bool) { - for hostPort := range assignedHostPorts { - val, ok := portRegistry.HostPorts[hostPort] - Expect(ok).Should(BeTrue(), fmt.Sprintf("There should be an entry about hostPort %d", hostPort)) - Expect(val).Should(BeTrue(), fmt.Sprintf("HostPort %d should be registered", hostPort)) - } - +// validatePort checks if the port is in the range testMinPort<=port<=testMaxPort +func validatePort(port, testMinPort, testMaxPort int32) { + 
Expect(port).Should(BeNumerically(">=", testMinPort)) + Expect(port).Should(BeNumerically("<=", testMaxPort)) } -func verifyUnassignedHostPorts(portRegistry *PortRegistry, assignedHostPorts map[int32]bool) { - Expect(len(portRegistry.HostPorts)).Should(BeNumerically("==", int(portRegistry.Max-portRegistry.Min+1))) - for hostPort, ok := range portRegistry.HostPorts { - if ok { //ignore the assigned ones - continue +// verifyExpectedHostPorts compares the hostPortsPerNode map on the PortRegistry to the expectedHostPorts map +func verifyExpectedHostPorts(portRegistry *PortRegistry, expectedHostPorts map[int32]int, expectedTotalHostPortsCount int) { + actualHostPorts := make(map[int32]int) + actualTotalHostPortsCount := 0 + for port, count := range portRegistry.HostPortsPerNode { + if count > 0 { + actualHostPorts[port] = count + actualTotalHostPortsCount += count } - exists := assignedHostPorts[hostPort] - Expect(exists).To(BeFalse()) } + Expect(reflect.DeepEqual(actualHostPorts, expectedHostPorts)).To(BeTrue()) + Expect(actualTotalHostPortsCount).To(Equal(expectedTotalHostPortsCount)) } -func peekNextPort(pr *PortRegistry) (int32, error) { - tempPointer := pr.NextFreePortIndex - tempPointerCopy := tempPointer - for { - if !pr.HostPorts[pr.Indexes[tempPointer]] { //port not taken - return pr.Indexes[tempPointer], nil - } - - tempPointer++ - if tempPointer == (pr.Max - pr.Min + 1) { - tempPointer = 0 - } +// verifyHostPortsPerNode verifies that the NodeCount on the PortRegistry +// matches the expected number of Nodes +func verifyHostPortsPerNode(portRegistry *PortRegistry, expectedNodeCount int) error { + if portRegistry.NodeCount != expectedNodeCount { + return fmt.Errorf("NodeCount is not %d, it is %d", expectedNodeCount, portRegistry.NodeCount) + } + return nil +} - if tempPointer == tempPointerCopy { - //we've done a full circle, no ports available - return 0, errors.New("No ports available") - } +// getPortRegistryKubeClientForTesting returns a PortRegistry and a fake Kubernetes client for testing +func getPortRegistryKubeClientForTesting(min, max int32) (*PortRegistry, client.Client) { + log := logr.FromContextOrDiscard(context.Background()) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, } + clientBuilder := fake.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(node) + kubeClient := clientBuilder.Build() + Expect(kubeClient).NotTo(BeNil()) + portRegistry, err := NewPortRegistry(kubeClient, &mpsv1alpha1.GameServerList{}, min, max, 1, false, log) + Expect(err).ToNot(HaveOccurred()) + return portRegistry, kubeClient +} + +// syncMapToMapInt32Int converts a sync.Map to a map[int32]int +// useful since part of our test uses a sync.Map instead of a plain map +func syncMapToMapInt32Int(sm *sync.Map) map[int32]int { + m := make(map[int32]int) + sm.Range(func(key, value interface{}) bool { + m[key.(int32)] = value.(int) + return true + }) + return m } diff --git a/pkg/operator/controllers/suite_test.go b/pkg/operator/controllers/suite_test.go index 1fd2018a..99c01e66 100644 --- a/pkg/operator/controllers/suite_test.go +++ b/pkg/operator/controllers/suite_test.go @@ -75,16 +75,25 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + // generate a port registry for the tests + portRegistry, err := NewPortRegistry(k8sClient, &mpsv1alpha1.GameServerList{}, 20000, 20100, 1, false, ctrl.Log) + Expect(err).ToNot(HaveOccurred()) + + err = portRegistry.SetupWithManager(k8sManager) + 
Expect(err).ToNot(HaveOccurred()) + err = (&GameServerBuildReconciler{ - Client: k8sManager.GetClient(), - Scheme: k8sManager.GetScheme(), - Recorder: k8sManager.GetEventRecorderFor("GameServerBuildReconciler"), + Client: k8sManager.GetClient(), + Scheme: k8sManager.GetScheme(), + PortRegistry: portRegistry, + Recorder: k8sManager.GetEventRecorderFor("GameServerBuildReconciler"), }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) err = (&GameServerReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), + PortRegistry: portRegistry, Recorder: k8sManager.GetEventRecorderFor("GameServerReconciler"), GetPublicIpForNodeProvider: func(_ context.Context, _ client.Reader, _ string) (string, error) { return "testPublicIP", nil }, }).SetupWithManager(k8sManager) diff --git a/pkg/operator/controllers/utilities.go b/pkg/operator/controllers/utilities.go index e2c5c21e..a813f06c 100644 --- a/pkg/operator/controllers/utilities.go +++ b/pkg/operator/controllers/utilities.go @@ -42,6 +42,8 @@ const ( GameSharedContentDirectory = DataVolumeMountPath + "/GameSharedContent" DaemonSetPort int32 = 56001 + + LabelGameServerNode string = "mps.playfab.com/gameservernode" ) var InitContainerImage string @@ -365,3 +367,22 @@ func getContainerHostPortTuples(pod *corev1.Pod) string { } return strings.TrimSuffix(ports.String(), ",") } + +// IsNodeReadyAndSchedulable returns true if the node is ready and schedulable +func IsNodeReadyAndSchedulable(node *corev1.Node) bool { + if !node.Spec.Unschedulable { + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue { + return true + } + } + } + return false +} + +// useSpecificNodePoolAndNodeNotGameServer returns true if +// 1. the cluster contains a specific Node Pool/Group for GameServers (designated by the mps.playfab.com/gameservernode=true Label) +// 2. 
and the current Node does *not* have this Label +func useSpecificNodePoolAndNodeNotGameServer(useSpecificNodePool bool, node *corev1.Node) bool { + return useSpecificNodePool && node.Labels[LabelGameServerNode] != "true" +} diff --git a/pkg/operator/main.go b/pkg/operator/main.go index 94869c39..54c481ff 100644 --- a/pkg/operator/main.go +++ b/pkg/operator/main.go @@ -36,19 +36,20 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" "github.com/go-logr/logr" + mpsv1alpha1 "github.com/playfab/thundernetes/pkg/operator/api/v1alpha1" "github.com/playfab/thundernetes/pkg/operator/controllers" //+kubebuilder:scaffold:imports - "github.com/playfab/thundernetes/pkg/operator/http" corev1 "k8s.io/api/core/v1" + + "github.com/playfab/thundernetes/pkg/operator/http" ) var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") - portRegistry *controllers.PortRegistry + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") ) const ( @@ -94,6 +95,7 @@ func main() { os.Exit(1) } + // initialize a live API client, used for the PortRegistry k8sClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{Scheme: scheme}) if err != nil { setupLog.Error(err, "unable to start live API client") os.Exit(1) @@ -116,10 +118,16 @@ func main() { } } - if err = initializePortRegistry(k8sClient, setupLog); err != nil { + portRegistry, err := initializePortRegistry(k8sClient, mgr.GetClient(), setupLog) + if err != nil { setupLog.Error(err, "unable to initialize portRegistry") os.Exit(1) } + // add the portRegistry to the manager so it can reconcile the Nodes + if err := portRegistry.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "PortRegistry") + os.Exit(1) + } if err = (&controllers.GameServerReconciler{ Client: mgr.GetClient(), @@ -140,8 +148,10 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "GameServerBuild") os.Exit(1) } + //+kubebuilder:scaffold:builder + // initialize the allocation API service err = http.NewAllocationApiServer(mgr, crt, key) if err != nil { setupLog.Error(err, "unable to create HTTP allocation API Server", "Allocation API Server", "HTTP Allocation API Server") @@ -164,26 +174,50 @@ func main() { } } -func initializePortRegistry(k8sClient client.Client, setupLog logr.Logger) error { +// initializePortRegistry performs some initialization and creates a new PortRegistry struct +// the k8sClient is a live API client and is used to get the existing gameservers and the "Ready" Nodes +// the crClient is the cached controller-runtime client, used to watch for changes to the nodes from inside the PortRegistry +func initializePortRegistry(k8sClient client.Client, crClient client.Client, setupLog logr.Logger) (*controllers.PortRegistry, error) { var gameServers mpsv1alpha1.GameServerList if err := k8sClient.List(context.Background(), &gameServers); err != nil { - return err + return nil, err } - minPort, maxPort, err := getMinMaxPortFromEnv() + useExclusivelyGameServerNodesForPortRegistry := useExclusivelyGameServerNodesForPortRegistry() + var nodes corev1.NodeList + if err := k8sClient.List(context.Background(), &nodes); err != nil { + return nil, err + } + + schedulableAndReadyNodeCount := 0 + for i := 0; i < len(nodes.Items); i++ { + if controllers.IsNodeReadyAndSchedulable(&nodes.Items[i]) { + if !useExclusivelyGameServerNodesForPortRegistry || nodes.Items[i].Labels[controllers.LabelGameServerNode] == "true" { + schedulableAndReadyNodeCount = schedulableAndReadyNodeCount + 1 + } + } + 
} + + // get the min/max port from environment variables + // the code does not offer any protection in case the port range changes while game servers are running + minPort, maxPort, err := getMinMaxPortFromEnv() if err != nil { - return err + return nil, err } - portRegistry, err = controllers.NewPortRegistry(gameServers, minPort, maxPort, setupLog) + setupLog.Info("initializing port registry", "minPort", minPort, "maxPort", maxPort, "schedulableAndReadyNodeCount", schedulableAndReadyNodeCount) + + portRegistry, err := controllers.NewPortRegistry(crClient, &gameServers, minPort, maxPort, schedulableAndReadyNodeCount, useExclusivelyGameServerNodesForPortRegistry, setupLog) if err != nil { - return err + return nil, err } - return nil + return portRegistry, nil } +// getTlsSecret returns the TLS secret from the given namespace +// used in the allocation API service func getTlsSecret(k8sClient client.Client, namespace string) ([]byte, []byte, error) { var secret corev1.Secret err := k8sClient.Get(context.Background(), types.NamespacedName{ @@ -235,3 +269,7 @@ func getMinMaxPortFromEnv() (int32, int32, error) { return minPort, maxPort, nil } + +func useExclusivelyGameServerNodesForPortRegistry() bool { + return os.Getenv("PORT_REGISTRY_EXCLUSIVELY_GAMESERVER_NODES") == "true" +}
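
Editor's note on the allocation scheme: the sketch below is a minimal, self-contained illustration of the counting idea this patch introduces (one counter per HostPort, and each port handed out at most once per schedulable Node). It is not the thundernetes code itself; the names portCounter, get and release are simplified stand-ins for PortRegistry, GetNewPort and DeregisterServerPorts, and the Kubernetes client, the Node watch and the logging are intentionally left out.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// portCounter keeps one counter per HostPort and allows each port to be
// handed out at most nodeCount times, mirroring the patch's HostPortsPerNode map.
type portCounter struct {
	mu        sync.Mutex
	counts    map[int32]int
	min, max  int32
	next      int32
	nodeCount int
}

func newPortCounter(min, max int32, nodeCount int) *portCounter {
	c := &portCounter{counts: make(map[int32]int), min: min, max: max, next: min, nodeCount: nodeCount}
	for p := min; p <= max; p++ {
		c.counts[p] = 0
	}
	return c
}

// get returns the next port whose counter is below the Node count.
func (c *portCounter) get() (int32, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for p := c.next; p <= c.max; p++ {
		if c.counts[p] < c.nodeCount {
			c.counts[p]++
			c.next = p + 1
			if c.next > c.max { // reached the end of the range, start over next time
				c.next = c.min
			}
			return p, nil
		}
	}
	return -1, errors.New("no available ports")
}

// release decrements the counter for a port that a GameServer gave back.
func (c *portCounter) release(p int32) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.counts[p] == 0 {
		return fmt.Errorf("port %d is not registered", p)
	}
	c.counts[p]--
	return nil
}

func main() {
	c := newPortCounter(10000, 10002, 2) // 3 ports, 2 Nodes -> 6 slots in total
	for i := 0; i < 6; i++ {
		p, _ := c.get()
		fmt.Println("allocated", p)
	}
	if _, err := c.get(); err != nil {
		fmt.Println("expected failure:", err) // every slot is taken
	}
	_ = c.release(10001)
	p, _ := c.get()
	fmt.Println("allocated again", p) // 10001 is free again
}

Running it with 3 ports and 2 Nodes shows the same behavior the tests in the patch rely on: six successful allocations, then an error until a port is released.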
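Related to the configureportrange.md change: the patch reads the range through getMinMaxPortFromEnv, whose body is not part of this diff. The sketch below is a rough guess at what parsing MIN_PORT and MAX_PORT could look like, assuming plain strconv parsing plus a min<=max check; the repository's actual implementation may differ.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// readPortRange is a hypothetical helper, not the repo's getMinMaxPortFromEnv:
// it reads MIN_PORT/MAX_PORT from the environment and validates the range.
func readPortRange() (int32, int32, error) {
	minPort, err := strconv.ParseInt(os.Getenv("MIN_PORT"), 10, 32)
	if err != nil {
		return 0, 0, fmt.Errorf("parsing MIN_PORT: %w", err)
	}
	maxPort, err := strconv.ParseInt(os.Getenv("MAX_PORT"), 10, 32)
	if err != nil {
		return 0, 0, fmt.Errorf("parsing MAX_PORT: %w", err)
	}
	if minPort > maxPort {
		return 0, 0, fmt.Errorf("MIN_PORT %d is greater than MAX_PORT %d", minPort, maxPort)
	}
	return int32(minPort), int32(maxPort), nil
}

func main() {
	// the defaults documented in configureportrange.md
	os.Setenv("MIN_PORT", "10000")
	os.Setenv("MAX_PORT", "12000")
	min, max, err := readPortRange()
	fmt.Println(min, max, err) // 10000 12000 <nil>
}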