-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add stats, status, querystate, putstate, trigger cmds for Pulsar Functions #34
Conversation
…tions Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
please make sure it pass CI |
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@sijie PTAL again |
pkg/ctl/functions/putstate.go
Outdated
|
||
func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { | ||
desc := pulsar.LongDescription{} | ||
desc.CommandUsedFor = "Put the state associated with a Pulsar Function." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
desc.CommandUsedFor = "Put the state associated with a Pulsar Function." | |
desc.CommandUsedFor = "Put a key/value pair to the state associated with a Pulsar Function." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The FunctionState
as follows:
type FunctionState struct {
Key string `json:"key"`
StringValue string `json:"stringValue"`
ByteValue []byte `json:"byteValue"`
NumValue int64 `json:"numberValue"`
Version int64 `json:"version"`
}
It isn't a key/value pair.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fundamentally, the state of a function is a key/value map. The value is converted and stored as bytes under the hood. You can interpret the value as string, byte[] or number. The version is assigned when you put the key/value pair to the state.
The Pulsar state related restful api is not done well. For example, we shouldn't pass any version number when putting a key/value pair to the state; only one of the value fields (StringValue
, ByteValue
and NumValue
) should be set.
The pulsar-admin state command is also bad because you shouldn't let user construct a json string in the command. That's a very bad user experience.
The good putstate commands should be:
// put a key/string-value pair to state
pulsarctl functions putstate key string-value
// put a key/string-value pair to the state ('hello word' is treated as the value)
pulsarctl functions putstate key - hello world
// put a key/number-value pair to the state
pulsarctl functions putstate key 12345
// put a key/bytes-value pair to the state
pulsarctl functions putstate key < /path/to/value/file
The pulsarctl should interpret the value and set it to the right field in FunctionState struct and post it to the broker.
pkg/ctl/functions/putstate.go
Outdated
func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { | ||
desc := pulsar.LongDescription{} | ||
desc.CommandUsedFor = "Put the state associated with a Pulsar Function." | ||
desc.CommandPermission = "This command requires super-user permissions." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does this command requires super-user permission?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i am wrong
pkg/ctl/functions/putstate.go
Outdated
|
||
var examples []pulsar.Example | ||
putstate := pulsar.Example{ | ||
Desc: "Put the state associated with a Pulsar Function", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put a key/value pair to the state associated with a Pulsar Function
pkg/ctl/functions/putstate.go
Outdated
examples = append(examples, putstate) | ||
|
||
putstateWithFQFN := pulsar.Example{ | ||
Desc: "Put the state associated with a Pulsar Function with FQFN", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please update the description. see my comment above.
pkg/ctl/functions/stats.go
Outdated
func statsFunctionsCmd(vc *cmdutils.VerbCmd) { | ||
desc := pulsar.LongDescription{} | ||
desc.CommandUsedFor = "Get the current stats of a Pulsar Function." | ||
desc.CommandPermission = "This command requires super-user permissions." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why?
pkg/ctl/functions/stats_test.go
Outdated
assert.Equal(t, out.String(), "Created test-functions-stats successfully") | ||
|
||
// wait the function create successfully | ||
time.Sleep(time.Second * 3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid sleeping. use a retry-loop to check the stats.
pkg/ctl/functions/status_test.go
Outdated
assert.Equal(t, out.String(), "Created test-functions-status successfully") | ||
|
||
// wait the function create successfully | ||
time.Sleep(time.Second * 30) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid sleeping
} | ||
admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) | ||
|
||
if funcData.TriggerValue == "" && funcData.TriggerFile == "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about if a user provides both arguments?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a user provides both arguments, priority use of triggerFile
, code logic as follows:
if (uploadedInputStream != null) {
targetArray = new byte[uploadedInputStream.available()];
uploadedInputStream.read(targetArray);
} else {
targetArray = input.getBytes();
}
uploadedInputStream
=> triggerFile
input
=> triggerValue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wolfstudy that's confusing. please fail the command if both options are provided.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get it
pkg/ctl/functions/trigger_test.go
Outdated
assert.Equal(t, out.String(), "Created test-functions-trigger successfully") | ||
|
||
// wait the function create successfully | ||
time.Sleep(time.Second * 50) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid sleeping
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
pkg/ctl/functions/putstate.go
Outdated
func putstateFunctionsCmd(vc *cmdutils.VerbCmd) { | ||
desc := pulsar.LongDescription{} | ||
desc.CommandUsedFor = "Put a key/value pair to the state associated with a Pulsar Function." | ||
desc.CommandPermission = "This command requires user permissions." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's standardize the naming for permission:
- super-user permission
- namespace admin permissions
- namespace produce permissions
- namespace consume permissions
- namespace function permissions
So this command requires namespace function permissions.
pkg/ctl/functions/putstate.go
Outdated
"", | ||
"The name of a Pulsar Function") | ||
|
||
flagSet.StringVarP( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wolfstudy so what is the take here? are we still using --state
with a json string?
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
pkg/ctl/functions/putstate.go
Outdated
examples = append(examples, putstate) | ||
|
||
putstateWithByte := pulsar.Example{ | ||
Desc: "Put a key/<byte value> pair to the state associated with a Pulsar Function", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should treat this case as byte value
case
pkg/ctl/functions/putstate.go
Outdated
Out: "[✖] code: 404 reason: Function <your function name> doesn't exist", | ||
} | ||
|
||
failOutWithWrongJson := pulsar.Output{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we remove --state
, I don't think we will hit this error.
state.Key = vc.NameArgs[0] | ||
value := vc.NameArgs[1] | ||
|
||
if value == "-" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please take a look at kubectl.
# List contents of /usr from the first container of pod 123456-7890 and sort by modification time.
# If the command you want to execute in the pod has any flags in common (e.g. -i),
# you must use two dashes (--) to separate your command's flags/arguments.
# Also note, do not surround your command and its flags/arguments with quotes
# unless that is how you would execute it normally (i.e., do ls -t /usr, not "ls -t /usr").
kubectl exec 123456-7890 -i -t -- ls -t /usr
As I commented before, I was suggesting all the named args after -
(we can use --
similar as kubectl as well) are the string value.
I will explain that using following examples as below:
// the key is `key-1`, the string-value is `value-1`
pulsarctl functions putstate key-1 value-1
// throw exception since we are expecting one string-value
pulsarctl functions putstate key-1 value-1 value-2
// not sure how the framework work here. but ideally `value-1 value` should be treated as one name arg.
// if so, the string-value should be `value-1 value2`.
pulsarctl functions putstate key-1 "value-1 value2"
// in this example, all the name args after `--` should be treated as one string-value "hello world pulsar ctl!"
pulsarctl functions putstate key-1 -- hello world pulsar ctl!
for bytes-value, I was suggesting using >
as separator. but I forgot >
is a special character in bash. we can use a different separator. maybe ---
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use =
as the separator between key/byteValue pair?
>
or <
means input or output redirection
--
This marks end of parameter (option) list.
---
bad flag syntax: ---
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
@sijie PTAL again, thanks |
…tions (streamnative/pulsarctl#34) Master Issue: streamnative/pulsarctl#2 Add `stats`, `status`, `querystate`, `putstate`, `trigger` cmds for Pulsar Functions
…tions (streamnative/pulsarctl#34) Master Issue: streamnative/pulsarctl#2 Add `stats`, `status`, `querystate`, `putstate`, `trigger` cmds for Pulsar Functions
Master Issue: #2
Add
stats
,status
,querystate
,putstate
,trigger
cmds for Pulsar Functions