From 6dac45802e400272c43129dd4ff3241a27232637 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 27 Jul 2022 21:03:07 +0800 Subject: [PATCH] cherry pick #636 to release-3.3 Signed-off-by: ti-srebot --- .../org/tikv/common/AbstractGRPCClient.java | 4 +- src/main/java/org/tikv/common/PDClient.java | 31 ++++++---- .../tikv/common/operation/PDErrorHandler.java | 4 +- .../org/tikv/common/PDMockServerTest.java | 11 ++++ .../java/org/tikv/common/TimeoutTest.java | 60 +++++++++++++++++++ 5 files changed, 94 insertions(+), 16 deletions(-) create mode 100644 src/test/java/org/tikv/common/TimeoutTest.java diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index 616bcfbaebe..d2675f94385 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -40,7 +40,6 @@ import org.tikv.common.util.BackOffFunction.BackOffFuncType; import org.tikv.common.util.BackOffer; import org.tikv.common.util.ChannelFactory; -import org.tikv.common.util.ConcreteBackOffer; public abstract class AbstractGRPCClient< BlockingStubT extends AbstractStub, @@ -198,8 +197,7 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin } } - protected boolean checkHealth(String addressStr, HostMapping hostMapping) { - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff((int) (timeout * 2)); + protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { try { return doCheckHealth(backOffer, addressStr, hostMapping); } catch (Exception e) { diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index cfafbff4550..695a1565b1c 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -440,6 +440,8 @@ PDClientWrapper getPdClientWrapper() { private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { while (true) { + backOffer.checkTimeout(); + try { ManagedChannel probChan = channelFactory.getChannel(uriToAddr(uri), hostMapping); PDGrpc.PDBlockingStub stub = @@ -459,8 +461,7 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { } } - private GetMembersResponse getMembers(URI uri) { - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + private GetMembersResponse getMembers(BackOffer backOffer, URI uri) { try { return doGetMembers(backOffer, uri); } catch (Exception e) { @@ -497,11 +498,12 @@ private synchronized boolean createLeaderClientWrapper(String leaderUrlStr) { return true; } - synchronized boolean createFollowerClientWrapper(String followerUrlStr, String leaderUrls) { + synchronized boolean createFollowerClientWrapper( + BackOffer backOffer, String followerUrlStr, String leaderUrls) { // TODO: Why not strip protocol info on server side since grpc does not need it try { - if (!checkHealth(followerUrlStr, hostMapping)) { + if (!checkHealth(backOffer, followerUrlStr, hostMapping)) { return false; } @@ -516,13 +518,13 @@ synchronized boolean createFollowerClientWrapper(String followerUrlStr, String l return true; } - public synchronized void updateLeaderOrForwardFollower() { + public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) { return; } for (URI url : this.pdAddrs) { // since resp is null, we need update leader's address by walking through all pd server. - GetMembersResponse resp = getMembers(url); + GetMembersResponse resp = getMembers(backOffer, url); if (resp == null) { continue; } @@ -534,7 +536,8 @@ public synchronized void updateLeaderOrForwardFollower() { leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // if leader is switched, just return. - if (checkHealth(leaderUrlStr, hostMapping) && createLeaderClientWrapper(leaderUrlStr)) { + if (checkHealth(backOffer, leaderUrlStr, hostMapping) + && createLeaderClientWrapper(leaderUrlStr)) { lastUpdateLeaderTime = System.currentTimeMillis(); return; } @@ -561,7 +564,8 @@ public synchronized void updateLeaderOrForwardFollower() { hasReachNextMember = true; continue; } - if (hasReachNextMember && createFollowerClientWrapper(followerUrlStr, leaderUrlStr)) { + if (hasReachNextMember + && createFollowerClientWrapper(backOffer, followerUrlStr, leaderUrlStr)) { logger.warn( String.format("forward request to pd [%s] by pd [%s]", leaderUrlStr, followerUrlStr)); return; @@ -577,8 +581,9 @@ public synchronized void updateLeaderOrForwardFollower() { public void tryUpdateLeader() { for (URI url : this.pdAddrs) { + BackOffer backOffer = defaultBackOffer(); // since resp is null, we need update leader's address by walking through all pd server. - GetMembersResponse resp = getMembers(url); + GetMembersResponse resp = getMembers(backOffer, url); if (resp == null) { continue; } @@ -591,7 +596,7 @@ public void tryUpdateLeader() { leaderUrlStr = uriToAddr(addrToUri(leaderUrlStr)); // If leader is not change but becomes available, we can cancel follower forward. - if (checkHealth(leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { + if (checkHealth(backOffer, leaderUrlStr, hostMapping) && trySwitchLeader(leaderUrlStr)) { if (!urls.equals(this.pdAddrs)) { tryUpdateMembers(urls); } @@ -705,7 +710,7 @@ private void initCluster() { this.timeout = conf.getPdFirstGetMemberTimeout(); for (URI u : pdAddrs) { logger.info("get members with pd " + u + ": start"); - resp = getMembers(u); + resp = getMembers(defaultBackOffer(), u); logger.info("get members with pd " + u + ": end"); if (resp != null) { break; @@ -825,4 +830,8 @@ public List getPdAddrs() { public RequestKeyCodec getCodec() { return codec; } + + private static BackOffer defaultBackOffer() { + return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + } } diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index df403789503..4f9cc7fbabb 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -59,7 +59,7 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { case PD_ERROR: backOffer.doBackOff( BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString())); - client.updateLeaderOrForwardFollower(); + client.updateLeaderOrForwardFollower(backOffer); return true; case REGION_PEER_NOT_ELECTED: logger.debug(error.getMessage()); @@ -80,7 +80,7 @@ public boolean handleRequestError(BackOffer backOffer, Exception e) { return false; } backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); - client.updateLeaderOrForwardFollower(); + client.updateLeaderOrForwardFollower(backOffer); return true; } } diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index 08018d0d495..edd3a19f444 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.function.Function; import org.junit.After; import org.junit.Before; import org.tikv.common.TiConfiguration.ApiVersion; @@ -51,6 +52,16 @@ void upgradeToV2Cluster() throws Exception { session = TiSession.create(conf); } + void updateConf(Function update) throws Exception { + if (session == null) { + throw new IllegalStateException("Cluster is not initialized"); + } + + session.close(); + + session = TiSession.create(update.apply(session.getConf())); + } + void setup(String addr) throws IOException { int[] ports = new int[3]; for (int i = 0; i < ports.length; i++) { diff --git a/src/test/java/org/tikv/common/TimeoutTest.java b/src/test/java/org/tikv/common/TimeoutTest.java new file mode 100644 index 00000000000..e0f3d01d1ce --- /dev/null +++ b/src/test/java/org/tikv/common/TimeoutTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2022 TiKV Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.tikv.raw.RawKVClient; + +public class TimeoutTest extends MockThreeStoresTest { + @Before + public void init() throws Exception { + updateConf( + conf -> { + conf.setEnableAtomicForCAS(true); + conf.setTimeout(150); + conf.setForwardTimeout(200); + conf.setRawKVReadTimeoutInMS(400); + conf.setRawKVWriteTimeoutInMS(400); + conf.setRawKVBatchReadTimeoutInMS(400); + conf.setRawKVBatchWriteTimeoutInMS(400); + conf.setRawKVWriteSlowLogInMS(50); + conf.setRawKVReadSlowLogInMS(50); + conf.setRawKVBatchReadSlowLogInMS(50); + conf.setRawKVBatchWriteSlowLogInMS(50); + return conf; + }); + } + + private RawKVClient createClient() { + return session.createRawClient(); + } + + @Test + public void testTimeoutInTime() { + try (RawKVClient client = createClient()) { + pdServers.get(0).stop(); + long start = System.currentTimeMillis(); + client.get(ByteString.copyFromUtf8("key")); + long end = System.currentTimeMillis(); + Assert.assertTrue(end - start < session.getConf().getRawKVReadTimeoutInMS() * 2L); + } + } +}