Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concurrently invalidate mtime cache in jsoncs3 share manager #3933

Merged
merged 2 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/unreleased/jsoncs3-concurrency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Bugfix: concurrently invalidate mtime cache in jsoncs3 share manager

https://github.com/cs3org/reva/pull/3933
130 changes: 93 additions & 37 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"

gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
Expand Down Expand Up @@ -115,6 +116,7 @@ func init() {

type config struct {
GatewayAddr string `mapstructure:"gateway_addr"`
MaxConcurrency int `mapstructure:"max_concurrency"`
ProviderAddr string `mapstructure:"provider_addr"`
ServiceUserID string `mapstructure:"service_user_id"`
ServiceUserIdp string `mapstructure:"service_user_idp"`
Expand Down Expand Up @@ -145,6 +147,8 @@ type Manager struct {

initialized bool

MaxConcurrency int

gateway gatewayv1beta1.GatewayAPIClient
eventStream events.Stream
}
Expand Down Expand Up @@ -205,11 +209,11 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
}
}

return New(s, gc, c.CacheTTL, es)
return New(s, gc, c.CacheTTL, es, c.MaxConcurrency)
}

// New returns a new manager instance.
func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream) (*Manager, error) {
func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
ttl := time.Duration(ttlSeconds) * time.Second
return &Manager{
Cache: providercache.New(s, ttl),
Expand All @@ -219,6 +223,7 @@ func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int,
storage: s,
gateway: gc,
eventStream: es,
MaxConcurrency: maxconcurrency,
}, nil
}

Expand Down Expand Up @@ -703,7 +708,6 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
m.Lock()
defer m.Unlock()

var rss []*collaboration.ReceivedShare
user := ctxpkg.ContextMustGetUser(ctx)

ssids := map[string]*receivedsharecache.Space{}
Expand Down Expand Up @@ -750,46 +754,98 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}
}

for ssid, rspace := range ssids {
storageID, spaceID, _ := shareid.Decode(ssid)
err := m.Cache.Sync(ctx, storageID, spaceID)
if err != nil {
continue
}
for shareID, state := range rspace.States {
s := m.Cache.Get(storageID, spaceID, shareID)
if s == nil {
continue
numWorkers := m.MaxConcurrency
if numWorkers == 0 || len(ssids) < numWorkers {
numWorkers = len(ssids)
}

type w struct {
ssid string
rspace *receivedsharecache.Space
}
work := make(chan w)
results := make(chan *collaboration.ReceivedShare)

g, ctx := errgroup.WithContext(ctx)

// Distribute work
g.Go(func() error {
defer close(work)
for ssid, rspace := range ssids {
select {
case work <- w{ssid, rspace}:
case <-ctx.Done():
return ctx.Err()
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
return nil
})

// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for w := range work {
storageID, spaceID, _ := shareid.Decode(w.ssid)
err := m.Cache.Sync(ctx, storageID, spaceID)
if err != nil {
continue
}
continue
}
for shareID, state := range w.rspace.States {
s := m.Cache.Get(storageID, spaceID, shareID)
if s == nil {
continue
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}

if share.IsGrantedToUser(s, user) {
if share.MatchesFiltersWithState(s, state.State, filters) {
rs := &collaboration.ReceivedShare{
Share: s,
State: state.State,
MountPoint: state.MountPoint,
if share.IsGrantedToUser(s, user) {
if share.MatchesFiltersWithState(s, state.State, filters) {
rs := &collaboration.ReceivedShare{
Share: s,
State: state.State,
MountPoint: state.MountPoint,
}
select {
case results <- rs:
case <-ctx.Done():
return ctx.Err()
}
}
}
rss = append(rss, rs)
}
}
}
return nil
})
}

// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()

rss := []*collaboration.ReceivedShare{}
for n := range results {
rss = append(rss, n)
}

if err := g.Wait(); err != nil {
return nil, err
}

return rss, nil
Expand Down
20 changes: 10 additions & 10 deletions pkg/share/manager/jsoncs3/jsoncs3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ var _ = Describe("Jsoncs3", func() {
Expect(err).ToNot(HaveOccurred())

client = &mocks.GatewayAPIClient{}
m, err = jsoncs3.New(storage, client, 0, nil)
m, err = jsoncs3.New(storage, client, 0, nil, 0)
Expect(err).ToNot(HaveOccurred())
})

Expand Down Expand Up @@ -250,7 +250,7 @@ var _ = Describe("Jsoncs3", func() {
})
Expect(s).ToNot(BeNil())

m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s = shareBykey(&collaboration.ShareKey{
Expand Down Expand Up @@ -444,7 +444,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("loads the cache when it doesn't have an entry", func() {
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s, err := m.GetShare(ctx, shareRef)
Expand Down Expand Up @@ -504,7 +504,7 @@ var _ = Describe("Jsoncs3", func() {
})
Expect(err).ToNot(HaveOccurred())

m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s, err := m.GetShare(ctx, &collaboration.ShareReference{
Expand Down Expand Up @@ -617,7 +617,7 @@ var _ = Describe("Jsoncs3", func() {
Expect(us).ToNot(BeNil())
Expect(us.GetPermissions().GetPermissions().InitiateFileUpload).To(BeTrue())

m, err = jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err = jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

s = shareBykey(&collaboration.ShareKey{
Expand Down Expand Up @@ -748,7 +748,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncronizes the user received cache before listing", func() {
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{})
Expand Down Expand Up @@ -816,7 +816,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncronizes the group received cache before listing", func() {
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{})
Expand Down Expand Up @@ -860,7 +860,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncs the cache", func() {
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{
Expand Down Expand Up @@ -894,7 +894,7 @@ var _ = Describe("Jsoncs3", func() {
})

It("syncs the cache", func() {
m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{
Expand Down Expand Up @@ -1017,7 +1017,7 @@ var _ = Describe("Jsoncs3", func() {
Expect(err).ToNot(HaveOccurred())
Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED))

m, err := jsoncs3.New(storage, nil, 0, nil) // Reset in-memory cache
m, err := jsoncs3.New(storage, nil, 0, nil, 0) // Reset in-memory cache
Expect(err).ToNot(HaveOccurred())

rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{
Expand Down