Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stats, status, querystate, putstate, trigger cmds for Pulsar Functions #34

Merged
merged 14 commits into from
Sep 9, 2019
4 changes: 3 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ jobs:
- name: Get dependencies
run: |
docker pull apachepulsar/pulsar
docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test apachepulsar/pulsar bin/pulsar standalone
docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test --name pulsarctl apachepulsar/pulsar bin/pulsar standalone
docker cp pulsarctl:/pulsar/examples/api-examples.jar ${PWD}/test/functions/
ls -al test/functions/
sleep 10

- name: Build
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*.so
*.dylib
pulsarctl
api-examples.jar
dummyExample.jar

# Test binary, build with `go test -c`
*.test
Expand Down
13 changes: 13 additions & 0 deletions pkg/ctl/functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@
package functions

import (
`errors`
"github.com/spf13/cobra"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

var checkPutStateArgs = func(args []string) error {
if len(args) < 2 {
return errors.New("need to specified the state key and state value")
}
return nil
}

func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
resourceCmd := cmdutils.NewResourceCmd(
"functions",
Expand All @@ -39,6 +47,11 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statusFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statsFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, querystateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, putstateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, triggerFunctionsCmd)

return resourceCmd
}
160 changes: 160 additions & 0 deletions pkg/ctl/functions/putstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package functions

import (
"github.com/spf13/pflag"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar"
`io/ioutil`
`strings`
)

func putstateFunctionsCmd(vc *cmdutils.VerbCmd) {
desc := pulsar.LongDescription{}
desc.CommandUsedFor = "Put a key/value pair to the state associated with a Pulsar Function."
desc.CommandPermission = "This command requires namespace function permissions."

var examples []pulsar.Example
putstate := pulsar.Example{
Desc: "Put a key/<string value> pair to the state associated with a Pulsar Function",
Command: "pulsarctl functions putstate \n" +
"\t--tenant public\n" +
"\t--namespace default\n" +
"\t--name <the name of Pulsar Function> \n" +
"\t<key name> <string value> ",
}
examples = append(examples, putstate)

putstateWithByte := pulsar.Example{
Desc: "Put a key/<byte value> pair to the state associated with a Pulsar Function",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should treat this case as byte value case

Command: "pulsarctl functions putstate \n" +
"\t--tenant public\n" +
"\t--namespace default\n" +
"\t--name <the name of Pulsar Function> \n" +
"\t<key name> - <byte value> ",
}
examples = append(examples, putstateWithByte)

putstateWithFQFN := pulsar.Example{
Desc: "Put a key/value pair to the state associated with a Pulsar Function with FQFN",
Command: "pulsarctl functions putstate \n" +
"\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions] \n" +
"\t<key name> - <string value> ",
}
examples = append(examples, putstateWithFQFN)
desc.CommandExamples = examples

var out []pulsar.Output
successOut := pulsar.Output{
Desc: "normal output",
Out: "Put state <the function state> successfully",
}

failOut := pulsar.Output{
Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args",
Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)",
}

failOutWithNameNotExist := pulsar.Output{
Desc: "The name of Pulsar Functions doesn't exist, please check the `--name` arg",
Out: "[✖] code: 404 reason: Function <your function name> doesn't exist",
}

failOutWithWrongJson := pulsar.Output{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove --state, I don't think we will hit this error.

Desc: "unexpected end of JSON input, please check the `--state` arg",
Out: "[✖] unexpected end of JSON input",
}

out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongJson)
desc.CommandOutput = out

vc.SetDescription(
"putstate",
"Put a key/value pair to the state associated with a Pulsar Function",
desc.ToString(),
"putstate",
)

functionData := &pulsar.FunctionData{}

// set the run function
vc.SetRunFuncWithNameArgs(func() error {
return doPutStateFunction(vc, functionData)
}, checkPutStateArgs)

// register the params
vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) {
flagSet.StringVar(
&functionData.FQFN,
"fqfn",
"",
"The Fully Qualified Function Name (FQFN) for the function")

flagSet.StringVar(
&functionData.Tenant,
"tenant",
"",
"The tenant of a Pulsar Function")

flagSet.StringVar(
&functionData.Namespace,
"namespace",
"",
"The namespace of a Pulsar Function")

flagSet.StringVar(
&functionData.FuncName,
"name",
"",
"The name of a Pulsar Function")
})
}

func doPutStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error {
err := processBaseArguments(funcData)
if err != nil {
vc.Command.Help()
return err
}
admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3)

var state pulsar.FunctionState

state.Key = vc.NameArgs[0]
value := vc.NameArgs[1]

