From d0afb125e82fff288962c82f31d2629fe33c093e Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 23 Jun 2021 17:34:02 +0800 Subject: [PATCH 1/2] Refactor RawKVClient CAS API Signed-off-by: marsishandsome --- README.md | 4 + .../java/org/tikv/common/ConfigUtils.java | 3 + .../java/org/tikv/common/TiConfiguration.java | 11 +++ .../tikv/common/region/RegionStoreClient.java | 20 +++-- src/main/java/org/tikv/raw/RawKVClient.java | 82 +++++++------------ 5 files changed, 59 insertions(+), 61 deletions(-) diff --git a/README.md b/README.md index c977e86cff8..7e1433c353e 100644 --- a/README.md +++ b/README.md @@ -176,5 +176,9 @@ The following includes ThreadPool related parameters, which can be passed in thr - the thread pool size of deleteRange on client side - default: 20 +#### tikv.enable_atomic_for_cas +- whether to enable Compare And Swap, set true if using `RawKVClient.compareAndSet` or `RawKVClient.putIfAbsent` +- default: false + ## License Apache 2.0 license. See the [LICENSE](./LICENSE) file for details. diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index f4c06598ba2..0c2460e566e 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -51,6 +51,8 @@ public class ConfigUtils { public static final String TIKV_ENABLE_GRPC_FORWARD = "tikv.enable_grpc_forward"; public static final String TIKV_GRPC_HEALTH_CHECK_TIMEOUT = "tikv.grpc.health_check_timeout"; + public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas"; + public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "600ms"; public static final String DEF_SCAN_TIMEOUT = "20s"; @@ -80,6 +82,7 @@ public class ConfigUtils { public static final int DEF_METRICS_PORT = 3140; public static final String DEF_TIKV_NETWORK_MAPPING_NAME = ""; public static final boolean DEF_GRPC_FORWARD_ENABLE = true; + public static final boolean DEF_TIKV_ENABLE_ATOMIC_FOR_CAS = false; public static final String NORMAL_COMMAND_PRIORITY = "NORMAL"; public static final String LOW_COMMAND_PRIORITY = "LOW"; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index a4a6a0226f4..8344f2b0fe6 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -79,6 +79,7 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_NETWORK_MAPPING_NAME, DEF_TIKV_NETWORK_MAPPING_NAME); setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE); setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT); + setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS); } public static void listAll() { @@ -266,6 +267,8 @@ private static ReplicaRead getReplicaRead(String key) { private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME); private HostMapping hostMapping = null; + private boolean enableAtomicForCAS = getBoolean(TIKV_ENABLE_ATOMIC_FOR_CAS); + public enum KVMode { TXN, RAW @@ -558,4 +561,12 @@ public boolean getEnableGrpcForward() { public long getGrpcHealthCheckTimeout() { return this.grpcHealthCheckTimeout; } + + public boolean isEnableAtomicForCAS() { + return enableAtomicForCAS; + } + + public void setEnableAtomicForCAS(boolean enableAtomicForCAS) { + this.enableAtomicForCAS = enableAtomicForCAS; + } } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index c313b3ba24e..46d5efec444 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -865,7 +865,7 @@ private Optional rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) { return Optional.of(resp.getTtl()); } - public void rawDelete(BackOffer backOffer, ByteString key) { + public void rawDelete(BackOffer backOffer, ByteString key, boolean atomicForCAS) { Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete").startTimer(); try { @@ -874,6 +874,7 @@ public void rawDelete(BackOffer backOffer, ByteString key) { RawDeleteRequest.newBuilder() .setContext(region.getReplicaContext(storeType)) .setKey(key) + .setForCas(atomicForCAS) .build(); RegionErrorHandler handler = @@ -901,7 +902,8 @@ private void rawDeleteHelper(RawDeleteResponse resp, TiRegion region) { } } - public void rawPut(BackOffer backOffer, ByteString key, ByteString value, long ttl) { + public void rawPut( + BackOffer backOffer, ByteString key, ByteString value, long ttl, boolean atomicForCAS) { Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put").startTimer(); try { @@ -912,6 +914,7 @@ public void rawPut(BackOffer backOffer, ByteString key, ByteString value, long t .setKey(key) .setValue(value) .setTtl(ttl) + .setForCas(atomicForCAS) .build(); RegionErrorHandler handler = @@ -1029,7 +1032,8 @@ private List handleRawBatchGet(RawBatchGetResponse resp) { return resp.getPairsList(); } - public void rawBatchPut(BackOffer backOffer, List kvPairs, long ttl, boolean atomic) { + public void rawBatchPut( + BackOffer backOffer, List kvPairs, long ttl, boolean atomicForCAS) { Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_put").startTimer(); try { @@ -1042,7 +1046,7 @@ public void rawBatchPut(BackOffer backOffer, List kvPairs, long ttl, boo .setContext(region.getReplicaContext(storeType)) .addAllPairs(kvPairs) .setTtl(ttl) - .setForCas(atomic) + .setForCas(atomicForCAS) .build(); RegionErrorHandler handler = new RegionErrorHandler( @@ -1055,7 +1059,7 @@ public void rawBatchPut(BackOffer backOffer, List kvPairs, long ttl, boo } } - public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atomic) { + public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atomicForCAS) { List pairs = new ArrayList<>(); for (int i = 0; i < batch.getKeys().size(); i++) { pairs.add( @@ -1064,7 +1068,7 @@ public void rawBatchPut(BackOffer backOffer, Batch batch, long ttl, boolean atom .setValue(batch.getValues().get(i)) .build()); } - rawBatchPut(backOffer, pairs, ttl, atomic); + rawBatchPut(backOffer, pairs, ttl, atomicForCAS); } private void handleRawBatchPut(RawBatchPutResponse resp) { @@ -1081,7 +1085,7 @@ private void handleRawBatchPut(RawBatchPutResponse resp) { } } - public void rawBatchDelete(BackOffer backoffer, List keys, boolean atomic) { + public void rawBatchDelete(BackOffer backoffer, List keys, boolean atomicForCAS) { Histogram.Timer requestTimer = GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_delete").startTimer(); try { @@ -1093,7 +1097,7 @@ public void rawBatchDelete(BackOffer backoffer, List keys, boolean a RawBatchDeleteRequest.newBuilder() .setContext(region.getReplicaContext(storeType)) .addAllKeys(keys) - .setForCas(atomic) + .setForCas(atomicForCAS) .build(); RegionErrorHandler handler = new RegionErrorHandler( diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 974df90e658..6c24bf497ec 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -40,6 +40,7 @@ public class RawKVClient implements AutoCloseable { private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; + private final boolean atomicForCAS; private final ExecutorService batchGetThreadPool; private final ExecutorService batchPutThreadPool; private final ExecutorService batchDeleteThreadPool; @@ -90,6 +91,7 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) { this.batchDeleteThreadPool = session.getThreadPoolForBatchDelete(); this.batchScanThreadPool = session.getThreadPoolForBatchScan(); this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange(); + this.atomicForCAS = conf.isEnableAtomicForCAS(); } @Override @@ -120,7 +122,7 @@ public void put(ByteString key, ByteString value, long ttl) { while (true) { RegionStoreClient client = clientBuilder.build(key); try { - client.rawPut(backOffer, key, value, ttl); + client.rawPut(backOffer, key, value, ttl, atomicForCAS); RAW_REQUEST_SUCCESS.labels(label).inc(); return; } catch (final TiKVException e) { @@ -138,6 +140,8 @@ public void put(ByteString key, ByteString value, long ttl) { /** * Put a key-value pair if it does not exist. This API is atomic. * + *

