Skip to content

Commit

Permalink
Data transfers new ocm impl (cs3org#3847)
Browse files Browse the repository at this point in the history
* * remove unnecessary creation of ocm reference in ocmshareprovider when accepting an ocm share
* set transfer protocol when creating transfer type ocm share
* refactor transfer endpoints for new ocm impl
* refactor/cleanup gateway.ocmshareprovider.UpdateReceivedOCMShare() code
* refactor data transfers folder config
* new transfers config setting 'remove_on_cancel'
* implement transfer destination path
* update datatx example toml
* update cli ocm-share-update-received with path flag

* Add changelog

* Add #PR

---------

Co-authored-by: Antoon P <antoon@redblom.com>
  • Loading branch information
2 people authored and gmgigi96 committed Jun 28, 2023
1 parent 800fdc7 commit 204e3ba
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 295 deletions.
4 changes: 4 additions & 0 deletions changelog/unreleased/datatx-new-ocm-impl.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Enhancement: Update data transfers for current OCM shares implementation

https://github.com/cs3org/reva/pull/3847
https://github.com/cs3org/reva/issues/3846
28 changes: 28 additions & 0 deletions cmd/reva/ocm-share-update-received.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)
Expand All @@ -33,9 +34,11 @@ func ocmShareUpdateReceivedCommand() *command {
cmd.Description = func() string { return "update a received OCM share" }
cmd.Usage = func() string { return "Usage: ocm-share-update-received [-flags] <share_id>" }
state := cmd.String("state", "pending", "the state of the share (pending, accepted or rejected)")
path := cmd.String("path", "", "the destination path of the data transfer (ignored if this is not a transfer type share)")

cmd.ResetFlags = func() {
*state = "pending"
*path = ""
}

cmd.Action = func(w ...io.Writer) error {
Expand Down Expand Up @@ -75,9 +78,25 @@ func ocmShareUpdateReceivedCommand() *command {
}
shareRes.Share.State = shareState

// check if we are dealing with a transfer in case the destination path needs to be set
_, ok := getTransferProtocol(shareRes.Share)
var opaque *typesv1beta1.Opaque
if ok {
// transfer_destination_path is not part of TransferProtocol and is specified as an opaque field
opaque = &typesv1beta1.Opaque{
Map: map[string]*typesv1beta1.OpaqueEntry{
"transfer_destination_path": {
Decoder: "plain",
Value: []byte(*path),
},
},
}
}

shareRequest := &ocm.UpdateReceivedOCMShareRequest{
Share: shareRes.Share,
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"state"}},
Opaque: opaque,
}

updateRes, err := shareClient.UpdateReceivedOCMShare(ctx, shareRequest)
Expand All @@ -95,6 +114,15 @@ func ocmShareUpdateReceivedCommand() *command {
return cmd
}

func getTransferProtocol(share *ocm.ReceivedShare) (*ocm.TransferProtocol, bool) {
for _, p := range share.Protocols {
if d, ok := p.Term.(*ocm.Protocol_TransferOptions); ok {
return d.TransferOptions, true
}
}
return nil, false
}

