Skip to content

Commit

Permalink
Implement CS3APIs datatx name change PullTransfer -> CreateTransfer (#…
Browse files Browse the repository at this point in the history
…3553)

* Implement CS3APIs datatx message name change.

* Add #PR

* Bindings to make CI pass

* Opaque field ShareId promoted to real message property.

* Temporary reference to upcoming/to-be-merged CS3APIs change.

* Normalize before url parsing if necessary.

* Lint fixes.

* Lint fixes.

* Cleanup lines.

* Revert to new go bindings.

---------

Co-authored-by: Antoon P <antoon@redblom.com>
  • Loading branch information
redblom and antoonp authored Feb 13, 2023
1 parent 860a60e commit c178db1
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 58 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/rclone-tpc-cs3apis.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Change: Rename PullTransfer to CreateTransfer

This change implements a CS3APIs name change in the datatx module (PullTransfer to CreateTransfer)

https://github.com/cs3org/reva/pull/3553
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/cheggaaa/pb v1.0.29
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e
github.com/cs3org/go-cs3apis v0.0.0-20230124153659-5dc78d32c9b7
github.com/cs3org/go-cs3apis v0.0.0-20230209081138-33f5a7d81cb2
github.com/dgraph-io/ristretto v0.1.1
github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59
github.com/gdexlab/go-render v1.0.1
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJffz4pz0o1WuQxJ28+5x5JgaHD8=
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4=
github.com/cs3org/go-cs3apis v0.0.0-20230124153659-5dc78d32c9b7 h1:QShkOi9aBptnhYN4W0lueiWTlNtc7O69D6GRpYfZodg=
github.com/cs3org/go-cs3apis v0.0.0-20230124153659-5dc78d32c9b7/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/go-cs3apis v0.0.0-20230209081138-33f5a7d81cb2 h1:vJGHFm3lS7LC0XwsSit8ZMqIyE55Op2+X7p1xEH4Vt0=
github.com/cs3org/go-cs3apis v0.0.0-20230209081138-33f5a7d81cb2/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -921,6 +921,8 @@ github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rainycape/memcache v0.0.0-20150622160815-1031fa0ce2f2/go.mod h1:7tZKcyumwBO6qip7RNQ5r77yrssm9bfCowcLEBcU5IA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redblom/go-cs3apis v0.0.0-20230130162347-1c3e4b532eac h1:GsyT76KNkNHftiTKPLY9qRkrU/Nl91G+x9uStQHBVZE=
github.com/redblom/go-cs3apis v0.0.0-20230130162347-1c3e4b532eac/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
33 changes: 16 additions & 17 deletions internal/grpc/services/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
txdriver "github.com/cs3org/reva/pkg/datatx"
txregistry "github.com/cs3org/reva/pkg/datatx/manager/registry"
"github.com/cs3org/reva/pkg/errtypes"
Expand Down Expand Up @@ -72,7 +71,7 @@ type txShare struct {
TxID string
SrcTargetURI string
DestTargetURI string
Opaque *types.Opaque `json:"opaque"`
ShareID string
}

func (c *config) init() {
Expand Down Expand Up @@ -146,7 +145,7 @@ func (s *service) UnprotectedEndpoints() []string {
return []string{}
}

func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) {
func (s *service) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequest) (*datatx.CreateTransferResponse, error) {
txInfo, startTransferErr := s.txManager.CreateTransfer(ctx, req.SrcTargetUri, req.DestTargetUri)

// we always save the transfer regardless of start transfer outcome
Expand All @@ -155,29 +154,29 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ
TxID: txInfo.GetId().OpaqueId,
SrcTargetURI: req.SrcTargetUri,
DestTargetURI: req.DestTargetUri,
Opaque: req.Opaque,
ShareID: req.GetShareId().OpaqueId,
}
s.txShareDriver.Lock()
defer s.txShareDriver.Unlock()

