Skip to content

Commit

Permalink
Add namespace and topic name check (streamnative/pulsarctl#37)
Browse files Browse the repository at this point in the history
Add namespace and topic name check
  • Loading branch information
zymap authored and tisonkun committed Aug 16, 2023
1 parent d519e6b commit 3530319
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 0 deletions.
75 changes: 75 additions & 0 deletions pulsaradmin/pkg/pulsar/namespace_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package pulsar

import (
"fmt"
"github.com/pkg/errors"
"regexp"
"strings"
)

type NameSpaceName struct {
tenant string
nameSpace string
}

func GetNameSpaceName(tenant, namespace string) (*NameSpaceName, error) {
return GetNamespaceName(fmt.Sprintf("%s/%s", tenant,namespace))
}

func GetNamespaceName(completeName string) (*NameSpaceName, error) {
var n NameSpaceName

if completeName == "" {
return nil, errors.New("The namespace complete name is empty.")
}

parts := strings.Split(completeName, "/")
if len(parts) == 2 {
n.tenant = parts[0]
n.nameSpace = parts[1]
err := validateNamespaceName(n.tenant, n.nameSpace)
if err != nil {
return nil, err
}
} else {
return nil, errors.Errorf("The complete name of namespace is invalid. complete name : [%s]", completeName)
}

return &n, nil
}

func (n *NameSpaceName) String() string {
return fmt.Sprintf("%s/%s", n.tenant, n.nameSpace)
}

func validateNamespaceName(tenant, namespace string) error {
if tenant == "" || namespace == "" {
return errors.Errorf("Invalid tenant or namespace. [%s/%s]", tenant, namespace)
}

ok := checkName(tenant)
if !ok {
return errors.Errorf("Tenant name include unsupported special chars. tenant : [%s]", tenant)
}

ok = checkName(namespace)
if !ok {
return errors.Errorf("Namespace name include unsupported special chars. namespace : [%s]", namespace)
}

return nil
}

// allowed characters for property, namespace, cluster and topic
// names are alphanumeric (a-zA-Z0-9) and these special chars -=:.
// and % is allowed as part of valid URL encoding
const PATTEN = "^[-=:.\\w]*$"

func checkName(name string) bool {
patten, err := regexp.Compile(PATTEN)
if err != nil {
return false
}

return patten.MatchString(name)
}
46 changes: 46 additions & 0 deletions pulsaradmin/pkg/pulsar/namespace_name_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package pulsar

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestGetNamespaceName(t *testing.T) {
success, err := GetNamespaceName("public/default")
assert.Nil(t, err)
assert.Equal(t, "public/default", success.String())

empty, err := GetNamespaceName("")
assert.NotNil(t, err)
assert.Equal(t, "The namespace complete name is empty.", err.Error())
assert.Nil(t, empty)

empty, err = GetNamespaceName("/")
assert.NotNil(t, err)
assert.Equal(t, "Invalid tenant or namespace. [/]", err.Error())
assert.Nil(t, empty)

invalid, err := GetNamespaceName("public/default/fail")
assert.NotNil(t, err)
assert.Equal(t, "The complete name of namespace is invalid. complete name : [public/default/fail]", err.Error())
assert.Nil(t, invalid)

invalid, err = GetNamespaceName("public")
assert.NotNil(t, err)
assert.Equal(t, "The complete name of namespace is invalid. complete name : [public]", err.Error())
assert.Nil(t, invalid)

special, err := GetNamespaceName("-=.:/-=.:")
assert.Nil(t, err)
assert.Equal(t, "-=.:/-=.:", special.String())

tenantInvalid, err := GetNamespaceName("\"/namespace")
assert.NotNil(t, err)
assert.Equal(t, "Tenant name include unsupported special chars. tenant : [\"]", err.Error())
assert.Nil(t, tenantInvalid)

namespaceInvalid, err := GetNamespaceName("tenant/}")
assert.NotNil(t, err)
assert.Equal(t, "Namespace name include unsupported special chars. namespace : [}]", err.Error())
assert.Nil(t, namespaceInvalid)
}
26 changes: 26 additions & 0 deletions pulsaradmin/pkg/pulsar/topic_domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package pulsar

import "github.com/pkg/errors"

type TopicDomain string

const (
persistent TopicDomain = "persistent"
non_persistent TopicDomain = "non-persistent"
)

func ParseTopicDomain(domain string) (TopicDomain, error) {
switch domain {
case "persistent":
return persistent, nil
case "non-persistent":
return non_persistent, nil
default:
return "", errors.Errorf("The domain only can be specified as 'persistent' or " +
"'non-persistent'. Input domain is '%s'.", domain)
}
}

func (t TopicDomain) String() string {
return string(t)
}
102 changes: 102 additions & 0 deletions pulsaradmin/pkg/pulsar/topic_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package pulsar

import (
"fmt"
"github.com/pkg/errors"
"strconv"
"strings"
)

const (
PUBLIC_TENANT = "public"
DEFAULT_NAMESPACE = "default"
PARTITIONED_TOPIC_SUFFIX = "-partition-"
)

type TopicName struct {
domain TopicDomain
tenant string
namespace string
topic string
partitionIndex int

namespaceName *NameSpaceName
}

// The topic name can be in two different forms, one is fully qualified topic name,
// the other one is short topic name
func GetTopicName(completeName string) (*TopicName, error) {
var topicname TopicName
// The short topic name can be:
// - <topic>
// - <tenant>/<namespace>/<topic>
if !strings.Contains(completeName, "://") {
parts := strings.Split(completeName, "/")
if len(parts) == 3 {
completeName = persistent.String() + "://" + completeName
} else if len(parts) == 1 {
completeName = persistent.String() + "://" + PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0]
} else {
return nil, errors.Errorf("Invalid short topic name '%s', it should be "+
"in the format of <tenant>/<namespace>/<topic> or <topic>", completeName)
}
}

// The fully qualified topic name can be:
// <domain>://<tenant>/<namespace>/<topic>

parts := strings.Split(completeName, "://")
if len(parts) != 2 {
return nil, errors.Errorf("Invalid complete topic name '%s', it should be in "+
"the format of <domain>://<tenant>/<namespace>/<topic>", completeName)
}

domain, err := ParseTopicDomain(parts[0])
if err != nil {
return nil, err
}
topicname.domain = domain

rest := parts[1]
parts = strings.Split(rest, "/")
if len(parts) == 3 {
topicname.tenant = parts[0]
topicname.namespace = parts[1]
topicname.topic = parts[2]
topicname.partitionIndex = getPartitionIndex(completeName)
} else {
return nil, errors.Errorf("Invalid topic name '%s', it should be in the format of "+
"<tenant>/<namespace>/<topic>", rest)
}

n, err := GetNameSpaceName(topicname.tenant, topicname.namespace)
if err != nil {
return nil, err
}
topicname.namespaceName = n

return &topicname, nil
}

