feat: updated sharding cache getter func
Signed-off-by: Soumya Ghosh Dastidar <gdsoumya@gmail.com>
gdsoumya committed Feb 9, 2024
1 parent c1b0177 commit c2d7650
Showing 1 changed file with 21 additions and 21 deletions.
@@ -10,7 +10,6 @@ import (
 	"github.com/redis/go-redis/v9"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
-	appsv1 "k8s.io/api/apps/v1"
 	kubeerrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
@@ -148,7 +147,8 @@ func NewCommand() *cobra.Command {
 				appController.InvalidateProjectsCache()
 			}))
 			kubectl := kubeutil.NewKubectl()
-			clusterSharding := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
+			clusterSharding, err := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
+			errors.CheckError(err)
 			appController, err = controller.NewApplicationController(
 				namespace,
 				settingsMgr,
@@ -240,37 +240,37 @@
 	return &command
 }
 
-func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) sharding.ClusterShardingCache {
-	var (
-		appControllerDeployment *appsv1.Deployment
-		err                     error
-		replicasCount           int
-	)
-	// StatefulSet mode and Deployment mode use different default values for shard number.
-	defaultShardNumberValue := 0
-	applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
+func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (sharding.ClusterShardingCache, error) {
+	var replicasCount int
+	// StatefulSet mode and Deployment mode use different default values for shard number.
+	defaultShardNumberValue := 0
 
-	// skip deployment fetching if dynamic sharding is disabled
 	if enableDynamicClusterDistribution {
-		appControllerDeployment, err = kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})
-		// if the application controller deployment was not found, the Get() call returns an empty Deployment object. So, set the variable to nil explicitly
-		if err != nil && kubeerrors.IsNotFound(err) {
-			appControllerDeployment = nil
-		}
-	}
+		applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
+		appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})
 
-	if enableDynamicClusterDistribution && appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
-		replicasCount = int(*appControllerDeployment.Spec.Replicas)
-		defaultShardNumberValue = -1
+		// if the app controller deployment is not found when dynamic cluster distribution is enabled, error out
+		if err != nil {
+			return nil, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment: %v", err)
+		}
+
+		if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
+			replicasCount = int(*appControllerDeployment.Spec.Replicas)
+			defaultShardNumberValue = -1
+		} else {
+			return nil, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment replica count")
+		}
 	} else {
 		replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
 	}
 	shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, defaultShardNumberValue, -math.MaxInt32, math.MaxInt32)
 	if replicasCount > 1 {
 		// check for shard mapping using configmap if application-controller is a deployment
 		// else use existing logic to infer shard from pod name if application-controller is a statefulset
-		if enableDynamicClusterDistribution && appControllerDeployment != nil {
+		if enableDynamicClusterDistribution {
 			var err error
 			// retry 3 times if we find a conflict while updating shard mapping configMap.
 			// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
@@ -298,5 +298,5 @@ func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.
 		log.Info("Processing all cluster shards")
 	}
 	db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
-	return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm)
+	return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil
 }
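
The behavioral shift is easiest to see outside the diff. Before this commit, a missing application-controller Deployment under dynamic cluster distribution was swallowed: the getter set the Deployment to nil, fell back to the replica count from the environment, and later took the StatefulSet-style shard-inference path. After it, the getter returns an error that the caller fails fast on via `errors.CheckError`. Below is a minimal, self-contained Go sketch of that pattern; `newCache`, `fetchReplicas`, and the `cache` type are hypothetical stand-ins for illustration, not Argo CD APIs.

```go
package main

import (
	"fmt"
	"log"
)

// cache is a hypothetical stand-in for sharding.ClusterShardingCache.
type cache struct{ replicas int }

// fetchReplicas simulates reading the app controller Deployment's replica
// count; ok == false models a missing Deployment or a nil .Spec.Replicas.
func fetchReplicas() (int, bool) { return 0, false }

// newCache mirrors the post-commit shape: it returns (cache, error) instead
// of just the cache. When dynamic distribution is enabled and the replica
// count cannot be read, it errors out rather than silently falling back to
// an environment-derived default, as the old code did.
func newCache(dynamic bool) (*cache, error) {
	replicas := 1 // stand-in for env.ParseNumFromEnv(common.EnvControllerReplicas, ...)
	if dynamic {
		n, ok := fetchReplicas()
		if !ok {
			return nil, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment replica count")
		}
		replicas = n
	}
	return &cache{replicas: replicas}, nil
}

func main() {
	// Mirrors the call-site change in NewCommand: the caller now checks the
	// error (Argo CD uses errors.CheckError, which exits the process).
	c, err := newCache(true)
	if err != nil {
		log.Fatal(err) // fail fast instead of running with a wrong shard count
	}
	fmt.Println("replicas:", c.replicas)
}
```

Failing fast matters here because, in the old code, a silently nil Deployment sent the controller down the StatefulSet code path with a replica count taken from the environment, which could disagree with the actual Deployment scale and produce an incorrect shard assignment.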
