diff --git a/cmd/argocd/commands/admin/cluster.go b/cmd/argocd/commands/admin/cluster.go index abb055cdfa354..2e833a68927f4 100644 --- a/cmd/argocd/commands/admin/cluster.go +++ b/cmd/argocd/commands/admin/cluster.go @@ -86,8 +86,12 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie if err != nil { return nil, err } + appItems, err := appClient.ArgoprojV1alpha1().Applications(namespace).List(ctx, v1.ListOptions{}) + if err != nil { + return nil, err + } clusterShardingCache := sharding.NewClusterSharding(argoDB, shard, replicas, shardingAlgorithm) - clusterShardingCache.Init(clustersList) + clusterShardingCache.Init(clustersList, appItems) clusterShards := clusterShardingCache.GetDistribution() var cache *appstatecache.Cache @@ -113,10 +117,6 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie } } - appItems, err := appClient.ArgoprojV1alpha1().Applications(namespace).List(ctx, v1.ListOptions{}) - if err != nil { - return nil, err - } apps := appItems.Items for i, app := range apps { err := argo.ValidateDestination(ctx, &app.Spec.Destination, argoDB) @@ -129,12 +129,6 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie batchSize := 10 batchesCount := int(math.Ceil(float64(len(clusters)) / float64(batchSize))) - clusterSharding := &sharding.ClusterSharding{ - Shard: shard, - Replicas: replicas, - Shards: make(map[string]int), - Clusters: make(map[string]*v1alpha1.Cluster), - } for batchNum := 0; batchNum < batchesCount; batchNum++ { batchStart := batchSize * batchNum batchEnd := batchSize * (batchNum + 1) @@ -146,9 +140,7 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie clusterShard := 0 cluster := batch[i] if replicas > 0 { - distributionFunction := sharding.GetDistributionFunction(clusterSharding.GetClusterAccessor(), common.DefaultShardingAlgorithm, replicas) - distributionFunction(&cluster) - clusterShard := 
clusterShards[cluster.Server] + clusterShard = clusterShards[cluster.Server] cluster.Shard = pointer.Int64(int64(clusterShard)) log.Infof("Cluster with uid: %s will be processed by shard %d", cluster.ID, clusterShard) } diff --git a/controller/appcontroller.go b/controller/appcontroller.go index b60e2124c8841..9d89b6e6b37d6 100644 --- a/controller/appcontroller.go +++ b/controller/appcontroller.go @@ -796,7 +796,13 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int if err != nil { log.Warnf("Cannot init sharding. Error while querying clusters list from database: %v", err) } else { - ctrl.clusterSharding.Init(clusters) + appItems, err := ctrl.getAppList(metav1.ListOptions{}) + + if err != nil { + log.Warnf("Cannot init sharding. Error while querying application list from database: %v", err) + } else { + ctrl.clusterSharding.Init(clusters, appItems) + } } errors.CheckError(ctrl.stateCache.Init()) @@ -2106,6 +2112,10 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar ctrl.appRefreshQueue.AddRateLimited(key) ctrl.appOperationQueue.AddRateLimited(key) } + newApp, newOK := obj.(*appv1.Application) + if err == nil && newOK { + ctrl.clusterSharding.AddApp(newApp) + } }, UpdateFunc: func(old, new interface{}) { if !ctrl.canProcessApp(new) { @@ -2136,6 +2146,7 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar ctrl.requestAppRefresh(newApp.QualifiedName(), compareWith, delay) ctrl.appOperationQueue.AddRateLimited(key) + ctrl.clusterSharding.UpdateApp(newApp) }, DeleteFunc: func(obj interface{}) { if !ctrl.canProcessApp(obj) { @@ -2148,6 +2159,10 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar // for deletes, we immediately add to the refresh queue ctrl.appRefreshQueue.Add(key) } + delApp, delOK := obj.(*appv1.Application) + if err == nil && delOK { + ctrl.clusterSharding.DeleteApp(delApp) + } }, }, ) @@ -2223,4 +2238,26 @@ func (ctrl 
*ApplicationController) toAppQualifiedName(appName, appNamespace stri return fmt.Sprintf("%s/%s", appNamespace, appName) } +func (ctrl *ApplicationController) getAppList(options metav1.ListOptions) (*appv1.ApplicationList, error) { + watchNamespace := ctrl.namespace + // If we have at least one additional namespace configured, we need to + // watch on them all. + if len(ctrl.applicationNamespaces) > 0 { + watchNamespace = "" + } + + appList, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(watchNamespace).List(context.TODO(), options) + if err != nil { + return nil, err + } + newItems := []appv1.Application{} + for _, app := range appList.Items { + if ctrl.isAppNamespaceAllowed(&app) { + newItems = append(newItems, app) + } + } + appList.Items = newItems + return appList, nil +} + type ClusterFilterFunction func(c *appv1.Cluster, distributionFunction sharding.DistributionFunction) bool diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index 3818e7381f3ab..2f3ffcbcb95c6 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -9,12 +9,16 @@ import ( ) type ClusterShardingCache interface { - Init(clusters *v1alpha1.ClusterList) + Init(clusters *v1alpha1.ClusterList, apps *v1alpha1.ApplicationList) Add(c *v1alpha1.Cluster) Delete(clusterServer string) Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) + AddApp(a *v1alpha1.Application) + DeleteApp(a *v1alpha1.Application) + UpdateApp(a *v1alpha1.Application) IsManagedCluster(c *v1alpha1.Cluster) bool GetDistribution() map[string]int + GetAppDistribution() map[string]int } type ClusterSharding struct { @@ -22,6 +26,7 @@ type ClusterSharding struct { Replicas int Shards map[string]int Clusters map[string]*v1alpha1.Cluster + Apps map[string]*v1alpha1.Application lock sync.RWMutex getClusterShard DistributionFunction } @@ -33,11 +38,12 @@ func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm stri Replicas: replicas, Shards: 
make(map[string]int),
 		Clusters: make(map[string]*v1alpha1.Cluster),
+		Apps:     make(map[string]*v1alpha1.Application),
 	}
 	distributionFunction := NoShardingDistributionFunction()
 	if replicas > 1 {
 		log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
-		distributionFunction = GetDistributionFunction(clusterSharding.GetClusterAccessor(), shardingAlgorithm, replicas)
+		distributionFunction = GetDistributionFunction(clusterSharding.getClusterAccessor(), clusterSharding.getAppAccessor(), shardingAlgorithm, replicas)
 	} else {
 		log.Info("Processing all cluster shards")
 	}
@@ -62,7 +68,7 @@ func (s *ClusterSharding) IsManagedCluster(c *v1alpha1.Cluster) bool {
 	return clusterShard == s.Shard
 }
 
-func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
+func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList, apps *v1alpha1.ApplicationList) {
 	sharding.lock.Lock()
 	defer sharding.lock.Unlock()
 	newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
@@ -71,6 +77,13 @@ func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
 		newClusters[c.Server] = &cluster
 	}
 	sharding.Clusters = newClusters
+
+	newApps := make(map[string]*v1alpha1.Application, len(apps.Items))
+	for i := range apps.Items {
+		app := apps.Items[i]
+		newApps[app.QualifiedName()] = &app // namespace-qualified key: same-named apps in different namespaces must not collide
+	}
+	sharding.Apps = newApps
 	sharding.updateDistribution()
 }
 