if value == "-" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please take a look at kubectl.

  # List contents of /usr from the first container of pod 123456-7890 and sort by modification time.
  # If the command you want to execute in the pod has any flags in common (e.g. -i),
  # you must use two dashes (--) to separate your command's flags/arguments.
  # Also note, do not surround your command and its flags/arguments with quotes
  # unless that is how you would execute it normally (i.e., do ls -t /usr, not "ls -t /usr").
  kubectl exec 123456-7890 -i -t -- ls -t /usr

As I commented before, I was suggesting all the named args after - (we can use -- similar as kubectl as well) are the string value.

I will explain that using following examples as below:

// the key is `key-1`,  the string-value is `value-1`
pulsarctl functions putstate key-1 value-1

// throw exception since we are expecting one string-value
pulsarctl functions putstate key-1 value-1 value-2

// not sure how the framework work here. but ideally `value-1 value` should be treated as one name arg.
// if so, the string-value should be `value-1 value2`.
pulsarctl functions putstate key-1 "value-1 value2"

// in this example, all the name args after `--` should be treated as one string-value "hello world pulsar ctl!"
pulsarctl functions putstate key-1 -- hello world pulsar ctl!

for bytes-value, I was suggesting using > as separator. but I forgot > is a special character in bash. we can use a different separator. maybe ---?

Copy link
Contributor Author

@wolfstudy wolfstudy Sep 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use = as the separator between key/byteValue pair?

> or < means input or output redirection
-- This marks end of parameter (option) list.
--- bad flag syntax: ---

contents, err := ioutil.ReadFile(vc.NameArgs[2])
if err != nil {
return err
}
state.ByteValue = contents
} else {
state.StringValue = strings.Join(vc.NameArgs[1:], " ")
}

err = admin.Functions().PutFunctionState(funcData.Tenant, funcData.Namespace, funcData.FuncName, state)
if err != nil {
cmdutils.PrintError(vc.Command.OutOrStderr(), err)
} else {
vc.Command.Printf("Put state %+v successfully", state)
}

return err
}
115 changes: 115 additions & 0 deletions pkg/ctl/functions/putstate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package functions

import (
`bytes`
"encoding/json"
"github.com/streamnative/pulsarctl/pkg/pulsar"
"github.com/stretchr/testify/assert"
"strings"
"testing"
)

func TestStateFunctions(t *testing.T) {
basePath, err := getDirHelp()
if basePath == "" || err != nil {
t.Error(err)
}
t.Logf("base path: %s", basePath)
args := []string{"create",
"--tenant", "public",
"--namespace", "default",
"--name", "test-functions-putstate",
"--inputs", "test-input-topic",
"--output", "persistent://public/default/test-output-topic",
"--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"--jar", basePath + "/test/functions/api-examples.jar",
}

out, execErr, err := TestFunctionsCommands(createFunctionsCmd, args)
assert.Nil(t, err)
if execErr != nil {
t.Errorf("create functions error value: %s", execErr.Error())
}
assert.Equal(t, out.String(), "Created test-functions-putstate successfully")

putstateArgs := []string{"putstate",
"--tenant", "public",
"--namespace", "default",
"--name", "test-functions-putstate",
"pulsar", "hello",
}

outPutState := new(bytes.Buffer)

for {
outPutState, _, _ = TestFunctionsCommands(putstateFunctionsCmd, putstateArgs)

if strings.Contains(outPutState.String(), "successfully") {
break
}
}

assert.True(t, strings.Contains(outPutState.String(), "successfully"))

// test failure case for put state
failureStateArgs := []string{"putstate",
"--name", "not-exist",
"pulsar", "hello",
}

_, execErrMsg, _ := TestFunctionsCommands(putstateFunctionsCmd, failureStateArgs)
assert.NotNil(t, execErrMsg)
exceptMsg := "'not-exist' is not found"
assert.True(t, strings.Contains(execErrMsg.Error(), exceptMsg))

// query state
queryStateArgs := []string{"querystate",
"--tenant", "public",
"--namespace", "default",
"--name", "test-functions-putstate",
"--key", "pulsar",
}

outQueryState, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs)
assert.Nil(t, err)
t.Logf("outQueryState:%s", outQueryState.String())

var state pulsar.FunctionState
err = json.Unmarshal(outQueryState.Bytes(), &state)
assert.Nil(t, err)

assert.Equal(t, "pulsar", state.Key)
assert.Equal(t, "hello", state.StringValue)
assert.Equal(t, int64(0), state.Version)
// put state again
outPutStateAgain, _, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs)
assert.Nil(t, err)
assert.True(t, strings.Contains(outPutStateAgain.String(), "successfully"))

// query state again
outQueryStateAgain, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs)
assert.Nil(t, err)

var stateAgain pulsar.FunctionState
err = json.Unmarshal(outQueryStateAgain.Bytes(), &stateAgain)
assert.Nil(t, err)

assert.Equal(t, int64(1), stateAgain.Version)
}
Loading