s.txShareDriver.model.TxShares[txInfo.GetId().OpaqueId] = txShare
if err := s.txShareDriver.model.saveTxShare(); err != nil {
err = errors.Wrap(err, "datatx service: error saving transfer share: "+datatx.Status_STATUS_INVALID.String())
return &datatx.PullTransferResponse{
Status: status.NewInvalid(ctx, "error pulling transfer"),
return &datatx.CreateTransferResponse{
Status: status.NewInvalid(ctx, "error creating transfer"),
}, err
}

// now check start transfer outcome
if startTransferErr != nil {
startTransferErr = errors.Wrap(startTransferErr, "datatx service: error starting transfer job")
return &datatx.PullTransferResponse{
Status: status.NewInvalid(ctx, "datatx service: error pulling transfer"),
return &datatx.CreateTransferResponse{
Status: status.NewInvalid(ctx, "datatx service: error creating transfer"),
TxInfo: txInfo,
}, startTransferErr
}

return &datatx.PullTransferResponse{
return &datatx.CreateTransferResponse{
Status: status.NewOK(ctx),
TxInfo: txInfo,
}, nil
Expand All @@ -198,7 +197,7 @@ func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransfer
}, err
}

txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}
txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID}

return &datatx.GetTransferStatusResponse{
Status: status.NewOK(ctx),
Expand All @@ -207,22 +206,22 @@ func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransfer
}

func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) {
txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()]
txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().OpaqueId]
if !ok {
return nil, errtypes.InternalError("datatx service: transfer not found")
}

txInfo, err := s.txManager.CancelTransfer(ctx, req.GetTxId().OpaqueId)
if err != nil {
txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}
txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID}
err = errors.Wrap(err, "datatx service: error cancelling transfer")
return &datatx.CancelTransferResponse{
Status: status.NewInternal(ctx, err, "error cancelling transfer"),
TxInfo: txInfo,
}, err
}

txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}
txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID}

return &datatx.CancelTransferResponse{
Status: status.NewOK(ctx),
Expand All @@ -237,15 +236,15 @@ func (s *service) ListTransfers(ctx context.Context, req *datatx.ListTransfersRe
if len(filters) == 0 {
txInfos = append(txInfos, &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: txShare.TxID},
ShareId: &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)},
ShareId: &ocm.ShareId{OpaqueId: txShare.ShareID},
})
} else {
for _, f := range filters {
if f.Type == datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID {
if f.GetShareId().GetOpaqueId() == string(txShare.Opaque.Map["shareId"].Value) {
if f.GetShareId().GetOpaqueId() == txShare.ShareID {
txInfos = append(txInfos, &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: txShare.TxID},
ShareId: &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)},
ShareId: &ocm.ShareId{OpaqueId: txShare.ShareID},
})
}
}
Expand Down Expand Up @@ -274,7 +273,7 @@ func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRe
}, err
}

txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}
txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID}

return &datatx.RetryTransferResponse{
Status: status.NewOK(ctx),
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/gateway/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ import (
"github.com/pkg/errors"
)

func (s *svc) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) {
func (s *svc) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequest) (*datatx.CreateTransferResponse, error) {
c, err := pool.GetDataTxClient(pool.Endpoint(s.c.DataTxEndpoint))
if err != nil {
err = errors.Wrap(err, "gateway: error calling GetDataTxClient")
return &datatx.PullTransferResponse{
return &datatx.CreateTransferResponse{
Status: status.NewInternal(ctx, err, "error getting data transfer client"),
}, nil
}

res, err := c.PullTransfer(ctx, req)
res, err := c.CreateTransfer(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling PullTransfer")
return nil, errors.Wrap(err, "gateway: error calling CreateTransfer")
}

return res, nil
Expand Down
58 changes: 32 additions & 26 deletions internal/grpc/services/gateway/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
ctxpkg "github.com/cs3org/reva/pkg/ctx"
"github.com/cs3org/reva/pkg/errtypes"
Expand Down Expand Up @@ -285,25 +284,31 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
var srcEndpointScheme string
for _, s := range meshProvider.ProviderInfo.Services {
if strings.ToLower(s.Endpoint.Type.Name) == "webdav" {
endpointURL, err := url.Parse(s.Endpoint.Path)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path)
srcWebdavEndpointURL, err := url.Parse(s.Endpoint.Path)
if err != nil || srcWebdavEndpointURL.Host == "" {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint \"" + s.Endpoint.Path + "\" into URL structure")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
urlServiceHostFull, err := url.Parse(s.Host)
var srcWebdavHostURLString string
if strings.Contains(s.Host, "://") {
srcWebdavHostURLString = s.Host
} else {
srcWebdavHostURLString = "http://" + s.Host
}
srcWebdavHostURL, err := url.Parse(srcWebdavHostURLString)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host " + s.Host)
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host \"" + s.Host + "\" into URL structure")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
srcServiceHost = urlServiceHostFull.Host + urlServiceHostFull.Path
srcServiceHost = srcWebdavHostURL.Host + srcWebdavHostURL.Path
// optional prefix must only appear in target url path:
// http://...token...@reva.eu/prefix/?name=remote.php/webdav/home/...
srcEndpointPath = strings.TrimPrefix(endpointURL.Path, urlServiceHostFull.Path)
srcEndpointScheme = endpointURL.Scheme
srcEndpointPath = strings.TrimPrefix(srcWebdavEndpointURL.Path, srcWebdavHostURL.Path)
srcEndpointScheme = srcWebdavEndpointURL.Scheme
break
}
}
Expand Down Expand Up @@ -345,7 +350,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
}
destWebdavEndpointURL, err := url.Parse(destWebdavEndpoint)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint)
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint \"" + destWebdavEndpoint + "\" into URL structure")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
Expand All @@ -357,17 +362,23 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
urlServiceHostFull, err := url.Parse(destWebdavHost)
var dstWebdavURLString string
if strings.Contains(destWebdavHost, "://") {
dstWebdavURLString = destWebdavHost
} else {
dstWebdavURLString = "http://" + destWebdavHost
}
dstWebdavHostURL, err := url.Parse(dstWebdavURLString)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host " + destWebdavHost)
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host \"" + dstWebdavURLString + "\" into URL structure")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
destServiceHost := urlServiceHostFull.Host + urlServiceHostFull.Path
destServiceHost := dstWebdavHostURL.Host + dstWebdavHostURL.Path
// optional prefix must only appear in target url path:
// http://...token...@reva.eu/prefix/?name=remote.php/webdav/home/...
destEndpointPath := strings.TrimPrefix(destWebdavEndpointURL.Path, urlServiceHostFull.Path)
destEndpointPath := strings.TrimPrefix(destWebdavEndpointURL.Path, dstWebdavHostURL.Path)
destEndpointScheme := destWebdavEndpointURL.Scheme
destToken := ctxpkg.ContextMustGetToken(ctx)
homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{})
Expand All @@ -380,30 +391,25 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
destPath := path.Join(destEndpointPath, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name))
destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destServiceHost, destPath)

