Skip to content

Commit

Permalink
Add stats, status, querystate, putstate, trigger cmds for Pulsar Func…
Browse files Browse the repository at this point in the history
…tions (#34)

Master Issue: #2 

Add `stats`, `status`, `querystate`, `putstate`, `trigger` cmds for Pulsar Functions
  • Loading branch information
wolfstudy authored and sijie committed Sep 9, 2019
1 parent 798b86d commit bb9c79d
Show file tree
Hide file tree
Showing 18 changed files with 1,846 additions and 2 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ jobs:
- name: Get dependencies
run: |
docker pull apachepulsar/pulsar
docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test apachepulsar/pulsar bin/pulsar standalone
docker run -d -p 8080:8080 -p 6650:6650 -v ${PWD}:/pulsar_test --name pulsarctl apachepulsar/pulsar bin/pulsar standalone
docker cp pulsarctl:/pulsar/examples/api-examples.jar ${PWD}/test/functions/
ls -al test/functions/
sleep 10
- name: Build
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*.so
*.dylib
pulsarctl
api-examples.jar
dummyExample.jar

# Test binary, build with `go test -c`
*.test
Expand Down
13 changes: 13 additions & 0 deletions pkg/ctl/functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@
package functions

import (
`errors`
"github.com/spf13/cobra"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

var checkPutStateArgs = func(args []string) error {
if len(args) < 2 {
return errors.New("need to specified the state key and state value")
}
return nil
}

func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
resourceCmd := cmdutils.NewResourceCmd(
"functions",
Expand All @@ -39,6 +47,11 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statusFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, statsFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, querystateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, putstateFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, triggerFunctionsCmd)

return resourceCmd
}
171 changes: 171 additions & 0 deletions pkg/ctl/functions/putstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package functions

import (
"fmt"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar"
"io/ioutil"
"strings"
)

func putstateFunctionsCmd(vc *cmdutils.VerbCmd) {
desc := pulsar.LongDescription{}
desc.CommandUsedFor = "Put a key/value pair to the state associated with a Pulsar Function."
desc.CommandPermission = "This command requires namespace function permissions."

var examples []pulsar.Example
putstate := pulsar.Example{
Desc: "Put a key/<string value> pair to the state associated with a Pulsar Function",
Command: "pulsarctl functions putstate \n" +
"\t--tenant public\n" +
"\t--namespace default\n" +
"\t--name <the name of Pulsar Function> \n" +
"\t<key name> - <string value> ",
}
examples = append(examples, putstate)

putstateWithByte := pulsar.Example{
Desc: "Put a key/<file path> pair to the state associated with a Pulsar Function",
Command: "pulsarctl functions putstate \n" +
"\t--tenant public\n" +
"\t--namespace default\n" +
"\t--name <the name of Pulsar Function> \n" +
"\t<key name> = <file path> ",
}
examples = append(examples, putstateWithByte)

putstateWithFQFN := pulsar.Example{
Desc: "Put a key/value pair to the state associated with a Pulsar Function with FQFN",
Command: "pulsarctl functions putstate \n" +
"\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions] \n" +
"\t<key name> - <string value> ",
}
examples = append(examples, putstateWithFQFN)
desc.CommandExamples = examples

var out []pulsar.Output
successOut := pulsar.Output{
Desc: "normal output",
Out: "Put state <the function state> successfully",
}

failOut := pulsar.Output{
Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args",
Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)",
}

failOutWithNameNotExist := pulsar.Output{
Desc: "The name of Pulsar Functions doesn't exist, please check the `--name` arg",
Out: "[✖] code: 404 reason: Function <your function name> doesn't exist",
}

failOutWithKeyOrValueNotExist := pulsar.Output{
Desc: "The state key and state value not specified, please check your input format",
Out: "[✖] need to specified the state key and state value",
}

fileOutErrInputFormat := pulsar.Output{
Desc: "The format of the input is incorrect, please check.",
Out: "[✖] error input format",
}

out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithKeyOrValueNotExist, fileOutErrInputFormat)
desc.CommandOutput = out

vc.SetDescription(
"putstate",
"Put a key/value pair to the state associated with a Pulsar Function",
desc.ToString(),
"putstate",
)

functionData := &pulsar.FunctionData{}

// set the run function
vc.SetRunFuncWithNameArgs(func() error {
return doPutStateFunction(vc, functionData)
}, checkPutStateArgs)

// register the params
vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) {
flagSet.StringVar(
&functionData.FQFN,
"fqfn",
"",
"The Fully Qualified Function Name (FQFN) for the function")

flagSet.StringVar(
&functionData.Tenant,
"tenant",
"",
"The tenant of a Pulsar Function")

flagSet.StringVar(
&functionData.Namespace,
"namespace",
"",
"The namespace of a Pulsar Function")

flagSet.StringVar(
&functionData.FuncName,
"name",
"",
"The name of a Pulsar Function")
})
}

func doPutStateFunction(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error {
err := processBaseArguments(funcData)
if err != nil {
vc.Command.Help()
return err
}
admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3)

var state pulsar.FunctionState

state.Key = vc.NameArgs[0]
value := vc.NameArgs[1]

fmt.Println("value:", value)

if value == "-" {
state.StringValue = strings.Join(vc.NameArgs[2:], " ")
} else if value == "=" {
contents, err := ioutil.ReadFile(vc.NameArgs[2])
if err != nil {
return err
}
state.ByteValue = contents
} else {
return errors.New("error input format")
}

err = admin.Functions().PutFunctionState(funcData.Tenant, funcData.Namespace, funcData.FuncName, state)
if err != nil {
cmdutils.PrintError(vc.Command.OutOrStderr(), err)
} else {
vc.Command.Printf("Put state %+v successfully", state)
}

return err
}
Loading

0 comments on commit bb9c79d

Please sign in to comment.