Skip to content

Commit

Permalink
chore: handle query parameters as s3
Browse files Browse the repository at this point in the history
as well as update validation logic for provider config, and fix tests
accordingly.

Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
  • Loading branch information
HumairAK committed Apr 12, 2024
1 parent 917066b commit 0060d3b
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 54 deletions.
35 changes: 18 additions & 17 deletions backend/src/v2/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"github.com/kubeflow/pipelines/backend/src/v2/objectstore"
"io/ioutil"
"reflect"
"sigs.k8s.io/yaml"
"strconv"
"strings"
Expand Down Expand Up @@ -50,7 +49,7 @@ type BucketProviders struct {
}

type SessionInfoProvider interface {
ProvideSessionInfo(bucketName, bucketPrefix string) (objectstore.SessionInfo, error)
ProvideSessionInfo(path string) (objectstore.SessionInfo, error)
}

// Config is the KFP runtime configuration.
Expand Down Expand Up @@ -107,8 +106,6 @@ func (c *Config) GetStoreSessionInfo(path string) (objectstore.SessionInfo, erro
if err != nil {
return objectstore.SessionInfo{}, err
}
bucketName := bucketConfig.BucketName
bucketPrefix := bucketConfig.Prefix
provider := strings.TrimSuffix(bucketConfig.Scheme, "://")
bucketProviders, err := c.getBucketProviders()
if err != nil {
Expand All @@ -123,34 +120,38 @@ func (c *Config) GetStoreSessionInfo(path string) (objectstore.SessionInfo, erro
return objectstore.SessionInfo{}, nil
}
return sess, nil
} else {
// If not using minio, and no other provider config is provided
// rely on executor env (e.g. IRSA) for authenticating with provider
return objectstore.SessionInfo{}, nil
}
}

var sessProvider SessionInfoProvider

switch provider {
case "minio":
sessProvider = bucketProviders.Minio
if bucketProviders == nil || bucketProviders.Minio == nil {
sessProvider = &MinioProviderConfig{}
} else {
sessProvider = bucketProviders.Minio
}
break
case "s3":
sessProvider = bucketProviders.S3
if bucketProviders == nil || bucketProviders.S3 == nil {
sessProvider = &S3ProviderConfig{}
} else {
sessProvider = bucketProviders.S3
}
break
case "gs":
sessProvider = bucketProviders.GCS
if bucketProviders == nil || bucketProviders.GCS == nil {
sessProvider = &GCSProviderConfig{}
} else {
sessProvider = bucketProviders.GCS
}
break
default:
return objectstore.SessionInfo{}, fmt.Errorf("Encountered unsupported provider in BucketProviders %s", provider)
}

if sessProvider == nil || reflect.ValueOf(sessProvider).IsNil() {
return objectstore.SessionInfo{}, fmt.Errorf("Encountered unsupported provider in provider config %s", provider)
}

