Skip to content

Commit

Permalink
xds: ClusterManagerLB must update child configuration
Browse files Browse the repository at this point in the history
While child LB policies are unlikey to change for each cluster name (RLS
returns regular cluster names, so should be unique), and the
configuration for CDS policies won't change, RLS configuration can
definitely change.
  • Loading branch information
ejona86 committed Aug 28, 2024
1 parent d034a56 commit 10d6002
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 45 deletions.
39 changes: 25 additions & 14 deletions xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import com.google.common.base.MoreObjects;
import io.grpc.ConnectivityState;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.util.MultiChildLoadBalancer;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
import io.grpc.xds.client.XdsLogger;
Expand Down Expand Up @@ -71,7 +71,10 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {

@Override
protected ResolvedAddresses getChildAddresses(Object key, ResolvedAddresses resolvedAddresses,
Object childConfig) {
Object unusedChildConfig) {
ClusterManagerConfig config = (ClusterManagerConfig)
resolvedAddresses.getLoadBalancingPolicyConfig();
Object childConfig = config.childPolicies.get(key);
return resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(childConfig).build();
}

Expand All @@ -81,13 +84,12 @@ protected Map<Object, ChildLbState> createChildLbMap(ResolvedAddresses resolvedA
resolvedAddresses.getLoadBalancingPolicyConfig();
Map<Object, ChildLbState> newChildPolicies = new HashMap<>();
if (config != null) {
for (Entry<String, PolicySelection> entry : config.childPolicies.entrySet()) {
ChildLbState child = getChildLbState(entry.getKey());
for (String key : config.childPolicies.keySet()) {
ChildLbState child = getChildLbState(key);
if (child == null) {
child = new ClusterManagerLbState(entry.getKey(),
entry.getValue().getProvider(), entry.getValue().getConfig());
child = new ClusterManagerLbState(key, GracefulSwitchLoadBalancerFactory.INSTANCE, null);
}
newChildPolicies.put(entry.getKey(), child);
newChildPolicies.put(key, child);
}
}
logger.log(
Expand All @@ -108,8 +110,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
resolvedAddresses.getLoadBalancingPolicyConfig();
ClusterManagerConfig lastConfig = (ClusterManagerConfig)
lastResolvedAddresses.getLoadBalancingPolicyConfig();
Map<String, PolicySelection> adjChildPolicies = new HashMap<>(config.childPolicies);
for (Entry<String, PolicySelection> entry : lastConfig.childPolicies.entrySet()) {
Map<String, Object> adjChildPolicies = new HashMap<>(config.childPolicies);
for (Entry<String, Object> entry : lastConfig.childPolicies.entrySet()) {
ClusterManagerLbState state = (ClusterManagerLbState) getChildLbState(entry.getKey());
if (adjChildPolicies.containsKey(entry.getKey())) {
if (state.deletionTimer != null) {
Expand Down Expand Up @@ -202,9 +204,9 @@ private class ClusterManagerLbState extends ChildLbState {
@Nullable
ScheduledHandle deletionTimer;

public ClusterManagerLbState(Object key, LoadBalancerProvider policyProvider,
public ClusterManagerLbState(Object key, LoadBalancer.Factory policyFactory,
Object childConfig) {
super(key, policyProvider, childConfig);
super(key, policyFactory, childConfig);
}

@Override
Expand Down Expand Up @@ -237,8 +239,8 @@ class DeletionTask implements Runnable {
public void run() {
ClusterManagerConfig config = (ClusterManagerConfig)
lastResolvedAddresses.getLoadBalancingPolicyConfig();
Map<String, PolicySelection> childPolicies = new HashMap<>(config.childPolicies);
PolicySelection removed = childPolicies.remove(getKey());
Map<String, Object> childPolicies = new HashMap<>(config.childPolicies);
Object removed = childPolicies.remove(getKey());
assert removed != null;
config = new ClusterManagerConfig(childPolicies);
lastResolvedAddresses =
Expand Down Expand Up @@ -276,4 +278,13 @@ public void updateBalancingState(final ConnectivityState newState,
}
}
}

static final class GracefulSwitchLoadBalancerFactory extends LoadBalancer.Factory {
static final LoadBalancer.Factory INSTANCE = new GracefulSwitchLoadBalancerFactory();

@Override
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
return new GracefulSwitchLoadBalancer(helper);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.JsonUtil;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.GracefulSwitchLoadBalancer;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -73,7 +70,7 @@ public String getPolicyName() {

@Override
public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawConfig) {
Map<String, PolicySelection> parsedChildPolicies = new LinkedHashMap<>();
Map<String, Object> parsedChildPolicies = new LinkedHashMap<>();
try {
Map<String, ?> childPolicies = JsonUtil.getObject(rawConfig, "childPolicy");
if (childPolicies == null || childPolicies.isEmpty()) {
Expand All @@ -86,27 +83,19 @@ public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawConfig) {
return ConfigOrError.fromError(Status.INTERNAL.withDescription(
"No config for child " + name + " in cluster_manager LB policy: " + rawConfig));
}
List<LbConfig> childConfigCandidates =
ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(childPolicy, "lbPolicy"));
if (childConfigCandidates == null || childConfigCandidates.isEmpty()) {
return ConfigOrError.fromError(Status.INTERNAL.withDescription(
"No config specified for child " + name + " in cluster_manager Lb policy: "
+ rawConfig));
}
LoadBalancerRegistry registry =
lbRegistry != null ? lbRegistry : LoadBalancerRegistry.getDefaultRegistry();
ConfigOrError selectedConfig =
ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, registry);
if (selectedConfig.getError() != null) {
Status error = selectedConfig.getError();
ConfigOrError childConfig = GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
JsonUtil.getListOfObjects(childPolicy, "lbPolicy"), registry);
if (childConfig.getError() != null) {
Status error = childConfig.getError();
return ConfigOrError.fromError(
Status.INTERNAL
.withCause(error.getCause())
.withDescription(error.getDescription())
.augmentDescription("Failed to select config for child " + name));
.augmentDescription("Failed to parse config for child " + name));
}
parsedChildPolicies.put(name, (PolicySelection) selectedConfig.getConfig());
parsedChildPolicies.put(name, childConfig.getConfig());
}
} catch (RuntimeException e) {
return ConfigOrError.fromError(
Expand All @@ -122,9 +111,9 @@ public LoadBalancer newLoadBalancer(Helper helper) {
}

static class ClusterManagerConfig {
final Map<String, PolicySelection> childPolicies;
final Map<String, Object> childPolicies;

ClusterManagerConfig(Map<String, PolicySelection> childPolicies) {
ClusterManagerConfig(Map<String, Object> childPolicies) {
this.childPolicies = Collections.unmodifiableMap(childPolicies);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.internal.JsonParser;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -133,10 +133,9 @@ public ConfigOrError parseLoadBalancingPolicyConfig(
assertThat(config.childPolicies)
.containsExactly(
"child1",
new PolicySelection(
lbProviderFoo, fooConfig),
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(lbProviderFoo, fooConfig),
"child2",
new PolicySelection(lbProviderBar, barConfig));
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(lbProviderBar, barConfig));
}

@Test
Expand Down
24 changes: 18 additions & 6 deletions xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.internal.PickSubchannelArgsImpl;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -288,16 +289,27 @@ private void deliverResolvedAddresses(final Map<String, String> childPolicies, b
.build());
}

// Prevent ClusterManagerLB from detecting different providers even when the configuration is the
// same.
private Map<List<Object>, FakeLoadBalancerProvider> fakeLoadBalancerProviderCache
= new HashMap<>();

private ClusterManagerConfig buildConfig(Map<String, String> childPolicies, boolean failing) {
Map<String, PolicySelection> childPolicySelections = new LinkedHashMap<>();
Map<String, Object> childConfigs = new LinkedHashMap<>();
for (String name : childPolicies.keySet()) {
String childPolicyName = childPolicies.get(name);
Object childConfig = lbConfigInventory.get(name);
PolicySelection policy =
new PolicySelection(new FakeLoadBalancerProvider(childPolicyName, failing), childConfig);
childPolicySelections.put(name, policy);
FakeLoadBalancerProvider lbProvider =
fakeLoadBalancerProviderCache.get(Arrays.asList(childPolicyName, failing));
if (lbProvider == null) {
lbProvider = new FakeLoadBalancerProvider(childPolicyName, failing);
fakeLoadBalancerProviderCache.put(Arrays.asList(childPolicyName, failing), lbProvider);
}
Object policy =
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(lbProvider, childConfig);
childConfigs.put(name, policy);
}
return new ClusterManagerConfig(childPolicySelections);
return new ClusterManagerConfig(childConfigs);
}

private static PickResult pickSubchannel(SubchannelPicker picker, String clusterName) {
Expand Down

0 comments on commit 10d6002

Please sign in to comment.