Skip to content

Commit

Permalink
Merge pull request #1176 from warren830/feat-devlake-config-update
Browse files Browse the repository at this point in the history
feat: add devlake-config update method
  • Loading branch information
IronCore864 authored Oct 20, 2022
2 parents f7718d8 + 76e7569 commit 30cf3af
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 41 deletions.
168 changes: 130 additions & 38 deletions internal/pkg/plugin/devlakeconfig/devlakeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
Expand All @@ -17,25 +18,6 @@ var httpClient = &http.Client{
Timeout: 5 * time.Second,
}

func RenderAuthConfig(options configmanager.RawOptions) (configmanager.RawOptions, error) {
opts, err := NewOptions(options)
if err != nil {
return nil, err
}

for _, p := range opts.Plugins {
for _, c := range p.Connections {
c.Token = c.Authx.Token
c.Username = c.Authx.Username
c.Password = c.Authx.Password
c.AppId = c.Authx.AppId
c.SecretKey = c.Authx.SecretKey
}
}

return opts.Encode()
}

func ApplyConfig(options configmanager.RawOptions) error {
opts, err := NewOptions(options)
if err != nil {
Expand All @@ -55,42 +37,92 @@ func ApplyConfig(options configmanager.RawOptions) error {
func createConnections(host string, pluginName string, connections []Connection) error {
for i, c := range connections {
log.Infof("Connection %d: %s", i, c.Name)
configs, err := json.Marshal(c)
if err != nil {
return err
}
log.Debugf("Connection configs: %s", string(configs))

url := fmt.Sprintf("%s/plugins/%s/connections", strings.TrimRight(host, "/"), pluginName)
log.Debugf("URL: %s", url)

if err := createConnection(url, configs); err != nil {
if err := createConnection(host, pluginName, c); err != nil {
return err
}
}

log.Infof("All %s connections have been created.", pluginName)
return nil
}

func createConnection(url string, bodyWithJson []byte) error {
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(bodyWithJson))
func createConnection(host string, pluginName string, c Connection) error {
configs, err := json.Marshal(c)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
log.Debugf("Connection configs: %s", string(configs))
url := fmt.Sprintf("%s/plugins/%s/connections", strings.TrimRight(host, "/"), pluginName)
log.Debugf("URL: %s", url)
if _, err = apiRequest(http.MethodPost, url, configs); err != nil {
return err
}
return nil
}

resp, err := httpClient.Do(req)
// update existed connection in devlake backend
func updateConnection(host string, pluginName string, c Connection) error {
configs, err := json.Marshal(c)
if err != nil {
return err
}
log.Debugf("UPDATE Connection configs: %s", string(configs))
url := fmt.Sprintf("%s/plugins/%s/connections/%d", strings.TrimRight(host, "/"), pluginName, c.ID)
log.Debugf("URL: %s", url)
if _, err = apiRequest(http.MethodPatch, url, configs); err != nil {
return err
}
return nil
}

// delete existed connection in devlake backend
func deleteConnection(host string, pluginName string, c Connection) error {
configs, err := json.Marshal(c)
if err != nil {
return err
}
log.Debugf("DELETE Connection configs: %s", string(configs))
url := fmt.Sprintf("%s/plugins/%s/connections/%d", strings.TrimRight(host, "/"), pluginName, c.ID)
log.Debugf("URL: %s", url)
if _, err = apiRequest(http.MethodDelete, url, configs); err != nil {
return err
}
return nil
}

func getConnections(host string, pluginName string) ([]Connection, error) {
url := fmt.Sprintf("%s/plugins/%s/connections", strings.TrimRight(host, "/"), pluginName)
log.Debugf("URL: %s", url)
resp, err := apiRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
defer resp.Body.Close()
resBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
connections := make([]Connection, 0)
err = json.Unmarshal(resBody, &connections)
if err != nil {
return nil, err
}
return connections, nil
}

func apiRequest(method string, url string, bodyWithJson []byte) (*http.Response, error) {
req, err := http.NewRequest(method, url, bytes.NewBuffer(bodyWithJson))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusOK {
return nil
return resp, nil
}

return fmt.Errorf(resp.Status)
return nil, fmt.Errorf(resp.Status)
}

func DeleteConfig(options configmanager.RawOptions) error {
Expand All @@ -99,11 +131,71 @@ func DeleteConfig(options configmanager.RawOptions) error {
}

func UpdateConfig(options configmanager.RawOptions) error {
// TODO(daniel-hutao): implement later
opts, err := NewOptions(options)
if err != nil {
return err
}
for _, p := range opts.Plugins {
if updatePluginConfig(opts.DevLakeAddr, p) != nil {
return err
}
}
return nil
}

func updatePluginConfig(devlakeAddr string, plugin DevLakePlugin) error {
connections, err := getConnections(devlakeAddr, plugin.Name)
if err != nil {
return err
}

// Connection.Name -> Connection for config
connMapForConfig := make(map[string]Connection)
// Connection.Name -> Connection for status
connMapForStatus := make(map[string]Connection)
for _, c := range plugin.Connections {
connMapForConfig[c.Name] = c
}
for _, c := range connections {
connMapForStatus[c.Name] = c
}

return updatePluginConnections(connMapForConfig, connMapForStatus, devlakeAddr, plugin)
}

func updatePluginConnections(connMapForConfig, connMapForStatus map[string]Connection, devlakeAddr string, plugin DevLakePlugin) error {
for name := range connMapForConfig {
// Create connection which is not in ResourceStatus
c, ok := connMapForStatus[name]
if ok {
if err := createConnection(devlakeAddr, plugin.Name, connMapForConfig[name]); err != nil {
return err
}
continue
}

// Update connection which is different from State
if c != connMapForConfig[name] {
if err := updateConnection(devlakeAddr, plugin.Name, connMapForStatus[name]); err != nil {
return err
}
}
}

// Delete connection which is not in Config
for name := range connMapForStatus {
if _, ok := connMapForConfig[name]; !ok {
if err := deleteConnection(devlakeAddr, plugin.Name, connMapForStatus[name]); err != nil {
return err
}
}
}

return nil
}

func GetStatus(options configmanager.RawOptions) (statemanager.ResourceStatus, error) {
// TODO(daniel-hutao): get the real status later
resStatus := statemanager.ResourceStatus(options)
return resStatus, nil
}
42 changes: 40 additions & 2 deletions internal/pkg/plugin/devlakeconfig/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,25 @@ type DevLakePlugin struct {

type Connection struct {
staging.RestConnection `mapstructure:",squash"`
Authx Auth `mapstructure:"auth" validate:"required"`
Auth `mapstructure:",squash"`
// a connection format in dtm config:
// - name: ""
// endpoint: ""
// proxy: ""
// rateLimitPerHour: 0
// auth:
// username: "changeme"
// password: "changeme"
Auth Auth `mapstructure:"auth" validate:"required"`
// a connection format in DevLake api:
// {
// "name": ""
// "endpoint": ""
// "proxy": ""
// "rateLimitPerHour": 0
// "username": "changeme"
// "password": "changeme"
// }
InlineAuth `mapstructure:",squash"`
}

type Auth struct {
Expand All @@ -53,10 +70,31 @@ type Auth struct {
staging.AppKey `mapstructure:",squash"`
}

type InlineAuth Auth

func (o *Options) Encode() (configmanager.RawOptions, error) {
var options configmanager.RawOptions
if err := mapstructure.Decode(o, &options); err != nil {
return nil, err
}
return options, nil
}

func RenderAuthConfig(options configmanager.RawOptions) (configmanager.RawOptions, error) {
opts, err := NewOptions(options)
if err != nil {
return nil, err
}

for _, p := range opts.Plugins {
for _, c := range p.Connections {
c.Token = c.Auth.Token
c.Username = c.Auth.Username
c.Password = c.Auth.Password
c.AppId = c.Auth.AppId
c.SecretKey = c.Auth.SecretKey
}
}

return opts.Encode()
}
1 change: 0 additions & 1 deletion internal/pkg/plugin/devlakeconfig/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ func Read(options configmanager.RawOptions) (statemanager.ResourceStatus, error)
operator := &plugininstaller.Operator{
PreExecuteOperations: plugininstaller.PreExecuteOperations{
validate,
RenderAuthConfig,
},
GetStatusOperation: GetStatus,
}
Expand Down

0 comments on commit 30cf3af

Please sign in to comment.