Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Fix load reporting when pick first is used for locality-routing. #11495

Merged
merged 6 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions api/src/main/java/io/grpc/InternalSubchannelAddressAttributes.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

/**
* An internal class. Do not use.
*
* <p>An interface to provide the attributes for address connected by subchannel.
*/
@Internal
public interface InternalSubchannelAddressAttributes {

/**
* Return attributes of the server address connected by sub channel.
*/
public Attributes getConnectedAddressAttributes();
}
12 changes: 12 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,18 @@
public Object getInternalSubchannel() {
throw new UnsupportedOperationException();
}

/**
* (Internal use only) returns attributes of the address subchannel is connected to.
*
* <p>Warning: this is INTERNAL API, is not supposed to be used by external users, and may
* change without notice. If you think you must use it, please file an issue and we can consider
* removing its "internal" status.
*/
@Internal
public Attributes getConnectedAddressAttributes() {
throw new UnsupportedOperationException();

Check warning on line 1441 in api/src/main/java/io/grpc/LoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

api/src/main/java/io/grpc/LoadBalancer.java#L1441

Added line #L1441 was not covered by tests
}
}

/**
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ protected void handleNotInUse() {

private Status shutdownReason;

private volatile Attributes connectedAddressAttributes;

InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Expand Down Expand Up @@ -525,6 +527,13 @@ public void run() {
return channelStatsFuture;
}

/**
* Return attributes for server address connected by sub channel.
*/
public Attributes getConnectedAddressAttributes() {
return connectedAddressAttributes;
}

ConnectivityState getState() {
return state.getState();
}
Expand Down Expand Up @@ -568,6 +577,7 @@ public void run() {
} else if (pendingTransport == transport) {
activeTransport = transport;
pendingTransport = null;
connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
gotoNonErrorState(READY);
}
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2044,6 +2044,11 @@ public void updateAddresses(List<EquivalentAddressGroup> addrs) {
subchannel.updateAddresses(addrs);
}

@Override
public Attributes getConnectedAddressAttributes() {
return subchannel.getConnectedAddressAttributes();
}

private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
List<EquivalentAddressGroup> eags) {
List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
Expand Down
26 changes: 26 additions & 0 deletions core/src/test/java/io/grpc/internal/InternalSubchannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,32 @@ public void channelzStatContainsTransport() throws Exception {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2);
}

@Test
public void connectedAddressAttributes_ready() {
SocketAddress addr = new SocketAddress() {};
Attributes attr = Attributes.newBuilder().set(Attributes.Key.create("some-key"), "1").build();
createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addr), attr));

assertEquals(IDLE, internalSubchannel.getState());
assertNoCallbackInvoke();
assertNull(internalSubchannel.obtainActiveTransport());
assertNull(internalSubchannel.getConnectedAddressAttributes());

assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory).newClientTransport(
eq(addr),
eq(createClientTransportOptions().setEagAttributes(attr)),
isA(TransportLogger.class));
assertNull(internalSubchannel.getConnectedAddressAttributes());

internalSubchannel.obtainActiveTransport();
transports.peek().listener.transportReady();
assertExactCallbackInvokes("onStateChange:READY");
assertEquals(READY, internalSubchannel.getState());
assertEquals(attr, internalSubchannel.getConnectedAddressAttributes());
}

