Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stats, status, querystate, putstate, trigger cmds for Pulsar Functions #34

Merged
merged 14 commits into from
Sep 9, 2019
4 changes: 3 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ jobs:
- name: Get dependencies
run: |
docker pull apachepulsar/pulsar
docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test apachepulsar/pulsar bin/pulsar standalone
docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test --name pulsarctl apachepulsar/pulsar bin/pulsar standalone
docker cp pulsarctl:/pulsar/examples/api-examples.jar ${PWD}/test/functions/
ls -al test/functions/
sleep 10

- name: Build
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*.so
*.dylib
pulsarctl
api-examples.jar
dummyExample.jar

# Test binary, build with `go test -c`
*.test
Expand Down
13 changes: 13 additions & 0 deletions pkg/ctl/functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@
package functions

import (
`errors`
"github.com/spf13/cobra"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

var checkPutStateArgs = func(args []string) error {
if len(args) < 2 {
return errors.New("need to specified the state key and state value")
}
return nil
}

func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
resourceCmd := cmdutils.NewResourceCmd(
"functions",
Expand All @@ -39,6 +47,11 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statusFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statsFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, querystateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, putstateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, triggerFunctionsCmd)

return resourceCmd
}
171 changes: 171 additions & 0 deletions pkg/ctl/functions/putstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package functions

import (
"fmt"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar"
"io/ioutil"
"strings"
)

func putstateFunctionsCmd(vc *cmdutils.VerbCmd) {
desc := pulsar.LongDescription{}
desc.CommandUsedFor = "Put a key/value pair to the state associated with a Pulsar Function."
desc.CommandPermission = "This command requires namespace function permissions."

var examples []pulsar.Example
putstate := pulsar.Example{
Desc: "Put a key/<string value> pair to the state associated with a Pulsar Function",
Command: "pulsarctl functions putstate \n" +
"\t--tenant public\n" +
"\t--namespace default\n" +
"\t--name <the name of Pulsar Function> \n" +
"\t<key name> - <string value> ",
}
examples = append(examples, putstate)

putstateWithByte := pulsar.Example{
Desc: "Put a key/<file path> pair to the state associated with a Pulsar Function",
Command: "pulsarctl functions putstate \n" +
"\t--tenant public\n" +
"\t--namespace default\n" +
"\t--name <the name of Pulsar Function> \n" +
"\t<key name> = <file path> ",
}
examples = append(examples, putstateWithByte)

putstateWithFQFN := pulsar.Example{
Desc: "Put a key/value pair to the state associated with a Pulsar Function with FQFN",
Command: "pulsarctl functions putstate \n" +
"\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions] \n" +
"\t<key name> - <string value> ",
}
examples = append(examples, putstateWithFQFN)
desc.CommandExamples = examples

var out []pulsar.Output
successOut := pulsar.Output{
Desc: "normal output",
Out: "Put state <the function state> successfully",
}

failOut := pulsar.Output{
Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args",
Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)",
}

failOutWithNameNotExist := pulsar.Output{
Desc: "The name of Pulsar Functions doesn't exist, please check the `--name` arg",
Out: "[✖] code: 404 reason: Function <your function name> doesn't exist",
}

failOutWithKeyOrValueNotExist := pulsar.Output{
Desc: "The state key and state value not specified, please check your input format",
Out: "[✖] need to specified the state key and state value",
}

fileOutErrInputFormat := pulsar.Output{
Desc: "The format of the input is incorrect, please check.",
Out: "[✖] error input format",
}

out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithKeyOrValueNotExist, fileOutErrInputFormat)
desc.CommandOutput = out

vc.SetDescription(
"putstate",
"Put a key/value pair to the state associated with a Pulsar Function",
desc.ToString(),
"putstate",
)

functionData := &pulsar.FunctionData{}

// set the run function
vc.SetRunFuncWithNameArgs(func() error {
return doPutStateFunction(vc, functionData)
}, checkPutStateArgs)

// register the params
vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) {
flagSet.StringVar(
&functionData.FQFN,
"fqfn",
"",
"The Fully Qualified Function Name (FQFN) for the function")

flagSet.StringVar(
&functionData.Tenant,
"tenant",
"",
"The tenant of a Pulsar Function")

flagSet.StringVar(
&functionData.Namespace,
"namespace",
"",
"The namespace of a Pulsar Function")

flagSet.StringVar(
&functionData.FuncName,
"name",
"",
"The name of a Pulsar Function")
})
}

func doPutStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error {
err := processBaseArguments(funcData)
if err != nil {
vc.Command.Help()
return err
}
admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3)

var state pulsar.FunctionState

state.Key = vc.NameArgs[0]
value := vc.NameArgs[1]

fmt.Println("value:", value)

if value == "-" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please take a look at kubectl.

  # List contents of /usr from the first container of pod 123456-7890 and sort by modification time.
  # If the command you want to execute in the pod has any flags in common (e.g. -i),
  # you must use two dashes (--) to separate your command's flags/arguments.
  # Also note, do not surround your command and its flags/arguments with quotes
  # unless that is how you would execute it normally (i.e., do ls -t /usr, not "ls -t /usr").
  kubectl exec 123456-7890 -i -t -- ls -t /usr

As I commented before, I was suggesting all the named args after - (we can use -- similar as kubectl as well) are the string value.

I will explain that using following examples as below:

// the key is `key-1`,  the string-value is `value-1`
pulsarctl functions putstate key-1 value-1

// throw exception since we are expecting one string-value
pulsarctl functions putstate key-1 value-1 value-2

// not sure how the framework work here. but ideally `value-1 value` should be treated as one name arg.
// if so, the string-value should be `value-1 value2`.
pulsarctl functions putstate key-1 "value-1 value2"

// in this example, all the name args after `--` should be treated as one string-value "hello world pulsar ctl!"
pulsarctl functions putstate key-1 -- hello world pulsar ctl!

for bytes-value, I was suggesting using > as separator. but I forgot > is a special character in bash. we can use a different separator. maybe ---?

Copy link
Contributor Author

@wolfstudy wolfstudy Sep 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use = as the separator between key/byteValue pair?

> or < means input or output redirection
-- This marks end of parameter (option) list.
--- bad flag syntax: ---

state.StringValue = strings.Join(vc.NameArgs[2:], " ")
} else if value == "=" {
contents, err := ioutil.ReadFile(vc.NameArgs[2])
if err != nil {
return err
}
state.ByteValue = contents
} else {
return errors.New("error input format")
}

err = admin.Functions().PutFunctionState(funcData.Tenant, funcData.Namespace, funcData.FuncName, state)
if err != nil {
cmdutils.PrintError(vc.Command.OutOrStderr(), err)
} else {
vc.Command.Printf("Put state %+v successfully", state)
}

return err
}
Loading