Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(repo-server): excess git request when using multi-source and ref sources (Issue #14725) #16309

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/argocd/commands/headless/headless.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (c *forwardCacheClient) Set(item *cache.Item) error {
})
}

func (c *forwardCacheClient) Get(key string, obj interface{}) error {
func (c *forwardCacheClient) Get(item *cache.Item) error {
return c.doLazy(func(client cache.CacheClient) error {
return client.Get(key, obj)
return client.Get(item)
})
}

Expand Down
196 changes: 164 additions & 32 deletions reposerver/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/argoproj/gitops-engine/pkg/utils/text"
"github.com/go-git/go-git/v5/plumbing"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -24,12 +25,16 @@ import (
"github.com/argoproj/argo-cd/v2/util/hash"
)

//go:generate go run github.com/vektra/mockery/v2@v2.37.0 --name=Cache

var ErrCacheMiss = cacheutil.ErrCacheMiss
var ErrCacheKeyLocked = cacheutil.ErrCacheKeyLocked

type Cache struct {
cache *cacheutil.Cache
repoCacheExpiration time.Duration
revisionCacheExpiration time.Duration
cache *cacheutil.Cache
repoCacheExpiration time.Duration
revisionCacheExpiration time.Duration
revisionCacheLockTimeout time.Duration
}

// ClusterRuntimeInfo holds cluster runtime information
Expand All @@ -41,7 +46,7 @@ type ClusterRuntimeInfo interface {
}

func NewCache(cache *cacheutil.Cache, repoCacheExpiration time.Duration, revisionCacheExpiration time.Duration) *Cache {
return &Cache{cache, repoCacheExpiration, revisionCacheExpiration}
return &Cache{cache, repoCacheExpiration, revisionCacheExpiration, 10 * time.Second}
}

func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...func(client *redis.Client)) func() (*Cache, error) {
Expand All @@ -54,7 +59,7 @@ func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...func(client *redis.Client))
repoFactory := cacheutil.AddCacheFlagsToCmd(cmd, opts...)

