Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RawKVClient CAS API #213

Merged
merged 2 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,9 @@ The following includes ThreadPool related parameters, which can be passed in thr
- the thread pool size of deleteRange on client side
- default: 20

#### tikv.enable_atomic_for_cas
- whether to enable `Compare And Set`, set true if using `RawKVClient.compareAndSet` or `RawKVClient.putIfAbsent`
- default: false

## License
Apache 2.0 license. See the [LICENSE](./LICENSE) file for details.
3 changes: 3 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class ConfigUtils {
public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward";
public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout";

public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
Expand Down Expand Up @@ -80,6 +82,7 @@ public class ConfigUtils {
public static final int DEF_METRICS_PORT = 3140;
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
public static final boolean DEF_GRPC_FORWARD_ENABLE = true;
public static final boolean DEF_TIKV_ENABLE_ATOMIC_FOR_CAS = false;

public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE);
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS);
}

public static void listAll() {
Expand Down Expand Up @@ -266,6 +267,8 @@ private static ReplicaRead getReplicaRead(String key) {
private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
private HostMapping hostMapping = null;

private boolean enableAtomicForCAS = getBoolean(TIKV_ENABLE_ATOMIC_FOR_CAS);

public enum KVMode {
TXN,
RAW
Expand Down Expand Up @@ -558,4 +561,12 @@ public boolean getEnableGrpcForward() {
public long getGrpcHealthCheckTimeout() {
return this.grpcHealthCheckTimeout;
}

public boolean isEnableAtomicForCAS() {
return enableAtomicForCAS;
}

public void setEnableAtomicForCAS(boolean enableAtomicForCAS) {
this.enableAtomicForCAS = enableAtomicForCAS;
}
}
20 changes: 12 additions & 8 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ private Optional<Long> rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) {
return Optional.of(resp.getTtl());
}

public void rawDelete(BackOffer backOffer, ByteString key) {
public void rawDelete(BackOffer backOffer, ByteString key, boolean atomicForCAS) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete").startTimer();
try {
Expand All @@ -874,6 +874,7 @@ public void rawDelete(BackOffer backOffer, ByteString key) {
RawDeleteRequest.newBuilder()
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.setForCas(atomicForCAS)
.build();

RegionErrorHandler<RawDeleteResponse> handler =
Expand Down Expand Up @@ -901,7 +902,8 @@ private void rawDeleteHelper(RawDeleteResponse resp, TiRegion region) {
}
}

public void rawPut(BackOffer backOffer, ByteString key, ByteString value, long ttl) {
public void rawPut(
BackOffer backOffer, ByteString key, ByteString value, long ttl, boolean atomicForCAS) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put").startTimer();
try {
Expand All @@ -912,6 +914,7 @@ public void rawPut(BackOffer backOffer, ByteString key, ByteString value, long t
.setKey(key)
.setValue(value)
.setTtl(ttl)
.setForCas(atomicForCAS)
.build();

RegionErrorHandler<RawPutResponse> handler =
Expand Down Expand Up @@ -1029,7 +1032,8 @@ private List<KvPair> handleRawBatchGet(RawBatchGetResponse resp) {
return resp.getPairsList();
}

public void rawBatchPut(BackOffer backOffer, List<KvPair> kvPairs, long ttl, boolean atomic) {
public void rawBatchPut(
BackOffer backOffer, List<KvPair> kvPairs, long ttl, boolean atomicForCAS) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_put").startTimer();
try {
Expand All @@ -1042,7 +1046,7 @@ public void rawBatchPut(BackOffer backOffer, List<KvPair> kvPairs, long ttl, boo
.setContext(region.getReplicaContext(storeType))
.addAllPairs(kvPairs)
.setTtl(ttl)
.setForCas(atomic)
.setForCas(atomicForCAS)
.build();
RegionErrorHandler<RawBatchPutResponse> handler =
new RegionErrorHandler<RawBatchPutResponse>(
Expand All @@ -1055,7 +1059,7 @@ public void rawBatchPut(BackOffer backOffer, List<KvPair> kvPairs, long ttl, boo
}
}

public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atomic) {
public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atomicForCAS) {
List<KvPair> pairs = new ArrayList<>();
for (int i = 0; i < batch.getKeys().size(); i++) {
pairs.add(
Expand All @@ -1064,7 +1068,7 @@ public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atom
.setValue(batch.getValues().get(i))
.build());
}
rawBatchPut(backOffer, pairs, ttl, atomic);
rawBatchPut(backOffer, pairs, ttl, atomicForCAS);
}

private void handleRawBatchPut(RawBatchPutResponse resp) {
Expand All @@ -1081,7 +1085,7 @@ private void handleRawBatchPut(RawBatchPutResponse resp) {
}
}

public void rawBatchDelete(BackOffer backoffer, List<ByteString> keys, boolean atomic) {
public void rawBatchDelete(BackOffer backoffer, List<ByteString> keys, boolean atomicForCAS) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_delete").startTimer();
try {
Expand All @@ -1093,7 +1097,7 @@ public void rawBatchDelete(BackOffer backoffer, List<ByteString> keys, boolean a
RawBatchDeleteRequest.newBuilder()
.setContext(region.getReplicaContext(storeType))
.addAllKeys(keys)
.setForCas(atomic)
.setForCas(atomicForCAS)
.build();
RegionErrorHandler<RawBatchDeleteResponse> handler =
new RegionErrorHandler<RawBatchDeleteResponse>(
Expand Down
82 changes: 29 additions & 53 deletions src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
public class RawKVClient implements AutoCloseable {
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final boolean atomicForCAS;
private final ExecutorService batchGetThreadPool;
private final ExecutorService batchPutThreadPool;
private final ExecutorService batchDeleteThreadPool;
Expand Down Expand Up @@ -90,6 +91,7 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
this.batchDeleteThreadPool = session.getThreadPoolForBatchDelete();
this.batchScanThreadPool = session.getThreadPoolForBatchScan();
this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange();
this.atomicForCAS = conf.isEnableAtomicForCAS();
}

@Override
Expand Down Expand Up @@ -120,7 +122,7 @@ public void put(ByteString key, ByteString value, long ttl) {
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawPut(backOffer, key, value, ttl);
client.rawPut(backOffer, key, value, ttl, atomicForCAS);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (final TiKVException e) {
Expand All @@ -138,6 +140,8 @@ public void put(ByteString key, ByteString value, long ttl) {
/**
* Put a key-value pair if it does not exist. This API is atomic.
*
* <p>To use this API, please enable `tikv.enable_atomic_for_cas`.
*
* @param key key
* @param value value
* @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the
Expand All @@ -150,6 +154,8 @@ public Optional<ByteString> putIfAbsent(ByteString key, ByteString value) {
/**
* Put a key-value pair with TTL if it does not exist. This API is atomic.
*
* <p>To use this API, please enable `tikv.enable_atomic_for_cas`.
*
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
Expand All @@ -168,6 +174,8 @@ public Optional<ByteString> putIfAbsent(ByteString key, ByteString value, long t
/**
* Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic.
*
* <p>To use this API, please enable `tikv.enable_atomic_for_cas`.
*
* @param key key
* @param value value
*/
Expand All @@ -179,13 +187,20 @@ public void compareAndSet(ByteString key, Optional<ByteString> prevValue, ByteSt
/**
* pair if the prevValue matched the value in TiKV. This API is atomic.
*
* <p>To use this API, please enable `tikv.enable_atomic_for_cas`.
*
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
*/
public void compareAndSet(
ByteString key, Optional<ByteString> prevValue, ByteString value, long ttl)
throws RawCASConflictException {
if (!atomicForCAS) {
throw new IllegalArgumentException(
"To use compareAndSet or putIfAbsent, please enable the config tikv.enable_atomic_for_cas.");
}

String label = "client_raw_compare_and_set";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
Expand All @@ -209,7 +224,7 @@ public void compareAndSet(
}

/**
* Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic.
* Put a set of raw key-value pair to TiKV.
*
* @param kvPairs kvPairs
*/
Expand All @@ -218,39 +233,16 @@ public void batchPut(Map<ByteString, ByteString> kvPairs) {
}

/**
* Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic.
* Put a set of raw key-value pair to TiKV.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
batchPut(kvPairs, ttl, false);
}

/**
* Put a set of raw key-value pair to TiKV, this API is atomic
*
* @param kvPairs kvPairs
*/
public void batchPutAtomic(Map<ByteString, ByteString> kvPairs) {
batchPutAtomic(kvPairs, 0);
}

/**
* Put a set of raw key-value pair to TiKV, this API is atomic.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
public void batchPutAtomic(Map<ByteString, ByteString> kvPairs, long ttl) {
batchPut(kvPairs, ttl, true);
}

private void batchPut(Map<ByteString, ByteString> kvPairs, long ttl, boolean atomic) {
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl, atomic);
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
Expand Down Expand Up @@ -317,24 +309,11 @@ public List<KvPair> batchGet(List<ByteString> keys) {
* @param keys list of raw key
*/
public void batchDelete(List<ByteString> keys) {
batchDelete(keys, false);
}

/**
* Delete a list of raw key-value pair from TiKV if key exists, this API is atomic
*
* @param keys list of raw key
*/
public void batchDeleteAtomic(List<ByteString> keys) {
batchDelete(keys, true);
}

private void batchDelete(List<ByteString> keys, boolean atomic) {
String label = "client_raw_batch_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
doSendBatchDelete(backOffer, keys, atomic);
doSendBatchDelete(backOffer, keys);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (Exception e) {
Expand Down Expand Up @@ -608,7 +587,7 @@ public void delete(ByteString key) {
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawDelete(defaultBackOff(), key);
client.rawDelete(defaultBackOff(), key, atomicForCAS);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (final TiKVException e) {
Expand Down Expand Up @@ -660,8 +639,7 @@ public synchronized void deletePrefix(ByteString key) {
deleteRange(key, endKey);
}

private void doSendBatchPut(
BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl, boolean atomic) {
private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl) {
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchPutThreadPool);

Expand All @@ -686,16 +664,15 @@ private void doSendBatchPut(
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
completionService.submit(
() -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl, atomic));
() -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl));
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
}

private List<Batch> doSendBatchPutInBatchesWithRetry(
BackOffer backOffer, Batch batch, long ttl, boolean atomic) {
private List<Batch> doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) {
try (RegionStoreClient client = clientBuilder.build(batch.getRegion())) {
client.rawBatchPut(backOffer, batch, ttl, atomic);
client.rawBatchPut(backOffer, batch, ttl, atomicForCAS);
return new ArrayList<>();
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
Expand Down Expand Up @@ -770,7 +747,7 @@ private List<Batch> doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch b
backOffer, batch.getKeys(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
}

private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys, boolean atomic) {
private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys) {
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchDeleteThreadPool);

Expand All @@ -784,17 +761,16 @@ private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys, boole
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
completionService.submit(
() -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch, atomic));
() -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch));
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
}

private List<Batch> doSendBatchDeleteInBatchesWithRetry(
BackOffer backOffer, Batch batch, boolean atomic) {
private List<Batch> doSendBatchDeleteInBatchesWithRetry(BackOffer backOffer, Batch batch) {
RegionStoreClient client = clientBuilder.build(batch.getRegion());
try {
client.rawBatchDelete(backOffer, batch.getKeys(), atomic);
client.rawBatchDelete(backOffer, batch.getKeys(), atomicForCAS);
return new ArrayList<>();
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
Expand Down
Loading