Skip to content

Commit

Permalink
🐛 Fix Thread Leak in ChangeStreamWorker Due to Inactive Change Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ujibang committed Aug 23, 2024
1 parent 88cf353 commit 743e169
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.slf4j.LoggerFactory;

import com.mongodb.MongoException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
Expand All @@ -64,6 +65,8 @@ public class ChangeStreamWorker implements Runnable {
private final String collName;
private final Set<WebSocketSession> websocketSessions = Collections.synchronizedSet(new HashSet<>());

private Thread handlingVirtualThread = null;

public ChangeStreamWorker(ChangeStreamWorkerKey key, List<BsonDocument> resolvedStages, String dbName, String collName) {
super();
this.key = key;
Expand All @@ -84,8 +87,18 @@ public String getCollName() {
return this.collName;
}

public Thread handlingVirtualThread() {
return this.handlingVirtualThread;
}

@Override
public void run() {
if (Thread.currentThread().isVirtual()) {
this.handlingVirtualThread = Thread.currentThread();
}

LOGGER.debug("Change stream worker started {}", Thread.currentThread().getName());

try {
changeStreamEventsLoop();
} catch(Throwable t) {
Expand All @@ -98,6 +111,7 @@ public void run() {
closeAllWebSocketSessions();
} finally {
ChangeStreamWorkers.getInstance().remove(key);
LOGGER.debug("Change stream worker ended");
}
}

Expand All @@ -110,6 +124,8 @@ public void run() {
private void changeStreamEventsLoop() {
try {
_changeStreamEventsLoop();
} catch(MongoInterruptedException mie) {
close();
} catch(MongoException mqe) {
LOGGER.error("MongoDb error on ChangeStreamWorker {}, restarting a new worker", key, mqe);

Expand All @@ -126,6 +142,7 @@ private void changeStreamEventsLoop() {
private void _changeStreamEventsLoop() {
LOGGER.debug("Change Stream Worker {} started listening for change events", this.key);
final var changeStream = startChangeStream();

changeStream.forEach(changeEvent -> {
if (this.websocketSessions.isEmpty()) {
// this terminates the ChangeStreamWorker
Expand Down Expand Up @@ -171,13 +188,17 @@ public void onError(final WebSocketChannel channel, Void context, Throwable thro

/**
* removes the workers form the list of active workers and
* close all its websocket sesssions
* close all its websocket sesssions and interrupt the handling virtual thread
*
* on next change event, the thread will terminate since it has no active websocket sesssions
*/
void close() {
ChangeStreamWorkers.getInstance().remove(key);
closeAllWebSocketSessions();

if (this.handlingVirtualThread != null && !this.handlingVirtualThread.isInterrupted()) {
this.handlingVirtualThread.interrupt();
}
}

void closeAllWebSocketSessions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
* @author Andrea Di Cesare {@literal <andrea@softinstigate.com>}
*/
public class ChangeStreamWorkers {
// todo use caffeine cache
private final Map<ChangeStreamWorkerKey, ChangeStreamWorker> CHANGE_STREAM_WORKERS = new ConcurrentHashMap<>();

public static ChangeStreamWorkers getInstance() {
Expand Down Expand Up @@ -80,6 +79,13 @@ public Set<ChangeStreamWorker> getWorkersOnCollection(String db, String coll) {
}
}

public Optional<ChangeStreamWorker> workerOfSession(WebSocketSession session) {
return CHANGE_STREAM_WORKERS.entrySet().stream()
.map(e -> e.getValue())
.filter(csw -> csw.websocketSessions().contains(session))
.findFirst();
}

private static class SingletonHolder {
private static final ChangeStreamWorkers INSTANCE = new ChangeStreamWorkers();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ public WebSocketSession(WebSocketChannel channel, ChangeStreamWorker csw) {

this.channel.addCloseTask((WebSocketChannel channel1) -> {
this.changeStreamWorker.websocketSessions().removeIf(s -> s.getId().equals(id));

if (this.changeStreamWorker.websocketSessions().isEmpty()) {
if (this.changeStreamWorker.handlingVirtualThread() != null) {
LOGGER.debug("Terminating worker {}", this.changeStreamWorker.handlingVirtualThread().getName());
this.changeStreamWorker.handlingVirtualThread().interrupt();
} else {
LOGGER.warn("Cannot terminate worker since handlingVirtualThread is null");
}
}

try {
this.close();
} catch (IOException ex) {
Expand Down

0 comments on commit 743e169

Please sign in to comment.