Skip to content

Commit

Permalink
Merge pull request #17 from sercand/v3
Browse files Browse the repository at this point in the history
V3
  • Loading branch information
sercand committed Aug 1, 2020
2 parents 26b1db3 + bcc807b commit 60149b9
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 40 deletions.
39 changes: 32 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
# kuberesolver
Grpc Client-Side Load Balancer with Kubernetes name resolver

A Grpc name resolver by using kubernetes API.
It comes with a small ~250 LOC kubernetes client to find service endpoints. Therefore it won't bloat your binaries.

```go
// Register kuberesolver to grpc
// Register kuberesolver to grpc before calling grpc.Dial
kuberesolver.RegisterInCluster()

// is same as
resolver.Register(kuberesolver.NewBuilder(nil))
// you can bring your own k8s client, below is default behaviour
client, err := kuberesolver.NewInClusterK8sClient()
resolver.Register(kuberesolver.NewBuilder(client))
resolver.Register(kuberesolver.NewBuilder(nil /*custom kubernetes client*/ , "kubernetes"))

// USAGE:
// if schema is 'kubernetes' then grpc will use kuberesolver to resolve addresses
cc, err := grpc.Dial("kubernetes:///service-name.namespace:portname", opts...)
cc, err := grpc.Dial("kubernetes:///service.namespace:portname", opts...)
```

