From 169537cd4743a3b6b2d88c777398e00a03303ba6 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Wed, 28 Aug 2024 14:30:12 -0700 Subject: [PATCH 1/6] Determine subchannel locality from connected address, discarding assumption that all addresses for a subchannel are in the same locality. --- .../io/grpc/InternalSubchannelAddress.java | 31 +++ api/src/main/java/io/grpc/LoadBalancer.java | 12 ++ .../io/grpc/internal/InternalSubchannel.java | 10 + .../io/grpc/internal/ManagedChannelImpl.java | 5 + .../grpc/internal/InternalSubchannelTest.java | 26 +++ .../io/grpc/util/ForwardingSubchannel.java | 6 + .../io/grpc/xds/ClusterImplLoadBalancer.java | 116 ++++++++--- .../io/grpc/xds/client/LoadStatsManager2.java | 2 +- .../grpc/xds/ClusterImplLoadBalancerTest.java | 191 +++++++++++++++--- 9 files changed, 346 insertions(+), 53 deletions(-) create mode 100644 api/src/main/java/io/grpc/InternalSubchannelAddress.java diff --git a/api/src/main/java/io/grpc/InternalSubchannelAddress.java b/api/src/main/java/io/grpc/InternalSubchannelAddress.java new file mode 100644 index 00000000000..338504c3669 --- /dev/null +++ b/api/src/main/java/io/grpc/InternalSubchannelAddress.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +/** + * An internal class. Do not use. + * + *

An interface to provide the address connected by subchannel. + */ +@Internal +public interface InternalSubchannelAddress { + + /** + * Return attributes of the server address connected by sub channel. + */ + public Attributes getConnectedAddressAttributes(); +} diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 15106a5ffc6..0fbce5fa5be 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -1428,6 +1428,18 @@ public void updateAddresses(List addrs) { public Object getInternalSubchannel() { throw new UnsupportedOperationException(); } + + /** + * (Internal use only) returns attributes of the address subchannel is connected to. + * + *

Warning: this is INTERNAL API, is not supposed to be used by external users, and may + * change without notice. If you think you must use it, please file an issue and we can consider + * removing its "internal" status. + */ + @Internal + public Attributes getConnectedAddressAttributes() { + throw new UnsupportedOperationException(); + } } /** diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index a986cb2deff..70e42e2f5f1 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -157,6 +157,8 @@ protected void handleNotInUse() { private Status shutdownReason; + private volatile Attributes connectedAddressAttributes; + InternalSubchannel(List addressGroups, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, @@ -525,6 +527,13 @@ public void run() { return channelStatsFuture; } + /** + * Return attributes for server address connected by sub channel. + */ + public Attributes getConnectedAddressAttributes() { + return connectedAddressAttributes; + } + ConnectivityState getState() { return state.getState(); } @@ -568,6 +577,7 @@ public void run() { } else if (pendingTransport == transport) { activeTransport = transport; pendingTransport = null; + connectedAddressAttributes = addressIndex.getCurrentEagAttributes(); gotoNonErrorState(READY); } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 7f45ca967ea..07dcf9ee7bb 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -2044,6 +2044,11 @@ public void updateAddresses(List addrs) { subchannel.updateAddresses(addrs); } + @Override + public Attributes getConnectedAddressAttributes() { + return subchannel.getConnectedAddressAttributes(); + } + private List stripOverrideAuthorityAttributes( List eags) { List eagsWithoutOverrideAttr = new ArrayList<>(); diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index f7631c34c0d..e4d9f27ed46 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -1339,6 +1339,32 @@ public void channelzStatContainsTransport() throws Exception { assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2); } + @Test + public void connectedAddressAttributes_ready() { + SocketAddress addr = new SocketAddress() {}; + Attributes attr = Attributes.newBuilder().set(Attributes.Key.create("some-key"), "1").build(); + createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addr), attr)); + + assertEquals(IDLE, internalSubchannel.getState()); + assertNoCallbackInvoke(); + assertNull(internalSubchannel.obtainActiveTransport()); + assertNull(internalSubchannel.getConnectedAddressAttributes()); + + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertEquals(CONNECTING, internalSubchannel.getState()); + verify(mockTransportFactory).newClientTransport( + eq(addr), + eq(createClientTransportOptions().setEagAttributes(attr)), + isA(TransportLogger.class)); + assertNull(internalSubchannel.getConnectedAddressAttributes()); + + internalSubchannel.obtainActiveTransport(); + transports.peek().listener.transportReady(); + assertExactCallbackInvokes("onStateChange:READY"); + assertEquals(READY, internalSubchannel.getState()); + assertEquals(attr, internalSubchannel.getConnectedAddressAttributes()); + } + /** Create ClientTransportOptions. Should not be reused if it may be mutated. */ private ClientTransportFactory.ClientTransportOptions createClientTransportOptions() { return new ClientTransportFactory.ClientTransportOptions() diff --git a/util/src/main/java/io/grpc/util/ForwardingSubchannel.java b/util/src/main/java/io/grpc/util/ForwardingSubchannel.java index 51f2583186e..416be378162 100644 --- a/util/src/main/java/io/grpc/util/ForwardingSubchannel.java +++ b/util/src/main/java/io/grpc/util/ForwardingSubchannel.java @@ -74,11 +74,17 @@ public Object getInternalSubchannel() { return delegate().getInternalSubchannel(); } + @Override public void updateAddresses(List addrs) { delegate().updateAddresses(addrs); } + @Override + public Attributes getConnectedAddressAttributes() { + return delegate().getConnectedAddressAttributes(); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 702b2aa6caa..6d4b05bd482 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -27,6 +27,7 @@ import io.grpc.ClientStreamTracer; import io.grpc.ClientStreamTracer.StreamInfo; import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; @@ -59,6 +60,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; /** @@ -77,10 +79,9 @@ final class ClusterImplLoadBalancer extends LoadBalancer { Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING")) || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING")); - private static final Attributes.Key ATTR_CLUSTER_LOCALITY_STATS = - Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityStats"); - private static final Attributes.Key ATTR_CLUSTER_LOCALITY_NAME = - Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityName"); + private static final Attributes.Key> + ATTR_CLUSTER_LOCALITY = + Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality"); private final XdsLogger logger; private final Helper helper; @@ -213,35 +214,43 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne @Override public Subchannel createSubchannel(CreateSubchannelArgs args) { List addresses = withAdditionalAttributes(args.getAddresses()); - Locality locality = args.getAddresses().get(0).getAttributes().get( - InternalXdsAttributes.ATTR_LOCALITY); // all addresses should be in the same locality - String localityName = args.getAddresses().get(0).getAttributes().get( - InternalXdsAttributes.ATTR_LOCALITY_NAME); - // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain - // attributes with its locality, including endpoints in LOGICAL_DNS clusters. - // In case of not (which really shouldn't), loads are aggregated under an empty locality. - if (locality == null) { - locality = Locality.create("", "", ""); - localityName = ""; - } - final ClusterLocalityStats localityStats = - (lrsServerInfo == null) - ? null - : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster, - edsServiceName, locality); - + // This clusterLocality ideally should not be utilized. We derive locality + // information from the first address, even prior to the subchannel becoming ready. + // This is primarily for the purpose of facilitating load reporting in the pre-READY + // state of the subchannel. + ClusterLocality clusterLocality = createClusterLocalityFromAttributes( + args.getAddresses().get(0).getAttributes()); + AtomicReference localityAtomicReference = new AtomicReference<>( + clusterLocality); Attributes attrs = args.getAttributes().toBuilder() - .set(ATTR_CLUSTER_LOCALITY_STATS, localityStats) - .set(ATTR_CLUSTER_LOCALITY_NAME, localityName) + .set(ATTR_CLUSTER_LOCALITY, localityAtomicReference) .build(); args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build(); final Subchannel subchannel = delegate().createSubchannel(args); return new ForwardingSubchannel() { + @Override + public void start(SubchannelStateListener listener) { + delegate().start(new SubchannelStateListener() { + @Override + public void onSubchannelState(ConnectivityStateInfo newState) { + if (newState.getState().equals(ConnectivityState.READY)) { + // Get locality based on the connected address attributes + ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes( + subchannel.getConnectedAddressAttributes()); + ClusterLocality oldClusterLocality = localityAtomicReference + .getAndSet(updatedClusterLocality); + oldClusterLocality.release(); + } + listener.onSubchannelState(newState); + } + }); + } + @Override public void shutdown() { - if (localityStats != null) { - localityStats.release(); + if (localityAtomicReference.get() != null) { + localityAtomicReference.get().release(); } delegate().shutdown(); } @@ -274,6 +283,28 @@ private List withAdditionalAttributes( return newAddresses; } + private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) { + Locality locality = addressAttributes.get(InternalXdsAttributes.ATTR_LOCALITY); + String localityName = addressAttributes.get(InternalXdsAttributes.ATTR_LOCALITY_NAME); + + // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain + // attributes with its locality, including endpoints in LOGICAL_DNS clusters. + // In case of not (which really shouldn't), loads are aggregated under an empty + // locality. + if (locality == null) { + locality = Locality.create("", "", ""); + localityName = ""; + } + + final ClusterLocalityStats localityStats = + (lrsServerInfo == null) + ? null + : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster, + edsServiceName, locality); + + return new ClusterLocality(localityStats, localityName); + } + @Override protected Helper delegate() { return helper; @@ -362,10 +393,12 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } } final ClusterLocalityStats stats = - result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_STATS); + result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get() + .getClusterLocalityStats(); if (stats != null) { String localityName = - result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_NAME); + result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get() + .getClusterLocalityName(); args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName); ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory( @@ -447,4 +480,33 @@ public void onLoadReport(MetricReport report) { stats.recordBackendLoadMetricStats(report.getNamedMetrics()); } } + + /** + * Represents the locality attributes of a cluster. + */ + static final class ClusterLocality { + private final ClusterLocalityStats clusterLocalityStats; + private final String clusterLocalityName; + + @VisibleForTesting + ClusterLocality(ClusterLocalityStats localityStats, String localityName) { + this.clusterLocalityStats = localityStats; + this.clusterLocalityName = localityName; + } + + ClusterLocalityStats getClusterLocalityStats() { + return clusterLocalityStats; + } + + String getClusterLocalityName() { + return clusterLocalityName; + } + + @VisibleForTesting + void release() { + if (clusterLocalityStats != null) { + clusterLocalityStats.release(); + } + } + } } diff --git a/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java b/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java index 393cce16194..d8800eaa19c 100644 --- a/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java +++ b/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java @@ -91,7 +91,7 @@ private synchronized void releaseClusterDropCounter( String cluster, @Nullable String edsServiceName) { checkState(allDropStats.containsKey(cluster) && allDropStats.get(cluster).containsKey(edsServiceName), - "stats for cluster %s, edsServiceName %s not exits", cluster, edsServiceName); + "stats for cluster %s, edsServiceName %s not exist", cluster, edsServiceName); ReferenceCounted ref = allDropStats.get(cluster).get(edsServiceName); ref.release(); } diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 4e12a5717ae..766bab2472e 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -16,6 +16,7 @@ package io.grpc.xds; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; @@ -29,6 +30,7 @@ import io.grpc.CallOptions; import io.grpc.ClientStreamTracer; import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.InsecureChannelCredentials; import io.grpc.LoadBalancer; @@ -40,7 +42,9 @@ import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancer.SubchannelStateListener; import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Status; @@ -76,9 +80,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Queue; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -145,7 +151,7 @@ public AtomicLong getOrCreate(String cluster, @Nullable String edsServiceName) { return new AtomicLong(); } }; - private final Helper helper = new FakeLbHelper(); + private final FakeLbHelper helper = new FakeLbHelper(); private PickSubchannelArgs pickSubchannelArgs = new PickSubchannelArgsImpl( TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {}); @@ -272,9 +278,10 @@ public void pick_addsLocalityLabel() { EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); - Subchannel subchannel = leafBalancer.helper.createSubchannel( - CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); - leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + leafBalancer.createSubChannel(); + FakeSubchannel fakeSubchannel = helper.subchannels.poll(); + fakeSubchannel.updateConnectedAtrributes(locality); + fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); assertThat(currentState).isEqualTo(ConnectivityState.READY); PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class); @@ -300,9 +307,10 @@ public void recordLoadStats() { EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); - Subchannel subchannel = leafBalancer.helper.createSubchannel( - CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); - leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + Subchannel subchannel = leafBalancer.createSubChannel(); + FakeSubchannel fakeSubchannel = helper.subchannels.poll(); + fakeSubchannel.updateConnectedAtrributes(locality); + fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); assertThat(currentState).isEqualTo(ConnectivityState.READY); PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isTrue(); @@ -357,7 +365,7 @@ public void recordLoadStats() { TOLERANCE).of(0.009); streamTracer3.streamClosed(Status.OK); - subchannel.shutdown(); // stats recorder released + subchannel.shutdown(); // stats recorder released clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER)); // Locality load is reported for one last time in case of loads occurred since the previous // load report. @@ -373,6 +381,95 @@ public void recordLoadStats() { assertThat(clusterStats.upstreamLocalityStatsList()).isEmpty(); // no longer reported } + // TODO(dnvindhya): This test has been added as a fix to verify + // https://github.com/grpc/grpc-java/issues/11434. + // Once we update PickFirstLeafLoadBalancer as default LoadBalancer, update the test. + @Test + public void pickFirstLoadReport_onUpdateAddress() { + Locality locality1 = + Locality.create("test-region", "test-zone", "test-subzone"); + Locality locality2 = + Locality.create("other-region", "other-zone", "other-subzone"); + + LoadBalancerProvider pickFirstProvider = LoadBalancerRegistry + .getDefaultRegistry().getProvider("pick_first"); + Object pickFirstConfig = pickFirstProvider.parseLoadBalancingPolicyConfig(new HashMap<>()) + .getConfig(); + ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, + null, Collections.emptyList(), + GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(pickFirstProvider, + pickFirstConfig), + null, Collections.emptyMap()); + EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality1); + EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality2); + deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config); + + // Leaf balancer is created by Pick First. Get FakeSubchannel created to update attributes + // A real subchannel would get these attributes from the connected address's EAG locality. + FakeSubchannel fakeSubchannel = helper.subchannels.poll(); + fakeSubchannel.updateConnectedAtrributes(locality1); + fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + assertThat(currentState).isEqualTo(ConnectivityState.READY); + PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); + assertThat(result.getStatus().isOk()).isTrue(); + + ClientStreamTracer streamTracer1 = result.getStreamTracerFactory().newClientStreamTracer( + ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // first RPC call + streamTracer1.streamClosed(Status.OK); + + ClusterStats clusterStats = Iterables.getOnlyElement( + loadStatsManager.getClusterStatsReports(CLUSTER)); + UpstreamLocalityStats localityStats = Iterables.getOnlyElement( + clusterStats.upstreamLocalityStatsList()); + assertThat(localityStats.locality()).isEqualTo(locality1); + assertThat(localityStats.totalIssuedRequests()).isEqualTo(1L); + assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L); + assertThat(localityStats.totalErrorRequests()).isEqualTo(0L); + assertThat(localityStats.totalRequestsInProgress()).isEqualTo(0L); + + fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.IDLE)); + loadBalancer.requestConnection(); + fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING)); + + // Faksubchannel mimics update address and returns different locality + fakeSubchannel.updateConnectedAtrributes(locality2); + fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + ClientStreamTracer streamTracer2 = result.getStreamTracerFactory().newClientStreamTracer( + ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // second RPC call + streamTracer2.streamClosed(Status.UNAVAILABLE); + + clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER)); + UpstreamLocalityStats localityStats1 = Iterables.get(clusterStats.upstreamLocalityStatsList(), + 0); + assertThat(localityStats1.locality()).isEqualTo(locality1); + assertThat(localityStats1.totalIssuedRequests()).isEqualTo(1L); + assertThat(localityStats1.totalSuccessfulRequests()).isEqualTo(0L); + assertThat(localityStats1.totalErrorRequests()).isEqualTo(1L); + assertThat(localityStats1.totalRequestsInProgress()).isEqualTo(0L); + UpstreamLocalityStats localityStats2 = Iterables.get(clusterStats.upstreamLocalityStatsList(), + 1); + assertThat(localityStats2.locality()).isEqualTo(locality2); + assertThat(localityStats2.totalIssuedRequests()).isEqualTo(0L); + assertThat(localityStats2.totalSuccessfulRequests()).isEqualTo(0L); + assertThat(localityStats2.totalErrorRequests()).isEqualTo(0L); + assertThat(localityStats2.totalRequestsInProgress()).isEqualTo(0L); + + // No more references are held for localityStats1 hence dropped. + // Locality load is reported for one last time in case of loads occurred since the previous + // load report. + clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER)); + localityStats2 = Iterables.getOnlyElement(clusterStats.upstreamLocalityStatsList()); + + assertThat(localityStats2.locality()).isEqualTo(locality2); + assertThat(localityStats2.totalIssuedRequests()).isEqualTo(0L); + assertThat(localityStats2.totalSuccessfulRequests()).isEqualTo(0L); + assertThat(localityStats2.totalErrorRequests()).isEqualTo(0L); + assertThat(localityStats2.totalRequestsInProgress()).isEqualTo(0L); + + // Reference to localityStats will be released once loadbalancer is shutdown, that is when we + // will have no LocalityStats in clusterStats.upstreamLocalityStatsList(); + } + @Test public void dropRpcsWithRespectToLbConfigDropCategories() { LoadBalancerProvider weightedTargetProvider = new WeightedTargetLoadBalancerProvider(); @@ -391,9 +488,11 @@ public void dropRpcsWithRespectToLbConfigDropCategories() { assertThat(leafBalancer.name).isEqualTo("round_robin"); assertThat(Iterables.getOnlyElement(leafBalancer.addresses).getAddresses()) .isEqualTo(endpoint.getAddresses()); - Subchannel subchannel = leafBalancer.helper.createSubchannel( - CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); - leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + leafBalancer.createSubChannel(); + FakeSubchannel fakeSubchannel = helper.subchannels.poll(); + fakeSubchannel.updateConnectedAtrributes(locality); + fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + assertThat(currentState).isEqualTo(ConnectivityState.READY); PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isFalse(); @@ -470,9 +569,11 @@ private void subtest_maxConcurrentRequests_appliedByLbConfig(boolean enableCircu assertThat(leafBalancer.name).isEqualTo("round_robin"); assertThat(Iterables.getOnlyElement(leafBalancer.addresses).getAddresses()) .isEqualTo(endpoint.getAddresses()); - Subchannel subchannel = leafBalancer.helper.createSubchannel( - CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); - leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + leafBalancer.createSubChannel(); + FakeSubchannel fakeSubchannel = helper.subchannels.poll(); + fakeSubchannel.updateConnectedAtrributes(locality); + fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + assertThat(currentState).isEqualTo(ConnectivityState.READY); assertThat(currentState).isEqualTo(ConnectivityState.READY); for (int i = 0; i < maxConcurrentRequests; i++) { PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); @@ -562,9 +663,11 @@ private void subtest_maxConcurrentRequests_appliedWithDefaultValue( assertThat(leafBalancer.name).isEqualTo("round_robin"); assertThat(Iterables.getOnlyElement(leafBalancer.addresses).getAddresses()) .isEqualTo(endpoint.getAddresses()); - Subchannel subchannel = leafBalancer.helper.createSubchannel( - CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); - leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + leafBalancer.createSubChannel(); + FakeSubchannel fakeSubchannel = helper.subchannels.poll(); + fakeSubchannel.updateConnectedAtrributes(locality); + fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + assertThat(currentState).isEqualTo(ConnectivityState.READY); assertThat(currentState).isEqualTo(ConnectivityState.READY); for (int i = 0; i < ClusterImplLoadBalancer.DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; i++) { PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); @@ -830,19 +933,24 @@ public void shutdown() { downstreamBalancers.remove(this); } - void deliverSubchannelState(final Subchannel subchannel, ConnectivityState state) { - SubchannelPicker picker = new SubchannelPicker() { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withSubchannel(subchannel); + Subchannel createSubChannel() { + Subchannel subchannel = helper.createSubchannel( + CreateSubchannelArgs.newBuilder().setAddresses(addresses).build()); + subchannel.start(infoObject -> { + if (infoObject.getState() == ConnectivityState.READY) { + helper.updateBalancingState( + ConnectivityState.READY, + new FixedResultPicker(PickResult.withSubchannel(subchannel))); } - }; - helper.updateBalancingState(state, picker); + }); + return subchannel; } } private final class FakeLbHelper extends LoadBalancer.Helper { + private final Queue subchannels = new LinkedList<>(); + @Override public SynchronizationContext getSynchronizationContext() { return syncContext; @@ -857,7 +965,9 @@ public void updateBalancingState( @Override public Subchannel createSubchannel(CreateSubchannelArgs args) { - return new FakeSubchannel(args.getAddresses(), args.getAttributes()); + FakeSubchannel subchannel = new FakeSubchannel(args.getAddresses(), args.getAttributes()); + subchannels.add(subchannel); + return subchannel; } @Override @@ -869,17 +979,27 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String author public String getAuthority() { return AUTHORITY; } + + @Override + public void refreshNameResolution() {} } private static final class FakeSubchannel extends Subchannel { private final List eags; private final Attributes attrs; + private SubchannelStateListener listener; + private Attributes connectedAtrributes; private FakeSubchannel(List eags, Attributes attrs) { this.eags = eags; this.attrs = attrs; } + @Override + public void start(SubchannelStateListener listener) { + this.listener = checkNotNull(listener, "listener"); + } + @Override public void shutdown() { } @@ -901,6 +1021,27 @@ public Attributes getAttributes() { @Override public void updateAddresses(List addrs) { } + + @Override + public Attributes getConnectedAddressAttributes() { + return connectedAtrributes; + } + + public void updateState(ConnectivityStateInfo newState) { + listener.onSubchannelState(newState); + } + + public void updateConnectedAtrributes(Locality locality) { + String localityName = "Locality{region=" + locality.region() + + ", zone=" + locality.zone() + + ", subZone=" + locality.subZone() + + "}"; + Attributes attrsWithLocality = attrs.toBuilder() + .set(InternalXdsAttributes.ATTR_LOCALITY, locality) + .set(InternalXdsAttributes.ATTR_LOCALITY_NAME, localityName) + .build(); + this.connectedAtrributes = attrsWithLocality; + } } private final class FakeXdsClient extends XdsClient { From abaf93df830986d02cdb335a6e0dfa37ce408a71 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Wed, 28 Aug 2024 16:11:38 -0700 Subject: [PATCH 2/6] fix unit test: get updated PickReslt after locality is updated --- .../grpc/xds/ClusterImplLoadBalancerTest.java | 44 ++++++++----------- 1 file changed, 18 insertions(+), 26 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 766bab2472e..23f5e346fc5 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -280,7 +280,7 @@ public void pick_addsLocalityLabel() { FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); leafBalancer.createSubChannel(); FakeSubchannel fakeSubchannel = helper.subchannels.poll(); - fakeSubchannel.updateConnectedAtrributes(locality); + fakeSubchannel.setConnectedEagIndex(0); fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); assertThat(currentState).isEqualTo(ConnectivityState.READY); @@ -309,7 +309,7 @@ public void recordLoadStats() { FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); Subchannel subchannel = leafBalancer.createSubChannel(); FakeSubchannel fakeSubchannel = helper.subchannels.poll(); - fakeSubchannel.updateConnectedAtrributes(locality); + fakeSubchannel.setConnectedEagIndex(0); fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); assertThat(currentState).isEqualTo(ConnectivityState.READY); PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); @@ -407,7 +407,7 @@ public void pickFirstLoadReport_onUpdateAddress() { // Leaf balancer is created by Pick First. Get FakeSubchannel created to update attributes // A real subchannel would get these attributes from the connected address's EAG locality. FakeSubchannel fakeSubchannel = helper.subchannels.poll(); - fakeSubchannel.updateConnectedAtrributes(locality1); + fakeSubchannel.setConnectedEagIndex(0); fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); assertThat(currentState).isEqualTo(ConnectivityState.READY); PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); @@ -425,15 +425,16 @@ public void pickFirstLoadReport_onUpdateAddress() { assertThat(localityStats.totalIssuedRequests()).isEqualTo(1L); assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats.totalErrorRequests()).isEqualTo(0L); - assertThat(localityStats.totalRequestsInProgress()).isEqualTo(0L); fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.IDLE)); loadBalancer.requestConnection(); fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING)); // Faksubchannel mimics update address and returns different locality - fakeSubchannel.updateConnectedAtrributes(locality2); + fakeSubchannel.setConnectedEagIndex(1); fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + result = currentPicker.pickSubchannel(pickSubchannelArgs); + assertThat(result.getStatus().isOk()).isTrue(); ClientStreamTracer streamTracer2 = result.getStreamTracerFactory().newClientStreamTracer( ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // second RPC call streamTracer2.streamClosed(Status.UNAVAILABLE); @@ -442,18 +443,18 @@ public void pickFirstLoadReport_onUpdateAddress() { UpstreamLocalityStats localityStats1 = Iterables.get(clusterStats.upstreamLocalityStatsList(), 0); assertThat(localityStats1.locality()).isEqualTo(locality1); - assertThat(localityStats1.totalIssuedRequests()).isEqualTo(1L); + assertThat(localityStats1.totalIssuedRequests()).isEqualTo(0L); assertThat(localityStats1.totalSuccessfulRequests()).isEqualTo(0L); - assertThat(localityStats1.totalErrorRequests()).isEqualTo(1L); - assertThat(localityStats1.totalRequestsInProgress()).isEqualTo(0L); + assertThat(localityStats1.totalErrorRequests()).isEqualTo(0L); UpstreamLocalityStats localityStats2 = Iterables.get(clusterStats.upstreamLocalityStatsList(), 1); assertThat(localityStats2.locality()).isEqualTo(locality2); - assertThat(localityStats2.totalIssuedRequests()).isEqualTo(0L); + assertThat(localityStats2.totalIssuedRequests()).isEqualTo(1L); assertThat(localityStats2.totalSuccessfulRequests()).isEqualTo(0L); - assertThat(localityStats2.totalErrorRequests()).isEqualTo(0L); - assertThat(localityStats2.totalRequestsInProgress()).isEqualTo(0L); + assertThat(localityStats2.totalErrorRequests()).isEqualTo(1L); + loadBalancer.shutdown(); + loadBalancer = null; // No more references are held for localityStats1 hence dropped. // Locality load is reported for one last time in case of loads occurred since the previous // load report. @@ -466,8 +467,7 @@ public void pickFirstLoadReport_onUpdateAddress() { assertThat(localityStats2.totalErrorRequests()).isEqualTo(0L); assertThat(localityStats2.totalRequestsInProgress()).isEqualTo(0L); - // Reference to localityStats will be released once loadbalancer is shutdown, that is when we - // will have no LocalityStats in clusterStats.upstreamLocalityStatsList(); + assertThat(loadStatsManager.getClusterStatsReports(CLUSTER)).isEmpty(); } @Test @@ -490,7 +490,7 @@ public void dropRpcsWithRespectToLbConfigDropCategories() { .isEqualTo(endpoint.getAddresses()); leafBalancer.createSubChannel(); FakeSubchannel fakeSubchannel = helper.subchannels.poll(); - fakeSubchannel.updateConnectedAtrributes(locality); + fakeSubchannel.setConnectedEagIndex(0); fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); assertThat(currentState).isEqualTo(ConnectivityState.READY); @@ -571,7 +571,7 @@ private void subtest_maxConcurrentRequests_appliedByLbConfig(boolean enableCircu .isEqualTo(endpoint.getAddresses()); leafBalancer.createSubChannel(); FakeSubchannel fakeSubchannel = helper.subchannels.poll(); - fakeSubchannel.updateConnectedAtrributes(locality); + fakeSubchannel.setConnectedEagIndex(0); fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); assertThat(currentState).isEqualTo(ConnectivityState.READY); assertThat(currentState).isEqualTo(ConnectivityState.READY); @@ -665,7 +665,7 @@ private void subtest_maxConcurrentRequests_appliedWithDefaultValue( .isEqualTo(endpoint.getAddresses()); leafBalancer.createSubChannel(); FakeSubchannel fakeSubchannel = helper.subchannels.poll(); - fakeSubchannel.updateConnectedAtrributes(locality); + fakeSubchannel.setConnectedEagIndex(0); fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); assertThat(currentState).isEqualTo(ConnectivityState.READY); assertThat(currentState).isEqualTo(ConnectivityState.READY); @@ -1031,16 +1031,8 @@ public void updateState(ConnectivityStateInfo newState) { listener.onSubchannelState(newState); } - public void updateConnectedAtrributes(Locality locality) { - String localityName = "Locality{region=" + locality.region() - + ", zone=" + locality.zone() - + ", subZone=" + locality.subZone() - + "}"; - Attributes attrsWithLocality = attrs.toBuilder() - .set(InternalXdsAttributes.ATTR_LOCALITY, locality) - .set(InternalXdsAttributes.ATTR_LOCALITY_NAME, localityName) - .build(); - this.connectedAtrributes = attrsWithLocality; + public void setConnectedEagIndex(int eagIndex) { + this.connectedAtrributes = eags.get(eagIndex).getAttributes(); } } From 3a08e3ffdd8058199362cd77958e4d0170a10e14 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Thu, 29 Aug 2024 17:34:25 -0700 Subject: [PATCH 3/6] addressed review comments --- ...ddress.java => InternalSubchannelAddressAttributes.java} | 4 ++-- xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java | 6 ++---- xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) rename api/src/main/java/io/grpc/{InternalSubchannelAddress.java => InternalSubchannelAddressAttributes.java} (85%) diff --git a/api/src/main/java/io/grpc/InternalSubchannelAddress.java b/api/src/main/java/io/grpc/InternalSubchannelAddressAttributes.java similarity index 85% rename from api/src/main/java/io/grpc/InternalSubchannelAddress.java rename to api/src/main/java/io/grpc/InternalSubchannelAddressAttributes.java index 338504c3669..cfc2f7c5137 100644 --- a/api/src/main/java/io/grpc/InternalSubchannelAddress.java +++ b/api/src/main/java/io/grpc/InternalSubchannelAddressAttributes.java @@ -19,10 +19,10 @@ /** * An internal class. Do not use. * - *

