Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stats, status, querystate, putstate, trigger cmds for Pulsar Functions #34

Merged
merged 14 commits into from
Sep 9, 2019
4 changes: 3 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ jobs:
- name: Get dependencies
run: |
docker pull apachepulsar/pulsar
docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test apachepulsar/pulsar bin/pulsar standalone
docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test --name pulsarctl apachepulsar/pulsar bin/pulsar standalone
docker cp pulsarctl:/pulsar/examples/api-examples.jar ${PWD}/test/functions/
ls -al test/functions/
sleep 10

- name: Build
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*.so
*.dylib
pulsarctl
api-examples.jar

# Test binary, build with `go test -c`
*.test
Expand Down
5 changes: 5 additions & 0 deletions pkg/ctl/functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statusFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statsFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, querystateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, putstateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, triggerFunctionsCmd)

return resourceCmd
}
142 changes: 142 additions & 0 deletions pkg/ctl/functions/putstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package functions

import (
"encoding/json"
"github.com/spf13/pflag"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar"
)

func putstateFunctionsCmd(vc *cmdutils.VerbCmd) {
desc := pulsar.LongDescription{}
desc.CommandUsedFor = "Put the state associated with a Pulsar Function."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
desc.CommandUsedFor = "Put the state associated with a Pulsar Function."
desc.CommandUsedFor = "Put a key/value pair to the state associated with a Pulsar Function."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FunctionState as follows:

type FunctionState struct {
	Key         string `json:"key"`
	StringValue string `json:"stringValue"`
	ByteValue   []byte `json:"byteValue"`
	NumValue    int64  `json:"numberValue"`
	Version     int64  `json:"version"`
}

It isn't a key/value pair.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fundamentally, the state of a function is a key/value map. The value is converted and stored as bytes under the hood. You can interpret the value as string, byte[] or number. The version is assigned when you put the key/value pair to the state.

The Pulsar state related restful api is not done well. For example, we shouldn't pass any version number when putting a key/value pair to the state; only one of the value fields (StringValue, ByteValue and NumValue) should be set.

The pulsar-admin state command is also bad because you shouldn't let user construct a json string in the command. That's a very bad user experience.

The good putstate commands should be:

// put a key/string-value pair to state
pulsarctl functions putstate key string-value

// put a key/string-value pair to the state ('hello word' is treated as the value)
pulsarctl functions putstate key - hello world

// put a key/number-value pair to the state
pulsarctl functions putstate key 12345

// put a key/bytes-value pair to the state
pulsarctl functions putstate key < /path/to/value/file

The pulsarctl should interpret the value and set it to the right field in FunctionState struct and post it to the broker.

desc.CommandPermission = "This command requires super-user permissions."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this command requires super-user permission?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am wrong


var examples []pulsar.Example
putstate := pulsar.Example{
Desc: "Put the state associated with a Pulsar Function",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put a key/value pair to the state associated with a Pulsar Function

Command: "pulsarctl functions putstate \n" +
"\t--tenant public\n" +
"\t--namespace default\n" +
"\t--name <the name of Pulsar Function> \n" +
"\t--state \"{\"key\":\"pulsar\", \"stringValue\":\"hello\"}\" ",
}
examples = append(examples, putstate)

putstateWithFQFN := pulsar.Example{
Desc: "Put the state associated with a Pulsar Function with FQFN",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update the description. see my comment above.

Command: "pulsarctl functions putstate \n" +
"\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions] \n" +
"\t--state \"{\"key\":\"pulsar\", \"stringValue\":\"hello\"}\"",
}
examples = append(examples, putstateWithFQFN)
desc.CommandExamples = examples

var out []pulsar.Output
successOut := pulsar.Output{
Desc: "normal output",
Out: "PutState successfully",
}

failOut := pulsar.Output{
Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args",
Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)",
}

failOutWithNameNotExist := pulsar.Output{
Desc: "The name of Pulsar Functions doesn't exist, please check the --name args",
Out: "[✖] code: 404 reason: Function <your function name> doesn't exist",
}

out = append(out, successOut, failOut, failOutWithNameNotExist)
wolfstudy marked this conversation as resolved.
Show resolved Hide resolved
desc.CommandOutput = out

vc.SetDescription(
"putstate",
"Put the state associated with a Pulsar Function",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update the description

desc.ToString(),
"putstate",
)

functionData := &pulsar.FunctionData{}

// set the run function
vc.SetRunFunc(func() error {
return doPutStateFunction(vc, functionData)
})

