Skip to content

Commit

Permalink
Add create cmd of Pulsar Functions (streamnative/pulsarctl#15)
Browse files Browse the repository at this point in the history
Add create cmd of Pulsar Functions
  • Loading branch information
wolfstudy authored and tisonkun committed Aug 16, 2023
1 parent 61615ea commit 1f646f5
Show file tree
Hide file tree
Showing 11 changed files with 563 additions and 16 deletions.
2 changes: 2 additions & 0 deletions pulsaradmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/streamnative/pulsar-admin-go
go 1.12

require (
github.com/davecgh/go-spew v1.1.1
github.com/fatih/color v1.7.0 // indirect
github.com/google/go-cmp v0.3.1 // indirect
github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06
Expand All @@ -15,4 +16,5 @@ require (
github.com/spf13/pflag v1.0.3
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.3.0
gopkg.in/yaml.v2 v2.2.2
)
39 changes: 39 additions & 0 deletions pulsaradmin/pkg/pulsar/WindowConfing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

const WindowConfigKey = "__WINDOWCONFIGS__"

type WindowConfig struct {
WindowLengthCount int
WindowLengthDurationMs int64
SlidingIntervalCount int
SlidingIntervalDurationMs int64
LateDataTopic string
MaxLagMs int64
WatermarkEmitIntervalMs int64
TimestampExtractorClassName string
ActualWindowFunctionClassName string
}

func NewDefaultWindowConfing() *WindowConfig {
windowConfig := &WindowConfig{}

return windowConfig
}

43 changes: 35 additions & 8 deletions pulsaradmin/pkg/pulsar/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func DefaultConfig() *Config {
// Client provides a client to the Pulsar Restful API
type Client interface {
Clusters() Clusters
Functions() Functions
}

type client struct {
Expand Down Expand Up @@ -209,6 +210,27 @@ func (c *client) post(endpoint string, in, obj interface{}) error {
return err
}
req.obj = in
resp, err := checkSuccessful(c.doRequest(req))
if err != nil {
return err
}
defer safeRespClose(resp)
if obj != nil {
if err := decodeJsonBody(resp, &obj); err != nil {
return err
}
}
return nil
}

func (c *client) postWithMultiPart(endpoint string, in, obj interface{}, body io.Reader, contentType string) error {
req, err := c.newRequest(http.MethodPost, endpoint)
if err != nil {
return err
}
req.obj = in
req.body = body
req.contentType = contentType

resp, err := checkSuccessful(c.doRequest(req))
if err != nil {
Expand All @@ -226,9 +248,10 @@ func (c *client) post(endpoint string, in, obj interface{}) error {
}

type request struct {
method string
url *url.URL
params url.Values
method string
contentType string
url *url.URL
params url.Values

obj interface{}
body io.Reader
Expand Down Expand Up @@ -288,9 +311,14 @@ func (c *client) doRequest(r *request) (*http.Response, error) {
return nil, err
}

// add default headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
if r.contentType != "" {
req.Header.Set("Content-Type", r.contentType)
} else {
// add default headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
}

req.Header.Set("User-Agent", c.useragent())

hc := c.httpClient
Expand All @@ -301,8 +329,7 @@ func (c *client) doRequest(r *request) (*http.Response, error) {
hc.Transport = c.transport
}

resp, err := hc.Do(req)
return resp, err
return hc.Do(req)
}

// decodeJsonBody is used to JSON encode a body
Expand Down
3 changes: 1 addition & 2 deletions pulsaradmin/pkg/pulsar/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func (c *clusters) Get(name string) (ClusterData, error) {

func (c *clusters) Create(cdata ClusterData) error {
endpoint := c.client.endpoint(c.basePath, cdata.Name)
err := c.client.put(endpoint, &cdata, nil)
return err
return c.client.put(endpoint, &cdata, nil)
}

func (c *clusters) Delete(name string) error {
Expand Down
25 changes: 25 additions & 0 deletions pulsaradmin/pkg/pulsar/consumerConfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

type ConsumerConfig struct {
SchemaType string
SerdeClassName string
IsRegexPattern bool
ReceiverQueueSize int
}
52 changes: 46 additions & 6 deletions pulsaradmin/pkg/pulsar/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,50 @@ package pulsar

// ClusterData information on a cluster
type ClusterData struct {
Name string `json:"-"`
ServiceURL string `json:"serviceUrl"`
ServiceURLTls string `json:"serviceUrlTls"`
BrokerServiceURL string `json:"brokerServiceUrl"`
BrokerServiceURLTls string `json:"brokerServiceUrlTls"`
PeerClusterNames []string `json:"peerClusterNames"`
Name string `json:"-"`
ServiceURL string `json:"serviceUrl"`
ServiceURLTls string `json:"serviceUrlTls"`
BrokerServiceURL string `json:"brokerServiceUrl"`
BrokerServiceURLTls string `json:"brokerServiceUrlTls"`
PeerClusterNames []string `json:"peerClusterNames"`
}

// FunctionData information for a Pulsar Function
type FunctionData struct {
FQFN string `json:"fqfn"`
Tenant string `json:"tenant"`
Namespace string `json:"namespace"`
FuncName string `json:"functionName"`
ClassName string `json:"className"`
Jar string `json:"jarFile"`
Py string `json:"pyFile"`
Go string `json:"goFile"`
Inputs string `json:"inputs"`
TopicsPattern string `json:"topicsPattern"`
Output string `json:"output"`
LogTopic string `json:"logTopic"`
SchemaType string `json:"schemaType"`
CustomSerDeInputs string `json:"customSerdeInputString"`
CustomSchemaInput string `json:"customSchemaInputString"`
OutputSerDeClassName string `json:"outputSerdeClassName"`
FunctionConfigFile string `json:"fnConfigFile"`
ProcessingGuarantees string `json:"processingGuarantees"`
UserConfig string `json:"userConfigString"`
RetainOrdering bool `json:"retainOrdering"`
SubsName string `json:"subsName"`
Parallelism int `json:"parallelism"`
CPU float64 `json:"cpu"`
RAM int64 `json:"ram"`
Disk int64 `json:"disk"`
WindowLengthCount int `json:"windowLengthCount"`
WindowLengthDurationMs int64 `json:"windowLengthDurationMs"`
SlidingIntervalCount int `json:"slidingIntervalCount"`
SlidingIntervalDurationMs int64 `json:"slidingIntervalDurationMs"`
AutoAck bool `json:"autoAck"`
TimeoutMs int64 `json:"timeoutMs"`
MaxMessageRetries int `json:"maxMessageRetries"`
DeadLetterTopic string `json:"deadLetterTopic"`

FuncConf *FunctionConfig `json:"-"`
UserCodeFile string `json:"-"`
}
87 changes: 87 additions & 0 deletions pulsaradmin/pkg/pulsar/functionConfg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

type ProcessingGuarantees int

type Runtime int

const (
AtLeasetOnce ProcessingGuarantees = iota
AtMostOnce
EffectivelyOnce
)

const (
Java Runtime = iota
Python
Go
)

type FunctionConfig struct {
// Any flags that you want to pass to the runtime.
// note that in thread mode, these flags will have no impact
RuntimeFlags string `json:"runtimeFlags" yaml:"runtimeFlags"`

Tenant string `json:"tenant" yaml:"tenant"`
Namespace string `json:"namespace" yaml:"namespace"`
Name string `json:"name" yaml:"name"`
ClassName string `json:"className" yaml:"className"`

Inputs []string `json:"inputs" yaml:"inputs"`
CustomSerdeInputs map[string]string `json:"customSerdeInputs" yaml:"customSerdeInputs"`
TopicsPattern *string `json:"topicsPattern" yaml:"topicsPattern"`
CustomSchemaInputs map[string]string `json:"customSchemaInputs" yaml:"customSchemaInputs"`

// A generalized way of specifying inputs
InputSpecs map[string]ConsumerConfig `json:"inputSpecs" yaml:"inputSpecs"`

Output string `json:"output" yaml:"output"`

// Represents either a builtin schema type (eg: 'avro', 'json', ect) or the class name for a Schema implementation
OutputSchemaType string `json:"outputSchemaType" yaml:"outputSchemaType"`

OutputSerdeClassName string `json:"outputSerdeClassName" yaml:"outputSerdeClassName"`
LogTopic string `json:"logTopic" yaml:"logTopic"`
ProcessingGuarantees ProcessingGuarantees `json:"processingGuarantees" yaml:"processingGuarantees"`
RetainOrdering bool `json:"retainOrdering" yaml:"retainOrdering"`
UserConfig map[string]interface{} `json:"userConfig" yaml:"userConfig"`

// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
// encapsulates how the secret is fetched by the underlying
// secrets provider. The type of an value here can be found by the
// SecretProviderConfigurator.getSecretObjectType() method.
Secrets map[string]interface{} `json:"secrets" yaml:"secrets"`

Runtime Runtime `json:"runtime" yaml:"runtime"`
AutoAck bool `json:"autoAck" yaml:"autoAck"`
MaxMessageRetries int `json:"maxMessageRetries" yaml:"maxMessageRetries"`
DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
SubName string `json:"subName" yaml:"subName"`
Parallelism int `json:"parallelism" yaml:"parallelism"`
Resources *Resources `json:"resources" yaml:"resources"`
FQFN string `json:"fqfn" yaml:"fqfn"`
WindowConfig *WindowConfig `json:"windowConfig" yaml:"windowConfig"`
TimeoutMs *int64 `json:"timeoutMs" yaml:"timeoutMs"`
Jar string `json:"jar" yaml:"jar"`
Py string `json:"py" yaml:"py"`
Go string `json:"go" yaml:"go"`
// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
}
Loading

0 comments on commit 1f646f5

Please sign in to comment.