opaqueObj := &types.Opaque{
Map: map[string]*types.OpaqueEntry{
"shareId": {
Decoder: "plain",
Value: []byte(share.GetShare().GetId().OpaqueId),
},
},
shareID := &ocm.ShareId{
OpaqueId: share.GetShare().GetId().OpaqueId,
}
req := &datatx.PullTransferRequest{
req := &datatx.CreateTransferRequest{
SrcTargetUri: srcTargetURI,
DestTargetUri: destTargetURI,
Opaque: opaqueObj,
ShareId: shareID,
}
res, err := s.PullTransfer(ctx, req)
res, err := s.CreateTransfer(ctx, req)
if err != nil {
log.Err(err).Msg("gateway: error calling PullTransfer")
log.Err(err).Msg("gateway: error calling CreateTransfer")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
}, err
}

log.Info().Msgf("gateway: PullTransfer: %v", res.TxInfo)
log.Info().Msgf("gateway: CreateTransfer: %v", res.TxInfo)

// do not create an OCM reference, just return
return &ocm.UpdateReceivedOCMShareResponse{
Expand Down
9 changes: 1 addition & 8 deletions pkg/datatx/manager/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,6 @@ func (m *transferModel) saveTransfer(e error) error {
// CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id.
// Specified target URIs are of form scheme://userinfo@host:port?name={path}
func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) {
logger := appctx.GetLogger(ctx)

srcEp, err := driver.extractEndpointInfo(ctx, srcTargetURI)
if err != nil {
return nil, err
Expand All @@ -238,9 +236,6 @@ func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, d
// we always set the userinfo part of the destination url for rclone tpc push support
dstRemote := fmt.Sprintf("%s://%s@%s", destEp.endpointScheme, dstToken, destEp.endpoint)

logger.Debug().Msgf("destination target URI: %v", dstTargetURI)
logger.Debug().Msgf("destination remote: %v", dstRemote)

return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken)
}

Expand Down Expand Up @@ -308,10 +303,8 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote

type rcloneAsyncReqJSON struct {
SrcFs string `json:"srcFs"`
// SrcToken string `json:"srcToken"`
DstFs string `json:"dstFs"`
// DstToken string `json:"destToken"`
Async bool `json:"_async"`
Async bool `json:"_async"`
}
// bearer is the default authentication scheme for reva
srcAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", srcToken)
Expand Down

0 comments on commit c178db1

Please sign in to comment.