diff --git a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java index 7c8bd6e6..139ce5c9 100644 --- a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java +++ b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java @@ -17,37 +17,61 @@ package org.apache.dubbo.rpc.cluster.specifyaddress; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.URLBuilder; +import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; +import org.apache.dubbo.common.utils.ClassUtils; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.cluster.router.AbstractRouter; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Iterator; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public class UserSpecifiedAddressRouter extends AbstractRouter { +import static org.apache.dubbo.common.constants.CommonConstants.DUBBO; +import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; + + +public class UserSpecifiedAddressRouter extends AbstractRouter { private final static Logger logger = LoggerFactory.getLogger(UserSpecifiedAddressRouter.class); // protected for ut purpose protected static int EXPIRE_TIME = 10 * 60 * 1000; - private volatile List> invokers = Collections.emptyList(); - private volatile Map> ip2Invoker; - private volatile Map> address2Invoker; + private volatile List> invokers = Collections.emptyList(); + private volatile Map> ip2Invoker; + private volatile Map> address2Invoker; + private final Protocol protocol; private final Lock cacheLock = new ReentrantLock(); + private final ScheduledExecutorService scheduledExecutorService; + private final AtomicBoolean launchRemovalTask = new AtomicBoolean(false); + + + private final Map> newInvokerCache = new LinkedHashMap<>(16, 0.75f, true); + public UserSpecifiedAddressRouter(URL referenceUrl) { super(referenceUrl); + this.protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); + this.scheduledExecutorService = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().nextScheduledExecutor(); } @Override @@ -73,14 +97,14 @@ public List> route(List> invokers, URL url, Invocation // 2. check if set address url if (address.getUrlAddress() != null) { - Invoker invoker = getInvokerByURL(address, invocation); + Invoker invoker = getInvokerByURL(address); result.add((Invoker) invoker); return result; } // 3. check if set ip and port if (StringUtils.isNotEmpty(address.getIp())) { - Invoker invoker = getInvokerByIp(address, invocation); + Invoker invoker = getInvokerByIp(address); result.add((Invoker) invoker); return result; } @@ -88,7 +112,7 @@ public List> route(List> invokers, URL url, Invocation return invokers; } - private Invoker getInvokerByURL(Address address, Invocation invocation) { + private Invoker getInvokerByURL(Address address) { tryLoadSpecifiedMap(); // try to find in directory @@ -112,11 +136,11 @@ private Invoker getInvokerByURL(Address address, Invocation invocation) { } } - // create new one - throw new RpcException("User specified server address not support refer new url in Dubbo 2.x. Please upgrade to Dubbo 3.x and use dubbo-cluster-specify-address-dubbo3."); + URL newUrl = rebuildAddress(address, getUrl()); + return getOrBuildInvokerCache(newUrl); } - public Invoker getInvokerByIp(Address address, Invocation invocation) { + public Invoker getInvokerByIp(Address address) { tryLoadSpecifiedMap(); String ip = address.getIp(); @@ -136,29 +160,31 @@ public Invoker getInvokerByIp(Address address, Invocation invocation) { } if (!address.isNeedToCreate()) { - throwException(invocation, address); + throwException(address); } - throw new RpcException("User specified server address not support refer new url in Dubbo 2.x. Please upgrade to Dubbo 3.x and use dubbo-cluster-specify-address-dubbo3."); + URL newUrl = buildAddress(invokers, address, getUrl()); + return getOrBuildInvokerCache(newUrl); } - private void throwException(Invocation invocation, Address address) { + + private void throwException(Address address) { throw new RpcException("user specified server address : [" + address + "] is not a valid provider for service: [" - + getUrl().getServiceKey() + "]"); + + getUrl().getServiceKey() + "]"); } - private Map> processIp(List> invokerList) { - Map> ip2Invoker = new HashMap<>(); - for (Invoker invoker : invokerList) { + private Map> processIp(List> invokerList) { + Map> ip2Invoker = new HashMap<>(); + for (Invoker invoker : invokerList) { ip2Invoker.put(invoker.getUrl().getHost(), invoker); } return Collections.unmodifiableMap(ip2Invoker); } - private Map> processAddress(List> addresses) { - Map> address2Invoker = new HashMap<>(); - for (Invoker invoker : addresses) { + private Map> processAddress(List> addresses) { + Map> address2Invoker = new HashMap<>(); + for (Invoker invoker : addresses) { address2Invoker.put(invoker.getUrl().getHost() + ":" + invoker.getUrl().getPort(), invoker); } return Collections.unmodifiableMap(address2Invoker); @@ -166,19 +192,19 @@ private Map> processAddress(List> addresses) { // For ut only @Deprecated - protected Map> getIp2Invoker() { + protected Map> getIp2Invoker() { return ip2Invoker; } // For ut only @Deprecated - protected Map> getAddress2Invoker() { + protected Map> getAddress2Invoker() { return address2Invoker; } // For ut only @Deprecated - protected List> getInvokers() { + protected List> getInvokers() { return invokers; } @@ -190,7 +216,7 @@ private void tryLoadSpecifiedMap() { if (ip2Invoker != null) { return; } - List> invokers = this.invokers; + List> invokers = this.invokers; if (CollectionUtils.isEmpty(invokers)) { address2Invoker = Collections.unmodifiableMap(new HashMap<>()); ip2Invoker = Collections.unmodifiableMap(new HashMap<>()); @@ -200,4 +226,108 @@ private void tryLoadSpecifiedMap() { ip2Invoker = processIp(invokers); } } + + + public URL buildAddress(List> invokers, Address address, URL consumerUrl) { + if (!invokers.isEmpty()) { + URL template = invokers.iterator().next().getUrl(); + template = template.setHost(address.getIp()); + if (address.getPort() != 0) { + template = template.setPort(address.getPort()); + } + return template; + } else { + String ip = address.getIp(); + int port = address.getPort(); + if (port == 0) { + port = ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension().getDefaultPort(); + } + return copyConsumerUrl(consumerUrl, ip, port, new HashMap<>()); + } + } + + private URL copyConsumerUrl(URL url, String ip, int port, Map parameters) { + return URLBuilder.from(url) + .setHost(ip) + .setPort(port) + .setProtocol(url.getProtocol() == null ? DUBBO : url.getProtocol()) + .setPath(url.getPath()) + .clearParameters() + .addParameters(parameters) + .removeParameter(MONITOR_KEY) + .build(); + } + + public URL rebuildAddress(Address address, URL consumerUrl) { + URL url = address.getUrlAddress(); + Map parameters = new HashMap<>(url.getParameters()); + parameters.put(VERSION_KEY, consumerUrl.getParameter(VERSION_KEY, "0.0.0")); + parameters.put(GROUP_KEY, consumerUrl.getParameter(GROUP_KEY)); + parameters.putAll(consumerUrl.getParameters()); + return copyConsumerUrl(consumerUrl, url.getHost(), url.getPort(),parameters); + } + + private Invoker getOrBuildInvokerCache(URL url) { + logger.info("Unable to find a proper invoker from directory. Try to create new invoker. New URL: " + url); + + InvokerCache cache; + cacheLock.lock(); + try { + cache = newInvokerCache.get(url); + } finally { + cacheLock.unlock(); + } + if (cache == null) { + Invoker invoker = refer(url); + cacheLock.lock(); + try { + cache = newInvokerCache.get(url); + if (cache == null) { + cache = new InvokerCache<>(invoker); + newInvokerCache.put(url, cache); + if (launchRemovalTask.compareAndSet(false, true)) { + scheduledExecutorService.scheduleAtFixedRate(new RemovalTask(), EXPIRE_TIME / 2, EXPIRE_TIME / 2, TimeUnit.MILLISECONDS); + } + } else { + invoker.destroy(); + } + } finally { + cacheLock.unlock(); + } + } + return cache.getInvoker(); + } + + private Invoker refer(URL url) { + + try { + Class interfaceClass = Class.forName(getUrl().getServiceInterface(), true, ClassUtils.getClassLoader()); + return this.protocol.refer(interfaceClass, url); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + private class RemovalTask implements Runnable { + @Override + public void run() { + cacheLock.lock(); + try { + if (newInvokerCache.size() > 0) { + Iterator>> iterator = newInvokerCache.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + if (System.currentTimeMillis() - entry.getValue().getLastAccess() > EXPIRE_TIME) { + iterator.remove(); + entry.getValue().getInvoker().destroy(); + } else { + break; + } + } + } + } finally { + cacheLock.unlock(); + } + } + } } diff --git a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.manager.ExecutorRepository b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.manager.ExecutorRepository new file mode 100644 index 00000000..44199b02 --- /dev/null +++ b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.manager.ExecutorRepository @@ -0,0 +1 @@ +default=org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository \ No newline at end of file diff --git a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java index 372eb0fb..34cef9e8 100644 --- a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java +++ b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java @@ -37,8 +37,8 @@ public class UserSpecifiedAddressRouterTest { @BeforeEach public void setup() { - consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test", "Value").addParameter("check", "false") - .addParameter("version", "1.0.0").addParameter("group", "Dubbo"); + consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test", "Value").addParameter("check", "false").addParameter("lazy","true") + .addParameter("version", "1.0.0").addParameter("group", "Dubbo").addParameter("interface", DemoService.class.getName()); } @Test @@ -56,7 +56,7 @@ public void testNotify() { // no address Assertions.assertThrows(RpcException.class, () -> - userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class))); + userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class))); Assertions.assertNotNull(userSpecifiedAddressRouter.getAddress2Invoker()); Assertions.assertNotNull(userSpecifiedAddressRouter.getIp2Invoker()); @@ -72,18 +72,23 @@ public void testGetInvokerByURL() { UserSpecifiedAddressRouter userSpecifiedAddressRouter = new UserSpecifiedAddressRouter(consumerUrl); Assertions.assertEquals(Collections.emptyList(), - userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class))); + userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class))); - UserSpecifiedAddressUtil.setAddress(new Address(URL.valueOf("127.0.0.1:20880"))); - Assertions.assertThrows(RpcException.class, () -> - userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class))); + UserSpecifiedAddressUtil.setAddress(new Address(URL.valueOf("127.0.0.1:20880?lazy=true"))); + List> invokers = userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class)); + Assertions.assertEquals(1, invokers.size()); + Assertions.assertEquals("127.0.0.1", invokers.get(0).getUrl().getHost()); + Assertions.assertEquals(20880, invokers.get(0).getUrl().getPort()); + Assertions.assertEquals("Value", invokers.get(0).getUrl().getParameter("Test")); + Assertions.assertEquals(consumerUrl.getParameter("version"), invokers.get(0).getUrl().getParameter("version")); + Assertions.assertEquals(consumerUrl.getParameter("group"), invokers.get(0).getUrl().getParameter("group")); Invoker mockInvoker = Mockito.mock(Invoker.class); Mockito.when(mockInvoker.getUrl()).thenReturn(URL.valueOf("simple://127.0.0.1:20880?Test1=Value")); userSpecifiedAddressRouter.notify(new LinkedList<>(Collections.singletonList(mockInvoker))); UserSpecifiedAddressUtil.setAddress(new Address(URL.valueOf("127.0.0.1:20880"))); - List> invokers = userSpecifiedAddressRouter.route(new LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, Mockito.mock(Invocation.class)); + invokers = userSpecifiedAddressRouter.route(new LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, Mockito.mock(Invocation.class)); Assertions.assertEquals(1, invokers.size()); Assertions.assertEquals(mockInvoker, invokers.get(0)); @@ -100,8 +105,15 @@ public void testGetInvokerByURL() { Assertions.assertEquals(mockInvoker, invokers.get(0)); UserSpecifiedAddressUtil.setAddress(new Address(URL.valueOf("127.0.0.1:20880?Test1=Value&Test2=Value&Test3=Value"))); - Assertions.assertThrows(RpcException.class, () -> - userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class))); + invokers = userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class)); + Assertions.assertEquals(1, invokers.size()); + Assertions.assertEquals("127.0.0.1", invokers.get(0).getUrl().getHost()); + Assertions.assertEquals(20880, invokers.get(0).getUrl().getPort()); + Assertions.assertEquals("Value", invokers.get(0).getUrl().getParameter("Test1")); + Assertions.assertEquals("Value", invokers.get(0).getUrl().getParameter("Test2")); + Assertions.assertEquals("Value", invokers.get(0).getUrl().getParameter("Test3")); + Assertions.assertEquals(consumerUrl.getParameter("version"), invokers.get(0).getUrl().getParameter("version")); + Assertions.assertEquals(consumerUrl.getParameter("group"), invokers.get(0).getUrl().getParameter("group")); } @Test @@ -109,7 +121,7 @@ public void testGetInvokerByIp() { UserSpecifiedAddressRouter userSpecifiedAddressRouter = new UserSpecifiedAddressRouter(consumerUrl); Assertions.assertEquals(Collections.emptyList(), - userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class))); + userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class))); Invoker mockInvoker = Mockito.mock(Invoker.class); Mockito.when(mockInvoker.getUrl()).thenReturn(consumerUrl); @@ -128,14 +140,19 @@ public void testGetInvokerByIp() { UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20770)); Assertions.assertThrows(RpcException.class, () -> - userSpecifiedAddressRouter.route(new LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, Mockito.mock(Invocation.class))); + userSpecifiedAddressRouter.route(new LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, Mockito.mock(Invocation.class))); UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.3", 20880)); Assertions.assertThrows(RpcException.class, () -> - userSpecifiedAddressRouter.route(new LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, Mockito.mock(Invocation.class))); + userSpecifiedAddressRouter.route(new LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, Mockito.mock(Invocation.class))); UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20770, true)); - Assertions.assertThrows(RpcException.class, () -> - userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class))); + invokers = userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, Mockito.mock(Invocation.class)); + Assertions.assertEquals(1, invokers.size()); + Assertions.assertEquals("127.0.0.2", invokers.get(0).getUrl().getHost()); + Assertions.assertEquals(20770, invokers.get(0).getUrl().getPort()); + Assertions.assertEquals("Value", invokers.get(0).getUrl().getParameter("Test")); + Assertions.assertEquals(consumerUrl.getParameter("version"), invokers.get(0).getUrl().getParameter("version")); + Assertions.assertEquals(consumerUrl.getParameter("group"), invokers.get(0).getUrl().getParameter("group")); } }