@@ -173,7 +186,8 @@ func hasShardingUpdates(old, new *v1alpha1.Cluster) bool {
 	return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard)
 }
 
-func (d *ClusterSharding) GetClusterAccessor() clusterAccessor {
+// A read lock should be acquired before calling getClusterAccessor.
+func (d *ClusterSharding) getClusterAccessor() clusterAccessor {
 	return func() []*v1alpha1.Cluster {
 		// no need to lock, as this is only called from the updateDistribution function
 		clusters := make([]*v1alpha1.Cluster, 0, len(d.Clusters))
@@ -183,3 +197,81 @@ func (d *ClusterSharding) GetClusterAccessor() clusterAccessor {
 		return clusters
 	}
 }
+
+// A read lock should be acquired before calling getAppAccessor.
+func (d *ClusterSharding) getAppAccessor() appAccessor {
+	return func() []*v1alpha1.Application {
+		apps := make([]*v1alpha1.Application, 0, len(d.Apps))
+		for _, a := range d.Apps {
+			apps = append(apps, a)
+		}
+		return apps
+	}
+}
+
+// AddApp stores a under its namespace-qualified name and recomputes the
+// distribution only when no application was cached under that key before.
+func (sharding *ClusterSharding) AddApp(a *v1alpha1.Application) {
+	sharding.lock.Lock()
+	defer sharding.lock.Unlock()
+
+	// QualifiedName keeps same-named apps from different namespaces apart.
+	key := a.QualifiedName()
+	_, ok := sharding.Apps[key]
+	sharding.Apps[key] = a
+	if !ok {
+		sharding.updateDistribution()
+	} else {
+		log.Debugf("Skipping sharding distribution update. App already added")
+	}
+}
+
+// DeleteApp drops a from the cache and recomputes the distribution; it is a
+// no-op when the application is not cached.
+func (sharding *ClusterSharding) DeleteApp(a *v1alpha1.Application) {
+	sharding.lock.Lock()
+	defer sharding.lock.Unlock()
+
+	key := a.QualifiedName()
+	if _, ok := sharding.Apps[key]; ok {
+		delete(sharding.Apps, key)
+		sharding.updateDistribution()
+	}
+}
+
+// UpdateApp replaces the cached copy of a. The distribution is recomputed when
+// the application was not cached yet or when its destination server changed,
+// because either case alters the per-server counts of GetAppDistribution.
+func (sharding *ClusterSharding) UpdateApp(a *v1alpha1.Application) {
+	sharding.lock.Lock()
+	defer sharding.lock.Unlock()
+
+	key := a.QualifiedName()
+	old, ok := sharding.Apps[key]
+	sharding.Apps[key] = a
+	if !ok || old.Spec.Destination.Server != a.Spec.Destination.Server {
+		sharding.updateDistribution()
+	} else {
+		log.Debugf("Skipping sharding distribution update. No relevant changes")
+	}
+}
+
+// GetAppDistribution returns the number of cached applications per destination
+// server. It should not be called from a DistributionFunction because it could
+// cause a deadlock when updateDistribution is called.
+func (sharding *ClusterSharding) GetAppDistribution() map[string]int {
+	// Keep the read lock for the whole iteration: sharding.Apps is the live map,
+	// so ranging over it after RUnlock would race with AddApp, UpdateApp and
+	// DeleteApp, which write to it under the write lock.
+	sharding.lock.RLock()
+	defer sharding.lock.RUnlock()
+
+	// The cluster count is only a capacity hint for the result map.
+	appDistribution := make(map[string]int, len(sharding.Clusters))
+	for _, a := range sharding.Apps {
+		// A missing key reads as 0, so the counter needs no explicit initialisation.
+		appDistribution[a.Spec.Destination.Server]++
+	}
+
+	return appDistribution
+}
diff --git a/controller/sharding/cache_test.go b/controller/sharding/cache_test.go
index ed3da752e7279..f7798c31e3608 100644
--- a/controller/sharding/cache_test.go
+++ b/controller/sharding/cache_test.go
@@ -139,6 +139,12 @@ func TestClusterSharding_Delete(t *testing.T) {
 				},
 			},
 		},