An url can be one of the following, [grpc naming docs](https://github.com/grpc/grpc/blob/master/doc/naming.md)

```
kubernetes:///service-name:8080
kubernetes:///service-name:portname
Expand All @@ -26,3 +27,27 @@ kubernetes://service-name:8080/
kubernetes://service-name.namespace:8080/
```

### Using alternative Schema

Use `RegisterInClusterWithSchema(schema)` instead of `RegisterInCluster` on start.

### Client Side Load Balancing

You need to pass grpc.WithBalancerName option to grpc on dial:

```go
grpc.DialContext(ctx, "kubernetes:///service:grpc", grpc.WithBalancerName("round_robin") )
```
This will create subconnections for each available service endpoints.

### How is this different from dialing to `service.namespace:8080`

Connecting to a service by dialing to `service.namespace:8080` uses DNS and it returns service stable IP. Therefore, gRPC doesn't know the endpoint IP addresses and it fails to reconnect to target services in case of failure.

Kuberesolver uses kubernetes API to get and watch service endpoint IP addresses.
Since it provides and updates all available service endpoints, together with a client-side balancer you can achive zero downtime deployments.

### RBAC

You need give `GET` and `WATCH` access to the `endpoints` if you are using RBAC in your cluster.
20 changes: 9 additions & 11 deletions builder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kuberesolver

import (
"context"
"fmt"
"io"
"net"
Expand All @@ -11,7 +12,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
Expand Down Expand Up @@ -47,19 +47,20 @@ type targetInfo struct {
}

func (ti targetInfo) String() string {
return fmt.Sprintf("kubernetes:///%s/%s:%s", ti.serviceNamespace, ti.serviceName, ti.port)
return fmt.Sprintf("kubernetes://%s/%s:%s", ti.serviceNamespace, ti.serviceName, ti.port)
}

// RegisterInCluster registers the kuberesolver builder to grpc
// RegisterInCluster registers the kuberesolver builder to grpc with kubernetes schema
func RegisterInCluster() {
RegisterInClusterWithSchema(kubernetesSchema)
}

// RegisterInClusterWithSchema registers the kuberesolver builder to the grpc with custom schema
func RegisterInClusterWithSchema(schema string) {
resolver.Register(NewBuilder(nil, schema))
}

// NewBuilder creates a kubeBuilder which is used to factory Kuberesolvers.
// NewBuilder creates a kubeBuilder which is used by grpc resolver.
func NewBuilder(client K8sClient, schema string) resolver.Builder {
return &kubeBuilder{
k8sClient: client,
Expand All @@ -79,14 +80,8 @@ func parseResolverTarget(target resolver.Target) (targetInfo, error) {
// kubernetes://service.default:port/
if end == "" {
end = target.Authority
snamespace = "default"
snamespace = ""
}
// kubernetes:///service:port
// kubernetes://service:port/
if snamespace == "" {
snamespace = "default"
}

ti := targetInfo{}
if end == "" {
return targetInfo{}, fmt.Errorf("target(%q) is empty", target)
Expand Down Expand Up @@ -140,6 +135,9 @@ func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
if err != nil {
return nil, err
}
if ti.serviceNamespace == "" {
ti.serviceNamespace = getCurrentNamespaceOrDefault()
}
ctx, cancel := context.WithCancel(context.Background())
r := &kResolver{
target: ti,
Expand Down
45 changes: 25 additions & 20 deletions builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package kuberesolver

import (
"fmt"
"google.golang.org/grpc/serviceconfig"
"log"
"strings"
"testing"

"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

func newTestBuilder() resolver.Builder {
Expand Down Expand Up @@ -68,9 +68,10 @@ func TestBuilder(t *testing.T) {

}

//
// copied from grpc package to test parsing endpoints

// split2 returns the values from strings.SplitN(s, sep, 2).
// If sep is not found, it returns ("", s, false) instead.
// If sep is not found, it returns ("", "", false) instead.
func split2(s, sep string) (string, string, bool) {
spl := strings.SplitN(s, sep, 2)
if len(spl) < 2 {
Expand All @@ -79,29 +80,33 @@ func split2(s, sep string) (string, string, bool) {
return spl[0], spl[1], true
}

// copied from grpc package to test parsing endpoints
// ParseTarget splits target into a resolver.Target struct containing scheme,
// authority and endpoint.
//
// parseTarget splits target into a struct containing scheme, authority and
// endpoint.
// If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
// target}.
func parseTarget(target string) (ret resolver.Target) {
var ok bool
ret.Scheme, ret.Endpoint, ok = split2(target, "://")
if !ok {
return resolver.Target{Endpoint: target}
}
ret.Authority, ret.Endpoint, _ = split2(ret.Endpoint, "/")
ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
if !ok {
return resolver.Target{Endpoint: target}
}
return ret
}

func TestParseResolverTarget(t *testing.T) {
for _, test := range []struct {
for i, test := range []struct {
target resolver.Target
want targetInfo
err bool
}{
{resolver.Target{"", "", ""}, targetInfo{"", "", "", false, false}, true},
{resolver.Target{"", "a", ""}, targetInfo{"a", "default", "", false, true}, false},
{resolver.Target{"", "", "a"}, targetInfo{"a", "default", "", false, true}, false},
{resolver.Target{"", "a", ""}, targetInfo{"a", "", "", false, true}, false},
{resolver.Target{"", "", "a"}, targetInfo{"a", "", "", false, true}, false},
{resolver.Target{"", "a", "b"}, targetInfo{"b", "a", "", false, true}, false},
{resolver.Target{"", "a.b", ""}, targetInfo{"a", "b", "", false, true}, false},
{resolver.Target{"", "", "a.b"}, targetInfo{"a", "b", "", false, true}, false},
Expand All @@ -113,50 +118,50 @@ func TestParseResolverTarget(t *testing.T) {
} {
got, err := parseResolverTarget(test.target)
if err == nil && test.err {
t.Errorf("want error but got nil")
t.Errorf("case %d: want error but got nil", i)
continue
}
if err != nil && !test.err {
t.Errorf("got '%v' error but don't want an error", err)
t.Errorf("case %d: got '%v' error but don't want an error", i, err)
continue
}
if got != test.want {
t.Errorf("parseTarget(%q) = %+v, want %+v", test.target, got, test.want)
t.Errorf("case %d parseTarget(%q) = %+v, want %+v", i, test.target, got, test.want)
}
}
}

func TestParseTargets(t *testing.T) {
for _, test := range []struct {
for i, test := range []struct {
target string
want targetInfo
err bool
}{
{"", targetInfo{}, true},
{"kubernetes:///", targetInfo{}, true},
{"kubernetes://a:30", targetInfo{}, true},
{"kubernetes://a/", targetInfo{"a", "default", "", false, true}, false},
{"kubernetes:///a", targetInfo{"a", "default", "", false, true}, false},
{"kubernetes://a/", targetInfo{"a", "", "", false, true}, false},
{"kubernetes:///a", targetInfo{"a", "", "", false, true}, false},
{"kubernetes://a/b", targetInfo{"b", "a", "", false, true}, false},
{"kubernetes://a.b/", targetInfo{"a", "b", "", false, true}, false},
{"kubernetes:///a.b:80", targetInfo{"a", "b", "80", false, false}, false},
{"kubernetes:///a.b:port", targetInfo{"a", "b", "port", true, false}, false},
{"kubernetes:///a:port", targetInfo{"a", "default", "port", true, false}, false},
{"kubernetes:///a:port", targetInfo{"a", "", "port", true, false}, false},
{"kubernetes://x/a:port", targetInfo{"a", "x", "port", true, false}, false},
{"kubernetes://a.x:port/", targetInfo{"a", "x", "port", true, false}, false},
{"kubernetes://a.x:30/", targetInfo{"a", "x", "30", false, false}, false},
} {
got, err := parseResolverTarget(parseTarget(test.target))
if err == nil && test.err {
t.Errorf("want error but got nil")
t.Errorf("case %d: want error but got nil", i)
continue
}
if err != nil && !test.err {
t.Errorf("got '%v' error but don't want an error", err)
t.Errorf("case %d:got '%v' error but don't want an error", i, err)
continue
}
if got != test.want {
t.Errorf("parseTarget(%q) = %+v, want %+v", test.target, got, test.want)
t.Errorf("case %d: parseTarget(%q) = %+v, want %+v", i, test.target, got, test.want)
}
}
}
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/sercand/kuberesolver/v3

go 1.14

require (
github.com/prometheus/client_golang v1.7.1
google.golang.org/grpc v1.31.0
)
Loading

0 comments on commit 60149b9

Please sign in to comment.