To use this API, please enable `tikv.enable_atomic_for_cas`. + * * @param key key * @param value value * @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the @@ -150,6 +154,8 @@ public Optional putIfAbsent(ByteString key, ByteString value) { /** * Put a key-value pair with TTL if it does not exist. This API is atomic. * + *

To use this API, please enable `tikv.enable_atomic_for_cas`. + * * @param key key * @param value value * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. @@ -168,6 +174,8 @@ public Optional putIfAbsent(ByteString key, ByteString value, long t /** * Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic. * + *

To use this API, please enable `tikv.enable_atomic_for_cas`. + * * @param key key * @param value value */ @@ -179,6 +187,8 @@ public void compareAndSet(ByteString key, Optional prevValue, ByteSt /** * pair if the prevValue matched the value in TiKV. This API is atomic. * + *

To use this API, please enable `tikv.enable_atomic_for_cas`. + * * @param key key * @param value value * @param ttl TTL of key (in seconds), 0 means the key will never be outdated. @@ -186,6 +196,11 @@ public void compareAndSet(ByteString key, Optional prevValue, ByteSt public void compareAndSet( ByteString key, Optional prevValue, ByteString value, long ttl) throws RawCASConflictException { + if (!atomicForCAS) { + throw new IllegalArgumentException( + "To use Compare And Swap, please enable the config tikv.enable_atomic_for_cas."); + } + String label = "client_raw_compare_and_set"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { @@ -209,7 +224,7 @@ public void compareAndSet( } /** - * Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic. + * Put a set of raw key-value pair to TiKV. * * @param kvPairs kvPairs */ @@ -218,39 +233,16 @@ public void batchPut(Map kvPairs) { } /** - * Put a set of raw key-value pair to TiKV, this API does not ensure the operation is atomic. + * Put a set of raw key-value pair to TiKV. * * @param kvPairs kvPairs * @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated */ public void batchPut(Map kvPairs, long ttl) { - batchPut(kvPairs, ttl, false); - } - - /** - * Put a set of raw key-value pair to TiKV, this API is atomic - * - * @param kvPairs kvPairs - */ - public void batchPutAtomic(Map kvPairs) { - batchPutAtomic(kvPairs, 0); - } - - /** - * Put a set of raw key-value pair to TiKV, this API is atomic. - * - * @param kvPairs kvPairs - * @param ttl the TTL of keys to be put (in seconds), 0 means the keys will never be outdated - */ - public void batchPutAtomic(Map kvPairs, long ttl) { - batchPut(kvPairs, ttl, true); - } - - private void batchPut(Map kvPairs, long ttl, boolean atomic) { String label = "client_raw_batch_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { - doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl, atomic); + doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl); RAW_REQUEST_SUCCESS.labels(label).inc(); } catch (Exception e) { RAW_REQUEST_FAILURE.labels(label).inc(); @@ -317,24 +309,11 @@ public List batchGet(List keys) { * @param keys list of raw key */ public void batchDelete(List keys) { - batchDelete(keys, false); - } - - /** - * Delete a list of raw key-value pair from TiKV if key exists, this API is atomic - * - * @param keys list of raw key - */ - public void batchDeleteAtomic(List keys) { - batchDelete(keys, true); - } - - private void batchDelete(List keys, boolean atomic) { String label = "client_raw_batch_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { BackOffer backOffer = defaultBackOff(); - doSendBatchDelete(backOffer, keys, atomic); + doSendBatchDelete(backOffer, keys); RAW_REQUEST_SUCCESS.labels(label).inc(); return; } catch (Exception e) { @@ -608,7 +587,7 @@ public void delete(ByteString key) { while (true) { RegionStoreClient client = clientBuilder.build(key); try { - client.rawDelete(defaultBackOff(), key); + client.rawDelete(defaultBackOff(), key, atomicForCAS); RAW_REQUEST_SUCCESS.labels(label).inc(); return; } catch (final TiKVException e) { @@ -660,8 +639,7 @@ public synchronized void deletePrefix(ByteString key) { deleteRange(key, endKey); } - private void doSendBatchPut( - BackOffer backOffer, Map kvPairs, long ttl, boolean atomic) { + private void doSendBatchPut(BackOffer backOffer, Map kvPairs, long ttl) { ExecutorCompletionService> completionService = new ExecutorCompletionService<>(batchPutThreadPool); @@ -686,16 +664,15 @@ private void doSendBatchPut( List task = taskQueue.poll(); for (Batch batch : task) { completionService.submit( - () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl, atomic)); + () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl)); } getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF); } } - private List doSendBatchPutInBatchesWithRetry( - BackOffer backOffer, Batch batch, long ttl, boolean atomic) { + private List doSendBatchPutInBatchesWithRetry(BackOffer backOffer, Batch batch, long ttl) { try (RegionStoreClient client = clientBuilder.build(batch.getRegion())) { - client.rawBatchPut(backOffer, batch, ttl, atomic); + client.rawBatchPut(backOffer, batch, ttl, atomicForCAS); return new ArrayList<>(); } catch (final TiKVException e) { // TODO: any elegant way to re-split the ranges if fails? @@ -770,7 +747,7 @@ private List doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch b backOffer, batch.getKeys(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder); } - private void doSendBatchDelete(BackOffer backOffer, List keys, boolean atomic) { + private void doSendBatchDelete(BackOffer backOffer, List keys) { ExecutorCompletionService> completionService = new ExecutorCompletionService<>(batchDeleteThreadPool); @@ -784,17 +761,16 @@ private void doSendBatchDelete(BackOffer backOffer, List keys, boole List task = taskQueue.poll(); for (Batch batch : task) { completionService.submit( - () -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch, atomic)); + () -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch)); } getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF); } } - private List doSendBatchDeleteInBatchesWithRetry( - BackOffer backOffer, Batch batch, boolean atomic) { + private List doSendBatchDeleteInBatchesWithRetry(BackOffer backOffer, Batch batch) { RegionStoreClient client = clientBuilder.build(batch.getRegion()); try { - client.rawBatchDelete(backOffer, batch.getKeys(), atomic); + client.rawBatchDelete(backOffer, batch.getKeys(), atomicForCAS); return new ArrayList<>(); } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); From 2a7ef846a6ec091d8b4efdb775046f4b00962741 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Thu, 24 Jun 2021 11:45:40 +0800 Subject: [PATCH 2/2] fix test Signed-off-by: marsishandsome --- README.md | 2 +- src/main/java/org/tikv/raw/RawKVClient.java | 2 +- src/test/java/org/tikv/raw/CASTest.java | 85 +++++++++++++++++++ .../java/org/tikv/raw/RawKVClientTest.java | 43 +--------- 4 files changed, 88 insertions(+), 44 deletions(-) create mode 100644 src/test/java/org/tikv/raw/CASTest.java diff --git a/README.md b/README.md index 7e1433c353e..cd4ab8ed4b1 100644 --- a/README.md +++ b/README.md @@ -177,7 +177,7 @@ The following includes ThreadPool related parameters, which can be passed in thr - default: 20 #### tikv.enable_atomic_for_cas -- whether to enable Compare And Swap, set true if using `RawKVClient.compareAndSet` or `RawKVClient.putIfAbsent` +- whether to enable `Compare And Set`, set true if using `RawKVClient.compareAndSet` or `RawKVClient.putIfAbsent` - default: false ## License diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 6c24bf497ec..87cac1361b9 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -198,7 +198,7 @@ public void compareAndSet( throws RawCASConflictException { if (!atomicForCAS) { throw new IllegalArgumentException( - "To use Compare And Swap, please enable the config tikv.enable_atomic_for_cas."); + "To use compareAndSet or putIfAbsent, please enable the config tikv.enable_atomic_for_cas."); } String label = "client_raw_compare_and_set"; diff --git a/src/test/java/org/tikv/raw/CASTest.java b/src/test/java/org/tikv/raw/CASTest.java new file mode 100644 index 00000000000..4e4719de127 --- /dev/null +++ b/src/test/java/org/tikv/raw/CASTest.java @@ -0,0 +1,85 @@ +package org.tikv.raw; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import com.google.protobuf.ByteString; +import java.util.Optional; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.exception.RawCASConflictException; + +public class CASTest { + private RawKVClient client; + private boolean initialized; + private static final Logger logger = LoggerFactory.getLogger(RawKVClientTest.class); + private TiSession session; + + @Before + public void setup() { + try { + TiConfiguration conf = TiConfiguration.createRawDefault(); + conf.setEnableAtomicForCAS(true); + session = TiSession.create(conf); + initialized = false; + if (client == null) { + client = session.createRawClient(); + } + initialized = true; + } catch (Exception e) { + logger.warn( + "Cannot initialize raw client, please check whether TiKV is running. Test skipped.", e); + } + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + } + + @Test + public void rawCASTest() { + if (!initialized) return; + ByteString key = ByteString.copyFromUtf8("key_atomic"); + ByteString value = ByteString.copyFromUtf8("value"); + ByteString value2 = ByteString.copyFromUtf8("value2"); + client.delete(key); + client.compareAndSet(key, Optional.empty(), value); + Assert.assertEquals(value, client.get(key).get()); + try { + client.compareAndSet(key, Optional.empty(), value2); + Assert.fail("compareAndSet should fail."); + } catch (RawCASConflictException err) { + Assert.assertEquals(value, err.getPrevValue().get()); + } + } + + @Test + public void rawPutIfAbsentTest() { + if (!initialized) return; + long ttl = 10; + ByteString key = ByteString.copyFromUtf8("key_atomic"); + ByteString value = ByteString.copyFromUtf8("value"); + ByteString value2 = ByteString.copyFromUtf8("value2"); + client.delete(key); + Optional res1 = client.putIfAbsent(key, value, ttl); + assertFalse(res1.isPresent()); + Optional res2 = client.putIfAbsent(key, value2, ttl); + assertEquals(res2.get(), value); + try { + Thread.sleep(ttl * 1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + Optional res3 = client.putIfAbsent(key, value, ttl); + assertFalse(res3.isPresent()); + } +} diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index c394666fa1a..2a2bc84b1ea 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -3,13 +3,11 @@ import static org.junit.Assert.*; import com.google.protobuf.ByteString; -import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -17,7 +15,6 @@ import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; import org.tikv.common.codec.KeyUtils; -import org.tikv.common.exception.RawCASConflictException; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; import org.tikv.common.util.FastByteComparisons; @@ -73,7 +70,7 @@ private static ByteString getRandomValue() { } @Before - public void setup() throws IOException { + public void setup() { try { TiConfiguration conf = TiConfiguration.createRawDefault(); session = TiSession.create(conf); @@ -96,44 +93,6 @@ public void tearDown() throws Exception { } } - @Test - public void rawCASTest() { - if (!initialized) return; - ByteString key = ByteString.copyFromUtf8("key_atomic"); - ByteString value = ByteString.copyFromUtf8("value"); - ByteString value2 = ByteString.copyFromUtf8("value2"); - client.delete(key); - client.compareAndSet(key, Optional.empty(), value); - Assert.assertEquals(value, client.get(key).get()); - try { - client.compareAndSet(key, Optional.empty(), value2); - Assert.fail("compareAndSet should fail."); - } catch (RawCASConflictException err) { - Assert.assertEquals(value, err.getPrevValue().get()); - } - } - - @Test - public void rawPutIfAbsentTest() { - if (!initialized) return; - long ttl = 10; - ByteString key = ByteString.copyFromUtf8("key_atomic"); - ByteString value = ByteString.copyFromUtf8("value"); - ByteString value2 = ByteString.copyFromUtf8("value2"); - client.delete(key); - Optional res1 = client.putIfAbsent(key, value, ttl); - assertFalse(res1.isPresent()); - Optional res2 = client.putIfAbsent(key, value2, ttl); - assertEquals(res2.get(), value); - try { - Thread.sleep(ttl * 1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - Optional res3 = client.putIfAbsent(key, value, ttl); - assertFalse(res3.isPresent()); - } - @Test public void getKeyTTLTest() { if (!initialized) return;