/** Create ClientTransportOptions. Should not be reused if it may be mutated. */
private ClientTransportFactory.ClientTransportOptions createClientTransportOptions() {
return new ClientTransportFactory.ClientTransportOptions()
Expand Down
6 changes: 6 additions & 0 deletions util/src/main/java/io/grpc/util/ForwardingSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,17 @@ public Object getInternalSubchannel() {
return delegate().getInternalSubchannel();
}


@Override
public void updateAddresses(List<EquivalentAddressGroup> addrs) {
delegate().updateAddresses(addrs);
}

@Override
public Attributes getConnectedAddressAttributes() {
return delegate().getConnectedAddressAttributes();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
Expand Down
116 changes: 88 additions & 28 deletions xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

/**
Expand All @@ -77,10 +79,9 @@
Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
|| Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));

private static final Attributes.Key<ClusterLocalityStats> ATTR_CLUSTER_LOCALITY_STATS =
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityStats");
private static final Attributes.Key<String> ATTR_CLUSTER_LOCALITY_NAME =
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityName");
private static final Attributes.Key<AtomicReference<ClusterLocality>>
ATTR_CLUSTER_LOCALITY =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can put ATTR_CLUSTER_LOCALITY on the same line as the type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality");

private final XdsLogger logger;
private final Helper helper;
Expand Down Expand Up @@ -213,36 +214,42 @@
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
Locality locality = args.getAddresses().get(0).getAttributes().get(
InternalXdsAttributes.ATTR_LOCALITY); // all addresses should be in the same locality
String localityName = args.getAddresses().get(0).getAttributes().get(
InternalXdsAttributes.ATTR_LOCALITY_NAME);
// Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
// attributes with its locality, including endpoints in LOGICAL_DNS clusters.
// In case of not (which really shouldn't), loads are aggregated under an empty locality.
if (locality == null) {
locality = Locality.create("", "", "");
localityName = "";
}
final ClusterLocalityStats localityStats =
(lrsServerInfo == null)
? null
: xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
edsServiceName, locality);

// This clusterLocality ideally should not be utilized. We derive locality
// information from the first address, even prior to the subchannel becoming ready.
// This is primarily for the purpose of facilitating load reporting in the pre-READY
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no load reporting pre-READY. But an LB API could return the subchannel even before it is ready, and PF does this. We generally won't end up reporting load for such picks because the channel will ignore the selected (not-ready) subchannel, but we needed to handle the case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, updated the comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you miss pushing the update?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, we both are able to see the updated comment.

// state of the subchannel.
ClusterLocality clusterLocality = createClusterLocalityFromAttributes(
args.getAddresses().get(0).getAttributes());
AtomicReference<ClusterLocality> localityAtomicReference = new AtomicReference<>(
clusterLocality);
Attributes attrs = args.getAttributes().toBuilder()
.set(ATTR_CLUSTER_LOCALITY_STATS, localityStats)
.set(ATTR_CLUSTER_LOCALITY_NAME, localityName)
.set(ATTR_CLUSTER_LOCALITY, localityAtomicReference)
.build();
args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build();
final Subchannel subchannel = delegate().createSubchannel(args);

return new ForwardingSubchannel() {
@Override
public void start(SubchannelStateListener listener) {
delegate().start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
if (newState.getState().equals(ConnectivityState.READY)) {
// Get locality based on the connected address attributes
ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
subchannel.getConnectedAddressAttributes());
ClusterLocality oldClusterLocality = localityAtomicReference
.getAndSet(updatedClusterLocality);
oldClusterLocality.release();
}
listener.onSubchannelState(newState);
}
});
}

@Override
public void shutdown() {
if (localityStats != null) {
localityStats.release();
}
localityAtomicReference.get().release();
delegate().shutdown();
}

Expand Down Expand Up @@ -274,6 +281,28 @@
return newAddresses;
}

private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
Locality locality = addressAttributes.get(InternalXdsAttributes.ATTR_LOCALITY);
String localityName = addressAttributes.get(InternalXdsAttributes.ATTR_LOCALITY_NAME);

// Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
// attributes with its locality, including endpoints in LOGICAL_DNS clusters.
// In case of not (which really shouldn't), loads are aggregated under an empty
// locality.
if (locality == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be, but is possible that locality is set but locality name is null. You should probably have an else clause that does a null check on localityName.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is existing code, and it seems in general code assumes the values to be non-null. I'd much rather we handle that centrally in io.grpc.xds.client.Locality.create() than lots of random not-possible-to-trigger checks. The data is primarily coming from a proto, so it will be "" when unset.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting to move the locality name null check to io.grpc.xds.client.Locality.create()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see now Larry's comment wasn't about the Locality struct, but instead the attribute. That had been discussed when the code was originally introduced:
#11133 (comment)

This PR is likely to be reverted (in some way) later, once the old PF policy goes away. So changes to the existing code are likely to be lost.

locality = Locality.create("", "", "");
localityName = "";

Check warning on line 294 in xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java#L293-L294

Added lines #L293 - L294 were not covered by tests
}

final ClusterLocalityStats localityStats =
(lrsServerInfo == null)
? null
: xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
edsServiceName, locality);

return new ClusterLocality(localityStats, localityName);
}

@Override
protected Helper delegate() {
return helper;
Expand Down Expand Up @@ -362,10 +391,12 @@
}
}
final ClusterLocalityStats stats =
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_STATS);
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
.getClusterLocalityStats();
if (stats != null) {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
String localityName =
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_NAME);
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
.getClusterLocalityName();
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);

ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
Expand Down Expand Up @@ -447,4 +478,33 @@
stats.recordBackendLoadMetricStats(report.getNamedMetrics());
}
}

/**
* Represents the {@link ClusterLocalityStats} and network locality name of a cluster.
*/
static final class ClusterLocality {
private final ClusterLocalityStats clusterLocalityStats;
private final String clusterLocalityName;

@VisibleForTesting
ClusterLocality(ClusterLocalityStats localityStats, String localityName) {
this.clusterLocalityStats = localityStats;
this.clusterLocalityName = localityName;
}

ClusterLocalityStats getClusterLocalityStats() {
return clusterLocalityStats;
}

String getClusterLocalityName() {
return clusterLocalityName;
}

@VisibleForTesting
void release() {
if (clusterLocalityStats != null) {
clusterLocalityStats.release();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private synchronized void releaseClusterDropCounter(
String cluster, @Nullable String edsServiceName) {
checkState(allDropStats.containsKey(cluster)
&& allDropStats.get(cluster).containsKey(edsServiceName),
"stats for cluster %s, edsServiceName %s not exits", cluster, edsServiceName);
"stats for cluster %s, edsServiceName %s do not exist", cluster, edsServiceName);
ReferenceCounted<ClusterDropStats> ref = allDropStats.get(cluster).get(edsServiceName);
ref.release();
}
Expand Down
Loading