diff --git a/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/ChangeStreamWorker.java b/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/ChangeStreamWorker.java index 46505073f..a8488b080 100644 --- a/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/ChangeStreamWorker.java +++ b/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/ChangeStreamWorker.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import com.mongodb.MongoException; +import com.mongodb.MongoInterruptedException; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument; @@ -64,6 +65,8 @@ public class ChangeStreamWorker implements Runnable { private final String collName; private final Set websocketSessions = Collections.synchronizedSet(new HashSet<>()); + private Thread handlingVirtualThread = null; + public ChangeStreamWorker(ChangeStreamWorkerKey key, List resolvedStages, String dbName, String collName) { super(); this.key = key; @@ -84,8 +87,18 @@ public String getCollName() { return this.collName; } + public Thread handlingVirtualThread() { + return this.handlingVirtualThread; + } + @Override public void run() { + if (Thread.currentThread().isVirtual()) { + this.handlingVirtualThread = Thread.currentThread(); + } + + LOGGER.debug("Change stream worker started {}", Thread.currentThread().getName()); + try { changeStreamEventsLoop(); } catch(Throwable t) { @@ -98,6 +111,7 @@ public void run() { closeAllWebSocketSessions(); } finally { ChangeStreamWorkers.getInstance().remove(key); + LOGGER.debug("Change stream worker ended"); } } @@ -110,6 +124,8 @@ public void run() { private void changeStreamEventsLoop() { try { _changeStreamEventsLoop(); + } catch(MongoInterruptedException mie) { + close(); } catch(MongoException mqe) { LOGGER.error("MongoDb error on ChangeStreamWorker {}, restarting a new worker", key, mqe); @@ -126,6 +142,7 @@ private void changeStreamEventsLoop() { private void _changeStreamEventsLoop() { LOGGER.debug("Change Stream Worker {} started listening for change events", this.key); final var changeStream = startChangeStream(); + changeStream.forEach(changeEvent -> { if (this.websocketSessions.isEmpty()) { // this terminates the ChangeStreamWorker @@ -171,13 +188,17 @@ public void onError(final WebSocketChannel channel, Void context, Throwable thro /** * removes the workers form the list of active workers and - * close all its websocket sesssions + * close all its websocket sesssions and interrupt the handling virtual thread * * on next change event, the thread will terminate since it has no active websocket sesssions */ void close() { ChangeStreamWorkers.getInstance().remove(key); closeAllWebSocketSessions(); + + if (this.handlingVirtualThread != null && !this.handlingVirtualThread.isInterrupted()) { + this.handlingVirtualThread.interrupt(); + } } void closeAllWebSocketSessions() { diff --git a/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/ChangeStreamWorkers.java b/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/ChangeStreamWorkers.java index 0ea336974..6c23864e7 100644 --- a/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/ChangeStreamWorkers.java +++ b/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/ChangeStreamWorkers.java @@ -33,7 +33,6 @@ * @author Andrea Di Cesare {@literal } */ public class ChangeStreamWorkers { - // todo use caffeine cache private final Map CHANGE_STREAM_WORKERS = new ConcurrentHashMap<>(); public static ChangeStreamWorkers getInstance() { @@ -80,6 +79,13 @@ public Set getWorkersOnCollection(String db, String coll) { } } + public Optional workerOfSession(WebSocketSession session) { + return CHANGE_STREAM_WORKERS.entrySet().stream() + .map(e -> e.getValue()) + .filter(csw -> csw.websocketSessions().contains(session)) + .findFirst(); + } + private static class SingletonHolder { private static final ChangeStreamWorkers INSTANCE = new ChangeStreamWorkers(); } diff --git a/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/WebSocketSession.java b/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/WebSocketSession.java index 8c8400d16..ac5950d04 100644 --- a/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/WebSocketSession.java +++ b/mongodb/src/main/java/org/restheart/mongodb/handlers/changestreams/WebSocketSession.java @@ -47,6 +47,16 @@ public WebSocketSession(WebSocketChannel channel, ChangeStreamWorker csw) { this.channel.addCloseTask((WebSocketChannel channel1) -> { this.changeStreamWorker.websocketSessions().removeIf(s -> s.getId().equals(id)); + + if (this.changeStreamWorker.websocketSessions().isEmpty()) { + if (this.changeStreamWorker.handlingVirtualThread() != null) { + LOGGER.debug("Terminating worker {}", this.changeStreamWorker.handlingVirtualThread().getName()); + this.changeStreamWorker.handlingVirtualThread().interrupt(); + } else { + LOGGER.warn("Cannot terminate worker since handlingVirtualThread is null"); + } + } + try { this.close(); } catch (IOException ex) {