PortRegistry V2
dgkanatsios committed Mar 14, 2022
1 parent 7e9d174 commit c27f5a0
Showing 10 changed files with 593 additions and 205 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/main.yml
@@ -36,11 +36,11 @@ jobs:
- name: build Docker images
run: make builddockerlocal
- name: GameServer API service unit tests
run: cd cmd/gameserverapi && GIN_MODE=release go test
run: cd cmd/gameserverapi && GIN_MODE=release go test -race
- name: initcontainer unit tests
run: cd cmd/initcontainer && go test
run: cd cmd/initcontainer && go test -race
- name: nodeagent unit tests
run: cd cmd/nodeagent && go test
run: cd cmd/nodeagent && go test -race
- name: operator unit tests
run: IMAGE_NAME_SAMPLE=thundernetes-netcore IMAGE_NAME_INIT_CONTAINER=thundernetes-initcontainer TAG=$(git rev-list HEAD --max-count=1 --abbrev-commit) make -C pkg/operator test
- name: install kind binaries
10 changes: 8 additions & 2 deletions cmd/nodeagent/nodeagentmanager_test.go
@@ -122,7 +122,10 @@ var _ = Describe("nodeagent tests", func() {
if !ok {
return false
}
return tempgs.(*GameServerDetails).WasActivated && tempgs.(*GameServerDetails).PreviousGameState == GameStateStandingBy
tempgs.(*GameServerDetails).Mutex.RLock()
gsd := *tempgs.(*GameServerDetails)
tempgs.(*GameServerDetails).Mutex.RUnlock()
return gsd.WasActivated && gsd.PreviousGameState == GameStateStandingBy
}).Should(BeTrue())

// heartbeat from the game is still StandingBy
@@ -206,7 +209,10 @@ var _ = Describe("nodeagent tests", func() {
if !ok {
return false
}
return tempgs.(*GameServerDetails).WasActivated && tempgs.(*GameServerDetails).PreviousGameState == GameStateStandingBy
tempgs.(*GameServerDetails).Mutex.RLock()
gsd := *tempgs.(*GameServerDetails)
tempgs.(*GameServerDetails).Mutex.RUnlock()
return gsd.WasActivated && gsd.PreviousGameState == GameStateStandingBy
}).Should(BeTrue())

// heartbeat from the game is still StandingBy
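Both test changes above follow the same pattern: take the read lock, copy the tracked fields, release the lock, and assert on the copy, so that the `go test -race` runs added to CI do not flag a race between the assertion and the code under test that mutates the struct. Below is a minimal, standalone sketch of that snapshot-under-RLock pattern; the `details` type and its fields are illustrative, not the real GameServerDetails type.

package main

import (
    "fmt"
    "sync"
    "time"
)

// details mimics a struct whose fields one goroutine mutates while
// another goroutine polls them, as in the nodeagent tests.
type details struct {
    mu           sync.RWMutex
    WasActivated bool
    State        string
}

func main() {
    d := &details{State: "StandingBy"}

    // writer: update the fields under the write lock
    go func() {
        d.mu.Lock()
        d.WasActivated = true
        d.State = "Active"
        d.mu.Unlock()
    }()

    // reader: take a consistent snapshot under the read lock,
    // then inspect the copy without holding the lock
    for i := 0; i < 100; i++ {
        d.mu.RLock()
        activated, state := d.WasActivated, d.State
        d.mu.RUnlock()
        if activated && state == "Active" {
            fmt.Println("activated")
            return
        }
        time.Sleep(10 * time.Millisecond)
    }
    fmt.Println("timed out waiting for activation")
}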
2 changes: 1 addition & 1 deletion docs/howtos/configureportrange.md
@@ -7,4 +7,4 @@ nav_order: 7

# Configure Thundernetes port range

By default, Thundernetes will allocate ports in the range 10000-12000 to your GameServers. These ports are allocated to the entire set of VMs in the cluster, meaning that the max number of GameServers is 2000. If you need more or just a different port range, you can configure it via changing the `MIN_PORT` and the `MAX_PORT` environment variables in the controller deployment YAML file.
By default, Thundernetes will allocate ports in the range 10000-12000 to your GameServers. These ports are allocated across the entire set of VMs in the cluster and are open on every VM. If you need a larger or simply a different port range, you can configure it by changing the `MIN_PORT` and `MAX_PORT` environment variables in the controller deployment YAML file. However, do not modify the port range while game servers are running on the cluster, since doing so will probably corrupt the port registry, especially if the new range differs from the old one.
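The operator validates this pair at startup (NewPortRegistry rejects a range where min is greater than max). The snippet below is only an illustrative sketch of how such environment variables could be read and validated; the helper names and the fallback defaults are assumptions, not the operator's actual code.

package main

import (
    "fmt"
    "os"
    "strconv"
)

// lookupInt32 reads an integer environment variable, falling back to a default.
func lookupInt32(name string, def int32) (int32, error) {
    v, ok := os.LookupEnv(name)
    if !ok {
        return def, nil
    }
    parsed, err := strconv.ParseInt(v, 10, 32)
    if err != nil {
        return 0, fmt.Errorf("%s must be an integer: %w", name, err)
    }
    return int32(parsed), nil
}

// getPortRange is a hypothetical helper: it reads MIN_PORT/MAX_PORT, falls back
// to the documented defaults, and rejects an inverted range, mirroring the
// min > max check in NewPortRegistry.
func getPortRange() (int32, int32, error) {
    min, err := lookupInt32("MIN_PORT", 10000)
    if err != nil {
        return 0, 0, err
    }
    max, err := lookupInt32("MAX_PORT", 12000)
    if err != nil {
        return 0, 0, err
    }
    if min > max {
        return 0, 0, fmt.Errorf("MIN_PORT %d cannot be greater than MAX_PORT %d", min, max)
    }
    return min, max, nil
}

func main() {
    min, max, err := getPortRange()
    if err != nil {
        fmt.Println("invalid port range:", err)
        return
    }
    fmt.Printf("using host port range %d-%d\n", min, max)
}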
2 changes: 1 addition & 1 deletion pkg/operator/Makefile
@@ -62,7 +62,7 @@ vet: ## Run go vet against code.
ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
.PHONY: test
test: manifests generate fmt vet envtest ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" THUNDERNETES_SAMPLE_IMAGE=$(IMAGE_NAME_SAMPLE):$(NETCORE_SAMPLE_TAG) THUNDERNETES_INIT_CONTAINER_IMAGE=$(IMAGE_NAME_INIT_CONTAINER):$(IMAGE_TAG) go test ./... -coverprofile cover.out
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" THUNDERNETES_SAMPLE_IMAGE=$(IMAGE_NAME_SAMPLE):$(NETCORE_SAMPLE_TAG) THUNDERNETES_INIT_CONTAINER_IMAGE=$(IMAGE_NAME_INIT_CONTAINER):$(IMAGE_TAG) go test -race ./... -v -coverprofile cover.out

##@ Build

4 changes: 2 additions & 2 deletions pkg/operator/config/manager/kustomization.yaml
@@ -10,5 +10,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: thundernetes-operator
newTag: a7cbf9c
newName: docker.io/dgkanatsios/thundernetes-operator
newTag: 477d7b9
250 changes: 153 additions & 97 deletions pkg/operator/controllers/port_registry.go
@@ -1,33 +1,53 @@
package controllers

import (
"context"
"errors"
"fmt"
"sync"

"github.com/go-logr/logr"
mpsv1alpha1 "github.com/playfab/thundernetes/pkg/operator/api/v1alpha1"
v1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// PortRegistry implements a custom map for the port registry
type PortRegistry struct {
HostPorts map[int32]bool
Indexes []int32
NextFreePortIndex int32
Min int32 // Minimum Port
Max int32 // Maximum Port
portRequests chan struct{} // buffered channel to store port requests (request is the containerPort)
portResponses chan int32 // buffered channel to store port responses (system returns the HostPort)
client client.Client // used to get the list of nodes
NodeCount int // the number of Ready and Schedulable nodes in the cluster
HostPortsPerNode []map[int32]bool // a map of all the ports per node. false if available, true if registered
Min int32 // Minimum Port
Max int32 // Maximum Port
lockMutex sync.Mutex // lock for the map
}

// NewPortRegistry initializes the IndexedDictionary that holds the port registry.
func NewPortRegistry(gameServers mpsv1alpha1.GameServerList, min, max int32, setupLog logr.Logger) (*PortRegistry, error) {
// NewPortRegistry initializes the data structures that hold the port registry
// The way this works is the following:
// we keep one map per Node (HostPortsPerNode) of all the port numbers and their registered status (bool)
// every time a new port is requested, we check if there is an available port on any of the Nodes
// if there is, we set it to true
// We also set up a Kubernetes Watch for the Nodes
// When a new Node is added to the cluster, we add a new set of ports to HostPortsPerNode (size = Max-Min+1)
// When a Node is removed, we remove one port set from HostPortsPerNode and carry over its registered ports
// (A simplified, standalone sketch of this bookkeeping appears after this file's diff.)
func NewPortRegistry(client client.Client, gameServers *mpsv1alpha1.GameServerList, min, max int32, nodeCount int, setupLog logr.Logger) (*PortRegistry, error) {
if min > max {
return nil, errors.New("min port cannot be greater than max port")
}

pr := &PortRegistry{
HostPorts: make(map[int32]bool, max-min+1),
Indexes: make([]int32, max-min+1),
Min: min,
Max: max,
portRequests: make(chan struct{}, 100),
portResponses: make(chan int32, 100),
client: client,
Min: min,
Max: max,
lockMutex: sync.Mutex{},
}

// add the necessary set of ports to the map
for i := 0; i < nodeCount; i++ {
pr.onNodeAdded()
}

// gather ports for existing game servers
@@ -39,7 +59,6 @@ func NewPortRegistry(gameServers mpsv1alpha1.GameServerList, min, max int32, set
}

for _, container := range gs.Spec.Template.Spec.Containers {

portsExposed := make([]int32, len(container.Ports))
portsExposedIndex := 0

Expand All @@ -51,115 +70,152 @@ func NewPortRegistry(gameServers mpsv1alpha1.GameServerList, min, max int32, set
portsExposed[portsExposedIndex] = portInfo.HostPort
portsExposedIndex++
}

// and register them
pr.assignRegisteredPorts(portsExposed)
}

}
}

pr.assignUnregisteredPorts()

go pr.portProducer()

return pr, nil

}

func (pr *PortRegistry) displayRegistry() {
fmt.Printf("-------------------------------------\n")
fmt.Printf("Ports: %v\n", pr.HostPorts)
fmt.Printf("Indexes: %v\n", pr.Indexes)
fmt.Printf("NextIndex: %d\n", pr.NextFreePortIndex)
fmt.Printf("-------------------------------------\n")
}

// GetNewPort returns and registers a new port for the designated game server. Locks a mutex
func (pr *PortRegistry) GetNewPort() (int32, error) {
pr.portRequests <- struct{}{}

port := <-pr.portResponses

if port == -1 {
return -1, errors.New("cannot register a new port. No available ports")
// Reconcile runs when a Node is created/deleted or the node status changes
func (pr *PortRegistry) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
var nodeList v1.NodeList
err := pr.client.List(ctx, &nodeList)
if err != nil {
return ctrl.Result{}, err
}
// calculate how many nodes are ready and schedulable
schedulableNodesCount := 0
for i := 0; i < len(nodeList.Items); i++ {
if !nodeList.Items[i].Spec.Unschedulable {
schedulableNodesCount++
}
}
log.Info("Reconciling Nodes", "schedulableNodesCount", schedulableNodesCount, "currentNodesCount", pr.NodeCount)
// most probably only a single Node was added or removed, but we handle the general case just in case
if pr.NodeCount > schedulableNodesCount {
for i := pr.NodeCount - 1; i >= schedulableNodesCount; i-- {
log.Info("Removing Node")
pr.onNodeRemoved()
}
} else if pr.NodeCount < schedulableNodesCount {
for i := pr.NodeCount; i < schedulableNodesCount; i++ {
log.Info("Adding Node")
pr.onNodeAdded()
}
}

return port, nil
return ctrl.Result{}, nil
}

func (pr *PortRegistry) portProducer() {
for range pr.portRequests { //wait till a new request comes

initialIndex := pr.NextFreePortIndex
for {
if !pr.HostPorts[pr.Indexes[pr.NextFreePortIndex]] {
//we found a port
port := pr.Indexes[pr.NextFreePortIndex]
pr.HostPorts[port] = true

pr.increaseNextFreePortIndex()

//port is set
pr.portResponses <- port
break
}

pr.increaseNextFreePortIndex()
// onNodeAdded is called when a Node is added to the cluster
func (pr *PortRegistry) onNodeAdded() {
defer pr.lockMutex.Unlock()
pr.lockMutex.Lock()
// create a new port range for the node
hostPorts := make(map[int32]bool, pr.Max-pr.Min+1)
for i := pr.Min; i <= pr.Max; i++ {
hostPorts[i] = false
}
// add it to the map
pr.HostPortsPerNode = append(pr.HostPortsPerNode, hostPorts)
pr.NodeCount = pr.NodeCount + 1
}

if initialIndex == pr.NextFreePortIndex {
//we did a full loop - no empty ports
pr.portResponses <- -1
break
// onNodeRemoved is called when a Node is removed from the cluster
// it removes one port set from the HostPortsPerNode slice
// we don't need to know which Node was removed; we just carry its registered (set to true) ports over to the remaining Nodes
func (pr *PortRegistry) onNodeRemoved() {
defer pr.lockMutex.Unlock()
pr.lockMutex.Lock()
// we're removing the last Node port set
indexToRemove := len(pr.HostPortsPerNode) - 1
for port := pr.Min; port <= pr.Max; port++ {
if pr.HostPortsPerNode[indexToRemove][port] {
// find a new place (node) for this registered port
for i := 0; i < len(pr.HostPortsPerNode)-1; i++ {
if !pr.HostPortsPerNode[i][port] {
pr.HostPortsPerNode[i][port] = true
break
}
}
}
}
// removes the last item from the slice
pr.HostPortsPerNode = pr.HostPortsPerNode[:len(pr.HostPortsPerNode)-1]
pr.NodeCount = pr.NodeCount - 1
}

// Stop stops port registry mechanism by closing requests and responses channels
func (pr *PortRegistry) Stop() {
close(pr.portRequests)
close(pr.portResponses)
// GetNewPort returns and registers a new port for the designated game server
// One may wonder what happens if two GameServer Pods get assigned the same HostPort
// There will be no collision, since the Kubernetes scheduler will place the Pods on different Nodes to satisfy the hostPort constraint
func (pr *PortRegistry) GetNewPort() (int32, error) {
defer pr.lockMutex.Unlock()
pr.lockMutex.Lock()
// loops through all the Node maps, returns the first available port
// we expect game servers to go up and down all the time, so the
// location of the first available port is non-deterministic
for nodeIndex := 0; nodeIndex < int(pr.NodeCount); nodeIndex++ {
for port := pr.Min; port <= pr.Max; port++ {
if !pr.HostPortsPerNode[nodeIndex][port] {
pr.HostPortsPerNode[nodeIndex][port] = true
return port, nil
}
}
}
return -1, errors.New("cannot register a new port. No available ports")
}

// DeregisterServerPorts deregisters the given host ports so they can be re-used by other game servers
func (pr *PortRegistry) DeregisterServerPorts(ports []int32) {
defer pr.lockMutex.Unlock()
pr.lockMutex.Lock()
for i := 0; i < len(ports); i++ {
pr.HostPorts[ports[i]] = false
for nodeIndex := 0; nodeIndex < pr.NodeCount; nodeIndex++ {
if pr.HostPortsPerNode[nodeIndex][ports[i]] {
// setting the port to false means it can be re-used
pr.HostPortsPerNode[nodeIndex][ports[i]] = false
break
}
}
}
}

// assignRegisteredPorts assigns ports that are already registered
// used for existing game servers and when the controller is updated/crashed and started again
func (pr *PortRegistry) assignRegisteredPorts(ports []int32) {
defer pr.lockMutex.Unlock()
pr.lockMutex.Lock()
for i := 0; i < len(ports); i++ {
pr.HostPorts[ports[i]] = true
pr.Indexes[i] = ports[i]
pr.increaseNextFreePortIndex()
}
}

func (pr *PortRegistry) assignUnregisteredPorts() {
i := pr.NextFreePortIndex
for _, port := range pr.getPorts() {
if _, ok := pr.HostPorts[port]; !ok {
pr.HostPorts[port] = false
pr.Indexes[i] = port
i++
for nodeIndex := 0; nodeIndex < pr.NodeCount; nodeIndex++ {
if !pr.HostPortsPerNode[nodeIndex][ports[i]] {
// setting the port to true means it's registered
pr.HostPortsPerNode[nodeIndex][ports[i]] = true
break
}
}
}
}

func (pr *PortRegistry) increaseNextFreePortIndex() {
pr.NextFreePortIndex++
//reset the index if needed
if pr.NextFreePortIndex == pr.Max-pr.Min+1 {
pr.NextFreePortIndex = 0
}

}

func (pr *PortRegistry) getPorts() []int32 {
ports := make([]int32, pr.Max-pr.Min+1)
for i := 0; i < len(ports); i++ {
ports[i] = int32(pr.Min) + int32(i)
}
return ports
// SetupWithManager registers the PortRegistry controller with the manager
// we watch Node objects, filtering events so that we only reconcile on creations of Ready and Schedulable Nodes, on deletions, and on changes to a Node's Ready/Schedulable status
func (pr *PortRegistry) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1.Node{}).
WithEventFilter(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return IsNodeReadyAndSchedulable(e.Object.(*v1.Node))
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldNode := e.ObjectOld.(*v1.Node)
newNode := e.ObjectNew.(*v1.Node)
return IsNodeReadyAndSchedulable(oldNode) != IsNodeReadyAndSchedulable(newNode)
},
}).
Complete(pr)
}
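To make the new bookkeeping easier to follow in isolation, here is a minimal, self-contained sketch of the same idea: one map[int32]bool per Node, first-free allocation across Nodes, and migration of registered ports when a Node's map is dropped. The miniRegistry type and its methods are illustrative only; they omit the mutex, the Kubernetes client, and the controller wiring of the real PortRegistry.

package main

import (
    "errors"
    "fmt"
)

// miniRegistry is an illustrative, single-goroutine version of the
// per-Node port maps kept in PortRegistry.HostPortsPerNode.
type miniRegistry struct {
    min, max int32
    nodes    []map[int32]bool // per Node: false = free, true = registered
}

func newMiniRegistry(min, max int32, nodeCount int) *miniRegistry {
    r := &miniRegistry{min: min, max: max}
    for i := 0; i < nodeCount; i++ {
        r.addNode()
    }
    return r
}

// addNode mirrors onNodeAdded: one fresh port map per schedulable Node.
func (r *miniRegistry) addNode() {
    ports := make(map[int32]bool, r.max-r.min+1)
    for p := r.min; p <= r.max; p++ {
        ports[p] = false
    }
    r.nodes = append(r.nodes, ports)
}

// removeNode mirrors onNodeRemoved: drop the last map and carry its
// registered ports over to the remaining Nodes where possible.
func (r *miniRegistry) removeNode() {
    last := len(r.nodes) - 1
    for p := r.min; p <= r.max; p++ {
        if r.nodes[last][p] {
            for i := 0; i < last; i++ {
                if !r.nodes[i][p] {
                    r.nodes[i][p] = true
                    break
                }
            }
        }
    }
    r.nodes = r.nodes[:last]
}

// getNewPort mirrors GetNewPort: return the first free port on any Node.
func (r *miniRegistry) getNewPort() (int32, error) {
    for _, node := range r.nodes {
        for p := r.min; p <= r.max; p++ {
            if !node[p] {
                node[p] = true
                return p, nil
            }
        }
    }
    return -1, errors.New("no available ports")
}

func main() {
    r := newMiniRegistry(10000, 10001, 2) // two Nodes, two ports each
    fmt.Println(r.getNewPort())           // 10000 <nil> (first Node)
    fmt.Println(r.getNewPort())           // 10001 <nil> (first Node)
    fmt.Println(r.getNewPort())           // 10000 <nil> (second Node)
    r.removeNode()                        // second Node dropped; its port 10000 cannot move, the first Node is full
    fmt.Println(r.getNewPort())           // -1 no available ports
}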