diff --git a/changelog/unreleased/jsoncs3-concurrency.md b/changelog/unreleased/jsoncs3-concurrency.md new file mode 100644 index 0000000000..4c5d5128b9 --- /dev/null +++ b/changelog/unreleased/jsoncs3-concurrency.md @@ -0,0 +1,3 @@ +Bugfix: concurrently invalidate mtime cache in jsoncs3 share manager + +https://github.com/cs3org/reva/pull/3933 diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 2d301ba830..15ad3509dd 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -33,6 +33,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" "google.golang.org/genproto/protobuf/field_mask" gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" @@ -115,6 +116,7 @@ func init() { type config struct { GatewayAddr string `mapstructure:"gateway_addr"` + MaxConcurrency int `mapstructure:"max_concurrency"` ProviderAddr string `mapstructure:"provider_addr"` ServiceUserID string `mapstructure:"service_user_id"` ServiceUserIdp string `mapstructure:"service_user_idp"` @@ -145,6 +147,8 @@ type Manager struct { initialized bool + MaxConcurrency int + gateway gatewayv1beta1.GatewayAPIClient eventStream events.Stream } @@ -205,11 +209,11 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) { } } - return New(s, gc, c.CacheTTL, es) + return New(s, gc, c.CacheTTL, es, c.MaxConcurrency) } // New returns a new manager instance. -func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream) (*Manager, error) { +func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) { ttl := time.Duration(ttlSeconds) * time.Second return &Manager{ Cache: providercache.New(s, ttl), @@ -219,6 +223,7 @@ func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, storage: s, gateway: gc, eventStream: es, + MaxConcurrency: maxconcurrency, }, nil } @@ -703,7 +708,6 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati m.Lock() defer m.Unlock() - var rss []*collaboration.ReceivedShare user := ctxpkg.ContextMustGetUser(ctx) ssids := map[string]*receivedsharecache.Space{} @@ -750,46 +754,98 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati } } - for ssid, rspace := range ssids { - storageID, spaceID, _ := shareid.Decode(ssid) - err := m.Cache.Sync(ctx, storageID, spaceID) - if err != nil { - continue - } - for shareID, state := range rspace.States { - s := m.Cache.Get(storageID, spaceID, shareID) - if s == nil { - continue + numWorkers := m.MaxConcurrency + if numWorkers == 0 || len(ssids) < numWorkers { + numWorkers = len(ssids) + } + + type w struct { + ssid string + rspace *receivedsharecache.Space + } + work := make(chan w) + results := make(chan *collaboration.ReceivedShare) + + g, ctx := errgroup.WithContext(ctx) + + // Distribute work + g.Go(func() error { + defer close(work) + for ssid, rspace := range ssids { + select { + case work <- w{ssid, rspace}: + case <-ctx.Done(): + return ctx.Err() } - if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). - Msg("failed to unshare expired share") - } - if err := events.Publish(m.eventStream, events.ShareExpired{ - ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), - ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), - GranteeUserID: s.GetGrantee().GetUserId(), - GranteeGroupID: s.GetGrantee().GetGroupId(), - }); err != nil { - log.Error().Err(err). - Msg("failed to publish share expired event") + } + return nil + }) + + // Spawn workers that'll concurrently work the queue + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + for w := range work { + storageID, spaceID, _ := shareid.Decode(w.ssid) + err := m.Cache.Sync(ctx, storageID, spaceID) + if err != nil { + continue } - continue - } + for shareID, state := range w.rspace.States { + s := m.Cache.Get(storageID, spaceID, shareID) + if s == nil { + continue + } + if share.IsExpired(s) { + if err := m.removeShare(ctx, s); err != nil { + log.Error().Err(err). + Msg("failed to unshare expired share") + } + if err := events.Publish(m.eventStream, events.ShareExpired{ + ShareOwner: s.GetOwner(), + ItemID: s.GetResourceId(), + ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), + GranteeUserID: s.GetGrantee().GetUserId(), + GranteeGroupID: s.GetGrantee().GetGroupId(), + }); err != nil { + log.Error().Err(err). + Msg("failed to publish share expired event") + } + continue + } - if share.IsGrantedToUser(s, user) { - if share.MatchesFiltersWithState(s, state.State, filters) { - rs := &collaboration.ReceivedShare{ - Share: s, - State: state.State, - MountPoint: state.MountPoint, + if share.IsGrantedToUser(s, user) { + if share.MatchesFiltersWithState(s, state.State, filters) { + rs := &collaboration.ReceivedShare{ + Share: s, + State: state.State, + MountPoint: state.MountPoint, + } + select { + case results <- rs: + case <-ctx.Done(): + return ctx.Err() + } + } } - rss = append(rss, rs) } } - } + return nil + }) + } + + // Wait for things to settle down, then close results chan + go func() { + _ = g.Wait() // error is checked later + close(results) + }() + + rss := []*collaboration.ReceivedShare{} + for n := range results { + rss = append(rss, n) + } + + if err := g.Wait(); err != nil { + return nil, err } return rss, nil diff --git a/pkg/share/manager/jsoncs3/jsoncs3_test.go b/pkg/share/manager/jsoncs3/jsoncs3_test.go index 8806dde1d9..63beca5ec0 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3_test.go +++ b/pkg/share/manager/jsoncs3/jsoncs3_test.go @@ -155,7 +155,7 @@ var _ = Describe("Jsoncs3", func() { Expect(err).ToNot(HaveOccurred()) client = &mocks.GatewayAPIClient{} - m, err = jsoncs3.New(storage, client, 0, nil) + m, err = jsoncs3.New(storage, client, 0, nil, 0) Expect(err).ToNot(HaveOccurred()) }) @@ -250,7 +250,7 @@ var _ = Describe("Jsoncs3", func() { }) Expect(s).ToNot(BeNil()) - m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache + m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) s = shareBykey(&collaboration.ShareKey{ @@ -444,7 +444,7 @@ var _ = Describe("Jsoncs3", func() { }) It("loads the cache when it doesn't have an entry", func() { - m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) s, err := m.GetShare(ctx, shareRef) @@ -504,7 +504,7 @@ var _ = Describe("Jsoncs3", func() { }) Expect(err).ToNot(HaveOccurred()) - m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache + m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) s, err := m.GetShare(ctx, &collaboration.ShareReference{ @@ -617,7 +617,7 @@ var _ = Describe("Jsoncs3", func() { Expect(us).ToNot(BeNil()) Expect(us.GetPermissions().GetPermissions().InitiateFileUpload).To(BeTrue()) - m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache + m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) s = shareBykey(&collaboration.ShareKey{ @@ -748,7 +748,7 @@ var _ = Describe("Jsoncs3", func() { }) It("syncronizes the user received cache before listing", func() { - m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}) @@ -816,7 +816,7 @@ var _ = Describe("Jsoncs3", func() { }) It("syncronizes the group received cache before listing", func() { - m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}) @@ -860,7 +860,7 @@ var _ = Describe("Jsoncs3", func() { }) It("syncs the cache", func() { - m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ @@ -894,7 +894,7 @@ var _ = Describe("Jsoncs3", func() { }) It("syncs the cache", func() { - m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ @@ -1017,7 +1017,7 @@ var _ = Describe("Jsoncs3", func() { Expect(err).ToNot(HaveOccurred()) Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) - m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache + m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache Expect(err).ToNot(HaveOccurred()) rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{