From 406c8f16cc35f8c9595884b5b32efaa6ab1ba417 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 9 Feb 2021 12:29:16 +0100 Subject: [PATCH 1/7] add fixes --- synapse/handlers/sync.py | 6 +++++- synapse/util/caches/response_cache.py | 9 ++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5c7590f38e4d..cc68033d34a8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -40,7 +40,7 @@ from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.response_cache import ResponseCache +from synapse.util.caches.response_cache import NoTimedCache, ResponseCache from synapse.util.metrics import Measure, measure_func from synapse.visibility import filter_events_for_client @@ -331,6 +331,10 @@ def current_sync_callback(before_token, after_token): lazy_loaded = "false" non_empty_sync_counter.labels(sync_type, lazy_loaded).inc() + # hack: sanity check to invalidate cache, prevent a loop + if since_token == result.next_batch: + result = NoTimedCache(result) # type: ignore # because it is unwrapped in ResponseCache.set.() + return result async def current_sync_for_user( diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 32228f42ee59..158095d86eba 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -15,6 +15,8 @@ import logging from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar +import attr + from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -29,6 +31,11 @@ T = TypeVar("T") +@attr.s(slots=True, frozen=True) +class NoTimedCache(Generic[T]): + obj = attr.ib() + + class ResponseCache(Generic[T]): """ This caches a deferred response. Until the deferred completes it will be @@ -101,7 +108,7 @@ def set(self, key: T, deferred: defer.Deferred) -> defer.Deferred: self.pending_result_cache[key] = result def remove(r): - if self.timeout_sec: + if self.timeout_sec and not isinstance(r, NoTimedCache): self.clock.call_later( self.timeout_sec, self.pending_result_cache.pop, key, None ) From bbad98c076f3597317df33f3eea9f84207c9f6e9 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 9 Feb 2021 12:31:22 +0100 Subject: [PATCH 2/7] news --- changelog.d/9358.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9358.misc diff --git a/changelog.d/9358.misc b/changelog.d/9358.misc new file mode 100644 index 000000000000..cc7614afc061 --- /dev/null +++ b/changelog.d/9358.misc @@ -0,0 +1 @@ +Added a fix that invalidates cache for empty timed-out sync responses. \ No newline at end of file From e8296ff66cefc064edaeb8d3ef79cac2f08c1c8e Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 9 Feb 2021 12:37:33 +0100 Subject: [PATCH 3/7] add message for the future --- synapse/handlers/sync.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cc68033d34a8..686bf747d59c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -331,7 +331,8 @@ def current_sync_callback(before_token, after_token): lazy_loaded = "false" non_empty_sync_counter.labels(sync_type, lazy_loaded).inc() - # hack: sanity check to invalidate cache, prevent a loop + # fixme hack: sanity check to invalidate cache, prevent a self-referral loop + # replace with contextvars approach once it gets working on twisted (twisted/twisted#1262) if since_token == result.next_batch: result = NoTimedCache(result) # type: ignore # because it is unwrapped in ResponseCache.set.() From 3b0829449188954b87f281357da69f9eabb4a0a4 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 9 Feb 2021 12:38:50 +0100 Subject: [PATCH 4/7] add if-else on return (forgot in initial commit) --- synapse/util/caches/response_cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 158095d86eba..2d1d50544655 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -114,7 +114,7 @@ def remove(r): ) else: self.pending_result_cache.pop(key, None) - return r + return r.obj if isinstance(r, NoTimedCache) else r result.addBoth(remove) return result.observe() From b239033d4a2f390e209ff369cf489718ea3d973a Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 9 Feb 2021 12:45:06 +0100 Subject: [PATCH 5/7] forgot to remove Generic[T] --- synapse/util/caches/response_cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 2d1d50544655..3d630dde00b0 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -32,7 +32,7 @@ @attr.s(slots=True, frozen=True) -class NoTimedCache(Generic[T]): +class NoTimedCache: obj = attr.ib() From 04c399d36a47104cf9a13a772ec78b09c66a13f3 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 22 Feb 2021 12:06:08 +0100 Subject: [PATCH 6/7] change strategy to ResponseCache.wrap_conditional() --- synapse/handlers/sync.py | 10 ++----- synapse/util/caches/response_cache.py | 42 ++++++++++++++++++++------- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 686bf747d59c..bb826df79fd6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -40,7 +40,7 @@ from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.response_cache import NoTimedCache, ResponseCache +from synapse.util.caches.response_cache import ResponseCache from synapse.util.metrics import Measure, measure_func from synapse.visibility import filter_events_for_client @@ -277,8 +277,9 @@ async def wait_for_sync_for_user( user_id = sync_config.user.to_string() await self.auth.check_auth_blocking(requester=requester) - res = await self.response_cache.wrap( + res = await self.response_cache.wrap_conditional( sync_config.request_key, + lambda result: since_token != result.next_batch, self._wait_for_sync_for_user, sync_config, since_token, @@ -331,11 +332,6 @@ def current_sync_callback(before_token, after_token): lazy_loaded = "false" non_empty_sync_counter.labels(sync_type, lazy_loaded).inc() - # fixme hack: sanity check to invalidate cache, prevent a self-referral loop - # replace with contextvars approach once it gets working on twisted (twisted/twisted#1262) - if since_token == result.next_batch: - result = NoTimedCache(result) # type: ignore # because it is unwrapped in ResponseCache.set.() - return result async def current_sync_for_user( diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 3d630dde00b0..7401adaa4e0b 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar - -import attr +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, Set, TypeVar from twisted.internet import defer @@ -31,11 +29,6 @@ T = TypeVar("T") -@attr.s(slots=True, frozen=True) -class NoTimedCache: - obj = attr.ib() - - class ResponseCache(Generic[T]): """ This caches a deferred response. Until the deferred completes it will be @@ -47,6 +40,7 @@ class ResponseCache(Generic[T]): def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0): # Requests that haven't finished yet. self.pending_result_cache = {} # type: Dict[T, ObservableDeferred] + self.pending_conditionals = {} # type: Dict[T, Set[Callable[[Any], bool]]] self.clock = hs.get_clock() self.timeout_sec = timeout_ms / 1000.0 @@ -108,17 +102,45 @@ def set(self, key: T, deferred: defer.Deferred) -> defer.Deferred: self.pending_result_cache[key] = result def remove(r): - if self.timeout_sec and not isinstance(r, NoTimedCache): + should_cache = all( + func(r) for func in self.pending_conditionals.pop(key, []) + ) + + if self.timeout_sec and should_cache: self.clock.call_later( self.timeout_sec, self.pending_result_cache.pop, key, None ) else: self.pending_result_cache.pop(key, None) - return r.obj if isinstance(r, NoTimedCache) else r + return r result.addBoth(remove) return result.observe() + def add_conditional(self, key: T, conditional: Callable[[Any], bool]): + self.pending_conditionals.setdefault(key, set()).add(conditional) + + def wrap_conditional( + self, + key: T, + should_cache: Callable[[Any], bool], + callback: "Callable[..., Any]", + *args: Any, + **kwargs: Any + ) -> defer.Deferred: + """The same as wrap(), but adds a conditional to the final execution. + + When the final execution completes, *all* conditionals need to return True for it to properly cache, + else it'll not be cached in a timed fashion.""" + + # See if there's already a result on this key that hasn't yet completed, due to the single-threaded nature of + # python, adding a key immediately in the same execution thread will not cause a race condition. + result = self.get(key) + if not result or isinstance(result, defer.Deferred) and not result.called: + self.add_conditional(key, should_cache) + + return self.wrap(key, callback, *args, **kwargs) + def wrap( self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any ) -> defer.Deferred: From ed3922312af3d78b4e27b693e6e4dc3c0e8c9ebb Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Wed, 24 Feb 2021 12:01:50 +0000 Subject: [PATCH 7/7] Apply suggestions from code review --- synapse/util/caches/response_cache.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 7401adaa4e0b..53f85195a710 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -131,9 +131,10 @@ def wrap_conditional( """The same as wrap(), but adds a conditional to the final execution. When the final execution completes, *all* conditionals need to return True for it to properly cache, - else it'll not be cached in a timed fashion.""" + else it'll not be cached in a timed fashion. + """ - # See if there's already a result on this key that hasn't yet completed, due to the single-threaded nature of + # See if there's already a result on this key that hasn't yet completed. Due to the single-threaded nature of # python, adding a key immediately in the same execution thread will not cause a race condition. result = self.get(key) if not result or isinstance(result, defer.Deferred) and not result.called: