Skip to content

Commit

Permalink
Merge pull request #272 from zouyx/feature/addAsyncCall
Browse files Browse the repository at this point in the history
Ftr: add async call for dubbo protocol
  • Loading branch information
fangyincheng committed Dec 19, 2019
2 parents d65659f + 25d2c23 commit 9c83f5e
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 17 deletions.
4 changes: 4 additions & 0 deletions common/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,7 @@ func (p *Proxy) Implement(v common.RPCService) {
func (p *Proxy) Get() common.RPCService {
return p.rpc
}

func (p *Proxy) GetCallback() interface{} {
return p.callBack
}
1 change: 1 addition & 0 deletions common/proxy/proxy_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

type ProxyFactory interface {
GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy
GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy
GetInvoker(url common.URL) protocol.Invoker
}

Expand Down
7 changes: 6 additions & 1 deletion common/proxy/proxy_factory/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,16 @@ func NewDefaultProxyFactory(options ...proxy.Option) proxy.ProxyFactory {
return &DefaultProxyFactory{}
}
func (factory *DefaultProxyFactory) GetProxy(invoker protocol.Invoker, url *common.URL) *proxy.Proxy {
return factory.GetAsyncProxy(invoker, nil, url)
}

func (factory *DefaultProxyFactory) GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *proxy.Proxy {
//create proxy
attachments := map[string]string{}
attachments[constant.ASYNC_KEY] = url.GetParam(constant.ASYNC_KEY, "false")
return proxy.NewProxy(invoker, nil, attachments)
return proxy.NewProxy(invoker, callBack, attachments)
}

func (factory *DefaultProxyFactory) GetInvoker(url common.URL) protocol.Invoker {
return &ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
Expand Down
16 changes: 16 additions & 0 deletions common/proxy/proxy_factory/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package proxy_factory

import (
"fmt"
"testing"
)

Expand All @@ -37,6 +38,21 @@ func Test_GetProxy(t *testing.T) {
assert.NotNil(t, proxy)
}

type TestAsync struct {
}

func (u *TestAsync) CallBack(res common.CallbackResponse) {
fmt.Println("CallBack res:", res)
}

func Test_GetAsyncProxy(t *testing.T) {
proxyFactory := NewDefaultProxyFactory()
url := common.NewURLWithOptions()
async := &TestAsync{}
proxy := proxyFactory.GetAsyncProxy(protocol.NewBaseInvoker(*url), async.CallBack, url)
assert.NotNil(t, proxy)
}

func Test_GetInvoker(t *testing.T) {
proxyFactory := NewDefaultProxyFactory()
url := common.NewURLWithOptions()
Expand Down
12 changes: 12 additions & 0 deletions common/rpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ type RPCService interface {
Reference() string // rpc service id or reference id
}

//AsyncCallbackService callback interface for async
type AsyncCallbackService interface {
CallBack(response CallbackResponse) // callback
}

//CallbackResponse for different protocol
type CallbackResponse interface {
}

//AsyncCallback async callback method
type AsyncCallback func(response CallbackResponse)

// for lowercase func
// func MethodMapper() map[string][string] {
// return map[string][string]{}
Expand Down
11 changes: 8 additions & 3 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type ReferenceConfig struct {
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
async bool `yaml:"async" json:"async,omitempty" property:"async"`
Async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
Expand Down Expand Up @@ -141,7 +141,12 @@ func (refconfig *ReferenceConfig) Refer() {
}

//create proxy
refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url)
if refconfig.Async {
callback := GetCallback(refconfig.id)
refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(refconfig.invoker, callback, url)
} else {
refconfig.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(refconfig.invoker, url)
}
}

// @v is service provider implemented RPCService
Expand Down Expand Up @@ -169,7 +174,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
//getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async))
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async))