// register the params
vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) {
flagSet.StringVar(
&functionData.FQFN,
"fqfn",
"",
"The Fully Qualified Function Name (FQFN) for the function")

flagSet.StringVar(
&functionData.Tenant,
"tenant",
"",
"The tenant of a Pulsar Function")

flagSet.StringVar(
&functionData.Namespace,
"namespace",
"",
"The namespace of a Pulsar Function")

flagSet.StringVar(
&functionData.FuncName,
"name",
"",
"The name of a Pulsar Function")

flagSet.StringVarP(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a better way is to use NameArg for key/value pair here not option arg.

pulsarctl functions putstate [options] <key> <value>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, the putstate cmd include fqfn and tenant, namespace, name options, these options are treated uniformly in processBaseArguments(In java is FunctionCommand)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pulsar-admin is not doing good at this part. especially the functions related commands. I don't think we should follow what pulsar-admin. In an ideal word, required arguments should be name argument, not option arguments. Because it is the most natural way to spell out an action. Just take a look at the following examples.

  • clusters create <cluster-name> vs clusters create --cluster <cluster-name>
  • tenants create <tenant-name> vs tenants create --name <tenant-name>
  • namespaces delete <ns-name> vs namespaces delete --name <ns-name>

As you can see, the former command of each example is closer to how people speak in english everyday. Ideally function commands should be following this pattern:

functions create tenant/ns/function, functions delete tenant/ns/function and etc.

However since you have already brought in the pattern from pulsar-admin to support two styles of specifying function name. It is fine to leave the function commands as they are. but for other newer commands, I'd prefer going with a more natural way for spelling out the command.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wolfstudy so what is the take here? are we still using --state with a json string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I know this is a relatively ugly implementation. Can we open an issue first and track the problem?

&functionData.State,
"state",
"t",
"",
"The FunctionState that needs to be put")
})
}

func doPutStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error {
err := processBaseArguments(funcData)
if err != nil {
vc.Command.Help()
return err
}
admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3)

var state pulsar.FunctionState
err = json.Unmarshal([]byte(funcData.State), &state)
if err != nil {
return err
}

err = admin.Functions().PutFunctionState(funcData.Tenant, funcData.Namespace, funcData.FuncName, state)
if err != nil {
cmdutils.PrintError(vc.Command.OutOrStderr(), err)
} else {
vc.Command.Printf("PutState successfully")
wolfstudy marked this conversation as resolved.
Show resolved Hide resolved
}

return err
}
114 changes: 114 additions & 0 deletions pkg/ctl/functions/putstate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package functions

import (
"encoding/json"
"github.com/streamnative/pulsarctl/pkg/pulsar"
"github.com/stretchr/testify/assert"
"strings"
"testing"
`time`
)

func TestStateFunctions(t *testing.T) {
basePath, err := getDirHelp()
if basePath == "" || err != nil {
t.Error(err)
}
t.Logf("base path: %s", basePath)
args := []string{"create",
"--tenant", "public",
"--namespace", "default",
"--name", "test-functions-putstate",
"--inputs", "test-input-topic",
"--output", "persistent://public/default/test-output-topic",
"--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"--jar", basePath + "/test/functions/api-examples.jar",
}

out, execErr, err := TestFunctionsCommands(createFunctionsCmd, args)
assert.Nil(t, err)
if execErr != nil {
t.Errorf("create functions error value: %s", execErr.Error())
}
assert.Equal(t, out.String(), "Created test-functions-putstate successfully")

// wait the function create successfully
time.Sleep(time.Second * 20)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid using sleep. try to read the key in a retry loop


putstateArgs := []string{"putstate",
"--tenant", "public",
"--namespace", "default",
"--name", "test-functions-putstate",
"--state", "{\"key\":\"pulsar\", \"stringValue\":\"hello\"}",
}

outPutState, execE, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs)
assert.Nil(t, err)
if execE != nil {
t.Errorf("put state functions error value: %s ", execE.Error())
}
t.Logf("outPutState:%s", outPutState.String())
assert.Equal(t, outPutState.String(), "PutState successfully")

// test failure case for put state
failureStateArgs := []string{"putstate",
"--name", "not-exist",
"--state", "{\"key\":\"pulsar\", \"stringValue\":\"hello\"}",
}

_, execErrMsg, _ := TestFunctionsCommands(putstateFunctionsCmd, failureStateArgs)
assert.NotNil(t, execErrMsg)
exceptMsg := "'not-exist' is not found"
assert.True(t, strings.Contains(execErrMsg.Error(), exceptMsg))

// query state
queryStateArgs := []string{"querystate",
"--tenant", "public",
"--namespace", "default",
"--name", "test-functions-putstate",
"--key", "pulsar",
}

outQueryState, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs)
assert.Nil(t, err)
t.Logf("outQueryState:%s", outQueryState.String())

var state pulsar.FunctionState
err = json.Unmarshal(outQueryState.Bytes(), &state)
assert.Nil(t, err)

assert.Equal(t, "pulsar", state.Key)
assert.Equal(t, "hello", state.StringValue)
assert.Equal(t, int64(0), state.Version)
// put state again
outPutStateAgain, _, err := TestFunctionsCommands(putstateFunctionsCmd, putstateArgs)
assert.Nil(t, err)
assert.Equal(t, outPutStateAgain.String(), "PutState successfully")

// query state again
outQueryStateAgain, _, err := TestFunctionsCommands(querystateFunctionsCmd, queryStateArgs)
assert.Nil(t, err)

var stateAgain pulsar.FunctionState
err = json.Unmarshal(outQueryStateAgain.Bytes(), &stateAgain)
assert.Nil(t, err)

assert.Equal(t, int64(1), stateAgain.Version)
}
Loading