diff --git a/README.md b/README.md index c28b28f5c0..a1f3132641 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ To get started with Jedis, first add it as a dependency in your Java project. If redis.clients jedis - 4.3.0 + 4.4.3 ``` diff --git a/pom.xml b/pom.xml index 35075af400..f2995791da 100644 --- a/pom.xml +++ b/pom.xml @@ -184,7 +184,7 @@ maven-surefire-plugin - 3.1.0 + 3.1.2 ${redis-hosts} @@ -313,7 +313,7 @@ maven-surefire-plugin - 3.1.0 + 3.1.2 **/examples/*Example.java diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index fae16a8910..eef6b2a810 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -38,8 +38,6 @@ public abstract class MultiNodePipelineBase extends PipelineBase private final Map connections; private volatile boolean syncing = false; - private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); - public MultiNodePipelineBase(CommandObjects commandObjects) { super(commandObjects); pipelinedResponses = new LinkedHashMap<>(); @@ -104,6 +102,8 @@ public final void sync() { } syncing = true; + ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); + CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); Iterator>>> pipelinedResponsesIterator = pipelinedResponses.entrySet().iterator(); @@ -136,6 +136,8 @@ public final void sync() { log.error("Thread is interrupted during sync.", e); } + executorService.shutdownNow(); + syncing = false; } diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index 51c7f41730..3ed45bae63 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -10,6 +10,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import redis.clients.jedis.args.*; @@ -23,8 +24,6 @@ import redis.clients.jedis.util.JedisClusterTestUtil; import redis.clients.jedis.util.SafeEncoder; -// SLOW -// TODO: make it fast public class ClusterPipeliningTest { private static final String LOCAL_IP = "127.0.0.1"; @@ -36,13 +35,13 @@ public class ClusterPipeliningTest { private static Jedis node2; private static Jedis node3; - private HostAndPort nodeInfo1 = HostAndPorts.getClusterServers().get(0); - private HostAndPort nodeInfo2 = HostAndPorts.getClusterServers().get(1); - private HostAndPort nodeInfo3 = HostAndPorts.getClusterServers().get(2); + private static HostAndPort nodeInfo1 = HostAndPorts.getClusterServers().get(0); + private static HostAndPort nodeInfo2 = HostAndPorts.getClusterServers().get(1); + private static HostAndPort nodeInfo3 = HostAndPorts.getClusterServers().get(2); private Set nodes = new HashSet<>(Arrays.asList(nodeInfo1, nodeInfo2, nodeInfo3)); - @Before - public void setUp() throws InterruptedException { + @BeforeClass + public static void setUp() throws InterruptedException { node1 = new Jedis(nodeInfo1); node1.auth("cluster"); node1.flushAll(); @@ -81,8 +80,22 @@ public void setUp() throws InterruptedException { JedisClusterTestUtil.waitForClusterReady(node1, node2, node3); } + @Before + public void prepare() { + node1.flushAll(); + node2.flushAll(); + node3.flushAll(); + } + + @After + public void cleanUp() { + node1.flushDB(); + node2.flushDB(); + node3.flushDB(); + } + @AfterClass - public static void cleanUp() { + public static void tearDown() throws InterruptedException { node1.flushDB(); node2.flushDB(); node3.flushDB(); @@ -91,11 +104,6 @@ public static void cleanUp() { node3.clusterReset(ClusterResetType.SOFT); } - @After - public void tearDown() throws InterruptedException { - cleanUp(); - } - @Test public void constructorClientConfig() { try (ClusterPipeline pipe = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) { @@ -1053,4 +1061,40 @@ public void transaction() { assertThrows(UnsupportedOperationException.class, () -> cluster.multi()); } } + + @Test(timeout = 10_000L) + public void multiple() { + final int maxTotal = 100; + ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); + poolConfig.setMaxTotal(maxTotal); + try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) { + for (int i = 0; i < maxTotal; i++) { + assertThreadsCount(); + String s = Integer.toString(i); + try (ClusterPipeline pipeline = cluster.pipelined()) { + pipeline.set(s, s); + pipeline.sync(); + } + assertThreadsCount(); + } + } + } + + private static void assertThreadsCount() { + // Get the root thread group + final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent(); + + // Create a buffer to store the thread information + final Thread[] threads = new Thread[rootGroup.activeCount()]; + + // Enumerate all threads into the buffer + rootGroup.enumerate(threads); + + // Assert information about threads + final int count = (int) Arrays.stream(threads) + .filter(thread -> thread != null && thread.getName() != null + && thread.getName().startsWith("pool-")) + .count(); + MatcherAssert.assertThat(count, Matchers.lessThanOrEqualTo(20)); + } }