Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Update the rejected state of events during resync
Browse files Browse the repository at this point in the history
Events can be un-rejected or newly-rejected during resync, so ensure we update
the database and caches when that happens.
  • Loading branch information
richvdh committed Aug 4, 2022
1 parent 6dd7fa1 commit 7c60acd
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/13459.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Faster joins: update the rejected state of events during de-partial-stating.
55 changes: 55 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2139,3 +2139,58 @@ def _get_partial_state_events_batch_txn(
(room_id,),
)
return [row[0] for row in txn]

def mark_event_rejected_txn(
self,
txn: LoggingTransaction,
event_id: str,
rejection_reason: Optional[str],
) -> None:
"""Mark an event that was previously accepted as rejected, or vice versa
This can happen, for example, when resyncing state during a faster join.
Args:
txn:
event_id: ID of event to update
rejection_reason: reason it has been rejected, or None if it is now accepted
"""
if rejection_reason is None:
logger.info(
"Marking previously-processed event %s as accepted",
event_id,
)
self.db_pool.simple_delete_txn(
txn,
"rejections",
keyvalues={"event_id": event_id},
)
else:
logger.info(
"Marking previously-processed event %s as rejected(%s)",
event_id,
rejection_reason,
)
self.db_pool.simple_upsert_txn(
txn,
table="rejections",
keyvalues={"event_id": event_id},
values={
"reason": rejection_reason,
"last_check": self._clock.time_msec(),
},
)
self.db_pool.simple_update_txn(
txn,
table="events",
keyvalues={"event_id": event_id},
updatevalues={"rejection_reason": rejection_reason},
)

self.invalidate_get_event_cache_after_txn(txn, event_id)

# TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
# call '_send_invalidation_to_replication', but we actually need the other
# end to call _invalidate_local_event_cache() rather than (just)
# _get_event_cache.invalidate().
# https://github.com/matrix-org/synapse/issues/12994
5 changes: 5 additions & 0 deletions synapse/storage/databases/main/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ def _update_state_for_partial_state_event_txn(
updatevalues={"state_group": state_group},
)

# the event may now be rejected where it was not before, or vice versa,
# in which case we need to update the rejected flags.
if bool(context.rejected) != (event.rejected_reason is not None):
self.mark_event_rejected_txn(txn, event.event_id, context.rejected)

self.db_pool.simple_delete_one_txn(
txn,
table="partial_state_events",
Expand Down

0 comments on commit 7c60acd

Please sign in to comment.