+		&v1alpha1.ApplicationList{
+			Items: []v1alpha1.Application{
+				createApp("app2", "https://127.0.0.1:6443"),
+				createApp("app1", "https://kubernetes.default.svc"),
+			},
+		},
 	)
 
 	sharding.Delete("https://kubernetes.default.svc")
@@ -164,6 +170,12 @@ func TestClusterSharding_Update(t *testing.T) {
 				},
 			},
 		},
+		&v1alpha1.ApplicationList{
+			Items: []v1alpha1.Application{
+				createApp("app2", "https://127.0.0.1:6443"),
+				createApp("app1", "https://kubernetes.default.svc"),
+			},
+		},
 	)
 
 	distributionBefore := sharding.GetDistribution()
@@ -207,6 +219,12 @@ func TestClusterSharding_UpdateServerName(t *testing.T) {
 				},
 			},
 		},
+		&v1alpha1.ApplicationList{
+			Items: []v1alpha1.Application{
+				createApp("app2", "https://127.0.0.1:6443"),
+				createApp("app1", "https://kubernetes.default.svc"),
+			},
+		},
 	)
 
 	distributionBefore := sharding.GetDistribution()
@@ -251,6 +269,12 @@ func TestClusterSharding_IsManagedCluster(t *testing.T) {
 				},
 			},
 		},
