From 75b1b5f801a3e06fbbde5a811dd1420497829c4f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 14 Mar 2022 15:01:29 -0400 Subject: [PATCH 01/17] Filter out events from ignored users in /relations. --- synapse/handlers/relations.py | 9 +++- tests/rest/client/test_relations.py | 80 ++++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 57135d45197b..73217d135d7c 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -21,6 +21,7 @@ from synapse.api.errors import SynapseError from synapse.events import EventBase from synapse.types import JsonDict, Requester, StreamToken +from synapse.visibility import filter_events_for_client if TYPE_CHECKING: from synapse.server import HomeServer @@ -62,6 +63,7 @@ def __bool__(self) -> bool: class RelationsHandler: def __init__(self, hs: "HomeServer"): self._main_store = hs.get_datastores().main + self._storage = hs.get_storage() self._auth = hs.get_auth() self._clock = hs.get_clock() self._event_handler = hs.get_event_handler() @@ -103,7 +105,8 @@ async def get_relations( user_id = requester.user.to_string() - await self._auth.check_user_in_room_or_world_readable( + # TODO Properly handle a user leaving a room. + (_, member_event_id) = await self._auth.check_user_in_room_or_world_readable( room_id, user_id, allow_departed_users=True ) @@ -130,6 +133,10 @@ async def get_relations( [c["event_id"] for c in pagination_chunk.chunk] ) + events = await filter_events_for_client( + self._storage, user_id, events, is_peeking=(member_event_id is None) + ) + now = self._clock.time_msec() # Do not bundle aggregations when retrieving the original event because # we want the content before relations are applied to it. 
diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 329690f8f7c8..fe97a0b3dde1 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -20,7 +20,7 @@ from twisted.test.proto_helpers import MemoryReactor -from synapse.api.constants import EventTypes, RelationTypes +from synapse.api.constants import AccountDataTypes, EventTypes, RelationTypes from synapse.rest import admin from synapse.rest.client import login, register, relations, room, sync from synapse.server import HomeServer @@ -1324,6 +1324,84 @@ def test_bundled_aggregations_with_filter(self) -> None: self.assertIn("m.relations", parent_event["unsigned"]) +class RelationIgnoredUserTestCase(BaseRelationsTestCase): + """Relations sent from an ignored user should be ignored.""" + + def _test_ignored_user( + self, allowed_event_ids: List[str], ignored_event_ids: List[str] + ) -> None: + """ + Fetch the relations and ensure they're all there, then ignore user2, and + repeat. + """ + # Get the relations. + event_ids = self._get_related_events() + self.assertCountEqual(event_ids, allowed_event_ids + ignored_event_ids) + + # Ignore user2 and re-do the requests. + self.get_success( + self.store.add_account_data_for_user( + self.user_id, + AccountDataTypes.IGNORED_USER_LIST, + {"ignored_users": {self.user2_id: {}}}, + ) + ) + + # Get the relations. + event_ids = self._get_related_events() + self.assertCountEqual(event_ids, allowed_event_ids) + + def test_annotation(self) -> None: + """Annotations should ignore""" + # Send 2 from us, 2 from the to be ignored user. 
+ allowed_event_ids = [] + ignored_event_ids = [] + channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="a") + allowed_event_ids.append(channel.json_body["event_id"]) + channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="b") + allowed_event_ids.append(channel.json_body["event_id"]) + channel = self._send_relation( + RelationTypes.ANNOTATION, + "m.reaction", + key="a", + access_token=self.user2_token, + ) + ignored_event_ids.append(channel.json_body["event_id"]) + channel = self._send_relation( + RelationTypes.ANNOTATION, + "m.reaction", + key="c", + access_token=self.user2_token, + ) + ignored_event_ids.append(channel.json_body["event_id"]) + + self._test_ignored_user(allowed_event_ids, ignored_event_ids) + + def test_reference(self) -> None: + """Annotations should ignore""" + channel = self._send_relation(RelationTypes.REFERENCE, "m.room.test") + allowed_event_ids = [channel.json_body["event_id"]] + + channel = self._send_relation( + RelationTypes.REFERENCE, "m.room.test", access_token=self.user2_token + ) + ignored_event_ids = [channel.json_body["event_id"]] + + self._test_ignored_user(allowed_event_ids, ignored_event_ids) + + def test_thread(self) -> None: + """Annotations should ignore""" + channel = self._send_relation(RelationTypes.THREAD, "m.room.test") + allowed_event_ids = [channel.json_body["event_id"]] + + channel = self._send_relation( + RelationTypes.THREAD, "m.room.test", access_token=self.user2_token + ) + ignored_event_ids = [channel.json_body["event_id"]] + + self._test_ignored_user(allowed_event_ids, ignored_event_ids) + + class RelationRedactionTestCase(BaseRelationsTestCase): """ Test the behaviour of relations when the parent or child event is redacted. From 3a51cfe5aeb709af48b850ee454422887701d73a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 16 Mar 2022 14:29:11 -0400 Subject: [PATCH 02/17] Add tests for ignored users in bundled aggregations. 
--- tests/rest/client/test_relations.py | 73 +++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 5 deletions(-) diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index fe97a0b3dde1..20c55955e1fb 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1328,16 +1328,27 @@ class RelationIgnoredUserTestCase(BaseRelationsTestCase): """Relations sent from an ignored user should be ignored.""" def _test_ignored_user( - self, allowed_event_ids: List[str], ignored_event_ids: List[str] - ) -> None: + self, + relation_type: str, + allowed_event_ids: List[str], + ignored_event_ids: List[str], + ) -> Tuple[JsonDict, JsonDict]: """ Fetch the relations and ensure they're all there, then ignore user2, and repeat. + + Returns: + A tuple of two JSON dictionaries, each are bundled aggregations, the + first is from before the user is ignored, and the second is after. """ # Get the relations. event_ids = self._get_related_events() self.assertCountEqual(event_ids, allowed_event_ids + ignored_event_ids) + # And the bundled aggregations. + before_aggregations = self._get_bundled_aggregations() + self.assertIn(relation_type, before_aggregations) + # Ignore user2 and re-do the requests. self.get_success( self.store.add_account_data_for_user( @@ -1351,6 +1362,12 @@ def _test_ignored_user( event_ids = self._get_related_events() self.assertCountEqual(event_ids, allowed_event_ids) + # And the bundled aggregations. + after_aggregations = self._get_bundled_aggregations() + self.assertIn(relation_type, after_aggregations) + + return before_aggregations[relation_type], after_aggregations[relation_type] + def test_annotation(self) -> None: """Annotations should ignore""" # Send 2 from us, 2 from the to be ignored user. 
@@ -1375,7 +1392,26 @@ def test_annotation(self) -> None: ) ignored_event_ids.append(channel.json_body["event_id"]) - self._test_ignored_user(allowed_event_ids, ignored_event_ids) + before_aggregations, after_aggregations = self._test_ignored_user( + RelationTypes.ANNOTATION, allowed_event_ids, ignored_event_ids + ) + + self.assertCountEqual( + before_aggregations["chunk"], + [ + {"type": "m.reaction", "key": "a", "count": 2}, + {"type": "m.reaction", "key": "b", "count": 1}, + {"type": "m.reaction", "key": "c", "count": 1}, + ], + ) + + self.assertCountEqual( + after_aggregations["chunk"], + [ + {"type": "m.reaction", "key": "a", "count": 1}, + {"type": "m.reaction", "key": "b", "count": 1}, + ], + ) def test_reference(self) -> None: """Annotations should ignore""" @@ -1387,7 +1423,18 @@ def test_reference(self) -> None: ) ignored_event_ids = [channel.json_body["event_id"]] - self._test_ignored_user(allowed_event_ids, ignored_event_ids) + before_aggregations, after_aggregations = self._test_ignored_user( + RelationTypes.REFERENCE, allowed_event_ids, ignored_event_ids + ) + + self.assertCountEqual( + [e["event_id"] for e in before_aggregations["chunk"]], + allowed_event_ids + ignored_event_ids, + ) + + self.assertCountEqual( + [e["event_id"] for e in after_aggregations["chunk"]], allowed_event_ids + ) def test_thread(self) -> None: """Annotations should ignore""" @@ -1399,7 +1446,23 @@ def test_thread(self) -> None: ) ignored_event_ids = [channel.json_body["event_id"]] - self._test_ignored_user(allowed_event_ids, ignored_event_ids) + before_aggregations, after_aggregations = self._test_ignored_user( + RelationTypes.THREAD, allowed_event_ids, ignored_event_ids + ) + + self.assertEqual(before_aggregations["count"], 2) + self.assertTrue(before_aggregations["current_user_participated"]) + # The latest thread event has some fields that don't matter. 
+ self.assertEqual( + before_aggregations["latest_event"]["event_id"], ignored_event_ids[0] + ) + + self.assertEqual(after_aggregations["count"], 1) + self.assertTrue(after_aggregations["current_user_participated"]) + # The latest thread event has some fields that don't matter. + self.assertEqual( + after_aggregations["latest_event"]["event_id"], allowed_event_ids[0] + ) class RelationRedactionTestCase(BaseRelationsTestCase): From 5e73a5e7f4deda908f7f34be29899e84a953b3a1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 16 Mar 2022 14:29:30 -0400 Subject: [PATCH 03/17] Filter out ignored users for aggregation groups. --- synapse/handlers/relations.py | 15 ++++++++++----- synapse/rest/client/relations.py | 8 +++++--- synapse/storage/database.py | 20 +++++++++++++++++--- synapse/storage/databases/main/relations.py | 13 +++++++++++++ 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 73217d135d7c..30efb27231c5 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, Iterable, Optional, cast +from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, Optional, cast import attr from frozendict import frozendict @@ -159,7 +159,7 @@ async def get_relations( return return_value async def _get_bundled_aggregation_for_event( - self, event: EventBase, user_id: str + self, event: EventBase, ignored_users: FrozenSet[str] ) -> Optional[BundledAggregations]: """Generate bundled aggregations for an event. @@ -167,7 +167,7 @@ async def _get_bundled_aggregation_for_event( Args: event: The event to calculate bundled aggregations for. - user_id: The user requesting the bundled aggregations. + ignored_users: The users ignored by the requesting user. 
Returns: The bundled aggregations for an event, if bundled aggregations are @@ -191,7 +191,7 @@ async def _get_bundled_aggregation_for_event( aggregations = BundledAggregations() annotations = await self._main_store.get_aggregation_groups_for_event( - event_id, room_id + event_id, room_id, ignored_users=ignored_users ) if annotations.chunk: aggregations.annotations = await annotations.to_dict( @@ -230,9 +230,14 @@ async def get_bundled_aggregations( # event ID -> bundled aggregation in non-serialized form. results: Dict[str, BundledAggregations] = {} + # Fetch any ignored users of the requesting user. + ignored_users = await self._main_store.ignored_users(user_id) + # Fetch other relations per event. for event in events_by_id.values(): - event_result = await self._get_bundled_aggregation_for_event(event, user_id) + event_result = await self._get_bundled_aggregation_for_event( + event, ignored_users + ) if event_result: results[event.event_id] = event_result diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index c16078b187ee..e4d4effd75da 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -134,10 +134,9 @@ async def on_GET( ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) + user_id = requester.user.to_string() await self.auth.check_user_in_room_or_world_readable( - room_id, - requester.user.to_string(), - allow_departed_users=True, + room_id, user_id, allow_departed_users=True ) # This checks that a) the event exists and b) the user is allowed to @@ -164,6 +163,8 @@ async def on_GET( if to_token_str: to_token = AggregationPaginationToken.from_string(to_token_str) + ignored_users = await self.store.ignored_users(user_id) + pagination_chunk = await self.store.get_aggregation_groups_for_event( event_id=parent_id, room_id=room_id, @@ -171,6 +172,7 @@ async def on_GET( limit=limit, from_token=from_token, to_token=to_token, + ignored_users=ignored_users, ) 
return 200, await pagination_chunk.to_dict(self.store) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 9749f0c06e86..d8ffff5af858 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -2171,7 +2171,10 @@ def simple_search_list_txn( def make_in_list_sql_clause( - database_engine: BaseDatabaseEngine, column: str, iterable: Collection[Any] + database_engine: BaseDatabaseEngine, + column: str, + iterable: Collection[Any], + include: bool = True, ) -> Tuple[str, list]: """Returns an SQL clause that checks the given column is in the iterable. @@ -2184,6 +2187,8 @@ def make_in_list_sql_clause( database_engine column: Name of the column iterable: The values to check the column against. + include: True if the resulting rows must include one of the given values, + False if it must exclude them. Returns: A tuple of SQL query and the args @@ -2192,9 +2197,18 @@ def make_in_list_sql_clause( if database_engine.supports_using_any_list: # This should hopefully be faster, but also makes postgres query # stats easier to understand. - return "%s = ANY(?)" % (column,), [list(iterable)] + if include: + sql = f"{column} = ANY(?)" + else: + sql = f"{column} != ANY(?)" + return sql, [list(iterable)] else: - return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable) + values = ",".join("?" 
for _ in iterable) + if include: + sql = f"{column} IN ({values})" + else: + sql = f"{column} NOT IN ({values})" + return sql, list(iterable) KV = TypeVar("KV") diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index b2295fd51f60..cd924254c2cd 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -17,6 +17,7 @@ TYPE_CHECKING, Collection, Dict, + FrozenSet, Iterable, List, Optional, @@ -260,6 +261,7 @@ async def get_aggregation_groups_for_event( direction: str = "b", from_token: Optional[AggregationPaginationToken] = None, to_token: Optional[AggregationPaginationToken] = None, + ignored_users: FrozenSet[str] = frozenset(), ) -> PaginationChunk: """Get a list of annotations on the event, grouped by event type and aggregation key, sorted by count. @@ -276,6 +278,7 @@ async def get_aggregation_groups_for_event( the lowest count first (`"f"`). from_token: Fetch rows from the given token, or from the start if None. to_token: Fetch rows up to the given token, or up to the end if None. + ignored_users: The users ignored by the requesting user. Returns: List of groups of annotations that match. Each row is a dict with @@ -293,6 +296,16 @@ async def get_aggregation_groups_for_event( where_clause.append("type = ?") where_args.append(event_type) + if ignored_users: + ( + ignored_users_clause_sql, + ignored_users_clause_args, + ) = make_in_list_sql_clause( + self.database_engine, "sender", ignored_users, include=False + ) + where_clause.append(ignored_users_clause_sql) + where_args.extend(ignored_users_clause_args) + having_clause = generate_pagination_where_clause( direction=direction, column_names=("COUNT(*)", "MAX(stream_ordering)"), From 56dd70af29aaec8d90652aa4b361c0c1a44c74bd Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 14 Mar 2022 16:00:11 -0400 Subject: [PATCH 04/17] Rename a variable. 
--- synapse/storage/databases/main/relations.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index cd924254c2cd..2903be5b7acf 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -152,18 +152,18 @@ def _get_recent_references_for_event_txn( last_topo_id = None last_stream_id = None - events = [] + event_ids = [] for row in txn: # Do not include edits for redacted events as they leak event # content. if not is_redacted or row[1] != RelationTypes.REPLACE: - events.append({"event_id": row[0]}) + event_ids.append({"event_id": row[0]}) last_topo_id = row[2] last_stream_id = row[3] # If there are more events, generate the next pagination key. next_token = None - if len(events) > limit and last_topo_id and last_stream_id: + if len(event_ids) > limit and last_topo_id and last_stream_id: next_key = RoomStreamToken(last_topo_id, last_stream_id) if from_token: next_token = from_token.copy_and_replace("room_key", next_key) @@ -181,7 +181,9 @@ def _get_recent_references_for_event_txn( ) return PaginationChunk( - chunk=list(events[:limit]), next_batch=next_token, prev_batch=from_token + chunk=list(event_ids[:limit]), + next_batch=next_token, + prev_batch=from_token, ) return await self.db_pool.runInteraction( From 3357181373e439ee8f5d8b336b43e884655b3e89 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 16 Mar 2022 14:28:18 -0400 Subject: [PATCH 05/17] Filter out ignored users for threads. 
--- synapse/handlers/relations.py | 4 +++- synapse/storage/databases/main/relations.py | 22 ++++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 30efb27231c5..51ae848884ac 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -253,7 +253,9 @@ async def get_bundled_aggregations( results.setdefault(event_id, BundledAggregations()).replace = edit # Fetch thread summaries. - summaries = await self._main_store.get_thread_summaries(events_by_id.keys()) + summaries = await self._main_store.get_thread_summaries( + events_by_id.keys(), ignored_users + ) # Only fetch participated for a limited selection based on what had # summaries. participated = await self._main_store.get_threads_participated( diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 2903be5b7acf..59ec569d2218 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -452,18 +452,21 @@ def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: for original_event_id in event_ids } - @cached() - def get_thread_summary(self, event_id: str) -> Optional[Tuple[int, EventBase]]: + @cached(tree=True) + def get_thread_summary( + self, event_id: str, ignored_users: FrozenSet[str] + ) -> Optional[Tuple[int, EventBase]]: raise NotImplementedError() @cachedList(cached_method_name="get_thread_summary", list_name="event_ids") async def get_thread_summaries( - self, event_ids: Collection[str] + self, event_ids: Collection[str], ignored_users: FrozenSet[str] ) -> Dict[str, Optional[Tuple[int, EventBase, Optional[EventBase]]]]: """Get the number of threaded replies, the latest reply (if any), and the latest edit for that reply for the given event. Args: event_ids: Summarize the thread related to this event ID. + ignored_users: The users ignored by the requesting user. 
Returns: A map of the thread summary each event. A missing event implies there @@ -515,6 +518,16 @@ def _get_thread_summaries_txn( txn.database_engine, "relates_to_id", event_ids ) + if ignored_users: + ( + ignored_users_clause_sql, + ignored_users_clause_args, + ) = make_in_list_sql_clause( + self.database_engine, "child.sender", ignored_users, include=False + ) + clause += " AND " + ignored_users_clause_sql + args.extend(ignored_users_clause_args) + if self._msc3440_enabled: relations_clause = "(relation_type = ? OR relation_type = ?)" args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD)) @@ -551,6 +564,9 @@ def _get_thread_summaries_txn( clause, args = make_in_list_sql_clause( txn.database_engine, "relates_to_id", latest_event_ids.keys() ) + if ignored_users: + clause += " AND " + ignored_users_clause_sql + args.extend(ignored_users_clause_args) if self._msc3440_enabled: relations_clause = "(relation_type = ? OR relation_type = ?)" From 9eef5cbef70eb59ea5b38775b6f82e2bd32b9f0d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 16 Mar 2022 15:06:07 -0400 Subject: [PATCH 06/17] Filter out ignored users for references. --- synapse/handlers/relations.py | 10 +++++++++- synapse/storage/databases/main/relations.py | 12 ++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 51ae848884ac..5c4345e1bca8 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -116,6 +116,9 @@ async def get_relations( if event is None: raise SynapseError(404, "Unknown parent event.") + # Note that ignored users are not passed into get_relations_for_event + # below. Ignored users are handled in filter_events_for_client (and by + # noy passing them in here we should get a better cache hit rate). 
pagination_chunk = await self._main_store.get_relations_for_event( event_id=event_id, event=event, @@ -199,7 +202,12 @@ async def _get_bundled_aggregation_for_event( ) references = await self._main_store.get_relations_for_event( - event_id, event, room_id, RelationTypes.REFERENCE, direction="f" + event_id, + event, + room_id, + RelationTypes.REFERENCE, + direction="f", + ignored_users=ignored_users, ) if references.chunk: aggregations.references = await references.to_dict(cast("DataStore", self)) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 59ec569d2218..601243c20e87 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -74,6 +74,7 @@ async def get_relations_for_event( direction: str = "b", from_token: Optional[StreamToken] = None, to_token: Optional[StreamToken] = None, + ignored_users: FrozenSet[str] = frozenset(), ) -> PaginationChunk: """Get a list of relations for an event, ordered by topological ordering. @@ -89,6 +90,7 @@ async def get_relations_for_event( oldest first (`"f"`). from_token: Fetch rows from the given token, or from the start if None. to_token: Fetch rows up to the given token, or up to the end if None. + ignored_users: The users ignored by the requesting user. Returns: List of event IDs that match relations requested. 
The rows are of @@ -114,6 +116,16 @@ async def get_relations_for_event( where_clause.append("aggregation_key = ?") where_args.append(aggregation_key) + if ignored_users: + ( + ignored_users_clause_sql, + ignored_users_clause_args, + ) = make_in_list_sql_clause( + self.database_engine, "sender", ignored_users, include=False + ) + where_clause.append(ignored_users_clause_sql) + where_args.extend(ignored_users_clause_args) + pagination_clause = generate_pagination_where_clause( direction=direction, column_names=("topological_ordering", "stream_ordering"), From 156ef7a2b08b875d690403a8b36210c97437cf67 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 16 Mar 2022 15:26:16 -0400 Subject: [PATCH 07/17] Add a note about edits. --- synapse/handlers/relations.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 5c4345e1bca8..ca7dfb28f206 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -250,6 +250,9 @@ async def get_bundled_aggregations( results[event.event_id] = event_result # Fetch any edits (but not for redacted events). + # + # Note that there is no use in limiting edits by ignored users since the + # parent event should be ignored in the first place if the user is ignored. 
edits = await self._main_store.get_applicable_edits( [ event_id From 3bb8071a4cc0efe7539c84b0442ea9f610d1b98b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 16 Mar 2022 10:47:31 -0400 Subject: [PATCH 08/17] Newsfragment --- changelog.d/12227.bugfix | 1 + changelog.d/12227.misc | 1 - changelog.d/12232.bugfix | 1 + changelog.d/12232.misc | 1 - changelog.d/12235.bugfix | 1 + 5 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 changelog.d/12227.bugfix delete mode 100644 changelog.d/12227.misc create mode 100644 changelog.d/12232.bugfix delete mode 100644 changelog.d/12232.misc create mode 100644 changelog.d/12235.bugfix diff --git a/changelog.d/12227.bugfix b/changelog.d/12227.bugfix new file mode 100644 index 000000000000..1a7dccf46568 --- /dev/null +++ b/changelog.d/12227.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where events from ignored users were still considered for relations. diff --git a/changelog.d/12227.misc b/changelog.d/12227.misc deleted file mode 100644 index 41c9dcbd37f6..000000000000 --- a/changelog.d/12227.misc +++ /dev/null @@ -1 +0,0 @@ -Refactor the relations endpoints to add a `RelationsHandler`. diff --git a/changelog.d/12232.bugfix b/changelog.d/12232.bugfix new file mode 100644 index 000000000000..1a7dccf46568 --- /dev/null +++ b/changelog.d/12232.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where events from ignored users were still considered for relations. diff --git a/changelog.d/12232.misc b/changelog.d/12232.misc deleted file mode 100644 index 4a4132edff2c..000000000000 --- a/changelog.d/12232.misc +++ /dev/null @@ -1 +0,0 @@ -Refactor relations tests to improve code re-use. diff --git a/changelog.d/12235.bugfix b/changelog.d/12235.bugfix new file mode 100644 index 000000000000..1a7dccf46568 --- /dev/null +++ b/changelog.d/12235.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where events from ignored users were still considered for relations. 
From ecae2ad7696e5c688ee36d7d0960315b7ba93ef0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 24 Mar 2022 10:32:04 -0400 Subject: [PATCH 09/17] Do not cache on the ignored users parameter when fetching relations. --- synapse/handlers/relations.py | 47 ++++++++++++++++++--- synapse/storage/databases/main/relations.py | 40 ++++++++---------- 2 files changed, 58 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index a9c827f6efae..50f3e19a392a 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, Optional +from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Tuple import attr from frozendict import frozendict @@ -20,6 +20,7 @@ from synapse.api.constants import RelationTypes from synapse.api.errors import SynapseError from synapse.events import EventBase +from synapse.storage.databases.main.relations import _RelatedEvent from synapse.types import JsonDict, Requester, StreamToken from synapse.visibility import filter_events_for_client @@ -117,7 +118,7 @@ async def get_relations( # Note that ignored users are not passed into get_relations_for_event # below. Ignored users are handled in filter_events_for_client (and by - # noy passing them in here we should get a better cache hit rate). + # not passing them in here we should get a better cache hit rate). 
related_events, next_token = await self._main_store.get_relations_for_event( event_id=event_id, event=event, @@ -131,7 +132,9 @@ async def get_relations( to_token=to_token, ) - events = await self._main_store.get_events_as_list(related_events) + events = await self._main_store.get_events_as_list( + [e.event_id for e in related_events] + ) events = await filter_events_for_client( self._storage, user_id, events, is_peeking=(member_event_id is None) @@ -165,6 +168,38 @@ async def get_relations( return return_value + async def get_references_for_event( + self, + event_id: str, + event: EventBase, + room_id: str, + ignored_users: FrozenSet[str] = frozenset(), + ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]: + """Get a list of events which relate to an event by reference, ordered by topological ordering. + + Args: + event_id: Fetch events that relate to this event ID. + event: The matching EventBase to event_id. + room_id: The room the event belongs to. + ignored_users: The users ignored by the requesting user. + + Returns: + List of event IDs that match relations requested. The rows are of + the form `{"event_id": "..."}`. + """ + + # Call the underlying storage method, which is cached. + related_events, next_token = await self._main_store.get_relations_for_event( + event_id, event, room_id, RelationTypes.REFERENCE, direction="f" + ) + + # Filter out ignored users and convert to the expected format. 
+ related_events = [ + event for event in related_events if event.sender not in ignored_users + ] + + return related_events, next_token + async def _get_bundled_aggregation_for_event( self, event: EventBase, ignored_users: FrozenSet[str] ) -> Optional[BundledAggregations]: @@ -203,17 +238,15 @@ async def _get_bundled_aggregation_for_event( if annotations: aggregations.annotations = {"chunk": annotations} - references, next_token = await self._main_store.get_relations_for_event( + references, next_token = await self.get_references_for_event( event_id, event, room_id, - RelationTypes.REFERENCE, - direction="f", ignored_users=ignored_users, ) if references: aggregations.references = { - "chunk": [{"event_id": event_id} for event_id in references] + "chunk": [{"event_id": event.event_id} for event in references] } if next_token: diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index f69f15b27735..36692c5a907f 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -27,6 +27,8 @@ cast, ) +import attr + from synapse.api.constants import RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore @@ -47,6 +49,12 @@ logger = logging.getLogger(__name__) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _RelatedEvent: + event_id: str + sender: str + + class RelationsWorkerStore(SQLBaseStore): def __init__( self, @@ -71,8 +79,7 @@ async def get_relations_for_event( direction: str = "b", from_token: Optional[StreamToken] = None, to_token: Optional[StreamToken] = None, - ignored_users: FrozenSet[str] = frozenset(), - ) -> Tuple[List[str], Optional[StreamToken]]: + ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]: """Get a list of relations for an event, ordered by topological ordering. Args: @@ -87,11 +94,10 @@ async def get_relations_for_event( oldest first (`"f"`). 
from_token: Fetch rows from the given token, or from the start if None. to_token: Fetch rows up to the given token, or up to the end if None. - ignored_users: The users ignored by the requesting user. Returns: A tuple of: - A list of related event IDs + A list of related event IDs & their senders. The next stream token, if one exists. """ @@ -115,16 +121,6 @@ async def get_relations_for_event( where_clause.append("aggregation_key = ?") where_args.append(aggregation_key) - if ignored_users: - ( - ignored_users_clause_sql, - ignored_users_clause_args, - ) = make_in_list_sql_clause( - self.database_engine, "sender", ignored_users, include=False - ) - where_clause.append(ignored_users_clause_sql) - where_args.extend(ignored_users_clause_args) - pagination_clause = generate_pagination_where_clause( direction=direction, column_names=("topological_ordering", "stream_ordering"), @@ -144,7 +140,7 @@ async def get_relations_for_event( order = "ASC" sql = """ - SELECT event_id, relation_type, topological_ordering, stream_ordering + SELECT event_id, relation_type, sender, topological_ordering, stream_ordering FROM event_relations INNER JOIN events USING (event_id) WHERE %s @@ -158,23 +154,23 @@ async def get_relations_for_event( def _get_recent_references_for_event_txn( txn: LoggingTransaction, - ) -> Tuple[List[str], Optional[StreamToken]]: + ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]: txn.execute(sql, where_args + [limit + 1]) last_topo_id = None last_stream_id = None - event_ids = [] + events = [] for row in txn: # Do not include edits for redacted events as they leak event # content. if not is_redacted or row[1] != RelationTypes.REPLACE: - event_ids.append(row[0]) - last_topo_id = row[2] - last_stream_id = row[3] + events.append(_RelatedEvent(row[0], row[2])) + last_topo_id = row[3] + last_stream_id = row[4] # If there are more events, generate the next pagination key. 
next_token = None - if len(event_ids) > limit and last_topo_id and last_stream_id: + if len(events) > limit and last_topo_id and last_stream_id: next_key = RoomStreamToken(last_topo_id, last_stream_id) if from_token: next_token = from_token.copy_and_replace("room_key", next_key) @@ -191,7 +187,7 @@ def _get_recent_references_for_event_txn( groups_key=0, ) - return event_ids[:limit], next_token + return events[:limit], next_token return await self.db_pool.runInteraction( "get_recent_references_for_event", _get_recent_references_for_event_txn From f03a6a8ea0f5d152858d30b4c2c11f34c9529eec Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 31 Mar 2022 08:11:27 -0400 Subject: [PATCH 10/17] Newsfragment --- changelog.d/12235.bugfix | 2 +- changelog.d/12338.bugfix | 1 + changelog.d/12338.misc | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 changelog.d/12338.bugfix delete mode 100644 changelog.d/12338.misc diff --git a/changelog.d/12235.bugfix b/changelog.d/12235.bugfix index 1a7dccf46568..b5d2bede67c4 100644 --- a/changelog.d/12235.bugfix +++ b/changelog.d/12235.bugfix @@ -1 +1 @@ -Fix a long-standing bug where events from ignored users were still considered for relations. +Fix a long-standing bug where events from ignored users were still considered for bundled aggregations. diff --git a/changelog.d/12338.bugfix b/changelog.d/12338.bugfix new file mode 100644 index 000000000000..b5d2bede67c4 --- /dev/null +++ b/changelog.d/12338.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where events from ignored users were still considered for bundled aggregations. diff --git a/changelog.d/12338.misc b/changelog.d/12338.misc deleted file mode 100644 index 376089f32767..000000000000 --- a/changelog.d/12338.misc +++ /dev/null @@ -1 +0,0 @@ -Refactor relations code to remove an unnecessary class. 
From 622b621ec244892b1d156d24a777025dd42798d4 Mon Sep 17 00:00:00 2001
From: Patrick Cloke
Date: Thu, 31 Mar 2022 09:17:15 -0400
Subject: [PATCH 11/17] Do not cache on the ignored users parameter when
 fetching annotations.

---
 synapse/handlers/relations.py               | 47 +++++++++++++-
 synapse/storage/databases/main/relations.py | 71 ++++++++++++++------
 2 files changed, 94 insertions(+), 24 deletions(-)

diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 50f3e19a392a..1fde9eaf48c3 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -200,6 +200,51 @@ async def get_references_for_event(
 
         return related_events, next_token
 
+    async def get_annotations_for_event(
+        self,
+        event_id: str,
+        room_id: str,
+        limit: int = 5,
+        ignored_users: FrozenSet[str] = frozenset(),
+    ) -> List[JsonDict]:
+        """Get a list of annotations on the event, grouped by event type and
+        aggregation key, sorted by count.
+
+        This is used e.g. to get what reactions, and how many of each, have
+        happened on an event.
+
+        Args:
+            event_id: Fetch events that relate to this event ID.
+            room_id: The room the event belongs to.
+            limit: Only fetch the `limit` groups.
+            ignored_users: The users ignored by the requesting user.
+
+        Returns:
+            List of groups of annotations that match. Each row is a dict with
+            `type`, `key` and `count` fields.
+        """
+        # Get the base results for all users.
+        full_results = await self._main_store.get_aggregation_groups_for_event(
+            event_id, room_id, limit
+        )
+
+        # Then subtract off the results for any ignored users.
+ ignored_results = await self._main_store.get_aggregation_groups_for_users( + event_id, room_id, limit, ignored_users + ) + + filtered_results = [] + for result in full_results: + key = (result["type"], result["key"]) + if key in ignored_results: + result = result.copy() + result["count"] -= ignored_results[key] + if result["count"] <= 0: + continue + filtered_results.append(result) + + return filtered_results + async def _get_bundled_aggregation_for_event( self, event: EventBase, ignored_users: FrozenSet[str] ) -> Optional[BundledAggregations]: @@ -232,7 +277,7 @@ async def _get_bundled_aggregation_for_event( # while others need more processing during serialization. aggregations = BundledAggregations() - annotations = await self._main_store.get_aggregation_groups_for_event( + annotations = await self.get_annotations_for_event( event_id, room_id, ignored_users=ignored_users ) if annotations: diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 36692c5a907f..9755463a92a2 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -258,11 +258,7 @@ async def event_is_target_of_relation(self, parent_id: str) -> bool: @cached(tree=True) async def get_aggregation_groups_for_event( - self, - event_id: str, - room_id: str, - limit: int = 5, - ignored_users: FrozenSet[str] = frozenset(), + self, event_id: str, room_id: str, limit: int = 5 ) -> List[JsonDict]: """Get a list of annotations on the event, grouped by event type and aggregation key, sorted by count. @@ -274,46 +270,33 @@ async def get_aggregation_groups_for_event( event_id: Fetch events that relate to this event ID. room_id: The room the event belongs to. limit: Only fetch the `limit` groups. - ignored_users: The users ignored by the requesting user. Returns: List of groups of annotations that match. Each row is a dict with `type`, `key` and `count` fields. 
""" - where_args: List[Union[str, int]] = [ + args = [ event_id, room_id, RelationTypes.ANNOTATION, + limit, ] - ignored_users_clause_sql = "" - if ignored_users: - ( - ignored_users_clause_sql, - ignored_users_clause_args, - ) = make_in_list_sql_clause( - self.database_engine, "sender", ignored_users, include=False - ) - ignored_users_clause_sql = " AND " + ignored_users_clause_sql - where_args.extend(ignored_users_clause_args) - sql = """ SELECT type, aggregation_key, COUNT(DISTINCT sender) FROM event_relations INNER JOIN events USING (event_id) - WHERE relates_to_id = ? AND room_id = ? AND relation_type = ? %s + WHERE relates_to_id = ? AND room_id = ? AND relation_type = ? GROUP BY relation_type, type, aggregation_key ORDER BY COUNT(*) DESC LIMIT ? - """ % ( - ignored_users_clause_sql, - ) + """ def _get_aggregation_groups_for_event_txn( txn: LoggingTransaction, ) -> List[JsonDict]: - txn.execute(sql, where_args + [limit]) + txn.execute(sql, args) return [{"type": row[0], "key": row[1], "count": row[2]} for row in txn] @@ -321,6 +304,48 @@ def _get_aggregation_groups_for_event_txn( "get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn ) + async def get_aggregation_groups_for_users( + self, + event_id: str, + room_id: str, + limit: int, + users: FrozenSet[str] = frozenset(), + ) -> Dict[Tuple[str, str], int]: + if not users: + return {} + + args: List[Union[str, int]] = [ + event_id, + room_id, + RelationTypes.ANNOTATION, + ] + + users_sql, users_args = make_in_list_sql_clause( + self.database_engine, "sender", users + ) + args.extend(users_args) + + sql = f""" + SELECT type, aggregation_key, COUNT(DISTINCT sender) + FROM event_relations + INNER JOIN events USING (event_id) + WHERE relates_to_id = ? AND room_id = ? AND relation_type = ? AND {users_sql} + GROUP BY relation_type, type, aggregation_key + ORDER BY COUNT(*) DESC + LIMIT ? 
+ """ + + def _get_aggregation_groups_for_event_txn( + txn: LoggingTransaction, + ) -> Dict[Tuple[str, str], int]: + txn.execute(sql, args + [limit]) + + return {(row[0], row[1]): row[2] for row in txn} + + return await self.db_pool.runInteraction( + "get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn + ) + @cached() def get_applicable_edit(self, event_id: str) -> Optional[EventBase]: raise NotImplementedError() From 30ce317ea3267cdc039569fabc8d129b3bf8b19c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 1 Apr 2022 10:58:26 -0400 Subject: [PATCH 12/17] Add an intermediate method for threads. --- synapse/handlers/relations.py | 79 +++++++++++++++++++++++++---------- 1 file changed, 57 insertions(+), 22 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 1fde9eaf48c3..72b538674575 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -12,7 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Tuple +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Tuple, +) import attr from frozendict import frozendict @@ -302,6 +311,49 @@ async def _get_bundled_aggregation_for_event( # Store the bundled aggregations in the event metadata for later use. return aggregations + async def get_threads_for_events( + self, event_ids: Collection[str], user_id: str, ignored_users: FrozenSet[str] + ) -> Dict[str, _ThreadAggregation]: + """Get the bundled aggregations for threads for the requested events. + + Args: + event_ids: Events to get aggregations for threads. + user_id: The user requesting the bundled aggregations. + ignored_users: The users ignored by the requesting user. + + Returns: + A dictionary mapping event ID to the thread information. 
+ + May not contain a value for all requested event IDs. + """ + # Fetch thread summaries. + summaries = await self._main_store.get_thread_summaries( + event_ids, ignored_users + ) + + # Only fetch participated for a limited selection based on what had + # summaries. + participated = await self._main_store.get_threads_participated( + [event_id for event_id, summary in summaries.items() if summary], user_id + ) + + # A map of event ID to the thread aggregation. + results = {} + + for event_id, summary in summaries.items(): + if summary: + thread_count, latest_thread_event, edit = summary + results[event_id] = _ThreadAggregation( + latest_event=latest_thread_event, + latest_edit=edit, + count=thread_count, + # If there's a thread summary it must also exist in the + # participated dictionary. + current_user_participated=participated[event_id], + ) + + return results + async def get_bundled_aggregations( self, events: Iterable[EventBase], user_id: str ) -> Dict[str, BundledAggregations]: @@ -350,27 +402,10 @@ async def get_bundled_aggregations( for event_id, edit in edits.items(): results.setdefault(event_id, BundledAggregations()).replace = edit - # Fetch thread summaries. - summaries = await self._main_store.get_thread_summaries( - events_by_id.keys(), ignored_users - ) - # Only fetch participated for a limited selection based on what had - # summaries. - participated = await self._main_store.get_threads_participated( - [event_id for event_id, summary in summaries.items() if summary], user_id + threads = await self.get_threads_for_events( + events_by_id.keys(), user_id, ignored_users ) - for event_id, summary in summaries.items(): - if summary: - thread_count, latest_thread_event, edit = summary - results.setdefault( - event_id, BundledAggregations() - ).thread = _ThreadAggregation( - latest_event=latest_thread_event, - latest_edit=edit, - count=thread_count, - # If there's a thread summary it must also exist in the - # participated dictionary. 
- current_user_participated=participated[event_id], - ) + for event_id, thread in threads.items(): + results.setdefault(event_id, BundledAggregations()).thread = thread return results From b0d44747d3f0ecc30e6fb45b107e29270d3b768d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 6 Apr 2022 08:50:09 -0400 Subject: [PATCH 13/17] Do not cache on the ignored users parameter when fetching threads. --- synapse/handlers/relations.py | 68 +++++++++++++++++--- synapse/storage/databases/main/relations.py | 71 +++++++++++++++------ 2 files changed, 111 insertions(+), 28 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 72b538674575..da2548f7ddbe 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -30,7 +30,7 @@ from synapse.api.errors import SynapseError from synapse.events import EventBase from synapse.storage.databases.main.relations import _RelatedEvent -from synapse.types import JsonDict, Requester, StreamToken +from synapse.types import JsonDict, Requester, StreamToken, UserID from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -177,19 +177,21 @@ async def get_relations( return return_value - async def get_references_for_event( + async def get_relations_for_event( self, event_id: str, event: EventBase, room_id: str, + relation_type: str, ignored_users: FrozenSet[str] = frozenset(), ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]: - """Get a list of events which relate to an event by reference, ordered by topological ordering. + """Get a list of events which relate to an event, ordered by topological ordering. Args: event_id: Fetch events that relate to this event ID. event: The matching EventBase to event_id. room_id: The room the event belongs to. + relation_type: The type of relation. ignored_users: The users ignored by the requesting user. Returns: @@ -199,7 +201,7 @@ async def get_references_for_event( # Call the underlying storage method, which is cached. 
related_events, next_token = await self._main_store.get_relations_for_event( - event_id, event, room_id, RelationTypes.REFERENCE, direction="f" + event_id, event, room_id, relation_type, direction="f" ) # Filter out ignored users and convert to the expected format. @@ -292,10 +294,11 @@ async def _get_bundled_aggregation_for_event( if annotations: aggregations.annotations = {"chunk": annotations} - references, next_token = await self.get_references_for_event( + references, next_token = await self.get_relations_for_event( event_id, event, room_id, + RelationTypes.REFERENCE, ignored_users=ignored_users, ) if references: @@ -326,15 +329,23 @@ async def get_threads_for_events( May not contain a value for all requested event IDs. """ + user = UserID.from_string(user_id) + # Fetch thread summaries. - summaries = await self._main_store.get_thread_summaries( - event_ids, ignored_users - ) + summaries = await self._main_store.get_thread_summaries(event_ids) # Only fetch participated for a limited selection based on what had # summaries. + thread_event_ids = [ + event_id for event_id, summary in summaries.items() if summary + ] participated = await self._main_store.get_threads_participated( - [event_id for event_id, summary in summaries.items() if summary], user_id + thread_event_ids, user_id + ) + + # Then subtract off the results for any ignored users. + ignored_results = await self._main_store.get_threaded_messages_per_user( + thread_event_ids, ignored_users ) # A map of event ID to the thread aggregation. @@ -343,6 +354,45 @@ async def get_threads_for_events( for event_id, summary in summaries.items(): if summary: thread_count, latest_thread_event, edit = summary + + # Subtract off the count of any ignored users. + for ignored_user in ignored_users: + thread_count -= ignored_results.get((event_id, ignored_user), 0) + + # This is gnarly, but if the latest event is from an ignored user, + # attempt to find one that isn't from an ignored user. 
+ if latest_thread_event.sender in ignored_users: + room_id = latest_thread_event.room_id + + # If the root event is not found, something went wrong, do + # not include a summary of the thread. + event = await self._event_handler.get_event(user, room_id, event_id) + if event is None: + continue + + potential_events, _ = await self.get_relations_for_event( + event_id, + event, + room_id, + RelationTypes.THREAD, + ignored_users, + ) + + # If all found events are from ignored users, do not include + # a summary of the thread. + if not potential_events: + continue + + # The *last* event returned is the one that is cared about. + # + # This event shuold exist. + event = await self._event_handler.get_event( + user, room_id, potential_events[-1].event_id + ) + if event is None: + continue + latest_thread_event = event + results[event_id] = _ThreadAggregation( latest_event=latest_thread_event, latest_edit=edit, diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 9755463a92a2..8f5f285888d3 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -435,21 +435,18 @@ def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: for original_event_id in event_ids } - @cached(tree=True) - def get_thread_summary( - self, event_id: str, ignored_users: FrozenSet[str] - ) -> Optional[Tuple[int, EventBase]]: + @cached() + def get_thread_summary(self, event_id: str) -> Optional[Tuple[int, EventBase]]: raise NotImplementedError() @cachedList(cached_method_name="get_thread_summary", list_name="event_ids") async def get_thread_summaries( - self, event_ids: Collection[str], ignored_users: FrozenSet[str] + self, event_ids: Collection[str] ) -> Dict[str, Optional[Tuple[int, EventBase, Optional[EventBase]]]]: """Get the number of threaded replies, the latest reply (if any), and the latest edit for that reply for the given event. 
Args: event_ids: Summarize the thread related to this event ID. - ignored_users: The users ignored by the requesting user. Returns: A map of the thread summary each event. A missing event implies there @@ -501,16 +498,6 @@ def _get_thread_summaries_txn( txn.database_engine, "relates_to_id", event_ids ) - if ignored_users: - ( - ignored_users_clause_sql, - ignored_users_clause_args, - ) = make_in_list_sql_clause( - self.database_engine, "child.sender", ignored_users, include=False - ) - clause += " AND " + ignored_users_clause_sql - args.extend(ignored_users_clause_args) - if self._msc3440_enabled: relations_clause = "(relation_type = ? OR relation_type = ?)" args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD)) @@ -547,9 +534,6 @@ def _get_thread_summaries_txn( clause, args = make_in_list_sql_clause( txn.database_engine, "relates_to_id", latest_event_ids.keys() ) - if ignored_users: - clause += " AND " + ignored_users_clause_sql - args.extend(ignored_users_clause_args) if self._msc3440_enabled: relations_clause = "(relation_type = ? OR relation_type = ?)" @@ -588,6 +572,55 @@ def _get_thread_summaries_txn( return summaries + async def get_threaded_messages_per_user( + self, + event_ids: Collection[str], + users: FrozenSet[str] = frozenset(), + ) -> Dict[Tuple[str, str], int]: + if not users: + return {} + + # Fetch the number of threaded replies. 
+ sql = """ + SELECT parent.event_id, child.sender, COUNT(child.event_id) FROM events AS child + INNER JOIN event_relations USING (event_id) + INNER JOIN events AS parent ON + parent.event_id = relates_to_id + AND parent.room_id = child.room_id + WHERE + %s + AND %s + AND %s + GROUP BY parent.event_id, child.sender + """ + + def _get_threaded_messages_per_user_txn( + txn: LoggingTransaction, + ) -> Dict[Tuple[str, str], int]: + users_sql, users_args = make_in_list_sql_clause( + self.database_engine, "child.sender", users + ) + events_clause, events_args = make_in_list_sql_clause( + txn.database_engine, "relates_to_id", event_ids + ) + + if self._msc3440_enabled: + relations_clause = "(relation_type = ? OR relation_type = ?)" + relations_args = [RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD] + else: + relations_clause = "relation_type = ?" + relations_args = [RelationTypes.THREAD] + + txn.execute( + sql % (users_sql, events_clause, relations_clause), + users_args + events_args + relations_args, + ) + return {(row[0], row[1]): row[2] for row in txn} + + return await self.db_pool.runInteraction( + "get_threaded_messages_per_user", _get_threaded_messages_per_user_txn + ) + @cached() def get_thread_participated(self, event_id: str, user_id: str) -> bool: raise NotImplementedError() From 1b2c9a1601f78e96ab963d735854e3c39e49b2f5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 6 Apr 2022 09:02:08 -0400 Subject: [PATCH 14/17] Add missing docstrings. 
--- synapse/storage/databases/main/relations.py | 31 +++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 8f5f285888d3..15768938f4e4 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -311,6 +311,21 @@ async def get_aggregation_groups_for_users( limit: int, users: FrozenSet[str] = frozenset(), ) -> Dict[Tuple[str, str], int]: + """Fetch the partial aggregations for an event for specific users. + + This is used, in conjunction with get_aggregation_groups_for_event, to + remove information from the results for ignored users. + + Args: + event_id: Fetch events that relate to this event ID. + room_id: The room the event belongs to. + limit: Only fetch the `limit` groups. + users: The users to fetch information for. + + Returns: + A map of (event type, aggregation key) to a count of users. + """ + if not users: return {} @@ -335,7 +350,7 @@ async def get_aggregation_groups_for_users( LIMIT ? """ - def _get_aggregation_groups_for_event_txn( + def _get_aggregation_groups_for_users_txn( txn: LoggingTransaction, ) -> Dict[Tuple[str, str], int]: txn.execute(sql, args + [limit]) @@ -343,7 +358,7 @@ def _get_aggregation_groups_for_event_txn( return {(row[0], row[1]): row[2] for row in txn} return await self.db_pool.runInteraction( - "get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn + "get_aggregation_groups_for_users", _get_aggregation_groups_for_users_txn ) @cached() @@ -577,6 +592,18 @@ async def get_threaded_messages_per_user( event_ids: Collection[str], users: FrozenSet[str] = frozenset(), ) -> Dict[Tuple[str, str], int]: + """Get the number of threaded replies for a set of users. + + This is used, in conjunction with get_thread_summaries, to calculate an + accurate count of the replies to a thread by subtracting ignored users. 
+ + Args: + event_ids: The events to check for threaded replies. + users: The user to calculate the count of their replies. + + Returns: + A map of the (event_id, sender) to the count of their replies. + """ if not users: return {} From 96d92151d34522b61a0fdc359b1b6cd1258c0936 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 8 Apr 2022 08:19:44 -0400 Subject: [PATCH 15/17] Revert unused changes. --- synapse/storage/database.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 171fc90651a2..12750d9b89d4 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -2204,10 +2204,7 @@ def simple_search_list_txn( def make_in_list_sql_clause( - database_engine: BaseDatabaseEngine, - column: str, - iterable: Collection[Any], - include: bool = True, + database_engine: BaseDatabaseEngine, column: str, iterable: Collection[Any] ) -> Tuple[str, list]: """Returns an SQL clause that checks the given column is in the iterable. @@ -2220,8 +2217,6 @@ def make_in_list_sql_clause( database_engine column: Name of the column iterable: The values to check the column against. - include: True if the resulting rows must include one of the given values, - False if it must exclude them. Returns: A tuple of SQL query and the args @@ -2230,18 +2225,9 @@ def make_in_list_sql_clause( if database_engine.supports_using_any_list: # This should hopefully be faster, but also makes postgres query # stats easier to understand. - if include: - sql = f"{column} = ANY(?)" - else: - sql = f"{column} != ANY(?)" - return sql, [list(iterable)] + return "%s = ANY(?)" % (column,), [list(iterable)] else: - values = ",".join("?" for _ in iterable) - if include: - sql = f"{column} IN ({values})" - else: - sql = f"{column} NOT IN ({values})" - return sql, list(iterable) + return "%s IN (%s)" % (column, ",".join("?" 
for _ in iterable)), list(iterable) KV = TypeVar("KV") From e2910a44790d318541d33ed255fb64f0a4fe0a4c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 8 Apr 2022 08:21:51 -0400 Subject: [PATCH 16/17] Docstring. --- synapse/storage/databases/main/relations.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 15768938f4e4..db929ef523ee 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -51,7 +51,14 @@ @attr.s(slots=True, frozen=True, auto_attribs=True) class _RelatedEvent: + """ + Contains enough information about a related event in order to properly filter + events from ignored users. + """ + + # The event ID of the related event. event_id: str + # The sender of the related event. sender: str From 7d6fa1ef113c5faf3781923261e00eb75cebde65 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 8 Apr 2022 08:32:28 -0400 Subject: [PATCH 17/17] Add logging. --- synapse/handlers/relations.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index da2548f7ddbe..0be231957750 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -384,12 +384,15 @@ async def get_threads_for_events( continue # The *last* event returned is the one that is cared about. - # - # This event shuold exist. event = await self._event_handler.get_event( user, room_id, potential_events[-1].event_id ) + # It is unexpected that the event will not exist. if event is None: + logger.warning( + "Unable to fetch latest event in a thread with event ID: %s", + potential_events[-1].event_id, + ) continue latest_thread_event = event