Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dubbo-SPECIFY-ADDRESS]support v2 ip spec #179

Merged
merged 3 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,61 @@
package org.apache.dubbo.rpc.cluster.specifyaddress;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.router.AbstractRouter;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class UserSpecifiedAddressRouter extends AbstractRouter {
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;


public class UserSpecifiedAddressRouter<T> extends AbstractRouter {
private final static Logger logger = LoggerFactory.getLogger(UserSpecifiedAddressRouter.class);
// protected for ut purpose
protected static int EXPIRE_TIME = 10 * 60 * 1000;

private volatile List<Invoker<?>> invokers = Collections.emptyList();
private volatile Map<String, Invoker<?>> ip2Invoker;
private volatile Map<String, Invoker<?>> address2Invoker;
private volatile List<Invoker<T>> invokers = Collections.emptyList();
private volatile Map<String, Invoker<T>> ip2Invoker;
private volatile Map<String, Invoker<T>> address2Invoker;
private final Protocol protocol;

private final Lock cacheLock = new ReentrantLock();

private final ScheduledExecutorService scheduledExecutorService;
private final AtomicBoolean launchRemovalTask = new AtomicBoolean(false);


private final Map<URL, InvokerCache<T>> newInvokerCache = new LinkedHashMap<>(16, 0.75f, true);

public UserSpecifiedAddressRouter(URL referenceUrl) {
super(referenceUrl);
this.protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
this.scheduledExecutorService = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().nextScheduledExecutor();
}

@Override
Expand All @@ -73,22 +97,22 @@ public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation

// 2. check if set address url
if (address.getUrlAddress() != null) {
Invoker<?> invoker = getInvokerByURL(address, invocation);
Invoker<?> invoker = getInvokerByURL(address);
result.add((Invoker) invoker);
return result;
}

// 3. check if set ip and port
if (StringUtils.isNotEmpty(address.getIp())) {
Invoker<?> invoker = getInvokerByIp(address, invocation);
Invoker<?> invoker = getInvokerByIp(address);
result.add((Invoker) invoker);
return result;
}

return invokers;
}

private Invoker<?> getInvokerByURL(Address address, Invocation invocation) {
private Invoker<?> getInvokerByURL(Address address) {
tryLoadSpecifiedMap();

// try to find in directory
Expand All @@ -112,11 +136,11 @@ private Invoker<?> getInvokerByURL(Address address, Invocation invocation) {
}
}

// create new one
throw new RpcException("User specified server address not support refer new url in Dubbo 2.x. Please upgrade to Dubbo 3.x and use dubbo-cluster-specify-address-dubbo3.");
URL newUrl = rebuildAddress(address, getUrl());
return getOrBuildInvokerCache(newUrl);
}

public Invoker<?> getInvokerByIp(Address address, Invocation invocation) {
public Invoker<?> getInvokerByIp(Address address) {
tryLoadSpecifiedMap();

String ip = address.getIp();
Expand All @@ -136,49 +160,51 @@ public Invoker<?> getInvokerByIp(Address address, Invocation invocation) {
}

if (!address.isNeedToCreate()) {
throwException(invocation, address);
throwException(address);
}

throw new RpcException("User specified server address not support refer new url in Dubbo 2.x. Please upgrade to Dubbo 3.x and use dubbo-cluster-specify-address-dubbo3.");
URL newUrl = buildAddress(invokers, address, getUrl());
return getOrBuildInvokerCache(newUrl);
}

private void throwException(Invocation invocation, Address address) {

private void throwException(Address address) {
throw new RpcException("user specified server address : [" + address + "] is not a valid provider for service: ["
+ getUrl().getServiceKey() + "]");
+ getUrl().getServiceKey() + "]");
}


private Map<String, Invoker<?>> processIp(List<Invoker<?>> invokerList) {
Map<String, Invoker<?>> ip2Invoker = new HashMap<>();
for (Invoker<?> invoker : invokerList) {
private Map<String, Invoker<T>> processIp(List<Invoker<T>> invokerList) {
Map<String, Invoker<T>> ip2Invoker = new HashMap<>();
for (Invoker<T> invoker : invokerList) {
ip2Invoker.put(invoker.getUrl().getHost(), invoker);
}
return Collections.unmodifiableMap(ip2Invoker);
}

private Map<String, Invoker<?>> processAddress(List<Invoker<?>> addresses) {
Map<String, Invoker<?>> address2Invoker = new HashMap<>();
for (Invoker<?> invoker : addresses) {
private Map<String, Invoker<T>> processAddress(List<Invoker<T>> addresses) {
Map<String, Invoker<T>> address2Invoker = new HashMap<>();
for (Invoker<T> invoker : addresses) {
address2Invoker.put(invoker.getUrl().getHost() + ":" + invoker.getUrl().getPort(), invoker);
}
return Collections.unmodifiableMap(address2Invoker);
}

// For ut only
@Deprecated
protected Map<String, Invoker<?>> getIp2Invoker() {
protected Map<String, Invoker<T>> getIp2Invoker() {
return ip2Invoker;
}

// For ut only
@Deprecated
protected Map<String, Invoker<?>> getAddress2Invoker() {
protected Map<String, Invoker<T>> getAddress2Invoker() {
return address2Invoker;
}

// For ut only
@Deprecated
protected List<Invoker<?>> getInvokers() {
protected List<Invoker<T>> getInvokers() {
return invokers;
}

Expand All @@ -190,7 +216,7 @@ private void tryLoadSpecifiedMap() {
if (ip2Invoker != null) {
return;
}
List<Invoker<?>> invokers = this.invokers;
List<Invoker<T>> invokers = this.invokers;
if (CollectionUtils.isEmpty(invokers)) {
address2Invoker = Collections.unmodifiableMap(new HashMap<>());
ip2Invoker = Collections.unmodifiableMap(new HashMap<>());
Expand All @@ -200,4 +226,111 @@ private void tryLoadSpecifiedMap() {
ip2Invoker = processIp(invokers);
}
}


public <T> URL buildAddress(List<Invoker<T>> invokers, Address address, URL consumerUrl) {
if (!invokers.isEmpty()) {
URL template = invokers.iterator().next().getUrl();
template = template.setHost(address.getIp());
if (address.getPort() != 0) {
template = template.setPort(address.getPort());
}
return template;
} else {
String ip = address.getIp();
int port = address.getPort();
if (port == 0) {
port = this.protocol.getDefaultPort();
}
return turnRegistryUrlToConsumerUrl(consumerUrl, ip, port);
}
}

private URL turnRegistryUrlToConsumerUrl(URL url, String ip, int port) {
return URLBuilder.from(url)
.setHost(ip)
.setPort(port)
.setProtocol(DUBBO)
.setPath(url.getPath())
.clearParameters()
.addParameters(url.getParameters())
.removeParameter(MONITOR_KEY)
.build();
}

public URL rebuildAddress(Address address, URL consumerUrl) {
URL url = address.getUrlAddress();
Map<String, String> parameters = new HashMap<>(url.getParameters());
parameters.put(VERSION_KEY, consumerUrl.getParameter(VERSION_KEY, "0.0.0"));
parameters.put(GROUP_KEY, consumerUrl.getParameter(GROUP_KEY));
String ip = address.getIp();
int port = address.getPort();
if (port == 0) {
port = this.protocol.getDefaultPort();
}
return turnRegistryUrlToConsumerUrl(consumerUrl, ip, port);
}

private Invoker<T> getOrBuildInvokerCache(URL url) {
logger.info("Unable to find a proper invoker from directory. Try to create new invoker. New URL: " + url);

InvokerCache<T> cache;
cacheLock.lock();
try {
cache = newInvokerCache.get(url);
} finally {
cacheLock.unlock();
}
if (cache == null) {
Invoker<T> invoker = refer(url);
cacheLock.lock();
try {
cache = newInvokerCache.get(url);
if (cache == null) {
cache = new InvokerCache<>(invoker);
newInvokerCache.put(url, cache);
if (launchRemovalTask.compareAndSet(false, true)) {
scheduledExecutorService.scheduleAtFixedRate(new RemovalTask(), EXPIRE_TIME / 2, EXPIRE_TIME / 2, TimeUnit.MILLISECONDS);
}
} else {
invoker.destroy();
}
} finally {
cacheLock.unlock();
}
}
return cache.getInvoker();
}

private Invoker<T> refer(URL url) {
try {
Class interfaceClass = Class.forName(getUrl().getServiceName(), true, ClassUtils.getClassLoader());
return protocol.refer(interfaceClass, url);
} catch (ClassNotFoundException e) {
throw new RpcException(e);
}
}

private class RemovalTask implements Runnable {
@Override
public void run() {
cacheLock.lock();
try {
if (newInvokerCache.size() > 0) {
Iterator<Map.Entry<URL, InvokerCache<T>>> iterator = newInvokerCache.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<URL, InvokerCache<T>> entry = iterator.next();
if (System.currentTimeMillis() - entry.getValue().getLastAccess() > EXPIRE_TIME) {
iterator.remove();
entry.getValue().getInvoker().destroy();
} else {
break;
}
}
}
} finally {
cacheLock.unlock();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
default=org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository