From 204e3ba75e6a6d0bbf8afcaab8f5191cca72c1db Mon Sep 17 00:00:00 2001 From: Antoon P Date: Thu, 4 May 2023 18:28:29 +0200 Subject: [PATCH] Data transfers new ocm impl (#3847) * * remove unnecessary creation of ocm reference in ocmshareprovider when accepting an ocm share * set transfer protocol when creating transfer type ocm share * refactor transfer endpoints for new ocm impl * refactor/cleanup gateway.ocmshareprovider.UpdateReceivedOCMShare() code * refactor data transfers folder config * new transfers config setting 'remove_on_cancel' * implement transfer destination path * update datatx example toml * update cli ocm-share-update-received with path flag * Add changelog * Add #PR --------- Co-authored-by: Antoon P --- changelog/unreleased/datatx-new-ocm-impl.md | 4 + cmd/reva/ocm-share-update-received.go | 28 ++ examples/datatx/datatx.toml | 86 ++-- internal/grpc/services/datatx/datatx.go | 25 +- internal/grpc/services/gateway/gateway.go | 4 - .../grpc/services/gateway/ocmshareprovider.go | 391 +++++++----------- .../ocmshareprovider/ocmshareprovider.go | 10 +- pkg/datatx/manager/rclone/rclone.go | 7 +- 8 files changed, 260 insertions(+), 295 deletions(-) create mode 100644 changelog/unreleased/datatx-new-ocm-impl.md diff --git a/changelog/unreleased/datatx-new-ocm-impl.md b/changelog/unreleased/datatx-new-ocm-impl.md new file mode 100644 index 00000000000..2ffad43d88f --- /dev/null +++ b/changelog/unreleased/datatx-new-ocm-impl.md @@ -0,0 +1,4 @@ +Enhancement: Update data transfers for current OCM shares implementation + +https://github.com/cs3org/reva/pull/3847 +https://github.com/cs3org/reva/issues/3846 \ No newline at end of file diff --git a/cmd/reva/ocm-share-update-received.go b/cmd/reva/ocm-share-update-received.go index 2006c721d3d..92b5b2d80c0 100644 --- a/cmd/reva/ocm-share-update-received.go +++ b/cmd/reva/ocm-share-update-received.go @@ -24,6 +24,7 @@ import ( rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" + typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/pkg/errors" "google.golang.org/protobuf/types/known/fieldmaskpb" ) @@ -33,9 +34,11 @@ func ocmShareUpdateReceivedCommand() *command { cmd.Description = func() string { return "update a received OCM share" } cmd.Usage = func() string { return "Usage: ocm-share-update-received [-flags] " } state := cmd.String("state", "pending", "the state of the share (pending, accepted or rejected)") + path := cmd.String("path", "", "the destination path of the data transfer (ignored if this is not a transfer type share)") cmd.ResetFlags = func() { *state = "pending" + *path = "" } cmd.Action = func(w ...io.Writer) error { @@ -75,9 +78,25 @@ func ocmShareUpdateReceivedCommand() *command { } shareRes.Share.State = shareState + // check if we are dealing with a transfer in case the destination path needs to be set + _, ok := getTransferProtocol(shareRes.Share) + var opaque *typesv1beta1.Opaque + if ok { + // transfer_destination_path is not part of TransferProtocol and is specified as an opaque field + opaque = &typesv1beta1.Opaque{ + Map: map[string]*typesv1beta1.OpaqueEntry{ + "transfer_destination_path": { + Decoder: "plain", + Value: []byte(*path), + }, + }, + } + } + shareRequest := &ocm.UpdateReceivedOCMShareRequest{ Share: shareRes.Share, UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"state"}}, + Opaque: opaque, } updateRes, err := shareClient.UpdateReceivedOCMShare(ctx, shareRequest) @@ -95,6 +114,15 @@ func ocmShareUpdateReceivedCommand() *command { return cmd } +func getTransferProtocol(share *ocm.ReceivedShare) (*ocm.TransferProtocol, bool) { + for _, p := range share.Protocols { + if d, ok := p.Term.(*ocm.Protocol_TransferOptions); ok { + return d.TransferOptions, true + } + } + return nil, false +} + func getOCMShareState(state string) ocm.ShareState { switch state { case "pending": diff --git a/examples/datatx/datatx.toml b/examples/datatx/datatx.toml index f1271b34839..bb19821f9e9 100644 --- a/examples/datatx/datatx.toml +++ b/examples/datatx/datatx.toml @@ -1,39 +1,47 @@ -# Example data transfer service configuration -[grpc.services.datatx] -# Rclone is the default data transfer driver -txdriver = "rclone" -# The shares,transfers db file (default: /var/tmp/reva/datatx-shares.json) -tx_shares_file = "" -# Base folder of the data transfers (default: /home/DataTransfers) -data_transfers_folder = "" - -# Rclone data transfer driver -[grpc.services.datatx.txdrivers.rclone] -# Rclone endpoint -endpoint = "http://..." -# Basic auth is used -auth_user = "...rcloneuser" -auth_pass = "...rcloneusersecret" -# The authentication scheme to use in the src and dest requests by rclone (follows the endpoints' authentication methods) -# Valid values: -# "bearer" (default) will result in rclone using request header: Authorization: "Bearer ...token..." -# "x-access-token" will result in rclone using request header: X-Access-Token: "...token..." -# If not set "bearer" is assumed -auth_header = "x-access-token" -# The transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json) -file = "" -# Check status job interval in milliseconds -job_status_check_interval = 2000 -# The job timeout in milliseconds (must be long enough for big transfers!) -job_timeout = 120000 - -[http.services.ocdav] -# Rclone supports third-party copy push; for that to work with reva enable this setting -enable_http_tpc = true -# The authentication scheme reva uses for the tpc push call (the call to Destination). -# Follows the destination endpoint authentication method. -# Valid values: -# "bearer" (default) will result in header: Authorization: "Bearer ...token..." -# "x-access-token" will result in header: X-Access-Token: "...token..." -# If not set "bearer" is assumed -http_tpc_push_auth_header = "x-access-token" +# all relevant settings for data transfers + +[grpc.services.gateway] +datatx = "localhost:19000" +# base folder of the data transfers (eg. /home/DataTransfers) +data_transfers_folder = "" + + +[grpc.services.datatx] +# rclone is currently the only data transfer driver implementation +txdriver = "rclone" +# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json) +tx_shares_file = "" +# base folder of the data transfers (eg. /home/DataTransfers) +data_transfers_folder = "" + +# rclone driver +[grpc.services.datatx.txdrivers.rclone] +# rclone endpoint +endpoint = "http://..." +# Basic auth is used for authenticating with rclone +auth_user = "{rclone user}" +auth_pass = "{rclone user secret}" +# The authentication scheme to use in the src and dest requests by rclone (follows the endpoints' authentication methods) +# Valid values: +# "bearer" (default) will result in rclone using request header: Authorization: "Bearer ...token..." +# "x-access-token" will result in rclone using request header: X-Access-Token: "...token..." +# If not set "bearer" is assumed +auth_header = "x-access-token" +# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json) +file = "" +# check status job interval in milliseconds +job_status_check_interval = 2000 +# the job timeout in milliseconds (must be long enough for big transfers!) +job_timeout = 120000 + +[http.services.ocdav] +# reva supports http third party copy +enable_http_tpc = true +# with rclone reva only supports http tpc push (ie. with the destination header specified) +# The authentication scheme reva uses for the tpc push call (the call to Destination). +# Follows the destination endpoint authentication method. +# Valid values: +# "bearer" (default) will result in header: Authorization: "Bearer ...token..." +# "x-access-token" will result in header: X-Access-Token: "...token..." +# If not set "bearer" is assumed +http_tpc_push_auth_header = "x-access-token" diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index c2a67aee6d0..b3ac0b43983 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -46,10 +46,10 @@ type config struct { TxDriver string `mapstructure:"txdriver"` TxDrivers map[string]map[string]interface{} `mapstructure:"txdrivers"` // storage driver to persist share/transfer relation - StorageDriver string `mapstructure:"storage_driver"` - StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"` - TxSharesFile string `mapstructure:"tx_shares_file"` - DataTransfersFolder string `mapstructure:"data_transfers_folder"` + StorageDriver string `mapstructure:"storage_driver"` + StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"` + TxSharesFile string `mapstructure:"tx_shares_file"` + RemoveOnCancel bool `mapstructure:"remove_on_cancel"` } type service struct { @@ -81,9 +81,6 @@ func (c *config) init() { if c.TxSharesFile == "" { c.TxSharesFile = "/var/tmp/reva/datatx-shares.json" } - if c.DataTransfersFolder == "" { - c.DataTransfersFolder = "/home/DataTransfers" - } } func (s *service) Register(ss *grpc.Server) { @@ -211,10 +208,22 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer return nil, errtypes.InternalError("datatx service: transfer not found") } + transferRemovedMessage := "" + if s.conf.RemoveOnCancel { + delete(s.txShareDriver.model.TxShares, req.TxId.GetOpaqueId()) + if err := s.txShareDriver.model.saveTxShare(); err != nil { + err = errors.Wrap(err, "datatx service: error deleting transfer: "+datatx.Status_STATUS_INVALID.String()) + return &datatx.CancelTransferResponse{ + Status: status.NewInvalid(ctx, "error cancelling transfer"), + }, err + } + transferRemovedMessage = "transfer successfully removed" + } + txInfo, err := s.txManager.CancelTransfer(ctx, req.GetTxId().OpaqueId) if err != nil { txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID} - err = errors.Wrap(err, "datatx service: error cancelling transfer") + err = errors.Wrapf(err, "(%v) datatx service: error cancelling transfer", transferRemovedMessage) return &datatx.CancelTransferResponse{ Status: status.NewInternal(ctx, err, "error cancelling transfer"), TxInfo: txInfo, diff --git a/internal/grpc/services/gateway/gateway.go b/internal/grpc/services/gateway/gateway.go index 9e812aafe19..72776e5fb57 100644 --- a/internal/grpc/services/gateway/gateway.go +++ b/internal/grpc/services/gateway/gateway.go @@ -81,10 +81,6 @@ func (c *config) init() { c.ShareFolder = strings.Trim(c.ShareFolder, "/") - if c.DataTransfersFolder == "" { - c.DataTransfersFolder = "DataTransfers" - } - if c.TokenManager == "" { c.TokenManager = "jwt" } diff --git a/internal/grpc/services/gateway/ocmshareprovider.go b/internal/grpc/services/gateway/ocmshareprovider.go index 28019e786c7..ddcadd44d32 100644 --- a/internal/grpc/services/gateway/ocmshareprovider.go +++ b/internal/grpc/services/gateway/ocmshareprovider.go @@ -25,7 +25,6 @@ import ( "path" "strings" - ocmprovider "github.com/cs3org/go-cs3apis/cs3/ocm/provider/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -171,6 +170,36 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive }, nil } + // retrieve the persisted received share + getShareReq := &ocm.GetReceivedOCMShareRequest{ + Ref: &ocm.ShareReference{ + Spec: &ocm.ShareReference_Id{ + Id: req.Share.Id, + }, + }, + } + getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq) + if err != nil { + log.Err(err).Msg("gateway: error calling GetReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, + }, nil + } + if getShareRes.Status.Code != rpc.Code_CODE_OK { + log.Error().Msg("gateway: error calling GetReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, + }, nil + } + share := getShareRes.Share + if share == nil { + panic("gateway: error updating a received share: the share is nil") + } + res, err := c.UpdateReceivedOCMShare(ctx, req) if err != nil { log.Err(err).Msg("gateway: error calling UpdateReceivedShare") @@ -181,200 +210,16 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive }, nil } - // properties are updated in the order they appear in the field mask - // when an error occurs the request ends and no further fields are updated for i := range req.UpdateMask.Paths { switch req.UpdateMask.Paths[i] { case "state": switch req.GetShare().GetState() { case ocm.ShareState_SHARE_STATE_ACCEPTED: - getShareReq := &ocm.GetReceivedOCMShareRequest{ - Ref: &ocm.ShareReference{ - Spec: &ocm.ShareReference_Id{ - Id: req.Share.Id, - }, - }, - } - getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq) - if err != nil { - log.Err(err).Msg("gateway: error calling GetReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, - }, nil - } - - if getShareRes.Status.Code != rpc.Code_CODE_OK { - log.Error().Msg("gateway: error calling GetReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, - }, nil - } - - share := getShareRes.Share - if share == nil { - panic("gateway: error updating a received share: the share is nil") - } - - if isTransferShare(share) { - srcIdp := share.GetOwner().GetIdp() - meshProvider, err := s.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{ - Domain: srcIdp, - }) - if err != nil { - log.Err(err).Msg("gateway: error calling GetInfoByDomain") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - var srcServiceHost string - var srcEndpointPath string - // target URI scheme will be the webdav endpoint scheme - var srcEndpointScheme string - for _, s := range meshProvider.ProviderInfo.Services { - if strings.ToLower(s.Endpoint.Type.Name) == "webdav" { - srcWebdavEndpointURL, err := url.Parse(s.Endpoint.Path) - if err != nil || srcWebdavEndpointURL.Host == "" { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint \"" + s.Endpoint.Path + "\" into URL structure") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - var srcWebdavHostURLString string - if strings.Contains(s.Host, "://") { - srcWebdavHostURLString = s.Host - } else { - srcWebdavHostURLString = "http://" + s.Host - } - srcWebdavHostURL, err := url.Parse(srcWebdavHostURLString) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host \"" + s.Host + "\" into URL structure") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - srcServiceHost = srcWebdavHostURL.Host + srcWebdavHostURL.Path - // optional prefix must only appear in target url path: - // http://...token...@reva.eu/prefix/?name=remote.php/webdav/home/... - srcEndpointPath = strings.TrimPrefix(srcWebdavEndpointURL.Path, srcWebdavHostURL.Path) - srcEndpointScheme = srcWebdavEndpointURL.Scheme - break - } - } - - var srcToken string - srcTokenOpaque, ok := share.Grantee.Opaque.Map["token"] - if !ok { - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewNotFound(ctx, "token not found"), - }, nil - } - switch srcTokenOpaque.Decoder { - case "plain": - srcToken = string(srcTokenOpaque.Value) - default: - err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder) - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewInternal(ctx, err, "error updating received share"), - }, nil - } - - srcPath := path.Join(srcEndpointPath, share.Name) - srcTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", srcEndpointScheme, srcToken, srcServiceHost, srcPath) - - // get the webdav endpoint of the grantee's idp - var granteeIdp string - if share.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER { - granteeIdp = share.GetGrantee().GetUserId().Idp - } - if share.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP { - granteeIdp = share.GetGrantee().GetGroupId().Idp - } - destWebdavEndpoint, err := s.getWebdavEndpoint(ctx, granteeIdp) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - destWebdavEndpointURL, err := url.Parse(destWebdavEndpoint) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint \"" + destWebdavEndpoint + "\" into URL structure") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - destWebdavHost, err := s.getWebdavHost(ctx, granteeIdp) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - var dstWebdavURLString string - if strings.Contains(destWebdavHost, "://") { - dstWebdavURLString = destWebdavHost - } else { - dstWebdavURLString = "http://" + destWebdavHost - } - dstWebdavHostURL, err := url.Parse(dstWebdavURLString) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host \"" + dstWebdavURLString + "\" into URL structure") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - destServiceHost := dstWebdavHostURL.Host + dstWebdavHostURL.Path - // optional prefix must only appear in target url path: - // http://...token...@reva.eu/prefix/?name=remote.php/webdav/home/... - destEndpointPath := strings.TrimPrefix(destWebdavEndpointURL.Path, dstWebdavHostURL.Path) - destEndpointScheme := destWebdavEndpointURL.Scheme - destToken := ctxpkg.ContextMustGetToken(ctx) - homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{}) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - destPath := path.Join(destEndpointPath, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.Name)) - destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destServiceHost, destPath) - - shareID := &ocm.ShareId{ - OpaqueId: share.GetId().OpaqueId, - } - req := &datatx.CreateTransferRequest{ - SrcTargetUri: srcTargetURI, - DestTargetUri: destTargetURI, - ShareId: shareID, - } - res, err := s.CreateTransfer(ctx, req) - if err != nil { - log.Err(err).Msg("gateway: error calling CreateTransfer") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, - }, err - } - - log.Info().Msgf("gateway: CreateTransfer: %v", res.TxInfo) - - // do not create an OCM reference, just return - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewOK(ctx), - }, nil - } - - createRefStatus, err := s.createOCMReference(ctx, share) - return &ocm.UpdateReceivedOCMShareResponse{ - Status: createRefStatus, - }, err + // for a transfer this is handled elsewhere + case ocm.ShareState_SHARE_STATE_PENDING: + // currently no consequences case ocm.ShareState_SHARE_STATE_REJECTED: - s.removeReference(ctx, req.GetShare().ResourceId) // error is logged inside removeReference + s.removeReference(ctx, share.ResourceId) // error is logged inside removeReference // FIXME we are ignoring an error from removeReference here return res, nil } @@ -388,14 +233,126 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive return nil, errtypes.NotSupported("updating " + req.UpdateMask.Paths[i] + " is not supported") } } + // handle transfer in case it has not already been accepted + if s.isTransferShare(share) && req.GetShare().State == ocm.ShareState_SHARE_STATE_ACCEPTED && share.State != ocm.ShareState_SHARE_STATE_ACCEPTED { + // get provided destination path + transferDestinationPath, err := s.getTransferDestinationPath(ctx, req) + if err != nil { + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, + }, err + } + } + + error := s.handleTransfer(ctx, share, transferDestinationPath) + if error != nil { + log.Err(error).Msg("gateway: error handling transfer in UpdateReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, + }, error + } + } return res, nil } -func isTransferShare(s *ocm.ReceivedShare) bool { - _, ok := getTransferProtocol(s) +func (s *svc) handleTransfer(ctx context.Context, share *ocm.ReceivedShare, transferDestinationPath string) error { + log := appctx.GetLogger(ctx) + + protocol, ok := s.getTransferProtocol(share) + if !ok { + return errors.New("gateway: unable to retrieve transfer protocol") + } + sourceURI := protocol.SourceUri + + // get the webdav endpoint of the grantee's idp + var granteeIdp string + if share.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER { + granteeIdp = share.GetGrantee().GetUserId().Idp + } + if share.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP { + granteeIdp = share.GetGrantee().GetGroupId().Idp + } + destWebdavEndpoint, err := s.getWebdavEndpoint(ctx, granteeIdp) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") + return err + } + destWebdavEndpointURL, err := url.Parse(destWebdavEndpoint) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint \"" + destWebdavEndpoint + "\" into URL structure") + return err + } + destWebdavHost, err := s.getWebdavHost(ctx, granteeIdp) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") + return err + } + var dstWebdavURLString string + if strings.Contains(destWebdavHost, "://") { + dstWebdavURLString = destWebdavHost + } else { + dstWebdavURLString = "http://" + destWebdavHost + } + dstWebdavHostURL, err := url.Parse(dstWebdavURLString) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host \"" + dstWebdavURLString + "\" into URL structure") + return err + } + destServiceHost := dstWebdavHostURL.Host + dstWebdavHostURL.Path + // optional prefix must only appear in target url path: + // http://...token...@reva.eu/prefix/?name=remote.php/webdav/home/... + destEndpointPath := strings.TrimPrefix(destWebdavEndpointURL.Path, dstWebdavHostURL.Path) + destEndpointScheme := destWebdavEndpointURL.Scheme + destToken := ctxpkg.ContextMustGetToken(ctx) + destPath := path.Join(destEndpointPath, transferDestinationPath, path.Base(share.Name)) + destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destServiceHost, destPath) + // var destUri string + req := &datatx.CreateTransferRequest{ + SrcTargetUri: sourceURI, + DestTargetUri: destTargetURI, + ShareId: share.Id, + } + + res, err := s.CreateTransfer(ctx, req) + if err != nil { + return err + } + log.Info().Msgf("gateway: CreateTransfer: %v", res.TxInfo) + return nil +} + +func (s *svc) isTransferShare(share *ocm.ReceivedShare) bool { + _, ok := s.getTransferProtocol(share) return ok } +func (s *svc) getTransferDestinationPath(ctx context.Context, req *ocm.UpdateReceivedOCMShareRequest) (string, error) { + log := appctx.GetLogger(ctx) + // the destination path is not part of any protocol, but an opaque field + destPathOpaque, ok := req.GetOpaque().GetMap()["transfer_destination_path"] + if ok { + switch destPathOpaque.Decoder { + case "plain": + if string(destPathOpaque.Value) != "" { + return string(destPathOpaque.Value), nil + } + default: + return "", errtypes.NotSupported("decoder of opaque entry 'transfer_destination_path' not recognized: " + destPathOpaque.Decoder) + } + } + log.Info().Msg("destination path not provided, trying default transfer destination folder") + if s.c.DataTransfersFolder == "" { + return "", errtypes.NotSupported("no destination path provided and default transfer destination folder is not set") + } + return s.c.DataTransfersFolder, nil +} + func (s *svc) GetReceivedOCMShare(ctx context.Context, req *ocm.GetReceivedOCMShareRequest) (*ocm.GetReceivedOCMShareResponse, error) { c, err := pool.GetOCMShareProviderClient(pool.Endpoint(s.c.OCMShareProviderEndpoint)) if err != nil { @@ -413,7 +370,7 @@ func (s *svc) GetReceivedOCMShare(ctx context.Context, req *ocm.GetReceivedOCMSh return res, nil } -func getTransferProtocol(share *ocm.ReceivedShare) (*ocm.TransferProtocol, bool) { +func (s *svc) getTransferProtocol(share *ocm.ReceivedShare) (*ocm.TransferProtocol, bool) { for _, p := range share.Protocols { if d, ok := p.Term.(*ocm.Protocol_TransferOptions); ok { return d.TransferOptions, true @@ -421,53 +378,3 @@ func getTransferProtocol(share *ocm.ReceivedShare) (*ocm.TransferProtocol, bool) } return nil, false } - -func (s *svc) createOCMReference(ctx context.Context, share *ocm.ReceivedShare) (*rpc.Status, error) { - log := appctx.GetLogger(ctx) - - d, _ := getTransferProtocol(share) - - homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{}) - if err != nil { - err := errors.Wrap(err, "gateway: error calling GetHome") - return status.NewInternal(ctx, err, "error updating received share"), nil - } - - var refPath, targetURI string - // reference path is the home path + some name on the corresponding - // mesh provider (/home/MyShares/x) - // It is the responsibility of the gateway to resolve these references and merge the response back - // from the main request. - refPath = path.Join(homeRes.Path, s.c.ShareFolder, path.Base(share.Name)) - // webdav is the scheme, token@host the opaque part and the share name the query of the URL. - targetURI = fmt.Sprintf("webdav://%s@%s?name=%s", d.SharedSecret, share.Creator.Idp, share.Name) - - log.Info().Msg("mount path will be:" + refPath) - createRefReq := &provider.CreateReferenceRequest{ - Ref: &provider.Reference{Path: refPath}, - TargetUri: targetURI, - } - - c, err := s.findByPath(ctx, refPath) - if err != nil { - if _, ok := err.(errtypes.IsNotFound); ok { - return status.NewNotFound(ctx, "storage provider not found"), nil - } - return status.NewInternal(ctx, err, "error finding storage provider"), nil - } - - createRefRes, err := c.CreateReference(ctx, createRefReq) - if err != nil { - log.Err(err).Msg("gateway: error calling GetHome") - return &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, nil - } - - if createRefRes.Status.Code != rpc.Code_CODE_OK { - err := status.NewErrorFromCode(createRefRes.Status.GetCode(), "gateway") - return status.NewInternal(ctx, err, "error updating received share"), nil - } - - return status.NewOK(ctx), nil -} diff --git a/internal/grpc/services/ocmshareprovider/ocmshareprovider.go b/internal/grpc/services/ocmshareprovider/ocmshareprovider.go index 87401ccecb0..9112af18fe2 100644 --- a/internal/grpc/services/ocmshareprovider/ocmshareprovider.go +++ b/internal/grpc/services/ocmshareprovider/ocmshareprovider.go @@ -208,6 +208,14 @@ func (s *service) getWebappProtocol(share *ocm.Share) *ocmd.Webapp { } } +func (s *service) getDataTransferProtocol(ctx context.Context, share *ocm.Share) *ocmd.Datatx { + // TODO discover the size + return &ocmd.Datatx{ + SourceURI: s.webdavURL(ctx, share), + Size: 0, + } +} + func (s *service) getProtocols(ctx context.Context, share *ocm.Share) ocmd.Protocols { var p ocmd.Protocols for _, m := range share.AccessMethods { @@ -217,7 +225,7 @@ func (s *service) getProtocols(ctx context.Context, share *ocm.Share) ocmd.Proto case *ocm.AccessMethod_WebappOptions: p = append(p, s.getWebappProtocol(share)) case *ocm.AccessMethod_TransferOptions: - // TODO + p = append(p, s.getDataTransferProtocol(ctx, share)) } } return p diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index 25df9aaeef2..21e3d6e3f0a 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -879,8 +879,13 @@ func (driver *rclone) extractEndpointInfo(ctx context.Context, targetURL string) return nil, errors.Wrap(err, "datatx service: error parsing target resource name") } + var path string + if m["name"] != nil { + path = m["name"][0] + } + return &endpoint{ - filePath: m["name"][0], + filePath: path, endpoint: uri.Host + uri.Path, endpointScheme: uri.Scheme, token: uri.User.String(),