From c27f5a03e18c488362a468cb02cb018e1d8edc52 Mon Sep 17 00:00:00 2001
From: Dimitris Gkanatsios
Date: Sun, 13 Mar 2022 00:14:03 -0800
Subject: [PATCH] PortRegistry V2

---
 .github/workflows/main.yml                 |   6 +-
 cmd/nodeagent/nodeagentmanager_test.go     |  10 +-
 docs/howtos/configureportrange.md          |   2 +-
 pkg/operator/Makefile                      |   2 +-
 .../config/manager/kustomization.yaml      |   4 +-
 pkg/operator/controllers/port_registry.go  | 250 ++++++----
 .../controllers/port_registry_test.go      | 443 ++++++++++++++----
 pkg/operator/controllers/suite_test.go     |  15 +-
 pkg/operator/controllers/utilities.go      |  12 +
 pkg/operator/main.go                       |  54 ++-
 10 files changed, 593 insertions(+), 205 deletions(-)

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 559bfc2f..7117278b 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -36,11 +36,11 @@ jobs:
       - name: build Docker images
         run: make builddockerlocal
       - name: GameServer API service unit tests
-        run: cd cmd/gameserverapi && GIN_MODE=release go test
+        run: cd cmd/gameserverapi && GIN_MODE=release go test -race
       - name: initcontainer unit tests
-        run: cd cmd/initcontainer && go test
+        run: cd cmd/initcontainer && go test -race
       - name: nodeagent unit tests
-        run: cd cmd/nodeagent && go test
+        run: cd cmd/nodeagent && go test -race
       - name: operator unit tests
        run: IMAGE_NAME_SAMPLE=thundernetes-netcore IMAGE_NAME_INIT_CONTAINER=thundernetes-initcontainer TAG=$(git rev-list HEAD --max-count=1 --abbrev-commit) make -C pkg/operator test
      - name: install kind binaries
diff --git a/cmd/nodeagent/nodeagentmanager_test.go b/cmd/nodeagent/nodeagentmanager_test.go
index 68ea2ce8..e237880e 100644
--- a/cmd/nodeagent/nodeagentmanager_test.go
+++ b/cmd/nodeagent/nodeagentmanager_test.go
@@ -122,7 +122,10 @@ var _ = Describe("nodeagent tests", func() {
 			if !ok {
 				return false
 			}
-			return tempgs.(*GameServerDetails).WasActivated && tempgs.(*GameServerDetails).PreviousGameState == GameStateStandingBy
+			tempgs.(*GameServerDetails).Mutex.RLock()
+			gsd := *tempgs.(*GameServerDetails)
+			tempgs.(*GameServerDetails).Mutex.RUnlock()
+			return gsd.WasActivated && gsd.PreviousGameState == GameStateStandingBy
 		}).Should(BeTrue())
 
 		// heartbeat from the game is still StandingBy
@@ -206,7 +209,10 @@ var _ = Describe("nodeagent tests", func() {
 			if !ok {
 				return false
 			}
-			return tempgs.(*GameServerDetails).WasActivated && tempgs.(*GameServerDetails).PreviousGameState == GameStateStandingBy
+			tempgs.(*GameServerDetails).Mutex.RLock()
+			gsd := *tempgs.(*GameServerDetails)
+			tempgs.(*GameServerDetails).Mutex.RUnlock()
+			return gsd.WasActivated && gsd.PreviousGameState == GameStateStandingBy
 		}).Should(BeTrue())
 
 		// heartbeat from the game is still StandingBy
diff --git a/docs/howtos/configureportrange.md b/docs/howtos/configureportrange.md
index 56cd655c..efc50595 100644
--- a/docs/howtos/configureportrange.md
+++ b/docs/howtos/configureportrange.md
@@ -7,4 +7,4 @@ nav_order: 7
 
 # Configure Thundernetes port range
 
-By default, Thundernetes will allocate ports in the range 10000-12000 to your GameServers. These ports are allocated to the entire set of VMs in the cluster, meaning that the max number of GameServers is 2000. If you need more or just a different port range, you can configure it via changing the `MIN_PORT` and the `MAX_PORT` environment variables in the controller deployment YAML file.
\ No newline at end of file
+By default, Thundernetes will allocate ports in the range 10000-12000 to your GameServers. These ports are allocated to the entire set of VMs in the cluster and are open on each and every VM. If you need more ports or a different port range, you can configure it by changing the `MIN_PORT` and the `MAX_PORT` environment variables in the controller deployment YAML file. However, do not modify the port range while there are game servers running on the cluster, since this will most likely corrupt the port registry, especially if the new and the old ranges are different.
\ No newline at end of file
diff --git a/pkg/operator/Makefile b/pkg/operator/Makefile
index 5b4e65ef..e637455e 100644
--- a/pkg/operator/Makefile
+++ b/pkg/operator/Makefile
@@ -62,7 +62,7 @@ vet: ## Run go vet against code.
 
 ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
 .PHONY: test
 test: manifests generate fmt vet envtest ## Run tests.
-	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" THUNDERNETES_SAMPLE_IMAGE=$(IMAGE_NAME_SAMPLE):$(NETCORE_SAMPLE_TAG) THUNDERNETES_INIT_CONTAINER_IMAGE=$(IMAGE_NAME_INIT_CONTAINER):$(IMAGE_TAG) go test ./... -coverprofile cover.out
+	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" THUNDERNETES_SAMPLE_IMAGE=$(IMAGE_NAME_SAMPLE):$(NETCORE_SAMPLE_TAG) THUNDERNETES_INIT_CONTAINER_IMAGE=$(IMAGE_NAME_INIT_CONTAINER):$(IMAGE_TAG) go test -race ./... -v -coverprofile cover.out
 
 ##@ Build
diff --git a/pkg/operator/config/manager/kustomization.yaml b/pkg/operator/config/manager/kustomization.yaml
index 518a0a7b..315a75e3 100755
--- a/pkg/operator/config/manager/kustomization.yaml
+++ b/pkg/operator/config/manager/kustomization.yaml
@@ -10,5 +10,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
 kind: Kustomization
 images:
 - name: controller
-  newName: thundernetes-operator
-  newTag: a7cbf9c
+  newName: docker.io/dgkanatsios/thundernetes-operator
+  newTag: 477d7b9
diff --git a/pkg/operator/controllers/port_registry.go b/pkg/operator/controllers/port_registry.go
index fce2a94f..6839bc70 100644
--- a/pkg/operator/controllers/port_registry.go
+++ b/pkg/operator/controllers/port_registry.go
@@ -1,33 +1,53 @@
 package controllers
 
 import (
+	"context"
 	"errors"
-	"fmt"
+	"sync"
 
 	"github.com/go-logr/logr"
 	mpsv1alpha1 "github.com/playfab/thundernetes/pkg/operator/api/v1alpha1"
+	v1 "k8s.io/api/core/v1"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
 )
 
 // PortRegistry implements a custom map for the port registry
 type PortRegistry struct {
-	HostPorts         map[int32]bool
-	Indexes           []int32
-	NextFreePortIndex int32
-	Min               int32         // Minimum Port
-	Max               int32         // Maximum Port
-	portRequests      chan struct{} // buffered channel to store port requests (request is the containerPort)
-	portResponses     chan int32    // buffered channel to store port responses (system returns the HostPort)
+	client           client.Client    // used to get the list of nodes
+	NodeCount        int              // the number of Ready and Schedulable nodes in the cluster
+	HostPortsPerNode []map[int32]bool // a map of all the ports per node. false if available, true if registered
+	Min              int32            // Minimum Port
+	Max              int32            // Maximum Port
+	lockMutex        sync.Mutex       // lock for the map
 }
 
-// NewPortRegistry initializes the IndexedDictionary that holds the port registry.
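// A minimal usage sketch of the reworked registry: GetNewPort and
// DeregisterServerPorts are the methods defined further down in this file,
// while allocateHostPorts is a hypothetical helper (not part of the patch)
// showing how a caller such as a GameServer reconciler could acquire several
// host ports and roll back on failure so the per-node maps stay consistent.
func allocateHostPorts(pr *PortRegistry, count int) ([]int32, error) {
	ports := make([]int32, 0, count)
	for i := 0; i < count; i++ {
		port, err := pr.GetNewPort()
		if err != nil {
			// hand back the ports acquired so far before bailing out
			pr.DeregisterServerPorts(ports)
			return nil, err
		}
		ports = append(ports, port)
	}
	return ports, nil
}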
-func NewPortRegistry(gameServers mpsv1alpha1.GameServerList, min, max int32, setupLog logr.Logger) (*PortRegistry, error) { +// NewPortRegistry initializes the IndexedDictionary that holds the port registry +// The way that this works is the following: +// We keep a map (HostPortsPerNode) of all the port numbers and registered status (bool) for every Node +// every time a new port is requested, we check if there is an available port on any of the nodes +// if there is, we set it to true +// We also set up a Kubernetes Watch for the Nodes +// When a new Node is added to the cluster, we add a new set of ports to the map (size = Max-Min+1) +// When a Node is removed, we have to delete the port range for this Node from the map +func NewPortRegistry(client client.Client, gameServers *mpsv1alpha1.GameServerList, min, max int32, nodeCount int, setupLog logr.Logger) (*PortRegistry, error) { + if min > max { + return nil, errors.New("min port cannot be greater than max port") + } + pr := &PortRegistry{ - HostPorts: make(map[int32]bool, max-min+1), - Indexes: make([]int32, max-min+1), - Min: min, - Max: max, - portRequests: make(chan struct{}, 100), - portResponses: make(chan int32, 100), + client: client, + Min: min, + Max: max, + lockMutex: sync.Mutex{}, + } + + // add the necessary set of ports to the map + for i := 0; i < nodeCount; i++ { + pr.onNodeAdded() } // gather ports for existing game servers @@ -39,7 +59,6 @@ func NewPortRegistry(gameServers mpsv1alpha1.GameServerList, min, max int32, set } for _, container := range gs.Spec.Template.Spec.Containers { - portsExposed := make([]int32, len(container.Ports)) portsExposedIndex := 0 @@ -51,115 +70,152 @@ func NewPortRegistry(gameServers mpsv1alpha1.GameServerList, min, max int32, set portsExposed[portsExposedIndex] = portInfo.HostPort portsExposedIndex++ } - + // and register them pr.assignRegisteredPorts(portsExposed) } - } } - - pr.assignUnregisteredPorts() - - go pr.portProducer() - return pr, nil - -} - -func (pr *PortRegistry) displayRegistry() { - fmt.Printf("-------------------------------------\n") - fmt.Printf("Ports: %v\n", pr.HostPorts) - fmt.Printf("Indexes: %v\n", pr.Indexes) - fmt.Printf("NextIndex: %d\n", pr.NextFreePortIndex) - fmt.Printf("-------------------------------------\n") } -// GetNewPort returns and registers a new port for the designated game server. Locks a mutex -func (pr *PortRegistry) GetNewPort() (int32, error) { - pr.portRequests <- struct{}{} - - port := <-pr.portResponses - - if port == -1 { - return -1, errors.New("cannot register a new port. 
No available ports") +// Reconcile runs when a Node is created/deleted or the node status changes +func (pr *PortRegistry) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := log.FromContext(ctx) + var nodeList v1.NodeList + err := pr.client.List(ctx, &nodeList) + if err != nil { + return ctrl.Result{}, err + } + // calculate how many nodes are ready and schedulable + schedulableNodesCount := 0 + for i := 0; i < len(nodeList.Items); i++ { + if !nodeList.Items[i].Spec.Unschedulable { + schedulableNodesCount++ + } + } + log.Info("Reconciling Nodes", "schedulableNodesCount", schedulableNodesCount, "currentNodesCount", pr.NodeCount) + // most probably it will just be a single node added or removed, but just in case + if pr.NodeCount > schedulableNodesCount { + for i := pr.NodeCount - 1; i >= schedulableNodesCount; i-- { + log.Info("Removing Node") + pr.onNodeRemoved() + } + } else if pr.NodeCount < schedulableNodesCount { + for i := pr.NodeCount; i < schedulableNodesCount; i++ { + log.Info("Adding Node") + pr.onNodeAdded() + } } - return port, nil + return ctrl.Result{}, nil } -func (pr *PortRegistry) portProducer() { - for range pr.portRequests { //wait till a new request comes - - initialIndex := pr.NextFreePortIndex - for { - if !pr.HostPorts[pr.Indexes[pr.NextFreePortIndex]] { - //we found a port - port := pr.Indexes[pr.NextFreePortIndex] - pr.HostPorts[port] = true - - pr.increaseNextFreePortIndex() - - //port is set - pr.portResponses <- port - break - } - - pr.increaseNextFreePortIndex() +// onNodeAdded is called when a Node is added to the cluster +func (pr *PortRegistry) onNodeAdded() { + defer pr.lockMutex.Unlock() + pr.lockMutex.Lock() + // create a new port range for the node + hostPorts := make(map[int32]bool, pr.Max-pr.Min+1) + for i := pr.Min; i <= pr.Max; i++ { + hostPorts[i] = false + } + // add it to the map + pr.HostPortsPerNode = append(pr.HostPortsPerNode, hostPorts) + pr.NodeCount = pr.NodeCount + 1 +} - if initialIndex == pr.NextFreePortIndex { - //we did a full loop - no empty ports - pr.portResponses <- -1 - break +// onNodeRemoved is called when a Node is removed from the cluster +// it removes one set of port ranges from the map +// we don't need to know which one was removed, we just need to move around the registered (set to true) ports +func (pr *PortRegistry) onNodeRemoved() { + defer pr.lockMutex.Unlock() + pr.lockMutex.Lock() + // we're removing the last Node port set + indexToRemove := len(pr.HostPortsPerNode) - 1 + for port := pr.Min; port <= pr.Max; port++ { + if pr.HostPortsPerNode[indexToRemove][port] { + // find a new place (node) for this registered port + for i := 0; i < len(pr.HostPortsPerNode)-1; i++ { + if !pr.HostPortsPerNode[i][port] { + pr.HostPortsPerNode[i][port] = true + break + } } } } + // removes the last item from the slice + pr.HostPortsPerNode = pr.HostPortsPerNode[:len(pr.HostPortsPerNode)-1] + pr.NodeCount = pr.NodeCount - 1 } -// Stop stops port registry mechanism by closing requests and responses channels -func (pr *PortRegistry) Stop() { - close(pr.portRequests) - close(pr.portResponses) +// GetNewPort returns and registers a new port for the designated game server +// One may wonder what happens if two GameServer Pods get assigned the same HostPort +// The answer is that we will not have a collision, since Kubernetes is pretty smart and will place the Pod on a different Node, to prevent collision +func (pr *PortRegistry) GetNewPort() (int32, error) { + defer pr.lockMutex.Unlock() + pr.lockMutex.Lock() + 
// loops through all the Node maps, returns the first available port + // we expect game servers to go up and down all the time, so the + // location of the first available port is non-deterministic + for nodeIndex := 0; nodeIndex < int(pr.NodeCount); nodeIndex++ { + for port := pr.Min; port <= pr.Max; port++ { + if !pr.HostPortsPerNode[nodeIndex][port] { + pr.HostPortsPerNode[nodeIndex][port] = true + return port, nil + } + } + } + return -1, errors.New("cannot register a new port. No available ports") } // DeregisterServerPorts deregisters all host ports so they can be re-used by additional game servers func (pr *PortRegistry) DeregisterServerPorts(ports []int32) { + defer pr.lockMutex.Unlock() + pr.lockMutex.Lock() for i := 0; i < len(ports); i++ { - pr.HostPorts[ports[i]] = false + for nodeIndex := 0; nodeIndex < pr.NodeCount; nodeIndex++ { + if pr.HostPortsPerNode[nodeIndex][ports[i]] { + // setting the port to false means it can be re-used + pr.HostPortsPerNode[nodeIndex][ports[i]] = false + break + } + } } } +// assignRegisteredPorts assigns ports that are already registered +// used for existing game servers and when the controller is updated/crashed and started again func (pr *PortRegistry) assignRegisteredPorts(ports []int32) { + defer pr.lockMutex.Unlock() + pr.lockMutex.Lock() for i := 0; i < len(ports); i++ { - pr.HostPorts[ports[i]] = true - pr.Indexes[i] = ports[i] - pr.increaseNextFreePortIndex() - } -} - -func (pr *PortRegistry) assignUnregisteredPorts() { - i := pr.NextFreePortIndex - for _, port := range pr.getPorts() { - if _, ok := pr.HostPorts[port]; !ok { - pr.HostPorts[port] = false - pr.Indexes[i] = port - i++ + for nodeIndex := 0; nodeIndex < pr.NodeCount; nodeIndex++ { + if pr.HostPortsPerNode[nodeIndex][ports[i]] { + // setting the port to true means it's registered + pr.HostPortsPerNode[nodeIndex][ports[i]] = true + break + } } } } -func (pr *PortRegistry) increaseNextFreePortIndex() { - pr.NextFreePortIndex++ - //reset the index if needed - if pr.NextFreePortIndex == pr.Max-pr.Min+1 { - pr.NextFreePortIndex = 0 - } - -} - -func (pr *PortRegistry) getPorts() []int32 { - ports := make([]int32, pr.Max-pr.Min+1) - for i := 0; i < len(ports); i++ { - ports[i] = int32(pr.Min) + int32(i) - } - return ports +// SetupWithManager registers the PortRegistry controller with the manager +// we care to watch for changes in the Node objects, only if they are "Ready" and "Schedulable" +func (pr *PortRegistry) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1.Node{}). + WithEventFilter(predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return IsNodeReadyAndSchedulable(e.Object.(*v1.Node)) + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldNode := e.ObjectOld.(*v1.Node) + newNode := e.ObjectNew.(*v1.Node) + return IsNodeReadyAndSchedulable(oldNode) != IsNodeReadyAndSchedulable(newNode) + }, + }). + Complete(pr) } diff --git a/pkg/operator/controllers/port_registry_test.go b/pkg/operator/controllers/port_registry_test.go index e1857fd0..c7dc2467 100644 --- a/pkg/operator/controllers/port_registry_test.go +++ b/pkg/operator/controllers/port_registry_test.go @@ -2,13 +2,21 @@ package controllers import ( "context" - "errors" "fmt" + "math/rand" + "reflect" + "sync" "time" "github.com/go-logr/logr" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" mpsv1alpha1 "github.com/playfab/thundernetes/pkg/operator/api/v1alpha1" ) @@ -22,138 +30,405 @@ const ( ) var _ = Describe("Port registry tests", func() { - log := logr.FromContextOrDiscard(context.Background()) - portRegistry, err := NewPortRegistry(mpsv1alpha1.GameServerList{}, 20000, 20010, log) - Expect(err).ToNot(HaveOccurred()) - registeredPorts := make([]int32, 7) - assignedPorts := make(map[int32]bool) - + const testMinPort = 20000 + const testMaxPort = 20009 It("should allocate hostPorts when creating game servers", func() { + portRegistry, _ := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + Expect(portRegistry.Min).To(Equal(int32(testMinPort))) + Expect(portRegistry.Max).To(Equal(int32(testMaxPort))) + assignedPorts := make(map[int32]int) // get 4 ports for i := 0; i < 4; i++ { port, err := portRegistry.GetNewPort() Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) if _, ok := assignedPorts[port]; ok { Fail(fmt.Sprintf("Port %d should not be in the assignedPorts map", port)) } - assignedPorts[port] = true + assignedPorts[port] = assignedPorts[port] + 1 } - verifyAssignedHostPorts(portRegistry, assignedPorts) - verifyUnassignedHostPorts(portRegistry, assignedPorts) - - if displayPortRegistryVariablesDuringTesting { - portRegistry.displayRegistry() + verifyExpectedHostPorts(portRegistry, assignedPorts, 4) + }) + It("should fail to allocate more ports than the maximum", func() { + portRegistry, _ := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + assignedPorts := make(map[int32]int) + for i := testMinPort; i <= testMaxPort; i++ { + port, err := portRegistry.GetNewPort() + fmt.Fprintf(GinkgoWriter, "port: %d\n", port) + Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + if _, ok := assignedPorts[port]; ok { + Fail(fmt.Sprintf("Port %d should not be in the assignedPorts map", port)) + } + assignedPorts[port] = assignedPorts[port] + 1 } + verifyExpectedHostPorts(portRegistry, assignedPorts, 10) - go portRegistry.portProducer() - // end of initialization + // this one should fail + _, err := portRegistry.GetNewPort() + Expect(err).To(HaveOccurred()) }) - It("should allocate more ports", func() { - for i := 0; i < 7; i++ { - peekPort, err := peekNextPort(portRegistry) - actualPort, _ := portRegistry.GetNewPort() - Expect(actualPort).To(BeIdenticalTo(peekPort), fmt.Sprintf("Wrong port returned, peekPort:%d, actualPort:%d", peekPort, actualPort)) - Expect(err).ToNot(HaveOccurred()) - registeredPorts[i] = actualPort + It("should increase/decrease NodeCount when we add/delete Nodes from the cluster", func() { + portRegistry, kubeClient := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + } + err := kubeClient.Create(context.Background(), node) + Expect(err).ToNot(HaveOccurred()) + // do a manual reconcile since we haven't added the controller to the manager + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 2) + }).Should(Succeed()) - assignedPorts[actualPort] = true + err = kubeClient.Delete(context.Background(), node) + 
Expect(err).ToNot(HaveOccurred()) + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 1) + }).Should(Succeed()) + }) - verifyAssignedHostPorts(portRegistry, assignedPorts) - verifyUnassignedHostPorts(portRegistry, assignedPorts) + It("should successfully allocate ports on two Nodes", func() { + portRegistry, kubeClient := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, } + err := kubeClient.Create(context.Background(), node) + Expect(err).ToNot(HaveOccurred()) + // do a manual reconcile since we haven't added the controller to the manager + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 2) + }).Should(Succeed()) - if displayPortRegistryVariablesDuringTesting { - portRegistry.displayRegistry() + assignedPorts := make(map[int32]int) + // get 15 ports + for i := 0; i < 15; i++ { + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + assignedPorts[port] = assignedPorts[port] + 1 } + + verifyExpectedHostPorts(portRegistry, assignedPorts, 15) }) - It("should return an error when we have exceeded the number of allocated ports", func() { - _, err := peekNextPort(portRegistry) - if err == nil { - Expect(err).To(HaveOccurred()) + It("should successfully deallocate ports - one Node scenario", func() { + portRegistry, _ := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + Expect(portRegistry.Min).To(Equal(int32(testMinPort))) + Expect(portRegistry.Max).To(Equal(int32(testMaxPort))) + assignedPorts := make(map[int32]int) + // get 10 ports + for i := 0; i < 10; i++ { + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + if _, ok := assignedPorts[port]; ok { + Fail(fmt.Sprintf("Port %d should not be in the assignedPorts map", port)) + } + assignedPorts[port] = assignedPorts[port] + 1 } - _, err = portRegistry.GetNewPort() - if err == nil { - Expect(err).To(HaveOccurred()) + verifyExpectedHostPorts(portRegistry, assignedPorts, 10) + // deallocate two ports + portRegistry.DeregisterServerPorts([]int32{testMinPort + 1, testMinPort + 3}) + delete(assignedPorts, testMinPort+1) + delete(assignedPorts, testMinPort+3) + verifyExpectedHostPorts(portRegistry, assignedPorts, 8) + }) + + It("should successfully deallocate ports - two Nodes to one scenario", func() { + portRegistry, kubeClient := getPortRegistryKubeClientForTesting(testMinPort, testMaxPort) + Expect(portRegistry.Min).To(Equal(int32(testMinPort))) + Expect(portRegistry.Max).To(Equal(int32(testMaxPort))) + assignedPorts := make(map[int32]int) + // get 10 ports + for i := 0; i < 10; i++ { + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + if _, ok := assignedPorts[port]; ok { + Fail(fmt.Sprintf("Port %d should not be in the assignedPorts map", port)) + } + assignedPorts[port] = assignedPorts[port] + 1 } + verifyExpectedHostPorts(portRegistry, assignedPorts, 10) + // deallocate two ports + portRegistry.DeregisterServerPorts([]int32{testMinPort + 1, testMinPort + 3}) + delete(assignedPorts, testMinPort+1) + delete(assignedPorts, testMinPort+3) + verifyExpectedHostPorts(portRegistry, assignedPorts, 8) - if displayPortRegistryVariablesDuringTesting { - 
portRegistry.displayRegistry() + // add a second Node + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, } - }) - It("should successfully deallocate ports", func() { - portRegistry.DeregisterServerPorts(registeredPorts) + err := kubeClient.Create(context.Background(), node) + Expect(err).ToNot(HaveOccurred()) + // do a manual reconcile since we haven't added the controller to the manager + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 2) + }).Should(Succeed()) - for _, val := range registeredPorts { - delete(assignedPorts, val) + // get 8 ports, we have 16 in total + for i := 0; i < 8; i++ { + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + validatePort(port, testMinPort, testMaxPort) + assignedPorts[port] = assignedPorts[port] + 1 } + verifyExpectedHostPorts(portRegistry, assignedPorts, 16) - if displayPortRegistryVariablesDuringTesting { - portRegistry.displayRegistry() + // deallocate eight ports + i := 0 + portsToDeallocate := make([]int32, 8) + for port, val := range portRegistry.HostPortsPerNode[1] { + if val { + portsToDeallocate[i] = port + i++ + assignedPorts[port] = assignedPorts[port] - 1 + if assignedPorts[port] == 0 { + delete(assignedPorts, port) + } + } + if i == 8 { + break + } } + // if i is less than 8, this means that we didn't find all the ports on the second Node + // so let's delete them from the first + if i < 8 { + for port, val := range portRegistry.HostPortsPerNode[0] { + if val { + portsToDeallocate[i] = port + i++ + assignedPorts[port] = assignedPorts[port] - 1 + if assignedPorts[port] == 0 { + delete(assignedPorts, port) + } + } + if i == 8 { + break + } + } + } + + portRegistry.DeregisterServerPorts(portsToDeallocate) + verifyExpectedHostPorts(portRegistry, assignedPorts, 8) - verifyAssignedHostPorts(portRegistry, assignedPorts) - verifyUnassignedHostPorts(portRegistry, assignedPorts) + // now delete the second node + err = kubeClient.Delete(context.Background(), node) + Expect(err).ToNot(HaveOccurred()) + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 1) + }).Should(Succeed()) + verifyExpectedHostPorts(portRegistry, assignedPorts, 8) }) - It("should return another port", func() { - peekPort, err := peekNextPort(portRegistry) - actualPort, _ := portRegistry.GetNewPort() - Expect(actualPort).To(BeNumerically("==", peekPort), fmt.Sprintf("Wrong port returned, peekPort:%d,actualPort:%d", peekPort, actualPort)) +}) + +var _ = Describe("Port registry with two thousand ports, five hundred on four nodes", func() { + rand.Seed(time.Now().UnixNano()) + min := int32(20000) + max := int32(20499) + + portRegistry, kubeClient := getPortRegistryKubeClientForTesting(min, max) + Expect(portRegistry.Min).To(Equal(min)) + Expect(portRegistry.Max).To(Equal(max)) + assignedPorts := sync.Map{} + + // add three nodes + for i := 0; i < 3; i++ { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("node%d", i+2), + }, + } + err := kubeClient.Create(context.Background(), node) Expect(err).ToNot(HaveOccurred()) + portRegistry.Reconcile(context.Background(), reconcile.Request{}) + } - assignedPorts[actualPort] = true + Eventually(func() error { + return verifyHostPortsPerNode(portRegistry, 4) + }).Should(Succeed()) - verifyAssignedHostPorts(portRegistry, assignedPorts) - verifyUnassignedHostPorts(portRegistry, 
assignedPorts) + It("should work with allocating and deallocating ports", func() { + // allocate all 2000 ports + var wg sync.WaitGroup + for i := 0; i < int(max-min+1)*4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + n := rand.Intn(200) + 50 // n will be between 50 and 250 + time.Sleep(time.Duration(n) * time.Millisecond) + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + val, ok := assignedPorts.Load(port) + if !ok { + assignedPorts.Store(port, 1) + } else { + assignedPorts.Store(port, val.(int)+1) + } + }() + } + wg.Wait() + m := syncMapToMapInt32Int(&assignedPorts) + verifyExpectedHostPorts(portRegistry, m, 2000) + + // trying to get another port should fail, since we've allocated every available port + _, err := portRegistry.GetNewPort() + Expect(err).To(HaveOccurred()) + + //deallocate 1000 ports + for i := 0; i < int(max-min+1)*2; i++ { + wg.Add(1) + go func(portToDeallocate int32) { + defer wg.Done() + n := rand.Intn(200) + 50 // n will be between 50 and 250 + time.Sleep(time.Duration(n) * time.Millisecond) + portRegistry.DeregisterServerPorts([]int32{portToDeallocate}) + val, ok := assignedPorts.Load(portToDeallocate) + if !ok { + Fail(fmt.Sprintf("port %d was not found in the map", portToDeallocate)) + } + assignedPorts.Store(portToDeallocate, val.(int)-1) + }(int32((i / 2) + int(min))) // , this outputs 20000 2 times, 20001 2 times, 20002 2 times etc. + } + wg.Wait() + + m = syncMapToMapInt32Int(&assignedPorts) + verifyExpectedHostPorts(portRegistry, m, 1000) - if displayPortRegistryVariablesDuringTesting { - portRegistry.displayRegistry() + // allocate 500 ports + for i := 0; i < int(max-min+1); i++ { + wg.Add(1) + go func() { + defer wg.Done() + n := rand.Intn(200) + 50 // n will be between 50 and 250 + time.Sleep(time.Duration(n) * time.Millisecond) + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + val, ok := assignedPorts.Load(port) + if !ok { + assignedPorts.Store(port, 1) + } else { + assignedPorts.Store(port, val.(int)+1) + } + }() } + wg.Wait() + + m = syncMapToMapInt32Int(&assignedPorts) + verifyExpectedHostPorts(portRegistry, m, 1500) + + // allocate another 500 ports + for i := 0; i < int(max-min+1); i++ { + wg.Add(1) + go func() { + defer wg.Done() + n := rand.Intn(200) + 50 // n will be between 50 and 250 + time.Sleep(time.Duration(n) * time.Millisecond) + port, err := portRegistry.GetNewPort() + Expect(err).ToNot(HaveOccurred()) + val, ok := assignedPorts.Load(port) + if !ok { + assignedPorts.Store(port, 1) + } else { + assignedPorts.Store(port, val.(int)+1) + } + }() + } + wg.Wait() + + m = syncMapToMapInt32Int(&assignedPorts) + verifyExpectedHostPorts(portRegistry, m, 2000) - portRegistry.Stop() + // trying to get another port should fail, since we've allocated every available port + _, err = portRegistry.GetNewPort() + Expect(err).To(HaveOccurred()) }) }) -func verifyAssignedHostPorts(portRegistry *PortRegistry, assignedHostPorts map[int32]bool) { - for hostPort := range assignedHostPorts { - val, ok := portRegistry.HostPorts[hostPort] - Expect(ok).Should(BeTrue(), fmt.Sprintf("There should be an entry about hostPort %d", hostPort)) - Expect(val).Should(BeTrue(), fmt.Sprintf("HostPort %d should be registered", hostPort)) - } - +// validatePort checks if the port is in the range testMinPort<=port<=testMaxPort +func validatePort(port, testMinPort, testMaxPort int32) { + Expect(port).Should(BeNumerically(">=", testMinPort)) + Expect(port).Should(BeNumerically("<=", testMaxPort)) } -func 
verifyUnassignedHostPorts(portRegistry *PortRegistry, assignedHostPorts map[int32]bool) { - Expect(len(portRegistry.HostPorts)).Should(BeNumerically("==", int(portRegistry.Max-portRegistry.Min+1))) - for hostPort, ok := range portRegistry.HostPorts { - if ok { //ignore the assigned ones - continue +// verifyExpectedHostPorts compares the hostPortsPerNode map on the PortRegistry to the expectedHostPorts map +func verifyExpectedHostPorts(portRegistry *PortRegistry, expectedHostPorts map[int32]int, expectedTotalHostPortsCount int) { + actualHostPorts := make(map[int32]int) + for nodeIndex := 0; nodeIndex < portRegistry.NodeCount; nodeIndex++ { + for port := portRegistry.Min; port <= portRegistry.Max; port++ { + if val := portRegistry.HostPortsPerNode[nodeIndex][port]; val { + actualHostPorts[port] = actualHostPorts[port] + 1 + } } - exists := assignedHostPorts[hostPort] - Expect(exists).To(BeFalse()) } -} - -func peekNextPort(pr *PortRegistry) (int32, error) { - tempPointer := pr.NextFreePortIndex - tempPointerCopy := tempPointer - for { - if !pr.HostPorts[pr.Indexes[tempPointer]] { //port not taken - return pr.Indexes[tempPointer], nil + Expect(reflect.DeepEqual(actualHostPorts, expectedHostPorts)).To(BeTrue()) + actualTotalHostPortsCount := 0 + for nodeIndex := 0; nodeIndex < portRegistry.NodeCount; nodeIndex++ { + for port := portRegistry.Min; port <= portRegistry.Max; port++ { + if val := portRegistry.HostPortsPerNode[nodeIndex][port]; val { + actualTotalHostPortsCount++ + } } + } + Expect(actualTotalHostPortsCount).To(Equal(expectedTotalHostPortsCount)) +} - tempPointer++ - if tempPointer == (pr.Max - pr.Min + 1) { - tempPointer = 0 +// verifyHostPortsPerNode verifies that the hostPortsPerNode map on the PortRegistry has the proper length +// and its item has the correct length as well +func verifyHostPortsPerNode(portRegistry *PortRegistry, expectedNodeCount int) error { + if portRegistry.NodeCount != expectedNodeCount { + return fmt.Errorf("NodeCount is not %d, it is %d", expectedNodeCount, portRegistry.NodeCount) + } + if len(portRegistry.HostPortsPerNode) != expectedNodeCount { + return fmt.Errorf("HostPortsPerNode is not %d, it is %d", expectedNodeCount, len(portRegistry.HostPortsPerNode)) + } + for i := 0; i < expectedNodeCount; i++ { + if len(portRegistry.HostPortsPerNode[i]) != int(portRegistry.Max-portRegistry.Min+1) { + return fmt.Errorf("len(HostPortsPerNode[%d]) is not %d, it is %d", i, portRegistry.Max-portRegistry.Min+1, len(portRegistry.HostPortsPerNode[i])) } + } + return nil +} - if tempPointer == tempPointerCopy { - //we've done a full circle, no ports available - return 0, errors.New("No ports available") - } +// getPortRegistryKubeClientForTesting returns a PortRegistry and a fake Kubernetes client for testing +func getPortRegistryKubeClientForTesting(min, max int32) (*PortRegistry, client.Client) { + log := logr.FromContextOrDiscard(context.Background()) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, } + clientBuilder := fake.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(node) + kubeClient := clientBuilder.Build() + Expect(kubeClient).NotTo(BeNil()) + portRegistry, err := NewPortRegistry(kubeClient, &mpsv1alpha1.GameServerList{}, min, max, 1, log) + Expect(err).ToNot(HaveOccurred()) + return portRegistry, kubeClient +} + +// syncMapToMapInt32Bool converts a sync.Map to a map[int32]int +// useful as part of our test uses the sync.Map instead of the slice +func syncMapToMapInt32Int(sm *sync.Map) map[int32]int { + m := 
make(map[int32]int) + sm.Range(func(key, value interface{}) bool { + m[key.(int32)] = value.(int) + return true + }) + return m } diff --git a/pkg/operator/controllers/suite_test.go b/pkg/operator/controllers/suite_test.go index 1fd2018a..354546a7 100644 --- a/pkg/operator/controllers/suite_test.go +++ b/pkg/operator/controllers/suite_test.go @@ -75,16 +75,25 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + // generate a port registry for the tests + portRegistry, err := NewPortRegistry(k8sClient, &mpsv1alpha1.GameServerList{}, 20000, 20100, 1, ctrl.Log) + Expect(err).ToNot(HaveOccurred()) + + err = portRegistry.SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) + err = (&GameServerBuildReconciler{ - Client: k8sManager.GetClient(), - Scheme: k8sManager.GetScheme(), - Recorder: k8sManager.GetEventRecorderFor("GameServerBuildReconciler"), + Client: k8sManager.GetClient(), + Scheme: k8sManager.GetScheme(), + PortRegistry: portRegistry, + Recorder: k8sManager.GetEventRecorderFor("GameServerBuildReconciler"), }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) err = (&GameServerReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), + PortRegistry: portRegistry, Recorder: k8sManager.GetEventRecorderFor("GameServerReconciler"), GetPublicIpForNodeProvider: func(_ context.Context, _ client.Reader, _ string) (string, error) { return "testPublicIP", nil }, }).SetupWithManager(k8sManager) diff --git a/pkg/operator/controllers/utilities.go b/pkg/operator/controllers/utilities.go index e2c5c21e..84f4f02e 100644 --- a/pkg/operator/controllers/utilities.go +++ b/pkg/operator/controllers/utilities.go @@ -365,3 +365,15 @@ func getContainerHostPortTuples(pod *corev1.Pod) string { } return strings.TrimSuffix(ports.String(), ",") } + +func IsNodeReadyAndSchedulable(node *corev1.Node) bool { + if !node.Spec.Unschedulable { + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue { + return true + } + } + } + + return false +} diff --git a/pkg/operator/main.go b/pkg/operator/main.go index 94869c39..3dff1e6b 100644 --- a/pkg/operator/main.go +++ b/pkg/operator/main.go @@ -36,19 +36,20 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" "github.com/go-logr/logr" + mpsv1alpha1 "github.com/playfab/thundernetes/pkg/operator/api/v1alpha1" "github.com/playfab/thundernetes/pkg/operator/controllers" //+kubebuilder:scaffold:imports - "github.com/playfab/thundernetes/pkg/operator/http" corev1 "k8s.io/api/core/v1" + + "github.com/playfab/thundernetes/pkg/operator/http" ) var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") - portRegistry *controllers.PortRegistry + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") ) const ( @@ -94,6 +95,7 @@ func main() { os.Exit(1) } + // initialize a live API client, used for the PortRegistry k8sClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{Scheme: scheme}) if err != nil { setupLog.Error(err, "unable to start live API client") @@ -116,10 +118,16 @@ func main() { } } - if err = initializePortRegistry(k8sClient, setupLog); err != nil { + portRegistry, err := initializePortRegistry(k8sClient, mgr.GetClient(), setupLog) + if err != nil { setupLog.Error(err, "unable to initialize portRegistry") os.Exit(1) } + // add the portRegistry to the manager so it can reconcile the Nodes + if err := portRegistry.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create 
controller", "controller", "PortRegistry") + os.Exit(1) + } if err = (&controllers.GameServerReconciler{ Client: mgr.GetClient(), @@ -140,8 +148,10 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "GameServerBuild") os.Exit(1) } + //+kubebuilder:scaffold:builder + // initialize the allocation API service err = http.NewAllocationApiServer(mgr, crt, key) if err != nil { setupLog.Error(err, "unable to create HTTP allocation API Server", "Allocation API Server", "HTTP Allocation API Server") @@ -164,26 +174,46 @@ func main() { } } -func initializePortRegistry(k8sClient client.Client, setupLog logr.Logger) error { +// initializePortRegistry performs some initialization and creates a new PortRegistry struct +// the k8sClient is a live API client and is used to get the existing gameservers and the "Ready" Nodes +// the crClient is the cached controller-runtime client, used to watch for changes to the nodes from inside the PortRegistry +func initializePortRegistry(k8sClient client.Client, crClient client.Client, setupLog logr.Logger) (*controllers.PortRegistry, error) { var gameServers mpsv1alpha1.GameServerList if err := k8sClient.List(context.Background(), &gameServers); err != nil { - return err + return nil, err } - minPort, maxPort, err := getMinMaxPortFromEnv() + var nodes corev1.NodeList + if err := k8sClient.List(context.Background(), &nodes); err != nil { + return nil, err + } + schedulableAndReadyNodeCount := 0 + for i := 0; i < len(nodes.Items); i++ { + if controllers.IsNodeReadyAndSchedulable(&nodes.Items[i]) { + schedulableAndReadyNodeCount = schedulableAndReadyNodeCount + 1 + } + } + + // get the min/max port from enviroment variables + // the code does not offer any protection in case the port range changes while game servers are running + minPort, maxPort, err := getMinMaxPortFromEnv() if err != nil { - return err + return nil, err } - portRegistry, err = controllers.NewPortRegistry(gameServers, minPort, maxPort, setupLog) + setupLog.Info("initializing port registry", "minPort", minPort, "maxPort", maxPort, "schedulableAndReadyNodeCount", schedulableAndReadyNodeCount) + + portRegistry, err := controllers.NewPortRegistry(crClient, &gameServers, minPort, maxPort, schedulableAndReadyNodeCount, setupLog) if err != nil { - return err + return nil, err } - return nil + return portRegistry, nil } +// getTlsSecret returns the TLS secret from the given namespace +// used in the allocation API service func getTlsSecret(k8sClient client.Client, namespace string) ([]byte, []byte, error) { var secret corev1.Secret err := k8sClient.Get(context.Background(), types.NamespacedName{