return func() (*Cache, error) {
cache, err := repoFactory()
cache, err := repoFactory(true)
if err != nil {
return nil, fmt.Errorf("error adding cache flags to cmd: %w", err)
}
Expand Down Expand Up @@ -141,12 +146,18 @@ func listApps(repoURL, revision string) string {

func (c *Cache) ListApps(repoUrl, revision string) (map[string]string, error) {
res := make(map[string]string)
err := c.cache.GetItem(listApps(repoUrl, revision), &res)
err := c.cache.GetItem(listApps(repoUrl, revision), &res, &cacheutil.CacheActionOpts{CacheType: cacheutil.CacheTypeExternal})
return res, err
}

func (c *Cache) SetApps(repoUrl, revision string, apps map[string]string) error {
return c.cache.SetItem(listApps(repoUrl, revision), apps, c.repoCacheExpiration, apps == nil)
return c.cache.SetItem(
listApps(repoUrl, revision),
apps,
&cacheutil.CacheActionOpts{
Expiration: c.repoCacheExpiration,
Delete: apps == nil,
CacheType: cacheutil.CacheTypeExternal})
}

func helmIndexRefsKey(repo string) string {
Expand All @@ -155,12 +166,19 @@ func helmIndexRefsKey(repo string) string {

// SetHelmIndex stores helm repository index.yaml content to cache
func (c *Cache) SetHelmIndex(repo string, indexData []byte) error {
return c.cache.SetItem(helmIndexRefsKey(repo), indexData, c.revisionCacheExpiration, false)
if indexData == nil {
// Logged as warning upstream
return fmt.Errorf("helm index data is nil, skipping cache")
}
return c.cache.SetItem(
helmIndexRefsKey(repo),
indexData,
&cacheutil.CacheActionOpts{Expiration: c.revisionCacheExpiration})
}

// GetHelmIndex retrieves helm repository index.yaml content from cache
func (c *Cache) GetHelmIndex(repo string, indexData *[]byte) error {
return c.cache.GetItem(helmIndexRefsKey(repo), indexData)
return c.cache.GetItem(helmIndexRefsKey(repo), indexData, nil)
}

func gitRefsKey(repo string) string {
Expand All @@ -173,21 +191,108 @@ func (c *Cache) SetGitReferences(repo string, references []*plumbing.Reference)
for i := range references {
input = append(input, references[i].Strings())
}
return c.cache.SetItem(gitRefsKey(repo), input, c.revisionCacheExpiration, false)
return c.cache.SetItem(gitRefsKey(repo), input, &cacheutil.CacheActionOpts{Expiration: c.revisionCacheExpiration})
}

func GitRefCacheItemToReferences(cacheItem [][2]string) *[]*plumbing.Reference {
var res []*plumbing.Reference
for i := range cacheItem {
// Skip empty data
if cacheItem[i][0] != "" || cacheItem[i][1] != "" {
res = append(res, plumbing.NewReferenceFromStrings(cacheItem[i][0], cacheItem[i][1]))
}
}
return &res
}

// GetGitReferences retrieves resolved Git repository references from cache
func (c *Cache) GetGitReferences(repo string, references *[]*plumbing.Reference) error {
// TryLockGitRefCache attempts to lock the key for the Git repository references if the key doesn't exist
func (c *Cache) TryLockGitRefCache(repo string, lockId string) error {
// This try set with DisableOverwrite is important for making sure that only one process is able to claim ownership
// A normal get + set, or just set would cause ownership to go to whoever the last writer was, and during race conditions
// leads to duplicate requests
err := c.cache.SetItem(gitRefsKey(repo), [][2]string{{cacheutil.CacheLockedValue, lockId}}, &cacheutil.CacheActionOpts{
Expiration: c.revisionCacheLockTimeout,
DisableOverwrite: true,
CacheType: cacheutil.CacheTypeExternal})
return err
}

func (c *Cache) GetGitReferences(repo string, lockId string, cacheType cacheutil.CacheType) (lockOwner string, references *[]*plumbing.Reference, err error) {
var input [][2]string
if err := c.cache.GetItem(gitRefsKey(repo), &input); err != nil {
return err
err = c.cache.GetItem(gitRefsKey(repo), &input, &cacheutil.CacheActionOpts{CacheType: cacheType, Expiration: c.revisionCacheExpiration})
if err == ErrCacheMiss {
// Expected
return "", nil, nil
} else if err == nil && len(input) > 0 && len(input[0]) > 0 {
if input[0][0] != cacheutil.CacheLockedValue {
// Valid value in cache, convert to plumbing.Reference and return
return "", GitRefCacheItemToReferences(input), nil
} else {
// The key lock is being held
return input[0][1], nil, nil
}
}
var res []*plumbing.Reference
for i := range input {
res = append(res, plumbing.NewReferenceFromStrings(input[i][0], input[i][1]))
return "", nil, err
}

// GetOrLockGitReferences retrieves the git references if they exist, otherwise creates a lock and returns so the caller can populate the cache
func (c *Cache) GetOrLockGitReferences(repo string, references *[]*plumbing.Reference) (updateCache bool, lockId string, err error) {
myLockUUID, err := uuid.NewRandom()
if err != nil {
log.Debug("Error generating git references cache lock id: ", err)
return false, "", err
}
*references = res
return nil
// We need to be able to identify that our lock was the successful one, otherwise we'll still have duplicate requests
myLockId := myLockUUID.String()
// Value matches the ttl on the lock in TryLockGitRefCache
waitUntil := time.Now().Add(c.revisionCacheLockTimeout)
// Wait only the maximum amount of time configured for the lock
for time.Now().Before(waitUntil) {
// Attempt to retrieve the key from local cache only
if _, cacheReferences, err := c.GetGitReferences(repo, myLockId, cacheutil.CacheTypeInMemory); err != nil || cacheReferences != nil {
if cacheReferences != nil {
*references = *cacheReferences
}
return false, myLockId, err
}
// Could not get key locally attempt to get the lock
err = c.TryLockGitRefCache(repo, myLockId)
if err != nil {
// Log but ignore this error since we'll want to retry, failing to obtain the lock should not throw an error
log.Errorf("Error attempting to acquire git references cache lock: %v", err)
}
// Attempt to retrieve the key again to see if we have the lock, or the key was populated
if lockOwner, cacheReferences, err := c.GetGitReferences(repo, myLockId, cacheutil.CacheTypeTwoLevel); err != nil || cacheReferences != nil {
if cacheReferences != nil {
// Someone else populated the key
*references = *cacheReferences
}
return false, myLockId, err
} else if lockOwner == myLockId {
// We have the lock, populate the key
return true, myLockId, nil
}
// Wait for lock, valid value, or timeout
time.Sleep(1 * time.Second)
}
// Timeout waiting for lock
log.Debug("Repository cache was unable to acquire lock or valid data within timeout")
return true, myLockId, nil
}

// UnlockGitReferences unlocks the key for the Git repository references if needed
func (c *Cache) UnlockGitReferences(repo string, lockId string) error {
var input [][2]string
var err error
if err = c.cache.GetItem(gitRefsKey(repo), &input, nil); err == nil &&
len(input) > 0 &&
len(input[0]) > 1 &&
input[0][0] == cacheutil.CacheLockedValue &&
input[0][1] == lockId {
// We have the lock, so remove it
return c.cache.SetItem(gitRefsKey(repo), input, &cacheutil.CacheActionOpts{Delete: true, CacheType: cacheutil.CacheTypeExternal})
}
return err
}

// refSourceCommitSHAs is a list of resolved revisions for each ref source. This allows us to invalidate the cache
Expand Down Expand Up @@ -226,7 +331,7 @@ func LogDebugManifestCacheKeyFields(message string, reason string, revision stri
}

func (c *Cache) GetManifests(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, clusterInfo ClusterRuntimeInfo, namespace string, trackingMethod string, appLabelKey string, appName string, res *CachedManifestResponse, refSourceCommitSHAs ResolvedRevisions) error {
err := c.cache.GetItem(manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs), res)
err := c.cache.GetItem(manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs), res, &cacheutil.CacheActionOpts{CacheType: cacheutil.CacheTypeExternal})

if err != nil {
return err
Expand Down Expand Up @@ -269,11 +374,20 @@ func (c *Cache) SetManifests(revision string, appSrc *appv1.ApplicationSource, s
res.CacheEntryHash = hash
}

return c.cache.SetItem(manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs), res, c.repoCacheExpiration, res == nil)
return c.cache.SetItem(
manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs),
res,
&cacheutil.CacheActionOpts{
Expiration: c.repoCacheExpiration,
Delete: res == nil,
CacheType: cacheutil.CacheTypeExternal})
}

func (c *Cache) DeleteManifests(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, clusterInfo ClusterRuntimeInfo, namespace, trackingMethod, appLabelKey, appName string, refSourceCommitSHAs ResolvedRevisions) error {
return c.cache.SetItem(manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs), "", c.repoCacheExpiration, true)
return c.cache.SetItem(
manifestCacheKey(revision, appSrc, srcRefs, namespace, trackingMethod, appLabelKey, appName, clusterInfo, refSourceCommitSHAs),
"",
&cacheutil.CacheActionOpts{Delete: true, CacheType: cacheutil.CacheTypeExternal})
}

func appDetailsCacheKey(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, trackingMethod appv1.TrackingMethod, refSourceCommitSHAs ResolvedRevisions) string {
Expand All @@ -284,11 +398,17 @@ func appDetailsCacheKey(revision string, appSrc *appv1.ApplicationSource, srcRef
}

func (c *Cache) GetAppDetails(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, res *apiclient.RepoAppDetailsResponse, trackingMethod appv1.TrackingMethod, refSourceCommitSHAs ResolvedRevisions) error {
return c.cache.GetItem(appDetailsCacheKey(revision, appSrc, srcRefs, trackingMethod, refSourceCommitSHAs), res)
return c.cache.GetItem(appDetailsCacheKey(revision, appSrc, srcRefs, trackingMethod, refSourceCommitSHAs), res, &cacheutil.CacheActionOpts{CacheType: cacheutil.CacheTypeExternal})
}

func (c *Cache) SetAppDetails(revision string, appSrc *appv1.ApplicationSource, srcRefs appv1.RefTargetRevisionMapping, res *apiclient.RepoAppDetailsResponse, trackingMethod appv1.TrackingMethod, refSourceCommitSHAs ResolvedRevisions) error {
return c.cache.SetItem(appDetailsCacheKey(revision, appSrc, srcRefs, trackingMethod, refSourceCommitSHAs), res, c.repoCacheExpiration, res == nil)
return c.cache.SetItem(
appDetailsCacheKey(revision, appSrc, srcRefs, trackingMethod, refSourceCommitSHAs),
res,
&cacheutil.CacheActionOpts{
Expiration: c.repoCacheExpiration,
Delete: res == nil,
CacheType: cacheutil.CacheTypeExternal})
}

func revisionMetadataKey(repoURL, revision string) string {
Expand All @@ -297,11 +417,14 @@ func revisionMetadataKey(repoURL, revision string) string {

func (c *Cache) GetRevisionMetadata(repoURL, revision string) (*appv1.RevisionMetadata, error) {
item := &appv1.RevisionMetadata{}
return item, c.cache.GetItem(revisionMetadataKey(repoURL, revision), item)
return item, c.cache.GetItem(revisionMetadataKey(repoURL, revision), item, nil)
}

func (c *Cache) SetRevisionMetadata(repoURL, revision string, item *appv1.RevisionMetadata) error {
return c.cache.SetItem(revisionMetadataKey(repoURL, revision), item, c.repoCacheExpiration, false)
return c.cache.SetItem(
revisionMetadataKey(repoURL, revision),
item,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func revisionChartDetailsKey(repoURL, chart, revision string) string {
Expand All @@ -310,37 +433,46 @@ func revisionChartDetailsKey(repoURL, chart, revision string) string {

func (c *Cache) GetRevisionChartDetails(repoURL, chart, revision string) (*appv1.ChartDetails, error) {
item := &appv1.ChartDetails{}
return item, c.cache.GetItem(revisionChartDetailsKey(repoURL, chart, revision), item)
return item, c.cache.GetItem(revisionChartDetailsKey(repoURL, chart, revision), item, nil)
}

func (c *Cache) SetRevisionChartDetails(repoURL, chart, revision string, item *appv1.ChartDetails) error {
return c.cache.SetItem(revisionChartDetailsKey(repoURL, chart, revision), item, c.repoCacheExpiration, false)
return c.cache.SetItem(
revisionChartDetailsKey(repoURL, chart, revision),
item,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func gitFilesKey(repoURL, revision, pattern string) string {
return fmt.Sprintf("gitfiles|%s|%s|%s", repoURL, revision, pattern)
}

func (c *Cache) SetGitFiles(repoURL, revision, pattern string, files map[string][]byte) error {
return c.cache.SetItem(gitFilesKey(repoURL, revision, pattern), &files, c.repoCacheExpiration, false)
return c.cache.SetItem(
gitFilesKey(repoURL, revision, pattern),
&files,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration, CacheType: cacheutil.CacheTypeExternal})
}

func (c *Cache) GetGitFiles(repoURL, revision, pattern string) (map[string][]byte, error) {
var item map[string][]byte
return item, c.cache.GetItem(gitFilesKey(repoURL, revision, pattern), &item)
return item, c.cache.GetItem(gitFilesKey(repoURL, revision, pattern), &item, &cacheutil.CacheActionOpts{CacheType: cacheutil.CacheTypeExternal})
}

func gitDirectoriesKey(repoURL, revision string) string {
return fmt.Sprintf("gitdirs|%s|%s", repoURL, revision)
}

func (c *Cache) SetGitDirectories(repoURL, revision string, directories []string) error {
return c.cache.SetItem(gitDirectoriesKey(repoURL, revision), &directories, c.repoCacheExpiration, false)
return c.cache.SetItem(
gitDirectoriesKey(repoURL, revision),
&directories,
&cacheutil.CacheActionOpts{Expiration: c.repoCacheExpiration})
}

func (c *Cache) GetGitDirectories(repoURL, revision string) ([]string, error) {
var item []string
return item, c.cache.GetItem(gitDirectoriesKey(repoURL, revision), &item)
return item, c.cache.GetItem(gitDirectoriesKey(repoURL, revision), &item, nil)
}

func (cmr *CachedManifestResponse) shallowCopy() *CachedManifestResponse {
Expand Down
Loading
Loading