Skip to content

Commit

Permalink
Merge pull request #3994 from aduffeck/load-spaces-concurrently
Browse files Browse the repository at this point in the history
Load spaces concurrently
  • Loading branch information
aduffeck authored Jun 19, 2023
2 parents 9bd3d73 + 9b6d7e4 commit a74db9d
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Load matching spaces concurrently

Matching spaces in a ListStorageSpace call are now loaded concurrently which reduces the response time.

https://github.com/cs3org/reva/pull/3994
142 changes: 92 additions & 50 deletions pkg/storage/utils/decomposedfs/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"

userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -429,66 +431,106 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
// But what about sharding nodes by space?
// an efficient lookup would be possible if we received a spaceid&opaqueid in the request
// the personal spaces must also use the nodeid and not the name
numShares := atomic.Int64{}
errg, ctx := errgroup.WithContext(ctx)
work := make(chan string, len(matches))
results := make(chan *provider.StorageSpace, len(matches))

// Distribute work
errg.Go(func() error {
defer close(work)
for match := range matches {
select {
case work <- match:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})

numShares := 0
// Spawn workers that'll concurrently work the queue
numWorkers := 20
if len(matches) < numWorkers {
numWorkers = len(matches)
}
for i := 0; i < numWorkers; i++ {
errg.Go(func() error {
for match := range work {
var err error
// TODO introduce metadata.IsLockFile(path)
// do not investigate flock files any further. They indicate file locks but are not relevant here.
if strings.HasSuffix(match, filelocks.LockFileSuffix) {
continue
}
// skip metadata files
if fs.lu.MetadataBackend().IsMetaFile(match) {
continue
}
// always read link in case storage space id != node id
spaceID, nodeID, err = ReadSpaceAndNodeFromIndexLink(match)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("match", match).Msg("could not read link, skipping")
continue
}

for match := range matches {
var err error
// TODO introduce metadata.IsLockFile(path)
// do not investigate flock files any further. They indicate file locks but are not relevant here.
if strings.HasSuffix(match, filelocks.LockFileSuffix) {
continue
}
// skip metadata files
if fs.lu.MetadataBackend().IsMetaFile(match) {
continue
}
// always read link in case storage space id != node id
spaceID, nodeID, err = ReadSpaceAndNodeFromIndexLink(match)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("match", match).Msg("could not read link, skipping")
continue
}
n, err := node.ReadNode(ctx, fs.lu, spaceID, nodeID, true, nil, true)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("could not read node, skipping")
continue
}

n, err := node.ReadNode(ctx, fs.lu, spaceID, nodeID, true, nil, true)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("could not read node, skipping")
continue
}
if !n.Exists {
continue
}

if !n.Exists {
continue
}
space, err := fs.storageSpaceFromNode(ctx, n, checkNodePermissions)
if err != nil {
switch err.(type) {
case errtypes.IsPermissionDenied:
// ok
case errtypes.NotFound:
// ok
default:
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("could not convert to storage space")
}
continue
}

space, err := fs.storageSpaceFromNode(ctx, n, checkNodePermissions)
if err != nil {
switch err.(type) {
case errtypes.IsPermissionDenied:
// ok
case errtypes.NotFound:
// ok
default:
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("could not convert to storage space")
// FIXME type share evolved to grant on the edge branch ... make it configurable if the driver should support them or not for now ... ignore type share
if space.SpaceType == spaceTypeShare {
numShares.Add(1)
// do not list shares as spaces for the owner
continue
}

// TODO apply more filters
_, ok1 := spaceTypes[spaceTypeAny]
_, ok2 := spaceTypes[space.SpaceType]
if ok1 || ok2 {
select {
case results <- space:
case <-ctx.Done():
return ctx.Err()
}
}
}
continue
}
return nil
})
}

// FIXME type share evolved to grant on the edge branch ... make it configurable if the driver should support them or not for now ... ignore type share
if space.SpaceType == spaceTypeShare {
numShares++
// do not list shares as spaces for the owner
continue
}
// Wait for things to settle down, then close results chan
go func() {
_ = errg.Wait() // error is checked later
close(results)
}()

// TODO apply more filters
_, ok1 := spaceTypes[spaceTypeAny]
_, ok2 := spaceTypes[space.SpaceType]
if ok1 || ok2 {
spaces = append(spaces, space)
}
for r := range results {
spaces = append(spaces, r)
}

// if there are no matches (or they happened to be spaces for the owner) and the node is a child return a space
if len(matches) <= numShares && nodeID != spaceID {
if int64(len(matches)) <= numShares.Load() && nodeID != spaceID {
// try node id
n, err := node.ReadNode(ctx, fs.lu, spaceID, nodeID, true, nil, false) // permission to read disabled space is checked in storageSpaceFromNode
if err != nil {
Expand Down

0 comments on commit a74db9d

Please sign in to comment.