diff --git a/changelog/unreleased/decomposedfs-finish-upload-rewrite.md b/changelog/unreleased/decomposedfs-finish-upload-rewrite.md new file mode 100644 index 0000000000..834958b94a --- /dev/null +++ b/changelog/unreleased/decomposedfs-finish-upload-rewrite.md @@ -0,0 +1,7 @@ +Bugfix: decomposedfs fix revision download + +We rewrote the finish upload code to use a write lock when creating and updating node metadata. This prevents some cornercases, allows us to calculate the size diff atomically and fixes downloading revisions. + +https://github.com/cs3org/reva/pull/3473 +https://github.com/owncloud/ocis/issues/765 +https://github.com/owncloud/ocis/issues/3868 \ No newline at end of file diff --git a/internal/http/services/owncloud/ocdav/meta.go b/internal/http/services/owncloud/ocdav/meta.go index a345ffbd8b..69d7da344d 100644 --- a/internal/http/services/owncloud/ocdav/meta.go +++ b/internal/http/services/owncloud/ocdav/meta.go @@ -23,6 +23,7 @@ import ( "fmt" "net/http" "path" + "strings" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -70,6 +71,15 @@ func (h *MetaHandler) Handler(s *svc) http.Handler { errors.HandleWebdavError(logger, w, b, err) return } + if did.StorageId == "" && did.OpaqueId == "" && strings.Count(id, ":") >= 2 { + logger := appctx.GetLogger(r.Context()) + logger.Warn().Str("id", id).Msg("detected invalid : separated resourceid id, trying to split it ... 
but fix the client that made the request") + // try splitting with : + parts := strings.SplitN(id, ":", 3) + did.StorageId = parts[0] + did.SpaceId = parts[1] + did.OpaqueId = parts[2] + } var head string head, r.URL.Path = router.ShiftPath(r.URL.Path) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 776ceb5d49..843a0df575 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -30,6 +30,7 @@ import ( "path" "path/filepath" "strconv" + "strings" "syscall" cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1" @@ -591,6 +592,12 @@ func (fs *Decomposedfs) Delete(ctx context.Context, ref *provider.Reference) (er // Download returns a reader to the specified resource func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) { + // check if we are trying to download a revision + // TODO the CS3 api should allow initiating a revision download + if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) { + return fs.DownloadRevision(ctx, ref, ref.ResourceId.OpaqueId) + } + node, err := fs.lu.NodeFromResource(ctx, ref) if err != nil { return nil, errors.Wrap(err, "Decomposedfs: error resolving ref") diff --git a/pkg/storage/utils/decomposedfs/metadata.go b/pkg/storage/utils/decomposedfs/metadata.go index ed9122796c..0e5100e86c 100644 --- a/pkg/storage/utils/decomposedfs/metadata.go +++ b/pkg/storage/utils/decomposedfs/metadata.go @@ -73,8 +73,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. 
if md.Metadata != nil { if val, ok := md.Metadata["mtime"]; ok { delete(md.Metadata, "mtime") - err := n.SetMtime(ctx, val) - if err != nil { + if err := n.SetMtimeString(val); err != nil { errs = append(errs, errors.Wrap(err, "could not set mtime")) } } @@ -85,8 +84,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. // TODO unset when folder is updated or add timestamp to etag? if val, ok := md.Metadata["etag"]; ok { delete(md.Metadata, "etag") - err := n.SetEtag(ctx, val) - if err != nil { + if err := n.SetEtag(ctx, val); err != nil { errs = append(errs, errors.Wrap(err, "could not set etag")) } } diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 04acdf444f..d2f579ca9b 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -215,6 +215,19 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis return r, nil } + // are we reading a revision? 
+ revisionSuffix := "" + if strings.Contains(nodeID, RevisionIDDelimiter) { + // verify revision key format + kp := strings.SplitN(nodeID, RevisionIDDelimiter, 2) + if len(kp) == 2 { + // use the actual node for the metadata lookup + nodeID = kp[0] + // remember revision for blob metadata + revisionSuffix = RevisionIDDelimiter + kp[1] + } + } + // read node n = &Node{ SpaceID: spaceID, @@ -223,6 +236,11 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis SpaceRoot: r, } + // append back revision to nodeid, even when returning a not existing node + defer func() { + n.ID += revisionSuffix + }() + nodePath := n.InternalPath() // lookup name in extended attributes @@ -237,7 +255,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis n.Exists = true // lookup blobID in extended attributes - n.BlobID, err = n.Xattr(xattrs.BlobIDAttr) + n.BlobID, err = ReadBlobIDAttr(nodePath + revisionSuffix) switch { case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false @@ -246,7 +264,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis } // Lookup blobsize - n.Blobsize, err = ReadBlobSizeAttr(nodePath) + n.Blobsize, err = ReadBlobSizeAttr(nodePath + revisionSuffix) switch { case xattrs.IsNotExist(err): return n, nil // swallow not found, the node defaults to exists = false @@ -495,25 +513,20 @@ func calculateEtag(nodeID string, tmTime time.Time) (string, error) { return fmt.Sprintf(`"%x"`, h.Sum(nil)), nil } -// SetMtime sets the mtime and atime of a node -func (n *Node) SetMtime(ctx context.Context, mtime string) error { - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() - if mt, err := parseMTime(mtime); err == nil { - nodePath := n.InternalPath() - // updating mtime also updates atime - if err := os.Chtimes(nodePath, mt, mt); err != nil { - sublog.Error().Err(err). - Time("mtime", mt). 
- Msg("could not set mtime") - return errors.Wrap(err, "could not set mtime") - } - } else { - sublog.Error().Err(err). - Str("mtime", mtime). - Msg("could not parse mtime") - return errors.Wrap(err, "could not parse mtime") +// SetMtimeString sets the mtime and atime of a node to the unixtime parsed from the given string +func (n *Node) SetMtimeString(mtime string) error { + mt, err := parseMTime(mtime) + if err != nil { + return err } - return nil + return n.SetMtime(mt) +} + +// SetMtime sets the mtime and atime of a node +func (n *Node) SetMtime(mtime time.Time) error { + nodePath := n.InternalPath() + // updating mtime also updates atime + return os.Chtimes(nodePath, mtime, mtime) } // SetEtag sets the temporary etag of a node if it differs from the current etag @@ -929,6 +942,15 @@ func (n *Node) SetTreeSize(ts uint64) (err error) { return n.SetXattr(xattrs.TreesizeAttr, strconv.FormatUint(ts, 10)) } +// GetBlobSize reads the blobsize from the extended attributes +func (n *Node) GetBlobSize() (treesize uint64, err error) { + var b string + if b, err = n.Xattr(xattrs.TreesizeAttr); err != nil { + return + } + return strconv.ParseUint(b, 10, 64) +} + // SetChecksum writes the checksum with the given checksum type to the extended attributes func (n *Node) SetChecksum(csType string, h hash.Hash) (err error) { return n.SetXattr(xattrs.ChecksumPrefix+csType, string(h.Sum(nil))) diff --git a/pkg/storage/utils/decomposedfs/node/node_test.go b/pkg/storage/utils/decomposedfs/node/node_test.go index a8c192936f..c92ce5b785 100644 --- a/pkg/storage/utils/decomposedfs/node/node_test.go +++ b/pkg/storage/utils/decomposedfs/node/node_test.go @@ -89,10 +89,10 @@ var _ = Describe("Node", func() { n, err := env.Lookup.NodeFromResource(env.Ctx, ref) Expect(err).ToNot(HaveOccurred()) - blobsize := 239485734 + blobsize := int64(239485734) n.Name = "TestName" n.BlobID = "TestBlobID" - n.Blobsize = int64(blobsize) + n.Blobsize = blobsize err = n.WriteAllNodeMetadata() 
Expect(err).ToNot(HaveOccurred()) @@ -100,7 +100,7 @@ var _ = Describe("Node", func() { Expect(err).ToNot(HaveOccurred()) Expect(n2.Name).To(Equal("TestName")) Expect(n2.BlobID).To(Equal("TestBlobID")) - Expect(n2.Blobsize).To(Equal(int64(blobsize))) + Expect(n2.Blobsize).To(Equal(blobsize)) }) }) diff --git a/pkg/storage/utils/decomposedfs/node/permissions.go b/pkg/storage/utils/decomposedfs/node/permissions.go index 117b9a72f3..9a076f7f65 100644 --- a/pkg/storage/utils/decomposedfs/node/permissions.go +++ b/pkg/storage/utils/decomposedfs/node/permissions.go @@ -20,10 +20,12 @@ package node import ( "context" + "strings" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" ) @@ -91,6 +93,17 @@ func (p *Permissions) AssemblePermissions(ctx context.Context, n *Node) (ap prov return NoPermissions(), nil } + // are we reading a revision? 
+ if strings.Contains(n.ID, RevisionIDDelimiter) { + // verify revision key format + kp := strings.SplitN(n.ID, RevisionIDDelimiter, 2) + if len(kp) != 2 { + return NoPermissions(), errtypes.NotFound(n.ID) + } + // use the actual node for the permission assembly + n.ID = kp[0] + } + // check if the current user is the owner if utils.UserIDEqual(u.Id, n.Owner()) { return OwnerPermissions(), nil diff --git a/pkg/storage/utils/decomposedfs/node/xattrs.go b/pkg/storage/utils/decomposedfs/node/xattrs.go index be65e0d177..538fc9da87 100644 --- a/pkg/storage/utils/decomposedfs/node/xattrs.go +++ b/pkg/storage/utils/decomposedfs/node/xattrs.go @@ -20,6 +20,7 @@ package node import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/gofrs/flock" "github.com/pkg/xattr" ) @@ -34,6 +35,18 @@ func (n *Node) SetXattrs(attribs map[string]string) (err error) { return xattrs.SetMultiple(n.InternalPath(), attribs) } +// SetXattrsWithLock sets multiple extended attributes on the write-through cache/node with a given lock +func (n *Node) SetXattrsWithLock(attribs map[string]string, fileLock *flock.Flock) (err error) { + // TODO what if writing the lock fails? 
+ if n.xattrsCache != nil { + for k, v := range attribs { + n.xattrsCache[k] = v + } + } + + return xattrs.SetMultipleWithLock(n.InternalPath(), attribs, fileLock) +} + // SetXattr sets an extended attribute on the write-through cache/node func (n *Node) SetXattr(key, val string) (err error) { if n.xattrsCache != nil { @@ -80,5 +93,6 @@ func (n *Node) Xattr(key string) (string, error) { if val, ok := n.xattrsCache[key]; ok { return val, nil } - return "", xattr.ENOATTR + // wrap the error as xattr does + return "", &xattr.Error{Op: "xattr.get", Path: n.InternalPath(), Name: key, Err: xattr.ENOATTR} } diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index a9a7381c21..26b8237600 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -21,7 +21,6 @@ package decomposedfs import ( "context" "io" - iofs "io/fs" "os" "path/filepath" "strings" @@ -31,6 +30,7 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/pkg/errors" ) @@ -83,7 +83,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen } blobSize, err := node.ReadBlobSizeAttr(items[i]) if err != nil { - return nil, errors.Wrapf(err, "error reading blobsize xattr") + appctx.GetLogger(ctx).Error().Err(err).Str("name", fi.Name()).Msg("error reading blobsize xattr, using 0") } rev.Size = uint64(blobSize) etag, err := node.CalculateEtag(np, mtime) @@ -99,6 +99,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen } // DownloadRevision returns a reader for the specified revision +// FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813 func (fs 
*Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (io.ReadCloser, error) { log := appctx.GetLogger(ctx) @@ -125,7 +126,7 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe switch { case err != nil: return nil, errtypes.InternalError(err.Error()) - case !rp.ListFileVersions || !rp.RestoreFileVersion || !rp.InitiateFileDownload: // TODO add explicit permission in the CS3 api? + case !rp.ListFileVersions || !rp.InitiateFileDownload: // TODO add explicit permission in the CS3 api? f, _ := storagespace.FormatReference(ref) if rp.Stat { return nil, errtypes.PermissionDenied(f) @@ -135,14 +136,18 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe contentPath := fs.lu.InternalPath(spaceID, revisionKey) - r, err := os.Open(contentPath) + blobid, err := node.ReadBlobIDAttr(contentPath) if err != nil { - if errors.Is(err, iofs.ErrNotExist) { - return nil, errtypes.NotFound(contentPath) - } - return nil, errors.Wrap(err, "Decomposedfs: error opening revision "+revisionKey) + return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id of revision '%s' for node '%s'", n.ID, revisionKey) } - return r, nil + + revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid} + + reader, err := fs.tp.ReadBlob(&revisionNode) + if err != nil { + return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey) + } + return reader, nil } // RestoreRevision restores the specified revision of the resource @@ -194,17 +199,50 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries versionsPath := fs.lu.InternalPath(spaceID, kp[0]+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) - err = os.Rename(nodePath, versionsPath) + // touch version node + 
if file, err := os.Create(versionsPath); err != nil { + return err + } else if err := file.Close(); err != nil { + return err + } + + // copy blob metadata to version node + err = xattrs.CopyMetadata(nodePath, versionsPath, func(attributeName string) bool { + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || // for checksums + attributeName == xattrs.BlobIDAttr || + attributeName == xattrs.BlobsizeAttr + }) if err != nil { - return + return errtypes.InternalError("failed to copy blob xattrs to version node") } - // copy old revision to current location + // keep mtime from previous version + if err := os.Chtimes(versionsPath, fi.ModTime(), fi.ModTime()); err != nil { + return errtypes.InternalError("failed to change mtime of version node") + } + + // update blob id in node + // copy blob metadata from revision to node revisionPath := fs.lu.InternalPath(spaceID, revisionKey) + err = xattrs.CopyMetadata(revisionPath, nodePath, func(attributeName string) bool { + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || + attributeName == xattrs.BlobIDAttr || + attributeName == xattrs.BlobsizeAttr + }) + if err != nil { + return errtypes.InternalError("failed to copy blob xattrs to version node") + } + + // drop old revision + if err := os.Remove(revisionPath); err != nil { + log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not delete old revision, continuing") + } - if err = os.Rename(revisionPath, nodePath); err != nil { - return + // explicitly update mtime of node as writing xattrs does not change mtime + now := time.Now() + if err := os.Chtimes(nodePath, now, now); err != nil { + return errtypes.InternalError("failed to change mtime of version node") } return fs.tp.Propagate(ctx, n) diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index e1e458832d..37fd1071ec 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ 
b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -692,7 +692,7 @@ func (t *Tree) removeNode(path string, n *node.Node) error { // Propagate propagates changes to the root of the tree func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) { - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() if !t.treeTimeAccounting && !t.treeSizeAccounting { // no propagation enabled sublog.Debug().Msg("propagation disabled") @@ -713,7 +713,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) { break } - sublog = sublog.With().Interface("node", n).Logger() + sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() // TODO none, sync and async? if !n.HasPropagation() { @@ -757,7 +757,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) { } } - if err := n.UnsetTempEtag(); err != nil { + if err := n.UnsetTempEtag(); err != nil && !xattrs.IsAttrUnset(err) { sublog.Error().Err(err).Msg("could not remove temporary etag attribute") } } diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index d45c573586..c53d20f90b 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -48,11 +48,11 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" "github.com/pkg/errors" - "github.com/rs/zerolog" ) var defaultFilePerm = os.FileMode(0664) @@ -566,6 +566,20 @@ func (upload *fileUpload) writeInfo() error { } // FinishUpload finishes an upload and moves the file to the internal destination +// +// # 
upload steps +// check if match header to fail early +// copy blob +// lock metadata node +// check if match header again as safeguard +// read metadata +// create version node with current metadata +// update node metadata with new blobid etc +// remember size diff +// unlock metadata +// propagate size diff and new etag +// - propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelevant +// - propagation needs to propagate the diff func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // ensure cleanup @@ -599,18 +613,18 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { } overwrite := n.ID != "" - var oldSize uint64 + var oldSize int64 if overwrite { // read size from existing node old, _ := node.ReadNode(ctx, upload.fs.lu, spaceID, n.ID, false) - oldSize = uint64(old.Blobsize) + oldSize = old.Blobsize } else { // create new fileid n.ID = uuid.New().String() upload.info.Storage["NodeId"] = n.ID } - if _, err = node.CheckQuota(n.SpaceRoot, overwrite, oldSize, uint64(fi.Size())); err != nil { + if _, err = node.CheckQuota(n.SpaceRoot, overwrite, uint64(oldSize), uint64(fi.Size())); err != nil { return err } @@ -618,6 +632,8 @@ sublog := appctx.GetLogger(upload.ctx). With(). Interface("info", upload.info). + Str("spaceid", spaceID). + Str("nodeid", n.ID). Str("binPath", upload.binPath). Str("targetPath", targetPath). 
Logger() @@ -669,8 +685,9 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { // defer writing the checksums until the node is in place - // if target exists create new version - versionsPath := "" + // upload steps + // check if match header to fail early + if fi, err = os.Stat(targetPath); err == nil { // When the if-match header was set we need to check if the // etag still matches before finishing the upload. @@ -684,24 +701,16 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return errtypes.Aborted("etag mismatch") } } - - // FIXME move versioning to blobs ... no need to copy all the metadata! well ... it does if we want to version metadata... - // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries - versionsPath = upload.fs.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) - - // This move drops all metadata!!! We copy it below with CopyMetadata - // FIXME the node must remain the same. otherwise we might restore share metadata - if err = os.Rename(targetPath, versionsPath); err != nil { - sublog.Err(err). - Str("binPath", upload.binPath). - Str("versionsPath", versionsPath). 
- Msg("Decomposedfs: could not create version") - return + } else { + // create dir to node + if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { + sublog.Err(err).Msg("could not create node dir") + return errtypes.InternalError("could not create node dir") } - } - // upload the data to the blobstore + // copy blob + file, err := os.Open(upload.binPath) if err != nil { return err @@ -709,58 +718,170 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { defer file.Close() err = upload.fs.tp.WriteBlob(n, file) if err != nil { - return errors.Wrap(err, "failed to upload file to blostore") + return errors.Wrap(err, "failed to upload file to blobstore") } - // now truncate the upload (the payload stays in the blobstore) and move it to the target path - // TODO put uploads on the same underlying storage as the destination dir? - // TODO trigger a workflow as the final rename might eg involve antivirus scanning - if err = os.Truncate(upload.binPath, 0); err != nil { - sublog.Err(err). 
- Msg("Decomposedfs: could not truncate") - return + // prepare discarding the blob if something changed while writing it + discardBlob := func() { + if err := upload.fs.tp.DeleteBlob(n); err != nil { + sublog.Err(err).Str("blobid", n.BlobID).Msg("Decomposedfs: failed to discard blob in blobstore") + } } - if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil { - sublog.Warn().Err(err).Msg("Decomposedfs: could not create node dir, trying to write file anyway") + + // lock metadata node + lock, err := filelocks.AcquireWriteLock(targetPath) + if err != nil { + discardBlob() + return errtypes.InternalError(err.Error()) } - if err = os.Rename(upload.binPath, targetPath); err != nil { - sublog.Error().Err(err).Msg("Decomposedfs: could not rename") - return + releaseLock := func() { + // ReleaseLock returns nil if already unlocked + if err := filelocks.ReleaseLock(lock); err != nil { + sublog.Err(err).Msg("Decomposedfs:could not unlock node") + } } + defer releaseLock() + + // check if match header again as safeguard + var oldMtime time.Time + versionsPath := "" + if fi, err = os.Stat(targetPath); err == nil { + // When the if-match header was set we need to check if the + // etag still matches before finishing the upload. 
+ if ifMatch, ok := upload.info.MetaData["if-match"]; ok { + var targetEtag string + targetEtag, err = node.CalculateEtag(n.ID, fi.ModTime()) + if err != nil { + discardBlob() + return errtypes.InternalError(err.Error()) + } + if ifMatch != targetEtag { + discardBlob() + return errtypes.Aborted("etag mismatch") + } + } + + // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries + versionsPath = upload.fs.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) + + // remember mtime of existing file so we can apply it to the version + oldMtime = fi.ModTime() + } + + // read metadata + + // attributes that will change + attrs := map[string]string{ + xattrs.BlobIDAttr: n.BlobID, + xattrs.BlobsizeAttr: strconv.FormatInt(n.Blobsize, 10), + + // update checksums + xattrs.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), + xattrs.ChecksumPrefix + "md5": string(md5h.Sum(nil)), + xattrs.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)), + } + + // create version node with current metadata + + var newMtime time.Time + // if file already exists if versionsPath != "" { - // copy grant and arbitrary metadata - // FIXME ... now restoring an older revision might bring back a grant that was removed! 
- err = xattrs.CopyMetadata(versionsPath, targetPath, func(attributeName string) bool { - return true - // TODO determine all attributes that must be copied, currently we just copy all and overwrite changed properties - /* - return strings.HasPrefix(attributeName, xattrs.GrantPrefix) || // for grants - strings.HasPrefix(attributeName, xattrs.MetadataPrefix) || // for arbitrary metadata - strings.HasPrefix(attributeName, xattrs.FavPrefix) || // for favorites - strings.HasPrefix(attributeName, xattrs.SpaceNameAttr) || // for a shared file - */ - }) + // touch version node + file, err := os.Create(versionsPath) if err != nil { - sublog.Info().Err(err).Msg("Decomposedfs: failed to copy xattrs") + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("could not create version node") + return errtypes.InternalError("could not create version node") } - } + fi, err := file.Stat() + if err != nil { + file.Close() + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("could not stat version node") + return errtypes.InternalError("could not stat version node") + } + newMtime = fi.ModTime() + file.Close() + + // copy blob metadata to version node + err = xattrs.CopyMetadataWithSourceLock(targetPath, versionsPath, func(attributeName string) bool { + return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || + attributeName == xattrs.BlobIDAttr || + attributeName == xattrs.BlobsizeAttr + }, lock) + if err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("failed to copy xattrs to version node") + return errtypes.InternalError("failed to copy blob xattrs to version node") + } + + // keep mtime from previous version + if err := os.Chtimes(versionsPath, oldMtime, oldMtime); err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("failed to change mtime of version node") + return errtypes.InternalError("failed to change mtime of version node") + } + + // we MUST bypass any cache here as we have to 
calculate the size diff atomically + oldSize, err = node.ReadBlobSizeAttr(targetPath) + if err != nil { + discardBlob() + sublog.Err(err).Str("version", versionsPath).Msg("failed to read old blobsize") + return errtypes.InternalError("failed to read old blobsize") + } + } else { + // touch metadata node + file, err := os.Create(targetPath) + if err != nil { + discardBlob() + sublog.Err(err).Msg("could not create node") + return errtypes.InternalError("could not create node") + } + file.Close() - // now try write all checksums - tryWritingChecksum(&sublog, n, "sha1", sha1h) - tryWritingChecksum(&sublog, n, "md5", md5h) - tryWritingChecksum(&sublog, n, "adler32", adler32h) + // basic node metadata + attrs[xattrs.ParentidAttr] = n.ParentID + attrs[xattrs.NameAttr] = n.Name + oldSize = 0 + } - // who will become the owner? the owner of the parent actually ... not the currently logged in user - err = n.WriteAllNodeMetadata() + // update node metadata with new blobid etc + err = n.SetXattrsWithLock(attrs, lock) if err != nil { + discardBlob() return errors.Wrap(err, "Decomposedfs: could not write metadata") } + // update mtime + switch { + case upload.info.MetaData["mtime"] != "": + if err := n.SetMtimeString(upload.info.MetaData["mtime"]); err != nil { + sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not apply mtime from metadata") + return err + } + case newMtime != time.Time{}: + // we are creating a version + if err := n.SetMtime(newMtime); err != nil { + sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not change mtime of node") + return err + } + } + + // remember size diff + sizeDiff := oldSize - n.Blobsize + + // unlock metadata + err = filelocks.ReleaseLock(lock) + if err != nil { + return errtypes.InternalError(err.Error()) + } + // link child name to parent if it is new childNameLink := filepath.Join(n.ParentInternalPath(), n.Name) + relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) 
var link string link, err = os.Readlink(childNameLink) - if err == nil && link != "../"+n.ID { + if err == nil && link != relativeNodePath { sublog.Err(err). Interface("node", n). Str("childNameLink", childNameLink). @@ -771,30 +892,12 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") } } - if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID { - relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) + if errors.Is(err, iofs.ErrNotExist) || link != relativeNodePath { if err = os.Symlink(relativeNodePath, childNameLink); err != nil { return errors.Wrap(err, "Decomposedfs: could not symlink child entry") } } - // only delete the upload if it was successfully written to the storage - if err = os.Remove(upload.infoPath); err != nil { - if !errors.Is(err, iofs.ErrNotExist) { - sublog.Err(err).Msg("Decomposedfs: could not delete upload info") - return - } - } - // use set arbitrary metadata? 
- if upload.info.MetaData["mtime"] != "" { - err := n.SetMtime(ctx, upload.info.MetaData["mtime"]) - if err != nil { - sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not set mtime metadata") - return err - } - - } - // fill metadata with current mtime if fi, err = os.Stat(targetPath); err == nil { upload.info.MetaData["mtime"] = fmt.Sprintf("%d.%d", fi.ModTime().Unix(), fi.ModTime().Nanosecond()) @@ -803,6 +906,12 @@ n.Exists = true + // propagate size diff and new etag + // propagation can happen outside the metadata lock because diff calculation happens inside the lock and the order in which diffs are applied to the parent is irrelevant + // propagation needs to propagate the diff + + // return upload.fs.tp.Propagate(upload.ctx, n, sizeDiff) + sublog.Debug().Int64("sizediff", sizeDiff).Msg("Decomposedfs: propagating size diff") return upload.fs.tp.Propagate(upload.ctx, n) } @@ -813,15 +922,6 @@ func (upload *fileUpload) checkHash(expected string, h hash.Hash) error { } return nil } -func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) { - if err := n.SetChecksum(algo, h); err != nil { - log.Err(err). - Str("csType", algo). - Bytes("hash", h.Sum(nil)). - Msg("Decomposedfs: could not write checksum") - // this is not critical, the bytes are there so we will continue - } -} func (upload *fileUpload) discardChunk() { if err := os.Remove(upload.binPath); err != nil { diff --git a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go index 732e8e880c..357da0f046 100644 --- a/pkg/storage/utils/decomposedfs/xattrs/xattrs.go +++ b/pkg/storage/utils/decomposedfs/xattrs/xattrs.go @@ -125,16 +125,16 @@ // For the source file, a shared lock is acquired. For the target, an exclusive // write lock is acquired. 
func CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) { - var writeLock, readLock *flock.Flock + var readLock *flock.Flock - // Acquire the write log on the target node first. - writeLock, err = filelocks.AcquireWriteLock(target) + // Acquire a read lock on the source node + readLock, err = filelocks.AcquireReadLock(src) if err != nil { - return errors.Wrap(err, "xattrs: Unable to lock target to write") + return errors.Wrap(err, "xattrs: Unable to lock source to read") } defer func() { - rerr := filelocks.ReleaseLock(writeLock) + rerr := filelocks.ReleaseLock(readLock) // if err is non nil we do not overwrite that if err == nil { @@ -142,14 +142,33 @@ func CopyMetadata(src, target string, filter func(attributeName string) bool) (e } }() - // now try to get a shared lock on the source - readLock, err = filelocks.AcquireReadLock(src) + return CopyMetadataWithSourceLock(src, target, filter, readLock) +} + +// CopyMetadataWithSourceLock copies all extended attributes from source to target. +// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix +// The caller must already hold a (read or write) lock on the source file; for the +// target, an exclusive write lock is acquired. 
+func CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) { + switch { + case readLock == nil: + return errors.New("no lock provided") + case readLock.Path() != src+".flock": + return errors.New("lockpath does not match filepath") + case !readLock.Locked() && !readLock.RLocked(): // we need either a read or a write lock + return errors.New("not locked") + } + + var writeLock *flock.Flock + + // Acquire the write lock on the target node + writeLock, err = filelocks.AcquireWriteLock(target) if err != nil { - return errors.Wrap(err, "xattrs: Unable to lock file for read") + return errors.Wrap(err, "xattrs: Unable to lock target to write") } defer func() { - rerr := filelocks.ReleaseLock(readLock) + rerr := filelocks.ReleaseLock(writeLock) // if err is non nil we do not overwrite that if err == nil { @@ -246,6 +265,20 @@ func SetMultiple(filePath string, attribs map[string]string) (err error) { } }() + return SetMultipleWithLock(filePath, attribs, fileLock) +} + +// SetMultipleWithLock allows setting multiple key value pairs at once with an existing lock +func SetMultipleWithLock(filePath string, attribs map[string]string, fileLock *flock.Flock) (err error) { + switch { + case fileLock == nil: + return errors.New("no lock provided") + case fileLock.Path() != filePath+".flock": + return errors.New("lockpath does not match filepath") + case !fileLock.Locked(): + return errors.New("not locked") + } + // error handling: Count if there are errors while setting the attribs. // if there were any, return an error. 
var ( diff --git a/pkg/storage/utils/filelocks/filelocks.go b/pkg/storage/utils/filelocks/filelocks.go index f338f881f5..9911f0d6fa 100644 --- a/pkg/storage/utils/filelocks/filelocks.go +++ b/pkg/storage/utils/filelocks/filelocks.go @@ -85,10 +85,6 @@ func acquireLock(file string, write bool) (*flock.Flock, error) { return nil, ErrPathEmpty } - if _, err = os.Stat(file); err != nil { - return nil, err - } - var flock *flock.Flock for i := 1; i <= _lockCyclesValue; i++ { if flock = getMutexedFlock(n); flock != nil { diff --git a/tests/acceptance/expected-failures-on-OCIS-storage.md b/tests/acceptance/expected-failures-on-OCIS-storage.md index d8f28a0a75..2dc0719d14 100644 --- a/tests/acceptance/expected-failures-on-OCIS-storage.md +++ b/tests/acceptance/expected-failures-on-OCIS-storage.md @@ -284,10 +284,6 @@ _requires a [CS3 user provisioning api that can update the quota for a user](htt - [apiWebdavMove2/moveShareOnOcis.feature:169](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L169) - [apiWebdavMove2/moveShareOnOcis.feature:170](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L170) - -#### [restoring an older version of a shared file deletes the share](https://github.com/owncloud/ocis/issues/765) -- [apiShareManagementToShares/acceptShares.feature:579](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareManagementToShares/acceptShares.feature#L579) - #### [Expiration date for shares is not implemented](https://github.com/owncloud/ocis/issues/1250) #### Expiration date of user shares - [apiShareCreateSpecialToShares1/createShareExpirationDate.feature:52](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareCreateSpecialToShares1/createShareExpirationDate.feature#L52) @@ -788,9 +784,6 @@ moving outside of the Shares folder gives 501 Not Implemented. 
- [apiWebdavProperties1/copyFile.feature:437](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavProperties1/copyFile.feature#L437) - [apiWebdavProperties1/copyFile.feature:442](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavProperties1/copyFile.feature#L442) -#### [Downloading the older version of shared file gives 404](https://github.com/owncloud/ocis/issues/3868) -- [apiVersions/fileVersionsSharingToShares.feature:306](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionsSharingToShares.feature#L306) - #### [file versions do not report the version author](https://github.com/owncloud/ocis/issues/2914) - [apiVersions/fileVersionAuthor.feature:14](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L14) - [apiVersions/fileVersionAuthor.feature:37](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L37) diff --git a/tests/acceptance/expected-failures-on-S3NG-storage.md b/tests/acceptance/expected-failures-on-S3NG-storage.md index a9efc55de4..0cf8dde14c 100644 --- a/tests/acceptance/expected-failures-on-S3NG-storage.md +++ b/tests/acceptance/expected-failures-on-S3NG-storage.md @@ -9,9 +9,6 @@ Basic file management like up and download, move, copy, properties, quota, trash - [apiTrashbin/trashbinFilesFolders.feature:318](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiTrashbin/trashbinFilesFolders.feature#L318) - [apiTrashbin/trashbinFilesFolders.feature:323](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiTrashbin/trashbinFilesFolders.feature#L323) -#### [Downloading the older version of shared file gives 404](https://github.com/owncloud/ocis/issues/3868) -- 
[apiVersions/fileVersionsSharingToShares.feature:306](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionsSharingToShares.feature#L306) - #### [file versions do not report the version author](https://github.com/owncloud/ocis/issues/2914) - [apiVersions/fileVersionAuthor.feature:14](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L14) - [apiVersions/fileVersionAuthor.feature:37](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiVersions/fileVersionAuthor.feature#L37) @@ -293,9 +290,6 @@ _requires a [CS3 user provisioning api that can update the quota for a user](htt - [apiWebdavMove2/moveShareOnOcis.feature:169](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L169) - [apiWebdavMove2/moveShareOnOcis.feature:170](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiWebdavMove2/moveShareOnOcis.feature#L170) -#### [restoring an older version of a shared file deletes the share](https://github.com/owncloud/ocis/issues/765) -- [apiShareManagementToShares/acceptShares.feature:579](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareManagementToShares/acceptShares.feature#L579) - #### [Expiration date for shares is not implemented](https://github.com/owncloud/ocis/issues/1250) #### Expiration date of user shares - [apiShareCreateSpecialToShares1/createShareExpirationDate.feature:52](https://github.com/owncloud/core/blob/master/tests/acceptance/features/apiShareCreateSpecialToShares1/createShareExpirationDate.feature#L52)