From 7c60acdbd8751c2823886d7853fa3b3f3a07c1c2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 4 Aug 2022 15:39:58 +0100 Subject: [PATCH] Update the rejected state of events during resync Events can be un-rejected or newly-rejected during resync, so ensure we update the database and caches when that happens. --- changelog.d/13459.misc | 1 + .../storage/databases/main/events_worker.py | 55 +++++++++++++++++++ synapse/storage/databases/main/state.py | 5 ++ 3 files changed, 61 insertions(+) create mode 100644 changelog.d/13459.misc diff --git a/changelog.d/13459.misc b/changelog.d/13459.misc new file mode 100644 index 000000000000..e6082210a0d8 --- /dev/null +++ b/changelog.d/13459.misc @@ -0,0 +1 @@ +Faster joins: update the rejected state of events during de-partial-stating. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 29c99c635735..65542d20b5f0 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -2139,3 +2139,58 @@ def _get_partial_state_events_batch_txn( (room_id,), ) return [row[0] for row in txn] + + def mark_event_rejected_txn( + self, + txn: LoggingTransaction, + event_id: str, + rejection_reason: Optional[str], + ) -> None: + """Mark an event that was previously accepted as rejected, or vice versa + + This can happen, for example, when resyncing state during a faster join. + + Args: + txn: + event_id: ID of event to update + rejection_reason: reason it has been rejected, or None if it is now accepted + """ + if rejection_reason is None: + logger.info( + "Marking previously-processed event %s as accepted", + event_id, + ) + self.db_pool.simple_delete_txn( + txn, + "rejections", + keyvalues={"event_id": event_id}, + ) + else: + logger.info( + "Marking previously-processed event %s as rejected(%s)", + event_id, + rejection_reason, + ) + self.db_pool.simple_upsert_txn( + txn, + table="rejections", + keyvalues={"event_id": event_id}, + values={ + "reason": rejection_reason, + "last_check": self._clock.time_msec(), + }, + ) + self.db_pool.simple_update_txn( + txn, + table="events", + keyvalues={"event_id": event_id}, + updatevalues={"rejection_reason": rejection_reason}, + ) + + self.invalidate_get_event_cache_after_txn(txn, event_id) + + # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just + # call '_send_invalidation_to_replication', but we actually need the other + # end to call _invalidate_local_event_cache() rather than (just) + # _get_event_cache.invalidate(). + # https://github.com/matrix-org/synapse/issues/12994 diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index f70705a0af19..0b10af0e580e 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -430,6 +430,11 @@ def _update_state_for_partial_state_event_txn( updatevalues={"state_group": state_group}, ) + # the event may now be rejected where it was not before, or vice versa, + # in which case we need to update the rejected flags. + if bool(context.rejected) != (event.rejected_reason is not None): + self.mark_event_rejected_txn(txn, event.event_id, context.rejected) + self.db_pool.simple_delete_one_txn( txn, table="partial_state_events",