func (t *TopicName) String() string {
return fmt.Sprintf("%s://%s/%s/%s", t.domain, t.tenant, t.namespace, t.topic)
}

func (t *TopicName) GetDomain() TopicDomain {
return t.domain
}

func (t *TopicName) GetRestPath() string {
return fmt.Sprintf("%s/%s/%s/%s", t.domain, t.tenant, t.namespace, t.topic)
}

func getPartitionIndex(topic string) int {
if strings.Contains(topic, PARTITIONED_TOPIC_SUFFIX) {
parts := strings.Split(topic, "-")
index, err := strconv.Atoi(parts[len(parts)-1])
if err == nil {
return index
}
}
return -1
}
48 changes: 48 additions & 0 deletions pulsaradmin/pkg/pulsar/topic_name_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package pulsar

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestGetTopicName(t *testing.T) {
success, err := GetTopicName("success")
assert.Nil(t, err)
assert.Equal(t, "persistent://public/default/success", success.String())

success, err = GetTopicName("tenant/namespace/success")
assert.Nil(t, err)
assert.Equal(t, "persistent://tenant/namespace/success", success.String())

success, err = GetTopicName("persistent://tenant/namespace/success")
assert.Nil(t, err)
assert.Equal(t, "persistent://tenant/namespace/success", success.String())

success, err = GetTopicName("non-persistent://tenant/namespace/success")
assert.Nil(t, err)
assert.Equal(t, "non-persistent://tenant/namespace/success", success.String())

fail, err := GetTopicName("default/fail")
assert.NotNil(t, err)
assert.Equal(t, "Invalid short topic name 'default/fail', it should be in the "+
"format of <tenant>/<namespace>/<topic> or <topic>", err.Error())
assert.Nil(t, fail)

fail, err = GetTopicName("domain://tenant/namespace/fail")
assert.NotNil(t, err)
assert.Equal(t, "The domain only can be specified as 'persistent' or 'non-persistent'. "+
"Input domain is 'domain'.", err.Error())
assert.Nil(t, fail)

fail, err = GetTopicName("persistent:///tenant/namespace/fail")
assert.NotNil(t, err)
assert.Equal(t, "Invalid topic name '/tenant/namespace/fail', it should be in the format "+
"of <tenant>/<namespace>/<topic>", err.Error())
assert.Nil(t, fail)

fail, err = GetTopicName("persistent://tenant/namespace")
assert.NotNil(t, err)
assert.Equal(t, "Invalid topic name 'tenant/namespace', it should be in the format "+
"of <tenant>/<namespace>/<topic>", err.Error())
assert.Nil(t, fail)
}

0 comments on commit 3530319

Please sign in to comment.