+		&v1alpha1.ApplicationList{
+			Items: []v1alpha1.Application{
+				createApp("app2", "https://127.0.0.1:6443"),
+				createApp("app1", "https://kubernetes.default.svc"),
+			},
+		},
 	)
 
 	assert.True(t, sharding0.IsManagedCluster(&v1alpha1.Cluster{
@@ -278,6 +302,12 @@ func TestClusterSharding_IsManagedCluster(t *testing.T) {
 				},
 			},
 		},
+ &v1alpha1.ApplicationList{ + Items: []v1alpha1.Application{ + createApp("app2", "https://127.0.0.1:6443"), + createApp("app1", "https://kubernetes.default.svc"), + }, + }, ) assert.False(t, sharding1.IsManagedCluster(&v1alpha1.Cluster{ @@ -327,6 +357,12 @@ func TestClusterSharding_ClusterShardOfResourceShouldNotBeChanged(t *testing.T) *clusterWithToBigValue, }, }, + &v1alpha1.ApplicationList{ + Items: []v1alpha1.Application{ + createApp("app2", "https://127.0.0.1:6443"), + createApp("app1", "https://kubernetes.default.svc"), + }, + }, ) distribution := sharding.GetDistribution() assert.Equal(t, 3, len(distribution)) diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index 568e12c51eda1..e4af7010931c6 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -43,6 +43,7 @@ const ShardControllerMappingKey = "shardControllerMapping" type DistributionFunction func(c *v1alpha1.Cluster) int type ClusterFilterFunction func(c *v1alpha1.Cluster) bool type clusterAccessor func() []*v1alpha1.Cluster +type appAccessor func() []*v1alpha1.Application // shardApplicationControllerMapping stores the mapping of Shard Number to Application Controller in ConfigMap. // It also stores the heartbeat of last synced time of the application controller. @@ -75,7 +76,7 @@ func GetClusterFilter(db db.ArgoDB, distributionFunction DistributionFunction, r // GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and // the current datas. 
-func GetDistributionFunction(clusters clusterAccessor, shardingAlgorithm string, replicasCount int) DistributionFunction { +func GetDistributionFunction(clusters clusterAccessor, apps appAccessor, shardingAlgorithm string, replicasCount int) DistributionFunction { log.Debugf("Using filter function: %s", shardingAlgorithm) distributionFunction := LegacyDistributionFunction(replicasCount) switch shardingAlgorithm { diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go index 0c6d4452ff94d..1c338aac5f271 100644 --- a/controller/sharding/sharding_test.go +++ b/controller/sharding/sharding_test.go @@ -21,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" kubefake "k8s.io/client-go/kubernetes/fake" + "sigs.k8s.io/yaml" ) func TestGetShardByID_NotEmptyID(t *testing.T) { @@ -101,13 +102,14 @@ func TestGetClusterFilterLegacy(t *testing.T) { func TestGetClusterFilterUnknown(t *testing.T) { clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters() + appAccessor, _, _, _, _, _ := createTestApps() // Test with replicas set to 0 t.Setenv(common.EnvControllerReplicas, "2") os.Unsetenv(common.EnvControllerShardingAlgorithm) t.Setenv(common.EnvControllerShardingAlgorithm, "unknown") replicasCount := 2 db.On("GetApplicationControllerReplicas").Return(replicasCount) - distributionFunction := GetDistributionFunction(clusterAccessor, "unknown", replicasCount) + distributionFunction := GetDistributionFunction(clusterAccessor, appAccessor, "unknown", replicasCount) assert.Equal(t, 0, distributionFunction(nil)) assert.Equal(t, 0, distributionFunction(&cluster1)) assert.Equal(t, 1, distributionFunction(&cluster2)) @@ -119,9 +121,10 @@ func TestLegacyGetClusterFilterWithFixedShard(t *testing.T) { //shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) 
t.Setenv(common.EnvControllerReplicas, "5") clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters() + appAccessor, _, _, _, _, _ := createTestApps() replicasCount := 5 db.On("GetApplicationControllerReplicas").Return(replicasCount) - filter := GetDistributionFunction(clusterAccessor, common.DefaultShardingAlgorithm, replicasCount) + filter := GetDistributionFunction(clusterAccessor, appAccessor, common.DefaultShardingAlgorithm, replicasCount) assert.Equal(t, 0, filter(nil)) assert.Equal(t, 4, filter(&cluster1)) assert.Equal(t, 1, filter(&cluster2)) @@ -131,13 +134,13 @@ func TestLegacyGetClusterFilterWithFixedShard(t *testing.T) { var fixedShard int64 = 4 cluster5 := &v1alpha1.Cluster{ID: "5", Shard: &fixedShard} clusterAccessor = getClusterAccessor([]v1alpha1.Cluster{cluster1, cluster2, cluster2, cluster4, *cluster5}) - filter = GetDistributionFunction(clusterAccessor, common.DefaultShardingAlgorithm, replicasCount) + filter = GetDistributionFunction(clusterAccessor, appAccessor, common.DefaultShardingAlgorithm, replicasCount) assert.Equal(t, int(fixedShard), filter(cluster5)) fixedShard = 1 cluster5.Shard = &fixedShard clusterAccessor = getClusterAccessor([]v1alpha1.Cluster{cluster1, cluster2, cluster2, cluster4, *cluster5}) - filter = GetDistributionFunction(clusterAccessor, common.DefaultShardingAlgorithm, replicasCount) + filter = GetDistributionFunction(clusterAccessor, appAccessor, common.DefaultShardingAlgorithm, replicasCount) assert.Equal(t, int(fixedShard), filter(&v1alpha1.Cluster{ID: "4", Shard: &fixedShard})) } @@ -145,10 +148,11 @@ func TestRoundRobinGetClusterFilterWithFixedShard(t *testing.T) { //shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) 
t.Setenv(common.EnvControllerReplicas, "4") clusterAccessor, db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters() + appAccessor, _, _, _, _, _ := createTestApps() replicasCount := 4 db.On("GetApplicationControllerReplicas").Return(replicasCount) - filter := GetDistributionFunction(clusterAccessor, common.RoundRobinShardingAlgorithm, replicasCount) + filter := GetDistributionFunction(clusterAccessor, appAccessor, common.RoundRobinShardingAlgorithm, replicasCount) assert.Equal(t, filter(nil), 0) assert.Equal(t, filter(&cluster1), 0) assert.Equal(t, filter(&cluster2), 1) @@ -161,14 +165,14 @@ func TestRoundRobinGetClusterFilterWithFixedShard(t *testing.T) { cluster5 := v1alpha1.Cluster{Name: "cluster5", ID: "5", Shard: &fixedShard} clusters := []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5} clusterAccessor = getClusterAccessor(clusters) - filter = GetDistributionFunction(clusterAccessor, common.RoundRobinShardingAlgorithm, replicasCount) + filter = GetDistributionFunction(clusterAccessor, appAccessor, common.RoundRobinShardingAlgorithm, replicasCount) assert.Equal(t, int(fixedShard), filter(&cluster5)) fixedShard = 1 cluster5 = v1alpha1.Cluster{Name: "cluster5", ID: "5", Shard: &fixedShard} clusters = []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5} clusterAccessor = getClusterAccessor(clusters) - filter = GetDistributionFunction(clusterAccessor, common.RoundRobinShardingAlgorithm, replicasCount) + filter = GetDistributionFunction(clusterAccessor, appAccessor, common.RoundRobinShardingAlgorithm, replicasCount) assert.Equal(t, int(fixedShard), filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) } @@ -870,3 +874,81 @@ func TestGetClusterSharding(t *testing.T) { }) } } + +func TestAppAwareCache(t *testing.T) { + _, db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters() + _, app1, app2, app3, app4, app5 := createTestApps() + + clusterSharding := NewClusterSharding(db, 0, 
1, "legacy") + + clusterList := &v1alpha1.ClusterList{Items: []v1alpha1.Cluster{cluster1, cluster2, cluster3, cluster4, cluster5}} + appList := &v1alpha1.ApplicationList{Items: []v1alpha1.Application{app1, app2, app3, app4, app5}} + clusterSharding.Init(clusterList, appList) + + appDistribution := clusterSharding.GetAppDistribution() + + assert.Equal(t, 2, appDistribution["cluster1"]) + assert.Equal(t, 2, appDistribution["cluster2"]) + assert.Equal(t, 1, appDistribution["cluster3"]) + + app6 := createApp("app6", "cluster4") + clusterSharding.AddApp(&app6) + + app1Update := createApp("app1", "cluster2") + clusterSharding.UpdateApp(&app1Update) + + clusterSharding.DeleteApp(&app3) + + appDistribution = clusterSharding.GetAppDistribution() + + assert.Equal(t, 1, appDistribution["cluster1"]) + assert.Equal(t, 2, appDistribution["cluster2"]) + assert.Equal(t, 1, appDistribution["cluster3"]) + assert.Equal(t, 1, appDistribution["cluster4"]) +} + +func createTestApps() (appAccessor, v1alpha1.Application, v1alpha1.Application, v1alpha1.Application, v1alpha1.Application, v1alpha1.Application) { + app1 := createApp("app1", "cluster1") + app2 := createApp("app2", "cluster1") + app3 := createApp("app3", "cluster2") + app4 := createApp("app4", "cluster2") + app5 := createApp("app5", "cluster3") + + apps := []v1alpha1.Application{app1, app2, app3, app4, app5} + + return getAppAccessor(apps), app1, app2, app3, app4, app5 +} + +func getAppAccessor(apps []v1alpha1.Application) appAccessor { + // Convert the array to a slice of pointers + appPointers := getAppPointers(apps) + appAccessor := func() []*v1alpha1.Application { return appPointers } + return appAccessor +} + +func getAppPointers(apps []v1alpha1.Application) []*v1alpha1.Application { + var appPointers []*v1alpha1.Application + for i := range apps { + appPointers = append(appPointers, &apps[i]) + } + return appPointers +} + +func createApp(name string, server string) v1alpha1.Application { + var testApp = ` +apiVersion: 
argoproj.io/v1alpha1 +kind: Application +metadata: + name: ` + name + ` +spec: + destination: + server: ` + server + ` +` + + var app v1alpha1.Application + err := yaml.Unmarshal([]byte(testApp), &app) + if err != nil { + panic(err) + } + return app +}