An interface to provide the address connected by subchannel. + *

An interface to provide the attributes for address connected by subchannel. */ @Internal -public interface InternalSubchannelAddress { +public interface InternalSubchannelAddressAttributes { /** * Return attributes of the server address connected by sub channel. diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 6d4b05bd482..2e2369de18a 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -249,9 +249,7 @@ public void onSubchannelState(ConnectivityStateInfo newState) { @Override public void shutdown() { - if (localityAtomicReference.get() != null) { - localityAtomicReference.get().release(); - } + localityAtomicReference.get().release(); delegate().shutdown(); } @@ -482,7 +480,7 @@ public void onLoadReport(MetricReport report) { } /** - * Represents the locality attributes of a cluster. + * Represents the {@link ClusterLocalityStats} and network locality name of a cluster. */ static final class ClusterLocality { private final ClusterLocalityStats clusterLocalityStats; diff --git a/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java b/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java index d8800eaa19c..be9d3587d14 100644 --- a/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java +++ b/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java @@ -91,7 +91,7 @@ private synchronized void releaseClusterDropCounter( String cluster, @Nullable String edsServiceName) { checkState(allDropStats.containsKey(cluster) && allDropStats.get(cluster).containsKey(edsServiceName), - "stats for cluster %s, edsServiceName %s not exist", cluster, edsServiceName); + "stats for cluster %s, edsServiceName %s do not exist", cluster, edsServiceName); ReferenceCounted ref = allDropStats.get(cluster).get(edsServiceName); ref.release(); } From 885bf997e5d9dcb37e0bd0c62b41891889a8777a Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Fri, 30 Aug 2024 13:50:34 -0700 Subject: [PATCH 4/6] Add check for subchannel was used that didn't have ATTR_CLUSTER_LOCALITY set. --- .../io/grpc/xds/ClusterImplLoadBalancer.java | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 2e2369de18a..dc995786cc3 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -214,10 +214,13 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne @Override public Subchannel createSubchannel(CreateSubchannelArgs args) { List addresses = withAdditionalAttributes(args.getAddresses()); - // This clusterLocality ideally should not be utilized. We derive locality - // information from the first address, even prior to the subchannel becoming ready. - // This is primarily for the purpose of facilitating load reporting in the pre-READY - // state of the subchannel. + // This value for ClusterLocality is not recommended for general use. + // Currently, we extract locality data from the first address, even before the subchannel is + // READY. + // This is mainly to accommodate scenarios where a Load Balancing API (like "pick first") + // might return the subchannel before it is READY. Typically, we wouldn't report load for such + // selections because the channel will disregard the chosen (not-ready) subchannel. + // However, we needed to ensure this case is handled. ClusterLocality clusterLocality = createClusterLocalityFromAttributes( args.getAddresses().get(0).getAttributes()); AtomicReference localityAtomicReference = new AtomicReference<>( @@ -390,20 +393,23 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { "Cluster max concurrent requests limit exceeded")); } } - final ClusterLocalityStats stats = - result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get() - .getClusterLocalityStats(); - if (stats != null) { - String localityName = - result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get() - .getClusterLocalityName(); - args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName); - - ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory( - stats, inFlights, result.getStreamTracerFactory()); - ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance() - .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats)); - return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory); + final AtomicReference clusterLocality = + result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY); + + if (clusterLocality != null) { + ClusterLocalityStats stats = clusterLocality.get().getClusterLocalityStats(); + if (stats != null) { + String localityName = + result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get() + .getClusterLocalityName(); + args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName); + + ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory( + stats, inFlights, result.getStreamTracerFactory()); + ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance() + .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats)); + return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory); + } } } return result; From 0a2e1fbc2c5238f56bf678d06be94cd141df47a3 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Fri, 30 Aug 2024 14:42:06 -0700 Subject: [PATCH 5/6] Addressed larry-safran's comment on formatting --- xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index dc995786cc3..0ea2c7dd75f 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -79,8 +79,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer { Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING")) || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING")); - private static final Attributes.Key> - ATTR_CLUSTER_LOCALITY = + private static final Attributes.Key> ATTR_CLUSTER_LOCALITY = Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality"); private final XdsLogger logger; From 60fb499a13081ca26222b76e7fa2d03ac2b11251 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Fri, 30 Aug 2024 16:56:37 -0700 Subject: [PATCH 6/6] fix typo --- .../test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 23f5e346fc5..0082a2aa59d 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -988,7 +988,7 @@ private static final class FakeSubchannel extends Subchannel { private final List eags; private final Attributes attrs; private SubchannelStateListener listener; - private Attributes connectedAtrributes; + private Attributes connectedAttributes; private FakeSubchannel(List eags, Attributes attrs) { this.eags = eags; @@ -1024,7 +1024,7 @@ public void updateAddresses(List addrs) { @Override public Attributes getConnectedAddressAttributes() { - return connectedAtrributes; + return connectedAttributes; } public void updateState(ConnectivityStateInfo newState) { @@ -1032,7 +1032,7 @@ public void updateState(ConnectivityStateInfo newState) { } public void setConnectedEagIndex(int eagIndex) { - this.connectedAtrributes = eags.get(eagIndex).getAttributes(); + this.connectedAttributes = eags.get(eagIndex).getAttributes(); } }