Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Gcs remote data #121

Merged
merged 18 commits into from
Sep 3, 2020
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/lyft/flyteadmin
go 1.13

require (
cloud.google.com/go v0.56.0
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the last version staying on protobuf v1.3.5. Since v1.4.0, github.com/golang/protobuf started to depend on google.golang.org/protobuf: https://github.com/golang/protobuf/blob/v1.4.0/ptypes/timestamp/timestamp.pb.go#L9

So sooner or later we will need to face this issue again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh this is awesome, thank you for finding it

cloud.google.com/go/storage v1.6.0
github.com/NYTimes/gizmo v1.3.5
github.com/Selvatico/go-mocket v1.0.7
github.com/aws/aws-sdk-go v1.29.23
Expand All @@ -13,6 +15,7 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.3.5
github.com/googleapis/gax-go/v2 v2.0.5
github.com/gorilla/handlers v1.4.2
github.com/gorilla/securecookie v1.1.1
github.com/graymeta/stow v0.2.5
Expand All @@ -34,6 +37,8 @@ require (
github.com/stretchr/testify v1.6.1
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/tools v0.0.0-20200818005847-188abfa75333 // indirect
google.golang.org/api v0.20.0
google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940
google.golang.org/grpc v1.28.0
gopkg.in/gormigrate.v1 v1.6.0
k8s.io/api v0.17.3
Expand Down
268 changes: 9 additions & 259 deletions go.sum

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions pkg/data/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type RemoteDataHandlerConfig struct {
Retries int // Number of times to attempt to initialize a new config on failure.
Region string
SignedURLDurationMinutes int
SigningPrincipal string
RemoteDataStoreClient *storage.DataStore
}

Expand All @@ -45,6 +46,12 @@ func GetRemoteDataHandler(cfg RemoteDataHandlerConfig) RemoteDataHandler {
return &remoteDataHandler{
remoteURL: implementations.NewAWSRemoteURL(awsConfig, presignedURLDuration),
}
case common.GCP:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@katrogan / @honnix with the new change to return the entire data as part of the getData API, we should probably think of making signing optional (we have to keep it around to ensure that very large datasets can still be served). By optional I mean, that if it is not specified, the signing should just be skipped.

I do not think my comment needs to be added as part of this commit, but as a follow up? @katrogan what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that was what i had in the original idl pr - adding a mode to specify what data we want returned and making only the signed url a non-default option. returning the signed url data optionally sounds good to me

signedURLDuration := time.Minute * time.Duration(cfg.SignedURLDurationMinutes)
return &remoteDataHandler{
remoteURL: implementations.NewGCPRemoteURL(cfg.SigningPrincipal, signedURLDuration),
}

case common.Local:
logger.Infof(context.TODO(), "setting up local signer ----- ")
// Since minio = aws s3, we are creating the same client but using the config primitives from aws
Expand Down
6 changes: 3 additions & 3 deletions pkg/data/implementations/aws_remote_url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
"github.com/stretchr/testify/assert"
)

func TestSplitURI(t *testing.T) {
func TestAWSSplitURI(t *testing.T) {
remoteURL := AWSRemoteURL{}
s3Object, err := remoteURL.splitURI(context.Background(), "s3://i/am/valid")
assert.Nil(t, err)
assert.Equal(t, "i", s3Object.bucket)
assert.Equal(t, "am/valid", s3Object.key)
}

func TestSplitURI_InvalidScheme(t *testing.T) {
func TestAWSSplitURI_InvalidScheme(t *testing.T) {
remoteURL := AWSRemoteURL{}
_, err := remoteURL.splitURI(context.Background(), "azure://i/am/invalid")
assert.NotNil(t, err)
Expand All @@ -46,7 +46,7 @@ func (m *mockS3Impl) GetObjectRequest(input *s3.GetObjectInput) (req *request.Re
return m.getObjectFunc(input)
}

func TestGet(t *testing.T) {
func TestAWSGet(t *testing.T) {
contentLength := int64(100)
presignDuration := 3 * time.Minute

Expand Down
193 changes: 193 additions & 0 deletions pkg/data/implementations/gcp_remote_url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package implementations

import (
"context"
"time"

gax "github.com/googleapis/gax-go/v2"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"golang.org/x/oauth2"

credentials "cloud.google.com/go/iam/credentials/apiv1"
gcs "cloud.google.com/go/storage"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/lyft/flyteadmin/pkg/data/interfaces"
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/storage"
"google.golang.org/api/option"
credentialspb "google.golang.org/genproto/googleapis/iam/credentials/v1"
"google.golang.org/grpc/codes"
)

const gcsScheme = "gs"

type iamCredentialsInterface interface {
SignBlob(ctx context.Context, req *credentialspb.SignBlobRequest, opts ...gax.CallOption) (*credentialspb.SignBlobResponse, error)
GenerateAccessToken(ctx context.Context, req *credentialspb.GenerateAccessTokenRequest, opts ...gax.CallOption) (*credentialspb.GenerateAccessTokenResponse, error)
}

type gcsClientWrapper struct {
delegate *gcs.Client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey @honnix I'm not super familiar with the delegate pattern, what's the reason for using it here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this so you can mock out the bucket like you commented below?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That is the only way I could figure out how to test this fluid API.

}

type bucketHandleWrapper struct {
delegate *gcs.BucketHandle
}

type objectHandleWrapper struct {
delegate *gcs.ObjectHandle
}

type gcsInterface interface {
Bucket(name string) bucketHandleInterface
}

type bucketHandleInterface interface {
Object(name string) objectHandleInterface
}

type objectHandleInterface interface {
Attrs(ctx context.Context) (attrs *gcs.ObjectAttrs, err error)
}

// GCP-specific implementation of RemoteURLInterface
type GCPRemoteURL struct {
iamCredentialsClient iamCredentialsInterface
gcsClient gcsInterface
signDuration time.Duration
signingPrincipal string
}

type GCPGCSObject struct {
bucket string
object string
}

type impersonationTokenSource struct {
iamCredentialsClient iamCredentialsInterface
signingPrincipal string
}

func (c *gcsClientWrapper) Bucket(name string) bucketHandleInterface {
return &bucketHandleWrapper{delegate: c.delegate.Bucket(name)}
}

func (b *bucketHandleWrapper) Object(name string) objectHandleInterface {
return &objectHandleWrapper{delegate: b.delegate.Object(name)}
}

func (o *objectHandleWrapper) Attrs(ctx context.Context) (attrs *gcs.ObjectAttrs, err error) {
return o.delegate.Attrs(ctx)
}

func (g *GCPRemoteURL) splitURI(ctx context.Context, uri string) (GCPGCSObject, error) {
scheme, container, key, err := storage.DataReference(uri).Split()
if err != nil {
return GCPGCSObject{}, err
}
if scheme != gcsScheme {
logger.Debugf(ctx, "encountered unexpected scheme: %s for GCS URI: %s", scheme, uri)
return GCPGCSObject{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"unexpected scheme %s for GCS URI", scheme)
}
return GCPGCSObject{
bucket: container,
object: key,
}, nil
}

func (g *GCPRemoteURL) signURL(ctx context.Context, gcsURI GCPGCSObject) (string, error) {
opts := &gcs.SignedURLOptions{
Method: "GET",
GoogleAccessID: g.signingPrincipal,
SignBytes: func(b []byte) ([]byte, error) {
req := &credentialspb.SignBlobRequest{
Payload: b,
Name: "projects/-/serviceAccounts/" + g.signingPrincipal,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the meaning of this string prefix?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a GCP resource name: https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/signBlob

Basically, it means the service account in some GCP project (-) that doesn't matter.

}
resp, err := g.iamCredentialsClient.SignBlob(ctx, req)
if err != nil {
return nil, err
}
return resp.SignedBlob, nil
},
Expires: time.Now().Add(g.signDuration),
}

return gcs.SignedURL(gcsURI.bucket, gcsURI.object, opts)
}

func (g *GCPRemoteURL) Get(ctx context.Context, uri string) (admin.UrlBlob, error) {
logger.Debugf(ctx, "Getting signed url for - %s", uri)
gcsURI, err := g.splitURI(ctx, uri)
if err != nil {
logger.Debugf(ctx, "failed to extract gcs bucket and object from uri: %s", uri)
return admin.UrlBlob{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid uri: %s", uri)
}

// First, get the size of the url blob.
attrs, err := g.gcsClient.Bucket(gcsURI.bucket).Object(gcsURI.object).Attrs(ctx)
honnix marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logger.Debugf(ctx, "failed to get object size for %s with %v", uri, err)
return admin.UrlBlob{}, errors.NewFlyteAdminErrorf(
codes.Internal, "failed to get object size for %s with %v", uri, err)
}

urlStr, err := g.signURL(ctx, gcsURI)
if err != nil {
logger.Warning(ctx,
"failed to presign url for uri [%s] for %v with err %v", uri, g.signDuration, err)
return admin.UrlBlob{}, errors.NewFlyteAdminErrorf(codes.Internal,
"failed to presign url for uri [%s] for %v with err %v", uri, g.signDuration, err)
}
return admin.UrlBlob{
Url: urlStr,
Bytes: attrs.Size,
}, nil
}

func (ts impersonationTokenSource) Token() (*oauth2.Token, error) {
req := credentialspb.GenerateAccessTokenRequest{
Name: "projects/-/serviceAccounts/" + ts.signingPrincipal,
Scope: []string{"https://www.googleapis.com/auth/devstorage.read_only"},
}

resp, err := ts.iamCredentialsClient.GenerateAccessToken(context.Background(), &req)
if err != nil {
return nil, err
}

return &oauth2.Token{
AccessToken: resp.AccessToken,
Expiry: asTime(resp.ExpireTime),
}, nil
}

func asTime(t *timestamp.Timestamp) time.Time {
return time.Unix(t.GetSeconds(), int64(t.GetNanos())).UTC()
}

func NewGCPRemoteURL(signingPrincipal string, signDuration time.Duration) interfaces.RemoteURLInterface {
iamCredentialsClient, err := credentials.NewIamCredentialsClient(context.Background())
if err != nil {
panic(err)
}

gcsClient, err := gcs.NewClient(context.Background(),
option.WithScopes(gcs.ScopeReadOnly),
option.WithTokenSource(impersonationTokenSource{
iamCredentialsClient: iamCredentialsClient,
signingPrincipal: signingPrincipal,
}))
if err != nil {
panic(err)
}

return &GCPRemoteURL{
iamCredentialsClient: iamCredentialsClient,
gcsClient: &gcsClientWrapper{delegate: gcsClient},
signDuration: signDuration,
signingPrincipal: signingPrincipal,
}
}
Loading