func getOCMShareState(state string) ocm.ShareState {
switch state {
case "pending":
Expand Down
86 changes: 47 additions & 39 deletions examples/datatx/datatx.toml
Original file line number Diff line number Diff line change
@@ -1,39 +1,47 @@
# Example data transfer service configuration
[grpc.services.datatx]
# Rclone is the default data transfer driver
txdriver = "rclone"
# The shares,transfers db file (default: /var/tmp/reva/datatx-shares.json)
tx_shares_file = ""
# Base folder of the data transfers (default: /home/DataTransfers)
data_transfers_folder = ""

# Rclone data transfer driver
[grpc.services.datatx.txdrivers.rclone]
# Rclone endpoint
endpoint = "http://..."
# Basic auth is used
auth_user = "...rcloneuser"
auth_pass = "...rcloneusersecret"
# The authentication scheme to use in the src and dest requests by rclone (follows the endpoints' authentication methods)
# Valid values:
# "bearer" (default) will result in rclone using request header: Authorization: "Bearer ...token..."
# "x-access-token" will result in rclone using request header: X-Access-Token: "...token..."
# If not set "bearer" is assumed
auth_header = "x-access-token"
# The transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json)
file = ""
# Check status job interval in milliseconds
job_status_check_interval = 2000
# The job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000

[http.services.ocdav]
# Rclone supports third-party copy push; for that to work with reva enable this setting
enable_http_tpc = true
# The authentication scheme reva uses for the tpc push call (the call to Destination).
# Follows the destination endpoint authentication method.
# Valid values:
# "bearer" (default) will result in header: Authorization: "Bearer ...token..."
# "x-access-token" will result in header: X-Access-Token: "...token..."
# If not set "bearer" is assumed
http_tpc_push_auth_header = "x-access-token"
# all relevant settings for data transfers

[grpc.services.gateway]
datatx = "localhost:19000"
# base folder of the data transfers (eg. /home/DataTransfers)
data_transfers_folder = ""


[grpc.services.datatx]
# rclone is currently the only data transfer driver implementation
txdriver = "rclone"
# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json)
tx_shares_file = ""
# base folder of the data transfers (eg. /home/DataTransfers)
data_transfers_folder = ""

# rclone driver
[grpc.services.datatx.txdrivers.rclone]
# rclone endpoint
endpoint = "http://..."
# Basic auth is used for authenticating with rclone
auth_user = "{rclone user}"
auth_pass = "{rclone user secret}"
# The authentication scheme to use in the src and dest requests by rclone (follows the endpoints' authentication methods)
# Valid values:
# "bearer" (default) will result in rclone using request header: Authorization: "Bearer ...token..."
# "x-access-token" will result in rclone using request header: X-Access-Token: "...token..."
# If not set "bearer" is assumed
auth_header = "x-access-token"
# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json)
file = ""
# check status job interval in milliseconds
job_status_check_interval = 2000
# the job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000

[http.services.ocdav]
# reva supports http third party copy
enable_http_tpc = true
# with rclone reva only supports http tpc push (ie. with the destination header specified)
# The authentication scheme reva uses for the tpc push call (the call to Destination).
# Follows the destination endpoint authentication method.
# Valid values:
# "bearer" (default) will result in header: Authorization: "Bearer ...token..."
# "x-access-token" will result in header: X-Access-Token: "...token..."
# If not set "bearer" is assumed
http_tpc_push_auth_header = "x-access-token"
25 changes: 17 additions & 8 deletions internal/grpc/services/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type config struct {
TxDriver string `mapstructure:"txdriver"`
TxDrivers map[string]map[string]interface{} `mapstructure:"txdrivers"`
// storage driver to persist share/transfer relation
StorageDriver string `mapstructure:"storage_driver"`
StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"`
TxSharesFile string `mapstructure:"tx_shares_file"`
DataTransfersFolder string `mapstructure:"data_transfers_folder"`
StorageDriver string `mapstructure:"storage_driver"`
StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"`
TxSharesFile string `mapstructure:"tx_shares_file"`
RemoveOnCancel bool `mapstructure:"remove_on_cancel"`
}

type service struct {
Expand Down Expand Up @@ -81,9 +81,6 @@ func (c *config) init() {
if c.TxSharesFile == "" {
c.TxSharesFile = "/var/tmp/reva/datatx-shares.json"
}
if c.DataTransfersFolder == "" {
c.DataTransfersFolder = "/home/DataTransfers"
}
}

func (s *service) Register(ss *grpc.Server) {
Expand Down Expand Up @@ -211,10 +208,22 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer
return nil, errtypes.InternalError("datatx service: transfer not found")
}

transferRemovedMessage := ""
if s.conf.RemoveOnCancel {
delete(s.txShareDriver.model.TxShares, req.TxId.GetOpaqueId())
if err := s.txShareDriver.model.saveTxShare(); err != nil {
err = errors.Wrap(err, "datatx service: error deleting transfer: "+datatx.Status_STATUS_INVALID.String())
return &datatx.CancelTransferResponse{
Status: status.NewInvalid(ctx, "error cancelling transfer"),
}, err
}
transferRemovedMessage = "transfer successfully removed"
}

txInfo, err := s.txManager.CancelTransfer(ctx, req.GetTxId().OpaqueId)
if err != nil {
txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID}
err = errors.Wrap(err, "datatx service: error cancelling transfer")
err = errors.Wrapf(err, "(%v) datatx service: error cancelling transfer", transferRemovedMessage)
return &datatx.CancelTransferResponse{
Status: status.NewInternal(ctx, err, "error cancelling transfer"),
TxInfo: txInfo,
Expand Down
4 changes: 0 additions & 4 deletions internal/grpc/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ func (c *config) init() {

c.ShareFolder = strings.Trim(c.ShareFolder, "/")

if c.DataTransfersFolder == "" {
c.DataTransfersFolder = "DataTransfers"
}

if c.TokenManager == "" {
c.TokenManager = "jwt"
}
Expand Down
Loading

0 comments on commit 204e3ba

Please sign in to comment.