diff --git a/pulsaradmin/pkg/pulsar/data.go b/pulsaradmin/pkg/pulsar/data.go index a68c1f13a..16607d397 100644 --- a/pulsaradmin/pkg/pulsar/data.go +++ b/pulsaradmin/pkg/pulsar/data.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pulsar // ClusterData information on a cluster @@ -47,6 +64,13 @@ type FunctionData struct { MaxMessageRetries int `json:"maxMessageRetries"` DeadLetterTopic string `json:"deadLetterTopic"` + Key string `json:"key"` + Watch bool `json:"watch"` + State string `json:"state"` + TriggerValue string `json:"triggerValue"` + TriggerFile string `json:"triggerFile"` + Topic string `json:"topic"` + UpdateAuthData bool `json:"updateAuthData"` FuncConf *FunctionConfig `json:"-"` diff --git a/pulsaradmin/pkg/pulsar/function_state.go b/pulsaradmin/pkg/pulsar/function_state.go new file mode 100644 index 000000000..a740e20e2 --- /dev/null +++ b/pulsaradmin/pkg/pulsar/function_state.go @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type FunctionState struct { + Key string `json:"key"` + StringValue string `json:"stringValue"` + ByteValue []byte `json:"byteValue"` + NumValue int64 `json:"numberValue"` + Version int64 `json:"version"` +} diff --git a/pulsaradmin/pkg/pulsar/function_status.go b/pulsaradmin/pkg/pulsar/function_status.go new file mode 100644 index 000000000..1c699beb2 --- /dev/null +++ b/pulsaradmin/pkg/pulsar/function_status.go @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type FunctionStatus struct { + NumInstances int `json:"numInstances"` + NumRunning int `json:"numRunning"` + Instances []FunctionInstanceStatus `json:"instances"` +} + +type FunctionInstanceStatus struct { + InstanceId int `json:"instanceId"` + Status FunctionInstanceStatusData `json:"status"` +} + +type FunctionInstanceStatusData struct { + Running bool `json:"running"` + Err string `json:"error"` + NumRestarts int64 `json:"numRestarts"` + NumReceived int64 `json:"numReceived"` + NumSuccessfullyProcessed int64 `json:"numSuccessfullyProcessed"` + NumUserExceptions int64 `json:"numUserExceptions"` + LatestUserExceptions []ExceptionInformation `json:"latestUserExceptions"` + NumSystemExceptions int64 `json:"numSystemExceptions"` + LatestSystemExceptions []ExceptionInformation `json:"latestSystemExceptions"` + AverageLatency float64 `json:"averageLatency"` + LastInvocationTime int64 `json:"lastInvocationTime"` + WorkerId string `json:"workerId"` +} + +type ExceptionInformation struct { + ExceptionString string `json:"exceptionString"` + TimestampMs int64 `json:"timestampMs"` +} + +func (fs *FunctionStatus) AddInstance(functionInstanceStatus FunctionInstanceStatus) { + fs.Instances = append(fs.Instances, functionInstanceStatus) +} diff --git a/pulsaradmin/pkg/pulsar/functions.go b/pulsaradmin/pkg/pulsar/functions.go index d72d3af2b..b02e29d10 100644 --- a/pulsaradmin/pkg/pulsar/functions.go +++ b/pulsaradmin/pkg/pulsar/functions.go @@ -71,6 +71,30 @@ type Functions interface { // Get the configuration for the specified function GetFunction(tenant, namespace, name string) (FunctionConfig, error) + // Gets the current status of a function + GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error) + + // Gets the current status of a function instance + GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error) + + // Gets the current stats of a function + GetFunctionStats(tenant, namespace, name string) (FunctionStats, error) + + // Gets the current stats of a function instance + GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error) + + // Fetch the current state associated with a Pulsar Function + // + // Response Example: + // { "value : 12, version : 2"} + GetFunctionState(tenant, namespace, name, key string) (FunctionState, error) + + // Puts the given state associated with a Pulsar Function + PutFunctionState(tenant, namespace, name string, state FunctionState) error + + // Triggers the function by writing to the input topic + TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) + // Update the configuration for a function. UpdateFunction(functionConfig *FunctionConfig, fileName string, updateOptions *UpdateOptions) error @@ -406,3 +430,150 @@ func (f *functions) UpdateFunctionWithUrl(functionConfig *FunctionConfig, pkgUrl return nil } + +func (f *functions) GetFunctionStatus(tenant, namespace, name string) (FunctionStatus, error) { + var functionStatus FunctionStatus + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + err := f.client.get(endpoint+"/status", &functionStatus) + return functionStatus, err +} + +func (f *functions) GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatusData, error) { + var functionInstanceStatusData FunctionInstanceStatusData + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + err := f.client.get(endpoint+"/status", &functionInstanceStatusData) + return functionInstanceStatusData, err +} + +func (f *functions) GetFunctionStats(tenant, namespace, name string) (FunctionStats, error) { + var functionStats FunctionStats + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + err := f.client.get(endpoint+"/stats", &functionStats) + return functionStats, err +} + +func (f *functions) GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (FunctionInstanceStatsData, error) { + var functionInstanceStatsData FunctionInstanceStatsData + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + err := f.client.get(endpoint+"/stats", &functionInstanceStatsData) + return functionInstanceStatsData, err +} + +func (f *functions)GetFunctionState(tenant, namespace, name, key string) (FunctionState, error) { + var functionState FunctionState + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "state", key) + err := f.client.get(endpoint, &functionState) + return functionState, err +} + +func (f *functions) PutFunctionState(tenant, namespace, name string, state FunctionState) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "state", state.Key) + + // buffer to store our request as bytes + bodyBuf := bytes.NewBufferString("") + + multiPartWriter := multipart.NewWriter(bodyBuf) + + stateData, err := json.Marshal(state) + + if err != nil { + return err + } + + stateWriter, err := f.createStringFromField(multiPartWriter, "state") + if err != nil { + return err + } + + _, err = stateWriter.Write(stateData) + + if err != nil { + return err + } + + // In here, we completed adding the file and the fields, let's close the multipart writer + // So it writes the ending boundary + if err = multiPartWriter.Close(); err != nil { + return err + } + + contentType := multiPartWriter.FormDataContentType() + + err = f.client.postWithMultiPart(endpoint, nil, nil, bodyBuf, contentType) + + if err != nil { + return err + } + + return nil +} + +func (f *functions) TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, "trigger") + + // buffer to store our request as bytes + bodyBuf := bytes.NewBufferString("") + + multiPartWriter := multipart.NewWriter(bodyBuf) + + if triggerFile != "" { + file, err := os.Open(triggerFile) + if err != nil { + return "", err + } + defer file.Close() + + part, err := multiPartWriter.CreateFormFile("dataStream", filepath.Base(file.Name())) + + if err != nil { + return "", err + } + + // copy the actual file content to the filed's writer + _, err = io.Copy(part, file) + if err != nil { + return "", err + } + } + + if triggerValue != "" { + valueWriter, err := f.createTextFromFiled(multiPartWriter, "data") + if err != nil { + return "", err + } + + _, err = valueWriter.Write([]byte(triggerValue)) + if err != nil { + return "", err + } + } + + if topic != "" { + topicWriter, err := f.createTextFromFiled(multiPartWriter, "topic") + if err != nil { + return "", err + } + + _, err = topicWriter.Write([]byte(topic)) + if err != nil { + return "", err + } + } + + // In here, we completed adding the file and the fields, let's close the multipart writer + // So it writes the ending boundary + if err := multiPartWriter.Close(); err != nil { + return "", err + } + + contentType := multiPartWriter.FormDataContentType() + var str string + err := f.client.postWithMultiPart(endpoint, &str, nil, bodyBuf, contentType) + if err != nil { + return "", err + } + + return str, nil +} diff --git a/pulsaradmin/pkg/pulsar/functions_stats.go b/pulsaradmin/pkg/pulsar/functions_stats.go new file mode 100644 index 000000000..3f644388b --- /dev/null +++ b/pulsaradmin/pkg/pulsar/functions_stats.go @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +type FunctionStats struct { + // Overall total number of records function received from source + ReceivedTotal int64 `json:"receivedTotal"` + + // Overall total number of records successfully processed by user function + ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"` + + // Overall total number of system exceptions thrown + SystemExceptionsTotal int64 `json:"systemExceptionsTotal"` + + // Overall total number of user exceptions thrown + UserExceptionsTotal int64 `json:"userExceptionsTotal"` + + // Average process latency for function + AvgProcessLatency float64 `json:"avgProcessLatency"` + + // Timestamp of when the function was last invoked by any instance + LastInvocation int64 `json:"lastInvocation"` + + OneMin FunctionInstanceStatsDataBase `json:"oneMin"` + + Instances []FunctionInstanceStats `json:"instances"` + + FunctionInstanceStats +} + +type FunctionInstanceStats struct { + FunctionInstanceStatsDataBase + + InstanceId int64 `json:"instanceId"` + + Metrics FunctionInstanceStatsData `json:"metrics"` +} + +type FunctionInstanceStatsDataBase struct { + // Total number of records function received from source for instance + ReceivedTotal int64 `json:"receivedTotal"` + + // Total number of records successfully processed by user function for instance + ProcessedSuccessfullyTotal int64 `json:"processedSuccessfullyTotal"` + + // Total number of system exceptions thrown for instance + SystemExceptionsTotal int64 `json:"systemExceptionsTotal"` + + // Total number of user exceptions thrown for instance + UserExceptionsTotal int64 `json:"userExceptionsTotal"` + + // Average process latency for function for instance + AvgProcessLatency float64 `json:"avgProcessLatency"` +} + +type FunctionInstanceStatsData struct { + OneMin FunctionInstanceStatsDataBase `json:"oneMin"` + + // Timestamp of when the function was last invoked for instance + LastInvocation int64 `json:"lastInvocation"` + + // Map of user defined metrics + UserMetrics map[string]float64 `json:"userMetrics"` + + FunctionInstanceStatsDataBase +} + +func (fs *FunctionStats) AddInstance(functionInstanceStats FunctionInstanceStats) { + fs.Instances = append(fs.Instances, functionInstanceStats) +} + +func (fs *FunctionStats) CalculateOverall() *FunctionStats { + var ( + nonNullInstances int + nonNullInstancesOneMin int + ) + + for _, functionInstanceStats := range fs.Instances { + functionInstanceStatsData := functionInstanceStats.Metrics + fs.ReceivedTotal += functionInstanceStatsData.ReceivedTotal + fs.ProcessedSuccessfullyTotal += functionInstanceStatsData.ProcessedSuccessfullyTotal + fs.SystemExceptionsTotal += functionInstanceStatsData.SystemExceptionsTotal + fs.UserExceptionsTotal += functionInstanceStatsData.UserExceptionsTotal + + if functionInstanceStatsData.AvgProcessLatency != 0 { + if fs.AvgProcessLatency == 0 { + fs.AvgProcessLatency = 0.0 + } + + fs.AvgProcessLatency += functionInstanceStatsData.AvgProcessLatency + nonNullInstances++ + } + + fs.OneMin.ReceivedTotal += functionInstanceStatsData.OneMin.ReceivedTotal + fs.OneMin.ProcessedSuccessfullyTotal += functionInstanceStatsData.OneMin.ProcessedSuccessfullyTotal + fs.OneMin.SystemExceptionsTotal += functionInstanceStatsData.OneMin.SystemExceptionsTotal + fs.OneMin.UserExceptionsTotal += functionInstanceStatsData.OneMin.UserExceptionsTotal + + if functionInstanceStatsData.OneMin.AvgProcessLatency != 0 { + if fs.OneMin.AvgProcessLatency == 0 { + fs.OneMin.AvgProcessLatency = 0.0 + } + + fs.OneMin.AvgProcessLatency += functionInstanceStatsData.OneMin.AvgProcessLatency + nonNullInstancesOneMin++ + } + + if functionInstanceStatsData.LastInvocation != 0 { + if fs.LastInvocation == 0 || functionInstanceStatsData.LastInvocation > fs.LastInvocation { + fs.LastInvocation = functionInstanceStatsData.LastInvocation + } + } + } + + // calculate average from sum + if nonNullInstances > 0 { + fs.AvgProcessLatency = fs.AvgProcessLatency / float64(nonNullInstances) + } else { + fs.AvgProcessLatency = 0 + } + + // calculate 1min average from sum + if nonNullInstancesOneMin > 0 { + fs.OneMin.AvgProcessLatency = fs.OneMin.AvgProcessLatency / float64(nonNullInstancesOneMin) + } else { + fs.AvgProcessLatency = 0 + } + + return fs +}