//application info
urlMap.Set(constant.APPLICATION_KEY, consumerConfig.ApplicationConfig.Name)
Expand Down
37 changes: 37 additions & 0 deletions config/reference_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func doInitConsumer() {
},
References: map[string]*ReferenceConfig{
"MockService": {
id: "MockProvider",
Params: map[string]string{
"serviceid": "soa.mock",
"forks": "5",
Expand Down Expand Up @@ -110,6 +111,26 @@ func doInitConsumer() {
}
}

var mockProvider = new(MockProvider)

type MockProvider struct {
}

func (m *MockProvider) Reference() string {
return "MockProvider"
}

func (m *MockProvider) CallBack(res common.CallbackResponse) {
}

func doInitConsumerAsync() {
doInitConsumer()
SetConsumerService(mockProvider)
for _, v := range consumerConfig.References {
v.Async = true
}
}

func doInitConsumerWithSingleRegistry() {
consumerConfig = &ConsumerConfig{
ApplicationConfig: &ApplicationConfig{
Expand Down Expand Up @@ -181,6 +202,22 @@ func Test_Refer(t *testing.T) {
}
consumerConfig = nil
}

func Test_ReferAsync(t *testing.T) {
doInitConsumerAsync()
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)

for _, reference := range consumerConfig.References {
reference.Refer()
assert.Equal(t, "soa.mock", reference.Params["serviceid"])
assert.NotNil(t, reference.invoker)
assert.NotNil(t, reference.pxy)
assert.NotNil(t, reference.pxy.GetCallback())
}
consumerConfig = nil
}

func Test_ReferP2P(t *testing.T) {
doInitConsumer()
extension.SetProtocol("dubbo", GetProtocol)
Expand Down
8 changes: 8 additions & 0 deletions config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,11 @@ func GetConsumerService(name string) common.RPCService {
func GetProviderService(name string) common.RPCService {
return proServices[name]
}

func GetCallback(name string) func(response common.CallbackResponse) {
service := GetConsumerService(name)
if sv, ok := service.(common.AsyncCallbackService); ok {
return sv.CallBack
}
return nil
}
10 changes: 5 additions & 5 deletions protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,16 @@ type Options struct {
RequestTimeout time.Duration
}

type CallResponse struct {
//AsyncCallbackResponse async response for dubbo
type AsyncCallbackResponse struct {
common.CallbackResponse
Opts Options
Cause error
Start time.Time // invoke(call) start time == write start time
ReadStart time.Time // read start time, write duration = ReadStart - Start
Reply interface{}
}

type AsyncCallback func(response CallResponse)

type Client struct {
opts Options
conf ClientConfig
Expand Down Expand Up @@ -199,12 +199,12 @@ func (c *Client) Call(request *Request, response *Response) error {
return perrors.WithStack(c.call(ct, request, response, nil))
}

func (c *Client) AsyncCall(request *Request, callback AsyncCallback, response *Response) error {
func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error {

return perrors.WithStack(c.call(CT_TwoWay, request, response, callback))
}

func (c *Client) call(ct CallType, request *Request, response *Response, callback AsyncCallback) error {
func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error {

p := &DubboPackage{}
p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
Expand Down
5 changes: 3 additions & 2 deletions protocol/dubbo/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ func TestClient_AsyncCall(t *testing.T) {
user := &User{}
lock := sync.Mutex{}
lock.Lock()
err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response CallResponse) {
assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User))
err := c.AsyncCall(NewRequest("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, nil), func(response common.CallbackResponse) {
r := response.(AsyncCallbackResponse)
assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
lock.Unlock()
}, NewResponse(user, nil))
assert.NoError(t, err)
Expand Down
7 changes: 4 additions & 3 deletions protocol/dubbo/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

import (
"github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go/common"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -109,7 +110,7 @@ type PendingResponse struct {
err error
start time.Time
readStart time.Time
callback AsyncCallback
callback common.AsyncCallback
response *Response
done chan struct{}
}
Expand All @@ -122,8 +123,8 @@ func NewPendingResponse() *PendingResponse {
}
}

func (r PendingResponse) GetCallResponse() CallResponse {
return CallResponse{
func (r PendingResponse) GetCallResponse() common.CallbackResponse {
return AsyncCallbackResponse{
Cause: r.err,
Start: r.start,
ReadStart: r.readStart,
Expand Down
2 changes: 1 addition & 1 deletion protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
}
response := NewResponse(inv.Reply(), nil)
if async {
if callBack, ok := inv.CallBack().(func(response CallResponse)); ok {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
} else {
result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()))
Expand Down
6 changes: 4 additions & 2 deletions protocol/dubbo/dubbo_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol/invocation"
)
Expand Down Expand Up @@ -65,8 +66,9 @@ func TestDubboInvoker_Invoke(t *testing.T) {
// AsyncCall
lock := sync.Mutex{}
lock.Lock()
inv.SetCallBack(func(response CallResponse) {
assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*Response).reply.(*User))
inv.SetCallBack(func(response common.CallbackResponse) {
r := response.(AsyncCallbackResponse)
assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User))
lock.Unlock()
})
res = invoker.Invoke(inv)
Expand Down

0 comments on commit 9c83f5e

Please sign in to comment.