From 7f6a08a95581c6906d024364d8ef0f29ed08adbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Sat, 10 Feb 2024 16:47:46 +0100 Subject: [PATCH 01/14] fix: infer correct shard in statefulset setup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- .../commands/argocd_application_controller.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cmd/argocd-application-controller/commands/argocd_application_controller.go b/cmd/argocd-application-controller/commands/argocd_application_controller.go index 0ff9fa33c8254..74209bfed0de7 100644 --- a/cmd/argocd-application-controller/commands/argocd_application_controller.go +++ b/cmd/argocd-application-controller/commands/argocd_application_controller.go @@ -240,8 +240,6 @@ func NewCommand() *cobra.Command { func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) sharding.ClusterShardingCache { var replicasCount int - // StatefulSet mode and Deployment mode uses different default values for shard number. - defaultShardNumberValue := 0 applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{}) @@ -252,11 +250,10 @@ func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings. if enableDynamicClusterDistribution && appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil { replicasCount = int(*appControllerDeployment.Spec.Replicas) - defaultShardNumberValue = -1 } else { replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) } - shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, defaultShardNumberValue, -math.MaxInt32, math.MaxInt32) + shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32) if replicasCount > 1 { // check for shard mapping using configmap if application-controller is a deployment // else use existing logic to infer shard from pod name if application-controller is a statefulset From 14b7ec1710c630974036f805db284a8dbbbe9cc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Sat, 10 Feb 2024 17:30:53 +0100 Subject: [PATCH 02/14] fix the case if only a single replica MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- .../commands/argocd_application_controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/argocd-application-controller/commands/argocd_application_controller.go b/cmd/argocd-application-controller/commands/argocd_application_controller.go index 74209bfed0de7..58efccaff19de 100644 --- a/cmd/argocd-application-controller/commands/argocd_application_controller.go +++ b/cmd/argocd-application-controller/commands/argocd_application_controller.go @@ -283,6 +283,7 @@ func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings. 
} } else { log.Info("Processing all cluster shards") + shardNumber = 0 } db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient) return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm) From 80951b9a4a0aaaa602ac82b083da52127df55697 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Sun, 11 Feb 2024 11:43:18 +0100 Subject: [PATCH 03/14] fix: resolving pointer on shard compare MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/sharding/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index d16574accdf8a..628b238b96d24 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -149,7 +149,7 @@ func hasShardingUpdates(old, new *v1alpha1.Cluster) bool { if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) { return false } - return old.Shard != new.Shard + return int64(*old.Shard) != int64(*new.Shard) } func (d *ClusterSharding) GetClusterAccessor() clusterAccessor { From a557f8d143c71e6c549efa556de18cec6ff9ba74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Sun, 11 Feb 2024 11:43:56 +0100 Subject: [PATCH 04/14] fix: add readlock for cluster accessor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/sharding/cache.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index 628b238b96d24..a3a8131ba4c72 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -154,6 +154,8 @@ func hasShardingUpdates(old, new *v1alpha1.Cluster) bool { func (d *ClusterSharding) GetClusterAccessor() clusterAccessor { return func() []*v1alpha1.Cluster { + d.lock.RLock() + defer d.lock.RUnlock() clusters := make([]*v1alpha1.Cluster, 0, len(d.Clusters)) for _, c := range d.Clusters { clusters = append(clusters, c) From 9f505896b809d0011b6b910f0aae136e51ddb2a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Sun, 11 Feb 2024 12:33:17 +0100 Subject: [PATCH 05/14] fix: use defer to protect access of 'shard' MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/sharding/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index a3a8131ba4c72..29a0a69fda6b5 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -111,8 +111,8 @@ func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) { func (sharding *ClusterSharding) GetDistribution() map[string]int { sharding.lock.RLock() + defer sharding.lock.RUnlock() shards := sharding.Shards - sharding.lock.RUnlock() distribution := make(map[string]int, len(shards)) for k, v := range shards { From 1bb3db9c6742f08ad21a67d331a4aa3301e53d19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Sun, 11 Feb 2024 17:17:31 +0100 Subject: [PATCH 06/14] fix: revert locking in getclusteraccessor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/sharding/cache.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index 29a0a69fda6b5..7e2c37a2c37ee 100644 --- 
a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -154,8 +154,7 @@ func hasShardingUpdates(old, new *v1alpha1.Cluster) bool { func (d *ClusterSharding) GetClusterAccessor() clusterAccessor { return func() []*v1alpha1.Cluster { - d.lock.RLock() - defer d.lock.RUnlock() + // no need to lock, as this is only called from the updateDistribution function clusters := make([]*v1alpha1.Cluster, 0, len(d.Clusters)) for _, c := range d.Clusters { clusters = append(clusters, c) From a779b1c7e3876d2ea9ad0ef2c5b96adddc2de85d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Mon, 12 Feb 2024 20:00:50 +0100 Subject: [PATCH 07/14] fix: handle nil shard case MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/sharding/cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index 69f7e4c851775..f2d92116a9bf5 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -111,8 +111,8 @@ func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) { func (sharding *ClusterSharding) GetDistribution() map[string]int { sharding.lock.RLock() + defer sharding.lock.RUnlock() shards := sharding.Shards - sharding.lock.RUnlock() distribution := make(map[string]int, len(shards)) for k, v := range shards { @@ -149,7 +149,7 @@ func hasShardingUpdates(old, new *v1alpha1.Cluster) bool { if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) { return false } - return int64(*old.Shard) != int64(*new.Shard) + return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard) } func (d *ClusterSharding) GetClusterAccessor() clusterAccessor { From 98e93b664afa0568777658ea3b4a4157f7656afb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Mon, 12 Feb 2024 21:39:30 +0100 Subject: [PATCH 08/14] fix: handle any nil shard value as false MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/sharding/cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index f2d92116a9bf5..94355cb670060 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -146,10 +146,10 @@ func (sharding *ClusterSharding) updateDistribution() { // nil checking is done for the corner case of the in-cluster cluster which may // have a nil shard assigned func hasShardingUpdates(old, new *v1alpha1.Cluster) bool { - if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) { + if old == nil || new == nil || old.Shard == nil || new.Shard == nil { return false } - return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard) + return int64(*old.Shard) != int64(*new.Shard) } func (d *ClusterSharding) GetClusterAccessor() clusterAccessor { From d1f185d258a154aeb800f6a61d7109dd7893e05b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Mon, 12 Feb 2024 22:20:54 +0100 Subject: [PATCH 09/14] fix: handle nil case and fix another missing pointer dereference MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/sharding/cache.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index 94355cb670060..79d5b456eb159 100644 --- 
a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -59,7 +59,7 @@ func (s *ClusterSharding) IsManagedCluster(c *v1alpha1.Cluster) bool { log.Warnf("The cluster %s has no assigned shard.", c.Server) } log.Debugf("Checking if cluster %s with clusterShard %d should be processed by shard %d", c.Server, clusterShard, s.Shard) - return clusterShard == s.Shard + return s.Shard != nil && clusterShard == int64(*s.Shard) } func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) { @@ -146,10 +146,10 @@ func (sharding *ClusterSharding) updateDistribution() { // nil checking is done for the corner case of the in-cluster cluster which may // have a nil shard assigned func hasShardingUpdates(old, new *v1alpha1.Cluster) bool { - if old == nil || new == nil || old.Shard == nil || new.Shard == nil { + if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) { return false } - return int64(*old.Shard) != int64(*new.Shard) + return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard) } func (d *ClusterSharding) GetClusterAccessor() clusterAccessor { From 08adee2a1be225307776c8e062679716b2fd2b9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Mon, 12 Feb 2024 22:25:47 +0100 Subject: [PATCH 10/14] revert MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/sharding/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index 79d5b456eb159..f2d92116a9bf5 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -59,7 +59,7 @@ func (s *ClusterSharding) IsManagedCluster(c *v1alpha1.Cluster) bool { log.Warnf("The cluster %s has no assigned shard.", c.Server) } log.Debugf("Checking if cluster %s with clusterShard %d should be processed by shard %d", c.Server, clusterShard, s.Shard) - return s.Shard != nil && clusterShard == int64(*s.Shard) + return clusterShard == s.Shard } func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) { From 5b52b6d7f4a0a2dfb5cb475bef88f265c3719854 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Tue, 13 Feb 2024 09:45:08 +0100 Subject: [PATCH 11/14] fix: added tests and fixed some behaviour bugs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/sharding/cache.go | 38 ++-- controller/sharding/cache_test.go | 341 ++++++++++++++++++++++++++++++ 2 files changed, 367 insertions(+), 12 deletions(-) create mode 100644 controller/sharding/cache_test.go diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index f2d92116a9bf5..dd6a41cfd179e 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -26,7 +26,7 @@ type ClusterSharding struct { getClusterShard DistributionFunction } -func NewClusterSharding(db db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache { +func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache { log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm) clusterSharding := &ClusterSharding{ Shard: shard, @@ -67,7 +67,8 @@ func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) { defer sharding.lock.Unlock() newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items)) for _, c := range clusters.Items { - 
newClusters[c.Server] = &c + cluster := c + newClusters[c.Server] = &cluster } sharding.Clusters = newClusters sharding.updateDistribution() @@ -122,9 +123,7 @@ func (sharding *ClusterSharding) GetDistribution() map[string]int { } func (sharding *ClusterSharding) updateDistribution() { - log.Info("Updating cluster shards") - - for _, c := range sharding.Clusters { + for k, c := range sharding.Clusters { shard := 0 if c.Shard != nil { requestedShard := int(*c.Shard) @@ -136,17 +135,32 @@ func (sharding *ClusterSharding) updateDistribution() { } else { shard = sharding.getClusterShard(c) } - var shard64 int64 = int64(shard) - c.Shard = &shard64 - sharding.Shards[c.Server] = shard + + existingShard, ok := sharding.Shards[k] + if ok && existingShard != shard { + log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard) + } else if !ok { + log.Infof("Cluster %s has been assigned to shard %d", k, shard) + } else { + log.Debugf("Cluster %s has not changed shard", k) + } + sharding.Shards[k] = shard } } -// hasShardingUpdates returns true if the sharding distribution has been updated. -// nil checking is done for the corner case of the in-cluster cluster which may -// have a nil shard assigned +// hasShardingUpdates returns true if the sharding distribution has explicitly changed func hasShardingUpdates(old, new *v1alpha1.Cluster) bool { - if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) { + if old == nil || new == nil { + return false + } + + // returns true if the cluster id has changed because some sharding algorithms depend on it. + if old.ID != new.ID { + return true + } + + // return false if the shard field has not been modified + if old.Shard == nil && new.Shard == nil { return false } return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard) diff --git a/controller/sharding/cache_test.go b/controller/sharding/cache_test.go new file mode 100644 index 0000000000000..ab431d39de44b --- /dev/null +++ b/controller/sharding/cache_test.go @@ -0,0 +1,341 @@ +package sharding + +import ( + "testing" + + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks" + "github.com/stretchr/testify/assert" +) + +func setupTestSharding(shard int, replicas int) *ClusterSharding { + shardingAlgorithm := "legacy" // we are using the legacy algorithm as it is deterministic based on the cluster id + + db := &dbmocks.ArgoDB{} + + return NewClusterSharding(db, shard, replicas, shardingAlgorithm).(*ClusterSharding) +} + +func TestNewClusterSharding(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + assert.NotNil(t, sharding) + assert.Equal(t, shard, sharding.Shard) + assert.Equal(t, replicas, sharding.Replicas) + assert.NotNil(t, sharding.Shards) + assert.NotNil(t, sharding.Clusters) +} + +func TestClusterSharding_Add(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) // Implement a helper function to setup the test environment + + cluster := &v1alpha1.Cluster{ + ID: "2", + Server: "https://127.0.0.1:6443", + } + + sharding.Add(cluster) + + myCluster := v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + } + + sharding.Add(&myCluster) + + distribution := sharding.GetDistribution() + + assert.Contains(t, sharding.Clusters, cluster.Server) + assert.Contains(t, sharding.Clusters, myCluster.Server) + + clusterDistribution, ok := distribution[cluster.Server] + assert.True(t, 
ok) + assert.Equal(t, 1, clusterDistribution) + + myClusterDistribution, ok := distribution[myCluster.Server] + assert.True(t, ok) + assert.Equal(t, 0, myClusterDistribution) + + assert.Equal(t, 2, len(distribution)) +} + +func TestClusterSharding_AddRoundRobin(t *testing.T) { + shard := 1 + replicas := 2 + + db := &dbmocks.ArgoDB{} + + sharding := NewClusterSharding(db, shard, replicas, "round-robin").(*ClusterSharding) + + firstCluster := &v1alpha1.Cluster{ + ID: "1", + Server: "https://127.0.0.1:6443", + } + sharding.Add(firstCluster) + + secondCluster := v1alpha1.Cluster{ + ID: "2", + Server: "https://kubernetes.default.svc", + } + sharding.Add(&secondCluster) + + distribution := sharding.GetDistribution() + + assert.Contains(t, sharding.Clusters, firstCluster.Server) + assert.Contains(t, sharding.Clusters, secondCluster.Server) + + clusterDistribution, ok := distribution[firstCluster.Server] + assert.True(t, ok) + assert.Equal(t, 0, clusterDistribution) + + myClusterDistribution, ok := distribution[secondCluster.Server] + assert.True(t, ok) + assert.Equal(t, 1, myClusterDistribution) + + assert.Equal(t, 2, len(distribution)) +} + +func TestClusterSharding_Delete(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + sharding.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + sharding.Delete("https://kubernetes.default.svc") + distribution := sharding.GetDistribution() + assert.Equal(t, 1, len(distribution)) +} + +func TestClusterSharding_Update(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + sharding.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + distribution := sharding.GetDistribution() + assert.Equal(t, 2, len(distribution)) + + myClusterDistribution, ok := distribution["https://kubernetes.default.svc"] + assert.True(t, ok) + assert.Equal(t, 0, myClusterDistribution) + + sharding.Update(&v1alpha1.Cluster{ + ID: "4", + Server: "https://kubernetes.default.svc", + }) + + distribution = sharding.GetDistribution() + assert.Equal(t, 2, len(distribution)) + + myClusterDistribution, ok = distribution["https://kubernetes.default.svc"] + assert.True(t, ok) + assert.Equal(t, 1, myClusterDistribution) +} + +func TestClusterSharding_IsManagedCluster(t *testing.T) { + replicas := 2 + sharding0 := setupTestSharding(0, replicas) + + sharding0.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + }, + }, + ) + + assert.True(t, sharding0.IsManagedCluster(&v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + })) + + assert.False(t, sharding0.IsManagedCluster(&v1alpha1.Cluster{ + ID: "2", + Server: "https://127.0.0.1:6443", + })) + + sharding1 := setupTestSharding(1, replicas) + + sharding1.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + assert.False(t, sharding1.IsManagedCluster(&v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + })) + + assert.True(t, sharding1.IsManagedCluster(&v1alpha1.Cluster{ + 
ID: "2", + Server: "https://127.0.0.1:6443", + })) + +} + +func Int64Ptr(i int64) *int64 { + return &i +} + +func TestHasShardingUpdates(t *testing.T) { + testCases := []struct { + name string + old *v1alpha1.Cluster + new *v1alpha1.Cluster + expected bool + }{ + { + name: "No updates", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + }, + expected: false, + }, + { + name: "Updates", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: true, + }, + { + name: "Old is nil", + old: nil, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: false, + }, + { + name: "New is nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + new: nil, + expected: false, + }, + { + name: "Both are nil", + old: nil, + new: nil, + expected: false, + }, + { + name: "Both shards are nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + expected: false, + }, + { + name: "Old shard is nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: true, + }, + { + name: "New shard is nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + expected: true, + }, + { + name: "Cluster ID has changed", + old: &v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + new: &v1alpha1.Cluster{ + ID: "2", + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, hasShardingUpdates(tc.old, tc.new)) + }) + } +} From 4fa4cc2572ead3c3ddfa5db5a1ffc13bd33304d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Tue, 13 Feb 2024 11:18:39 +0100 Subject: [PATCH 12/14] test: add test to validate that Shard value is not overriden MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/sharding/cache_test.go | 144 ++++++++++++++++++++++-------- 1 file changed, 108 insertions(+), 36 deletions(-) diff --git a/controller/sharding/cache_test.go b/controller/sharding/cache_test.go index ab431d39de44b..1a8a12eab636b 100644 --- a/controller/sharding/cache_test.go +++ b/controller/sharding/cache_test.go @@ -9,10 +9,8 @@ import ( ) func setupTestSharding(shard int, replicas int) *ClusterSharding { - shardingAlgorithm := "legacy" // we are using the legacy algorithm as it is deterministic based on the cluster id - + shardingAlgorithm := "legacy" // we are using the legacy algorithm as it is deterministic based on the cluster id which is easier to test db := &dbmocks.ArgoDB{} - return NewClusterSharding(db, shard, replicas, shardingAlgorithm).(*ClusterSharding) } @@ -31,39 +29,39 @@ func TestNewClusterSharding(t *testing.T) { func TestClusterSharding_Add(t *testing.T) { shard := 
1 replicas := 2 - sharding := setupTestSharding(shard, replicas) // Implement a helper function to setup the test environment + sharding := setupTestSharding(shard, replicas) - cluster := &v1alpha1.Cluster{ + clusterA := &v1alpha1.Cluster{ ID: "2", Server: "https://127.0.0.1:6443", } - sharding.Add(cluster) + sharding.Add(clusterA) - myCluster := v1alpha1.Cluster{ + clusterB := v1alpha1.Cluster{ ID: "1", Server: "https://kubernetes.default.svc", } - sharding.Add(&myCluster) + sharding.Add(&clusterB) distribution := sharding.GetDistribution() - assert.Contains(t, sharding.Clusters, cluster.Server) - assert.Contains(t, sharding.Clusters, myCluster.Server) + assert.Contains(t, sharding.Clusters, clusterA.Server) + assert.Contains(t, sharding.Clusters, clusterB.Server) - clusterDistribution, ok := distribution[cluster.Server] + clusterDistribution, ok := distribution[clusterA.Server] assert.True(t, ok) assert.Equal(t, 1, clusterDistribution) - myClusterDistribution, ok := distribution[myCluster.Server] + myClusterDistribution, ok := distribution[clusterB.Server] assert.True(t, ok) assert.Equal(t, 0, myClusterDistribution) assert.Equal(t, 2, len(distribution)) } -func TestClusterSharding_AddRoundRobin(t *testing.T) { +func TestClusterSharding_AddRoundRobin_Redistributes(t *testing.T) { shard := 1 replicas := 2 @@ -71,32 +69,56 @@ func TestClusterSharding_AddRoundRobin(t *testing.T) { sharding := NewClusterSharding(db, shard, replicas, "round-robin").(*ClusterSharding) - firstCluster := &v1alpha1.Cluster{ + clusterA := &v1alpha1.Cluster{ ID: "1", Server: "https://127.0.0.1:6443", } - sharding.Add(firstCluster) + sharding.Add(clusterA) - secondCluster := v1alpha1.Cluster{ - ID: "2", + clusterB := v1alpha1.Cluster{ + ID: "3", Server: "https://kubernetes.default.svc", } - sharding.Add(&secondCluster) + sharding.Add(&clusterB) - distribution := sharding.GetDistribution() + distributionBefore := sharding.GetDistribution() - assert.Contains(t, sharding.Clusters, firstCluster.Server) - assert.Contains(t, sharding.Clusters, secondCluster.Server) + assert.Contains(t, sharding.Clusters, clusterA.Server) + assert.Contains(t, sharding.Clusters, clusterB.Server) - clusterDistribution, ok := distribution[firstCluster.Server] + clusterDistributionA, ok := distributionBefore[clusterA.Server] assert.True(t, ok) - assert.Equal(t, 0, clusterDistribution) + assert.Equal(t, 0, clusterDistributionA) - myClusterDistribution, ok := distribution[secondCluster.Server] + clusterDistributionB, ok := distributionBefore[clusterB.Server] assert.True(t, ok) - assert.Equal(t, 1, myClusterDistribution) + assert.Equal(t, 1, clusterDistributionB) - assert.Equal(t, 2, len(distribution)) + assert.Equal(t, 2, len(distributionBefore)) + + clusterC := v1alpha1.Cluster{ + ID: "2", + Server: "https://1.1.1.1", + } + sharding.Add(&clusterC) + + distributionAfter := sharding.GetDistribution() + + assert.Contains(t, sharding.Clusters, clusterA.Server) + assert.Contains(t, sharding.Clusters, clusterB.Server) + assert.Contains(t, sharding.Clusters, clusterC.Server) + + clusterDistributionA, ok = distributionAfter[clusterA.Server] + assert.True(t, ok) + assert.Equal(t, 0, clusterDistributionA) + + clusterDistributionC, ok := distributionAfter[clusterC.Server] + assert.True(t, ok) + assert.Equal(t, 1, clusterDistributionC) // will be assigned to shard 1 because the .ID is smaller then the "B" cluster + + clusterDistributionB, ok = distributionAfter[clusterB.Server] + assert.True(t, ok) + assert.Equal(t, 0, clusterDistributionB) // will be 
reassigned to shard 0 because the .ID is bigger then the "C" cluster } func TestClusterSharding_Delete(t *testing.T) { @@ -144,24 +166,24 @@ func TestClusterSharding_Update(t *testing.T) { }, ) - distribution := sharding.GetDistribution() - assert.Equal(t, 2, len(distribution)) + distributionBefore := sharding.GetDistribution() + assert.Equal(t, 2, len(distributionBefore)) - myClusterDistribution, ok := distribution["https://kubernetes.default.svc"] + distributionA, ok := distributionBefore["https://kubernetes.default.svc"] assert.True(t, ok) - assert.Equal(t, 0, myClusterDistribution) + assert.Equal(t, 0, distributionA) sharding.Update(&v1alpha1.Cluster{ ID: "4", Server: "https://kubernetes.default.svc", }) - distribution = sharding.GetDistribution() - assert.Equal(t, 2, len(distribution)) + distributionAfter := sharding.GetDistribution() + assert.Equal(t, 2, len(distributionAfter)) - myClusterDistribution, ok = distribution["https://kubernetes.default.svc"] + distributionA, ok = distributionAfter["https://kubernetes.default.svc"] assert.True(t, ok) - assert.Equal(t, 1, myClusterDistribution) + assert.Equal(t, 1, distributionA) } func TestClusterSharding_IsManagedCluster(t *testing.T) { @@ -222,11 +244,61 @@ func TestClusterSharding_IsManagedCluster(t *testing.T) { } -func Int64Ptr(i int64) *int64 { - return &i +func TestClusterSharding_ClusterShardOfResourceShouldNotBeChanged(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + Int64Ptr := func(i int64) *int64 { + return &i + } + + clusterWithNil := &v1alpha1.Cluster{ + ID: "2", + Server: "https://127.0.0.1:6443", + Shard: nil, + } + + clusterWithValue := &v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + } + + clusterWithToBigValue := &v1alpha1.Cluster{ + ID: "3", + Server: "https://1.1.1.1", + Shard: Int64Ptr(999), // shard value is explicitly bigger than the number of replicas + } + + sharding.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + *clusterWithNil, + *clusterWithValue, + *clusterWithToBigValue, + }, + }, + ) + distribution := sharding.GetDistribution() + assert.Equal(t, 3, len(distribution)) + + assert.Nil(t, sharding.Clusters[clusterWithNil.Server].Shard) + + assert.NotNil(t, sharding.Clusters[clusterWithValue.Server].Shard) + assert.Equal(t, int64(1), *sharding.Clusters[clusterWithValue.Server].Shard) + assert.Equal(t, 1, distribution[clusterWithValue.Server]) + + assert.NotNil(t, sharding.Clusters[clusterWithToBigValue.Server].Shard) + assert.Equal(t, int64(999), *sharding.Clusters[clusterWithToBigValue.Server].Shard) + assert.Equal(t, 0, distribution[clusterWithToBigValue.Server]) // will be assigned to shard 0 because the value is bigger than the number of replicas } func TestHasShardingUpdates(t *testing.T) { + Int64Ptr := func(i int64) *int64 { + return &i + } + testCases := []struct { name string old *v1alpha1.Cluster From 81c920db0be12a7d9dc6a1e851a67203ece6d7a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Tue, 13 Feb 2024 13:06:58 +0100 Subject: [PATCH 13/14] fix: added tests and fixe the case when server is changed inside a secret MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- controller/cache/cache.go | 2 +- controller/sharding/cache.go | 17 ++++++--- controller/sharding/cache_test.go | 62 +++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 6 deletions(-) diff --git a/controller/cache/cache.go 
b/controller/cache/cache.go index e3b1d7b77f19d..d1ae8989cd8e6 100644 --- a/controller/cache/cache.go +++ b/controller/cache/cache.go @@ -751,7 +751,7 @@ func (c *liveStateCache) handleAddEvent(cluster *appv1.Cluster) { } func (c *liveStateCache) handleModEvent(oldCluster *appv1.Cluster, newCluster *appv1.Cluster) { - c.clusterSharding.Update(newCluster) + c.clusterSharding.Update(oldCluster, newCluster) c.lock.Lock() cluster, ok := c.clusters[newCluster.Server] c.lock.Unlock() diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index dd6a41cfd179e..3818e7381f3ab 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -12,7 +12,7 @@ type ClusterShardingCache interface { Init(clusters *v1alpha1.ClusterList) Add(c *v1alpha1.Cluster) Delete(clusterServer string) - Update(c *v1alpha1.Cluster) + Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) IsManagedCluster(c *v1alpha1.Cluster) bool GetDistribution() map[string]int } @@ -97,13 +97,16 @@ func (sharding *ClusterSharding) Delete(clusterServer string) { } } -func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) { +func (sharding *ClusterSharding) Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) { sharding.lock.Lock() defer sharding.lock.Unlock() - old, ok := sharding.Clusters[c.Server] - sharding.Clusters[c.Server] = c - if !ok || hasShardingUpdates(old, c) { + if _, ok := sharding.Clusters[oldCluster.Server]; ok && oldCluster.Server != newCluster.Server { + delete(sharding.Clusters, oldCluster.Server) + delete(sharding.Shards, oldCluster.Server) + } + sharding.Clusters[newCluster.Server] = newCluster + if hasShardingUpdates(oldCluster, newCluster) { sharding.updateDistribution() } else { log.Debugf("Skipping sharding distribution update. 
No relevant changes") @@ -159,6 +162,10 @@ func hasShardingUpdates(old, new *v1alpha1.Cluster) bool { return true } + if old.Server != new.Server { + return true + } + // return false if the shard field has not been modified if old.Shard == nil && new.Shard == nil { return false diff --git a/controller/sharding/cache_test.go b/controller/sharding/cache_test.go index 1a8a12eab636b..ed3da752e7279 100644 --- a/controller/sharding/cache_test.go +++ b/controller/sharding/cache_test.go @@ -174,6 +174,9 @@ func TestClusterSharding_Update(t *testing.T) { assert.Equal(t, 0, distributionA) sharding.Update(&v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + }, &v1alpha1.Cluster{ ID: "4", Server: "https://kubernetes.default.svc", }) @@ -186,6 +189,51 @@ func TestClusterSharding_Update(t *testing.T) { assert.Equal(t, 1, distributionA) } +func TestClusterSharding_UpdateServerName(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + sharding.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + distributionBefore := sharding.GetDistribution() + assert.Equal(t, 2, len(distributionBefore)) + + distributionA, ok := distributionBefore["https://kubernetes.default.svc"] + assert.True(t, ok) + assert.Equal(t, 0, distributionA) + + sharding.Update(&v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + }, &v1alpha1.Cluster{ + ID: "1", + Server: "https://server2", + }) + + distributionAfter := sharding.GetDistribution() + assert.Equal(t, 2, len(distributionAfter)) + + _, ok = distributionAfter["https://kubernetes.default.svc"] + assert.False(t, ok) // the old server name should not be present anymore + + _, ok = distributionAfter["https://server2"] + assert.True(t, ok) // the new server name should be present +} + func TestClusterSharding_IsManagedCluster(t *testing.T) { replicas := 2 sharding0 := setupTestSharding(0, replicas) @@ -403,6 +451,20 @@ func TestHasShardingUpdates(t *testing.T) { }, expected: true, }, + { + name: "Server has changed", + old: &v1alpha1.Cluster{ + ID: "1", + Server: "https://server1", + Shard: Int64Ptr(2), + }, + new: &v1alpha1.Cluster{ + ID: "1", + Server: "https://server2", + Shard: Int64Ptr(2), + }, + expected: true, + }, } for _, tc := range testCases { From 4194e89a1341ef9d4046877a192808fece646197 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Tue, 13 Feb 2024 14:03:11 +0100 Subject: [PATCH 14/14] tests: add test cases for infering the shard logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- .../commands/argocd_application_controller.go | 61 +----- controller/sharding/sharding.go | 60 +++++- controller/sharding/sharding_test.go | 189 ++++++++++++++++++ 3 files changed, 249 insertions(+), 61 deletions(-) diff --git a/cmd/argocd-application-controller/commands/argocd_application_controller.go b/cmd/argocd-application-controller/commands/argocd_application_controller.go index 4e97946cd220c..a5fec90f6b972 100644 --- a/cmd/argocd-application-controller/commands/argocd_application_controller.go +++ b/cmd/argocd-application-controller/commands/argocd_application_controller.go @@ -10,8 +10,6 @@ import ( "github.com/redis/go-redis/v9" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - kubeerrors "k8s.io/apimachinery/pkg/api/errors" - metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -26,7 +24,6 @@ import ( cacheutil "github.com/argoproj/argo-cd/v2/util/cache" appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate" "github.com/argoproj/argo-cd/v2/util/cli" - "github.com/argoproj/argo-cd/v2/util/db" "github.com/argoproj/argo-cd/v2/util/env" "github.com/argoproj/argo-cd/v2/util/errors" kubeutil "github.com/argoproj/argo-cd/v2/util/kube" @@ -147,7 +144,7 @@ func NewCommand() *cobra.Command { appController.InvalidateProjectsCache() })) kubectl := kubeutil.NewKubectl() - clusterSharding, err := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution) + clusterSharding, err := sharding.GetClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution) errors.CheckError(err) appController, err = controller.NewApplicationController( namespace, @@ -239,59 +236,3 @@ func NewCommand() *cobra.Command { }) return &command } - -func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (sharding.ClusterShardingCache, error) { - var replicasCount int - if enableDynamicClusterDistribution { - applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) - appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{}) - - // if app controller deployment is not found when dynamic cluster distribution is enabled error out - if err != nil { - return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: %v", err) - } - - if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil { - replicasCount = int(*appControllerDeployment.Spec.Replicas) - } else { - return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment replica count") - } - - } else { - replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) - } - shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32) - if replicasCount > 1 { - // check for shard mapping using configmap if application-controller is a deployment - // else use existing logic to infer shard from pod name if application-controller is a statefulset - if enableDynamicClusterDistribution { - var err error - // retry 3 times if we find a conflict while updating shard mapping configMap. - // If we still see conflicts after the retries, wait for next iteration of heartbeat process. - for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ { - shardNumber, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber) - if err != nil && !kubeerrors.IsConflict(err) { - err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err) - break - } - log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i) - } - errors.CheckError(err) - } else { - if shardNumber < 0 { - var err error - shardNumber, err = sharding.InferShard() - errors.CheckError(err) - } - if shardNumber > replicasCount { - log.Warnf("Calculated shard number %d is greated than the number of replicas count. 
Defaulting to 0", shardNumber) - shardNumber = 0 - } - } - } else { - log.Info("Processing all cluster shards") - shardNumber = 0 - } - db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient) - return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil -} diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index 2b86ed3f82bc6..49d38711a74f6 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "hash/fnv" + "math" "os" "sort" "strconv" @@ -20,6 +21,7 @@ import ( "github.com/argoproj/argo-cd/v2/util/db" "github.com/argoproj/argo-cd/v2/util/env" + "github.com/argoproj/argo-cd/v2/util/errors" "github.com/argoproj/argo-cd/v2/util/settings" log "github.com/sirupsen/logrus" kubeerrors "k8s.io/apimachinery/pkg/api/errors" @@ -206,7 +208,7 @@ func createClusterIndexByClusterIdMap(getCluster clusterAccessor) map[string]int // The function takes the shard number from the environment variable (default value -1, if not set) and passes it to this function. // If the shard value passed to this function is -1, that is, the shard was not set as an environment variable, // we default the shard number to 0 for computing the default config map. -func GetOrUpdateShardFromConfigMap(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) { +func GetOrUpdateShardFromConfigMap(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) { hostname, err := osHostnameFunction() if err != nil { return -1, err @@ -363,3 +365,59 @@ func getDefaultShardMappingData(replicas int) []shardApplicationControllerMappin } return shardMappingData } + +func GetClusterSharding(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (ClusterShardingCache, error) { + var replicasCount int + if enableDynamicClusterDistribution { + applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) + appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{}) + + // if app controller deployment is not found when dynamic cluster distribution is enabled error out + if err != nil { + return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: %v", err) + } + + if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil { + replicasCount = int(*appControllerDeployment.Spec.Replicas) + } else { + return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment replica count") + } + + } else { + replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) + } + shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32) + if replicasCount > 1 { + // check for shard mapping using configmap if application-controller is a deployment + // else use existing logic to infer shard from pod name if application-controller is a statefulset + if enableDynamicClusterDistribution { + var err error + // retry 3 times if we find a conflict while updating shard mapping configMap. + // If we still see conflicts after the retries, wait for next iteration of heartbeat process. 
+ for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ { + shardNumber, err = GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber) + if err != nil && !kubeerrors.IsConflict(err) { + err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err) + break + } + log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i) + } + errors.CheckError(err) + } else { + if shardNumber < 0 { + var err error + shardNumber, err = InferShard() + errors.CheckError(err) + } + if shardNumber > replicasCount { + log.Warnf("Calculated shard number %d is greated than the number of replicas count. Defaulting to 0", shardNumber) + shardNumber = 0 + } + } + } else { + log.Info("Processing all cluster shards") + shardNumber = 0 + } + db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient) + return NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil +} diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go index 0992f7a9dfd7f..15f834f190259 100644 --- a/controller/sharding/sharding_test.go +++ b/controller/sharding/sharding_test.go @@ -1,6 +1,7 @@ package sharding import ( + "context" "encoding/json" "errors" "fmt" @@ -12,10 +13,14 @@ import ( "github.com/argoproj/argo-cd/v2/common" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks" + "github.com/argoproj/argo-cd/v2/util/settings" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kubefake "k8s.io/client-go/kubernetes/fake" ) func TestGetShardByID_NotEmptyID(t *testing.T) { @@ -681,3 +686,187 @@ func Test_getOrUpdateShardNumberForController(t *testing.T) { }) } } + +func TestGetClusterSharding(t *testing.T) { + IntPtr := func(i int32) *int32 { + return &i + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.DefaultApplicationControllerName, + Namespace: "argocd", + }, + Spec: appsv1.DeploymentSpec{ + Replicas: IntPtr(1), + }, + } + + deploymentMultiReplicas := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "argocd-application-controller-multi-replicas", + Namespace: "argocd", + }, + Spec: appsv1.DeploymentSpec{ + Replicas: IntPtr(3), + }, + } + + objects := append([]runtime.Object{}, deployment, deploymentMultiReplicas) + kubeclientset := kubefake.NewSimpleClientset(objects...) 
+ + settingsMgr := settings.NewSettingsManager(context.TODO(), kubeclientset, "argocd", settings.WithRepoOrClusterChangedHandler(func() { + })) + + testCases := []struct { + name string + useDynamicSharding bool + envsSetter func(t *testing.T) + cleanup func() + expectedShard int + expectedReplicas int + expectedErr error + }{ + { + name: "Default sharding with statefulset", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerReplicas, "1") + }, + cleanup: func() {}, + useDynamicSharding: false, + expectedShard: 0, + expectedReplicas: 1, + expectedErr: nil, + }, + { + name: "Default sharding with deployment", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvAppControllerName, common.DefaultApplicationControllerName) + }, + cleanup: func() {}, + useDynamicSharding: true, + expectedShard: 0, + expectedReplicas: 1, + expectedErr: nil, + }, + { + name: "Default sharding with deployment and multiple replicas", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvAppControllerName, "argocd-application-controller-multi-replicas") + }, + cleanup: func() {}, + useDynamicSharding: true, + expectedShard: 0, + expectedReplicas: 3, + expectedErr: nil, + }, + { + name: "Statefulset multiple replicas", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerReplicas, "3") + osHostnameFunction = func() (string, error) { return "example-shard-3", nil } + }, + cleanup: func() { + osHostnameFunction = os.Hostname + }, + useDynamicSharding: false, + expectedShard: 3, + expectedReplicas: 3, + expectedErr: nil, + }, + { + name: "Explicit shard with statefulset and 1 replica", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerReplicas, "1") + t.Setenv(common.EnvControllerShard, "3") + }, + cleanup: func() {}, + useDynamicSharding: false, + expectedShard: 0, + expectedReplicas: 1, + expectedErr: nil, + }, + { + name: "Explicit shard with statefulset and 2 replica - and to high shard", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerReplicas, "2") + t.Setenv(common.EnvControllerShard, "3") + }, + cleanup: func() {}, + useDynamicSharding: false, + expectedShard: 0, + expectedReplicas: 2, + expectedErr: nil, + }, + { + name: "Explicit shard with statefulset and 2 replica", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerReplicas, "2") + t.Setenv(common.EnvControllerShard, "1") + }, + cleanup: func() {}, + useDynamicSharding: false, + expectedShard: 1, + expectedReplicas: 2, + expectedErr: nil, + }, + { + name: "Explicit shard with deployment", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvControllerShard, "3") + }, + cleanup: func() {}, + useDynamicSharding: true, + expectedShard: 0, + expectedReplicas: 1, + expectedErr: nil, + }, + { + name: "Explicit shard with deployment and multiple replicas will read from configmap", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvAppControllerName, "argocd-application-controller-multi-replicas") + t.Setenv(common.EnvControllerShard, "3") + }, + cleanup: func() {}, + useDynamicSharding: true, + expectedShard: 0, + expectedReplicas: 3, + expectedErr: nil, + }, + { + name: "Dynamic sharding but missing deployment", + envsSetter: func(t *testing.T) { + t.Setenv(common.EnvAppControllerName, "missing-deployment") + }, + cleanup: func() {}, + useDynamicSharding: true, + expectedShard: 0, + expectedReplicas: 1, + expectedErr: fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: deployments.apps \"missing-deployment\" not found"), + }, + } + + for _, tc 
:= range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.envsSetter(t) + defer tc.cleanup() + shardingCache, err := GetClusterSharding(kubeclientset, settingsMgr, "round-robin", tc.useDynamicSharding) + + if shardingCache != nil { + clusterSharding := shardingCache.(*ClusterSharding) + assert.Equal(t, tc.expectedShard, clusterSharding.Shard) + assert.Equal(t, tc.expectedReplicas, clusterSharding.Replicas) + } + + if tc.expectedErr != nil { + if err != nil { + assert.Equal(t, tc.expectedErr.Error(), err.Error()) + } else { + t.Errorf("Expected error %v but got nil", tc.expectedErr) + } + } else { + assert.Nil(t, err) + } + }) + } +}
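
For readers unfamiliar with the bug addressed in PATCH 11/14 (the `cluster := c` copy added in ClusterSharding.Init before storing `&cluster` in the map), the following standalone Go sketch reproduces the loop-variable aliasing pitfall outside of Argo CD. The `cluster` struct and server URLs below are illustrative placeholders, not Argo CD types; this is a minimal demonstration of the pattern, not part of the patch series.

package main

import "fmt"

type cluster struct {
	Server string
}

func main() {
	items := []cluster{{Server: "https://a"}, {Server: "https://b"}}

	// Buggy pattern fixed by the patch: taking the address of the range
	// variable. On Go < 1.22 every map value points at the same variable,
	// which holds the last element once the loop finishes.
	buggy := make(map[string]*cluster, len(items))
	for _, c := range items {
		buggy[c.Server] = &c
	}

	// Pattern used by the patch: copy into a fresh variable per iteration
	// before taking its address.
	fixed := make(map[string]*cluster, len(items))
	for _, c := range items {
		c := c // fresh per-iteration copy; the patch writes `cluster := c`
		fixed[c.Server] = &c
	}

	fmt.Println(buggy["https://a"].Server) // "https://b" on Go < 1.22, "https://a" on Go >= 1.22
	fmt.Println(fixed["https://a"].Server) // always "https://a"
}

Go 1.22 later changed for-loop semantics so each iteration gets its own variable, but on the older toolchains in use when this series was written the explicit copy is what keeps each map entry pointing at its own cluster.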