diff --git a/yoda/executor/docker.go b/yoda/executor/docker.go index 53a2c1a44..f9f97fa0c 100644 --- a/yoda/executor/docker.go +++ b/yoda/executor/docker.go @@ -3,72 +3,128 @@ package executor import ( "bytes" "context" - "io" - "io/ioutil" - "os" + "encoding/base64" + "fmt" + "net/url" "os/exec" - "path/filepath" + "strconv" "time" - "github.com/google/shlex" - - "github.com/bandprotocol/chain/v2/x/oracle/types" + "github.com/levigross/grequests" ) +// Only use in testnet. No intensive testing, use at your own risk type DockerExec struct { image string + name string timeout time.Duration + // portLists chan string + port string + maxTry int } -func NewDockerExec(image string, timeout time.Duration) *DockerExec { - return &DockerExec{image: image, timeout: timeout} +func NewDockerExec(image string, timeout time.Duration, maxTry int, startPort int, endPort int) *DockerExec { + // portLists := make(chan string, endPort-startPort+1) + name := "docker-runtime-executor-" + for i := startPort; i <= endPort; i++ { + port := strconv.Itoa(i) + StartContainer(name, port, image) + // portLists <- port + } + + return &DockerExec{image: image, name: name, timeout: timeout, port: strconv.Itoa(startPort), maxTry: maxTry} } -func (e *DockerExec) Exec(code []byte, arg string, env interface{}) (ExecResult, error) { - // TODO: Handle env if we are to revive Docker - dir, err := ioutil.TempDir("/tmp", "executor") +func StartContainer(name string, port string, image string) error { + err := exec.Command("docker", "restart", name+port).Run() if err != nil { - return ExecResult{}, err + dockerArgs := append([]string{ + "run", + "--name", name + port, + "-p", port + ":5000", + "--restart=always", + "--memory=512m", + image, + }) + + cmd := exec.CommandContext(context.Background(), "docker", dockerArgs...) + var buf bytes.Buffer + cmd.Stdout = &buf + cmd.Stderr = &buf + err = cmd.Start() } - defer os.RemoveAll(dir) - err = ioutil.WriteFile(filepath.Join(dir, "exec"), code, 0777) + return err +} + +func (e *DockerExec) PostRequest( + code []byte, + arg string, + env interface{}, + name string, + port string, +) (ExecResult, error) { + executable := base64.StdEncoding.EncodeToString(code) + resp, err := grequests.Post( + "http://localhost:"+port, + &grequests.RequestOptions{ + Headers: map[string]string{ + "Content-Type": "application/json", + }, + JSON: map[string]interface{}{ + "executable": executable, + "calldata": arg, + "timeout": e.timeout.Milliseconds(), + "env": env, + }, + RequestTimeout: e.timeout, + }, + ) + if err != nil { - return ExecResult{}, err + urlErr, ok := err.(*url.Error) + if !ok || !urlErr.Timeout() { + return ExecResult{}, err + } + // Return timeout code + return ExecResult{Output: []byte{}, Code: 111}, nil } - name := filepath.Base(dir) - args, err := shlex.Split(arg) + + if !resp.Ok { + return ExecResult{}, ErrRestNotOk + } + + r := externalExecutionResponse{} + err = resp.JSON(&r) + if err != nil { return ExecResult{}, err } - dockerArgs := append([]string{ - "run", "--rm", - "-v", dir + ":/scratch:ro", - "--name", name, - e.image, - "/scratch/exec", - }, args...) - ctx, cancel := context.WithTimeout(context.Background(), e.timeout) - defer cancel() - cmd := exec.CommandContext(ctx, "docker", dockerArgs...) - var buf bytes.Buffer - cmd.Stdout = &buf - cmd.Stderr = &buf - err = cmd.Run() - if ctx.Err() == context.DeadlineExceeded { - exec.Command("docker", "kill", name).Start() - return ExecResult{}, ErrExecutionimeout + + // go func() { + // // StartContainer(name, port, e.image) + // err := exec.Command("docker", "restart", name+port).Run() + // for err != nil { + // err = StartContainer(name, port, e.image) + // } + // e.portLists <- port + // }() + if r.Returncode == 0 { + return ExecResult{Output: []byte(r.Stdout), Code: 0, Version: r.Version}, nil + } else { + return ExecResult{Output: []byte(r.Stderr), Code: r.Returncode, Version: r.Version}, nil } - exitCode := uint32(0) - if err != nil { - if exitError, ok := err.(*exec.ExitError); ok { - exitCode = uint32(exitError.ExitCode()) - } else { - return ExecResult{}, err +} + +func (e *DockerExec) Exec(code []byte, arg string, env interface{}) (ExecResult, error) { + // port := <-e.portLists + errs := []error{} + for i := 0; i < e.maxTry; i++ { + execResult, err := e.PostRequest(code, arg, env, e.name, e.port) + if err == nil { + return execResult, err } + errs = append(errs, err) + time.Sleep(500 * time.Millisecond) } - output, err := ioutil.ReadAll(io.LimitReader(&buf, int64(types.DefaultMaxReportDataSize))) - if err != nil { - return ExecResult{}, err - } - return ExecResult{Output: output, Code: exitCode}, nil + return ExecResult{}, fmt.Errorf(ErrReachMaxTry.Error()+", tried errors: %#q", errs) } diff --git a/yoda/executor/docker_test.go b/yoda/executor/docker_test.go index 2797e5324..1ef5a1125 100644 --- a/yoda/executor/docker_test.go +++ b/yoda/executor/docker_test.go @@ -2,48 +2,31 @@ package executor import ( "testing" -) - -func TestDockerSuccess(t *testing.T) { - // TODO: Enable test when CI has docker installed. - // e := NewDockerExec("bandprotocol/runtime:1.0.2", 10*time.Second) - // res, err := e.Exec([]byte(`#!/usr/bin/env python3 - // import json - // import urllib.request - // import sys + "time" - // BINANCE_URL = "https://api.binance.com/api/v1/depth?symbol={}USDT&limit=5" - - // def make_json_request(url): - // req = urllib.request.Request(url) - // req.add_header( - // "User-Agent", - // "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36", - // ) - // return json.loads(urllib.request.urlopen(req).read()) - - // def main(symbol): - // res = make_json_request(BINANCE_URL.format(symbol)) - // bid = float(res["bids"][0][0]) - // ask = float(res["asks"][0][0]) - // return (bid + ask) / 2 + "github.com/stretchr/testify/require" +) - // if __name__ == "__main__": - // try: - // print(main(*sys.argv[1:])) - // except Exception as e: - // print(str(e), file=sys.stderr) - // sys.exit(1) - // `), "BTC") - // fmt.Println(string(res.Output), res.Code, err) - // require.True(t, false) +func SetupDockerTest(t *testing.T) { } -func TestDockerLongStdout(t *testing.T) { +func TestDockerSuccess(t *testing.T) { // TODO: Enable test when CI has docker installed. - // e := NewDockerExec("bandprotocol/runtime:1.0.2", 10*time.Second) - // res, err := e.Exec([]byte(`#!/usr/bin/env python3 - // print("A"*1000)`), "BTC") - // fmt.Println(string(res.Output), res.Code, err) - // require.True(t, false) + // Prerequisite: please build docker image before running test + e := NewDockerExec("ongartbandprotocol/docker-executor:0.2.0", 12*time.Second, 100, 30001, 30001) + for i := 0; i < 20; i++ { + res, err := e.Exec([]byte( + "#!/usr/bin/env python3\nimport os\nimport sys\nprint(sys.argv[1], os.getenv('BAND_CHAIN_ID'))", + ), "TEST_ARG", map[string]interface{}{ + "BAND_CHAIN_ID": "test-chain-id", + "BAND_VALIDATOR": "test-validator", + "BAND_REQUEST_ID": "test-request-id", + "BAND_EXTERNAL_ID": "test-external-id", + "BAND_REPORTER": "test-reporter", + "BAND_SIGNATURE": "test-signature", + }) + require.Equal(t, []byte("TEST_ARG test-chain-id\n"), res.Output) + require.Equal(t, uint32(0), res.Code) + require.NoError(t, err) + } } diff --git a/yoda/executor/executor.go b/yoda/executor/executor.go index 86251e5c3..6f40460c4 100644 --- a/yoda/executor/executor.go +++ b/yoda/executor/executor.go @@ -4,17 +4,21 @@ import ( "errors" "fmt" "net/url" + "strconv" "strings" "time" ) const ( - flagQueryTimeout = "timeout" + flagQueryTimeout = "timeout" + flagQueryMaxTry = "maxTry" + flagQueryPortRange = "portRange" ) var ( ErrExecutionimeout = errors.New("execution timeout") ErrRestNotOk = errors.New("rest return non 2XX response") + ErrReachMaxTry = errors.New("execution reach max try") ) type ExecResult struct { @@ -33,7 +37,7 @@ var testProgram []byte = []byte( // NewExecutor returns executor by name and executor URL func NewExecutor(executor string) (exec Executor, err error) { - name, base, timeout, err := parseExecutor(executor) + name, base, timeout, maxTry, startPort, endPort, err := parseExecutor(executor) if err != nil { return nil, err } @@ -41,7 +45,14 @@ func NewExecutor(executor string) (exec Executor, err error) { case "rest": exec = NewRestExec(base, timeout) case "docker": - return nil, fmt.Errorf("Docker executor is currently not supported") + // Only use in testnet. No intensive testing, use at your own risk + if endPort < startPort { + return nil, fmt.Errorf("portRange invalid: startPort: %d, endPort: %d", startPort, endPort) + } + if maxTry < 1 { + return nil, fmt.Errorf("maxTry invalid: %d", maxTry) + } + exec = NewDockerExec(base, timeout, maxTry, startPort, endPort) default: return nil, fmt.Errorf("Invalid executor name: %s, base: %s", name, base) } @@ -68,29 +79,61 @@ func NewExecutor(executor string) (exec Executor, err error) { return exec, nil } -// parseExecutor splits the executor string in the form of "name:base?timeout=" into parts. -func parseExecutor(executorStr string) (name string, base string, timeout time.Duration, err error) { +// parseExecutor splits the executor string in the form of "name:base?timeout=&maxTry=&portRange=" into parts. +func parseExecutor( + executorStr string, +) (name string, base string, timeout time.Duration, maxTry int, startPort int, endPort int, err error) { executor := strings.SplitN(executorStr, ":", 2) if len(executor) != 2 { - return "", "", 0, fmt.Errorf("Invalid executor, cannot parse executor: %s", executorStr) + return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid executor, cannot parse executor: %s", executorStr) } u, err := url.Parse(executor[1]) if err != nil { - return "", "", 0, fmt.Errorf("Invalid url, cannot parse %s to url with error: %s", executor[1], err.Error()) + return "", "", 0, 0, 0, 0, fmt.Errorf( + "Invalid url, cannot parse %s to url with error: %s", + executor[1], + err.Error(), + ) } query := u.Query() timeoutStr := query.Get(flagQueryTimeout) if timeoutStr == "" { - return "", "", 0, fmt.Errorf("Invalid timeout, executor requires query timeout") + return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid timeout, executor requires query timeout") } - // Remove timeout from query because we need to return `base` - query.Del(flagQueryTimeout) - u.RawQuery = query.Encode() - timeout, err = time.ParseDuration(timeoutStr) if err != nil { - return "", "", 0, fmt.Errorf("Invalid timeout, cannot parse duration with error: %s", err.Error()) + return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid timeout, cannot parse duration with error: %s", err.Error()) + } + + maxTryStr := query.Get(flagQueryMaxTry) + if maxTryStr == "" { + maxTryStr = "1" + } + maxTry, err = strconv.Atoi(maxTryStr) + if err != nil { + return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid maxTry, cannot parse integer with error: %s", err.Error()) + } + + portRangeStr := query.Get(flagQueryPortRange) + ports := strings.SplitN(portRangeStr, "-", 2) + if len(ports) != 2 { + ports = []string{"0", "0"} } - return executor[0], u.String(), timeout, nil + startPort, err = strconv.Atoi(ports[0]) + if err != nil { + return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid portRange, cannot parse integer with error: %s", err.Error()) + } + endPort, err = strconv.Atoi(ports[1]) + if err != nil { + return "", "", 0, 0, 0, 0, fmt.Errorf("Invalid portRange, cannot parse integer with error: %s", err.Error()) + } + + // Remove timeout from query because we need to return `base` + query.Del(flagQueryTimeout) + query.Del(flagQueryMaxTry) + query.Del(flagQueryPortRange) + + u.RawQuery = query.Encode() + return executor[0], u.String(), timeout, maxTry, startPort, endPort, nil } diff --git a/yoda/executor/executor_test.go b/yoda/executor/executor_test.go index 44672f74e..ffa8541d7 100644 --- a/yoda/executor/executor_test.go +++ b/yoda/executor/executor_test.go @@ -8,19 +8,26 @@ import ( ) func TestParseExecutor(t *testing.T) { - name, url, timeout, err := parseExecutor("beeb:www.beebprotocol.com?timeout=3s") + name, url, timeout, maxTry, startPort, endPort, err := parseExecutor("beeb:www.beebprotocol.com?timeout=3s") require.Equal(t, name, "beeb") require.Equal(t, timeout, 3*time.Second) require.Equal(t, url, "www.beebprotocol.com") + require.Equal(t, maxTry, 1) + require.Equal(t, startPort, 0) + require.Equal(t, endPort, 0) require.NoError(t, err) - name, url, timeout, err = parseExecutor("beeb2:www.beeb.com/anna/kondanna?timeout=300ms") + name, url, timeout, _, _, _, err = parseExecutor( + "beeb2:www.beeb.com/anna/kondanna?timeout=300ms", + ) require.Equal(t, name, "beeb2") require.Equal(t, timeout, 300*time.Millisecond) require.Equal(t, url, "www.beeb.com/anna/kondanna") require.NoError(t, err) - name, url, timeout, err = parseExecutor("beeb3:https://bandprotocol.com/gg/gg2/bandchain?timeout=1s300ms") + name, url, timeout, _, _, _, err = parseExecutor( + "beeb3:https://bandprotocol.com/gg/gg2/bandchain?timeout=1s300ms", + ) require.Equal(t, name, "beeb3") require.Equal(t, timeout, 1*time.Second+300*time.Millisecond) require.Equal(t, url, "https://bandprotocol.com/gg/gg2/bandchain") @@ -28,16 +35,38 @@ func TestParseExecutor(t *testing.T) { } func TestParseExecutorWithoutRawQuery(t *testing.T) { - _, _, _, err := parseExecutor("beeb:www.beebprotocol.com") + _, _, _, _, _, _, err := parseExecutor("beeb:www.beebprotocol.com") require.EqualError(t, err, "Invalid timeout, executor requires query timeout") } func TestParseExecutorInvalidExecutorError(t *testing.T) { - _, _, _, err := parseExecutor("beeb") + _, _, _, _, _, _, err := parseExecutor("beeb") require.EqualError(t, err, "Invalid executor, cannot parse executor: beeb") } func TestParseExecutorInvalidTimeoutError(t *testing.T) { - _, _, _, err := parseExecutor("beeb:www.beebprotocol.com?timeout=beeb") + _, _, _, _, _, _, err := parseExecutor("beeb:www.beebprotocol.com?timeout=beeb") require.EqualError(t, err, "Invalid timeout, cannot parse duration with error: time: invalid duration \"beeb\"") } + +func TestExecuteDockerExecutorSuccess(t *testing.T) { + e, err := NewExecutor( + "docker:ongartbandprotocol/docker-executor:0.2.7?timeout=12s&maxTry=100&portRange=40001-40001", + ) + require.NoError(t, err) + for i := 0; i < 100; i++ { + res, err := e.Exec([]byte( + "#!/usr/bin/env python3\nimport os\nimport sys\nprint(sys.argv[1], os.getenv('BAND_CHAIN_ID'))", + ), "TEST_ARG", map[string]interface{}{ + "BAND_CHAIN_ID": "test-chain-id", + "BAND_VALIDATOR": "test-validator", + "BAND_REQUEST_ID": "test-request-id", + "BAND_EXTERNAL_ID": "test-external-id", + "BAND_REPORTER": "test-reporter", + "BAND_SIGNATURE": "test-signature", + }) + require.Equal(t, []byte("TEST_ARG test-chain-id\n"), res.Output) + require.Equal(t, uint32(0), res.Code) + require.NoError(t, err) + } +}