Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add experimental support for MSC3391: deleting account data #14714

Merged
merged 15 commits into from
Jan 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1898,6 +1898,19 @@ async def simple_update(
updatevalues: Dict[str, Any],
desc: str,
) -> int:
"""
Update rows in the given database table.
If the given keyvalues don't match anything, nothing will be updated.

Args:
table: The database table to update.
keyvalues: A mapping of column name to value to match rows on.
updatevalues: A mapping of column name to value to replace in any matched rows.
desc: description of the transaction, for logging and metrics.

Returns:
The number of rows that were updated. Will be 0 if no matching rows were found.
"""
return await self.runInteraction(
desc, self.simple_update_txn, table, keyvalues, updatevalues
)
Expand All @@ -1909,6 +1922,19 @@ def simple_update_txn(
keyvalues: Dict[str, Any],
updatevalues: Dict[str, Any],
) -> int:
"""
Update rows in the given database table.
If the given keyvalues don't match anything, nothing will be updated.

Args:
txn: The database transaction object.
table: The database table to update.
keyvalues: A mapping of column name to value to match rows on.
updatevalues: A mapping of column name to value to replace in any matched rows.

Returns:
The number of rows that were updated. Will be 0 if no matching rows were found.
"""
if keyvalues:
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
else:
Expand Down
168 changes: 168 additions & 0 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,74 @@ async def add_account_data_to_room(

return self._account_data_id_gen.get_current_token()

async def remove_account_data_for_room(
self, user_id: str, room_id: str, account_data_type: str
) -> Optional[int]:
"""Delete the room account data for the user of a given type.

Args:
user_id: The user to remove account_data for.
room_id: The room ID to scope the request to.
account_data_type: The account data type to delete.

Returns:
The maximum stream position, or None if there was no matching room account
data to delete.
"""
assert self._can_write_to_account_data
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)

def _remove_account_data_for_room_txn(
txn: LoggingTransaction, next_id: int
) -> bool:
"""
Args:
txn: The transaction object.
next_id: The stream_id to update any existing rows to.

Returns:
True if an entry in room_account_data had its content set to '{}',
otherwise False. This informs callers of whether there actually was an
existing room account data entry to delete, or if the call was a no-op.
"""
sql = """
UPDATE room_account_data
SET stream_id = ?, content = '{}'
WHERE user_id = ?
AND room_id = ?
AND account_data_type = ?
AND content != '{}'
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""
txn.execute(
sql,
(next_id, user_id, room_id, account_data_type),
)
if txn.rowcount == 0:
# We didn't update any rows. This means that there was no matching room
# account data entry to delete in the first place.
return False

return True
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved

async with self._account_data_id_gen.get_next() as next_id:
row_updated = await self.db_pool.runInteraction(
"remove_account_data_for_room",
_remove_account_data_for_room_txn,
next_id,
)

if not row_updated:
return None

self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
self.get_account_data_for_room.invalidate((user_id, room_id))
self.get_account_data_for_room_and_type.prefill(
(user_id, room_id, account_data_type), {}
)

return self._account_data_id_gen.get_current_token()
clokep marked this conversation as resolved.
Show resolved Hide resolved

async def add_account_data_for_user(
self, user_id: str, account_data_type: str, content: JsonDict
) -> int:
Expand Down Expand Up @@ -569,6 +637,106 @@ def _add_account_data_for_user(
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))

async def remove_account_data_for_user(
self,
user_id: str,
account_data_type: str,
) -> Optional[int]:
"""
Delete a single piece of user account data by type.

A "delete" is performed by updating a potentially existing row in the
"account_data" database table for (user_id, account_data_type) and
setting its content to "{}".

Args:
user_id: The user ID to modify the account data of.
account_data_type: The type to remove.

Returns:
The maximum stream position, or None if there was no matching account data
to delete.
"""
assert self._can_write_to_account_data
assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator)

def _remove_account_data_for_user_txn(
txn: LoggingTransaction, next_id: int
) -> bool:
"""
Args:
txn: The transaction object.
next_id: The stream_id to update any existing rows to.

Returns:
True if an entry in account_data had its content set to '{}', otherwise
False. This informs callers of whether there actually was an existing
account data entry to delete, or if the call was a no-op.
"""
sql = """
UPDATE account_data
SET stream_id = ?, content = '{}'
WHERE user_id = ?
AND account_data_type = ?
AND content != '{}'
"""
txn.execute(sql, (next_id, user_id, account_data_type))
if txn.rowcount == 0:
# We didn't update any rows. This means that there was no matching room
# account data entry to delete in the first place.
return False

# Ignored users get denormalized into a separate table as an optimisation.
if account_data_type == AccountDataTypes.IGNORED_USER_LIST:
# If this method was called with the ignored users account data type, we
# simply delete all ignored users.

# First pull all the users that this user ignores.
previously_ignored_users = set(
self.db_pool.simple_select_onecol_txn(
txn,
table="ignored_users",
keyvalues={"ignorer_user_id": user_id},
retcol="ignored_user_id",
)
)

# Then delete them from the database.
self.db_pool.simple_delete_txn(
txn,
table="ignored_users",
keyvalues={"ignorer_user_id": user_id},
)

# Invalidate the cache for ignored users which were removed.
for ignored_user_id in previously_ignored_users:
self._invalidate_cache_and_stream(
txn, self.ignored_by, (ignored_user_id,)
)

# Invalidate for this user the cache tracking ignored users.
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))

return True

async with self._account_data_id_gen.get_next() as next_id:
row_updated = await self.db_pool.runInteraction(
"remove_account_data_for_user",
_remove_account_data_for_user_txn,
next_id,
)

if not row_updated:
return None

self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
self.get_global_account_data_by_type_for_user.prefill(
(user_id, account_data_type), {}
)

return self._account_data_id_gen.get_current_token()

async def purge_account_data_for_user(self, user_id: str) -> None:
"""
Removes ALL the account data for a user.
Expand Down