diff --git a/pom.xml b/pom.xml
index 09b8374f63b..964c1bf2543 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,11 @@
3.9
compile
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.13
+
io.prometheus
simpleclient
diff --git a/src/main/java/org/tikv/common/PDChecker.java b/src/main/java/org/tikv/common/PDChecker.java
new file mode 100644
index 00000000000..a64da7100d0
--- /dev/null
+++ b/src/main/java/org/tikv/common/PDChecker.java
@@ -0,0 +1,31 @@
+package org.tikv.common;
+
+public enum PDChecker {
+ Learner,
+ Replica,
+ Rule,
+ Split,
+ Merge,
+ JointState,
+ Priority;
+
+ public String apiName() {
+ switch (this) {
+ case Learner:
+ return "learner";
+ case Replica:
+ return "replica";
+ case Rule:
+ return "rule";
+ case Split:
+ return "split";
+ case Merge:
+ return "merge";
+ case JointState:
+ return "joint-state";
+ case Priority:
+ return "priority";
+ }
+ throw new IllegalArgumentException();
+ }
+}
diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java
index 0c0b0180fe3..d86363f3c66 100644
--- a/src/main/java/org/tikv/common/PDClient.java
+++ b/src/main/java/org/tikv/common/PDClient.java
@@ -21,6 +21,9 @@
import static org.tikv.common.pd.PDUtils.addrToUri;
import static org.tikv.common.pd.PDUtils.uriToAddr;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
@@ -34,7 +37,9 @@
import io.grpc.stub.MetadataUtils;
import io.prometheus.client.Histogram;
import java.net.URI;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -46,6 +51,11 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.Codec.BytesCodec;
@@ -93,12 +103,15 @@ public class PDClient extends AbstractGRPCClient
implements ReadOnlyPDClient {
private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
private static final long MIN_TRY_UPDATE_DURATION = 50;
+ private static final int PAUSE_CHECKER_TIMEOUT = 300; // in seconds
+ private static final int KEEP_CHECKER_PAUSE_PERIOD = PAUSE_CHECKER_TIMEOUT / 5; // in seconds
private final Logger logger = LoggerFactory.getLogger(PDClient.class);
private RequestHeader header;
private TsoRequest tsoReq;
private volatile PDClientWrapper pdClientWrapper;
private ScheduledExecutorService service;
private ScheduledExecutorService tiflashReplicaService;
+ private final HashMap pauseCheckerService = new HashMap<>();
private List pdAddrs;
private Client etcdClient;
private ConcurrentMap tiflashReplicaMap;
@@ -144,6 +157,70 @@ public TiTimestamp getTimestamp(BackOffer backOffer) {
return new TiTimestamp(timestamp.getPhysical(), timestamp.getLogical());
}
+ public synchronized void keepPauseChecker(PDChecker checker) {
+ if (!this.pauseCheckerService.containsKey(checker)) {
+ ScheduledExecutorService newService =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat(String.format("PDClient-pause-%s-pool-%%d", checker.name()))
+ .setDaemon(true)
+ .build());
+ newService.scheduleAtFixedRate(
+ () -> pauseChecker(checker, PAUSE_CHECKER_TIMEOUT),
+ 0,
+ KEEP_CHECKER_PAUSE_PERIOD,
+ TimeUnit.SECONDS);
+ this.pauseCheckerService.put(checker, newService);
+ }
+ }
+
+ public synchronized void stopKeepPauseChecker(PDChecker checker) {
+ if (this.pauseCheckerService.containsKey(checker)) {
+ this.pauseCheckerService.get(checker).shutdown();
+ this.pauseCheckerService.remove(checker);
+ }
+ }
+
+ public void resumeChecker(PDChecker checker) {
+ pauseChecker(checker, 0);
+ }
+
+ private void pauseChecker(PDChecker checker, int timeout) {
+ String verb = timeout == 0 ? "resume" : "pause";
+ URI url = pdAddrs.get(0);
+ String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName();
+ HashMap arguments = new HashMap<>();
+ arguments.put("delay", timeout);
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ JsonMapper jsonMapper = new JsonMapper();
+ byte[] body = jsonMapper.writeValueAsBytes(arguments);
+ HttpPost post = new HttpPost(api);
+ post.setEntity(new ByteArrayEntity(body));
+ try (CloseableHttpResponse resp = client.execute(post)) {
+ if (resp.getStatusLine().getStatusCode() != 200) {
+ logger.error("failed to {} checker.", verb);
+ }
+ logger.info("checker {} {}d", checker.apiName(), verb);
+ }
+ } catch (Exception e) {
+ logger.error(String.format("failed to %s checker.", verb), e);
+ }
+ }
+
+ public Boolean isCheckerPaused(PDChecker checker) {
+ URI url = pdAddrs.get(0);
+ String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName();
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ HashMap status =
+ mapper.readValue(new URL(api), new TypeReference>() {});
+ return status.get("paused");
+ } catch (Exception e) {
+ logger.error(String.format("failed to get %s checker status.", checker.apiName()), e);
+ return null;
+ }
+ }
+
/**
* Sends request to pd to scatter region.
*
diff --git a/src/test/java/org/tikv/common/PDClientIntegrationTest.java b/src/test/java/org/tikv/common/PDClientIntegrationTest.java
new file mode 100644
index 00000000000..e76ab08cb5d
--- /dev/null
+++ b/src/test/java/org/tikv/common/PDClientIntegrationTest.java
@@ -0,0 +1,44 @@
+package org.tikv.common;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PDClientIntegrationTest {
+ private TiSession session;
+
+ @Before
+ public void setup() {
+ TiConfiguration conf = TiConfiguration.createRawDefault();
+ conf.setTest(true);
+ session = TiSession.create(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (session != null) {
+ session.close();
+ }
+ }
+
+ @Test
+ public void testPauseCheck() throws Exception {
+ try (PDClient client = session.getPDClient()) {
+ PDChecker checker = PDChecker.Merge;
+ for (int i = 0; i < 2; i++) {
+ client.keepPauseChecker(checker);
+ Thread.sleep(1000);
+ assertTrue(client.isCheckerPaused(checker));
+
+ client.stopKeepPauseChecker(checker);
+ Thread.sleep(1000);
+
+ client.resumeChecker(checker);
+ assertFalse(client.isCheckerPaused(checker));
+ }
+ }
+ }
+}