From a1dad2dbd9c95c24d6219847d3fb963103c62a51 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Thu, 5 Sep 2019 16:32:20 +0800 Subject: [PATCH 01/13] Add stats, status, querystate, putstate, trigger cmds for Pulsar Functions Signed-off-by: xiaolong.ran --- .github/workflows/go.yml | 3 +- .gitignore | 1 + pkg/ctl/functions/functions.go | 5 + pkg/ctl/functions/putstate.go | 142 ++++++++++++++++++++ pkg/ctl/functions/putstate_test.go | 101 +++++++++++++++ pkg/ctl/functions/querystate.go | 176 +++++++++++++++++++++++++ pkg/ctl/functions/stats.go | 201 +++++++++++++++++++++++++++++ pkg/ctl/functions/stats_test.go | 77 +++++++++++ pkg/ctl/functions/status.go | 171 ++++++++++++++++++++++++ pkg/ctl/functions/status_test.go | 77 +++++++++++ pkg/ctl/functions/trigger.go | 175 +++++++++++++++++++++++++ pkg/ctl/functions/trigger_test.go | 125 ++++++++++++++++++ pkg/pulsar/data.go | 24 ++++ pkg/pulsar/function_state.go | 26 ++++ pkg/pulsar/function_status.go | 53 ++++++++ pkg/pulsar/functions.go | 171 +++++++++++++++++++++++- pkg/pulsar/functions_stats.go | 145 +++++++++++++++++++++ 17 files changed, 1670 insertions(+), 3 deletions(-) create mode 100644 pkg/ctl/functions/putstate.go create mode 100644 pkg/ctl/functions/putstate_test.go create mode 100644 pkg/ctl/functions/querystate.go create mode 100644 pkg/ctl/functions/stats.go create mode 100644 pkg/ctl/functions/stats_test.go create mode 100644 pkg/ctl/functions/status.go create mode 100644 pkg/ctl/functions/status_test.go create mode 100644 pkg/ctl/functions/trigger.go create mode 100644 pkg/ctl/functions/trigger_test.go create mode 100644 pkg/pulsar/function_state.go create mode 100644 pkg/pulsar/function_status.go create mode 100644 pkg/pulsar/functions_stats.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index fa9c8d7a..6d158040 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,7 +19,8 @@ jobs: - name: Get dependencies run: | docker pull apachepulsar/pulsar - docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test apachepulsar/pulsar bin/pulsar standalone + docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test --name pulsarctl apachepulsar/pulsar bin/pulsar standalone + docker cp pulsarctl:/pulsar/examples/api-examples.jar ${PWD}/test/functions sleep 10 - name: Build diff --git a/.gitignore b/.gitignore index 57d30eb0..64ded2aa 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ *.so *.dylib pulsarctl +api-examples.jar # Test binary, build with `go test -c` *.test diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index 7b85519b..2b276f05 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -38,6 +38,11 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, restartFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statusFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statsFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, querystateFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, putstateFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, triggerFunctionsCmd) return resourceCmd } diff --git a/pkg/ctl/functions/putstate.go b/pkg/ctl/functions/putstate.go new file mode 100644 index 00000000..788022cb --- /dev/null +++ b/pkg/ctl/functions/putstate.go @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "encoding/json" + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Put the state associated with a Pulsar Function." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + putstate := pulsar.Example{ + Desc: "Put the state associated with a Pulsar Function", + Command: "pulsarctl functions putstate \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--state \"{\"key\":\"pulsar\", \"stringValue\":\"hello\"}\" ", + } + examples = append(examples, putstate) + + putstateWithFQFN := pulsar.Example{ + Desc: "Put the state associated with a Pulsar Function with FQFN", + Command: "pulsarctl functions putstate \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions] \n" + + "\t--state \"{\"key\":\"pulsar\", \"stringValue\":\"hello\"}\"", + } + examples = append(examples, putstateWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "PutState successfully", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist) + desc.CommandOutput = out + + vc.SetDescription( + "putstate", + "Put the state associated with a Pulsar Function", + desc.ToString(), + "putstate", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doPutStateFunction(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVarP( + &functionData.State, + "state", + "t", + "", + "The FunctionState that needs to be put") + }) +} + +func doPutStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + + var state pulsar.FunctionState + err = json.Unmarshal([]byte(funcData.State), &state) + if err != nil { + return err + } + + err = admin.Functions().PutFunctionState(funcData.Tenant, funcData.Namespace, funcData.FuncName, state) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + vc.Command.Printf("PutState successfully") + } + + return err +} diff --git a/pkg/ctl/functions/putstate_test.go b/pkg/ctl/functions/putstate_test.go new file mode 100644 index 00000000..b25a1aed --- /dev/null +++ b/pkg/ctl/functions/putstate_test.go @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "encoding/json" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestStateFunctions(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-putstate", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, _, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + assert.Equal(t, out.String(), "Created test-functions-putstate successfully") + + // wait the function create successfully + time.Sleep(time.Second * 3) + + putstateArgs := []string{"putstate", + "--name", "test-functions-putstate", + "--state", "{\"key\":\"pulsar\", \"stringValue\":\"hello\"}", + } + + outPutState, _, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) + assert.Nil(t, err) + assert.Equal(t, outPutState.String(), "PutState successfully") + + // test failure case for put state + failureStateArgs := []string{"putstate", + "--name", "not-exist", + "--state", "{\"key\":\"pulsar\", \"stringValue\":\"hello\"}", + } + + _, execErrMsg, _ := TestFunctionsCommands(putstateFunctionsCmd, failureStateArgs) + assert.NotNil(t, execErrMsg) + exceptMsg := "code: 404 reason: Stream 'not-exist' is not found" + assert.Equal(t, execErrMsg.Error(), exceptMsg) + + // query state + queryStateArgs := []string{"querystate", + "--name", "test-functions-putstate", + "--key", "pulsar", + } + + outQueryState, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs) + assert.Nil(t, err) + + var state pulsar.FunctionState + err = json.Unmarshal(outQueryState.Bytes(), &state) + assert.Nil(t, err) + + assert.Equal(t, "pulsar", state.Key) + assert.Equal(t, "hello", state.StringValue) + assert.Equal(t, int64(0), state.Version) + // put state again + outPutStateAgain, _, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) + assert.Nil(t, err) + assert.Equal(t, outPutStateAgain.String(), "PutState successfully") + + // query state again + outQueryStateAgain, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs) + assert.Nil(t, err) + + var stateAgain pulsar.FunctionState + err = json.Unmarshal(outQueryStateAgain.Bytes(), &stateAgain) + assert.Nil(t, err) + + assert.Equal(t, int64(1), stateAgain.Version) +} diff --git a/pkg/ctl/functions/querystate.go b/pkg/ctl/functions/querystate.go new file mode 100644 index 00000000..2b18fa98 --- /dev/null +++ b/pkg/ctl/functions/querystate.go @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "time" +) + +func querystateFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Fetch the current state associated with a Pulsar Function." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + querystate := pulsar.Example{ + Desc: "Fetch the current state associated with a Pulsar Function", + Command: "pulsarctl functions querystate \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--key \n" + + "\t--watch", + } + examples = append(examples, querystate) + + querystateWithFQFN := pulsar.Example{ + Desc: "Fetch the current state associated with a Pulsar Function with FQFN", + Command: "pulsarctl functions querystate \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]\n" + + "\t--key \n" + + "\t--watch", + } + examples = append(examples, querystateWithFQFN) + + querystateNoWatch := pulsar.Example{ + Desc: "Fetch the current state associated with a Pulsar Function", + Command: "pulsarctl functions querystate \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--key ", + } + examples = append(examples, querystateNoWatch) + + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "{\n" + + " \"key\": \"pulsar\",\n" + + " \"stringValue\": \"hello\",\n" + + " \"byteValue\": null,\n" + + " \"numberValue\": 0,\n" + + " \"version\": 6\n" + + "}", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithKeyNotExist := pulsar.Output{ + Desc: "key doesn't exist, please check --key args", + Out: "error: key doesn't exist", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithKeyNotExist) + desc.CommandOutput = out + + vc.SetDescription( + "querystate", + "Fetch the current state associated with a Pulsar Function", + desc.ToString(), + "querystate", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doQueryStateFunction(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVarP( + &functionData.Key, + "key", + "k", + "", + "key") + + flagSet.BoolVarP( + &functionData.Watch, + "watch", + "w", + false, + "Watch for changes in the value associated with a key for a Pulsar Function") + }) +} + +func doQueryStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + + for { + functionState, err := admin.Functions().GetFunctionState(funcData.Tenant, funcData.Namespace, funcData.FuncName, funcData.Key) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionState) + } + + if funcData.Watch { + time.Sleep(time.Millisecond * 1000) + } + + if !funcData.Watch { + break + } + } + return err +} diff --git a/pkg/ctl/functions/stats.go b/pkg/ctl/functions/stats.go new file mode 100644 index 00000000..1ea9fb14 --- /dev/null +++ b/pkg/ctl/functions/stats.go @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "strconv" +) + +func statsFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Get the current stats of a Pulsar Function." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + stats := pulsar.Example{ + Desc: "Get the current stats of a Pulsar Function", + Command: "pulsarctl functions stats \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, stats) + + statsWithFQFN := pulsar.Example{ + Desc: "Get the current stats of a Pulsar Function with FQFN", + Command: "pulsarctl functions stats \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, statsWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "{\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0,\n" + + " \"lastInvocation\": 0,\n" + + " \"oneMin\": {\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0\n" + + " },\n" + + " \"instances\": [\n" + + " {\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0,\n" + + " \"instanceId\": 0,\n" + + " \"metrics\": {\n" + + " \"oneMin\": {\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0\n" + + " },\n" + + " \"lastInvocation\": 0,\n" + + " \"userMetrics\": {},\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"instanceId\": 0,\n" + + " \"metrics\": {\n" + + " \"oneMin\": {\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0\n" + + " },\n" + + " \"lastInvocation\": 0,\n" + + " \"userMetrics\": null,\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0\n" + + " }\n" + + "}", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist) + desc.CommandOutput = out + + vc.SetDescription( + "stats", + "Get the current stats of a Pulsar Function", + desc.ToString(), + "stats", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doStatsFunction(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.InstanceID, + "instance-id", + "", + "The function instanceId (Get-stats of all instances if instance-id is not provided)") + }) +} + +func doStatsFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + if funcData.InstanceID != "" { + instanceID, err := strconv.Atoi(funcData.InstanceID) + if err != nil { + return err + } + functionInstanceStatsData, err := admin.Functions().GetFunctionStatsWithInstanceID(funcData.Tenant, funcData.Namespace, funcData.FuncName, instanceID) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionInstanceStatsData) + } else { + functionStats, err := admin.Functions().GetFunctionStats(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } + + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionStats) + } + + return err +} diff --git a/pkg/ctl/functions/stats_test.go b/pkg/ctl/functions/stats_test.go new file mode 100644 index 00000000..487ef5da --- /dev/null +++ b/pkg/ctl/functions/stats_test.go @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "encoding/json" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "github.com/stretchr/testify/assert" + "strings" + "testing" + "time" +) + +func TestStatsFunctions(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stats", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, _, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + assert.Equal(t, out.String(), "Created test-functions-stats successfully") + + // wait the function create successfully + time.Sleep(time.Second * 3) + + statsArgs := []string{"stats", + "--name", "test-functions-stats", + } + + outStats, _, err := TestFunctionsCommands(statsFunctionsCmd, statsArgs) + assert.Nil(t, err) + + var stats pulsar.FunctionStats + err = json.Unmarshal(outStats.Bytes(), &stats) + assert.Nil(t, err) + + assert.Equal(t, int64(0), stats.ReceivedTotal) + assert.Equal(t, int64(0), stats.ProcessedSuccessfullyTotal) +} + +func TestFailureStats(t *testing.T) { + statsArgs := []string{"stats", + "--name", "test-functions-stats-failure", + } + + out, _, err := TestFunctionsCommands(statsFunctionsCmd, statsArgs) + assert.Nil(t, err) + + errMsg := "Function test-functions-stats-failure doesn't exist" + assert.True(t, strings.Contains(out.String(), errMsg)) +} diff --git a/pkg/ctl/functions/status.go b/pkg/ctl/functions/status.go new file mode 100644 index 00000000..b2f24711 --- /dev/null +++ b/pkg/ctl/functions/status.go @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "strconv" +) + +func statusFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Check the current status of a Pulsar Function." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + status := pulsar.Example{ + Desc: "Check the current status of a Pulsar Function", + Command: "pulsarctl functions status \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, status) + + statusWithFQFN := pulsar.Example{ + Desc: "Check the current status of a Pulsar Function with FQFN", + Command: "pulsarctl functions status \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, statusWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "{\n" + + " \"numInstances\": 1,\n" + + " \"numRunning\": 1,\n" + + " \"instances\": [\n" + + " {\n" + + " \"instanceId\": 0,\n" + + " \"status\": {\n" + + " \"running\": true,\n" + + " \"error\": \"\",\n" + + " \"numRestarts\": 0,\n" + + " \"numReceived\": 0,\n" + + " \"numSuccessfullyProcessed\": 0,\n" + + " \"numUserExceptions\": 0,\n" + + " \"latestUserExceptions\": [],\n" + + " \"numSystemExceptions\": 0,\n" + + " \"latestSystemExceptions\": [],\n" + + " \"averageLatency\": 0,\n" + + " \"lastInvocationTime\": 0,\n" + + " \"workerId\": \"c-standalone-fw-127.0.0.1-8080\"\n" + + " }\n" + + " }\n" + + " ]\n" + + "}", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithWrongInstanceID := pulsar.Output{ + Desc: "Used an instanceID that does not exist or other impermissible actions", + Out: "[✖] code: 400 reason: Operation not permitted", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongInstanceID) + desc.CommandOutput = out + + vc.SetDescription( + "status", + "Check the current status of a Pulsar Function", + desc.ToString(), + "getstatus", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doStatusFunction(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.InstanceID, + "instance-id", + "", + "The function instanceId (Get-status of all instances if instance-id is not provided)") + }) +} + +func doStatusFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + if funcData.InstanceID != "" { + instanceID, err := strconv.Atoi(funcData.InstanceID) + if err != nil { + return err + } + functionInstanceStatusData, err := admin.Functions().GetFunctionStatusWithInstanceID(funcData.Tenant, funcData.Namespace, funcData.FuncName, instanceID) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionInstanceStatusData) + } else { + functionStatus, err := admin.Functions().GetFunctionStatus(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } + + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionStatus) + } + + return err +} diff --git a/pkg/ctl/functions/status_test.go b/pkg/ctl/functions/status_test.go new file mode 100644 index 00000000..8f77a174 --- /dev/null +++ b/pkg/ctl/functions/status_test.go @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "encoding/json" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "github.com/stretchr/testify/assert" + "strings" + "testing" + "time" +) + +func TestStatusFunctions(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-status", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, _, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + assert.Equal(t, out.String(), "Created test-functions-status successfully") + + // wait the function create successfully + time.Sleep(time.Second * 3) + + statusArgs := []string{"status", + "--name", "test-functions-status", + } + + outStatus, _, err := TestFunctionsCommands(statusFunctionsCmd, statusArgs) + assert.Nil(t, err) + + var status pulsar.FunctionStatus + err = json.Unmarshal(outStatus.Bytes(), &status) + assert.Nil(t, err) + + assert.Equal(t, 1, status.NumRunning) + assert.Equal(t, 1, status.NumInstances) +} + +func TestFailureStatus(t *testing.T) { + statusArgs := []string{"status", + "--name", "test-functions-status-failure", + } + + out, _, err := TestFunctionsCommands(statusFunctionsCmd, statusArgs) + assert.Nil(t, err) + + errMsg := "Function test-functions-status-failure doesn't exist" + assert.True(t, strings.Contains(out.String(), errMsg)) +} diff --git a/pkg/ctl/functions/trigger.go b/pkg/ctl/functions/trigger.go new file mode 100644 index 00000000..cd985b33 --- /dev/null +++ b/pkg/ctl/functions/trigger.go @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "errors" + "fmt" + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func triggerFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Trigger the specified Pulsar Function with a supplied value." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + trigger := pulsar.Example{ + Desc: "Trigger the specified Pulsar Function with a supplied value", + Command: "pulsarctl functions trigger \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--topic \n" + + "\t--trigger-value \"hello pulsar\"", + } + examples = append(examples, trigger) + + triggerWithFQFN := pulsar.Example{ + Desc: "Trigger the specified Pulsar Function with a supplied value", + Command: "pulsarctl functions trigger \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]\n" + + "\t--topic \n" + + "\t--trigger-value \"hello pulsar\"", + } + examples = append(examples, triggerWithFQFN) + + triggerWithFile := pulsar.Example{ + Desc: "Trigger the specified Pulsar Function with a supplied value", + Command: "pulsarctl functions trigger \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--topic \n" + + "\t--trigger-file ", + } + examples = append(examples, triggerWithFile) + + desc.CommandExamples = examples + + var out []pulsar.Output + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithWrongInstanceID := pulsar.Output{ + Desc: "Used an instanceID that does not exist or other impermissible actions", + Out: "[✖] code: 400 reason: Operation not permitted", + } + + failOutWithTopic := pulsar.Output{ + Desc: "Function in trigger function has unidentified topic", + Out: "[✖] code: 400 reason: Function in trigger function has unidentified topic", + } + + failOutWithTimeout := pulsar.Output{ + Desc: "Request Timed Out", + Out: "[✖] code: 408 reason: Request Timed Out", + } + + out = append(out, failOut, failOutWithNameNotExist, failOutWithWrongInstanceID, failOutWithTopic, failOutWithTimeout) + desc.CommandOutput = out + + vc.SetDescription( + "trigger", + "Trigger the specified Pulsar Function with a supplied value", + desc.ToString(), + "trigger", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doTriggerFunction(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.Topic, + "topic", + "", + "The specific topic name that the function consumes from that you want to inject the data to") + + flagSet.StringVar( + &functionData.TriggerFile, + "trigger-file", + "", + "The path to the file that contains the data with which you want to trigger the function") + + flagSet.StringVar( + &functionData.TriggerValue, + "trigger-value", + "", + "The value with which you want to trigger the function") + }) +} + +func doTriggerFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + + if funcData.TriggerValue == "" && funcData.TriggerFile == "" { + return errors.New("either a trigger value or a trigger filepath needs to be specified") + } + + retval, err := admin.Functions().TriggerFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName, funcData.Topic, funcData.TriggerValue, funcData.TriggerFile) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + fmt.Println(retval) + } + return err +} diff --git a/pkg/ctl/functions/trigger_test.go b/pkg/ctl/functions/trigger_test.go new file mode 100644 index 00000000..730c8875 --- /dev/null +++ b/pkg/ctl/functions/trigger_test.go @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "encoding/json" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestTriggerFunctions(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-trigger", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, _, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + assert.Equal(t, out.String(), "Created test-functions-trigger successfully") + + // wait the function create successfully + time.Sleep(time.Second * 3) + + triggerArgs := []string{"trigger", + "--name", "test-functions-trigger", + "--topic", "test-input-topic", + "--trigger-value", "hello pulsar", + } + + for i := 0; i < 5; i++ { + _, _, err = TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) + assert.Nil(t, err) + } + + statsArgs := []string{"stats", + "--name", "test-functions-trigger", + } + + outStats, _, err := TestFunctionsCommands(statsFunctionsCmd, statsArgs) + assert.Nil(t, err) + + var stats pulsar.FunctionStats + err = json.Unmarshal(outStats.Bytes(), &stats) + assert.Nil(t, err) + + assert.Equal(t, int64(5), stats.ReceivedTotal) + assert.Equal(t, int64(5), stats.ProcessedSuccessfullyTotal) +} + +func TestTriggerFunctionsFailure(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-trigger-failure", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, _, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + assert.Equal(t, out.String(), "Created test-functions-trigger-failure successfully") + // wait the function create successfully + time.Sleep(time.Second * 3) + + triggerArgs := []string{"trigger", + "--name", "not-exist", + "--topic", "test-input-topic", + "--trigger-value", "hello pulsar", + } + + _, errMsg, _ := TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) + errorMessage := "code: 404 reason: Function not-exist doesn't exist" + assert.Equal(t, errorMessage, errMsg.Error()) + + triggerArgsNoTopic := []string{"trigger", + "--name", "test-functions-trigger-failure", + "--topic", "test-input-topic-failure", + "--trigger-value", "hello pulsar", + } + _, errMsg, _ = TestFunctionsCommands(triggerFunctionsCmd, triggerArgsNoTopic) + noTopicErr := "code: 400 reason: Function in trigger function has unidentified topic" + assert.Equal(t, noTopicErr, errMsg.Error()) + + triggerArgsNoValueOrFile := []string{"trigger", + "--name", "test-functions-trigger-failure", + "--topic", "test-input-topic", + } + _, errMsg, _ = TestFunctionsCommands(triggerFunctionsCmd, triggerArgsNoValueOrFile) + noValueOrFile := "either a trigger value or a trigger filepath needs to be specified" + assert.Equal(t, noValueOrFile, errMsg.Error()) +} diff --git a/pkg/pulsar/data.go b/pkg/pulsar/data.go index 04924f35..2dca2438 100644 --- a/pkg/pulsar/data.go +++ b/pkg/pulsar/data.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pulsar // ClusterData information on a cluster @@ -47,6 +64,13 @@ type FunctionData struct { MaxMessageRetries int `json:"maxMessageRetries"` DeadLetterTopic string `json:"deadLetterTopic"` + Key string `json:"key"` + Watch bool `json:"watch"` + State string `json:"state"` + TriggerValue string `json:"triggerValue"` + TriggerFile string `json:"triggerFile"` + Topic string `json:"topic"` + FuncConf *FunctionConfig `json:"-"` UserCodeFile string `json:"-"` } diff --git a/pkg/pulsar/function_state.go b/pkg/pulsar/function_state.go new file mode 100644 index 00000000..a740e20e --- /dev/null +++ b/pkg/pulsar/function_state.go @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type FunctionState struct { + Key string `json:"key"` + StringValue string `json:"stringValue"` + ByteValue []byte `json:"byteValue"` + NumValue int64 `json:"numberValue"` + Version int64 `json:"version"` +} diff --git a/pkg/pulsar/function_status.go b/pkg/pulsar/function_status.go new file mode 100644 index 00000000..1c699beb --- /dev/null +++ b/pkg/pulsar/function_status.go @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type FunctionStatus struct { + NumInstances int `json:"numInstances"` + NumRunning int `json:"numRunning"` + Instances []FunctionInstanceStatus `json:"instances"` +} + +type FunctionInstanceStatus struct { + InstanceId int `json:"instanceId"` + Status FunctionInstanceStatusData `json:"status"` +} + +type FunctionInstanceStatusData struct { + Running bool `json:"running"` + Err string `json:"error"` + NumRestarts int64 `json:"numRestarts"` + NumReceived int64 `json:"numReceived"` + NumSuccessfullyProcessed int64 `json:"numSuccessfullyProcessed"` + NumUserExceptions int64 `json:"numUserExceptions"` + LatestUserExceptions []ExceptionInformation `json:"latestUserExceptions"` + NumSystemExceptions int64 `json:"numSystemExceptions"` + LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"` + AverageLatency float64 `json:"averageLatency"` + LastInvocationTime int64 `json:"lastInvocationTime"` + WorkerId string `json:"workerId"` +} + +type ExceptionInformation struct { + ExceptionString string `json:"exceptionString"` + TimestampMs int64 `json:"timestampMs"` +} + +func (fs *FunctionStatus) AddInstance(functionInstanceStatus FunctionInstanceStatus) { + fs.Instances = append(fs.Instances, functionInstanceStatus) +} diff --git a/pkg/pulsar/functions.go b/pkg/pulsar/functions.go index 089c8c50..5e6140dd 100644 --- a/pkg/pulsar/functions.go +++ b/pkg/pulsar/functions.go @@ -70,6 +70,30 @@ type Functions interface { // Get the configuration for the specified function GetFunction(tenant, namespace, name string) (FunctionConfig, error) + + // Gets the current status of a function + GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error) + + // Gets the current status of a function instance + GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error) + + // Gets the current stats of a function + GetFunctionStats(tenant, namespace, name string) (FunctionStats, error) + + // Gets the current stats of a function instance + GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error) + + // Fetch the current state associated with a Pulsar Function + // + // Response Example: + // { "value : 12, version : 2"} + GetFunctionState(tenant, namespace, name, key string) (FunctionState, error) + + // Puts the given state associated with a Pulsar Function + PutFunctionState(tenant, namespace, name string, state FunctionState) error + + // Triggers the function by writing to the input topic + TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) } type functions struct { @@ -86,14 +110,14 @@ func (c *client) Functions() Functions { func (f *functions) createStringFromField(w *multipart.Writer, value string) (io.Writer, error) { h := make(textproto.MIMEHeader) - h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s" `, "functionConfig")) + h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s" `, value)) h.Set("Content-Type", "application/json") return w.CreatePart(h) } func (f *functions) createTextFromFiled(w *multipart.Writer, value string) (io.Writer, error) { h := make(textproto.MIMEHeader) - h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s" `, "url")) + h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s" `, value)) h.Set("Content-Type", "text/plain") return w.CreatePart(h) } @@ -256,3 +280,146 @@ func (f *functions) GetFunction(tenant, namespace, name string) (FunctionConfig, err := f.client.get(endpoint, &functionConfig) return functionConfig, err } + +func (f *functions) GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error) { + var functionStatus FunctionStatus + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + err := f.client.get(endpoint+"/status", &functionStatus) + return functionStatus, err +} + +func (f *functions) GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error) { + var functionInstanceStatusData FunctionInstanceStatusData + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + err := f.client.get(endpoint+"/status", &functionInstanceStatusData) + return functionInstanceStatusData, err +} + +func (f *functions) GetFunctionStats(tenant, namespace, name string) (FunctionStats, error) { + var functionStats FunctionStats + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + err := f.client.get(endpoint+"/stats", &functionStats) + return functionStats, err +} + +func (f *functions) GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error) { + var functionInstanceStatsData FunctionInstanceStatsData + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + err := f.client.get(endpoint+"/stats", &functionInstanceStatsData) + return functionInstanceStatsData, err +} + +func (f *functions)GetFunctionState(tenant, namespace, name, key string) (FunctionState, error) { + var functionState FunctionState + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "state", key) + err := f.client.get(endpoint, &functionState) + return functionState, err +} + +func (f *functions) PutFunctionState(tenant, namespace, name string, state FunctionState) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "state", state.Key) + + // buffer to store our request as bytes + bodyBuf := bytes.NewBufferString("") + + multiPartWriter := multipart.NewWriter(bodyBuf) + + stateData, err := json.Marshal(state) + if err != nil { + return err + } + + stateWriter, err := f.createStringFromField(multiPartWriter, "state") + if err != nil { + return err + } + + _, err = stateWriter.Write(stateData) + if err != nil { + return err + } + + // In here, we completed adding the file and the fields, let's close the multipart writer + // So it writes the ending boundary + if err = multiPartWriter.Close(); err != nil { + return err + } + + contentType := multiPartWriter.FormDataContentType() + err = f.client.postWithMultiPart(endpoint, nil, nil, bodyBuf, contentType) + if err != nil { + return err + } + + return nil +} + +func (f *functions) TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "trigger") + + // buffer to store our request as bytes + bodyBuf := bytes.NewBufferString("") + + multiPartWriter := multipart.NewWriter(bodyBuf) + + if triggerFile != "" { + file, err := os.Open(triggerFile) + if err != nil { + return "", err + } + defer file.Close() + + part, err := multiPartWriter.CreateFormFile("dataStream", filepath.Base(file.Name())) + + if err != nil { + return "", err + } + + // copy the actual file content to the filed's writer + _, err = io.Copy(part, file) + if err != nil { + return "", err + } + } + + if triggerValue != "" { + valueWriter, err := f.createTextFromFiled(multiPartWriter, "data") + if err != nil { + return "", err + } + + _, err = valueWriter.Write([]byte(triggerValue)) + if err != nil { + return "", err + } + } + + if topic != "" { + topicWriter, err := f.createTextFromFiled(multiPartWriter, "topic") + if err != nil { + return "", err + } + + _, err = topicWriter.Write([]byte(topic)) + if err != nil { + return "", err + } + } + + // In here, we completed adding the file and the fields, let's close the multipart writer + // So it writes the ending boundary + if err := multiPartWriter.Close(); err != nil { + return "", err + } + + contentType := multiPartWriter.FormDataContentType() + var str string + err := f.client.postWithMultiPart(endpoint, &str, nil, bodyBuf, contentType) + if err != nil { + return "", err + } + + return str, nil +} diff --git a/pkg/pulsar/functions_stats.go b/pkg/pulsar/functions_stats.go new file mode 100644 index 00000000..3f644388 --- /dev/null +++ b/pkg/pulsar/functions_stats.go @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type FunctionStats struct { + // Overall total number of records function received from source + ReceivedTotal int64 `json:"receivedTotal"` + + // Overall total number of records successfully processed by user function + ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"` + + // Overall total number of system exceptions thrown + SystemExceptionsTotal int64 `json:"systemExceptionsTotal"` + + // Overall total number of user exceptions thrown + UserExceptionsTotal int64 `json:"userExceptionsTotal"` + + // Average process latency for function + AvgProcessLatency float64 `json:"avgProcessLatency"` + + // Timestamp of when the function was last invoked by any instance + LastInvocation int64 `json:"lastInvocation"` + + OneMin FunctionInstanceStatsDataBase `json:"oneMin"` + + Instances []FunctionInstanceStats `json:"instances"` + + FunctionInstanceStats +} + +type FunctionInstanceStats struct { + FunctionInstanceStatsDataBase + + InstanceId int64 `json:"instanceId"` + + Metrics FunctionInstanceStatsData `json:"metrics"` +} + +type FunctionInstanceStatsDataBase struct { + // Total number of records function received from source for instance + ReceivedTotal int64 `json:"receivedTotal"` + + // Total number of records successfully processed by user function for instance + ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"` + + // Total number of system exceptions thrown for instance + SystemExceptionsTotal int64 `json:"systemExceptionsTotal"` + + // Total number of user exceptions thrown for instance + UserExceptionsTotal int64 `json:"userExceptionsTotal"` + + // Average process latency for function for instance + AvgProcessLatency float64 `json:"avgProcessLatency"` +} + +type FunctionInstanceStatsData struct { + OneMin FunctionInstanceStatsDataBase `json:"oneMin"` + + // Timestamp of when the function was last invoked for instance + LastInvocation int64 `json:"lastInvocation"` + + // Map of user defined metrics + UserMetrics map[string]float64 `json:"userMetrics"` + + FunctionInstanceStatsDataBase +} + +func (fs *FunctionStats) AddInstance(functionInstanceStats FunctionInstanceStats) { + fs.Instances = append(fs.Instances, functionInstanceStats) +} + +func (fs *FunctionStats) CalculateOverall() *FunctionStats { + var ( + nonNullInstances int + nonNullInstancesOneMin int + ) + + for _, functionInstanceStats := range fs.Instances { + functionInstanceStatsData := functionInstanceStats.Metrics + fs.ReceivedTotal += functionInstanceStatsData.ReceivedTotal + fs.ProcessedSuccessfullyTotal += functionInstanceStatsData.ProcessedSuccessfullyTotal + fs.SystemExceptionsTotal += functionInstanceStatsData.SystemExceptionsTotal + fs.UserExceptionsTotal += functionInstanceStatsData.UserExceptionsTotal + + if functionInstanceStatsData.AvgProcessLatency != 0 { + if fs.AvgProcessLatency == 0 { + fs.AvgProcessLatency = 0.0 + } + + fs.AvgProcessLatency += functionInstanceStatsData.AvgProcessLatency + nonNullInstances++ + } + + fs.OneMin.ReceivedTotal += functionInstanceStatsData.OneMin.ReceivedTotal + fs.OneMin.ProcessedSuccessfullyTotal += functionInstanceStatsData.OneMin.ProcessedSuccessfullyTotal + fs.OneMin.SystemExceptionsTotal += functionInstanceStatsData.OneMin.SystemExceptionsTotal + fs.OneMin.UserExceptionsTotal += functionInstanceStatsData.OneMin.UserExceptionsTotal + + if functionInstanceStatsData.OneMin.AvgProcessLatency != 0 { + if fs.OneMin.AvgProcessLatency == 0 { + fs.OneMin.AvgProcessLatency = 0.0 + } + + fs.OneMin.AvgProcessLatency += functionInstanceStatsData.OneMin.AvgProcessLatency + nonNullInstancesOneMin++ + } + + if functionInstanceStatsData.LastInvocation != 0 { + if fs.LastInvocation == 0 || functionInstanceStatsData.LastInvocation > fs.LastInvocation { + fs.LastInvocation = functionInstanceStatsData.LastInvocation + } + } + } + + // calculate average from sum + if nonNullInstances > 0 { + fs.AvgProcessLatency = fs.AvgProcessLatency / float64(nonNullInstances) + } else { + fs.AvgProcessLatency = 0 + } + + // calculate 1min average from sum + if nonNullInstancesOneMin > 0 { + fs.OneMin.AvgProcessLatency = fs.OneMin.AvgProcessLatency / float64(nonNullInstancesOneMin) + } else { + fs.AvgProcessLatency = 0 + } + + return fs +} From 63586d4eb3a6ff88e711e610c7c18432c834414b Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Thu, 5 Sep 2019 20:43:02 +0800 Subject: [PATCH 02/13] debug ci error Signed-off-by: xiaolong.ran --- .github/workflows/go.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 6d158040..a2ad815a 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,7 +19,10 @@ jobs: - name: Get dependencies run: | docker pull apachepulsar/pulsar + echo ${PWD} docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test --name pulsarctl apachepulsar/pulsar bin/pulsar standalone + echo ${PWD} + ls -al docker cp pulsarctl:/pulsar/examples/api-examples.jar ${PWD}/test/functions sleep 10 From 368311cece4ee68c285ee165448465c1646938c6 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Thu, 5 Sep 2019 20:49:26 +0800 Subject: [PATCH 03/13] debug ci error Signed-off-by: xiaolong.ran --- .github/workflows/go.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index a2ad815a..3c4c220d 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -23,7 +23,11 @@ jobs: docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test --name pulsarctl apachepulsar/pulsar bin/pulsar standalone echo ${PWD} ls -al - docker cp pulsarctl:/pulsar/examples/api-examples.jar ${PWD}/test/functions + ls -al test/ + ls -al test/functions/ + docker cp pulsarctl:/pulsar/examples/api-examples.jar ${PWD}/test/functions/ + ls -al test/ + ls -al test/functions/ sleep 10 - name: Build From 16c613a859130c2e790fcfb5605ef09904d7ba2a Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Thu, 5 Sep 2019 21:09:25 +0800 Subject: [PATCH 04/13] fix ci error Signed-off-by: xiaolong.ran --- .github/workflows/go.yml | 6 ------ pkg/ctl/functions/test_help.go | 4 +++- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 3c4c220d..acb33184 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,14 +19,8 @@ jobs: - name: Get dependencies run: | docker pull apachepulsar/pulsar - echo ${PWD} docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test --name pulsarctl apachepulsar/pulsar bin/pulsar standalone - echo ${PWD} - ls -al - ls -al test/ - ls -al test/functions/ docker cp pulsarctl:/pulsar/examples/api-examples.jar ${PWD}/test/functions/ - ls -al test/ ls -al test/functions/ sleep 10 diff --git a/pkg/ctl/functions/test_help.go b/pkg/ctl/functions/test_help.go index b1ba4318..9d81d2a4 100644 --- a/pkg/ctl/functions/test_help.go +++ b/pkg/ctl/functions/test_help.go @@ -60,9 +60,11 @@ func TestFunctionsCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) ( var ( flag bool + basePath string ) -func getDirHelp() (basePath string, err error) { +func getDirHelp() (string, error) { + var err error if !flag { basePath, err = os.Getwd() if err != nil { From ffc24b026cb2edd83545aafbc26a500aea1343aa Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Fri, 6 Sep 2019 10:01:26 +0800 Subject: [PATCH 05/13] fix ci error Signed-off-by: xiaolong.ran --- pkg/ctl/functions/putstate_test.go | 9 ++++++--- pkg/ctl/functions/trigger_test.go | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/ctl/functions/putstate_test.go b/pkg/ctl/functions/putstate_test.go index b25a1aed..922d4420 100644 --- a/pkg/ctl/functions/putstate_test.go +++ b/pkg/ctl/functions/putstate_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "github.com/streamnative/pulsarctl/pkg/pulsar" "github.com/stretchr/testify/assert" + "strings" "testing" "time" ) @@ -46,7 +47,7 @@ func TestStateFunctions(t *testing.T) { assert.Equal(t, out.String(), "Created test-functions-putstate successfully") // wait the function create successfully - time.Sleep(time.Second * 3) + time.Sleep(time.Second * 5) putstateArgs := []string{"putstate", "--name", "test-functions-putstate", @@ -55,6 +56,7 @@ func TestStateFunctions(t *testing.T) { outPutState, _, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) assert.Nil(t, err) + t.Logf("outPutState:%s", outPutState.String()) assert.Equal(t, outPutState.String(), "PutState successfully") // test failure case for put state @@ -65,8 +67,8 @@ func TestStateFunctions(t *testing.T) { _, execErrMsg, _ := TestFunctionsCommands(putstateFunctionsCmd, failureStateArgs) assert.NotNil(t, execErrMsg) - exceptMsg := "code: 404 reason: Stream 'not-exist' is not found" - assert.Equal(t, execErrMsg.Error(), exceptMsg) + exceptMsg := "'not-exist' is not found" + assert.True(t, strings.Contains(execErrMsg.Error(), exceptMsg)) // query state queryStateArgs := []string{"querystate", @@ -76,6 +78,7 @@ func TestStateFunctions(t *testing.T) { outQueryState, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs) assert.Nil(t, err) + t.Logf("outQueryState:%s", outQueryState.String()) var state pulsar.FunctionState err = json.Unmarshal(outQueryState.Bytes(), &state) diff --git a/pkg/ctl/functions/trigger_test.go b/pkg/ctl/functions/trigger_test.go index 730c8875..a4e9bd31 100644 --- a/pkg/ctl/functions/trigger_test.go +++ b/pkg/ctl/functions/trigger_test.go @@ -46,7 +46,7 @@ func TestTriggerFunctions(t *testing.T) { assert.Equal(t, out.String(), "Created test-functions-trigger successfully") // wait the function create successfully - time.Sleep(time.Second * 3) + time.Sleep(time.Second * 5) triggerArgs := []string{"trigger", "--name", "test-functions-trigger", From 0cc45afbe7e2705c4888572ef1369cc260735cc3 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Fri, 6 Sep 2019 10:57:51 +0800 Subject: [PATCH 06/13] fix ci error Signed-off-by: xiaolong.ran --- pkg/ctl/functions/putstate_test.go | 4 ++++ pkg/ctl/functions/status_test.go | 2 ++ pkg/ctl/functions/trigger_test.go | 4 ++++ 3 files changed, 10 insertions(+) diff --git a/pkg/ctl/functions/putstate_test.go b/pkg/ctl/functions/putstate_test.go index 922d4420..9f59b619 100644 --- a/pkg/ctl/functions/putstate_test.go +++ b/pkg/ctl/functions/putstate_test.go @@ -50,6 +50,8 @@ func TestStateFunctions(t *testing.T) { time.Sleep(time.Second * 5) putstateArgs := []string{"putstate", + "--tenant", "public", + "--namespace", "default", "--name", "test-functions-putstate", "--state", "{\"key\":\"pulsar\", \"stringValue\":\"hello\"}", } @@ -72,6 +74,8 @@ func TestStateFunctions(t *testing.T) { // query state queryStateArgs := []string{"querystate", + "--tenant", "public", + "--namespace", "default", "--name", "test-functions-putstate", "--key", "pulsar", } diff --git a/pkg/ctl/functions/status_test.go b/pkg/ctl/functions/status_test.go index 8f77a174..71035609 100644 --- a/pkg/ctl/functions/status_test.go +++ b/pkg/ctl/functions/status_test.go @@ -50,6 +50,8 @@ func TestStatusFunctions(t *testing.T) { time.Sleep(time.Second * 3) statusArgs := []string{"status", + "--tenant", "public", + "--namespace", "default", "--name", "test-functions-status", } diff --git a/pkg/ctl/functions/trigger_test.go b/pkg/ctl/functions/trigger_test.go index a4e9bd31..dcbc5d71 100644 --- a/pkg/ctl/functions/trigger_test.go +++ b/pkg/ctl/functions/trigger_test.go @@ -49,6 +49,8 @@ func TestTriggerFunctions(t *testing.T) { time.Sleep(time.Second * 5) triggerArgs := []string{"trigger", + "--tenant", "public", + "--namespace", "default", "--name", "test-functions-trigger", "--topic", "test-input-topic", "--trigger-value", "hello pulsar", @@ -60,6 +62,8 @@ func TestTriggerFunctions(t *testing.T) { } statsArgs := []string{"stats", + "--tenant", "public", + "--namespace", "default", "--name", "test-functions-trigger", } From 3b0a0cbd4f1abd437eb34f7147429e6b282b6677 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Fri, 6 Sep 2019 11:25:54 +0800 Subject: [PATCH 07/13] fix ci error Signed-off-by: xiaolong.ran --- pkg/ctl/functions/putstate_test.go | 10 ++++++++-- pkg/ctl/functions/trigger_test.go | 15 ++++++++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/pkg/ctl/functions/putstate_test.go b/pkg/ctl/functions/putstate_test.go index 9f59b619..26005830 100644 --- a/pkg/ctl/functions/putstate_test.go +++ b/pkg/ctl/functions/putstate_test.go @@ -42,8 +42,11 @@ func TestStateFunctions(t *testing.T) { "--jar", basePath + "/test/functions/api-examples.jar", } - out, _, err := TestFunctionsCommands(createFunctionsCmd, args) + out, execErr, err := TestFunctionsCommands(createFunctionsCmd, args) assert.Nil(t, err) + if execErr != nil { + t.Errorf("create functions error value: %s", execErr.Error()) + } assert.Equal(t, out.String(), "Created test-functions-putstate successfully") // wait the function create successfully @@ -56,8 +59,11 @@ func TestStateFunctions(t *testing.T) { "--state", "{\"key\":\"pulsar\", \"stringValue\":\"hello\"}", } - outPutState, _, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) + outPutState, execE, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) assert.Nil(t, err) + if execE != nil { + t.Errorf("put state functions error value: %s ", execE.Error()) + } t.Logf("outPutState:%s", outPutState.String()) assert.Equal(t, outPutState.String(), "PutState successfully") diff --git a/pkg/ctl/functions/trigger_test.go b/pkg/ctl/functions/trigger_test.go index dcbc5d71..8127eb03 100644 --- a/pkg/ctl/functions/trigger_test.go +++ b/pkg/ctl/functions/trigger_test.go @@ -41,8 +41,11 @@ func TestTriggerFunctions(t *testing.T) { "--jar", basePath + "/test/functions/api-examples.jar", } - out, _, err := TestFunctionsCommands(createFunctionsCmd, args) + out, execErr, err := TestFunctionsCommands(createFunctionsCmd, args) assert.Nil(t, err) + if execErr != nil { + t.Errorf("create fucntions error value: %s", execErr.Error()) + } assert.Equal(t, out.String(), "Created test-functions-trigger successfully") // wait the function create successfully @@ -57,8 +60,11 @@ func TestTriggerFunctions(t *testing.T) { } for i := 0; i < 5; i++ { - _, _, err = TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) + _, execE, err := TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) assert.Nil(t, err) + if execE != nil { + t.Errorf("trigger functions error value: %s", execE.Error()) + } } statsArgs := []string{"stats", @@ -67,7 +73,10 @@ func TestTriggerFunctions(t *testing.T) { "--name", "test-functions-trigger", } - outStats, _, err := TestFunctionsCommands(statsFunctionsCmd, statsArgs) + outStats, statsErr, err := TestFunctionsCommands(statsFunctionsCmd, statsArgs) + if statsErr != nil { + t.Errorf("stats functions error value: %s", statsErr.Error()) + } assert.Nil(t, err) var stats pulsar.FunctionStats From 228ea1748b6498a3d10737e4ce63d74626bdc699 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Fri, 6 Sep 2019 11:40:47 +0800 Subject: [PATCH 08/13] fix ci error Signed-off-by: xiaolong.ran --- pkg/ctl/functions/putstate_test.go | 4 ++-- pkg/ctl/functions/status_test.go | 2 +- pkg/ctl/functions/trigger_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ctl/functions/putstate_test.go b/pkg/ctl/functions/putstate_test.go index 26005830..f51413ec 100644 --- a/pkg/ctl/functions/putstate_test.go +++ b/pkg/ctl/functions/putstate_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" "strings" "testing" - "time" + `time` ) func TestStateFunctions(t *testing.T) { @@ -50,7 +50,7 @@ func TestStateFunctions(t *testing.T) { assert.Equal(t, out.String(), "Created test-functions-putstate successfully") // wait the function create successfully - time.Sleep(time.Second * 5) + time.Sleep(time.Second * 20) putstateArgs := []string{"putstate", "--tenant", "public", diff --git a/pkg/ctl/functions/status_test.go b/pkg/ctl/functions/status_test.go index 71035609..01c60c46 100644 --- a/pkg/ctl/functions/status_test.go +++ b/pkg/ctl/functions/status_test.go @@ -47,7 +47,7 @@ func TestStatusFunctions(t *testing.T) { assert.Equal(t, out.String(), "Created test-functions-status successfully") // wait the function create successfully - time.Sleep(time.Second * 3) + time.Sleep(time.Second * 30) statusArgs := []string{"status", "--tenant", "public", diff --git a/pkg/ctl/functions/trigger_test.go b/pkg/ctl/functions/trigger_test.go index 8127eb03..5a4258b6 100644 --- a/pkg/ctl/functions/trigger_test.go +++ b/pkg/ctl/functions/trigger_test.go @@ -49,7 +49,7 @@ func TestTriggerFunctions(t *testing.T) { assert.Equal(t, out.String(), "Created test-functions-trigger successfully") // wait the function create successfully - time.Sleep(time.Second * 5) + time.Sleep(time.Second * 50) triggerArgs := []string{"trigger", "--tenant", "public", From 6c231fd944e051ed298a48fd1626810623fe7989 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Fri, 6 Sep 2019 17:18:59 +0800 Subject: [PATCH 09/13] fix comments Signed-off-by: xiaolong.ran --- .gitignore | 1 + pkg/ctl/functions/putstate.go | 23 ++++++++++++-------- pkg/ctl/functions/putstate_test.go | 23 ++++++++++---------- pkg/ctl/functions/querystate.go | 10 ++++----- pkg/ctl/functions/stats.go | 2 +- pkg/ctl/functions/stats_test.go | 4 ---- pkg/ctl/functions/status.go | 4 ++-- pkg/ctl/functions/status_test.go | 34 ++++++++++++++++++++++++------ pkg/ctl/functions/trigger.go | 9 +++++--- pkg/ctl/functions/trigger_test.go | 20 +++++++++--------- 10 files changed, 78 insertions(+), 52 deletions(-) diff --git a/.gitignore b/.gitignore index 64ded2aa..ecd834e0 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ *.dylib pulsarctl api-examples.jar +dummyExample.jar # Test binary, build with `go test -c` *.test diff --git a/pkg/ctl/functions/putstate.go b/pkg/ctl/functions/putstate.go index 788022cb..c574710b 100644 --- a/pkg/ctl/functions/putstate.go +++ b/pkg/ctl/functions/putstate.go @@ -26,12 +26,12 @@ import ( func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { desc := pulsar.LongDescription{} - desc.CommandUsedFor = "Put the state associated with a Pulsar Function." - desc.CommandPermission = "This command requires super-user permissions." + desc.CommandUsedFor = "Put a key/value pair to the state associated with a Pulsar Function." + desc.CommandPermission = "This command requires user permissions." var examples []pulsar.Example putstate := pulsar.Example{ - Desc: "Put the state associated with a Pulsar Function", + Desc: "Put a key/value pair to the state associated with a Pulsar Function", Command: "pulsarctl functions putstate \n" + "\t--tenant public\n" + "\t--namespace default\n" + @@ -41,7 +41,7 @@ func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { examples = append(examples, putstate) putstateWithFQFN := pulsar.Example{ - Desc: "Put the state associated with a Pulsar Function with FQFN", + Desc: "Put a key/value pair to the state associated with a Pulsar Function with FQFN", Command: "pulsarctl functions putstate \n" + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions] \n" + "\t--state \"{\"key\":\"pulsar\", \"stringValue\":\"hello\"}\"", @@ -52,7 +52,7 @@ func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { var out []pulsar.Output successOut := pulsar.Output{ Desc: "normal output", - Out: "PutState successfully", + Out: "Put state successfully", } failOut := pulsar.Output{ @@ -61,16 +61,21 @@ func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { } failOutWithNameNotExist := pulsar.Output{ - Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Desc: "The name of Pulsar Functions doesn't exist, please check the `--name` arg", Out: "[✖] code: 404 reason: Function doesn't exist", } - out = append(out, successOut, failOut, failOutWithNameNotExist) + failOutWithWrongJson := pulsar.Output{ + Desc: "unexpected end of JSON input, please check the `--state` arg", + Out: "[✖] unexpected end of JSON input", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongJson) desc.CommandOutput = out vc.SetDescription( "putstate", - "Put the state associated with a Pulsar Function", + "Put a key/value pair to the state associated with a Pulsar Function", desc.ToString(), "putstate", ) @@ -135,7 +140,7 @@ func doPutStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) err if err != nil { cmdutils.PrintError(vc.Command.OutOrStderr(), err) } else { - vc.Command.Printf("PutState successfully") + vc.Command.Printf("Put state %+v successfully", state) } return err diff --git a/pkg/ctl/functions/putstate_test.go b/pkg/ctl/functions/putstate_test.go index f51413ec..eb5601df 100644 --- a/pkg/ctl/functions/putstate_test.go +++ b/pkg/ctl/functions/putstate_test.go @@ -18,12 +18,12 @@ package functions import ( + `bytes` "encoding/json" "github.com/streamnative/pulsarctl/pkg/pulsar" "github.com/stretchr/testify/assert" "strings" "testing" - `time` ) func TestStateFunctions(t *testing.T) { @@ -49,9 +49,6 @@ func TestStateFunctions(t *testing.T) { } assert.Equal(t, out.String(), "Created test-functions-putstate successfully") - // wait the function create successfully - time.Sleep(time.Second * 20) - putstateArgs := []string{"putstate", "--tenant", "public", "--namespace", "default", @@ -59,13 +56,17 @@ func TestStateFunctions(t *testing.T) { "--state", "{\"key\":\"pulsar\", \"stringValue\":\"hello\"}", } - outPutState, execE, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) - assert.Nil(t, err) - if execE != nil { - t.Errorf("put state functions error value: %s ", execE.Error()) + outPutState := new(bytes.Buffer) + + for { + outPutState, _, _ = TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) + + if strings.Contains(outPutState.String(), "successfully") { + break + } } - t.Logf("outPutState:%s", outPutState.String()) - assert.Equal(t, outPutState.String(), "PutState successfully") + + assert.True(t, strings.Contains(outPutState.String(), "successfully")) // test failure case for put state failureStateArgs := []string{"putstate", @@ -100,7 +101,7 @@ func TestStateFunctions(t *testing.T) { // put state again outPutStateAgain, _, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) assert.Nil(t, err) - assert.Equal(t, outPutStateAgain.String(), "PutState successfully") + assert.True(t, strings.Contains(outPutStateAgain.String(), "successfully")) // query state again outQueryStateAgain, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs) diff --git a/pkg/ctl/functions/querystate.go b/pkg/ctl/functions/querystate.go index 2b18fa98..fdbcb7a0 100644 --- a/pkg/ctl/functions/querystate.go +++ b/pkg/ctl/functions/querystate.go @@ -26,8 +26,8 @@ import ( func querystateFunctionsCmd(vc *cmdutils.VerbCmd) { desc := pulsar.LongDescription{} - desc.CommandUsedFor = "Fetch the current state associated with a Pulsar Function." - desc.CommandPermission = "This command requires super-user permissions." + desc.CommandUsedFor = "Fetch a key/value pair from the state associated with a Pulsar Function." + desc.CommandPermission = "This command requires user permissions." var examples []pulsar.Example querystate := pulsar.Example{ @@ -42,7 +42,7 @@ func querystateFunctionsCmd(vc *cmdutils.VerbCmd) { examples = append(examples, querystate) querystateWithFQFN := pulsar.Example{ - Desc: "Fetch the current state associated with a Pulsar Function with FQFN", + Desc: "Fetch a key/value pair from the state associated with a Pulsar Function with FQFN", Command: "pulsarctl functions querystate \n" + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]\n" + "\t--key \n" + @@ -51,7 +51,7 @@ func querystateFunctionsCmd(vc *cmdutils.VerbCmd) { examples = append(examples, querystateWithFQFN) querystateNoWatch := pulsar.Example{ - Desc: "Fetch the current state associated with a Pulsar Function", + Desc: "Fetch a key/value pair from the state associated with a Pulsar Function", Command: "pulsarctl functions querystate \n" + "\t--tenant public\n" + "\t--namespace default\n" + @@ -94,7 +94,7 @@ func querystateFunctionsCmd(vc *cmdutils.VerbCmd) { vc.SetDescription( "querystate", - "Fetch the current state associated with a Pulsar Function", + "Fetch a key/value pair from the state associated with a Pulsar Function", desc.ToString(), "querystate", ) diff --git a/pkg/ctl/functions/stats.go b/pkg/ctl/functions/stats.go index 1ea9fb14..426d6e5b 100644 --- a/pkg/ctl/functions/stats.go +++ b/pkg/ctl/functions/stats.go @@ -27,7 +27,7 @@ import ( func statsFunctionsCmd(vc *cmdutils.VerbCmd) { desc := pulsar.LongDescription{} desc.CommandUsedFor = "Get the current stats of a Pulsar Function." - desc.CommandPermission = "This command requires super-user permissions." + desc.CommandPermission = "This command requires user permissions." var examples []pulsar.Example stats := pulsar.Example{ diff --git a/pkg/ctl/functions/stats_test.go b/pkg/ctl/functions/stats_test.go index 487ef5da..8642b850 100644 --- a/pkg/ctl/functions/stats_test.go +++ b/pkg/ctl/functions/stats_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" "strings" "testing" - "time" ) func TestStatsFunctions(t *testing.T) { @@ -46,9 +45,6 @@ func TestStatsFunctions(t *testing.T) { assert.Nil(t, err) assert.Equal(t, out.String(), "Created test-functions-stats successfully") - // wait the function create successfully - time.Sleep(time.Second * 3) - statsArgs := []string{"stats", "--name", "test-functions-stats", } diff --git a/pkg/ctl/functions/status.go b/pkg/ctl/functions/status.go index b2f24711..1dd268ce 100644 --- a/pkg/ctl/functions/status.go +++ b/pkg/ctl/functions/status.go @@ -27,7 +27,7 @@ import ( func statusFunctionsCmd(vc *cmdutils.VerbCmd) { desc := pulsar.LongDescription{} desc.CommandUsedFor = "Check the current status of a Pulsar Function." - desc.CommandPermission = "This command requires super-user permissions." + desc.CommandPermission = "This command requires user permissions." var examples []pulsar.Example status := pulsar.Example{ @@ -163,7 +163,7 @@ func doStatusFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error if err != nil { cmdutils.PrintError(vc.Command.OutOrStderr(), err) } - + vc.Command.Printf("Get status successfully") cmdutils.PrintJson(vc.Command.OutOrStdout(), functionStatus) } diff --git a/pkg/ctl/functions/status_test.go b/pkg/ctl/functions/status_test.go index 01c60c46..28f9bb91 100644 --- a/pkg/ctl/functions/status_test.go +++ b/pkg/ctl/functions/status_test.go @@ -18,12 +18,12 @@ package functions import ( - "encoding/json" + `bytes` + `encoding/json` "github.com/streamnative/pulsarctl/pkg/pulsar" "github.com/stretchr/testify/assert" "strings" "testing" - "time" ) func TestStatusFunctions(t *testing.T) { @@ -46,8 +46,22 @@ func TestStatusFunctions(t *testing.T) { assert.Nil(t, err) assert.Equal(t, out.String(), "Created test-functions-status successfully") - // wait the function create successfully - time.Sleep(time.Second * 30) + getArgs := []string{"get", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-get", + } + + out, _, _ = TestFunctionsCommands(getFunctionsCmd, getArgs) + assert.Nil(t, err) + + var functionConfig pulsar.FunctionConfig + err = json.Unmarshal(out.Bytes(), &functionConfig) + assert.Nil(t, err) + + assert.Equal(t, functionConfig.Tenant, "public") + assert.Equal(t, functionConfig.Namespace, "default") + assert.Equal(t, functionConfig.Name, "test-functions-status") statusArgs := []string{"status", "--tenant", "public", @@ -55,10 +69,16 @@ func TestStatusFunctions(t *testing.T) { "--name", "test-functions-status", } - outStatus, _, err := TestFunctionsCommands(statusFunctionsCmd, statusArgs) - assert.Nil(t, err) - + outStatus := new(bytes.Buffer) var status pulsar.FunctionStatus + + for { + outStatus, _, _ = TestFunctionsCommands(statusFunctionsCmd, statusArgs) + if strings.Contains(outStatus.String(), "successfully") { + break + } + } + err = json.Unmarshal(outStatus.Bytes(), &status) assert.Nil(t, err) diff --git a/pkg/ctl/functions/trigger.go b/pkg/ctl/functions/trigger.go index cd985b33..f39eac89 100644 --- a/pkg/ctl/functions/trigger.go +++ b/pkg/ctl/functions/trigger.go @@ -19,7 +19,6 @@ package functions import ( "errors" - "fmt" "github.com/spf13/pflag" "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/pulsar" @@ -28,7 +27,7 @@ import ( func triggerFunctionsCmd(vc *cmdutils.VerbCmd) { desc := pulsar.LongDescription{} desc.CommandUsedFor = "Trigger the specified Pulsar Function with a supplied value." - desc.CommandPermission = "This command requires super-user permissions." + desc.CommandPermission = "This command requires user permissions." var examples []pulsar.Example trigger := pulsar.Example{ @@ -165,11 +164,15 @@ func doTriggerFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) erro return errors.New("either a trigger value or a trigger filepath needs to be specified") } + if funcData.TriggerValue != "" && funcData.TriggerFile != "" { + return errors.New("either a triggerValue or a triggerFile needs to specified for the function, cannot specify both") + } + retval, err := admin.Functions().TriggerFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName, funcData.Topic, funcData.TriggerValue, funcData.TriggerFile) if err != nil { cmdutils.PrintError(vc.Command.OutOrStderr(), err) } else { - fmt.Println(retval) + vc.Command.Printf(retval) } return err } diff --git a/pkg/ctl/functions/trigger_test.go b/pkg/ctl/functions/trigger_test.go index 5a4258b6..d7db0099 100644 --- a/pkg/ctl/functions/trigger_test.go +++ b/pkg/ctl/functions/trigger_test.go @@ -18,9 +18,11 @@ package functions import ( + `bytes` "encoding/json" "github.com/streamnative/pulsarctl/pkg/pulsar" "github.com/stretchr/testify/assert" + `strings` "testing" "time" ) @@ -48,9 +50,6 @@ func TestTriggerFunctions(t *testing.T) { } assert.Equal(t, out.String(), "Created test-functions-trigger successfully") - // wait the function create successfully - time.Sleep(time.Second * 50) - triggerArgs := []string{"trigger", "--tenant", "public", "--namespace", "default", @@ -59,11 +58,12 @@ func TestTriggerFunctions(t *testing.T) { "--trigger-value", "hello pulsar", } - for i := 0; i < 5; i++ { - _, execE, err := TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) - assert.Nil(t, err) - if execE != nil { - t.Errorf("trigger functions error value: %s", execE.Error()) + triggerOut := new(bytes.Buffer) + errStr := "Function in trigger function is not ready" + for { + triggerOut, execErr, _ = TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) + if !strings.Contains(triggerOut.String(), errStr) { + break } } @@ -83,8 +83,8 @@ func TestTriggerFunctions(t *testing.T) { err = json.Unmarshal(outStats.Bytes(), &stats) assert.Nil(t, err) - assert.Equal(t, int64(5), stats.ReceivedTotal) - assert.Equal(t, int64(5), stats.ProcessedSuccessfullyTotal) + assert.Equal(t, int64(1), stats.ReceivedTotal) + assert.Equal(t, int64(1), stats.ProcessedSuccessfullyTotal) } func TestTriggerFunctionsFailure(t *testing.T) { From f2ab31e8a9e9b1564632d7fe9bec7d8411844a09 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Fri, 6 Sep 2019 18:32:32 +0800 Subject: [PATCH 10/13] fix comments Signed-off-by: xiaolong.ran --- pkg/ctl/functions/status.go | 1 - pkg/ctl/functions/status_test.go | 9 +++++---- pkg/ctl/functions/trigger_test.go | 26 +++++++++++++++++++------- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/pkg/ctl/functions/status.go b/pkg/ctl/functions/status.go index 1dd268ce..1c60b107 100644 --- a/pkg/ctl/functions/status.go +++ b/pkg/ctl/functions/status.go @@ -163,7 +163,6 @@ func doStatusFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error if err != nil { cmdutils.PrintError(vc.Command.OutOrStderr(), err) } - vc.Command.Printf("Get status successfully") cmdutils.PrintJson(vc.Command.OutOrStdout(), functionStatus) } diff --git a/pkg/ctl/functions/status_test.go b/pkg/ctl/functions/status_test.go index 28f9bb91..a270776f 100644 --- a/pkg/ctl/functions/status_test.go +++ b/pkg/ctl/functions/status_test.go @@ -49,14 +49,14 @@ func TestStatusFunctions(t *testing.T) { getArgs := []string{"get", "--tenant", "public", "--namespace", "default", - "--name", "test-functions-get", + "--name", "test-functions-status", } - out, _, _ = TestFunctionsCommands(getFunctionsCmd, getArgs) + outGet, _, _ := TestFunctionsCommands(getFunctionsCmd, getArgs) assert.Nil(t, err) var functionConfig pulsar.FunctionConfig - err = json.Unmarshal(out.Bytes(), &functionConfig) + err = json.Unmarshal(outGet.Bytes(), &functionConfig) assert.Nil(t, err) assert.Equal(t, functionConfig.Tenant, "public") @@ -74,11 +74,12 @@ func TestStatusFunctions(t *testing.T) { for { outStatus, _, _ = TestFunctionsCommands(statusFunctionsCmd, statusArgs) - if strings.Contains(outStatus.String(), "successfully") { + if strings.Contains(outStatus.String(), "true") { break } } + t.Log(outStatus.String()) err = json.Unmarshal(outStatus.Bytes(), &status) assert.Nil(t, err) diff --git a/pkg/ctl/functions/trigger_test.go b/pkg/ctl/functions/trigger_test.go index d7db0099..91b8395a 100644 --- a/pkg/ctl/functions/trigger_test.go +++ b/pkg/ctl/functions/trigger_test.go @@ -50,6 +50,22 @@ func TestTriggerFunctions(t *testing.T) { } assert.Equal(t, out.String(), "Created test-functions-trigger successfully") + statusArgs := []string{"status", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-status", + } + + outStatus := new(bytes.Buffer) + + for { + outStatus, _, _ = TestFunctionsCommands(statusFunctionsCmd, statusArgs) + t.Log(outStatus.String()) + if strings.Contains(outStatus.String(), "true") { + break + } + } + triggerArgs := []string{"trigger", "--tenant", "public", "--namespace", "default", @@ -59,13 +75,9 @@ func TestTriggerFunctions(t *testing.T) { } triggerOut := new(bytes.Buffer) - errStr := "Function in trigger function is not ready" - for { - triggerOut, execErr, _ = TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) - if !strings.Contains(triggerOut.String(), errStr) { - break - } - } + //errStr := "Function in trigger function is not ready" + triggerOut, execErr, _ = TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) + t.Log(triggerOut.String()) statsArgs := []string{"stats", "--tenant", "public", From bd416cc77ee2285d30dcdc31a43f0a55e45728d0 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Fri, 6 Sep 2019 20:47:21 +0800 Subject: [PATCH 11/13] fix comments Signed-off-by: xiaolong.ran --- pkg/ctl/functions/trigger_test.go | 56 +++++++++++-------------------- 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/pkg/ctl/functions/trigger_test.go b/pkg/ctl/functions/trigger_test.go index 91b8395a..8aef0d31 100644 --- a/pkg/ctl/functions/trigger_test.go +++ b/pkg/ctl/functions/trigger_test.go @@ -19,10 +19,9 @@ package functions import ( `bytes` - "encoding/json" - "github.com/streamnative/pulsarctl/pkg/pulsar" + `encoding/json` + `github.com/streamnative/pulsarctl/pkg/pulsar` "github.com/stretchr/testify/assert" - `strings` "testing" "time" ) @@ -39,7 +38,7 @@ func TestTriggerFunctions(t *testing.T) { "--name", "test-functions-trigger", "--inputs", "test-input-topic", "--output", "persistent://public/default/test-output-topic", - "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--classname", "org.apache.pulsar.functions.api.examples.WordCountFunction", "--jar", basePath + "/test/functions/api-examples.jar", } @@ -50,22 +49,21 @@ func TestTriggerFunctions(t *testing.T) { } assert.Equal(t, out.String(), "Created test-functions-trigger successfully") - statusArgs := []string{"status", + statsArgs := []string{"stats", "--tenant", "public", "--namespace", "default", - "--name", "test-functions-status", + "--name", "test-functions-trigger", } + outStats := new(bytes.Buffer) - outStatus := new(bytes.Buffer) - - for { - outStatus, _, _ = TestFunctionsCommands(statusFunctionsCmd, statusArgs) - t.Log(outStatus.String()) - if strings.Contains(outStatus.String(), "true") { - break - } - } + outStats, _, _ = TestFunctionsCommands(statsFunctionsCmd, statsArgs) + var stats pulsar.FunctionStats + err = json.Unmarshal(outStats.Bytes(), &stats) + assert.Nil(t, err) + assert.Equal(t, int64(0), stats.ReceivedTotal) + assert.Equal(t, int64(0), stats.ProcessedSuccessfullyTotal) + // send trigger cmd to broker triggerArgs := []string{"trigger", "--tenant", "public", "--namespace", "default", @@ -75,28 +73,14 @@ func TestTriggerFunctions(t *testing.T) { } triggerOut := new(bytes.Buffer) - //errStr := "Function in trigger function is not ready" - triggerOut, execErr, _ = TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) - t.Log(triggerOut.String()) - - statsArgs := []string{"stats", - "--tenant", "public", - "--namespace", "default", - "--name", "test-functions-trigger", - } - - outStats, statsErr, err := TestFunctionsCommands(statsFunctionsCmd, statsArgs) - if statsErr != nil { - t.Errorf("stats functions error value: %s", statsErr.Error()) + for i := 0; i < 2; i++ { + triggerOut, execErr, err = TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) + assert.Nil(t, err) + if execErr != nil { + t.Error(execErr.Error()) + } + t.Log(triggerOut.String()) } - assert.Nil(t, err) - - var stats pulsar.FunctionStats - err = json.Unmarshal(outStats.Bytes(), &stats) - assert.Nil(t, err) - - assert.Equal(t, int64(1), stats.ReceivedTotal) - assert.Equal(t, int64(1), stats.ProcessedSuccessfullyTotal) } func TestTriggerFunctionsFailure(t *testing.T) { From 7750959513dde5ebc301559599f99b4eaf19b22b Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Sat, 7 Sep 2019 18:36:21 +0800 Subject: [PATCH 12/13] fix comments Signed-off-by: xiaolong.ran --- pkg/ctl/functions/functions.go | 8 +++++ pkg/ctl/functions/putstate.go | 47 +++++++++++++++++++----------- pkg/ctl/functions/putstate_test.go | 4 +-- pkg/ctl/functions/querystate.go | 2 +- pkg/ctl/functions/stats.go | 2 +- pkg/ctl/functions/status.go | 2 +- pkg/ctl/functions/trigger.go | 2 +- 7 files changed, 44 insertions(+), 23 deletions(-) diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index 7301da0f..f9c7a384 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -18,10 +18,18 @@ package functions import ( + `errors` "github.com/spf13/cobra" "github.com/streamnative/pulsarctl/pkg/cmdutils" ) +var checkPutStateArgs = func(args []string) error { + if len(args) < 2 { + return errors.New("need to specified the state key and state value") + } + return nil +} + func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { resourceCmd := cmdutils.NewResourceCmd( "functions", diff --git a/pkg/ctl/functions/putstate.go b/pkg/ctl/functions/putstate.go index c574710b..8a5ab6d8 100644 --- a/pkg/ctl/functions/putstate.go +++ b/pkg/ctl/functions/putstate.go @@ -18,33 +18,44 @@ package functions import ( - "encoding/json" "github.com/spf13/pflag" "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/pulsar" + `io/ioutil` + `strings` ) func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { desc := pulsar.LongDescription{} desc.CommandUsedFor = "Put a key/value pair to the state associated with a Pulsar Function." - desc.CommandPermission = "This command requires user permissions." + desc.CommandPermission = "This command requires namespace function permissions." var examples []pulsar.Example putstate := pulsar.Example{ - Desc: "Put a key/value pair to the state associated with a Pulsar Function", + Desc: "Put a key/ pair to the state associated with a Pulsar Function", Command: "pulsarctl functions putstate \n" + "\t--tenant public\n" + "\t--namespace default\n" + "\t--name \n" + - "\t--state \"{\"key\":\"pulsar\", \"stringValue\":\"hello\"}\" ", + "\t ", } examples = append(examples, putstate) + putstateWithByte := pulsar.Example{ + Desc: "Put a key/ pair to the state associated with a Pulsar Function", + Command: "pulsarctl functions putstate \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t - ", + } + examples = append(examples, putstateWithByte) + putstateWithFQFN := pulsar.Example{ Desc: "Put a key/value pair to the state associated with a Pulsar Function with FQFN", Command: "pulsarctl functions putstate \n" + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions] \n" + - "\t--state \"{\"key\":\"pulsar\", \"stringValue\":\"hello\"}\"", + "\t - ", } examples = append(examples, putstateWithFQFN) desc.CommandExamples = examples @@ -83,9 +94,9 @@ func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { functionData := &pulsar.FunctionData{} // set the run function - vc.SetRunFunc(func() error { + vc.SetRunFuncWithNameArgs(func() error { return doPutStateFunction(vc, functionData) - }) + }, checkPutStateArgs) // register the params vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { @@ -112,13 +123,6 @@ func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { "name", "", "The name of a Pulsar Function") - - flagSet.StringVarP( - &functionData.State, - "state", - "t", - "", - "The FunctionState that needs to be put") }) } @@ -131,9 +135,18 @@ func doPutStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) err admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) var state pulsar.FunctionState - err = json.Unmarshal([]byte(funcData.State), &state) - if err != nil { - return err + + state.Key = vc.NameArgs[0] + value := vc.NameArgs[1] + + if value == "-" { + contents, err := ioutil.ReadFile(vc.NameArgs[2]) + if err != nil { + return err + } + state.ByteValue = contents + } else { + state.StringValue = strings.Join(vc.NameArgs[1:], " ") } err = admin.Functions().PutFunctionState(funcData.Tenant, funcData.Namespace, funcData.FuncName, state) diff --git a/pkg/ctl/functions/putstate_test.go b/pkg/ctl/functions/putstate_test.go index eb5601df..0504e1f4 100644 --- a/pkg/ctl/functions/putstate_test.go +++ b/pkg/ctl/functions/putstate_test.go @@ -53,7 +53,7 @@ func TestStateFunctions(t *testing.T) { "--tenant", "public", "--namespace", "default", "--name", "test-functions-putstate", - "--state", "{\"key\":\"pulsar\", \"stringValue\":\"hello\"}", + "pulsar", "hello", } outPutState := new(bytes.Buffer) @@ -71,7 +71,7 @@ func TestStateFunctions(t *testing.T) { // test failure case for put state failureStateArgs := []string{"putstate", "--name", "not-exist", - "--state", "{\"key\":\"pulsar\", \"stringValue\":\"hello\"}", + "pulsar", "hello", } _, execErrMsg, _ := TestFunctionsCommands(putstateFunctionsCmd, failureStateArgs) diff --git a/pkg/ctl/functions/querystate.go b/pkg/ctl/functions/querystate.go index fdbcb7a0..dc5b22d0 100644 --- a/pkg/ctl/functions/querystate.go +++ b/pkg/ctl/functions/querystate.go @@ -27,7 +27,7 @@ import ( func querystateFunctionsCmd(vc *cmdutils.VerbCmd) { desc := pulsar.LongDescription{} desc.CommandUsedFor = "Fetch a key/value pair from the state associated with a Pulsar Function." - desc.CommandPermission = "This command requires user permissions." + desc.CommandPermission = "This command requires namespace function permissions." var examples []pulsar.Example querystate := pulsar.Example{ diff --git a/pkg/ctl/functions/stats.go b/pkg/ctl/functions/stats.go index 426d6e5b..87045027 100644 --- a/pkg/ctl/functions/stats.go +++ b/pkg/ctl/functions/stats.go @@ -27,7 +27,7 @@ import ( func statsFunctionsCmd(vc *cmdutils.VerbCmd) { desc := pulsar.LongDescription{} desc.CommandUsedFor = "Get the current stats of a Pulsar Function." - desc.CommandPermission = "This command requires user permissions." + desc.CommandPermission = "This command requires namespace function permissions." var examples []pulsar.Example stats := pulsar.Example{ diff --git a/pkg/ctl/functions/status.go b/pkg/ctl/functions/status.go index 1c60b107..89427806 100644 --- a/pkg/ctl/functions/status.go +++ b/pkg/ctl/functions/status.go @@ -27,7 +27,7 @@ import ( func statusFunctionsCmd(vc *cmdutils.VerbCmd) { desc := pulsar.LongDescription{} desc.CommandUsedFor = "Check the current status of a Pulsar Function." - desc.CommandPermission = "This command requires user permissions." + desc.CommandPermission = "This command requires namespace function permissions." var examples []pulsar.Example status := pulsar.Example{ diff --git a/pkg/ctl/functions/trigger.go b/pkg/ctl/functions/trigger.go index f39eac89..1533796e 100644 --- a/pkg/ctl/functions/trigger.go +++ b/pkg/ctl/functions/trigger.go @@ -27,7 +27,7 @@ import ( func triggerFunctionsCmd(vc *cmdutils.VerbCmd) { desc := pulsar.LongDescription{} desc.CommandUsedFor = "Trigger the specified Pulsar Function with a supplied value." - desc.CommandPermission = "This command requires user permissions." + desc.CommandPermission = "This command requires namespace function permissions." var examples []pulsar.Example trigger := pulsar.Example{ From 665dc6311da45342f3459abf0850b95c5646bf2c Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Mon, 9 Sep 2019 10:48:59 +0800 Subject: [PATCH 13/13] fix comments Signed-off-by: xiaolong.ran --- pkg/ctl/functions/putstate.go | 37 +++++++----- pkg/ctl/functions/putstate_test.go | 92 +++++++++++++++++++++++++++++- 2 files changed, 114 insertions(+), 15 deletions(-) diff --git a/pkg/ctl/functions/putstate.go b/pkg/ctl/functions/putstate.go index 8a5ab6d8..eec35e7d 100644 --- a/pkg/ctl/functions/putstate.go +++ b/pkg/ctl/functions/putstate.go @@ -18,11 +18,13 @@ package functions import ( + "fmt" + "github.com/pkg/errors" "github.com/spf13/pflag" "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/pulsar" - `io/ioutil` - `strings` + "io/ioutil" + "strings" ) func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { @@ -37,17 +39,17 @@ func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { "\t--tenant public\n" + "\t--namespace default\n" + "\t--name \n" + - "\t ", + "\t - ", } examples = append(examples, putstate) putstateWithByte := pulsar.Example{ - Desc: "Put a key/ pair to the state associated with a Pulsar Function", + Desc: "Put a key/ pair to the state associated with a Pulsar Function", Command: "pulsarctl functions putstate \n" + - "\t--tenant public\n" + - "\t--namespace default\n" + - "\t--name \n" + - "\t - ", + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t = ", } examples = append(examples, putstateWithByte) @@ -76,12 +78,17 @@ func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { Out: "[✖] code: 404 reason: Function doesn't exist", } - failOutWithWrongJson := pulsar.Output{ - Desc: "unexpected end of JSON input, please check the `--state` arg", - Out: "[✖] unexpected end of JSON input", + failOutWithKeyOrValueNotExist := pulsar.Output{ + Desc: "The state key and state value not specified, please check your input format", + Out: "[✖] need to specified the state key and state value", } - out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongJson) + fileOutErrInputFormat := pulsar.Output{ + Desc: "The format of the input is incorrect, please check.", + Out: "[✖] error input format", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithKeyOrValueNotExist, fileOutErrInputFormat) desc.CommandOutput = out vc.SetDescription( @@ -139,14 +146,18 @@ func doPutStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) err state.Key = vc.NameArgs[0] value := vc.NameArgs[1] + fmt.Println("value:", value) + if value == "-" { + state.StringValue = strings.Join(vc.NameArgs[2:], " ") + } else if value == "=" { contents, err := ioutil.ReadFile(vc.NameArgs[2]) if err != nil { return err } state.ByteValue = contents } else { - state.StringValue = strings.Join(vc.NameArgs[1:], " ") + return errors.New("error input format") } err = admin.Functions().PutFunctionState(funcData.Tenant, funcData.Namespace, funcData.FuncName, state) diff --git a/pkg/ctl/functions/putstate_test.go b/pkg/ctl/functions/putstate_test.go index 0504e1f4..f7eadd36 100644 --- a/pkg/ctl/functions/putstate_test.go +++ b/pkg/ctl/functions/putstate_test.go @@ -18,10 +18,12 @@ package functions import ( - `bytes` + "bytes" "encoding/json" "github.com/streamnative/pulsarctl/pkg/pulsar" "github.com/stretchr/testify/assert" + "io/ioutil" + "os" "strings" "testing" ) @@ -53,7 +55,7 @@ func TestStateFunctions(t *testing.T) { "--tenant", "public", "--namespace", "default", "--name", "test-functions-putstate", - "pulsar", "hello", + "pulsar", "-", "hello", } outPutState := new(bytes.Buffer) @@ -71,14 +73,26 @@ func TestStateFunctions(t *testing.T) { // test failure case for put state failureStateArgs := []string{"putstate", "--name", "not-exist", + "pulsar", "-", "hello", + } + + stateArgsErrInFormat := []string{"putstate", + "--name", "test-functions-putstate", "pulsar", "hello", } _, execErrMsg, _ := TestFunctionsCommands(putstateFunctionsCmd, failureStateArgs) assert.NotNil(t, execErrMsg) exceptMsg := "'not-exist' is not found" + t.Logf("error message:%s", execErrMsg.Error()) assert.True(t, strings.Contains(execErrMsg.Error(), exceptMsg)) + _, errMsg, _ := TestFunctionsCommands(putstateFunctionsCmd, stateArgsErrInFormat) + assert.NotNil(t, errMsg) + exceptErrMsg := "error input format" + t.Logf("err message:%s", errMsg.Error()) + assert.True(t, strings.Contains(errMsg.Error(), exceptErrMsg)) + // query state queryStateArgs := []string{"querystate", "--tenant", "public", @@ -113,3 +127,77 @@ func TestStateFunctions(t *testing.T) { assert.Equal(t, int64(1), stateAgain.Version) } + +func TestByteValue(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-putstate-byte-value", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, execErr, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + if execErr != nil { + t.Errorf("create functions error value: %s", execErr.Error()) + } + assert.Equal(t, out.String(), "Created test-functions-putstate-byte-value successfully") + + buf := "hello pulsar!" + file, err := ioutil.TempFile("", "tmpfile") + if err != nil { + panic(err) + } + defer os.Remove(file.Name()) + if _, err := file.Write([]byte(buf)); err != nil { + panic(err) + } + + t.Logf("file name:%s", file.Name()) + + putstateArgs := []string{"putstate", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-putstate-byte-value", + "pulsar", "=", file.Name(), + } + + outPutState := new(bytes.Buffer) + + for { + outPutState, _, _ = TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) + + if strings.Contains(outPutState.String(), "successfully") { + break + } + } + + assert.True(t, strings.Contains(outPutState.String(), "successfully")) + + // query state + queryStateArgs := []string{"querystate", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-putstate-byte-value", + "--key", "pulsar", + } + + outQueryState, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs) + assert.Nil(t, err) + t.Logf("outQueryState:%s", outQueryState.String()) + + var state pulsar.FunctionState + err = json.Unmarshal(outQueryState.Bytes(), &state) + assert.Nil(t, err) + + assert.Equal(t, "pulsar", state.Key) + assert.Equal(t, "hello pulsar!", state.StringValue) +}