diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index fa9c8d7a..acb33184 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,7 +19,9 @@ jobs: - name: Get dependencies run: | docker pull apachepulsar/pulsar - docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test apachepulsar/pulsar bin/pulsar standalone + docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test --name pulsarctl apachepulsar/pulsar bin/pulsar standalone + docker cp pulsarctl:/pulsar/examples/api-examples.jar ${PWD}/test/functions/ + ls -al test/functions/ sleep 10 - name: Build diff --git a/.gitignore b/.gitignore index 57d30eb0..ecd834e0 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ *.so *.dylib pulsarctl +api-examples.jar +dummyExample.jar # Test binary, build with `go test -c` *.test diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index 7fd6dad7..f9c7a384 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -18,10 +18,18 @@ package functions import ( + `errors` "github.com/spf13/cobra" "github.com/streamnative/pulsarctl/pkg/cmdutils" ) +var checkPutStateArgs = func(args []string) error { + if len(args) < 2 { + return errors.New("need to specified the state key and state value") + } + return nil +} + func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { resourceCmd := cmdutils.NewResourceCmd( "functions", @@ -39,6 +47,11 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statusFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statsFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, querystateFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, putstateFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, triggerFunctionsCmd) return resourceCmd } diff --git a/pkg/ctl/functions/putstate.go b/pkg/ctl/functions/putstate.go new file mode 100644 index 00000000..eec35e7d --- /dev/null +++ b/pkg/ctl/functions/putstate.go @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "fmt" + "github.com/pkg/errors" + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "io/ioutil" + "strings" +) + +func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Put a key/value pair to the state associated with a Pulsar Function." + desc.CommandPermission = "This command requires namespace function permissions." + + var examples []pulsar.Example + putstate := pulsar.Example{ + Desc: "Put a key/ pair to the state associated with a Pulsar Function", + Command: "pulsarctl functions putstate \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t - ", + } + examples = append(examples, putstate) + + putstateWithByte := pulsar.Example{ + Desc: "Put a key/ pair to the state associated with a Pulsar Function", + Command: "pulsarctl functions putstate \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t = ", + } + examples = append(examples, putstateWithByte) + + putstateWithFQFN := pulsar.Example{ + Desc: "Put a key/value pair to the state associated with a Pulsar Function with FQFN", + Command: "pulsarctl functions putstate \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions] \n" + + "\t - ", + } + examples = append(examples, putstateWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "Put state successfully", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the `--name` arg", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithKeyOrValueNotExist := pulsar.Output{ + Desc: "The state key and state value not specified, please check your input format", + Out: "[✖] need to specified the state key and state value", + } + + fileOutErrInputFormat := pulsar.Output{ + Desc: "The format of the input is incorrect, please check.", + Out: "[✖] error input format", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithKeyOrValueNotExist, fileOutErrInputFormat) + desc.CommandOutput = out + + vc.SetDescription( + "putstate", + "Put a key/value pair to the state associated with a Pulsar Function", + desc.ToString(), + "putstate", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFuncWithNameArgs(func() error { + return doPutStateFunction(vc, functionData) + }, checkPutStateArgs) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + }) +} + +func doPutStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + + var state pulsar.FunctionState + + state.Key = vc.NameArgs[0] + value := vc.NameArgs[1] + + fmt.Println("value:", value) + + if value == "-" { + state.StringValue = strings.Join(vc.NameArgs[2:], " ") + } else if value == "=" { + contents, err := ioutil.ReadFile(vc.NameArgs[2]) + if err != nil { + return err + } + state.ByteValue = contents + } else { + return errors.New("error input format") + } + + err = admin.Functions().PutFunctionState(funcData.Tenant, funcData.Namespace, funcData.FuncName, state) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + vc.Command.Printf("Put state %+v successfully", state) + } + + return err +} diff --git a/pkg/ctl/functions/putstate_test.go b/pkg/ctl/functions/putstate_test.go new file mode 100644 index 00000000..f7eadd36 --- /dev/null +++ b/pkg/ctl/functions/putstate_test.go @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "bytes" + "encoding/json" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "github.com/stretchr/testify/assert" + "io/ioutil" + "os" + "strings" + "testing" +) + +func TestStateFunctions(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-putstate", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, execErr, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + if execErr != nil { + t.Errorf("create functions error value: %s", execErr.Error()) + } + assert.Equal(t, out.String(), "Created test-functions-putstate successfully") + + putstateArgs := []string{"putstate", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-putstate", + "pulsar", "-", "hello", + } + + outPutState := new(bytes.Buffer) + + for { + outPutState, _, _ = TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) + + if strings.Contains(outPutState.String(), "successfully") { + break + } + } + + assert.True(t, strings.Contains(outPutState.String(), "successfully")) + + // test failure case for put state + failureStateArgs := []string{"putstate", + "--name", "not-exist", + "pulsar", "-", "hello", + } + + stateArgsErrInFormat := []string{"putstate", + "--name", "test-functions-putstate", + "pulsar", "hello", + } + + _, execErrMsg, _ := TestFunctionsCommands(putstateFunctionsCmd, failureStateArgs) + assert.NotNil(t, execErrMsg) + exceptMsg := "'not-exist' is not found" + t.Logf("error message:%s", execErrMsg.Error()) + assert.True(t, strings.Contains(execErrMsg.Error(), exceptMsg)) + + _, errMsg, _ := TestFunctionsCommands(putstateFunctionsCmd, stateArgsErrInFormat) + assert.NotNil(t, errMsg) + exceptErrMsg := "error input format" + t.Logf("err message:%s", errMsg.Error()) + assert.True(t, strings.Contains(errMsg.Error(), exceptErrMsg)) + + // query state + queryStateArgs := []string{"querystate", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-putstate", + "--key", "pulsar", + } + + outQueryState, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs) + assert.Nil(t, err) + t.Logf("outQueryState:%s", outQueryState.String()) + + var state pulsar.FunctionState + err = json.Unmarshal(outQueryState.Bytes(), &state) + assert.Nil(t, err) + + assert.Equal(t, "pulsar", state.Key) + assert.Equal(t, "hello", state.StringValue) + assert.Equal(t, int64(0), state.Version) + // put state again + outPutStateAgain, _, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) + assert.Nil(t, err) + assert.True(t, strings.Contains(outPutStateAgain.String(), "successfully")) + + // query state again + outQueryStateAgain, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs) + assert.Nil(t, err) + + var stateAgain pulsar.FunctionState + err = json.Unmarshal(outQueryStateAgain.Bytes(), &stateAgain) + assert.Nil(t, err) + + assert.Equal(t, int64(1), stateAgain.Version) +} + +func TestByteValue(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-putstate-byte-value", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, execErr, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + if execErr != nil { + t.Errorf("create functions error value: %s", execErr.Error()) + } + assert.Equal(t, out.String(), "Created test-functions-putstate-byte-value successfully") + + buf := "hello pulsar!" + file, err := ioutil.TempFile("", "tmpfile") + if err != nil { + panic(err) + } + defer os.Remove(file.Name()) + if _, err := file.Write([]byte(buf)); err != nil { + panic(err) + } + + t.Logf("file name:%s", file.Name()) + + putstateArgs := []string{"putstate", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-putstate-byte-value", + "pulsar", "=", file.Name(), + } + + outPutState := new(bytes.Buffer) + + for { + outPutState, _, _ = TestFunctionsCommands(putstateFunctionsCmd, putstateArgs) + + if strings.Contains(outPutState.String(), "successfully") { + break + } + } + + assert.True(t, strings.Contains(outPutState.String(), "successfully")) + + // query state + queryStateArgs := []string{"querystate", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-putstate-byte-value", + "--key", "pulsar", + } + + outQueryState, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs) + assert.Nil(t, err) + t.Logf("outQueryState:%s", outQueryState.String()) + + var state pulsar.FunctionState + err = json.Unmarshal(outQueryState.Bytes(), &state) + assert.Nil(t, err) + + assert.Equal(t, "pulsar", state.Key) + assert.Equal(t, "hello pulsar!", state.StringValue) +} diff --git a/pkg/ctl/functions/querystate.go b/pkg/ctl/functions/querystate.go new file mode 100644 index 00000000..dc5b22d0 --- /dev/null +++ b/pkg/ctl/functions/querystate.go @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "time" +) + +func querystateFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Fetch a key/value pair from the state associated with a Pulsar Function." + desc.CommandPermission = "This command requires namespace function permissions." + + var examples []pulsar.Example + querystate := pulsar.Example{ + Desc: "Fetch the current state associated with a Pulsar Function", + Command: "pulsarctl functions querystate \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--key \n" + + "\t--watch", + } + examples = append(examples, querystate) + + querystateWithFQFN := pulsar.Example{ + Desc: "Fetch a key/value pair from the state associated with a Pulsar Function with FQFN", + Command: "pulsarctl functions querystate \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]\n" + + "\t--key \n" + + "\t--watch", + } + examples = append(examples, querystateWithFQFN) + + querystateNoWatch := pulsar.Example{ + Desc: "Fetch a key/value pair from the state associated with a Pulsar Function", + Command: "pulsarctl functions querystate \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--key ", + } + examples = append(examples, querystateNoWatch) + + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "{\n" + + " \"key\": \"pulsar\",\n" + + " \"stringValue\": \"hello\",\n" + + " \"byteValue\": null,\n" + + " \"numberValue\": 0,\n" + + " \"version\": 6\n" + + "}", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithKeyNotExist := pulsar.Output{ + Desc: "key doesn't exist, please check --key args", + Out: "error: key doesn't exist", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithKeyNotExist) + desc.CommandOutput = out + + vc.SetDescription( + "querystate", + "Fetch a key/value pair from the state associated with a Pulsar Function", + desc.ToString(), + "querystate", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doQueryStateFunction(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVarP( + &functionData.Key, + "key", + "k", + "", + "key") + + flagSet.BoolVarP( + &functionData.Watch, + "watch", + "w", + false, + "Watch for changes in the value associated with a key for a Pulsar Function") + }) +} + +func doQueryStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + + for { + functionState, err := admin.Functions().GetFunctionState(funcData.Tenant, funcData.Namespace, funcData.FuncName, funcData.Key) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionState) + } + + if funcData.Watch { + time.Sleep(time.Millisecond * 1000) + } + + if !funcData.Watch { + break + } + } + return err +} diff --git a/pkg/ctl/functions/stats.go b/pkg/ctl/functions/stats.go new file mode 100644 index 00000000..87045027 --- /dev/null +++ b/pkg/ctl/functions/stats.go @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "strconv" +) + +func statsFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Get the current stats of a Pulsar Function." + desc.CommandPermission = "This command requires namespace function permissions." + + var examples []pulsar.Example + stats := pulsar.Example{ + Desc: "Get the current stats of a Pulsar Function", + Command: "pulsarctl functions stats \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, stats) + + statsWithFQFN := pulsar.Example{ + Desc: "Get the current stats of a Pulsar Function with FQFN", + Command: "pulsarctl functions stats \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, statsWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "{\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0,\n" + + " \"lastInvocation\": 0,\n" + + " \"oneMin\": {\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0\n" + + " },\n" + + " \"instances\": [\n" + + " {\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0,\n" + + " \"instanceId\": 0,\n" + + " \"metrics\": {\n" + + " \"oneMin\": {\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0\n" + + " },\n" + + " \"lastInvocation\": 0,\n" + + " \"userMetrics\": {},\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"instanceId\": 0,\n" + + " \"metrics\": {\n" + + " \"oneMin\": {\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0\n" + + " },\n" + + " \"lastInvocation\": 0,\n" + + " \"userMetrics\": null,\n" + + " \"receivedTotal\": 0,\n" + + " \"processedSuccessfullyTotal\": 0,\n" + + " \"systemExceptionsTotal\": 0,\n" + + " \"userExceptionsTotal\": 0,\n" + + " \"avgProcessLatency\": 0\n" + + " }\n" + + "}", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist) + desc.CommandOutput = out + + vc.SetDescription( + "stats", + "Get the current stats of a Pulsar Function", + desc.ToString(), + "stats", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doStatsFunction(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.InstanceID, + "instance-id", + "", + "The function instanceId (Get-stats of all instances if instance-id is not provided)") + }) +} + +func doStatsFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + if funcData.InstanceID != "" { + instanceID, err := strconv.Atoi(funcData.InstanceID) + if err != nil { + return err + } + functionInstanceStatsData, err := admin.Functions().GetFunctionStatsWithInstanceID(funcData.Tenant, funcData.Namespace, funcData.FuncName, instanceID) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionInstanceStatsData) + } else { + functionStats, err := admin.Functions().GetFunctionStats(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } + + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionStats) + } + + return err +} diff --git a/pkg/ctl/functions/stats_test.go b/pkg/ctl/functions/stats_test.go new file mode 100644 index 00000000..8642b850 --- /dev/null +++ b/pkg/ctl/functions/stats_test.go @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "encoding/json" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "github.com/stretchr/testify/assert" + "strings" + "testing" +) + +func TestStatsFunctions(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stats", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, _, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + assert.Equal(t, out.String(), "Created test-functions-stats successfully") + + statsArgs := []string{"stats", + "--name", "test-functions-stats", + } + + outStats, _, err := TestFunctionsCommands(statsFunctionsCmd, statsArgs) + assert.Nil(t, err) + + var stats pulsar.FunctionStats + err = json.Unmarshal(outStats.Bytes(), &stats) + assert.Nil(t, err) + + assert.Equal(t, int64(0), stats.ReceivedTotal) + assert.Equal(t, int64(0), stats.ProcessedSuccessfullyTotal) +} + +func TestFailureStats(t *testing.T) { + statsArgs := []string{"stats", + "--name", "test-functions-stats-failure", + } + + out, _, err := TestFunctionsCommands(statsFunctionsCmd, statsArgs) + assert.Nil(t, err) + + errMsg := "Function test-functions-stats-failure doesn't exist" + assert.True(t, strings.Contains(out.String(), errMsg)) +} diff --git a/pkg/ctl/functions/status.go b/pkg/ctl/functions/status.go new file mode 100644 index 00000000..89427806 --- /dev/null +++ b/pkg/ctl/functions/status.go @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "strconv" +) + +func statusFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Check the current status of a Pulsar Function." + desc.CommandPermission = "This command requires namespace function permissions." + + var examples []pulsar.Example + status := pulsar.Example{ + Desc: "Check the current status of a Pulsar Function", + Command: "pulsarctl functions status \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, status) + + statusWithFQFN := pulsar.Example{ + Desc: "Check the current status of a Pulsar Function with FQFN", + Command: "pulsarctl functions status \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, statusWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "{\n" + + " \"numInstances\": 1,\n" + + " \"numRunning\": 1,\n" + + " \"instances\": [\n" + + " {\n" + + " \"instanceId\": 0,\n" + + " \"status\": {\n" + + " \"running\": true,\n" + + " \"error\": \"\",\n" + + " \"numRestarts\": 0,\n" + + " \"numReceived\": 0,\n" + + " \"numSuccessfullyProcessed\": 0,\n" + + " \"numUserExceptions\": 0,\n" + + " \"latestUserExceptions\": [],\n" + + " \"numSystemExceptions\": 0,\n" + + " \"latestSystemExceptions\": [],\n" + + " \"averageLatency\": 0,\n" + + " \"lastInvocationTime\": 0,\n" + + " \"workerId\": \"c-standalone-fw-127.0.0.1-8080\"\n" + + " }\n" + + " }\n" + + " ]\n" + + "}", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithWrongInstanceID := pulsar.Output{ + Desc: "Used an instanceID that does not exist or other impermissible actions", + Out: "[✖] code: 400 reason: Operation not permitted", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongInstanceID) + desc.CommandOutput = out + + vc.SetDescription( + "status", + "Check the current status of a Pulsar Function", + desc.ToString(), + "getstatus", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doStatusFunction(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.InstanceID, + "instance-id", + "", + "The function instanceId (Get-status of all instances if instance-id is not provided)") + }) +} + +func doStatusFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + if funcData.InstanceID != "" { + instanceID, err := strconv.Atoi(funcData.InstanceID) + if err != nil { + return err + } + functionInstanceStatusData, err := admin.Functions().GetFunctionStatusWithInstanceID(funcData.Tenant, funcData.Namespace, funcData.FuncName, instanceID) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionInstanceStatusData) + } else { + functionStatus, err := admin.Functions().GetFunctionStatus(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionStatus) + } + + return err +} diff --git a/pkg/ctl/functions/status_test.go b/pkg/ctl/functions/status_test.go new file mode 100644 index 00000000..a270776f --- /dev/null +++ b/pkg/ctl/functions/status_test.go @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + `bytes` + `encoding/json` + "github.com/streamnative/pulsarctl/pkg/pulsar" + "github.com/stretchr/testify/assert" + "strings" + "testing" +) + +func TestStatusFunctions(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-status", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, _, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + assert.Equal(t, out.String(), "Created test-functions-status successfully") + + getArgs := []string{"get", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-status", + } + + outGet, _, _ := TestFunctionsCommands(getFunctionsCmd, getArgs) + assert.Nil(t, err) + + var functionConfig pulsar.FunctionConfig + err = json.Unmarshal(outGet.Bytes(), &functionConfig) + assert.Nil(t, err) + + assert.Equal(t, functionConfig.Tenant, "public") + assert.Equal(t, functionConfig.Namespace, "default") + assert.Equal(t, functionConfig.Name, "test-functions-status") + + statusArgs := []string{"status", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-status", + } + + outStatus := new(bytes.Buffer) + var status pulsar.FunctionStatus + + for { + outStatus, _, _ = TestFunctionsCommands(statusFunctionsCmd, statusArgs) + if strings.Contains(outStatus.String(), "true") { + break + } + } + + t.Log(outStatus.String()) + err = json.Unmarshal(outStatus.Bytes(), &status) + assert.Nil(t, err) + + assert.Equal(t, 1, status.NumRunning) + assert.Equal(t, 1, status.NumInstances) +} + +func TestFailureStatus(t *testing.T) { + statusArgs := []string{"status", + "--name", "test-functions-status-failure", + } + + out, _, err := TestFunctionsCommands(statusFunctionsCmd, statusArgs) + assert.Nil(t, err) + + errMsg := "Function test-functions-status-failure doesn't exist" + assert.True(t, strings.Contains(out.String(), errMsg)) +} diff --git a/pkg/ctl/functions/test_help.go b/pkg/ctl/functions/test_help.go index b1ba4318..9d81d2a4 100644 --- a/pkg/ctl/functions/test_help.go +++ b/pkg/ctl/functions/test_help.go @@ -60,9 +60,11 @@ func TestFunctionsCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) ( var ( flag bool + basePath string ) -func getDirHelp() (basePath string, err error) { +func getDirHelp() (string, error) { + var err error if !flag { basePath, err = os.Getwd() if err != nil { diff --git a/pkg/ctl/functions/trigger.go b/pkg/ctl/functions/trigger.go new file mode 100644 index 00000000..1533796e --- /dev/null +++ b/pkg/ctl/functions/trigger.go @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "errors" + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func triggerFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Trigger the specified Pulsar Function with a supplied value." + desc.CommandPermission = "This command requires namespace function permissions." + + var examples []pulsar.Example + trigger := pulsar.Example{ + Desc: "Trigger the specified Pulsar Function with a supplied value", + Command: "pulsarctl functions trigger \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--topic \n" + + "\t--trigger-value \"hello pulsar\"", + } + examples = append(examples, trigger) + + triggerWithFQFN := pulsar.Example{ + Desc: "Trigger the specified Pulsar Function with a supplied value", + Command: "pulsarctl functions trigger \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]\n" + + "\t--topic \n" + + "\t--trigger-value \"hello pulsar\"", + } + examples = append(examples, triggerWithFQFN) + + triggerWithFile := pulsar.Example{ + Desc: "Trigger the specified Pulsar Function with a supplied value", + Command: "pulsarctl functions trigger \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--topic \n" + + "\t--trigger-file ", + } + examples = append(examples, triggerWithFile) + + desc.CommandExamples = examples + + var out []pulsar.Output + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithWrongInstanceID := pulsar.Output{ + Desc: "Used an instanceID that does not exist or other impermissible actions", + Out: "[✖] code: 400 reason: Operation not permitted", + } + + failOutWithTopic := pulsar.Output{ + Desc: "Function in trigger function has unidentified topic", + Out: "[✖] code: 400 reason: Function in trigger function has unidentified topic", + } + + failOutWithTimeout := pulsar.Output{ + Desc: "Request Timed Out", + Out: "[✖] code: 408 reason: Request Timed Out", + } + + out = append(out, failOut, failOutWithNameNotExist, failOutWithWrongInstanceID, failOutWithTopic, failOutWithTimeout) + desc.CommandOutput = out + + vc.SetDescription( + "trigger", + "Trigger the specified Pulsar Function with a supplied value", + desc.ToString(), + "trigger", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doTriggerFunction(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.Topic, + "topic", + "", + "The specific topic name that the function consumes from that you want to inject the data to") + + flagSet.StringVar( + &functionData.TriggerFile, + "trigger-file", + "", + "The path to the file that contains the data with which you want to trigger the function") + + flagSet.StringVar( + &functionData.TriggerValue, + "trigger-value", + "", + "The value with which you want to trigger the function") + }) +} + +func doTriggerFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + + if funcData.TriggerValue == "" && funcData.TriggerFile == "" { + return errors.New("either a trigger value or a trigger filepath needs to be specified") + } + + if funcData.TriggerValue != "" && funcData.TriggerFile != "" { + return errors.New("either a triggerValue or a triggerFile needs to specified for the function, cannot specify both") + } + + retval, err := admin.Functions().TriggerFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName, funcData.Topic, funcData.TriggerValue, funcData.TriggerFile) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + vc.Command.Printf(retval) + } + return err +} diff --git a/pkg/ctl/functions/trigger_test.go b/pkg/ctl/functions/trigger_test.go new file mode 100644 index 00000000..8aef0d31 --- /dev/null +++ b/pkg/ctl/functions/trigger_test.go @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + `bytes` + `encoding/json` + `github.com/streamnative/pulsarctl/pkg/pulsar` + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestTriggerFunctions(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-trigger", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.WordCountFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, execErr, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + if execErr != nil { + t.Errorf("create fucntions error value: %s", execErr.Error()) + } + assert.Equal(t, out.String(), "Created test-functions-trigger successfully") + + statsArgs := []string{"stats", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-trigger", + } + outStats := new(bytes.Buffer) + + outStats, _, _ = TestFunctionsCommands(statsFunctionsCmd, statsArgs) + var stats pulsar.FunctionStats + err = json.Unmarshal(outStats.Bytes(), &stats) + assert.Nil(t, err) + assert.Equal(t, int64(0), stats.ReceivedTotal) + assert.Equal(t, int64(0), stats.ProcessedSuccessfullyTotal) + + // send trigger cmd to broker + triggerArgs := []string{"trigger", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-trigger", + "--topic", "test-input-topic", + "--trigger-value", "hello pulsar", + } + + triggerOut := new(bytes.Buffer) + for i := 0; i < 2; i++ { + triggerOut, execErr, err = TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) + assert.Nil(t, err) + if execErr != nil { + t.Error(execErr.Error()) + } + t.Log(triggerOut.String()) + } +} + +func TestTriggerFunctionsFailure(t *testing.T) { + basePath, err := getDirHelp() + if basePath == "" || err != nil { + t.Error(err) + } + t.Logf("base path: %s", basePath) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-trigger-failure", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", basePath + "/test/functions/api-examples.jar", + } + + out, _, err := TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + assert.Equal(t, out.String(), "Created test-functions-trigger-failure successfully") + // wait the function create successfully + time.Sleep(time.Second * 3) + + triggerArgs := []string{"trigger", + "--name", "not-exist", + "--topic", "test-input-topic", + "--trigger-value", "hello pulsar", + } + + _, errMsg, _ := TestFunctionsCommands(triggerFunctionsCmd, triggerArgs) + errorMessage := "code: 404 reason: Function not-exist doesn't exist" + assert.Equal(t, errorMessage, errMsg.Error()) + + triggerArgsNoTopic := []string{"trigger", + "--name", "test-functions-trigger-failure", + "--topic", "test-input-topic-failure", + "--trigger-value", "hello pulsar", + } + _, errMsg, _ = TestFunctionsCommands(triggerFunctionsCmd, triggerArgsNoTopic) + noTopicErr := "code: 400 reason: Function in trigger function has unidentified topic" + assert.Equal(t, noTopicErr, errMsg.Error()) + + triggerArgsNoValueOrFile := []string{"trigger", + "--name", "test-functions-trigger-failure", + "--topic", "test-input-topic", + } + _, errMsg, _ = TestFunctionsCommands(triggerFunctionsCmd, triggerArgsNoValueOrFile) + noValueOrFile := "either a trigger value or a trigger filepath needs to be specified" + assert.Equal(t, noValueOrFile, errMsg.Error()) +} diff --git a/pkg/pulsar/data.go b/pkg/pulsar/data.go index a68c1f13..16607d39 100644 --- a/pkg/pulsar/data.go +++ b/pkg/pulsar/data.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pulsar // ClusterData information on a cluster @@ -47,6 +64,13 @@ type FunctionData struct { MaxMessageRetries int `json:"maxMessageRetries"` DeadLetterTopic string `json:"deadLetterTopic"` + Key string `json:"key"` + Watch bool `json:"watch"` + State string `json:"state"` + TriggerValue string `json:"triggerValue"` + TriggerFile string `json:"triggerFile"` + Topic string `json:"topic"` + UpdateAuthData bool `json:"updateAuthData"` FuncConf *FunctionConfig `json:"-"` diff --git a/pkg/pulsar/function_state.go b/pkg/pulsar/function_state.go new file mode 100644 index 00000000..a740e20e --- /dev/null +++ b/pkg/pulsar/function_state.go @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type FunctionState struct { + Key string `json:"key"` + StringValue string `json:"stringValue"` + ByteValue []byte `json:"byteValue"` + NumValue int64 `json:"numberValue"` + Version int64 `json:"version"` +} diff --git a/pkg/pulsar/function_status.go b/pkg/pulsar/function_status.go new file mode 100644 index 00000000..1c699beb --- /dev/null +++ b/pkg/pulsar/function_status.go @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type FunctionStatus struct { + NumInstances int `json:"numInstances"` + NumRunning int `json:"numRunning"` + Instances []FunctionInstanceStatus `json:"instances"` +} + +type FunctionInstanceStatus struct { + InstanceId int `json:"instanceId"` + Status FunctionInstanceStatusData `json:"status"` +} + +type FunctionInstanceStatusData struct { + Running bool `json:"running"` + Err string `json:"error"` + NumRestarts int64 `json:"numRestarts"` + NumReceived int64 `json:"numReceived"` + NumSuccessfullyProcessed int64 `json:"numSuccessfullyProcessed"` + NumUserExceptions int64 `json:"numUserExceptions"` + LatestUserExceptions []ExceptionInformation `json:"latestUserExceptions"` + NumSystemExceptions int64 `json:"numSystemExceptions"` + LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"` + AverageLatency float64 `json:"averageLatency"` + LastInvocationTime int64 `json:"lastInvocationTime"` + WorkerId string `json:"workerId"` +} + +type ExceptionInformation struct { + ExceptionString string `json:"exceptionString"` + TimestampMs int64 `json:"timestampMs"` +} + +func (fs *FunctionStatus) AddInstance(functionInstanceStatus FunctionInstanceStatus) { + fs.Instances = append(fs.Instances, functionInstanceStatus) +} diff --git a/pkg/pulsar/functions.go b/pkg/pulsar/functions.go index d72d3af2..b02e29d1 100644 --- a/pkg/pulsar/functions.go +++ b/pkg/pulsar/functions.go @@ -71,6 +71,30 @@ type Functions interface { // Get the configuration for the specified function GetFunction(tenant, namespace, name string) (FunctionConfig, error) + // Gets the current status of a function + GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error) + + // Gets the current status of a function instance + GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error) + + // Gets the current stats of a function + GetFunctionStats(tenant, namespace, name string) (FunctionStats, error) + + // Gets the current stats of a function instance + GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error) + + // Fetch the current state associated with a Pulsar Function + // + // Response Example: + // { "value : 12, version : 2"} + GetFunctionState(tenant, namespace, name, key string) (FunctionState, error) + + // Puts the given state associated with a Pulsar Function + PutFunctionState(tenant, namespace, name string, state FunctionState) error + + // Triggers the function by writing to the input topic + TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) + // Update the configuration for a function. UpdateFunction(functionConfig *FunctionConfig, fileName string, updateOptions *UpdateOptions) error @@ -406,3 +430,150 @@ func (f *functions) UpdateFunctionWithUrl(functionConfig *FunctionConfig, pkgUrl return nil } + +func (f *functions) GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error) { + var functionStatus FunctionStatus + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + err := f.client.get(endpoint+"/status", &functionStatus) + return functionStatus, err +} + +func (f *functions) GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error) { + var functionInstanceStatusData FunctionInstanceStatusData + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + err := f.client.get(endpoint+"/status", &functionInstanceStatusData) + return functionInstanceStatusData, err +} + +func (f *functions) GetFunctionStats(tenant, namespace, name string) (FunctionStats, error) { + var functionStats FunctionStats + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + err := f.client.get(endpoint+"/stats", &functionStats) + return functionStats, err +} + +func (f *functions) GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error) { + var functionInstanceStatsData FunctionInstanceStatsData + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + err := f.client.get(endpoint+"/stats", &functionInstanceStatsData) + return functionInstanceStatsData, err +} + +func (f *functions)GetFunctionState(tenant, namespace, name, key string) (FunctionState, error) { + var functionState FunctionState + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "state", key) + err := f.client.get(endpoint, &functionState) + return functionState, err +} + +func (f *functions) PutFunctionState(tenant, namespace, name string, state FunctionState) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "state", state.Key) + + // buffer to store our request as bytes + bodyBuf := bytes.NewBufferString("") + + multiPartWriter := multipart.NewWriter(bodyBuf) + + stateData, err := json.Marshal(state) + + if err != nil { + return err + } + + stateWriter, err := f.createStringFromField(multiPartWriter, "state") + if err != nil { + return err + } + + _, err = stateWriter.Write(stateData) + + if err != nil { + return err + } + + // In here, we completed adding the file and the fields, let's close the multipart writer + // So it writes the ending boundary + if err = multiPartWriter.Close(); err != nil { + return err + } + + contentType := multiPartWriter.FormDataContentType() + + err = f.client.postWithMultiPart(endpoint, nil, nil, bodyBuf, contentType) + + if err != nil { + return err + } + + return nil +} + +func (f *functions) TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "trigger") + + // buffer to store our request as bytes + bodyBuf := bytes.NewBufferString("") + + multiPartWriter := multipart.NewWriter(bodyBuf) + + if triggerFile != "" { + file, err := os.Open(triggerFile) + if err != nil { + return "", err + } + defer file.Close() + + part, err := multiPartWriter.CreateFormFile("dataStream", filepath.Base(file.Name())) + + if err != nil { + return "", err + } + + // copy the actual file content to the filed's writer + _, err = io.Copy(part, file) + if err != nil { + return "", err + } + } + + if triggerValue != "" { + valueWriter, err := f.createTextFromFiled(multiPartWriter, "data") + if err != nil { + return "", err + } + + _, err = valueWriter.Write([]byte(triggerValue)) + if err != nil { + return "", err + } + } + + if topic != "" { + topicWriter, err := f.createTextFromFiled(multiPartWriter, "topic") + if err != nil { + return "", err + } + + _, err = topicWriter.Write([]byte(topic)) + if err != nil { + return "", err + } + } + + // In here, we completed adding the file and the fields, let's close the multipart writer + // So it writes the ending boundary + if err := multiPartWriter.Close(); err != nil { + return "", err + } + + contentType := multiPartWriter.FormDataContentType() + var str string + err := f.client.postWithMultiPart(endpoint, &str, nil, bodyBuf, contentType) + if err != nil { + return "", err + } + + return str, nil +} diff --git a/pkg/pulsar/functions_stats.go b/pkg/pulsar/functions_stats.go new file mode 100644 index 00000000..3f644388 --- /dev/null +++ b/pkg/pulsar/functions_stats.go @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type FunctionStats struct { + // Overall total number of records function received from source + ReceivedTotal int64 `json:"receivedTotal"` + + // Overall total number of records successfully processed by user function + ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"` + + // Overall total number of system exceptions thrown + SystemExceptionsTotal int64 `json:"systemExceptionsTotal"` + + // Overall total number of user exceptions thrown + UserExceptionsTotal int64 `json:"userExceptionsTotal"` + + // Average process latency for function + AvgProcessLatency float64 `json:"avgProcessLatency"` + + // Timestamp of when the function was last invoked by any instance + LastInvocation int64 `json:"lastInvocation"` + + OneMin FunctionInstanceStatsDataBase `json:"oneMin"` + + Instances []FunctionInstanceStats `json:"instances"` + + FunctionInstanceStats +} + +type FunctionInstanceStats struct { + FunctionInstanceStatsDataBase + + InstanceId int64 `json:"instanceId"` + + Metrics FunctionInstanceStatsData `json:"metrics"` +} + +type FunctionInstanceStatsDataBase struct { + // Total number of records function received from source for instance + ReceivedTotal int64 `json:"receivedTotal"` + + // Total number of records successfully processed by user function for instance + ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"` + + // Total number of system exceptions thrown for instance + SystemExceptionsTotal int64 `json:"systemExceptionsTotal"` + + // Total number of user exceptions thrown for instance + UserExceptionsTotal int64 `json:"userExceptionsTotal"` + + // Average process latency for function for instance + AvgProcessLatency float64 `json:"avgProcessLatency"` +} + +type FunctionInstanceStatsData struct { + OneMin FunctionInstanceStatsDataBase `json:"oneMin"` + + // Timestamp of when the function was last invoked for instance + LastInvocation int64 `json:"lastInvocation"` + + // Map of user defined metrics + UserMetrics map[string]float64 `json:"userMetrics"` + + FunctionInstanceStatsDataBase +} + +func (fs *FunctionStats) AddInstance(functionInstanceStats FunctionInstanceStats) { + fs.Instances = append(fs.Instances, functionInstanceStats) +} + +func (fs *FunctionStats) CalculateOverall() *FunctionStats { + var ( + nonNullInstances int + nonNullInstancesOneMin int + ) + + for _, functionInstanceStats := range fs.Instances { + functionInstanceStatsData := functionInstanceStats.Metrics + fs.ReceivedTotal += functionInstanceStatsData.ReceivedTotal + fs.ProcessedSuccessfullyTotal += functionInstanceStatsData.ProcessedSuccessfullyTotal + fs.SystemExceptionsTotal += functionInstanceStatsData.SystemExceptionsTotal + fs.UserExceptionsTotal += functionInstanceStatsData.UserExceptionsTotal + + if functionInstanceStatsData.AvgProcessLatency != 0 { + if fs.AvgProcessLatency == 0 { + fs.AvgProcessLatency = 0.0 + } + + fs.AvgProcessLatency += functionInstanceStatsData.AvgProcessLatency + nonNullInstances++ + } + + fs.OneMin.ReceivedTotal += functionInstanceStatsData.OneMin.ReceivedTotal + fs.OneMin.ProcessedSuccessfullyTotal += functionInstanceStatsData.OneMin.ProcessedSuccessfullyTotal + fs.OneMin.SystemExceptionsTotal += functionInstanceStatsData.OneMin.SystemExceptionsTotal + fs.OneMin.UserExceptionsTotal += functionInstanceStatsData.OneMin.UserExceptionsTotal + + if functionInstanceStatsData.OneMin.AvgProcessLatency != 0 { + if fs.OneMin.AvgProcessLatency == 0 { + fs.OneMin.AvgProcessLatency = 0.0 + } + + fs.OneMin.AvgProcessLatency += functionInstanceStatsData.OneMin.AvgProcessLatency + nonNullInstancesOneMin++ + } + + if functionInstanceStatsData.LastInvocation != 0 { + if fs.LastInvocation == 0 || functionInstanceStatsData.LastInvocation > fs.LastInvocation { + fs.LastInvocation = functionInstanceStatsData.LastInvocation + } + } + } + + // calculate average from sum + if nonNullInstances > 0 { + fs.AvgProcessLatency = fs.AvgProcessLatency / float64(nonNullInstances) + } else { + fs.AvgProcessLatency = 0 + } + + // calculate 1min average from sum + if nonNullInstancesOneMin > 0 { + fs.OneMin.AvgProcessLatency = fs.OneMin.AvgProcessLatency / float64(nonNullInstancesOneMin) + } else { + fs.AvgProcessLatency = 0 + } + + return fs +}