From e0c29a578f29e823e6ccddeb8d433a1896aab5eb Mon Sep 17 00:00:00 2001 From: amudong Date: Sun, 12 Jan 2020 04:29:14 +0800 Subject: [PATCH 01/14] add nacos configcenter --- config_center/nacos/client.go | 247 +++++++++++++++++++++++++++++++ config_center/nacos/facade.go | 98 ++++++++++++ config_center/nacos/factory.go | 42 ++++++ config_center/nacos/impl.go | 160 ++++++++++++++++++++ config_center/nacos/impl_test.go | 115 ++++++++++++++ config_center/nacos/listener.go | 66 +++++++++ 6 files changed, 728 insertions(+) create mode 100644 config_center/nacos/client.go create mode 100644 config_center/nacos/facade.go create mode 100644 config_center/nacos/factory.go create mode 100644 config_center/nacos/impl.go create mode 100644 config_center/nacos/impl_test.go create mode 100644 config_center/nacos/listener.go diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go new file mode 100644 index 0000000000..8dc742e5c4 --- /dev/null +++ b/config_center/nacos/client.go @@ -0,0 +1,247 @@ +package nacos + +import ( + "strconv" + "strings" + "sync" + "time" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/config_client" + nacosconst "github.com/nacos-group/nacos-sdk-go/common/constant" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" +) + +const ( + ConnDelay = 3 + MaxFailTimes = 15 +) + +type NacosClient struct { + name string + NacosAddrs []string + sync.Mutex // for Client + Client *config_client.IConfigClient + exit chan struct{} + Timeout time.Duration +} + +type Option func(*Options) + +type Options struct { + nacosName string + client *NacosClient +} + +func WithNacosName(name string) Option { + return func(opt *Options) { + opt.nacosName = name + } +} + +func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { + var ( + err error + ) + opions := &Options{} + for _, opt := range opts { + opt(opions) + } + + err = nil + + lock := container.NacosClientLock() + url := container.GetUrl() + + lock.Lock() + defer lock.Unlock() + + if container.NacosClient() == nil { + //in dubbo ,every registry only connect one node ,so this is []string{r.Address} + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + logger.Errorf("timeout config %v is invalid ,err is %v", + url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) + return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) + } + nacosAddresses := strings.Split(url.Location, ",") + newClient, err := newNacosClient(opions.nacosName, nacosAddresses, timeout) + if err != nil { + logger.Warnf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}", + opions.nacosName, url.Location, timeout.String(), err) + return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) + } + container.SetNacosClient(newClient) + } + + if container.NacosClient().Client == nil { + svrConfList := []nacosconst.ServerConfig{} + for _, nacosAddr := range container.NacosClient().NacosAddrs { + split := strings.Split(nacosAddr,":") + port, err := strconv.ParseUint(split[1], 10, 64) + if err != nil { + continue + } + svrconf := nacosconst.ServerConfig{ + IpAddr: split[0], + Port: port, + } + svrConfList = append(svrConfList, svrconf) + } + + client , err := clients.CreateConfigClient(map[string]interface{}{ + "serverConfigs": svrConfList, + "clientConfig": nacosconst.ClientConfig{ + TimeoutMs: uint64(container.NacosClient().Timeout.Nanoseconds() / 1e6), + ListenInterval: 10000, + NotLoadCacheAtStart: true, + LogDir: "logs/nacos/log", //TODO unified log directory + }, + }) + container.NacosClient().Client = &client + if err != nil { + //TODO + } + } + + return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL) +} + +func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) { + var ( + err error + n *NacosClient + ) + + n = &NacosClient{ + name: name, + NacosAddrs: nacosAddrs, + Timeout: timeout, + exit: make(chan struct{}), + } + + svrConfList := []nacosconst.ServerConfig{} + for _, nacosAddr := range n.NacosAddrs { + split := strings.Split(nacosAddr,":") + port, err := strconv.ParseUint(split[1], 10, 64) + if err != nil { + continue + } + svrconf := nacosconst.ServerConfig{ + IpAddr: split[0], + Port: port, + } + svrConfList = append(svrConfList, svrconf) + } + client , err := clients.CreateConfigClient(map[string]interface{}{ + "serverConfigs": svrConfList, + "clientConfig": nacosconst.ClientConfig{ + TimeoutMs: uint64(timeout.Nanoseconds() / 1e6), + ListenInterval: 20000, + NotLoadCacheAtStart: true, + LogDir: "logs/nacos/log", //TODO unified log directory + }, + }) + n.Client = &client + if err != nil { + return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs) + } + + return n, nil +} + +func newMockNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) { + var ( + err error + n *NacosClient + ) + + n = &NacosClient{ + name: name, + NacosAddrs: nacosAddrs, + Timeout: timeout, + exit: make(chan struct{}), + } + + svrConfList := []nacosconst.ServerConfig{} + for _, nacosAddr := range n.NacosAddrs { + split := strings.Split(nacosAddr,":") + port, err := strconv.ParseUint(split[1], 10, 64) + if err != nil { + continue + } + svrconf := nacosconst.ServerConfig{ + IpAddr: split[0], + Port: port, + } + svrConfList = append(svrConfList, svrconf) + } + + client , err := clients.CreateConfigClient(map[string]interface{}{ + "serverConfigs": svrConfList, + "clientConfig": nacosconst.ClientConfig{ + TimeoutMs: uint64(timeout.Nanoseconds() / 1e6), + ListenInterval: 10000, + NotLoadCacheAtStart: true, + LogDir: "logs/nacos/log", ///TODO unified log directory + }, + }) + if err != nil { + return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs) + } + n.Client = &client + return n, nil +} + +func (n *NacosClient) Done() <-chan struct{} { + return n.exit +} + +func (n *NacosClient) stop() bool { + select { + case <-n.exit: + return true + default: + close(n.exit) + } + + return false +} + +func (n *NacosClient) NacosClientValid() bool { + select { + case <-n.exit: + return false + default: + } + + valid := true + n.Lock() + if n.Client == nil { + valid = false + } + n.Unlock() + + return valid +} + +func (n *NacosClient) Close() { + if n == nil { + return + } + + n.stop() + n.Lock() + if n.Client != nil { + n.Client = nil + } + n.Unlock() + logger.Warnf("nacosClient{name:%s, zk addr:%s} exit now.", n.name, n.NacosAddrs) +} \ No newline at end of file diff --git a/config_center/nacos/facade.go b/config_center/nacos/facade.go new file mode 100644 index 0000000000..b0487b08f1 --- /dev/null +++ b/config_center/nacos/facade.go @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "sync" + "time" +) +import ( + "github.com/dubbogo/getty" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" +) + +type nacosClientFacade interface { + NacosClient() *NacosClient + SetNacosClient(*NacosClient) + NacosClientLock() *sync.Mutex + WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container + GetDone() chan struct{} //for nacos client control + RestartCallBack() bool + common.Node +} + +func timeSecondDuration(sec int) time.Duration { + return time.Duration(sec) * time.Second +} + +//TODO nacos HandleClientRestart +func HandleClientRestart(r nacosClientFacade) { + var ( + err error + + failTimes int + ) + + defer r.WaitGroup().Done() +LOOP: + for { + select { + case <-r.GetDone(): + logger.Warnf("(NacosProviderRegistry)reconnectNacosRegistry goroutine exit now...") + break LOOP + // re-register all services + case <-r.NacosClient().Done(): + r.NacosClientLock().Lock() + r.NacosClient().Close() + nacosName := r.NacosClient().name + nacosAddress := r.NacosClient().NacosAddrs + r.SetNacosClient(nil) + r.NacosClientLock().Unlock() + + // Connect nacos until success. + failTimes = 0 + for { + select { + case <-r.GetDone(): + logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...") + break LOOP + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection nacos. + } + err = ValidateNacosClient(r, WithNacosName(nacosName)) + logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}", + nacosAddress, perrors.WithStack(err)) + if err == nil { + if r.RestartCallBack() { + break + } + } + failTimes++ + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes + } + } + } + } +} + + diff --git a/config_center/nacos/factory.go b/config_center/nacos/factory.go new file mode 100644 index 0000000000..62c8835fa5 --- /dev/null +++ b/config_center/nacos/factory.go @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +func init() { + extension.SetConfigCenterFactory("nacos", func() config_center.DynamicConfigurationFactory { return &nacosDynamicConfigurationFactory{} }) +} + +type nacosDynamicConfigurationFactory struct { +} + +func (f *nacosDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) { + dynamicConfiguration, err := newNacosDynamicConfiguration(url) + if err != nil { + return nil, err + } + dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{}) + return dynamicConfiguration, err + +} diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go new file mode 100644 index 0000000000..f3b79bc256 --- /dev/null +++ b/config_center/nacos/impl.go @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "sync" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +const NacosClientName = "nacos config_center" + +type nacosDynamicConfiguration struct { + url *common.URL + rootPath string + wg sync.WaitGroup + cltLock sync.Mutex + done chan struct{} + client *NacosClient + keyListeners sync.Map + parser parser.ConfigurationParser +} + +func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, error) { + c := &nacosDynamicConfiguration{ + rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", + url: url, + keyListeners:sync.Map{}, + } + err := ValidateNacosClient(c, WithNacosName(NacosClientName)) + if err != nil { + logger.Errorf("nacos client start error ,error message is %v", err) + return nil, err + } + c.wg.Add(1) + go HandleClientRestart(c) + return c, err + +} + +func (n *nacosDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { + n.addListener(key, listener) +} + +func (n *nacosDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { + n.removeListener(key, listener) +} + +// 在nacos group是dubbo DataId是key configfile 或 appconfigfile +func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) { + + tmpOpts := &config_center.Options{} + for _, opt := range opts { + opt(tmpOpts) + } + content, err := (*n.client.Client).GetConfig(vo.ConfigParam{ + DataId: key, + Group: tmpOpts.Group, + }) + if err != nil { + return "", perrors.WithStack(err) + } else { + return string(content), nil + } + +} + +func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { + return n.GetProperties(key, opts...) +} + +func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { + return n.GetProperties(key, opts...) +} + +func (n *nacosDynamicConfiguration) Parser() parser.ConfigurationParser { + return n.parser +} + +func (n *nacosDynamicConfiguration) SetParser(p parser.ConfigurationParser) { + n.parser = p +} + +func (n *nacosDynamicConfiguration) NacosClient() *NacosClient { + return n.client +} + +func (n *nacosDynamicConfiguration) SetNacosClient(client *NacosClient) { + n.client = client +} + +func (n *nacosDynamicConfiguration) NacosClientLock() *sync.Mutex { + return &n.cltLock +} + +func (n *nacosDynamicConfiguration) WaitGroup() *sync.WaitGroup { + return &n.wg +} + +func (n *nacosDynamicConfiguration) GetDone() chan struct{} { + return n.done +} + +func (n *nacosDynamicConfiguration) GetUrl() common.URL { + return *n.url +} + +func (n *nacosDynamicConfiguration) Destroy() { + close(n.done) + n.wg.Wait() + n.closeConfigs() +} + +func (n *nacosDynamicConfiguration) IsAvailable() bool { + select { + case <-n.done: + return false + default: + return true + } +} + +func (r *nacosDynamicConfiguration) closeConfigs() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + logger.Infof("begin to close provider zk client") + // Close the old client first to close the tmp node + r.client.Close() + r.client = nil +} + +func (r *nacosDynamicConfiguration) RestartCallBack() bool { + return true +} diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go new file mode 100644 index 0000000000..1eaacdca76 --- /dev/null +++ b/config_center/nacos/impl_test.go @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nacos + +import ( + "context" + "fmt" + "sync" + "testing" + "time" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/vo" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) { + regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:8848") + nacosConfiguration, err := newNacosDynamicConfiguration(®url) + if err != nil { + fmt.Println("error:newNacosDynamicConfiguration", err.Error()) + assert.NoError(t, err) + return nil, err + } + nacosConfiguration.SetParser(&parser.DefaultConfigurationParser{}) + data := ` + dubbo.service.com.ikurento.user.UserProvider.cluster=failback + dubbo.service.com.ikurento.user.UserProvider.protocol=myDubbo1 + dubbo.protocols.myDubbo.port=20000 + dubbo.protocols.myDubbo.name=dubbo +` + sucess, err := (*nacosConfiguration.client.Client).PublishConfig(vo.ConfigParam{ + DataId: "dubbo.properties", + Group: "dubbo", + Content: data, + }) + assert.NoError(t, err) + if !sucess { + fmt.Println("error: publishconfig error", data) + } + return nacosConfiguration, err +} + +func Test_GetConfig(t *testing.T) { + nacos, err := initNacosData(t) + assert.NoError(t, err) + configs, err := nacos.GetProperties("dubbo.properties", config_center.WithGroup("dubbo")) + m, err := nacos.Parser().Parse(configs) + assert.NoError(t, err) + fmt.Println(m) +} + +func Test_AddListener(t *testing.T) { + nacos, err := initNacosData(t) + assert.NoError(t, err) + listener := &mockDataListener{} + time.Sleep(time.Second * 2) + nacos.AddListener("dubbo.properties", listener) + listener.wg.Add(2) + fmt.Println("begin to listen") + data := ` + dubbo.service.com.ikurento.user.UserProvider.cluster=failback + dubbo.service.com.ikurento.user.UserProvider.protocol=myDubbo + dubbo.protocols.myDubbo.port=20000 + dubbo.protocols.myDubbo.name=dubbo +` + sucess, err := (*nacos.client.Client).PublishConfig(vo.ConfigParam{ + DataId: "dubbo.properties", + Group: "dubbo", + Content: data, + }) + assert.NoError(t, err) + if !sucess { + fmt.Println("error: publishconfig error", data) + } + listener.wg.Wait() + fmt.Println("end", listener.event) + +} + + +func Test_RemoveListener(t *testing.T) { + //TODO not supported in current go_nacos_sdk version +} +type mockDataListener struct { + wg sync.WaitGroup + event string +} + +func (l *mockDataListener) Process(configType *config_center.ConfigChangeEvent) { + fmt.Println("process!!!!!!!!!!") + l.wg.Done() + l.event = configType.Key +} \ No newline at end of file diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go new file mode 100644 index 0000000000..f284f2d752 --- /dev/null +++ b/config_center/nacos/listener.go @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "context" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/vo" +) + +import ( + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/remoting" +) + +func callback(ctx context.Context, listener config_center.ConfigurationListener, namespace, group, dataId, data string) { + listener.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) +} + +func (l *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { + _, loaded := l.keyListeners.Load(key) + if !loaded { + _, cancel := context.WithCancel(context.Background()) + (*l.client.Client).ListenConfig(vo.ConfigParam{ //TODO 这个listen接口应该要有个context的 + //(*l.client.Client).ListenConfigWithContext(ctx, vo.ConfigParam{ + DataId: key, + Group: "dubbo", + OnChange: func(namespace, group, dataId, data string) { + //go callback(ctx, listener, namespace, group, dataId, data) + go callback(context.TODO(), listener, namespace, group, dataId, data) + }, + }) + newListener := make(map[config_center.ConfigurationListener]context.CancelFunc) + newListener[listener] = cancel + l.keyListeners.Store(key, newListener) + } else { + // TODO check goroutine + } +} + +func (l *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { + // TODO not supported in current go_nacos_sdk version + //listeners, loaded := l.keyListeners.Load(key) + //if loaded { + // listenerMap := listeners.(map[config_center.ConfigurationListener]context.CancelFunc) + // listenerMap[listener]() + // delete(listeners.(map[config_center.ConfigurationListener]context.CancelFunc), listener) + //} +} From befdd619eaee948108a113e4b991fdaf4cb2ff57 Mon Sep 17 00:00:00 2001 From: amudong Date: Sun, 12 Jan 2020 16:19:15 +0800 Subject: [PATCH 02/14] fix:gofmt nacos configcenter --- config_center/nacos/client.go | 58 ++++++++++++++++---------------- config_center/nacos/facade.go | 2 -- config_center/nacos/impl.go | 22 ++++++------ config_center/nacos/impl_test.go | 18 +++++----- config_center/nacos/listener.go | 6 ++-- 5 files changed, 52 insertions(+), 54 deletions(-) diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index 8dc742e5c4..3764ea4cf3 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -25,19 +25,19 @@ const ( ) type NacosClient struct { - name string - NacosAddrs []string - sync.Mutex // for Client - Client *config_client.IConfigClient - exit chan struct{} - Timeout time.Duration + name string + NacosAddrs []string + sync.Mutex // for Client + Client *config_client.IConfigClient + exit chan struct{} + Timeout time.Duration } type Option func(*Options) type Options struct { nacosName string - client *NacosClient + client *NacosClient } func WithNacosName(name string) Option { @@ -84,19 +84,19 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { if container.NacosClient().Client == nil { svrConfList := []nacosconst.ServerConfig{} for _, nacosAddr := range container.NacosClient().NacosAddrs { - split := strings.Split(nacosAddr,":") + split := strings.Split(nacosAddr, ":") port, err := strconv.ParseUint(split[1], 10, 64) if err != nil { continue } svrconf := nacosconst.ServerConfig{ IpAddr: split[0], - Port: port, + Port: port, } svrConfList = append(svrConfList, svrconf) } - client , err := clients.CreateConfigClient(map[string]interface{}{ + client, err := clients.CreateConfigClient(map[string]interface{}{ "serverConfigs": svrConfList, "clientConfig": nacosconst.ClientConfig{ TimeoutMs: uint64(container.NacosClient().Timeout.Nanoseconds() / 1e6), @@ -116,31 +116,31 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) { var ( - err error - n *NacosClient + err error + n *NacosClient ) n = &NacosClient{ - name: name, - NacosAddrs: nacosAddrs, - Timeout: timeout, - exit: make(chan struct{}), + name: name, + NacosAddrs: nacosAddrs, + Timeout: timeout, + exit: make(chan struct{}), } svrConfList := []nacosconst.ServerConfig{} for _, nacosAddr := range n.NacosAddrs { - split := strings.Split(nacosAddr,":") + split := strings.Split(nacosAddr, ":") port, err := strconv.ParseUint(split[1], 10, 64) if err != nil { continue } svrconf := nacosconst.ServerConfig{ IpAddr: split[0], - Port: port, + Port: port, } svrConfList = append(svrConfList, svrconf) } - client , err := clients.CreateConfigClient(map[string]interface{}{ + client, err := clients.CreateConfigClient(map[string]interface{}{ "serverConfigs": svrConfList, "clientConfig": nacosconst.ClientConfig{ TimeoutMs: uint64(timeout.Nanoseconds() / 1e6), @@ -159,32 +159,32 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N func newMockNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) { var ( - err error - n *NacosClient + err error + n *NacosClient ) n = &NacosClient{ - name: name, - NacosAddrs: nacosAddrs, - Timeout: timeout, - exit: make(chan struct{}), + name: name, + NacosAddrs: nacosAddrs, + Timeout: timeout, + exit: make(chan struct{}), } svrConfList := []nacosconst.ServerConfig{} for _, nacosAddr := range n.NacosAddrs { - split := strings.Split(nacosAddr,":") + split := strings.Split(nacosAddr, ":") port, err := strconv.ParseUint(split[1], 10, 64) if err != nil { continue } svrconf := nacosconst.ServerConfig{ IpAddr: split[0], - Port: port, + Port: port, } svrConfList = append(svrConfList, svrconf) } - client , err := clients.CreateConfigClient(map[string]interface{}{ + client, err := clients.CreateConfigClient(map[string]interface{}{ "serverConfigs": svrConfList, "clientConfig": nacosconst.ClientConfig{ TimeoutMs: uint64(timeout.Nanoseconds() / 1e6), @@ -244,4 +244,4 @@ func (n *NacosClient) Close() { } n.Unlock() logger.Warnf("nacosClient{name:%s, zk addr:%s} exit now.", n.name, n.NacosAddrs) -} \ No newline at end of file +} diff --git a/config_center/nacos/facade.go b/config_center/nacos/facade.go index b0487b08f1..e4f8ab7850 100644 --- a/config_center/nacos/facade.go +++ b/config_center/nacos/facade.go @@ -94,5 +94,3 @@ LOOP: } } } - - diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index f3b79bc256..5d7213d3f4 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -37,21 +37,21 @@ import ( const NacosClientName = "nacos config_center" type nacosDynamicConfiguration struct { - url *common.URL - rootPath string - wg sync.WaitGroup - cltLock sync.Mutex - done chan struct{} - client *NacosClient + url *common.URL + rootPath string + wg sync.WaitGroup + cltLock sync.Mutex + done chan struct{} + client *NacosClient keyListeners sync.Map parser parser.ConfigurationParser } func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, error) { c := &nacosDynamicConfiguration{ - rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", - url: url, - keyListeners:sync.Map{}, + rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", + url: url, + keyListeners: sync.Map{}, } err := ValidateNacosClient(c, WithNacosName(NacosClientName)) if err != nil { @@ -80,8 +80,8 @@ func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_cen opt(tmpOpts) } content, err := (*n.client.Client).GetConfig(vo.ConfigParam{ - DataId: key, - Group: tmpOpts.Group, + DataId: key, + Group: tmpOpts.Group, }) if err != nil { return "", perrors.WithStack(err) diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go index 1eaacdca76..514f924e37 100644 --- a/config_center/nacos/impl_test.go +++ b/config_center/nacos/impl_test.go @@ -51,9 +51,9 @@ func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) { dubbo.protocols.myDubbo.name=dubbo ` sucess, err := (*nacosConfiguration.client.Client).PublishConfig(vo.ConfigParam{ - DataId: "dubbo.properties", - Group: "dubbo", - Content: data, + DataId: "dubbo.properties", + Group: "dubbo", + Content: data, }) assert.NoError(t, err) if !sucess { @@ -86,9 +86,9 @@ func Test_AddListener(t *testing.T) { dubbo.protocols.myDubbo.name=dubbo ` sucess, err := (*nacos.client.Client).PublishConfig(vo.ConfigParam{ - DataId: "dubbo.properties", - Group: "dubbo", - Content: data, + DataId: "dubbo.properties", + Group: "dubbo", + Content: data, }) assert.NoError(t, err) if !sucess { @@ -99,12 +99,12 @@ func Test_AddListener(t *testing.T) { } - func Test_RemoveListener(t *testing.T) { //TODO not supported in current go_nacos_sdk version } + type mockDataListener struct { - wg sync.WaitGroup + wg sync.WaitGroup event string } @@ -112,4 +112,4 @@ func (l *mockDataListener) Process(configType *config_center.ConfigChangeEvent) fmt.Println("process!!!!!!!!!!") l.wg.Done() l.event = configType.Key -} \ No newline at end of file +} diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index f284f2d752..86e1162ed1 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -39,9 +39,9 @@ func (l *nacosDynamicConfiguration) addListener(key string, listener config_cent if !loaded { _, cancel := context.WithCancel(context.Background()) (*l.client.Client).ListenConfig(vo.ConfigParam{ //TODO 这个listen接口应该要有个context的 - //(*l.client.Client).ListenConfigWithContext(ctx, vo.ConfigParam{ - DataId: key, - Group: "dubbo", + //(*l.client.Client).ListenConfigWithContext(ctx, vo.ConfigParam{ + DataId: key, + Group: "dubbo", OnChange: func(namespace, group, dataId, data string) { //go callback(ctx, listener, namespace, group, dataId, data) go callback(context.TODO(), listener, namespace, group, dataId, data) From d72c3853382db3f92765c89b260bd42177967719 Mon Sep 17 00:00:00 2001 From: amudong Date: Sun, 12 Jan 2020 19:31:12 +0800 Subject: [PATCH 03/14] fix:nacos test server addr --- config_center/nacos/impl_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go index 514f924e37..07195e2d92 100644 --- a/config_center/nacos/impl_test.go +++ b/config_center/nacos/impl_test.go @@ -36,7 +36,7 @@ import ( ) func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) { - regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:8848") + regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80") nacosConfiguration, err := newNacosDynamicConfiguration(®url) if err != nil { fmt.Println("error:newNacosDynamicConfiguration", err.Error()) From f7a267c07a25345ff462509c3b83bde031284b9a Mon Sep 17 00:00:00 2001 From: amudong Date: Thu, 23 Jan 2020 14:07:21 +0800 Subject: [PATCH 04/14] fix: Code normalization --- config_center/nacos/client.go | 56 ++++----------------------------- config_center/nacos/impl.go | 3 +- config_center/nacos/listener.go | 10 +----- 3 files changed, 8 insertions(+), 61 deletions(-) diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index 3764ea4cf3..b8f2aa99d5 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -87,6 +87,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { split := strings.Split(nacosAddr, ":") port, err := strconv.ParseUint(split[1], 10, 64) if err != nil { + logger.Warnf("nacos addr port parse error ,error message is %v", err) continue } svrconf := nacosconst.ServerConfig{ @@ -99,10 +100,10 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { client, err := clients.CreateConfigClient(map[string]interface{}{ "serverConfigs": svrConfList, "clientConfig": nacosconst.ClientConfig{ - TimeoutMs: uint64(container.NacosClient().Timeout.Nanoseconds() / 1e6), + TimeoutMs: uint64(container.NacosClient().Timeout.Milliseconds()), ListenInterval: 10000, NotLoadCacheAtStart: true, - LogDir: "logs/nacos/log", //TODO unified log directory + LogDir: "logs/nacos/log", }, }) container.NacosClient().Client = &client @@ -143,10 +144,10 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N client, err := clients.CreateConfigClient(map[string]interface{}{ "serverConfigs": svrConfList, "clientConfig": nacosconst.ClientConfig{ - TimeoutMs: uint64(timeout.Nanoseconds() / 1e6), + TimeoutMs: uint64(timeout.Milliseconds()), ListenInterval: 20000, NotLoadCacheAtStart: true, - LogDir: "logs/nacos/log", //TODO unified log directory + LogDir: "logs/nacos/log", }, }) n.Client = &client @@ -157,49 +158,6 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N return n, nil } -func newMockNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) { - var ( - err error - n *NacosClient - ) - - n = &NacosClient{ - name: name, - NacosAddrs: nacosAddrs, - Timeout: timeout, - exit: make(chan struct{}), - } - - svrConfList := []nacosconst.ServerConfig{} - for _, nacosAddr := range n.NacosAddrs { - split := strings.Split(nacosAddr, ":") - port, err := strconv.ParseUint(split[1], 10, 64) - if err != nil { - continue - } - svrconf := nacosconst.ServerConfig{ - IpAddr: split[0], - Port: port, - } - svrConfList = append(svrConfList, svrconf) - } - - client, err := clients.CreateConfigClient(map[string]interface{}{ - "serverConfigs": svrConfList, - "clientConfig": nacosconst.ClientConfig{ - TimeoutMs: uint64(timeout.Nanoseconds() / 1e6), - ListenInterval: 10000, - NotLoadCacheAtStart: true, - LogDir: "logs/nacos/log", ///TODO unified log directory - }, - }) - if err != nil { - return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs) - } - n.Client = &client - return n, nil -} - func (n *NacosClient) Done() <-chan struct{} { return n.exit } @@ -239,9 +197,7 @@ func (n *NacosClient) Close() { n.stop() n.Lock() - if n.Client != nil { - n.Client = nil - } + n.Client = nil n.Unlock() logger.Warnf("nacosClient{name:%s, zk addr:%s} exit now.", n.name, n.NacosAddrs) } diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index 5d7213d3f4..e556370ba0 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -51,7 +51,6 @@ func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, c := &nacosDynamicConfiguration{ rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", url: url, - keyListeners: sync.Map{}, } err := ValidateNacosClient(c, WithNacosName(NacosClientName)) if err != nil { @@ -72,7 +71,7 @@ func (n *nacosDynamicConfiguration) RemoveListener(key string, listener config_c n.removeListener(key, listener) } -// 在nacos group是dubbo DataId是key configfile 或 appconfigfile +//nacos distinguishes configuration files based on group and dataId. defalut group = "dubbo" and dataId = key func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) { tmpOpts := &config_center.Options{} diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index 86e1162ed1..c99553a5f4 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -38,12 +38,10 @@ func (l *nacosDynamicConfiguration) addListener(key string, listener config_cent _, loaded := l.keyListeners.Load(key) if !loaded { _, cancel := context.WithCancel(context.Background()) - (*l.client.Client).ListenConfig(vo.ConfigParam{ //TODO 这个listen接口应该要有个context的 - //(*l.client.Client).ListenConfigWithContext(ctx, vo.ConfigParam{ + (*l.client.Client).ListenConfig(vo.ConfigParam{ DataId: key, Group: "dubbo", OnChange: func(namespace, group, dataId, data string) { - //go callback(ctx, listener, namespace, group, dataId, data) go callback(context.TODO(), listener, namespace, group, dataId, data) }, }) @@ -57,10 +55,4 @@ func (l *nacosDynamicConfiguration) addListener(key string, listener config_cent func (l *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { // TODO not supported in current go_nacos_sdk version - //listeners, loaded := l.keyListeners.Load(key) - //if loaded { - // listenerMap := listeners.(map[config_center.ConfigurationListener]context.CancelFunc) - // listenerMap[listener]() - // delete(listeners.(map[config_center.ConfigurationListener]context.CancelFunc), listener) - //} } From 3fd3cf8d2577324f4bdfbe1b650869aaceb1fda7 Mon Sep 17 00:00:00 2001 From: amudong Date: Thu, 23 Jan 2020 16:54:48 +0800 Subject: [PATCH 05/14] add config_center nacos client_test.go --- config_center/nacos/client.go | 13 ++++++++---- config_center/nacos/client_test.go | 34 ++++++++++++++++++++++++++++++ config_center/nacos/impl.go | 7 +++--- config_center/nacos/listener.go | 4 +++- 4 files changed, 50 insertions(+), 8 deletions(-) create mode 100644 config_center/nacos/client_test.go diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index b8f2aa99d5..e3771d40b6 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -31,6 +31,8 @@ type NacosClient struct { Client *config_client.IConfigClient exit chan struct{} Timeout time.Duration + once sync.Once + onceClose func() } type Option func(*Options) @@ -108,11 +110,11 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { }) container.NacosClient().Client = &client if err != nil { - //TODO + logger.Errorf("nacos create config client error:%v", err) } } - return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL) + return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.PrimitiveURL) } func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) { @@ -126,6 +128,9 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N NacosAddrs: nacosAddrs, Timeout: timeout, exit: make(chan struct{}), + onceClose: func() { + close(n.exit) + }, } svrConfList := []nacosconst.ServerConfig{} @@ -167,7 +172,7 @@ func (n *NacosClient) stop() bool { case <-n.exit: return true default: - close(n.exit) + n.once.Do(n.onceClose) } return false @@ -199,5 +204,5 @@ func (n *NacosClient) Close() { n.Lock() n.Client = nil n.Unlock() - logger.Warnf("nacosClient{name:%s, zk addr:%s} exit now.", n.name, n.NacosAddrs) + logger.Warnf("nacosClient{name:%s, nacos addr:%s} exit now.", n.name, n.NacosAddrs) } diff --git a/config_center/nacos/client_test.go b/config_center/nacos/client_test.go new file mode 100644 index 0000000000..93bdcf654f --- /dev/null +++ b/config_center/nacos/client_test.go @@ -0,0 +1,34 @@ +package nacos + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" +) + +func Test_newNacosClient(t *testing.T) { + registryUrl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80") + c := &nacosDynamicConfiguration{ + url: ®istryUrl, + done: make(chan struct{}), + } + err := ValidateNacosClient(c, WithNacosName(NacosClientName)) + if err != nil { + fmt.Println("nacos client start error ,error message is", err) + } + assert.NoError(t, err) + c.wg.Add(1) + go HandleClientRestart(c) + c.client.Close() + <-c.client.Done() + fmt.Println("nacos client close done") + c.Destroy() +} diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index e556370ba0..9519523b17 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -49,8 +49,9 @@ type nacosDynamicConfiguration struct { func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, error) { c := &nacosDynamicConfiguration{ - rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", - url: url, + rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", + url: url, + done: make(chan struct{}), } err := ValidateNacosClient(c, WithNacosName(NacosClientName)) if err != nil { @@ -148,7 +149,7 @@ func (n *nacosDynamicConfiguration) IsAvailable() bool { func (r *nacosDynamicConfiguration) closeConfigs() { r.cltLock.Lock() defer r.cltLock.Unlock() - logger.Infof("begin to close provider zk client") + logger.Infof("begin to close provider nacos client") // Close the old client first to close the tmp node r.client.Close() r.client = nil diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index c99553a5f4..ea03e718b7 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -26,6 +26,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/remoting" ) @@ -49,7 +50,8 @@ func (l *nacosDynamicConfiguration) addListener(key string, listener config_cent newListener[listener] = cancel l.keyListeners.Store(key, newListener) } else { - // TODO check goroutine + // TODO check goroutine alive, but this version of go_nacos_sdk is not support. + logger.Infof("profile:%s. this profile is already listening", key) } } From c0b96f3dc7f73f8caf0206083e1e048dd651b5ee Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 10 Feb 2020 16:04:29 +0800 Subject: [PATCH 06/14] fix test case --- config_center/nacos/client_test.go | 3 +-- config_center/nacos/impl_test.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/config_center/nacos/client_test.go b/config_center/nacos/client_test.go index 93bdcf654f..26cac7fb74 100644 --- a/config_center/nacos/client_test.go +++ b/config_center/nacos/client_test.go @@ -1,7 +1,6 @@ package nacos import ( - "context" "fmt" "testing" ) @@ -15,7 +14,7 @@ import ( ) func Test_newNacosClient(t *testing.T) { - registryUrl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80") + registryUrl, _ := common.NewURL("registry://console.nacos.io:80") c := &nacosDynamicConfiguration{ url: ®istryUrl, done: make(chan struct{}), diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go index 07195e2d92..cb1fae7074 100644 --- a/config_center/nacos/impl_test.go +++ b/config_center/nacos/impl_test.go @@ -17,7 +17,6 @@ package nacos import ( - "context" "fmt" "sync" "testing" @@ -36,7 +35,7 @@ import ( ) func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) { - regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80") + regurl, _ := common.NewURL("registry://console.nacos.io:80") nacosConfiguration, err := newNacosDynamicConfiguration(®url) if err != nil { fmt.Println("error:newNacosDynamicConfiguration", err.Error()) From e167f6c6fc1d9625e2cefbc2cd025dc64fe96609 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 10 Feb 2020 16:34:18 +0800 Subject: [PATCH 07/14] fix code problems --- config_center/nacos/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index e3771d40b6..c75ee43e89 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -102,7 +102,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { client, err := clients.CreateConfigClient(map[string]interface{}{ "serverConfigs": svrConfList, "clientConfig": nacosconst.ClientConfig{ - TimeoutMs: uint64(container.NacosClient().Timeout.Milliseconds()), + TimeoutMs: uint64(int32(container.NacosClient().Timeout / time.Millisecond)), ListenInterval: 10000, NotLoadCacheAtStart: true, LogDir: "logs/nacos/log", @@ -149,7 +149,7 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N client, err := clients.CreateConfigClient(map[string]interface{}{ "serverConfigs": svrConfList, "clientConfig": nacosconst.ClientConfig{ - TimeoutMs: uint64(timeout.Milliseconds()), + TimeoutMs: uint64(timeout / time.Millisecond), ListenInterval: 20000, NotLoadCacheAtStart: true, LogDir: "logs/nacos/log", From b3022724887555b5bf5bd45d389a0dde5d16ab4b Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Mon, 10 Feb 2020 18:28:08 +0800 Subject: [PATCH 08/14] fix test case --- config_center/nacos/client_test.go | 5 +- config_center/nacos/impl_test.go | 76 +++++++++++++++++------------- 2 files changed, 48 insertions(+), 33 deletions(-) diff --git a/config_center/nacos/client_test.go b/config_center/nacos/client_test.go index 26cac7fb74..d661c49ceb 100644 --- a/config_center/nacos/client_test.go +++ b/config_center/nacos/client_test.go @@ -2,6 +2,7 @@ package nacos import ( "fmt" + "strings" "testing" ) @@ -14,7 +15,9 @@ import ( ) func Test_newNacosClient(t *testing.T) { - registryUrl, _ := common.NewURL("registry://console.nacos.io:80") + server := mockCommonNacosServer() + nacosURL := strings.ReplaceAll(server.URL, "http", "registry") + registryUrl, _ := common.NewURL(nacosURL) c := &nacosDynamicConfiguration{ url: ®istryUrl, done: make(chan struct{}), diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go index cb1fae7074..27a9a02355 100644 --- a/config_center/nacos/impl_test.go +++ b/config_center/nacos/impl_test.go @@ -18,13 +18,15 @@ package nacos import ( "fmt" + "net/http" + "net/http/httptest" + "strings" "sync" "testing" "time" ) import ( - "github.com/nacos-group/nacos-sdk-go/vo" "github.com/stretchr/testify/assert" ) @@ -34,8 +36,46 @@ import ( "github.com/apache/dubbo-go/config_center/parser" ) +// run mock config server +func runMockConfigServer(configHandler func(http.ResponseWriter, *http.Request), + configListenHandler func(http.ResponseWriter, *http.Request)) *httptest.Server { + uriHandlerMap := make(map[string]func(http.ResponseWriter, *http.Request), 0) + + uriHandlerMap["/nacos/v1/cs/configs"] = configHandler + uriHandlerMap["/nacos/v1/cs/configs/listener"] = configListenHandler + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + uri := r.RequestURI + for path, handler := range uriHandlerMap { + if uri == path { + handler(w, r) + break + } + } + })) + + return ts +} + +func mockCommonNacosServer() *httptest.Server { + return runMockConfigServer(func(writer http.ResponseWriter, request *http.Request) { + data := ` + dubbo.service.com.ikurento.user.UserProvider.cluster=failback + dubbo.service.com.ikurento.user.UserProvider.protocol=myDubbo1 + dubbo.protocols.myDubbo.port=20000 + dubbo.protocols.myDubbo.name=dubbo +` + fmt.Fprintf(writer, "%s", data) + }, func(writer http.ResponseWriter, request *http.Request) { + data := `dubbo.properties%02dubbo%02dubbo.service.com.ikurento.user.UserProvider.cluster=failback` + fmt.Fprintf(writer, "%s", data) + }) +} + func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) { - regurl, _ := common.NewURL("registry://console.nacos.io:80") + server := mockCommonNacosServer() + nacosURL := strings.ReplaceAll(server.URL, "http", "registry") + regurl, _ := common.NewURL(nacosURL) nacosConfiguration, err := newNacosDynamicConfiguration(®url) if err != nil { fmt.Println("error:newNacosDynamicConfiguration", err.Error()) @@ -43,21 +83,8 @@ func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) { return nil, err } nacosConfiguration.SetParser(&parser.DefaultConfigurationParser{}) - data := ` - dubbo.service.com.ikurento.user.UserProvider.cluster=failback - dubbo.service.com.ikurento.user.UserProvider.protocol=myDubbo1 - dubbo.protocols.myDubbo.port=20000 - dubbo.protocols.myDubbo.name=dubbo -` - sucess, err := (*nacosConfiguration.client.Client).PublishConfig(vo.ConfigParam{ - DataId: "dubbo.properties", - Group: "dubbo", - Content: data, - }) + assert.NoError(t, err) - if !sucess { - fmt.Println("error: publishconfig error", data) - } return nacosConfiguration, err } @@ -76,23 +103,8 @@ func Test_AddListener(t *testing.T) { listener := &mockDataListener{} time.Sleep(time.Second * 2) nacos.AddListener("dubbo.properties", listener) - listener.wg.Add(2) + listener.wg.Add(1) fmt.Println("begin to listen") - data := ` - dubbo.service.com.ikurento.user.UserProvider.cluster=failback - dubbo.service.com.ikurento.user.UserProvider.protocol=myDubbo - dubbo.protocols.myDubbo.port=20000 - dubbo.protocols.myDubbo.name=dubbo -` - sucess, err := (*nacos.client.Client).PublishConfig(vo.ConfigParam{ - DataId: "dubbo.properties", - Group: "dubbo", - Content: data, - }) - assert.NoError(t, err) - if !sucess { - fmt.Println("error: publishconfig error", data) - } listener.wg.Wait() fmt.Println("end", listener.event) From abc8d647cc3c3a1584b725bd56c543c5e4fd0254 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Tue, 11 Feb 2020 21:37:54 +0800 Subject: [PATCH 09/14] add comment --- config_center/nacos/client.go | 60 +++++++++++++++++++-------------- config_center/nacos/facade.go | 27 ++++++++------- config_center/nacos/factory.go | 1 + config_center/nacos/impl.go | 29 ++++++++++------ config_center/nacos/listener.go | 7 ++-- 5 files changed, 71 insertions(+), 53 deletions(-) diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index c75ee43e89..7110a7c633 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -19,52 +19,58 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -const ( - ConnDelay = 3 - MaxFailTimes = 15 -) - +// NacosClient Nacos client type NacosClient struct { name string NacosAddrs []string sync.Mutex // for Client - Client *config_client.IConfigClient + client *config_client.IConfigClient exit chan struct{} Timeout time.Duration once sync.Once onceClose func() } -type Option func(*Options) +// Client Get Client +func (n *NacosClient) Client() *config_client.IConfigClient { + return n.client +} + +// SetClient Set client +func (n *NacosClient) SetClient(client *config_client.IConfigClient) { + n.Lock() + n.client = client + n.Unlock() +} + +type option func(*options) -type Options struct { +type options struct { nacosName string client *NacosClient } -func WithNacosName(name string) Option { - return func(opt *Options) { +// WithNacosName Set nacos name +func WithNacosName(name string) option { + return func(opt *options) { opt.nacosName = name } } -func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { +// ValidateNacosClient Validate nacos client , if null then create it +func ValidateNacosClient(container nacosClientFacade, opts ...option) error { var ( err error ) - opions := &Options{} + os := &options{} for _, opt := range opts { - opt(opions) + opt(os) } err = nil - lock := container.NacosClientLock() url := container.GetUrl() - lock.Lock() - defer lock.Unlock() - if container.NacosClient() == nil { //in dubbo ,every registry only connect one node ,so this is []string{r.Address} timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) @@ -74,16 +80,16 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) } nacosAddresses := strings.Split(url.Location, ",") - newClient, err := newNacosClient(opions.nacosName, nacosAddresses, timeout) + newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout) if err != nil { logger.Warnf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}", - opions.nacosName, url.Location, timeout.String(), err) + os.nacosName, url.Location, timeout.String(), err) return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) } container.SetNacosClient(newClient) } - if container.NacosClient().Client == nil { + if container.NacosClient().Client() == nil { svrConfList := []nacosconst.ServerConfig{} for _, nacosAddr := range container.NacosClient().NacosAddrs { split := strings.Split(nacosAddr, ":") @@ -108,7 +114,8 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { LogDir: "logs/nacos/log", }, }) - container.NacosClient().Client = &client + + container.NacosClient().SetClient(&client) if err != nil { logger.Errorf("nacos create config client error:%v", err) } @@ -155,7 +162,7 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N LogDir: "logs/nacos/log", }, }) - n.Client = &client + n.SetClient(&client) if err != nil { return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs) } @@ -163,6 +170,7 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N return n, nil } +// Done Get nacos client exit signal func (n *NacosClient) Done() <-chan struct{} { return n.exit } @@ -178,6 +186,7 @@ func (n *NacosClient) stop() bool { return false } +// NacosClientValid Get nacos client valid status func (n *NacosClient) NacosClientValid() bool { select { case <-n.exit: @@ -187,7 +196,7 @@ func (n *NacosClient) NacosClientValid() bool { valid := true n.Lock() - if n.Client == nil { + if n.Client() == nil { valid = false } n.Unlock() @@ -195,14 +204,13 @@ func (n *NacosClient) NacosClientValid() bool { return valid } +// Close Close nacos client , then set null func (n *NacosClient) Close() { if n == nil { return } n.stop() - n.Lock() - n.Client = nil - n.Unlock() + n.SetClient(nil) logger.Warnf("nacosClient{name:%s, nacos addr:%s} exit now.", n.name, n.NacosAddrs) } diff --git a/config_center/nacos/facade.go b/config_center/nacos/facade.go index e4f8ab7850..fc83e14eac 100644 --- a/config_center/nacos/facade.go +++ b/config_center/nacos/facade.go @@ -31,13 +31,18 @@ import ( "github.com/apache/dubbo-go/common/logger" ) +const ( + connDelay = 3 + maxFailTimes = 15 +) + type nacosClientFacade interface { NacosClient() *NacosClient SetNacosClient(*NacosClient) - NacosClientLock() *sync.Mutex - WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container - GetDone() chan struct{} //for nacos client control - RestartCallBack() bool + // WaitGroup for wait group control, zk client listener & zk client container + WaitGroup() *sync.WaitGroup + // GetDone For nacos client control RestartCallBack() bool + GetDone() chan struct{} common.Node } @@ -45,7 +50,7 @@ func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } -//TODO nacos HandleClientRestart +// HandleClientRestart Restart client handler func HandleClientRestart(r nacosClientFacade) { var ( err error @@ -62,12 +67,10 @@ LOOP: break LOOP // re-register all services case <-r.NacosClient().Done(): - r.NacosClientLock().Lock() r.NacosClient().Close() nacosName := r.NacosClient().name nacosAddress := r.NacosClient().NacosAddrs r.SetNacosClient(nil) - r.NacosClientLock().Unlock() // Connect nacos until success. failTimes = 0 @@ -76,19 +79,17 @@ LOOP: case <-r.GetDone(): logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...") break LOOP - case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection nacos. + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * connDelay)): // Prevent crazy reconnection nacos. } err = ValidateNacosClient(r, WithNacosName(nacosName)) logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}", nacosAddress, perrors.WithStack(err)) if err == nil { - if r.RestartCallBack() { - break - } + break } failTimes++ - if MaxFailTimes <= failTimes { - failTimes = MaxFailTimes + if maxFailTimes <= failTimes { + failTimes = maxFailTimes } } } diff --git a/config_center/nacos/factory.go b/config_center/nacos/factory.go index 62c8835fa5..3de91ea013 100644 --- a/config_center/nacos/factory.go +++ b/config_center/nacos/factory.go @@ -31,6 +31,7 @@ func init() { type nacosDynamicConfigurationFactory struct { } +// GetDynamicConfiguration Get Configuration with URL func (f *nacosDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) { dynamicConfiguration, err := newNacosDynamicConfiguration(url) if err != nil { diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index 9519523b17..d4ff3d9e9d 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -34,7 +34,7 @@ import ( "github.com/apache/dubbo-go/config_center/parser" ) -const NacosClientName = "nacos config_center" +const nacosClientName = "nacos config_center" type nacosDynamicConfiguration struct { url *common.URL @@ -53,7 +53,7 @@ func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, url: url, done: make(chan struct{}), } - err := ValidateNacosClient(c, WithNacosName(NacosClientName)) + err := ValidateNacosClient(c, WithNacosName(nacosClientName)) if err != nil { logger.Errorf("nacos client start error ,error message is %v", err) return nil, err @@ -64,10 +64,12 @@ func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, } +// AddListener Add listener func (n *nacosDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { n.addListener(key, listener) } +// RemoveListener Remove listener func (n *nacosDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { n.removeListener(key, listener) } @@ -79,7 +81,7 @@ func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_cen for _, opt := range opts { opt(tmpOpts) } - content, err := (*n.client.Client).GetConfig(vo.ConfigParam{ + content, err := (*n.client.Client()).GetConfig(vo.ConfigParam{ DataId: key, Group: tmpOpts.Group, }) @@ -91,52 +93,61 @@ func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_cen } +// GetInternalProperty Get properties value by key func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { return n.GetProperties(key, opts...) } +// GetRule Get router rule func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { return n.GetProperties(key, opts...) } +// Parser Get Parser func (n *nacosDynamicConfiguration) Parser() parser.ConfigurationParser { return n.parser } +// SetParser Set Parser func (n *nacosDynamicConfiguration) SetParser(p parser.ConfigurationParser) { n.parser = p } +// NacosClient Get Nacos Client func (n *nacosDynamicConfiguration) NacosClient() *NacosClient { return n.client } +// SetNacosClient Set Nacos Client func (n *nacosDynamicConfiguration) SetNacosClient(client *NacosClient) { + n.cltLock.Lock() n.client = client + n.cltLock.Unlock() } -func (n *nacosDynamicConfiguration) NacosClientLock() *sync.Mutex { - return &n.cltLock -} - +// WaitGroup for wait group control, zk client listener & zk client container func (n *nacosDynamicConfiguration) WaitGroup() *sync.WaitGroup { return &n.wg } +// GetDone For nacos client control RestartCallBack() bool func (n *nacosDynamicConfiguration) GetDone() chan struct{} { return n.done } +// GetUrl Get Url func (n *nacosDynamicConfiguration) GetUrl() common.URL { return *n.url } +// Destroy Destroy configuration instance func (n *nacosDynamicConfiguration) Destroy() { close(n.done) n.wg.Wait() n.closeConfigs() } +// IsAvailable Get available status func (n *nacosDynamicConfiguration) IsAvailable() bool { select { case <-n.done: @@ -154,7 +165,3 @@ func (r *nacosDynamicConfiguration) closeConfigs() { r.client.Close() r.client = nil } - -func (r *nacosDynamicConfiguration) RestartCallBack() bool { - return true -} diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index ea03e718b7..90af27dc0b 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -31,7 +31,7 @@ import ( "github.com/apache/dubbo-go/remoting" ) -func callback(ctx context.Context, listener config_center.ConfigurationListener, namespace, group, dataId, data string) { +func callback(listener config_center.ConfigurationListener, namespace, group, dataId, data string) { listener.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) } @@ -39,13 +39,14 @@ func (l *nacosDynamicConfiguration) addListener(key string, listener config_cent _, loaded := l.keyListeners.Load(key) if !loaded { _, cancel := context.WithCancel(context.Background()) - (*l.client.Client).ListenConfig(vo.ConfigParam{ + err := (*l.client.Client()).ListenConfig(vo.ConfigParam{ DataId: key, Group: "dubbo", OnChange: func(namespace, group, dataId, data string) { - go callback(context.TODO(), listener, namespace, group, dataId, data) + go callback(listener, namespace, group, dataId, data) }, }) + logger.Errorf("nacos : listen config fail, error:%v ", err) newListener := make(map[config_center.ConfigurationListener]context.CancelFunc) newListener[listener] = cancel l.keyListeners.Store(key, newListener) From bd165e54014a428f3c67bd66a316f19e779ce7a4 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Tue, 11 Feb 2020 22:09:32 +0800 Subject: [PATCH 10/14] fix test case --- config_center/nacos/client.go | 6 ++++-- config_center/nacos/client_test.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index 7110a7c633..0458e14891 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -19,6 +19,8 @@ import ( "github.com/apache/dubbo-go/common/logger" ) +const logDir = "logs/nacos/log" + // NacosClient Nacos client type NacosClient struct { name string @@ -111,7 +113,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error { TimeoutMs: uint64(int32(container.NacosClient().Timeout / time.Millisecond)), ListenInterval: 10000, NotLoadCacheAtStart: true, - LogDir: "logs/nacos/log", + LogDir: logDir, }, }) @@ -159,7 +161,7 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N TimeoutMs: uint64(timeout / time.Millisecond), ListenInterval: 20000, NotLoadCacheAtStart: true, - LogDir: "logs/nacos/log", + LogDir: logDir, }, }) n.SetClient(&client) diff --git a/config_center/nacos/client_test.go b/config_center/nacos/client_test.go index d661c49ceb..6408b244d6 100644 --- a/config_center/nacos/client_test.go +++ b/config_center/nacos/client_test.go @@ -22,7 +22,7 @@ func Test_newNacosClient(t *testing.T) { url: ®istryUrl, done: make(chan struct{}), } - err := ValidateNacosClient(c, WithNacosName(NacosClientName)) + err := ValidateNacosClient(c, WithNacosName(nacosClientName)) if err != nil { fmt.Println("nacos client start error ,error message is", err) } From a7b3fd84b34944673952250fed74cf56d730a5e6 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Wed, 12 Feb 2020 21:32:40 +0800 Subject: [PATCH 11/14] fix review comment --- config_center/nacos/client.go | 11 +++++------ config_center/nacos/impl.go | 9 +++++---- config_center/nacos/listener.go | 3 ++- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index 0458e14891..d87ee74dda 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -61,16 +61,14 @@ func WithNacosName(name string) option { // ValidateNacosClient Validate nacos client , if null then create it func ValidateNacosClient(container nacosClientFacade, opts ...option) error { - var ( - err error - ) + if container == nil { + return perrors.Errorf("container can not be null") + } os := &options{} for _, opt := range opts { opt(os) } - err = nil - url := container.GetUrl() if container.NacosClient() == nil { @@ -123,7 +121,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error { } } - return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.PrimitiveURL) + return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL) } func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) { @@ -147,6 +145,7 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N split := strings.Split(nacosAddr, ":") port, err := strconv.ParseUint(split[1], 10, 64) if err != nil { + logger.Warnf("convert port , source:%s , error:%v ", split[1], err) continue } svrconf := nacosconst.ServerConfig{ diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index d4ff3d9e9d..b838e9d4cb 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -159,9 +159,10 @@ func (n *nacosDynamicConfiguration) IsAvailable() bool { func (r *nacosDynamicConfiguration) closeConfigs() { r.cltLock.Lock() - defer r.cltLock.Unlock() - logger.Infof("begin to close provider nacos client") - // Close the old client first to close the tmp node - r.client.Close() + client := r.client r.client = nil + r.cltLock.Unlock() + // Close the old client first to close the tmp node + client.Close() + logger.Infof("begin to close provider nacos client") } diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index 90af27dc0b..25c586586c 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -57,5 +57,6 @@ func (l *nacosDynamicConfiguration) addListener(key string, listener config_cent } func (l *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { - // TODO not supported in current go_nacos_sdk version + // TODO: not supported in current go_nacos_sdk version + logger.Warn("not supported in current go_nacos_sdk version") } From b2584ae364b5074e8d9c61d83639ef9bb041c5ba Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Wed, 12 Feb 2020 22:20:39 +0800 Subject: [PATCH 12/14] fix code problems --- config_center/nacos/impl.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index b838e9d4cb..60ab89b003 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -76,7 +76,16 @@ func (n *nacosDynamicConfiguration) RemoveListener(key string, listener config_c //nacos distinguishes configuration files based on group and dataId. defalut group = "dubbo" and dataId = key func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) { + return n.GetRule(key, opts...) +} + +// GetInternalProperty Get properties value by key +func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { + return n.GetProperties(key, opts...) +} +// GetRule Get router rule +func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { tmpOpts := &config_center.Options{} for _, opt := range opts { opt(tmpOpts) @@ -90,17 +99,6 @@ func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_cen } else { return string(content), nil } - -} - -// GetInternalProperty Get properties value by key -func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { - return n.GetProperties(key, opts...) -} - -// GetRule Get router rule -func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { - return n.GetProperties(key, opts...) } // Parser Get Parser From f7c588d2669dd02cb977fc225b9f560cc1098312 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Wed, 12 Feb 2020 23:23:02 +0800 Subject: [PATCH 13/14] fix review comment --- config_center/nacos/client_test.go | 5 ----- config_center/nacos/impl_test.go | 12 ++---------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/config_center/nacos/client_test.go b/config_center/nacos/client_test.go index 6408b244d6..47a05dec66 100644 --- a/config_center/nacos/client_test.go +++ b/config_center/nacos/client_test.go @@ -1,7 +1,6 @@ package nacos import ( - "fmt" "strings" "testing" ) @@ -23,14 +22,10 @@ func Test_newNacosClient(t *testing.T) { done: make(chan struct{}), } err := ValidateNacosClient(c, WithNacosName(nacosClientName)) - if err != nil { - fmt.Println("nacos client start error ,error message is", err) - } assert.NoError(t, err) c.wg.Add(1) go HandleClientRestart(c) c.client.Close() <-c.client.Done() - fmt.Println("nacos client close done") c.Destroy() } diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go index 27a9a02355..e5f5eb6328 100644 --- a/config_center/nacos/impl_test.go +++ b/config_center/nacos/impl_test.go @@ -77,14 +77,10 @@ func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) { nacosURL := strings.ReplaceAll(server.URL, "http", "registry") regurl, _ := common.NewURL(nacosURL) nacosConfiguration, err := newNacosDynamicConfiguration(®url) - if err != nil { - fmt.Println("error:newNacosDynamicConfiguration", err.Error()) - assert.NoError(t, err) - return nil, err - } + assert.NoError(t, err) + nacosConfiguration.SetParser(&parser.DefaultConfigurationParser{}) - assert.NoError(t, err) return nacosConfiguration, err } @@ -104,10 +100,7 @@ func Test_AddListener(t *testing.T) { time.Sleep(time.Second * 2) nacos.AddListener("dubbo.properties", listener) listener.wg.Add(1) - fmt.Println("begin to listen") listener.wg.Wait() - fmt.Println("end", listener.event) - } func Test_RemoveListener(t *testing.T) { @@ -120,7 +113,6 @@ type mockDataListener struct { } func (l *mockDataListener) Process(configType *config_center.ConfigChangeEvent) { - fmt.Println("process!!!!!!!!!!") l.wg.Done() l.event = configType.Key } From f0a4d8fe8a7aeb385f15bde2d1e87248da48a11d Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Wed, 12 Feb 2020 23:33:16 +0800 Subject: [PATCH 14/14] fix review comment --- config_center/nacos/impl_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go index e5f5eb6328..07bf8638d2 100644 --- a/config_center/nacos/impl_test.go +++ b/config_center/nacos/impl_test.go @@ -88,9 +88,8 @@ func Test_GetConfig(t *testing.T) { nacos, err := initNacosData(t) assert.NoError(t, err) configs, err := nacos.GetProperties("dubbo.properties", config_center.WithGroup("dubbo")) - m, err := nacos.Parser().Parse(configs) + _, err = nacos.Parser().Parse(configs) assert.NoError(t, err) - fmt.Println(m) } func Test_AddListener(t *testing.T) {