From ca52419c5b201421041d8280fa23a0e2940cf3d2 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Mon, 7 Aug 2023 14:14:31 +0100
Subject: [PATCH] Use `wrap_as_background_process`

---
 synapse/storage/databases/main/cache.py | 49 +++++++++++--------------
 1 file changed, 21 insertions(+), 28 deletions(-)

diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 7937ec1bd201..2fbd389c7168 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -19,7 +19,7 @@
 
 from synapse.api.constants import EventTypes
 from synapse.config._base import Config
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.replication.tcp.streams import BackfillStream, CachesStream
 from synapse.replication.tcp.streams.events import (
     EventsStream,
@@ -584,36 +584,29 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int:
         else:
             return 0
 
-    def _clean_up_cache_invalidation_wrapper(self) -> None:
-        async def _clean_up_cache_invalidation_background() -> None:
-            """
-            Clean up cache invalidation stream table entries occasionally.
-            If we are behind (i.e. there are entries old enough to
-            be deleted but too many of them to be deleted in one go),
-            then we run slightly more frequently.
-            """
-            delete_up_to: int = (
-                self.hs.get_clock().time_msec()
-                - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
-            )
-
-            in_backlog = await self._clean_up_batch_of_old_cache_invalidations(
-                delete_up_to
-            )
+    @wrap_as_background_process("clean_up_old_cache_invalidations")
+    async def _clean_up_cache_invalidation_wrapper(self) -> None:
+        """
+        Clean up cache invalidation stream table entries occasionally.
+        If we are behind (i.e. there are entries old enough to
+        be deleted but too many of them to be deleted in one go),
+        then we run slightly more frequently.
+        """
+        delete_up_to: int = (
+            self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
+        )
 
-            # Vary how long we wait before calling again depending on whether we
-            # are still sifting through backlog or we have caught up.
-            if in_backlog:
-                next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
-            else:
-                next_interval = REGULAR_CLEANUP_INTERVAL_MS
+        in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to)
 
-            self.hs.get_clock().call_later(
-                next_interval / 1000, self._clean_up_cache_invalidation_wrapper
-            )
+        # Vary how long we wait before calling again depending on whether we
+        # are still sifting through backlog or we have caught up.
+        if in_backlog:
+            next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
+        else:
+            next_interval = REGULAR_CLEANUP_INTERVAL_MS
 
-        run_as_background_process(
-            "clean_up_old_cache_invalidations", _clean_up_cache_invalidation_background
+        self.hs.get_clock().call_later(
+            next_interval / 1000, self._clean_up_cache_invalidation_wrapper
         )
 
     async def _clean_up_batch_of_old_cache_invalidations(