From 48c4b3f4bb5e56c32d1ea4fd633ae1cc07aa9b22 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Wed, 5 Jun 2024 19:00:41 -0700 Subject: [PATCH 1/3] Moving sync removal notifications outside lru lock Signed-off-by: Sagar Upadhyaya --- .../org/opensearch/common/cache/Cache.java | 54 ++++++++++++++++--- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java index 6d346de25cadf..4e49210612dd8 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -36,9 +36,11 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.util.concurrent.ReleasableLock; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -396,7 +398,13 @@ private V get(K key, long now, Consumer> onExpiration) { if (entry == null) { return null; } else { - promote(entry, now); + List> removalNotifications = promote(entry, + now).v2(); + if (!removalNotifications.isEmpty()) { + for (RemovalNotification removalNotification: removalNotifications) { + removalListener.onRemoval(removalNotification); + } + } return entry.value; } } @@ -446,8 +454,14 @@ private V compute(K key, CacheLoader loader) throws ExecutionException { BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { if (ok != null) { + List> removalNotifications = new ArrayList<>(); try (ReleasableLock ignored = lruLock.acquire()) { - promote(ok, now); + removalNotifications = promote(ok, now).v2(); + } + if (!removalNotifications.isEmpty()) { + for (RemovalNotification removalNotification: removalNotifications) { + removalListener.onRemoval(removalNotification); + } } return ok.value; } else { @@ -512,16 +526,22 @@ private void put(K key, V value, long now) { CacheSegment segment = getCacheSegment(key); Tuple, Entry> tuple = segment.put(key, value, now); boolean replaced = false; + List> removalNotifications = new ArrayList<>(); try (ReleasableLock ignored = lruLock.acquire()) { if (tuple.v2() != null && tuple.v2().state == State.EXISTING) { if (unlink(tuple.v2())) { replaced = true; } } - promote(tuple.v1(), now); + removalNotifications = promote(tuple.v1(), now).v2(); } if (replaced) { - removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED)); + removalNotifications.add(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED)); + } + if (!removalNotifications.isEmpty()) { + for (RemovalNotification removalNotification: removalNotifications) { + removalListener.onRemoval(removalNotification); + } } } @@ -767,8 +787,18 @@ public long getEvictions() { } } - private boolean promote(Entry entry, long now) { + /** + * Promotes the desired entry to the head of the lru list and tries to see if it needs to evict any entries in + * case the cache size is exceeding or the entry got expired. + * @param entry Entry to be promoted + * @param now the current time + * @return Returns a tuple. v1 signifies whether an entry got promoted, v2 signifies the list of removal + * notifications that the callers needs to handle. + */ + private Tuple>> promote(Entry entry, + long now) { boolean promoted = true; + List> removalNotifications = new ArrayList<>(); try (ReleasableLock ignored = lruLock.acquire()) { switch (entry.state) { case DELETED: @@ -782,10 +812,20 @@ private boolean promote(Entry entry, long now) { break; } if (promoted) { - evict(now); + while (tail != null && shouldPrune(tail, now)) { + Entry entryToBeRemoved = tail; + CacheSegment segment = getCacheSegment(entryToBeRemoved.key); + if (segment != null) { + segment.remove(entryToBeRemoved.key, entryToBeRemoved.value, f -> {}); + } + if (unlink(entryToBeRemoved)) { + removalNotifications.add(new RemovalNotification<>(entryToBeRemoved.key, entryToBeRemoved.value, + RemovalReason.EVICTED)); + } + } } } - return promoted; + return new Tuple<>(promoted, removalNotifications); } private void evict(long now) { From 5dc4109d4b7b48be2ad50124b11eccf8874d5af5 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Wed, 5 Jun 2024 21:33:44 -0700 Subject: [PATCH 2/3] Add changelog and fix spotless issue Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 1 + .../java/org/opensearch/common/cache/Cache.java | 17 ++++++++--------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ffb438172f50..d09dbe73106d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650)) - Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481)) - Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636)) +- Move cache removal notifications outside lru lock ([#14017](https://github.com/opensearch-project/OpenSearch/pull/14017)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java index 4e49210612dd8..caae81e4387b4 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -398,10 +398,9 @@ private V get(K key, long now, Consumer> onExpiration) { if (entry == null) { return null; } else { - List> removalNotifications = promote(entry, - now).v2(); + List> removalNotifications = promote(entry, now).v2(); if (!removalNotifications.isEmpty()) { - for (RemovalNotification removalNotification: removalNotifications) { + for (RemovalNotification removalNotification : removalNotifications) { removalListener.onRemoval(removalNotification); } } @@ -459,7 +458,7 @@ private V compute(K key, CacheLoader loader) throws ExecutionException { removalNotifications = promote(ok, now).v2(); } if (!removalNotifications.isEmpty()) { - for (RemovalNotification removalNotification: removalNotifications) { + for (RemovalNotification removalNotification : removalNotifications) { removalListener.onRemoval(removalNotification); } } @@ -539,7 +538,7 @@ private void put(K key, V value, long now) { removalNotifications.add(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED)); } if (!removalNotifications.isEmpty()) { - for (RemovalNotification removalNotification: removalNotifications) { + for (RemovalNotification removalNotification : removalNotifications) { removalListener.onRemoval(removalNotification); } } @@ -795,8 +794,7 @@ public long getEvictions() { * @return Returns a tuple. v1 signifies whether an entry got promoted, v2 signifies the list of removal * notifications that the callers needs to handle. */ - private Tuple>> promote(Entry entry, - long now) { + private Tuple>> promote(Entry entry, long now) { boolean promoted = true; List> removalNotifications = new ArrayList<>(); try (ReleasableLock ignored = lruLock.acquire()) { @@ -819,8 +817,9 @@ private Tuple>> promote(Entry entr segment.remove(entryToBeRemoved.key, entryToBeRemoved.value, f -> {}); } if (unlink(entryToBeRemoved)) { - removalNotifications.add(new RemovalNotification<>(entryToBeRemoved.key, entryToBeRemoved.value, - RemovalReason.EVICTED)); + removalNotifications.add( + new RemovalNotification<>(entryToBeRemoved.key, entryToBeRemoved.value, RemovalReason.EVICTED) + ); } } } From 1f920409e67dd3f05e2420e10ff15df0cc1efa39 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 6 Jun 2024 11:11:09 -0700 Subject: [PATCH 3/3] Adding javadoc for removal listener Signed-off-by: Sagar Upadhyaya --- .../java/org/opensearch/common/cache/RemovalListener.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalListener.java b/server/src/main/java/org/opensearch/common/cache/RemovalListener.java index 68e1cdf6139e2..eaaaec2bb07e0 100644 --- a/server/src/main/java/org/opensearch/common/cache/RemovalListener.java +++ b/server/src/main/java/org/opensearch/common/cache/RemovalListener.java @@ -42,5 +42,10 @@ @ExperimentalApi @FunctionalInterface public interface RemovalListener { + + /** + * This may be called from multiple threads at once. So implementation needs to be thread safe. + * @param notification removal notification for desired entry. + */ void onRemoval(RemovalNotification notification); }