From fced7bdf510e96459f6b20523681ab2a566ce2b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 17 Nov 2022 13:16:24 +0000 Subject: [PATCH 01/24] rewrite finish upload to get atomic size diff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/upload.go | 216 +++++++++++++++-------- 1 file changed, 145 insertions(+), 71 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index d45c573586..75d9fc7a9d 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -48,11 +48,11 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" "github.com/pkg/errors" - "github.com/rs/zerolog" ) var defaultFilePerm = os.FileMode(0664) @@ -566,6 +566,20 @@ func (upload *fileUpload) writeInfo() error { } // FinishUpload finishes an upload and moves the file to the internal destination +// +// # upload steps +// check if match header to fail early +// copy blob +// lock metadata node +// check if match header again as safeguard +// read metadata +// create version node with current metadata +// update node metadata with new blobid etc +// remember size diff +// unlock metadata +// propagate size diff and new etag +// - propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelvevant +// - propagation needs to propagate the diff func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // ensure cleanup @@ -599,18 +613,18 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { } overwrite := n.ID != "" - var oldSize uint64 + var oldSize int64 if overwrite { // read size from existing node old, _ := node.ReadNode(ctx, upload.fs.lu, spaceID, n.ID, false) - oldSize = uint64(old.Blobsize) + oldSize = old.Blobsize } else { // create new fileid n.ID = uuid.New().String() upload.info.Storage["NodeId"] = n.ID } - if _, err = node.CheckQuota(n.SpaceRoot, overwrite, oldSize, uint64(fi.Size())); err != nil { + if _, err = node.CheckQuota(n.SpaceRoot, overwrite, uint64(oldSize), uint64(fi.Size())); err != nil { return err } @@ -618,6 +632,8 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { sublog := appctx.GetLogger(upload.ctx). With(). Interface("info", upload.info). + Str("spaceid", spaceID). + Str("nodeid", n.ID). Str("binPath", upload.binPath). Str("targetPath", targetPath). Logger() @@ -669,8 +685,9 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // defer writing the checksums until the node is in place - // if target exists create new version - versionsPath := "" + // upload steps + // check if match header to fail early + if fi, err = os.Stat(targetPath); err == nil { // When the if-match header was set we need to check if the // etag still matches before finishing the upload. @@ -684,24 +701,10 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return errtypes.Aborted("etag mismatch") } } - - // FIXME move versioning to blobs ... no need to copy all the metadata! well ... it does if we want to version metadata... - // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries - versionsPath = upload.fs.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) - - // This move drops all metadata!!! We copy it below with CopyMetadata - // FIXME the node must remain the same. otherwise we might restore share metadata - if err = os.Rename(targetPath, versionsPath); err != nil { - sublog.Err(err). - Str("binPath", upload.binPath). - Str("versionsPath", versionsPath). - Msg("Decomposedfs: could not create version") - return - } - } - // upload the data to the blobstore + // copy blob + file, err := os.Open(upload.binPath) if err != nil { return err @@ -712,25 +715,72 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return errors.Wrap(err, "failed to upload file to blostore") } - // now truncate the upload (the payload stays in the blobstore) and move it to the target path - // TODO put uploads on the same underlying storage as the destination dir? - // TODO trigger a workflow as the final rename might eg involve antivirus scanning - if err = os.Truncate(upload.binPath, 0); err != nil { - sublog.Err(err). - Msg("Decomposedfs: could not truncate") - return + // prepare discarding the blob if something changed while writing it + discardBlob := func() { + err = upload.fs.tp.DeleteBlob(n) + if err != nil { + sublog.Err(err).Str("blobid", n.BlobID).Msg("Decomposedfs: failed to discard blob in blostore") + } } - if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { - sublog.Warn().Err(err).Msg("Decomposedfs: could not create node dir, trying to write file anyway") + + // lock metadata node + lock, err := filelocks.AcquireWriteLock(targetPath) + if err != nil { + return errtypes.InternalError(err.Error()) } - if err = os.Rename(upload.binPath, targetPath); err != nil { - sublog.Error().Err(err).Msg("Decomposedfs: could not rename") - return + + // check if match header again as safequard + versionsPath := "" + if fi, err = os.Stat(targetPath); err == nil { + // When the if-match header was set we need to check if the + // etag still matches before finishing the upload. + if ifMatch, ok := upload.info.MetaData["if-match"]; ok { + var targetEtag string + targetEtag, err = node.CalculateEtag(n.ID, fi.ModTime()) + if err != nil { + discardBlob() + return errtypes.InternalError(err.Error()) + } + if ifMatch != targetEtag { + discardBlob() + return errtypes.Aborted("etag mismatch") + } + } + + // FIXME move versioning to blobs ... no need to copy all the metadata! well ... it does if we want to version metadata... + // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries + versionsPath = upload.fs.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) } + + // read metadata + + // attributes that will change + attrs := map[string]string{ + xattrs.BlobIDAttr: n.BlobID, + xattrs.BlobsizeAttr: strconv.FormatInt(n.Blobsize, 10), + + // update checksums + xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), + xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)), + xattrs.ChecksumPrefix + "shadler32a1": string(adler32h.Sum(nil)), + } + + // create version node with current metadata + + // if file already exists if versionsPath != "" { - // copy grant and arbitrary metadata + // touch version node + file, err := os.Create(versionsPath) + if err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("could not create version") + return errtypes.InternalError("could not create version") + } + file.Close() + + // copy grant and arbitrary metadata to version node // FIXME ... now restoring an older revision might bring back a grant that was removed! - err = xattrs.CopyMetadata(versionsPath, targetPath, func(attributeName string) bool { + err = xattrs.CopyMetadataWithSourceLock(targetPath, versionsPath, func(attributeName string) bool { return true // TODO determine all attributes that must be copied, currently we just copy all and overwrite changed properties /* @@ -739,23 +789,68 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { strings.HasPrefix(attributeName, xattrs.FavPrefix) || // for favorites strings.HasPrefix(attributeName, xattrs.SpaceNameAttr) || // for a shared file */ - }) + }, lock) if err != nil { - sublog.Info().Err(err).Msg("Decomposedfs: failed to copy xattrs") + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("failed to copy xattrs to version node") + return errtypes.InternalError("failed to copy xattrs to version node") } - } - // now try write all checksums - tryWritingChecksum(&sublog, n, "sha1", sha1h) - tryWritingChecksum(&sublog, n, "md5", md5h) - tryWritingChecksum(&sublog, n, "adler32", adler32h) + // we MUST bypass any cache here as we have to calculate the size diff atomically + oldSize, err = node.ReadBlobSizeAttr(targetPath) + if err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("failed to copy xattrs to version node") + return errtypes.InternalError("failed to copy xattrs to version node") + } + + } else { + // create dir to node + if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { + discardBlob() + sublog.Err(err).Msg("could not create node dir") + return errtypes.InternalError("could not create node dir") + } + // touch metadata node + file, err := os.Create(targetPath) + if err != nil { + discardBlob() + sublog.Err(err).Msg("could not create metadata node") + return errtypes.InternalError("could not create version") + } + file.Close() + + // basic node metadata + attrs[xattrs.ParentidAttr] = n.ParentID + attrs[xattrs.NameAttr] = n.Name + oldSize = 0 + } - // who will become the owner? the owner of the parent actually ... not the currently logged in user - err = n.WriteAllNodeMetadata() + // update node metadata with new blobid etc + err = n.SetXattrsWithLock(attrs, lock) if err != nil { + discardBlob() return errors.Wrap(err, "Decomposedfs: could not write metadata") } + // use set arbitrary metadata? + if upload.info.MetaData["mtime"] != "" { + err := n.SetMtime(ctx, upload.info.MetaData["mtime"]) + if err != nil { + sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not set mtime metadata") + return err + } + } + + // remember size diff + //sizeDiff := oldSize - n.Blobsize + + // unlock metadata + err = filelocks.ReleaseLock(lock) + if err != nil { + return errtypes.InternalError(err.Error()) + } + // link child name to parent if it is new childNameLink := filepath.Join(n.ParentInternalPath(), n.Name) var link string @@ -778,23 +873,6 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { } } - // only delete the upload if it was successfully written to the storage - if err = os.Remove(upload.infoPath); err != nil { - if !errors.Is(err, iofs.ErrNotExist) { - sublog.Err(err).Msg("Decomposedfs: could not delete upload info") - return - } - } - // use set arbitrary metadata? - if upload.info.MetaData["mtime"] != "" { - err := n.SetMtime(ctx, upload.info.MetaData["mtime"]) - if err != nil { - sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not set mtime metadata") - return err - } - - } - // fill metadata with current mtime if fi, err = os.Stat(targetPath); err == nil { upload.info.MetaData["mtime"] = fmt.Sprintf("%d.%d", fi.ModTime().Unix(), fi.ModTime().Nanosecond()) @@ -803,6 +881,11 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { n.Exists = true + // propagate size diff and new etag + // propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelvevant + // propagation needs to propagate the diff + + //return upload.fs.tp.Propagate(upload.ctx, n, sizeDiff) return upload.fs.tp.Propagate(upload.ctx, n) } @@ -813,15 +896,6 @@ func (upload *fileUpload) checkHash(expected string, h hash.Hash) error { } return nil } -func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) { - if err := n.SetChecksum(algo, h); err != nil { - log.Err(err). - Str("csType", algo). - Bytes("hash", h.Sum(nil)). - Msg("Decomposedfs: could not write checksum") - // this is not critical, the bytes are there so we will continue - } -} func (upload *fileUpload) discardChunk() { if err := os.Remove(upload.binPath); err != nil { From c58f166ded316012a2987faf2ce0b573836f4558 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 17 Nov 2022 13:20:28 +0000 Subject: [PATCH 02/24] decomposedfs: make finish upload atomic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- changelog/unreleased/decomposedfs-finish-upload-rewrite.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/decomposedfs-finish-upload-rewrite.md diff --git a/changelog/unreleased/decomposedfs-finish-upload-rewrite.md b/changelog/unreleased/decomposedfs-finish-upload-rewrite.md new file mode 100644 index 0000000000..8c3607ba78 --- /dev/null +++ b/changelog/unreleased/decomposedfs-finish-upload-rewrite.md @@ -0,0 +1,5 @@ +Bugfix: decomposedfs make finish upload atomic + +We rewrote the finish upload code to use a write lock when creating and updating node metadata. This prevents some cornercases and allows us to calculate the size diff atomically. + +https://github.com/cs3org/reva/pull/3473 \ No newline at end of file From 150fe7d7f5cb91c2275c307b4e9a7b8dc72c9b08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 17 Nov 2022 13:26:34 +0000 Subject: [PATCH 03/24] add lock functions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/node/node.go | 9 +++++ .../utils/decomposedfs/node/node_test.go | 4 +-- pkg/storage/utils/decomposedfs/node/xattrs.go | 13 +++++++ .../utils/decomposedfs/xattrs/xattrs.go | 35 ++++++++++++++++--- 4 files changed, 55 insertions(+), 6 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 04acdf444f..2606b17b09 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -929,6 +929,15 @@ func (n *Node) SetTreeSize(ts uint64) (err error) { return n.SetXattr(xattrs.TreesizeAttr, strconv.FormatUint(ts, 10)) } +// GetBlobSize reads the blobsize from the extended attributes +func (n *Node) GetBlobSize() (treesize uint64, err error) { + var b string + if b, err = n.Xattr(xattrs.TreesizeAttr); err != nil { + return + } + return strconv.ParseUint(b, 10, 64) +} + // SetChecksum writes the checksum with the given checksum type to the extended attributes func (n *Node) SetChecksum(csType string, h hash.Hash) (err error) { return n.SetXattr(xattrs.ChecksumPrefix+csType, string(h.Sum(nil))) diff --git a/pkg/storage/utils/decomposedfs/node/node_test.go b/pkg/storage/utils/decomposedfs/node/node_test.go index a8c192936f..ddd4074c4a 100644 --- a/pkg/storage/utils/decomposedfs/node/node_test.go +++ b/pkg/storage/utils/decomposedfs/node/node_test.go @@ -89,10 +89,10 @@ var _ = Describe("Node", func() { n, err := env.Lookup.NodeFromResource(env.Ctx, ref) Expect(err).ToNot(HaveOccurred()) - blobsize := 239485734 + blobsize := int64(239485734) n.Name = "TestName" n.BlobID = "TestBlobID" - n.Blobsize = int64(blobsize) + n.Blobsize = blobsize err = n.WriteAllNodeMetadata() Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/storage/utils/decomposedfs/node/xattrs.go b/pkg/storage/utils/decomposedfs/node/xattrs.go index be65e0d177..55b9c124b0 100644 --- a/pkg/storage/utils/decomposedfs/node/xattrs.go +++ b/pkg/storage/utils/decomposedfs/node/xattrs.go @@ -20,6 +20,7 @@ package node import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/gofrs/flock" "github.com/pkg/xattr" ) @@ -34,6 +35,18 @@ func (n *Node) SetXattrs(attribs map[string]string) (err error) { return xattrs.SetMultiple(n.InternalPath(), attribs) } +// SetXattrsWithLock sets multiple extended attributes on the write-through cache/node with a given lock +func (n *Node) SetXattrsWithLock(attribs map[string]string, fileLock *flock.Flock) (err error) { + // TODO what if writing the lock fails? + if n.xattrsCache != nil { + for k, v := range attribs { + n.xattrsCache[k] = v + } + } + + return xattrs.SetMultipleWithLock(n.InternalPath(), attribs, fileLock) +} + // SetXattr sets an extended attribute on the write-through cache/node func (n *Node) SetXattr(key, val string) (err error) { if n.xattrsCache != nil { diff --git a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go index 732e8e880c..7915af4cc0 100644 --- a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go +++ b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go @@ -142,14 +142,24 @@ func CopyMetadata(src, target string, filter func(attributeName string) bool) (e } }() - // now try to get a shared lock on the source - readLock, err = filelocks.AcquireReadLock(src) + return CopyMetadataWithSourceLock(src, target, filter, readLock) +} + +// CopyMetadataWithSourceLock copies all extended attributes from source to target. +// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix +// For the source file, a shared lock is acquired. For the target, an exclusive +// write lock is acquired. +func CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) { + var writeLock *flock.Flock + + // Acquire the write log on the target node first. + writeLock, err = filelocks.AcquireWriteLock(target) if err != nil { - return errors.Wrap(err, "xattrs: Unable to lock file for read") + return errors.Wrap(err, "xattrs: Unable to lock target to write") } defer func() { - rerr := filelocks.ReleaseLock(readLock) + rerr := filelocks.ReleaseLock(writeLock) // if err is non nil we do not overwrite that if err == nil { @@ -246,6 +256,23 @@ func SetMultiple(filePath string, attribs map[string]string) (err error) { } }() + return SetMultipleWithLock(filePath, attribs, fileLock) +} + +// SetMultiple allows setting multiple key value pairs at once +// the changes are protected with an file lock +// If the file lock can not be acquired the function returns a +// lock error. +func SetMultipleWithLock(filePath string, attribs map[string]string, fileLock *flock.Flock) (err error) { + switch { + case fileLock != nil: + return errors.Wrap(err, "no lock provided") + case fileLock.Path() != filePath+".lock": + return errors.Wrap(err, "lockpath does not match filepath") + case !fileLock.Locked(): + return errors.Wrap(err, "not locked provided") + } + // error handling: Count if there are errors while setting the attribs. // if there were any, return an error. var ( From 4dae4b6fb09bb240d23fbb141a78fdcbe6219483 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 17 Nov 2022 15:27:33 +0000 Subject: [PATCH 04/24] allow locking non existing files, fix locking with existing lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/tree/tree.go | 6 ++--- pkg/storage/utils/decomposedfs/upload.go | 19 ++++++++------- .../utils/decomposedfs/xattrs/xattrs.go | 24 ++++++++++++------- pkg/storage/utils/filelocks/filelocks.go | 4 ---- 4 files changed, 28 insertions(+), 25 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index e1e458832d..38f0c23192 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -692,7 +692,7 @@ func (t *Tree) removeNode(path string, n *node.Node) error { // Propagate propagates changes to the root of the tree func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) { - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() if !t.treeTimeAccounting && !t.treeSizeAccounting { // no propagation enabled sublog.Debug().Msg("propagation disabled") @@ -713,7 +713,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) { break } - sublog = sublog.With().Interface("node", n).Logger() + sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() // TODO none, sync and async? if !n.HasPropagation() { @@ -757,7 +757,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) { } } - if err := n.UnsetTempEtag(); err != nil { + if err := n.UnsetTempEtag(); !xattrs.IsAttrUnset(err) { sublog.Error().Err(err).Msg("could not remove temporary etag attribute") } } diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 75d9fc7a9d..b8b84f1ea3 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -701,6 +701,12 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return errtypes.Aborted("etag mismatch") } } + } else { + // create dir to node + if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { + sublog.Err(err).Msg("could not create node dir") + return errtypes.InternalError("could not create node dir") + } } // copy blob @@ -726,10 +732,11 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // lock metadata node lock, err := filelocks.AcquireWriteLock(targetPath) if err != nil { + discardBlob() return errtypes.InternalError(err.Error()) } - // check if match header again as safequard + // check if match header again as safeguard versionsPath := "" if fi, err = os.Stat(targetPath); err == nil { // When the if-match header was set we need to check if the @@ -805,12 +812,6 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { } } else { - // create dir to node - if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { - discardBlob() - sublog.Err(err).Msg("could not create node dir") - return errtypes.InternalError("could not create node dir") - } // touch metadata node file, err := os.Create(targetPath) if err != nil { @@ -855,7 +856,8 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { childNameLink := filepath.Join(n.ParentInternalPath(), n.Name) var link string link, err = os.Readlink(childNameLink) - if err == nil && link != "../"+n.ID { + relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) + if err == nil && link != relativeNodePath { sublog.Err(err). Interface("node", n). Str("childNameLink", childNameLink). @@ -867,7 +869,6 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { } } if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID { - relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) if err = os.Symlink(relativeNodePath, childNameLink); err != nil { return errors.Wrap(err, "Decomposedfs: could not symlink child entry") } diff --git a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go index 7915af4cc0..74e622e77f 100644 --- a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go +++ b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go @@ -150,6 +150,15 @@ func CopyMetadata(src, target string, filter func(attributeName string) bool) (e // For the source file, a shared lock is acquired. For the target, an exclusive // write lock is acquired. func CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) { + switch { + case readLock == nil: + return errors.New("no lock provided") + case readLock.Path() != src+".flock": + return errors.New("lockpath does not match filepath") + case !readLock.Locked(): + return errors.New("not locked") + } + var writeLock *flock.Flock // Acquire the write log on the target node first. @@ -259,18 +268,15 @@ func SetMultiple(filePath string, attribs map[string]string) (err error) { return SetMultipleWithLock(filePath, attribs, fileLock) } -// SetMultiple allows setting multiple key value pairs at once -// the changes are protected with an file lock -// If the file lock can not be acquired the function returns a -// lock error. +// SetMultipleWithLock allows setting multiple key value pairs at once with an existing lock func SetMultipleWithLock(filePath string, attribs map[string]string, fileLock *flock.Flock) (err error) { switch { - case fileLock != nil: - return errors.Wrap(err, "no lock provided") - case fileLock.Path() != filePath+".lock": - return errors.Wrap(err, "lockpath does not match filepath") + case fileLock == nil: + return errors.New("no lock provided") + case fileLock.Path() != filePath+".flock": + return errors.New("lockpath does not match filepath") case !fileLock.Locked(): - return errors.Wrap(err, "not locked provided") + return errors.New("not locked") } // error handling: Count if there are errors while setting the attribs. diff --git a/pkg/storage/utils/filelocks/filelocks.go b/pkg/storage/utils/filelocks/filelocks.go index f338f881f5..9911f0d6fa 100644 --- a/pkg/storage/utils/filelocks/filelocks.go +++ b/pkg/storage/utils/filelocks/filelocks.go @@ -85,10 +85,6 @@ func acquireLock(file string, write bool) (*flock.Flock, error) { return nil, ErrPathEmpty } - if _, err = os.Stat(file); err != nil { - return nil, err - } - var flock *flock.Flock for i := 1; i <= _lockCyclesValue; i++ { if flock = getMutexedFlock(n); flock != nil { From 3d0977a14d0dbc1a9400641741f1ef71c58cd393 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 17 Nov 2022 15:57:08 +0000 Subject: [PATCH 05/24] make returned error recognizable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/node/xattrs.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/storage/utils/decomposedfs/node/xattrs.go b/pkg/storage/utils/decomposedfs/node/xattrs.go index 55b9c124b0..538fc9da87 100644 --- a/pkg/storage/utils/decomposedfs/node/xattrs.go +++ b/pkg/storage/utils/decomposedfs/node/xattrs.go @@ -93,5 +93,6 @@ func (n *Node) Xattr(key string) (string, error) { if val, ok := n.xattrsCache[key]; ok { return val, nil } - return "", xattr.ENOATTR + // wrap the error as xattr does + return "", &xattr.Error{Op: "xattr.get", Path: n.InternalPath(), Name: key, Err: xattr.ENOATTR} } From 11fc7c4727fde0b1a240fd3c2916b6891e2fe9b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 17 Nov 2022 15:57:26 +0000 Subject: [PATCH 06/24] more lock fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/upload.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index b8b84f1ea3..69358a7082 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -854,9 +854,9 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // link child name to parent if it is new childNameLink := filepath.Join(n.ParentInternalPath(), n.Name) + relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) var link string link, err = os.Readlink(childNameLink) - relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) if err == nil && link != relativeNodePath { sublog.Err(err). Interface("node", n). @@ -868,7 +868,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") } } - if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID { + if errors.Is(err, iofs.ErrNotExist) || link != relativeNodePath { if err = os.Symlink(relativeNodePath, childNameLink); err != nil { return errors.Wrap(err, "Decomposedfs: could not symlink child entry") } From 47f29a145a230d1063c80d50cc90d48abd8c9385 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 10:01:42 +0100 Subject: [PATCH 07/24] do not log nil error Co-authored-by: Andre Duffeck --- pkg/storage/utils/decomposedfs/tree/tree.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 38f0c23192..37fd1071ec 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -757,7 +757,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) { } } - if err := n.UnsetTempEtag(); !xattrs.IsAttrUnset(err) { + if err := n.UnsetTempEtag(); err != nil && !xattrs.IsAttrUnset(err) { sublog.Error().Err(err).Msg("could not remove temporary etag attribute") } } From bf30d98f28ffdb5f5f5c9895d99e091083f43300 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 09:04:54 +0000 Subject: [PATCH 08/24] don't overwrite original error when deleting the blob fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/upload.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 69358a7082..2b5937d8a5 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -723,8 +723,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // prepare discarding the blob if something changed while writing it discardBlob := func() { - err = upload.fs.tp.DeleteBlob(n) - if err != nil { + if err := upload.fs.tp.DeleteBlob(n); err != nil { sublog.Err(err).Str("blobid", n.BlobID).Msg("Decomposedfs: failed to discard blob in blostore") } } From 786d0b31b4cabc325e901ea7ede1508c05561386 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 09:21:49 +0000 Subject: [PATCH 09/24] always release node lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/upload.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 2b5937d8a5..49243854d8 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -734,6 +734,13 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { discardBlob() return errtypes.InternalError(err.Error()) } + releaseLock := func() { + // ReleaseLock returns nil if already unlocked + if err := filelocks.ReleaseLock(lock); err != nil { + sublog.Err(err).Msg("Decomposedfs:could not unlock node") + } + } + defer releaseLock() // check if match header again as safeguard versionsPath := "" @@ -843,7 +850,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { } // remember size diff - //sizeDiff := oldSize - n.Blobsize + sizeDiff := oldSize - n.Blobsize // unlock metadata err = filelocks.ReleaseLock(lock) @@ -886,6 +893,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // propagation needs to propagate the diff //return upload.fs.tp.Propagate(upload.ctx, n, sizeDiff) + sublog.Debug().Int64("sizediff", sizeDiff).Msg("Decomposedfs: propagating size diff") return upload.fs.tp.Propagate(upload.ctx, n) } From b955128c10065a21c97f1adc3f5bb18e5e2c7dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 10:10:27 +0000 Subject: [PATCH 10/24] keep correct mtimes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/metadata.go | 6 +-- pkg/storage/utils/decomposedfs/node/node.go | 31 ++++++------- pkg/storage/utils/decomposedfs/upload.go | 48 +++++++++++++++------ 3 files changed, 51 insertions(+), 34 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/metadata.go b/pkg/storage/utils/decomposedfs/metadata.go index ed9122796c..0e5100e86c 100644 --- a/pkg/storage/utils/decomposedfs/metadata.go +++ b/pkg/storage/utils/decomposedfs/metadata.go @@ -73,8 +73,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. if md.Metadata != nil { if val, ok := md.Metadata["mtime"]; ok { delete(md.Metadata, "mtime") - err := n.SetMtime(ctx, val) - if err != nil { + if err := n.SetMtimeString(val); err != nil { errs = append(errs, errors.Wrap(err, "could not set mtime")) } } @@ -85,8 +84,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. // TODO unset when folder is updated or add timestamp to etag? if val, ok := md.Metadata["etag"]; ok { delete(md.Metadata, "etag") - err := n.SetEtag(ctx, val) - if err != nil { + if err := n.SetEtag(ctx, val); err != nil { errs = append(errs, errors.Wrap(err, "could not set etag")) } } diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 2606b17b09..affd0b8e1a 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -495,25 +495,20 @@ func calculateEtag(nodeID string, tmTime time.Time) (string, error) { return fmt.Sprintf(`"%x"`, h.Sum(nil)), nil } -// SetMtime sets the mtime and atime of a node -func (n *Node) SetMtime(ctx context.Context, mtime string) error { - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() - if mt, err := parseMTime(mtime); err == nil { - nodePath := n.InternalPath() - // updating mtime also updates atime - if err := os.Chtimes(nodePath, mt, mt); err != nil { - sublog.Error().Err(err). - Time("mtime", mt). - Msg("could not set mtime") - return errors.Wrap(err, "could not set mtime") - } - } else { - sublog.Error().Err(err). - Str("mtime", mtime). - Msg("could not parse mtime") - return errors.Wrap(err, "could not parse mtime") +// SetMtimeString sets the mtime and atime of a node to the unixtime parsed from the given string +func (n *Node) SetMtimeString(mtime string) error { + mt, err := parseMTime(mtime) + if err != nil { + return err } - return nil + return n.SetMtime(mt) +} + +// SetMtime sets the mtime and atime of a node +func (n *Node) SetMtime(mtime time.Time) error { + nodePath := n.InternalPath() + // updating mtime also updates atime + return os.Chtimes(nodePath, mtime, mtime) } // SetEtag sets the temporary etag of a node if it differs from the current etag diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 49243854d8..744f477c56 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -743,6 +743,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { defer releaseLock() // check if match header again as safeguard + var oldMtime time.Time versionsPath := "" if fi, err = os.Stat(targetPath); err == nil { // When the if-match header was set we need to check if the @@ -763,6 +764,9 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // FIXME move versioning to blobs ... no need to copy all the metadata! well ... it does if we want to version metadata... // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries versionsPath = upload.fs.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) + + // remember mtime of existing file so we can apply it to the version + oldMtime = fi.ModTime() } // read metadata @@ -780,16 +784,24 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // create version node with current metadata + var newMtime time.Time // if file already exists if versionsPath != "" { // touch version node file, err := os.Create(versionsPath) if err != nil { discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("could not create version") - return errtypes.InternalError("could not create version") + sublog.Err(err).Str("version", versionsPath).Msg("could not create version node") + return errtypes.InternalError("could not create version node") } file.Close() + fi, err := file.Stat() + if err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("could not stat version node") + return errtypes.InternalError("could not stat version node") + } + newMtime = fi.ModTime() // copy grant and arbitrary metadata to version node // FIXME ... now restoring an older revision might bring back a grant that was removed! @@ -809,21 +821,27 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return errtypes.InternalError("failed to copy xattrs to version node") } + // keep mtime from previous version + if err := os.Chtimes(versionsPath, oldMtime, oldMtime); err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("failed to change mtime of version node") + return errtypes.InternalError("failed to change mtime of version node") + } + // we MUST bypass any cache here as we have to calculate the size diff atomically oldSize, err = node.ReadBlobSizeAttr(targetPath) if err != nil { discardBlob() - sublog.Err(err).Str("version", versionsPath).Msg("failed to copy xattrs to version node") - return errtypes.InternalError("failed to copy xattrs to version node") + sublog.Err(err).Str("version", versionsPath).Msg("failed to read old blobsize") + return errtypes.InternalError("failed to read old blobsize") } - } else { // touch metadata node file, err := os.Create(targetPath) if err != nil { discardBlob() - sublog.Err(err).Msg("could not create metadata node") - return errtypes.InternalError("could not create version") + sublog.Err(err).Msg("could not create node") + return errtypes.InternalError("could not create node") } file.Close() @@ -840,11 +858,17 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return errors.Wrap(err, "Decomposedfs: could not write metadata") } - // use set arbitrary metadata? - if upload.info.MetaData["mtime"] != "" { - err := n.SetMtime(ctx, upload.info.MetaData["mtime"]) - if err != nil { - sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not set mtime metadata") + // update mtime + switch { + case upload.info.MetaData["mtime"] != "": + if err := n.SetMtimeString(upload.info.MetaData["mtime"]); err != nil { + sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not apply mtime from metadata") + return err + } + case newMtime != time.Time{}: + // we are creating a version + if err := n.SetMtime(newMtime); err != nil { + sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not change mtime of node") return err } } From 3db92431bbdea207cfd41a09bab5c020aedfae62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 10:13:04 +0000 Subject: [PATCH 11/24] fix adler checksum MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/upload.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 744f477c56..c782801832 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -777,9 +777,9 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { xattrs.BlobsizeAttr: strconv.FormatInt(n.Blobsize, 10), // update checksums - xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), - xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)), - xattrs.ChecksumPrefix + "shadler32a1": string(adler32h.Sum(nil)), + xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), + xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)), + xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)), } // create version node with current metadata From 38c1d93d66e4a30f9dd0f000c32422a7eee611e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 10:27:21 +0000 Subject: [PATCH 12/24] stat before closing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/upload.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index c782801832..d968897e2b 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -794,15 +794,17 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { sublog.Err(err).Str("version", versionsPath).Msg("could not create version node") return errtypes.InternalError("could not create version node") } - file.Close() fi, err := file.Stat() if err != nil { + file.Close() discardBlob() sublog.Err(err).Str("version", versionsPath).Msg("could not stat version node") return errtypes.InternalError("could not stat version node") } newMtime = fi.ModTime() + file.Close() + // FIXME the copying might not be necessary when we are leaving the node in place since we only need the blobsize and the blobid in the extended attributes for a revision // copy grant and arbitrary metadata to version node // FIXME ... now restoring an older revision might bring back a grant that was removed! err = xattrs.CopyMetadataWithSourceLock(targetPath, versionsPath, func(attributeName string) bool { From cb990cdb3cad58e957c9fb359c3def6b1b19b09f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 11:16:27 +0000 Subject: [PATCH 13/24] fix lint ... and proper revision download is not covered by the CS3 api MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/node/node_test.go | 2 +- pkg/storage/utils/decomposedfs/revisions.go | 3 +++ pkg/storage/utils/decomposedfs/upload.go | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/node/node_test.go b/pkg/storage/utils/decomposedfs/node/node_test.go index ddd4074c4a..c92ce5b785 100644 --- a/pkg/storage/utils/decomposedfs/node/node_test.go +++ b/pkg/storage/utils/decomposedfs/node/node_test.go @@ -100,7 +100,7 @@ var _ = Describe("Node", func() { Expect(err).ToNot(HaveOccurred()) Expect(n2.Name).To(Equal("TestName")) Expect(n2.BlobID).To(Equal("TestBlobID")) - Expect(n2.Blobsize).To(Equal(int64(blobsize))) + Expect(n2.Blobsize).To(Equal(blobsize)) }) }) diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index a9a7381c21..40ac8c5a07 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -135,6 +135,9 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe contentPath := fs.lu.InternalPath(spaceID, revisionKey) + // FIXME this will always be an empty file ... actuallyd DownloadRevision is never called ... the CS3 api has no way to initiate a file revision download + // it all just magically works by accident ... and only partly ... + // see also https://github.com/cs3org/reva/issues/1813 r, err := os.Open(contentPath) if err != nil { if errors.Is(err, iofs.ErrNotExist) { diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index d968897e2b..af89689ce5 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -918,7 +918,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelvevant // propagation needs to propagate the diff - //return upload.fs.tp.Propagate(upload.ctx, n, sizeDiff) + // return upload.fs.tp.Propagate(upload.ctx, n, sizeDiff) sublog.Debug().Int64("sizediff", sizeDiff).Msg("Decomposedfs: propagating size diff") return upload.fs.tp.Propagate(upload.ctx, n) } From 57aaca2d81bfbaeba37f4c688aee09364e633764 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 11:42:37 +0000 Subject: [PATCH 14/24] fix permissions when downloading grants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../utils/decomposedfs/decomposedfs.go | 7 +++++++ pkg/storage/utils/decomposedfs/revisions.go | 21 ++++++++++--------- pkg/storage/utils/decomposedfs/upload.go | 15 ++++--------- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 776ceb5d49..843a0df575 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -30,6 +30,7 @@ import ( "path" "path/filepath" "strconv" + "strings" "syscall" cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1" @@ -591,6 +592,12 @@ func (fs *Decomposedfs) Delete(ctx context.Context, ref *provider.Reference) (er // Download returns a reader to the specified resource func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) { + // check if we are trying to download a revision + // TODO the CS3 api should allow initiating a revision download + if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) { + return fs.DownloadRevision(ctx, ref, ref.ResourceId.OpaqueId) + } + node, err := fs.lu.NodeFromResource(ctx, ref) if err != nil { return nil, errors.Wrap(err, "Decomposedfs: error resolving ref") diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index 40ac8c5a07..7c6fcb9135 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -21,7 +21,6 @@ package decomposedfs import ( "context" "io" - iofs "io/fs" "os" "path/filepath" "strings" @@ -99,6 +98,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen } // DownloadRevision returns a reader for the specified revision +// FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813 func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (io.ReadCloser, error) { log := appctx.GetLogger(ctx) @@ -135,17 +135,18 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe contentPath := fs.lu.InternalPath(spaceID, revisionKey) - // FIXME this will always be an empty file ... actuallyd DownloadRevision is never called ... the CS3 api has no way to initiate a file revision download - // it all just magically works by accident ... and only partly ... - // see also https://github.com/cs3org/reva/issues/1813 - r, err := os.Open(contentPath) + blobid, err := node.ReadBlobIDAttr(contentPath) if err != nil { - if errors.Is(err, iofs.ErrNotExist) { - return nil, errtypes.NotFound(contentPath) - } - return nil, errors.Wrap(err, "Decomposedfs: error opening revision "+revisionKey) + return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id of revision '%s' for node '%s'", n.ID, revisionKey) + } + + revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid} + + reader, err := fs.tp.ReadBlob(&revisionNode) + if err != nil { + return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey) } - return r, nil + return reader, nil } // RestoreRevision restores the specified revision of the resource diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index af89689ce5..cd278c92dd 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -804,18 +804,11 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { newMtime = fi.ModTime() file.Close() - // FIXME the copying might not be necessary when we are leaving the node in place since we only need the blobsize and the blobid in the extended attributes for a revision - // copy grant and arbitrary metadata to version node - // FIXME ... now restoring an older revision might bring back a grant that was removed! + // copy blob metadata to version node err = xattrs.CopyMetadataWithSourceLock(targetPath, versionsPath, func(attributeName string) bool { - return true - // TODO determine all attributes that must be copied, currently we just copy all and overwrite changed properties - /* - return strings.HasPrefix(attributeName, xattrs.GrantPrefix) || // for grants - strings.HasPrefix(attributeName, xattrs.MetadataPrefix) || // for arbitrary metadata - strings.HasPrefix(attributeName, xattrs.FavPrefix) || // for favorites - strings.HasPrefix(attributeName, xattrs.SpaceNameAttr) || // for a shared file - */ + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || // for checksums + attributeName == xattrs.BlobIDAttr || + attributeName == xattrs.BlobsizeAttr }, lock) if err != nil { discardBlob() From 82419a356b8a74d2c845f4e735d140ca427db8b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 11:57:32 +0000 Subject: [PATCH 15/24] update changelog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- changelog/unreleased/decomposedfs-finish-upload-rewrite.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/changelog/unreleased/decomposedfs-finish-upload-rewrite.md b/changelog/unreleased/decomposedfs-finish-upload-rewrite.md index 8c3607ba78..73cfb8f306 100644 --- a/changelog/unreleased/decomposedfs-finish-upload-rewrite.md +++ b/changelog/unreleased/decomposedfs-finish-upload-rewrite.md @@ -1,5 +1,5 @@ -Bugfix: decomposedfs make finish upload atomic +Bugfix: decomposedfs fix revision download -We rewrote the finish upload code to use a write lock when creating and updating node metadata. This prevents some cornercases and allows us to calculate the size diff atomically. +We rewrote the finish upload code to use a write lock when creating and updating node metadata. This prevents some cornercases, allows us to calculate the size diff atomically and fixes downloading revisions. https://github.com/cs3org/reva/pull/3473 \ No newline at end of file From 0686a7cd6699fb594aed1f3afc6efb5b22182aba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 12:34:41 +0000 Subject: [PATCH 16/24] fix locks and revision restore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/revisions.go | 41 ++++++++++++++++--- pkg/storage/utils/decomposedfs/upload.go | 4 +- .../utils/decomposedfs/xattrs/xattrs.go | 14 +++---- 3 files changed, 44 insertions(+), 15 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index 7c6fcb9135..9036634490 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -30,6 +30,7 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/pkg/errors" ) @@ -82,7 +83,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen } blobSize, err := node.ReadBlobSizeAttr(items[i]) if err != nil { - return nil, errors.Wrapf(err, "error reading blobsize xattr") + appctx.GetLogger(ctx).Error().Err(err).Str("name", fi.Name()).Msg("error reading blobsize xattr, using 0") } rev.Size = uint64(blobSize) etag, err := node.CalculateEtag(np, mtime) @@ -198,17 +199,45 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries versionsPath := fs.lu.InternalPath(spaceID, kp[0]+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) - err = os.Rename(nodePath, versionsPath) + // touch version node + if file, err := os.Create(versionsPath); err != nil { + return err + } else if err := file.Close(); err != nil { + return err + } + + // copy blob metadata to version node + err = xattrs.CopyMetadata(nodePath, versionsPath, func(attributeName string) bool { + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || // for checksums + attributeName == xattrs.BlobIDAttr || + attributeName == xattrs.BlobsizeAttr + }) if err != nil { - return + return errtypes.InternalError("failed to copy blob xattrs to version node") + } + + // keep mtime from previous version + if err := os.Chtimes(versionsPath, fi.ModTime(), fi.ModTime()); err != nil { + return errtypes.InternalError("failed to change mtime of version node") } - // copy old revision to current location + // update blob id in node + // copy blob metadata from revision to node revisionPath := fs.lu.InternalPath(spaceID, revisionKey) + err = xattrs.CopyMetadata(revisionPath, nodePath, func(attributeName string) bool { + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || + attributeName == xattrs.BlobIDAttr || + attributeName == xattrs.BlobsizeAttr + }) + if err != nil { + return errtypes.InternalError("failed to copy blob xattrs to version node") + } - if err = os.Rename(revisionPath, nodePath); err != nil { - return + // explicitly update mtime of node as writing xattrs does not change mtime + now := time.Now() + if err := os.Chtimes(nodePath, now, now); err != nil { + return errtypes.InternalError("failed to change mtime of version node") } return fs.tp.Propagate(ctx, n) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index cd278c92dd..0d8c3f763c 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -806,14 +806,14 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // copy blob metadata to version node err = xattrs.CopyMetadataWithSourceLock(targetPath, versionsPath, func(attributeName string) bool { - return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || // for checksums + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || attributeName == xattrs.BlobIDAttr || attributeName == xattrs.BlobsizeAttr }, lock) if err != nil { discardBlob() sublog.Err(err).Str("version", versionsPath).Msg("failed to copy xattrs to version node") - return errtypes.InternalError("failed to copy xattrs to version node") + return errtypes.InternalError("failed to copy blob xattrs to version node") } // keep mtime from previous version diff --git a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go index 74e622e77f..357da0f046 100644 --- a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go +++ b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go @@ -125,16 +125,16 @@ func refFromCS3(b []byte) (*provider.Reference, error) { // For the source file, a shared lock is acquired. For the target, an exclusive // write lock is acquired. func CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) { - var writeLock, readLock *flock.Flock + var readLock *flock.Flock - // Acquire the write log on the target node first. - writeLock, err = filelocks.AcquireWriteLock(target) + // Acquire a read log on the source node + readLock, err = filelocks.AcquireReadLock(src) if err != nil { - return errors.Wrap(err, "xattrs: Unable to lock target to write") + return errors.Wrap(err, "xattrs: Unable to lock source to read") } defer func() { - rerr := filelocks.ReleaseLock(writeLock) + rerr := filelocks.ReleaseLock(readLock) // if err is non nil we do not overwrite that if err == nil { @@ -155,13 +155,13 @@ func CopyMetadataWithSourceLock(src, target string, filter func(attributeName st return errors.New("no lock provided") case readLock.Path() != src+".flock": return errors.New("lockpath does not match filepath") - case !readLock.Locked(): + case !readLock.Locked() && !readLock.RLocked(): // we need either a read or a write lock return errors.New("not locked") } var writeLock *flock.Flock - // Acquire the write log on the target node first. + // Acquire the write log on the target node writeLock, err = filelocks.AcquireWriteLock(target) if err != nil { From b1628940bd7b7e21fa67019a7b3e3d9a0c20f4f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 13:40:59 +0000 Subject: [PATCH 17/24] assemble permissions on the node when checking a revision MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/node/permissions.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/storage/utils/decomposedfs/node/permissions.go b/pkg/storage/utils/decomposedfs/node/permissions.go index 117b9a72f3..53253868be 100644 --- a/pkg/storage/utils/decomposedfs/node/permissions.go +++ b/pkg/storage/utils/decomposedfs/node/permissions.go @@ -20,10 +20,12 @@ package node import ( "context" + "strings" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" ) @@ -91,6 +93,16 @@ func (p *Permissions) AssemblePermissions(ctx context.Context, n *Node) (ap prov return NoPermissions(), nil } + if strings.Contains(n.ID, RevisionIDDelimiter) { + // verify revision key format + kp := strings.SplitN(n.ID, RevisionIDDelimiter, 2) + if len(kp) != 2 { + return NoPermissions(), errtypes.NotFound(n.ID) + } + // use the actual node for the permission assembly + n.ID = kp[0] + } + // check if the current user is the owner if utils.UserIDEqual(u.Id, n.Owner()) { return OwnerPermissions(), nil From d9534ef21be3bb7a09ea3bd483d37149a9cdb83c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 16:10:23 +0000 Subject: [PATCH 18/24] fix typos MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/upload.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 0d8c3f763c..c53d20f90b 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -718,13 +718,13 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { defer file.Close() err = upload.fs.tp.WriteBlob(n, file) if err != nil { - return errors.Wrap(err, "failed to upload file to blostore") + return errors.Wrap(err, "failed to upload file to blobstore") } // prepare discarding the blob if something changed while writing it discardBlob := func() { if err := upload.fs.tp.DeleteBlob(n); err != nil { - sublog.Err(err).Str("blobid", n.BlobID).Msg("Decomposedfs: failed to discard blob in blostore") + sublog.Err(err).Str("blobid", n.BlobID).Msg("Decomposedfs: failed to discard blob in blobstore") } } @@ -761,7 +761,6 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { } } - // FIXME move versioning to blobs ... no need to copy all the metadata! well ... it does if we want to version metadata... // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries versionsPath = upload.fs.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) From 36b8a848d99f3cbced35a2171d2cf3f03c20f844 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 18 Nov 2022 16:11:22 +0000 Subject: [PATCH 19/24] allow revision download when user has initiate download and list revision permission MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- internal/http/services/owncloud/ocdav/meta.go | 10 ++++++++++ pkg/storage/utils/decomposedfs/revisions.go | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/internal/http/services/owncloud/ocdav/meta.go b/internal/http/services/owncloud/ocdav/meta.go index a345ffbd8b..0d6bed1be0 100644 --- a/internal/http/services/owncloud/ocdav/meta.go +++ b/internal/http/services/owncloud/ocdav/meta.go @@ -23,6 +23,7 @@ import ( "fmt" "net/http" "path" + "strings" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -70,6 +71,15 @@ func (h *MetaHandler) Handler(s *svc) http.Handler { errors.HandleWebdavError(logger, w, b, err) return } + if did.StorageId == "" && did.OpaqueId == "" && strings.Count(id, ":") >= 2 { + logger := appctx.GetLogger(r.Context()) + logger.Warn().Str("id", id).Msg("detected invalid : separted resourceid id, trying to split it ... but fix the client that made the request") + // try splitting with : + parts := strings.SplitN(id, ":", 3) + did.StorageId = parts[0] + did.SpaceId = parts[1] + did.OpaqueId = parts[2] + } var head string head, r.URL.Path = router.ShiftPath(r.URL.Path) diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index 9036634490..1b0adc741d 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -126,7 +126,7 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe switch { case err != nil: return nil, errtypes.InternalError(err.Error()) - case !rp.ListFileVersions || !rp.RestoreFileVersion || !rp.InitiateFileDownload: // TODO add explicit permission in the CS3 api? + case !rp.ListFileVersions || !rp.InitiateFileDownload: // TODO add explicit permission in the CS3 api? f, _ := storagespace.FormatReference(ref) if rp.Stat { return nil, errtypes.PermissionDenied(f) From c90d8c4d34f4651c74559aaefae6e7d49083352a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Mon, 21 Nov 2022 10:49:05 +0000 Subject: [PATCH 20/24] fix reading revision node MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- internal/http/services/owncloud/ocdav/meta.go | 2 +- pkg/storage/utils/decomposedfs/node/node.go | 20 +++++++++++++++++-- .../utils/decomposedfs/node/permissions.go | 1 + 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/internal/http/services/owncloud/ocdav/meta.go b/internal/http/services/owncloud/ocdav/meta.go index 0d6bed1be0..69d7da344d 100644 --- a/internal/http/services/owncloud/ocdav/meta.go +++ b/internal/http/services/owncloud/ocdav/meta.go @@ -73,7 +73,7 @@ func (h *MetaHandler) Handler(s *svc) http.Handler { } if did.StorageId == "" && did.OpaqueId == "" && strings.Count(id, ":") >= 2 { logger := appctx.GetLogger(r.Context()) - logger.Warn().Str("id", id).Msg("detected invalid : separted resourceid id, trying to split it ... but fix the client that made the request") + logger.Warn().Str("id", id).Msg("detected invalid : separated resourceid id, trying to split it ... but fix the client that made the request") // try splitting with : parts := strings.SplitN(id, ":", 3) did.StorageId = parts[0] diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index affd0b8e1a..cafe50e0d3 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -215,6 +215,19 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis return r, nil } + // are we reading a revision? + revision := "" + if strings.Contains(nodeID, RevisionIDDelimiter) { + // verify revision key format + kp := strings.SplitN(nodeID, RevisionIDDelimiter, 2) + if len(kp) == 2 { + // use the actual node for the metadata lookup + nodeID = kp[0] + // remember revision for blob metadata + revision = kp[1] + } + } + // read node n = &Node{ SpaceID: spaceID, @@ -237,7 +250,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis n.Exists = true // lookup blobID in extended attributes - n.BlobID, err = n.Xattr(xattrs.BlobIDAttr) + n.BlobID, err = ReadBlobIDAttr(nodePath + revision) switch { case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false @@ -246,7 +259,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis } // Lookup blobsize - n.Blobsize, err = ReadBlobSizeAttr(nodePath) + n.Blobsize, err = ReadBlobSizeAttr(nodePath + revision) switch { case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false @@ -265,6 +278,9 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis return nil, errtypes.InternalError(err.Error()) } + // append back revision to nodeid + n.ID += revision + // TODO why do we stat the parent? to determine if the current node is in the trash we would need to traverse all parents... // we need to traverse all parents for permissions anyway ... // - we can compare to space root owner with the current user diff --git a/pkg/storage/utils/decomposedfs/node/permissions.go b/pkg/storage/utils/decomposedfs/node/permissions.go index 53253868be..9a076f7f65 100644 --- a/pkg/storage/utils/decomposedfs/node/permissions.go +++ b/pkg/storage/utils/decomposedfs/node/permissions.go @@ -93,6 +93,7 @@ func (p *Permissions) AssemblePermissions(ctx context.Context, n *Node) (ap prov return NoPermissions(), nil } + // are we reading a revision? if strings.Contains(n.ID, RevisionIDDelimiter) { // verify revision key format kp := strings.SplitN(n.ID, RevisionIDDelimiter, 2) From 86791037dc3e50d28cc2b541932190afcdc82176 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Mon, 21 Nov 2022 11:23:45 +0000 Subject: [PATCH 21/24] do not forget revision delimiter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/node/node.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index cafe50e0d3..d2f579ca9b 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -216,7 +216,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis } // are we reading a revision? - revision := "" + revisionSuffix := "" if strings.Contains(nodeID, RevisionIDDelimiter) { // verify revision key format kp := strings.SplitN(nodeID, RevisionIDDelimiter, 2) @@ -224,7 +224,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis // use the actual node for the metadata lookup nodeID = kp[0] // remember revision for blob metadata - revision = kp[1] + revisionSuffix = RevisionIDDelimiter + kp[1] } } @@ -236,6 +236,11 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis SpaceRoot: r, } + // append back revision to nodeid, even when returning a not existing node + defer func() { + n.ID += revisionSuffix + }() + nodePath := n.InternalPath() // lookup name in extended attributes @@ -250,7 +255,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis n.Exists = true // lookup blobID in extended attributes - n.BlobID, err = ReadBlobIDAttr(nodePath + revision) + n.BlobID, err = ReadBlobIDAttr(nodePath + revisionSuffix) switch { case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false @@ -259,7 +264,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis } // Lookup blobsize - n.Blobsize, err = ReadBlobSizeAttr(nodePath + revision) + n.Blobsize, err = ReadBlobSizeAttr(nodePath + revisionSuffix) switch { case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false @@ -278,9 +283,6 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis return nil, errtypes.InternalError(err.Error()) } - // append back revision to nodeid - n.ID += revision - // TODO why do we stat the parent? to determine if the current node is in the trash we would need to traverse all parents... // we need to traverse all parents for permissions anyway ... // - we can compare to space root owner with the current user From f2d29bb500d631d003ff9db6693eba6481354089 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Mon, 21 Nov 2022 12:18:36 +0000 Subject: [PATCH 22/24] drop old revision MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/revisions.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index 1b0adc741d..26b8237600 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -234,6 +234,11 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer return errtypes.InternalError("failed to copy blob xattrs to version node") } + // drop old revision + if err := os.Remove(revisionPath); err != nil { + log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not delete old revision, continuing") + } + // explicitly update mtime of node as writing xattrs does not change mtime now := time.Now() if err := os.Chtimes(nodePath, now, now); err != nil { From 2401c1f4f20d120af0858035929bdeb7604c5832 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Mon, 21 Nov 2022 12:24:52 +0000 Subject: [PATCH 23/24] remove unexpected failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- tests/acceptance/expected-failures-on-OCIS-storage.md | 4 ---- tests/acceptance/expected-failures-on-S3NG-storage.md | 3 --- 2 files changed, 7 deletions(-) diff --git a/tests/acceptance/expected-failures-on-OCIS-storage.md b/tests/acceptance/expected-failures-on-OCIS-storage.md index d8f28a0a75..5808c17a2a 100644 --- a/tests/acceptance/expected-failures-on-OCIS-storage.md +++ b/tests/acceptance/expected-failures-on-OCIS-storage.md @@ -284,10 +284,6 @@ _requires a [CS3 user provisioning api that can update the quota for a user](htt - [apiWebdavMove2/moveShareOnOcis.feature:169](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L169) - [apiWebdavMove2/moveShareOnOcis.feature:170](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L170) - -#### [restoring an older version of a shared file deletes the share](https://github.com/owncloud/ocis/issues/765) -- [apiShareManagementToShares/acceptShares.feature:579](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareManagementToShares/acceptShares.feature#L579) - #### [Expiration date for shares is not implemented](https://github.com/owncloud/ocis/issues/1250) #### Expiration date of user shares - [apiShareCreateSpecialToShares1/createShareExpirationDate.feature:52](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareCreateSpecialToShares1/createShareExpirationDate.feature#L52) diff --git a/tests/acceptance/expected-failures-on-S3NG-storage.md b/tests/acceptance/expected-failures-on-S3NG-storage.md index a9efc55de4..6945fffa9a 100644 --- a/tests/acceptance/expected-failures-on-S3NG-storage.md +++ b/tests/acceptance/expected-failures-on-S3NG-storage.md @@ -293,9 +293,6 @@ _requires a [CS3 user provisioning api that can update the quota for a user](htt - [apiWebdavMove2/moveShareOnOcis.feature:169](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L169) - [apiWebdavMove2/moveShareOnOcis.feature:170](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L170) -#### [restoring an older version of a shared file deletes the share](https://github.com/owncloud/ocis/issues/765) -- [apiShareManagementToShares/acceptShares.feature:579](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareManagementToShares/acceptShares.feature#L579) - #### [Expiration date for shares is not implemented](https://github.com/owncloud/ocis/issues/1250) #### Expiration date of user shares - [apiShareCreateSpecialToShares1/createShareExpirationDate.feature:52](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareCreateSpecialToShares1/createShareExpirationDate.feature#L52) From 19ee0cc84aacd090e9ac73f2c07cb80b67999d48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Mon, 21 Nov 2022 12:51:01 +0000 Subject: [PATCH 24/24] update changelog and unexpected passes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- changelog/unreleased/decomposedfs-finish-upload-rewrite.md | 4 +++- tests/acceptance/expected-failures-on-OCIS-storage.md | 3 --- tests/acceptance/expected-failures-on-S3NG-storage.md | 3 --- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/changelog/unreleased/decomposedfs-finish-upload-rewrite.md b/changelog/unreleased/decomposedfs-finish-upload-rewrite.md index 73cfb8f306..834958b94a 100644 --- a/changelog/unreleased/decomposedfs-finish-upload-rewrite.md +++ b/changelog/unreleased/decomposedfs-finish-upload-rewrite.md @@ -2,4 +2,6 @@ Bugfix: decomposedfs fix revision download We rewrote the finish upload code to use a write lock when creating and updating node metadata. This prevents some cornercases, allows us to calculate the size diff atomically and fixes downloading revisions. -https://github.com/cs3org/reva/pull/3473 \ No newline at end of file +https://github.com/cs3org/reva/pull/3473 +https://github.com/owncloud/ocis/issues/765 +https://github.com/owncloud/ocis/issues/3868 \ No newline at end of file diff --git a/tests/acceptance/expected-failures-on-OCIS-storage.md b/tests/acceptance/expected-failures-on-OCIS-storage.md index 5808c17a2a..2dc0719d14 100644 --- a/tests/acceptance/expected-failures-on-OCIS-storage.md +++ b/tests/acceptance/expected-failures-on-OCIS-storage.md @@ -784,9 +784,6 @@ moving outside of the Shares folder gives 501 Not Implemented. - [apiWebdavProperties1/copyFile.feature:437](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavProperties1/copyFile.feature#L437) - [apiWebdavProperties1/copyFile.feature:442](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavProperties1/copyFile.feature#L442) -#### [Downloading the older version of shared file gives 404](https://github.com/owncloud/ocis/issues/3868) -- [apiVersions/fileVersionsSharingToShares.feature:306](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionsSharingToShares.feature#L306) - #### [file versions do not report the version author](https://github.com/owncloud/ocis/issues/2914) - [apiVersions/fileVersionAuthor.feature:14](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L14) - [apiVersions/fileVersionAuthor.feature:37](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L37) diff --git a/tests/acceptance/expected-failures-on-S3NG-storage.md b/tests/acceptance/expected-failures-on-S3NG-storage.md index 6945fffa9a..0cf8dde14c 100644 --- a/tests/acceptance/expected-failures-on-S3NG-storage.md +++ b/tests/acceptance/expected-failures-on-S3NG-storage.md @@ -9,9 +9,6 @@ Basic file management like up and download, move, copy, properties, quota, trash - [apiTrashbin/trashbinFilesFolders.feature:318](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiTrashbin/trashbinFilesFolders.feature#L318) - [apiTrashbin/trashbinFilesFolders.feature:323](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiTrashbin/trashbinFilesFolders.feature#L323) -#### [Downloading the older version of shared file gives 404](https://github.com/owncloud/ocis/issues/3868) -- [apiVersions/fileVersionsSharingToShares.feature:306](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionsSharingToShares.feature#L306) - #### [file versions do not report the version author](https://github.com/owncloud/ocis/issues/2914) - [apiVersions/fileVersionAuthor.feature:14](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L14) - [apiVersions/fileVersionAuthor.feature:37](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L37)