Skip to content

Commit

Permalink
Add stats, status, querystate, putstate, trigger cmds for Pulsar Func…
Browse files Browse the repository at this point in the history
…tions (streamnative/pulsarctl#34)

Master Issue: streamnative/pulsarctl#2 

Add `stats`, `status`, `querystate`, `putstate`, `trigger` cmds for Pulsar Functions
  • Loading branch information
wolfstudy authored and tisonkun committed Aug 16, 2023
1 parent 428fb71 commit 8cce397
Show file tree
Hide file tree
Showing 5 changed files with 419 additions and 0 deletions.
24 changes: 24 additions & 0 deletions pulsaradmin/pkg/pulsar/data.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

// ClusterData information on a cluster
Expand Down Expand Up @@ -47,6 +64,13 @@ type FunctionData struct {
MaxMessageRetries int `json:"maxMessageRetries"`
DeadLetterTopic string `json:"deadLetterTopic"`

Key string `json:"key"`
Watch bool `json:"watch"`
State string `json:"state"`
TriggerValue string `json:"triggerValue"`
TriggerFile string `json:"triggerFile"`
Topic string `json:"topic"`

UpdateAuthData bool `json:"updateAuthData"`

FuncConf *FunctionConfig `json:"-"`
Expand Down
26 changes: 26 additions & 0 deletions pulsaradmin/pkg/pulsar/function_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

type FunctionState struct {
Key string `json:"key"`
StringValue string `json:"stringValue"`
ByteValue []byte `json:"byteValue"`
NumValue int64 `json:"numberValue"`
Version int64 `json:"version"`
}
53 changes: 53 additions & 0 deletions pulsaradmin/pkg/pulsar/function_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

type FunctionStatus struct {
NumInstances int `json:"numInstances"`
NumRunning int `json:"numRunning"`
Instances []FunctionInstanceStatus `json:"instances"`
}

type FunctionInstanceStatus struct {
InstanceId int `json:"instanceId"`
Status FunctionInstanceStatusData `json:"status"`
}

type FunctionInstanceStatusData struct {
Running bool `json:"running"`
Err string `json:"error"`
NumRestarts int64 `json:"numRestarts"`
NumReceived int64 `json:"numReceived"`
NumSuccessfullyProcessed int64 `json:"numSuccessfullyProcessed"`
NumUserExceptions int64 `json:"numUserExceptions"`
LatestUserExceptions []ExceptionInformation `json:"latestUserExceptions"`
NumSystemExceptions int64 `json:"numSystemExceptions"`
LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"`
AverageLatency float64 `json:"averageLatency"`
LastInvocationTime int64 `json:"lastInvocationTime"`
WorkerId string `json:"workerId"`
}

type ExceptionInformation struct {
ExceptionString string `json:"exceptionString"`
TimestampMs int64 `json:"timestampMs"`
}

func (fs *FunctionStatus) AddInstance(functionInstanceStatus FunctionInstanceStatus) {
fs.Instances = append(fs.Instances, functionInstanceStatus)
}
171 changes: 171 additions & 0 deletions pulsaradmin/pkg/pulsar/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,30 @@ type Functions interface {
// Get the configuration for the specified function
GetFunction(tenant, namespace, name string) (FunctionConfig, error)

// Gets the current status of a function
GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error)

// Gets the current status of a function instance
GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error)

// Gets the current stats of a function
GetFunctionStats(tenant, namespace, name string) (FunctionStats, error)

// Gets the current stats of a function instance
GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error)

// Fetch the current state associated with a Pulsar Function
//
// Response Example:
// { "value : 12, version : 2"}
GetFunctionState(tenant, namespace, name, key string) (FunctionState, error)

// Puts the given state associated with a Pulsar Function
PutFunctionState(tenant, namespace, name string, state FunctionState) error

// Triggers the function by writing to the input topic
TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error)

