
Commit

fix: stop initializing deployment informer if dynamic sharding is disabled (argoproj#17097)

* fix: stop initializing deployment informer if dynamic sharding is disabled

Signed-off-by: Soumya Ghosh Dastidar <gdsoumya@gmail.com>

* feat: updated sharding cache getter func

Signed-off-by: Soumya Ghosh Dastidar <gdsoumya@gmail.com>

---------

Signed-off-by: Soumya Ghosh Dastidar <gdsoumya@gmail.com>
Signed-off-by: Kevin Lyda <kevin@lyda.ie>
gdsoumya authored and lyda committed Mar 28, 2024
1 parent 71cdaa7 commit 34e69c1
Showing 3 changed files with 82 additions and 54 deletions.
@@ -147,7 +147,8 @@ func NewCommand() *cobra.Command {
appController.InvalidateProjectsCache()
}))
kubectl := kubeutil.NewKubectl()
clusterSharding := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
clusterSharding, err := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
errors.CheckError(err)
appController, err = controller.NewApplicationController(
namespace,
settingsMgr,
@@ -170,6 +171,7 @@ func NewCommand() *cobra.Command {
applicationNamespaces,
&workqueueRateLimit,
serverSideDiff,
enableDynamicClusterDistribution,
)
errors.CheckError(err)
cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer())
@@ -238,29 +240,37 @@ func NewCommand() *cobra.Command {
return &command
}

func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) sharding.ClusterShardingCache {
var replicasCount int
func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (sharding.ClusterShardingCache, error) {
var (
replicasCount int
)
// StatefulSet mode and Deployment mode use different default values for shard number.
defaultShardNumberValue := 0
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})

// if the application controller deployment was not found, the Get() call returns an empty Deployment object. So, set the variable to nil explicitly
if err != nil && kubeerrors.IsNotFound(err) {
appControllerDeployment = nil
}
if enableDynamicClusterDistribution {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})

// if the app controller deployment is not found while dynamic cluster distribution is enabled, error out
if err != nil {
return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: %v", err)
}

if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicasCount = int(*appControllerDeployment.Spec.Replicas)
defaultShardNumberValue = -1
} else {
return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment replica count")
}

if enableDynamicClusterDistribution && appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicasCount = int(*appControllerDeployment.Spec.Replicas)
defaultShardNumberValue = -1
} else {
replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
}
shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, defaultShardNumberValue, -math.MaxInt32, math.MaxInt32)
if replicasCount > 1 {
// check for shard mapping using configmap if application-controller is a deployment
// else use existing logic to infer shard from pod name if application-controller is a statefulset
if enableDynamicClusterDistribution && appControllerDeployment != nil {
if enableDynamicClusterDistribution {
var err error
// retry 3 times if we find a conflict while updating shard mapping configMap.
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
@@ -288,5 +298,5 @@ func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.
log.Info("Processing all cluster shards")
}
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm)
return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil
}
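
The refactor above makes getClusterSharding surface lookup failures instead of swallowing them. A minimal sketch of the same control flow, with assumed names rather than the Argo CD code: when dynamic cluster distribution is enabled, the replica count must come from the live controller Deployment and any failure is returned to the caller; otherwise the count falls back to an environment variable, in the spirit of the real code's env.ParseNumFromEnv(common.EnvControllerReplicas, ...).

package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
)

// getReplicas is a hypothetical stand-in for the Deployment lookup inside
// getClusterSharding; lookup simulates kubeClient.AppsV1().Deployments().Get.
func getReplicas(dynamicEnabled bool, lookup func() (int32, error)) (int, error) {
	if dynamicEnabled {
		n, err := lookup()
		if err != nil {
			// With dynamic distribution enabled, a failed lookup is a hard
			// error rather than a silent fallback, matching the new behavior.
			return 0, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment: %w", err)
		}
		return int(n), nil
	}
	// Static sharding: the replica count comes from the environment instead.
	if n, err := strconv.Atoi(os.Getenv("ARGOCD_CONTROLLER_REPLICAS")); err == nil {
		return n, nil
	}
	return 0, nil // unset or unparsable env var falls back to zero replicas
}

func main() {
	// Flag disabled: lookup is never invoked, so no Deployment access is needed.
	replicas, _ := getReplicas(false, nil)
	fmt.Println("replicas:", replicas)

	// Flag enabled with a failing lookup surfaces the error to the caller.
	_, err := getReplicas(true, func() (int32, error) {
		return 0, errors.New("deployment not found")
	})
	fmt.Println("err:", err)
}
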
97 changes: 57 additions & 40 deletions controller/appcontroller.go
@@ -113,7 +113,6 @@ type ApplicationController struct {
appInformer cache.SharedIndexInformer
appLister applisters.ApplicationLister
projInformer cache.SharedIndexInformer
deploymentInformer informerv1.DeploymentInformer
appStateManager AppStateManager
stateCache statecache.LiveStateCache
statusRefreshTimeout time.Duration
@@ -130,6 +129,10 @@ type ApplicationController struct {
clusterSharding sharding.ClusterShardingCache
projByNameCache sync.Map
applicationNamespaces []string

// dynamicClusterDistributionEnabled if disabled, deploymentInformer is never initialized
dynamicClusterDistributionEnabled bool
deploymentInformer informerv1.DeploymentInformer
}

// NewApplicationController creates new instance of ApplicationController.
@@ -155,6 +158,7 @@ func NewApplicationController(
applicationNamespaces []string,
rateLimiterConfig *ratelimiter.AppControllerRateLimiterConfig,
serverSideDiff bool,
dynamicClusterDistributionEnabled bool,
) (*ApplicationController, error) {
log.Infof("appResyncPeriod=%v, appHardResyncPeriod=%v, appResyncJitter=%v", appResyncPeriod, appHardResyncPeriod, appResyncJitter)
db := db.NewDB(namespace, settingsMgr, kubeClientset)
@@ -163,28 +167,29 @@ func NewApplicationController(
log.Info("Using default workqueue rate limiter config")
}
ctrl := ApplicationController{
cache: argoCache,
namespace: namespace,
kubeClientset: kubeClientset,
kubectl: kubectl,
applicationClientset: applicationClientset,
repoClientset: repoClientset,
appRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"),
appOperationQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"),
projectRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"),
appComparisonTypeRefreshQueue: workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)),
db: db,
statusRefreshTimeout: appResyncPeriod,
statusHardRefreshTimeout: appHardResyncPeriod,
statusRefreshJitter: appResyncJitter,
refreshRequestedApps: make(map[string]CompareWith),
refreshRequestedAppsMutex: &sync.Mutex{},
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
settingsMgr: settingsMgr,
selfHealTimeout: selfHealTimeout,
clusterSharding: clusterSharding,
projByNameCache: sync.Map{},
applicationNamespaces: applicationNamespaces,
cache: argoCache,
namespace: namespace,
kubeClientset: kubeClientset,
kubectl: kubectl,
applicationClientset: applicationClientset,
repoClientset: repoClientset,
appRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"),
appOperationQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"),
projectRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"),
appComparisonTypeRefreshQueue: workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)),
db: db,
statusRefreshTimeout: appResyncPeriod,
statusHardRefreshTimeout: appHardResyncPeriod,
statusRefreshJitter: appResyncJitter,
refreshRequestedApps: make(map[string]CompareWith),
refreshRequestedAppsMutex: &sync.Mutex{},
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
settingsMgr: settingsMgr,
selfHealTimeout: selfHealTimeout,
clusterSharding: clusterSharding,
projByNameCache: sync.Map{},
applicationNamespaces: applicationNamespaces,
dynamicClusterDistributionEnabled: dynamicClusterDistributionEnabled,
}
if kubectlParallelismLimit > 0 {
ctrl.kubectlSemaphore = semaphore.NewWeighted(kubectlParallelismLimit)
@@ -227,25 +232,33 @@ func NewApplicationController(
}

factory := informers.NewSharedInformerFactoryWithOptions(ctrl.kubeClientset, defaultDeploymentInformerResyncDuration, informers.WithNamespace(settingsMgr.GetNamespace()))
deploymentInformer := factory.Apps().V1().Deployments()

var deploymentInformer informerv1.DeploymentInformer

// only initialize deployment informer if dynamic distribution is enabled
if dynamicClusterDistributionEnabled {
deploymentInformer = factory.Apps().V1().Deployments()
}

readinessHealthCheck := func(r *http.Request) error {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
if err != nil {
if kubeerrors.IsNotFound(err) {
appControllerDeployment = nil
} else {
return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
}
}
if appControllerDeployment != nil {
if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 {
return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas)
if dynamicClusterDistributionEnabled {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
if err != nil {
if kubeerrors.IsNotFound(err) {
appControllerDeployment = nil
} else {
return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
}
}
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err)
if appControllerDeployment != nil {
if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 {
return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas)
}
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err)
}
}
}
return nil
@@ -773,7 +786,11 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int

go ctrl.appInformer.Run(ctx.Done())
go ctrl.projInformer.Run(ctx.Done())
go ctrl.deploymentInformer.Informer().Run(ctx.Done())

if ctrl.dynamicClusterDistributionEnabled {
// only start deployment informer if dynamic distribution is enabled
go ctrl.deploymentInformer.Informer().Run(ctx.Done())
}

clusters, err := ctrl.db.ListClusters(ctx)
if err != nil {
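
The file-level pattern above — leave the informer field nil unless the feature flag is set, and guard every use of it, including the goroutine started in Run — reduces to the following sketch (illustrative types, not Argo CD's informerv1.DeploymentInformer):

package main

import (
	"context"
	"fmt"
	"time"
)

// fakeInformer stands in for the real deployment informer.
type fakeInformer struct{}

func (f *fakeInformer) Run(ctx context.Context) { fmt.Println("deployment informer running") }

type controller struct {
	dynamicClusterDistributionEnabled bool
	deploymentInformer                *fakeInformer // stays nil when the flag is off
}

func newController(enabled bool) *controller {
	c := &controller{dynamicClusterDistributionEnabled: enabled}
	if enabled {
		// Construct the informer only when dynamic cluster distribution is in
		// use, so a disabled controller never needs to watch Deployments.
		c.deploymentInformer = &fakeInformer{}
	}
	return c
}

func (c *controller) Run(ctx context.Context) {
	if c.dynamicClusterDistributionEnabled {
		// Guarding the goroutine avoids a nil-pointer panic on the
		// uninitialized informer when the flag is off.
		go c.deploymentInformer.Run(ctx)
	}
	fmt.Println("controller running")
}

func main() {
	ctx := context.Background()
	newController(false).Run(ctx)     // no informer started
	newController(true).Run(ctx)      // informer goroutine started
	time.Sleep(10 * time.Millisecond) // let the informer goroutine print
}
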
1 change: 1 addition & 0 deletions controller/appcontroller_test.go
@@ -157,6 +157,7 @@ func newFakeController(data *fakeData, repoErr error) *ApplicationController {
nil,

false,
false,
)
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(1)
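
The lone test change threads the new flag through the constructor: the appended false maps to the dynamicClusterDistributionEnabled parameter added to NewApplicationController, keeping newFakeController on the static-sharding path so existing tests run without a deployment informer.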
