Skip to content

Commit

Permalink
Refactor RawKVClient CAS API
Browse files Browse the repository at this point in the history
Signed-off-by: marsishandsome <marsishandsome@gmail.com>
  • Loading branch information
marsishandsome committed Jun 23, 2021
1 parent fe6617e commit 3ecb012
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 55 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,9 @@ The following includes ThreadPool related parameters, which can be passed in thr
- the thread pool size of deleteRange on client side
- default: 20

#### tikv.enable_atomic_for_cas
- whether to enable Compare And Swap, set true if using `RawKVClient.compareAndSet` or `RawKVClient.putIfAbsent`
- default: false

## License
Apache 2.0 license. See the [LICENSE](./LICENSE) file for details.
3 changes: 3 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class ConfigUtils {
public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward";
public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout";

public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "600ms";
public static final String DEF_SCAN_TIMEOUT = "20s";
Expand Down Expand Up @@ -80,6 +82,7 @@ public class ConfigUtils {
public static final int DEF_METRICS_PORT = 3140;
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
public static final boolean DEF_GRPC_FORWARD_ENABLE = true;
public static final boolean DEF_TIKV_ENABLE_ATOMIC_FOR_CAS = false;

public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME);
setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE);
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS);
}

public static void listAll() {
Expand Down Expand Up @@ -266,6 +267,8 @@ private static ReplicaRead getReplicaRead(String key) {
private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
private HostMapping hostMapping = null;

private boolean enableAtomicForCAS = getBoolean(TIKV_ENABLE_ATOMIC_FOR_CAS);

public enum KVMode {
TXN,
RAW
Expand Down Expand Up @@ -558,4 +561,12 @@ public boolean getEnableGrpcForward() {
public long getGrpcHealthCheckTimeout() {
return this.grpcHealthCheckTimeout;
}

public boolean isEnableAtomicForCAS() {
return enableAtomicForCAS;
}

public void setEnableAtomicForCAS(boolean enableAtomicForCAS) {
this.enableAtomicForCAS = enableAtomicForCAS;
}
}
7 changes: 5 additions & 2 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ private Optional<Long> rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) {
return Optional.of(resp.getTtl());
}

public void rawDelete(BackOffer backOffer, ByteString key) {
public void rawDelete(BackOffer backOffer, ByteString key, boolean atomic) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete").startTimer();
try {
Expand All @@ -874,6 +874,7 @@ public void rawDelete(BackOffer backOffer, ByteString key) {
RawDeleteRequest.newBuilder()
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.setForCas(atomic)
.build();

RegionErrorHandler<RawDeleteResponse> handler =
Expand Down Expand Up @@ -901,7 +902,8 @@ private void rawDeleteHelper(RawDeleteResponse resp, TiRegion region) {
}
}

public void rawPut(BackOffer backOffer, ByteString key, ByteString value, long ttl) {
public void rawPut(
BackOffer backOffer, ByteString key, ByteString value, long ttl, boolean atomic) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put").startTimer();
try {
Expand All @@ -912,6 +914,7 @@ public void rawPut(BackOffer backOffer, ByteString key, ByteString value, long t
.setKey(key)
.setValue(value)
.setTtl(ttl)
.setForCas(atomic)
.build();

RegionErrorHandler<RawPutResponse> handler =
Expand Down
82 changes: 29 additions & 53 deletions src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
public class RawKVClient implements AutoCloseable {
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final boolean atomicForCAS;
private final ExecutorService batchGetThreadPool;
private final ExecutorService batchPutThreadPool;
private final ExecutorService batchDeleteThreadPool;
Expand Down Expand Up @@ -90,6 +91,7 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) {
this.batchDeleteThreadPool = session.getThreadPoolForBatchDelete();
this.batchScanThreadPool = session.getThreadPoolForBatchScan();
this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange();
this.atomicForCAS = conf.isEnableAtomicForCAS();
}

@Override
Expand Down Expand Up @@ -120,7 +122,7 @@ public void put(ByteString key, ByteString value, long ttl) {
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawPut(backOffer, key, value, ttl);
client.rawPut(backOffer, key, value, ttl, atomicForCAS);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (final TiKVException e) {
Expand All @@ -138,6 +140,8 @@ public void put(ByteString key, ByteString value, long ttl) {
/**
* Put a key-value pair if it does not exist. This API is atomic.
*
* <p>To use this API, please enable `tikv.enable_atomic_for_cas`.
*
* @param key key
* @param value value
* @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the
Expand All @@ -150,6 +154,8 @@ public Optional<ByteString> putIfAbsent(ByteString key, ByteString value) {
/**
* Put a key-value pair with TTL if it does not exist. This API is atomic.
*
* <p>To use this API, please enable `tikv.enable_atomic_for_cas`.
*
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
Expand All @@ -168,6 +174,8 @@ public Optional<ByteString> putIfAbsent(ByteString key, ByteString value, long t
/**
* Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic.
*
* <p>To use this API, please enable `tikv.enable_atomic_for_cas`.
*
* @param key key
* @param value value
*/
Expand All @@ -179,13 +187,20 @@ public void compareAndSet(ByteString key, Optional<ByteString> prevValue, ByteSt
/**
* pair if the prevValue matched the value in TiKV. This API is atomic.
*
* <p>To use this API, please enable `tikv.enable_atomic_for_cas`.
*
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
*/
public void compareAndSet(
ByteString key, Optional<ByteString> prevValue, ByteString value, long ttl)
throws RawCASConflictException {
if (!atomicForCAS) {
throw new IllegalArgumentException(
"To use Compare And Swap, please enable the config tikv.enable_atomic_for_cas.");
}

String label = "client_raw_compare_and_set";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
Expand All @@ -209,7 +224,7 @@ public void compareAndSet(
}

/**
* Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic.
* Put a set of raw key-value pair to TiKV.
*
* @param kvPairs kvPairs
*/
Expand All @@ -218,39 +233,16 @@ public void batchPut(Map<ByteString, ByteString> kvPairs) {
}

/**
* Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic.
* Put a set of raw key-value pair to TiKV.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
batchPut(kvPairs, ttl, false);
}

/**
* Put a set of raw key-value pair to TiKV, this API is atomic
*
* @param kvPairs kvPairs
*/
public void batchPutAtomic(Map<ByteString, ByteString> kvPairs) {
batchPutAtomic(kvPairs, 0);
}

/**
* Put a set of raw key-value pair to TiKV, this API is atomic.
*
* @param kvPairs kvPairs
* @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated
*/
public void batchPutAtomic(Map<ByteString, ByteString> kvPairs, long ttl) {
batchPut(kvPairs, ttl, true);
}

private void batchPut(Map<ByteString, ByteString> kvPairs, long ttl, boolean atomic) {
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl, atomic);
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
Expand Down Expand Up @@ -317,24 +309,11 @@ public List<KvPair> batchGet(List<ByteString> keys) {
* @param keys list of raw key
*/
public void batchDelete(List<ByteString> keys) {
batchDelete(keys, false);
}

/**
* Delete a list of raw key-value pair from TiKV if key exists, this API is atomic
*
* @param keys list of raw key
*/
public void batchDeleteAtomic(List<ByteString> keys) {
batchDelete(keys, true);
}

private void batchDelete(List<ByteString> keys, boolean atomic) {
String label = "client_raw_batch_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
doSendBatchDelete(backOffer, keys, atomic);
doSendBatchDelete(backOffer, keys);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (Exception e) {
Expand Down Expand Up @@ -608,7 +587,7 @@ public void delete(ByteString key) {
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawDelete(defaultBackOff(), key);
client.rawDelete(defaultBackOff(), key, atomicForCAS);
RAW_REQUEST_SUCCESS.labels(label).inc();
return;
} catch (final TiKVException e) {
Expand Down Expand Up @@ -660,8 +639,7 @@ public synchronized void deletePrefix(ByteString key) {
deleteRange(key, endKey);
}

private void doSendBatchPut(
BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl, boolean atomic) {
private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl) {
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchPutThreadPool);

Expand All @@ -686,16 +664,15 @@ private void doSendBatchPut(
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
completionService.submit(
() -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl, atomic));
() -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl));
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
}

private List<Batch> doSendBatchPutInBatchesWithRetry(
BackOffer backOffer, Batch batch, long ttl, boolean atomic) {
private List<Batch> doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) {
try (RegionStoreClient client = clientBuilder.build(batch.getRegion())) {
client.rawBatchPut(backOffer, batch, ttl, atomic);
client.rawBatchPut(backOffer, batch, ttl, atomicForCAS);
return new ArrayList<>();
} catch (final TiKVException e) {
// TODO: any elegant way to re-split the ranges if fails?
Expand Down Expand Up @@ -770,7 +747,7 @@ private List<Batch> doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch b
backOffer, batch.getKeys(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder);
}

private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys, boolean atomic) {
private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys) {
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchDeleteThreadPool);

Expand All @@ -784,17 +761,16 @@ private void doSendBatchDelete(BackOffer backOffer, List<ByteString> keys, boole
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
completionService.submit(
() -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch, atomic));
() -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch));
}
getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF);
}
}

private List<Batch> doSendBatchDeleteInBatchesWithRetry(
BackOffer backOffer, Batch batch, boolean atomic) {
private List<Batch> doSendBatchDeleteInBatchesWithRetry(BackOffer backOffer, Batch batch) {
RegionStoreClient client = clientBuilder.build(batch.getRegion());
try {
client.rawBatchDelete(backOffer, batch.getKeys(), atomic);
client.rawBatchDelete(backOffer, batch.getKeys(), atomicForCAS);
return new ArrayList<>();
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
Expand Down

0 comments on commit 3ecb012

Please sign in to comment.