// Update the configuration for a function.
UpdateFunction(functionConfig *FunctionConfig, fileName string, updateOptions *UpdateOptions) error

Expand Down Expand Up @@ -406,3 +430,150 @@ func (f *functions) UpdateFunctionWithUrl(functionConfig *FunctionConfig, pkgUrl

return nil
}

func (f *functions) GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error) {
var functionStatus FunctionStatus
endpoint := f.client.endpoint(f.basePath, tenant, namespace, name)
err := f.client.get(endpoint+"/status", &functionStatus)
return functionStatus, err
}

func (f *functions) GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error) {
var functionInstanceStatusData FunctionInstanceStatusData
id := fmt.Sprintf("%d", instanceID)
endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id)
err := f.client.get(endpoint+"/status", &functionInstanceStatusData)
return functionInstanceStatusData, err
}

func (f *functions) GetFunctionStats(tenant, namespace, name string) (FunctionStats, error) {
var functionStats FunctionStats
endpoint := f.client.endpoint(f.basePath, tenant, namespace, name)
err := f.client.get(endpoint+"/stats", &functionStats)
return functionStats, err
}

func (f *functions) GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error) {
var functionInstanceStatsData FunctionInstanceStatsData
id := fmt.Sprintf("%d", instanceID)
endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id)
err := f.client.get(endpoint+"/stats", &functionInstanceStatsData)
return functionInstanceStatsData, err
}

func (f *functions)GetFunctionState(tenant, namespace, name, key string) (FunctionState, error) {
var functionState FunctionState
endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "state", key)
err := f.client.get(endpoint, &functionState)
return functionState, err
}

func (f *functions) PutFunctionState(tenant, namespace, name string, state FunctionState) error {
endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "state", state.Key)

// buffer to store our request as bytes
bodyBuf := bytes.NewBufferString("")

multiPartWriter := multipart.NewWriter(bodyBuf)

stateData, err := json.Marshal(state)

if err != nil {
return err
}

stateWriter, err := f.createStringFromField(multiPartWriter, "state")
if err != nil {
return err
}

_, err = stateWriter.Write(stateData)

if err != nil {
return err
}

// In here, we completed adding the file and the fields, let's close the multipart writer
// So it writes the ending boundary
if err = multiPartWriter.Close(); err != nil {
return err
}

contentType := multiPartWriter.FormDataContentType()

err = f.client.postWithMultiPart(endpoint, nil, nil, bodyBuf, contentType)

if err != nil {
return err
}

return nil
}

func (f *functions) TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) {
endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "trigger")

// buffer to store our request as bytes
bodyBuf := bytes.NewBufferString("")

multiPartWriter := multipart.NewWriter(bodyBuf)

if triggerFile != "" {
file, err := os.Open(triggerFile)
if err != nil {
return "", err
}
defer file.Close()

part, err := multiPartWriter.CreateFormFile("dataStream", filepath.Base(file.Name()))

if err != nil {
return "", err
}

// copy the actual file content to the filed's writer
_, err = io.Copy(part, file)
if err != nil {
return "", err
}
}

if triggerValue != "" {
valueWriter, err := f.createTextFromFiled(multiPartWriter, "data")
if err != nil {
return "", err
}

_, err = valueWriter.Write([]byte(triggerValue))
if err != nil {
return "", err
}
}

if topic != "" {
topicWriter, err := f.createTextFromFiled(multiPartWriter, "topic")
if err != nil {
return "", err
}

_, err = topicWriter.Write([]byte(topic))
if err != nil {
return "", err
}
}

// In here, we completed adding the file and the fields, let's close the multipart writer
// So it writes the ending boundary
if err := multiPartWriter.Close(); err != nil {
return "", err
}

contentType := multiPartWriter.FormDataContentType()
var str string
err := f.client.postWithMultiPart(endpoint, &str, nil, bodyBuf, contentType)
if err != nil {
return "", err
}

return str, nil
}
Loading

0 comments on commit 8cce397

Please sign in to comment.