Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PDClient: Add function to call pause checker API #277

Merged
merged 12 commits into from
Sep 28, 2021
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@
<version>3.9</version>
<scope>compile</scope>
</dependency>
<!-- Apache HttpClient: PDClient uses it to POST pause/resume requests to PD's checker HTTP API. -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/org/tikv/common/PDChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.tikv.common;

/**
 * PD checkers that can be addressed through PD's HTTP API.
 *
 * <p>Each constant carries the path segment that identifies it under {@code
 * /pd/api/v1/checker/}.
 */
public enum PDChecker {
  Learner("learner"),
  Replica("replica"),
  Rule("rule"),
  Split("split"),
  Merge("merge"),
  JointState("joint-state"),
  Priority("priority");

  /** Identifier of this checker as it appears in PD API URLs. */
  private final String apiName;

  PDChecker(String apiName) {
    this.apiName = apiName;
  }

  /**
   * Returns the name of this checker as used in PD API URLs.
   *
   * @return the lower-case, hyphen-separated checker name, e.g. {@code "joint-state"}
   */
  public String apiName() {
    return apiName;
  }
}
77 changes: 77 additions & 0 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import static org.tikv.common.pd.PDUtils.addrToUri;
import static org.tikv.common.pd.PDUtils.uriToAddr;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
Expand All @@ -34,7 +37,9 @@
import io.grpc.stub.MetadataUtils;
import io.prometheus.client.Histogram;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -46,6 +51,11 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.Codec.BytesCodec;
Expand Down Expand Up @@ -93,12 +103,15 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
implements ReadOnlyPDClient {
private static final String TIFLASH_TABLE_SYNC_PROGRESS_PATH = "/tiflash/table/sync";
private static final long MIN_TRY_UPDATE_DURATION = 50;
// Delay value sent to PD with every pause request issued by keepPauseChecker.
private static final int PAUSE_CHECKER_TIMEOUT = 300; // in seconds
// Interval between repeated pause requests; one fifth of the timeout (60 s), presumably so the
// pause is renewed well before the previous one lapses.
private static final int KEEP_CHECKER_PAUSE_PERIOD = PAUSE_CHECKER_TIMEOUT / 5; // in seconds
private final Logger logger = LoggerFactory.getLogger(PDClient.class);
private RequestHeader header;
private TsoRequest tsoReq;
private volatile PDClientWrapper pdClientWrapper;
private ScheduledExecutorService service;
private ScheduledExecutorService tiflashReplicaService;
// One scheduler per checker that is currently being kept paused. In the code visible here it is
// only touched from the synchronized keepPauseChecker / stopKeepPauseChecker methods.
private final HashMap<PDChecker, ScheduledExecutorService> pauseCheckerService = new HashMap<>();
private List<URI> pdAddrs;
private Client etcdClient;
private ConcurrentMap<Long, Double> tiflashReplicaMap;
Expand Down Expand Up @@ -144,6 +157,70 @@ public TiTimestamp getTimestamp(BackOffer backOffer) {
return new TiTimestamp(timestamp.getPhysical(), timestamp.getLogical());
}

/**
 * Keeps the given checker paused by periodically re-sending the pause request to PD.
 *
 * <p>A dedicated single-threaded daemon scheduler is created for the checker. It issues the
 * first pause request immediately and then repeats it every {@code KEEP_CHECKER_PAUSE_PERIOD}
 * seconds, each time asking for a pause of {@code PAUSE_CHECKER_TIMEOUT} seconds. Calling this
 * method again for a checker that already has a scheduler is a no-op.
 *
 * @param checker the checker to keep paused
 */
public synchronized void keepPauseChecker(PDChecker checker) {
  if (this.pauseCheckerService.containsKey(checker)) {
    // A keep-paused task is already registered for this checker.
    return;
  }

  String threadNamePattern = String.format("PDClient-pause-%s-pool-%%d", checker.name());
  ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(
          new ThreadFactoryBuilder().setNameFormat(threadNamePattern).setDaemon(true).build());

  Runnable renewPause = () -> pauseChecker(checker, PAUSE_CHECKER_TIMEOUT);
  scheduler.scheduleAtFixedRate(renewPause, 0, KEEP_CHECKER_PAUSE_PERIOD, TimeUnit.SECONDS);

  this.pauseCheckerService.put(checker, scheduler);
}

/**
 * Stops the periodic pause requests previously started by {@link #keepPauseChecker} for the
 * given checker.
 *
 * <p>The checker's scheduler is shut down and forgotten. No resume request is sent to PD here.
 * If no scheduler is registered for the checker, nothing happens.
 *
 * @param checker the checker whose keep-paused task should be stopped
 */
public synchronized void stopKeepPauseChecker(PDChecker checker) {
  ScheduledExecutorService scheduler = this.pauseCheckerService.remove(checker);
  if (scheduler != null) {
    scheduler.shutdown();
  }
}

/**
 * Asks PD to resume the given checker by sending a pause request with a zero delay.
 *
 * <p>This only issues the HTTP request; it does not stop a keep-paused task started with
 * {@link #keepPauseChecker}.
 *
 * @param checker the checker to resume
 */
public void resumeChecker(PDChecker checker) {
  final int noDelay = 0;
  pauseChecker(checker, noDelay);
}

/**
 * Sends a pause (or resume) request for the given checker to the first configured PD address.
 *
 * <p>The request is an HTTP POST to {@code /pd/api/v1/checker/<apiName>} with the JSON body
 * {@code {"delay": timeout}}. A {@code timeout} of 0 is treated as a resume request. Failures
 * are logged and never propagated from the HTTP exchange, so the periodic task scheduled by
 * {@link #keepPauseChecker} keeps running after a failed attempt.
 *
 * @param checker the checker to pause or resume
 * @param timeout the delay to send to PD, in seconds; 0 means resume
 */
private void pauseChecker(PDChecker checker, int timeout) {
  String verb = timeout == 0 ? "resume" : "pause";
  URI url = pdAddrs.get(0);
  String api = url.toString() + "/pd/api/v1/checker/" + checker.apiName();
  HashMap<String, Integer> arguments = new HashMap<>();
  arguments.put("delay", timeout);
  try (CloseableHttpClient client = HttpClients.createDefault()) {
    JsonMapper jsonMapper = new JsonMapper();
    byte[] body = jsonMapper.writeValueAsBytes(arguments);
    HttpPost post = new HttpPost(api);
    post.setEntity(new ByteArrayEntity(body));
    try (CloseableHttpResponse resp = client.execute(post)) {
      if (resp.getStatusLine().getStatusCode() != 200) {
        logger.error("failed to {} checker.", verb);
      } else {
        // Report success only when PD actually accepted the request; previously this line
        // was also emitted after the error above.
        logger.info("checker {} {}d", checker.apiName(), verb);
      }
    }
  } catch (Exception e) {
    logger.error(String.format("failed to %s checker.", verb), e);
  }
}

/**
 * Queries the first configured PD address for the pause status of the given checker.
 *
 * <p>The JSON document served at {@code /pd/api/v1/checker/<apiName>} is read as a map of
 * booleans and its {@code "paused"} entry is returned.
 *
 * @param checker the checker to query
 * @return the value of the {@code "paused"} entry, or {@code null} when the request or parsing
 *     fails (the failure is logged) or the entry is absent
 */
public Boolean isCheckerPaused(PDChecker checker) {
  String api = pdAddrs.get(0).toString() + "/pd/api/v1/checker/" + checker.apiName();
  Boolean paused;
  try {
    ObjectMapper reader = new ObjectMapper();
    TypeReference<HashMap<String, Boolean>> statusType =
        new TypeReference<HashMap<String, Boolean>>() {};
    HashMap<String, Boolean> reply = reader.readValue(new URL(api), statusType);
    paused = reply.get("paused");
  } catch (Exception e) {
    logger.error(String.format("failed to get %s checker status.", checker.apiName()), e);
    paused = null;
  }
  return paused;
}

/**
* Sends request to pd to scatter region.
*
Expand Down
44 changes: 44 additions & 0 deletions src/test/java/org/tikv/common/PDClientIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.tikv.common;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration test for the checker pause/resume calls on {@link PDClient}.
 *
 * <p>NOTE(review): presumably needs a reachable PD at the default test address, since it goes
 * through a real {@link TiSession} — confirm against the CI setup.
 */
public class PDClientIntegrationTest {
  /** Number of pause/resume cycles exercised by the test. */
  private static final int ROUNDS = 2;

  /** Time given to PD to apply a request before the next step, in milliseconds. */
  private static final long SETTLE_MILLIS = 1000;

  private TiSession session;

  @Before
  public void setup() {
    TiConfiguration rawConf = TiConfiguration.createRawDefault();
    rawConf.setTest(true);
    session = TiSession.create(rawConf);
  }

  @After
  public void tearDown() throws Exception {
    if (session == null) {
      return;
    }
    session.close();
  }

  /** Pauses the merge checker, verifies it, then resumes and verifies again, twice. */
  @Test
  public void testPauseCheck() throws Exception {
    try (PDClient client = session.getPDClient()) {
      PDChecker checker = PDChecker.Merge;
      int round = 0;
      while (round < ROUNDS) {
        // Start keeping the checker paused and give the first request time to land.
        client.keepPauseChecker(checker);
        Thread.sleep(SETTLE_MILLIS);
        assertTrue(client.isCheckerPaused(checker));

        // Stop renewing the pause before resuming, so the resume is not overridden.
        client.stopKeepPauseChecker(checker);
        Thread.sleep(SETTLE_MILLIS);

        client.resumeChecker(checker);
        assertFalse(client.isCheckerPaused(checker));

        round++;
      }
    }
  }
}