Skip to content

Commit

Permalink
trace upload progress (#4067)
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic authored Jul 14, 2023
1 parent 7ee83f2 commit 3a8f830
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 7 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/trace-upload-progress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Enhancement: trace upload progress

https://github.com/cs3org/reva/pull/4067
10 changes: 7 additions & 3 deletions pkg/storage/utils/decomposedfs/upload/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,11 @@ func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot stri

// CreateNodeForUpload will create the target node for the Upload
func CreateNodeForUpload(upload *Upload, initAttrs node.Attributes) (*node.Node, error) {
ctx, span := tracer.Start(upload.Ctx, "CreateNodeForUpload")
defer span.End()
_, subspan := tracer.Start(ctx, "os.Stat")
fi, err := os.Stat(upload.binPath)
subspan.End()
if err != nil {
return nil, err
}
Expand All @@ -258,13 +262,13 @@ func CreateNodeForUpload(upload *Upload, initAttrs node.Attributes) (*node.Node,
nil,
upload.lu,
)
n.SpaceRoot, err = node.ReadNode(upload.Ctx, upload.lu, spaceID, spaceID, false, nil, false)
n.SpaceRoot, err = node.ReadNode(ctx, upload.lu, spaceID, spaceID, false, nil, false)
if err != nil {
return nil, err
}

// check lock
if err := n.CheckLock(upload.Ctx); err != nil {
if err := n.CheckLock(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -293,7 +297,7 @@ func CreateNodeForUpload(upload *Upload, initAttrs node.Attributes) (*node.Node,
initAttrs.SetString(prefixes.StatusPrefix, node.ProcessingStatus+upload.Info.ID)

// update node metadata with new blobid etc
err = n.SetXattrsWithContext(context.TODO(), initAttrs, false)
err = n.SetXattrsWithContext(ctx, initAttrs, false)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: could not write metadata")
}
Expand Down
38 changes: 34 additions & 4 deletions pkg/storage/utils/decomposedfs/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,16 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog"
tusd "github.com/tus/tusd/pkg/handler"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var tracer trace.Tracer

func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/upload")
}

// Tree is used to manage a tree hierarchy
type Tree interface {
Setup() error
Expand Down Expand Up @@ -125,19 +133,25 @@ func buildUpload(ctx context.Context, info tusd.FileInfo, binPath string, infoPa

// Cleanup cleans the upload
func Cleanup(upload *Upload, failure bool, keepUpload bool) {
ctx, span := tracer.Start(upload.Ctx, "Cleanup")
defer span.End()
upload.cleanup(failure, !keepUpload, !keepUpload)

// unset processing status
if upload.Node != nil { // node can be nil when there was an error before it was created (eg. checksum-mismatch)
if err := upload.Node.UnmarkProcessing(upload.Ctx, upload.Info.ID); err != nil {
if err := upload.Node.UnmarkProcessing(ctx, upload.Info.ID); err != nil {
upload.log.Info().Str("path", upload.Node.InternalPath()).Err(err).Msg("unmarking processing failed")
}
}
}

// WriteChunk writes the stream from the reader to the given offset of the upload
func (upload *Upload) WriteChunk(_ context.Context, offset int64, src io.Reader) (int64, error) {
ctx, span := tracer.Start(upload.Ctx, "WriteChunk")
defer span.End()
_, subspan := tracer.Start(ctx, "os.OpenFile")
file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm)
subspan.End()
if err != nil {
return 0, err
}
Expand All @@ -147,7 +161,9 @@ func (upload *Upload) WriteChunk(_ context.Context, offset int64, src io.Reader)
// TODO but how do we get the `Upload-Checksum`? WriteChunk() only has a context, offset and the reader ...
// It is sent with the PATCH request, well or in the POST when the creation-with-upload extension is used
// but the tus handler uses a context.Background() so we cannot really check the header and put it in the context ...
_, subspan = tracer.Start(ctx, "io.Copy")
n, err := io.Copy(file, src)
subspan.End()

// If the HTTP PATCH request gets interrupted in the middle (e.g. because
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
Expand All @@ -168,11 +184,15 @@ func (upload *Upload) GetInfo(_ context.Context) (tusd.FileInfo, error) {

// GetReader returns an io.Reader for the upload
func (upload *Upload) GetReader(_ context.Context) (io.Reader, error) {
_, span := tracer.Start(upload.Ctx, "GetReader")
defer span.End()
return os.Open(upload.binPath)
}

// FinishUpload finishes an upload and moves the file to the internal destination
func (upload *Upload) FinishUpload(_ context.Context) error {
ctx, span := tracer.Start(upload.Ctx, "FinishUpload")
defer span.End()
// set lockID to context
if upload.Info.MetaData["lockid"] != "" {
upload.Ctx = ctxpkg.ContextSetLockID(upload.Ctx, upload.Info.MetaData["lockid"])
Expand All @@ -188,7 +208,9 @@ func (upload *Upload) FinishUpload(_ context.Context) error {
md5h := md5.New()
adler32h := adler32.New()
{
_, subspan := tracer.Start(ctx, "os.Open")
f, err := os.Open(upload.binPath)
subspan.End()
if err != nil {
// we can continue if no oc checksum header is set
log.Info().Err(err).Str("binPath", upload.binPath).Msg("error opening binPath")
Expand All @@ -198,7 +220,9 @@ func (upload *Upload) FinishUpload(_ context.Context) error {
r1 := io.TeeReader(f, sha1h)
r2 := io.TeeReader(r1, md5h)

_, subspan = tracer.Start(ctx, "io.Copy")
_, err = io.Copy(adler32h, r2)
subspan.End()
if err != nil {
log.Info().Err(err).Msg("error copying checksums")
}
Expand Down Expand Up @@ -315,6 +339,8 @@ func (upload *Upload) ConcatUploads(_ context.Context, uploads []tusd.Upload) (e

// writeInfo updates the entire information. Everything will be overwritten.
func (upload *Upload) writeInfo() error {
_, span := tracer.Start(upload.Ctx, "writeInfo")
defer span.End()
data, err := json.Marshal(upload.Info)
if err != nil {
return err
Expand All @@ -324,19 +350,23 @@ func (upload *Upload) writeInfo() error {

// Finalize finalizes the upload (eg moves the file to the internal destination)
func (upload *Upload) Finalize() (err error) {
ctx, span := tracer.Start(upload.Ctx, "Finalize")
defer span.End()
n := upload.Node
if n == nil {
var err error
n, err = node.ReadNode(upload.Ctx, upload.lu, upload.Info.Storage["SpaceRoot"], upload.Info.Storage["NodeId"], false, nil, false)
n, err = node.ReadNode(ctx, upload.lu, upload.Info.Storage["SpaceRoot"], upload.Info.Storage["NodeId"], false, nil, false)
if err != nil {
return err
}
upload.Node = n
}

// upload the data to the blobstore

if err := upload.tp.WriteBlob(n, upload.binPath); err != nil {
_, subspan := tracer.Start(ctx, "WriteBlob")
err = upload.tp.WriteBlob(n, upload.binPath)
subspan.End()
if err != nil {
return errors.Wrap(err, "failed to upload file to blostore")
}

Expand Down

0 comments on commit 3a8f830

Please sign in to comment.