Skip to content

Commit

Permalink
Merge branch 'master' into json-resp3-2
Browse files Browse the repository at this point in the history
  • Loading branch information
chayim committed Jul 18, 2023
2 parents 725ddf6 + 2cd2601 commit 148630a
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ To get started with Jedis, first add it as a dependency in your Java project. If
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.0</version>
<version>4.4.3</version>
</dependency>
```

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.1.0</version>
<version>3.1.2</version>
<configuration>
<systemPropertyVariables>
<redis-hosts>${redis-hosts}</redis-hosts>
Expand Down Expand Up @@ -313,7 +313,7 @@
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.1.0</version>
<version>3.1.2</version>
<configuration>
<test>**/examples/*Example.java</test>
</configuration>
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public abstract class MultiNodePipelineBase extends PipelineBase
private final Map<HostAndPort, Connection> connections;
private volatile boolean syncing = false;

private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

public MultiNodePipelineBase(CommandObjects commandObjects) {
super(commandObjects);
pipelinedResponses = new LinkedHashMap<>();
Expand Down Expand Up @@ -104,6 +102,8 @@ public final void sync() {
}
syncing = true;

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
= pipelinedResponses.entrySet().iterator();
Expand Down Expand Up @@ -136,6 +136,8 @@ public final void sync() {
log.error("Thread is interrupted during sync.", e);
}

executorService.shutdownNow();

syncing = false;
}

Expand Down
70 changes: 57 additions & 13 deletions src/test/java/redis/clients/jedis/ClusterPipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import redis.clients.jedis.args.*;
Expand All @@ -23,8 +24,6 @@
import redis.clients.jedis.util.JedisClusterTestUtil;
import redis.clients.jedis.util.SafeEncoder;

// SLOW
// TODO: make it fast
public class ClusterPipeliningTest {

private static final String LOCAL_IP = "127.0.0.1";
Expand All @@ -36,13 +35,13 @@ public class ClusterPipeliningTest {
private static Jedis node2;
private static Jedis node3;

private HostAndPort nodeInfo1 = HostAndPorts.getClusterServers().get(0);
private HostAndPort nodeInfo2 = HostAndPorts.getClusterServers().get(1);
private HostAndPort nodeInfo3 = HostAndPorts.getClusterServers().get(2);
private static HostAndPort nodeInfo1 = HostAndPorts.getClusterServers().get(0);
private static HostAndPort nodeInfo2 = HostAndPorts.getClusterServers().get(1);
private static HostAndPort nodeInfo3 = HostAndPorts.getClusterServers().get(2);
private Set<HostAndPort> nodes = new HashSet<>(Arrays.asList(nodeInfo1, nodeInfo2, nodeInfo3));

@Before
public void setUp() throws InterruptedException {
@BeforeClass
public static void setUp() throws InterruptedException {
node1 = new Jedis(nodeInfo1);
node1.auth("cluster");
node1.flushAll();
Expand Down Expand Up @@ -81,8 +80,22 @@ public void setUp() throws InterruptedException {
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3);
}

@Before
public void prepare() {
node1.flushAll();
node2.flushAll();
node3.flushAll();
}

@After
public void cleanUp() {
node1.flushDB();
node2.flushDB();
node3.flushDB();
}

@AfterClass
public static void cleanUp() {
public static void tearDown() throws InterruptedException {
node1.flushDB();
node2.flushDB();
node3.flushDB();
Expand All @@ -91,11 +104,6 @@ public static void cleanUp() {
node3.clusterReset(ClusterResetType.SOFT);
}

@After
public void tearDown() throws InterruptedException {
cleanUp();
}

@Test
public void constructorClientConfig() {
try (ClusterPipeline pipe = new ClusterPipeline(nodes, DEFAULT_CLIENT_CONFIG)) {
Expand Down Expand Up @@ -1053,4 +1061,40 @@ public void transaction() {
assertThrows(UnsupportedOperationException.class, () -> cluster.multi());
}
}

@Test(timeout = 10_000L)
public void multiple() {
final int maxTotal = 100;
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
poolConfig.setMaxTotal(maxTotal);
try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) {
for (int i = 0; i < maxTotal; i++) {
assertThreadsCount();
String s = Integer.toString(i);
try (ClusterPipeline pipeline = cluster.pipelined()) {
pipeline.set(s, s);
pipeline.sync();
}
assertThreadsCount();
}
}
}

private static void assertThreadsCount() {
// Get the root thread group
final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent();

// Create a buffer to store the thread information
final Thread[] threads = new Thread[rootGroup.activeCount()];

// Enumerate all threads into the buffer
rootGroup.enumerate(threads);

// Assert information about threads
final int count = (int) Arrays.stream(threads)
.filter(thread -> thread != null && thread.getName() != null
&& thread.getName().startsWith("pool-"))
.count();
MatcherAssert.assertThat(count, Matchers.lessThanOrEqualTo(20));
}
}

0 comments on commit 148630a

Please sign in to comment.