sess, err := sessProvider.ProvideSessionInfo(bucketName, bucketPrefix)
sess, err := sessProvider.ProvideSessionInfo(path)
if err != nil {
return objectstore.SessionInfo{}, err
}
Expand All @@ -177,7 +178,7 @@ func getDefaultMinioSessionInfo() (objectstore.SessionInfo, error) {
Params: map[string]string{
"region": "minio",
"endpoint": objectstore.MinioDefaultEndpoint(),
"disableSsl": strconv.FormatBool(true),
"disableSSL": strconv.FormatBool(true),
"fromEnv": strconv.FormatBool(false),
"secretName": minioArtifactSecretName,
// The k8s secret "Key" for "Artifact SecretKey" and "Artifact AccessKey"
Expand Down
160 changes: 139 additions & 21 deletions backend/src/v2/config/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Test_getDefaultMinioSessionInfo(t *testing.T) {
Params: map[string]string{
"region": "minio",
"endpoint": "minio-service.kubeflow:9000",
"disableSsl": "true",
"disableSSL": "true",
"fromEnv": "false",
"secretName": "mlpipeline-minio-artifact",
"accessKeyKey": "accesskey",
Expand Down Expand Up @@ -78,6 +78,16 @@ func TestGetBucketSessionInfo(t *testing.T) {
shouldError: true,
errorMsg: "unsupported Cloud bucket",
},
{
msg: "valid - only s3 pipelineroot no provider config",
pipelineroot: "s3://my-bucket",
expectedSessionInfo: objectstore.SessionInfo{
Provider: "s3",
Params: map[string]string{
"fromEnv": "true",
},
},
},
{
msg: "invalid - unsupported pipeline root format",
pipelineroot: "minio.unsupported.format",
Expand All @@ -93,7 +103,7 @@ func TestGetBucketSessionInfo(t *testing.T) {
Params: map[string]string{
"region": "minio",
"endpoint": "minio-service.kubeflow:9000",
"disableSsl": "true",
"disableSSL": "true",
"fromEnv": "false",
"secretName": "mlpipeline-minio-artifact",
"accessKeyKey": "accesskey",
Expand All @@ -102,20 +112,60 @@ func TestGetBucketSessionInfo(t *testing.T) {
},
},
{
msg: "invalid - unsupported provider in providers config",
pipelineroot: "s3://my-bucket",
expectedSessionInfo: objectstore.SessionInfo{},
shouldError: true,
errorMsg: "unsupported provider in provider config",
testDataCase: "case0",
msg: "valid - no s3 provider match providers config",
pipelineroot: "s3://my-bucket",
expectedSessionInfo: objectstore.SessionInfo{
Provider: "s3",
Params: map[string]string{
"fromEnv": "true",
},
},
testDataCase: "case0",
},
{
msg: "invalid - empty minio provider",
pipelineroot: "minio://my-bucket/v2/artifacts",
expectedSessionInfo: objectstore.SessionInfo{},
shouldError: true,
errorMsg: "invalid provider config",
testDataCase: "case1",
msg: "valid - no gcs provider match providers config",
pipelineroot: "gs://my-bucket",
expectedSessionInfo: objectstore.SessionInfo{
Provider: "gs",
Params: map[string]string{
"fromEnv": "true",
},
},
testDataCase: "case0",
},
{
msg: "valid - no minio provider match providers config, use default minio config",
pipelineroot: "minio://my-bucket",
expectedSessionInfo: objectstore.SessionInfo{
Provider: "minio",
Params: map[string]string{
"region": "minio",
"endpoint": "minio-service.kubeflow:9000",
"disableSSL": "true",
"fromEnv": "false",
"secretName": "mlpipeline-minio-artifact",
"accessKeyKey": "accesskey",
"secretKeyKey": "secretkey",
},
},
testDataCase: "case1",
},
{
msg: "valid - empty minio provider, use default minio config",
pipelineroot: "minio://my-bucket/v2/artifacts",
expectedSessionInfo: objectstore.SessionInfo{
Provider: "minio",
Params: map[string]string{
"region": "minio",
"endpoint": "minio-service.kubeflow:9000",
"disableSSL": "true",
"fromEnv": "false",
"secretName": "mlpipeline-minio-artifact",
"accessKeyKey": "accesskey",
"secretKeyKey": "secretkey",
},
},
testDataCase: "case1",
},
{
msg: "invalid - empty minio provider no override",
Expand Down Expand Up @@ -370,28 +420,96 @@ func TestGetBucketSessionInfo(t *testing.T) {
t.Run(test.msg, func(t *testing.T) {
config := Config{data: map[string]string{}}
if test.testDataCase != "" {
config.data["providers"] = fetchProviderFromdata(providersData, test.testDataCase)
config.data["providers"] = fetchProviderFromData(providersData, test.testDataCase)
if config.data["providers"] == "" {
panic(fmt.Errorf("provider not found in testdata"))
}
}

actualSession, err := config.GetStoreSessionInfo(test.pipelineroot)
actualSession, err1 := config.GetStoreSessionInfo(test.pipelineroot)
if test.shouldError {
assert.Error(t, err)
if err != nil && test.errorMsg != "" {
assert.Contains(t, err.Error(), test.errorMsg)
assert.Error(t, err1)
if err1 != nil && test.errorMsg != "" {
assert.Contains(t, err1.Error(), test.errorMsg)
}
} else {
assert.Nil(t, err)
assert.Nil(t, err1)
}

assert.Equal(t, test.expectedSessionInfo, actualSession)
})
}
}

func fetchProviderFromdata(cases TestcaseData, name string) string {
func Test_QueryParameters(t *testing.T) {
providersDataFile, err := os.ReadFile("testdata/provider_cases.yaml")
if os.IsNotExist(err) {
panic(err)
}

var providersData TestcaseData
err = yaml.Unmarshal(providersDataFile, &providersData)
if err != nil {
panic(err)
}

tt := []struct {
msg string
config Config
expectedSessionInfo objectstore.SessionInfo
pipelineroot string
shouldError bool
errorMsg string
testDataCase string
}{
{
msg: "valid - should fetch fromEnv when when query parameters are present, and when no matching provider config is provided",
pipelineroot: "s3://bucket_name/v2/artifacts/profile_name?region=bucket_region&endpoint=endpoint&disableSSL=not_use_ssl&s3ForcePathStyle=true",
expectedSessionInfo: objectstore.SessionInfo{
Provider: "s3",
Params: map[string]string{
"fromEnv": "true",
},
},
shouldError: false,
},
{
msg: "valid - should fetch fromEnv when when query parameters are present, and when matching provider config is provided",
pipelineroot: "minio://bucket_name/v2/artifacts/profile_name?region=bucket_region&endpoint=endpoint&disableSSL=not_use_ssl&s3ForcePathStyle=true",
expectedSessionInfo: objectstore.SessionInfo{
Provider: "minio",
Params: map[string]string{
"fromEnv": "true",
},
},
shouldError: false,
testDataCase: "case12",
},
}
for _, test := range tt {
t.Run(test.msg, func(t *testing.T) {
config := Config{data: map[string]string{}}
if test.testDataCase != "" {
config.data["providers"] = fetchProviderFromData(providersData, test.testDataCase)
if config.data["providers"] == "" {
panic(fmt.Errorf("provider not found in testdata"))
}
}
actualSession, err1 := config.GetStoreSessionInfo(test.pipelineroot)
if test.shouldError {
assert.Error(t, err1)
if err1 != nil && test.errorMsg != "" {
assert.Contains(t, err1.Error(), test.errorMsg)
}
} else {
assert.Nil(t, err1)
}
assert.Equal(t, test.expectedSessionInfo, actualSession)
})
}
}

func fetchProviderFromData(cases TestcaseData, name string) string {
for _, c := range cases.Testcases {
if c.Name == name {
return c.Value
Expand Down
20 changes: 19 additions & 1 deletion backend/src/v2/config/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,31 @@ type GCSSecretRef struct {
TokenKey string `json:"tokenKey"`
}

func (p GCSProviderConfig) ProvideSessionInfo(bucketName, bucketPrefix string) (objectstore.SessionInfo, error) {
func (p GCSProviderConfig) ProvideSessionInfo(path string) (objectstore.SessionInfo, error) {
bucketConfig, err := objectstore.ParseBucketPathToConfig(path)
if err != nil {
return objectstore.SessionInfo{}, err
}
bucketName := bucketConfig.BucketName
bucketPrefix := bucketConfig.Prefix

invalidConfigErr := func(err error) error {
return fmt.Errorf("invalid provider config: %w", err)
}

params := map[string]string{}

// 1. If provider config did not have a matching configuration for the provider inferred from pipelineroot OR
// 2. If a user has provided query parameters
// then we use blob.OpenBucket(ctx, config.bucketURL()) by setting "FromEnv = True"
if p.Default == nil && p.Overrides == nil {
params["fromEnv"] = strconv.FormatBool(true)
return objectstore.SessionInfo{
Provider: "gs",
Params: params,
}, nil
}

if p.Default == nil || p.Default.Credentials == nil {
return objectstore.SessionInfo{}, invalidConfigErr(fmt.Errorf("missing default credentials"))
}
Expand Down
25 changes: 22 additions & 3 deletions backend/src/v2/config/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,35 @@

package config

import "github.com/kubeflow/pipelines/backend/src/v2/objectstore"
import (
"github.com/kubeflow/pipelines/backend/src/v2/objectstore"
)

type MinioProviderConfig S3ProviderConfig

// ProvideSessionInfo provides the SessionInfo for minio provider.
// this is the same as s3ProviderConfig.ProvideSessionInfo except
// the provider is set to minio
func (p MinioProviderConfig) ProvideSessionInfo(bucketName, bucketPrefix string) (objectstore.SessionInfo, error) {
func (p MinioProviderConfig) ProvideSessionInfo(path string) (objectstore.SessionInfo, error) {
bucketConfig, err := objectstore.ParseBucketPathToConfig(path)
if err != nil {
return objectstore.SessionInfo{}, err
}
queryString := bucketConfig.QueryString

// When using minio root, with no query strings, if no matching provider in kfp-launcher exists
// we use the default minio configurations
if (p.Default == nil && p.Overrides == nil) && queryString == "" {
sess, sessErr := getDefaultMinioSessionInfo()
if sessErr != nil {
return objectstore.SessionInfo{}, nil
}
return sess, nil
}

s3ProviderConfig := S3ProviderConfig(p)
info, err := s3ProviderConfig.ProvideSessionInfo(bucketName, bucketPrefix)

info, err := s3ProviderConfig.ProvideSessionInfo(path)
if err != nil {
return objectstore.SessionInfo{}, err
}
Expand Down
Loading

0 comments on commit 0060d3b

Please sign in to comment.