This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Store information in the database about partial-state joins
When we get a partial_state response from send_join, store information in the
database about it:
 * store a record about the room as a whole having partial state, and stash the
   list of member servers too.
 * flag the join event itself as having partial state
 * also, for any new events whose prev-events are partial-stated, note that
   they will *also* be partial-stated.

We don't yet make any attempt to interpret this data, so API calls (and a bunch
of other things) are just going to get incorrect data.
richvdh committed Feb 16, 2022
1 parent 092a2eb commit 81da3a3
Showing 7 changed files with 135 additions and 5 deletions.
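
The schema delta that creates the new tables is not among the files shown here, but the insert calls in events.py and room.py below pin down their columns. As a rough, hypothetical sketch only (the constraints and types are guesses, not the real Synapse schema delta):

import sqlite3

# Hypothetical shape of the new tables, inferred from the simple_insert_*
# calls in this commit; the real schema delta is not shown on this page.
SCHEMA = """
CREATE TABLE partial_state_rooms (
    room_id TEXT NOT NULL PRIMARY KEY
);
CREATE TABLE partial_state_rooms_servers (
    room_id TEXT NOT NULL,
    server_name TEXT NOT NULL,
    UNIQUE (room_id, server_name)
);
CREATE TABLE partial_state_events (
    room_id TEXT NOT NULL,
    event_id TEXT NOT NULL,
    UNIQUE (event_id)
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
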
9 changes: 9 additions & 0 deletions synapse/events/snapshot.py
@@ -101,6 +101,9 @@ class EventContext:
As with _current_state_ids, this is a private attribute. It should be
accessed via get_prev_state_ids.
partial_state: if True, we may be storing this event with a temporary,
incomplete state.
"""

rejected: Union[bool, str] = False
@@ -113,12 +116,15 @@ class EventContext:
_current_state_ids: Optional[StateMap[str]] = None
_prev_state_ids: Optional[StateMap[str]] = None

partial_state: bool = False

@staticmethod
def with_state(
state_group: Optional[int],
state_group_before_event: Optional[int],
current_state_ids: Optional[StateMap[str]],
prev_state_ids: Optional[StateMap[str]],
partial_state: bool,
prev_group: Optional[int] = None,
delta_ids: Optional[StateMap[str]] = None,
) -> "EventContext":
@@ -129,6 +135,7 @@ def with_state(
state_group_before_event=state_group_before_event,
prev_group=prev_group,
delta_ids=delta_ids,
partial_state=partial_state,
)

@staticmethod
@@ -170,6 +177,7 @@ async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict:
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"app_service_id": self.app_service.id if self.app_service else None,
"partial_state": self.partial_state,
}

@staticmethod
@@ -196,6 +204,7 @@ def deserialize(storage: "Storage", input: JsonDict) -> "EventContext":
prev_group=input["prev_group"],
delta_ids=_decode_state_dict(input["delta_ids"]),
rejected=input["rejected"],
partial_state=input.get("partial_state", False),
)

app_service_id = input["app_service_id"]
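
One detail of the snapshot.py change worth spelling out: serialize always writes the flag, while deserialize uses input.get("partial_state", False), so contexts serialized before this commit are treated as fully-stated. A tiny hypothetical illustration, with plain dicts standing in for the serialized EventContext:

# Hypothetical serialized contexts; only the "partial_state" handling matters here.
old_ctx = {"rejected": False, "prev_group": None}                         # written before this commit
new_ctx = {"rejected": False, "prev_group": None, "partial_state": True}  # written after

assert old_ctx.get("partial_state", False) is False  # old data defaults to full state
assert new_ctx.get("partial_state", False) is True
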
11 changes: 10 additions & 1 deletion synapse/handlers/federation.py
@@ -519,8 +519,17 @@ async def do_invite_join(
auth_events=auth_chain,
)

if ret.partial_state:
await self.store.store_partial_state_room(room_id, ret.servers_in_room)

max_stream_id = await self._federation_event_handler.process_remote_join(
origin, room_id, auth_chain, state, event, room_version_obj
origin,
room_id,
auth_chain,
state,
event,
room_version_obj,
partial_state=ret.partial_state,
)

# We wait here until this instance has seen the events come down
11 changes: 9 additions & 2 deletions synapse/handlers/federation_event.py
@@ -397,6 +397,7 @@ async def process_remote_join(
state: List[EventBase],
event: EventBase,
room_version: RoomVersion,
partial_state: bool,
) -> int:
"""Persists the events returned by a send_join
@@ -412,6 +413,7 @@ async def process_remote_join(
event
room_version: The room version we expect this room to have, and
will raise if it doesn't match the version in the create event.
partial_state: True if the state omits non-critical membership events
Returns:
The stream ID after which all events have been persisted.
@@ -453,10 +455,14 @@
)

# and now persist the join event itself.
logger.info("Peristing join-via-remote %s", event)
logger.info(
"Peristing join-via-remote %s (partial_state: %s)", event, partial_state
)
with nested_logging_context(suffix=event.event_id):
context = await self._state_handler.compute_event_context(
event, old_state=state
event,
old_state=state,
partial_state=partial_state,
)

context = await self._check_event_auth(origin, event, context)
@@ -1791,6 +1797,7 @@ async def _update_context_for_auth_events(
prev_state_ids=prev_state_ids,
prev_group=prev_group,
delta_ids=state_updates,
partial_state=context.partial_state,
)

async def _run_push_actions_and_persist_event(
27 changes: 25 additions & 2 deletions synapse/state/__init__.py
@@ -258,7 +258,10 @@ async def get_hosts_in_room_at_events(
return await self.store.get_joined_hosts(room_id, entry)

async def compute_event_context(
self, event: EventBase, old_state: Optional[Iterable[EventBase]] = None
self,
event: EventBase,
old_state: Optional[Iterable[EventBase]] = None,
partial_state: bool = False,
) -> EventContext:
"""Build an EventContext structure for a non-outlier event.
@@ -273,6 +276,8 @@ async def compute_event_context(
calculated from existing events. This is normally only specified
when receiving an event from federation where we don't have the
prev events for, e.g. when backfilling.
partial_state: True if `old_state` is partial and omits non-critical
membership events
Returns:
The event context.
"""
@@ -295,8 +300,24 @@ async def compute_event_context(

else:
# otherwise, we'll need to resolve the state across the prev_events.
logger.debug("calling resolve_state_groups from compute_event_context")

# partial_state should not be set explicitly in this case:
# we work it out dynamically
assert not partial_state

# if any of the prev-events have partial state, so do we.
# (This is slightly racy - the prev-events might get fixed up before we use
# their states - but I don't think that really matters; it just means we
# might redundantly recalculate the state for this event later.)
prev_event_ids = event.prev_event_ids()
incomplete_prev_events = await self.store.get_partial_state_events(
prev_event_ids
)
if any(incomplete_prev_events.values()):
logger.debug("Incoming event refers to prev-events with partial state")
partial_state = True

logger.debug("calling resolve_state_groups from compute_event_context")
entry = await self.resolve_state_groups_for_events(
event.room_id, event.prev_event_ids()
)
@@ -342,6 +363,7 @@
prev_state_ids=state_ids_before_event,
prev_group=state_group_before_event_prev_group,
delta_ids=deltas_to_state_group_before_event,
partial_state=partial_state,
)

#
@@ -373,6 +395,7 @@
prev_state_ids=state_ids_before_event,
prev_group=state_group_before_event,
delta_ids=delta_ids,
partial_state=partial_state,
)

@measure_func()
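
The comment in compute_event_context above gives the propagation rule: an event whose prev-events have partial state is itself partial-stated. A standalone sketch of that rule, with a stub standing in for the real store lookup (the names here are illustrative, not Synapse APIs):

import asyncio
from typing import Dict, Iterable

async def get_partial_state_events_stub(event_ids: Iterable[str]) -> Dict[str, bool]:
    # Stand-in for DataStore.get_partial_state_events: pretend only the
    # partial-state join event is flagged in the database.
    flagged = {"$join"}
    return {e: e in flagged for e in event_ids}

async def inherits_partial_state(prev_event_ids: Iterable[str]) -> bool:
    # An event has partial state if any of its prev-events does.
    flags = await get_partial_state_events_stub(prev_event_ids)
    return any(flags.values())

print(asyncio.run(inherits_partial_state(["$join", "$other_event"])))  # True
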
17 changes: 17 additions & 0 deletions synapse/storage/databases/main/events.py
@@ -2152,6 +2152,23 @@ def _store_event_state_mappings_txn(

state_groups[event.event_id] = context.state_group

# if we have partial state for these events, record the fact. (This happens
# here rather than in _store_event_txn because it also needs to happen when
# we de-outlier an event.)
self.db_pool.simple_insert_many_txn(
txn,
table="partial_state_events",
keys=("room_id", "event_id"),
values=[
(
event.room_id,
event.event_id,
)
for event, ctx in events_and_contexts
if ctx.partial_state
],
)

self.db_pool.simple_upsert_many_txn(
txn,
table="event_to_state_groups",
28 changes: 28 additions & 0 deletions synapse/storage/databases/main/events_worker.py
@@ -1953,3 +1953,31 @@ def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]:
"get_event_id_for_timestamp_txn",
get_event_id_for_timestamp_txn,
)

@cachedList("is_partial_state_event", list_name="event_ids")
async def get_partial_state_events(
self, event_ids: Collection[str]
) -> Dict[str, bool]:
"""Checks which of the given events have partial state"""
result = await self.db_pool.simple_select_many_batch(
table="partial_state_events",
column="event_id",
iterable=event_ids,
retcols=["event_id"],
desc="get_partial_state_events",
)
# convert the result to a dict, to make @cachedList work
partial = {r["event_id"] for r in result}
return {e_id: e_id in partial for e_id in event_ids}

@cached()
async def is_partial_state_event(self, event_id: str) -> bool:
"""Checks if the given event has partial state"""
result = await self.db_pool.simple_select_one_onecol(
table="partial_state_events",
keyvalues={"event_id": event_id},
retcol="1",
allow_none=True,
desc="is_partial_state_event",
)
return result is not None
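
The batch method converts the SELECT result, which only contains rows that actually exist, into a dict with an entry for every requested ID, since @cachedList needs a value per key. A small standalone sketch of that conversion with made-up data:

# Made-up query result: only "$join" has a partial_state_events row.
requested = ["$join", "$old_event"]
rows = [{"event_id": "$join"}]

partial = {r["event_id"] for r in rows}
flags = {e_id: e_id in partial for e_id in requested}
assert flags == {"$join": True, "$old_event": False}
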
37 changes: 37 additions & 0 deletions synapse/storage/databases/main/room.py
@@ -20,6 +20,7 @@
TYPE_CHECKING,
Any,
Awaitable,
Collection,
Dict,
List,
Optional,
@@ -1543,6 +1544,42 @@ async def upsert_room_on_join(
lock=False,
)

async def store_partial_state_room(
self,
room_id: str,
servers: Collection[str],
) -> None:
"""Mark the given room as containing events with partial state
Args:
room_id: the ID of the room
servers: other servers known to be in the room
"""
await self.db_pool.runInteraction(
"store_partial_state_room",
self._store_partial_state_room_txn,
room_id,
servers,
)

@staticmethod
def _store_partial_state_room_txn(
txn: LoggingTransaction, room_id: str, servers: Collection[str]
) -> None:
DatabasePool.simple_insert_txn(
txn,
table="partial_state_rooms",
values={
"room_id": room_id,
},
)
DatabasePool.simple_insert_many_txn(
txn,
table="partial_state_rooms_servers",
keys=("room_id", "server_name"),
values=((room_id, s) for s in servers),
)

async def maybe_store_room_on_outlier_membership(
self, room_id: str, room_version: RoomVersion
) -> None:
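
Putting the room.py change together with the hypothetical schema sketched near the top: _store_partial_state_room_txn writes one row for the room and one row per remote server. A small sqlite3 illustration of the resulting rows (plain SQL here stands in for Synapse's database helpers, and the room/server names are made up):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE partial_state_rooms (room_id TEXT NOT NULL PRIMARY KEY);
CREATE TABLE partial_state_rooms_servers (
    room_id TEXT NOT NULL,
    server_name TEXT NOT NULL,
    UNIQUE (room_id, server_name)
);
""")

def store_partial_state_room_txn(txn, room_id, servers):
    # Mirrors the two inserts above: one room row, then one row per server.
    txn.execute("INSERT INTO partial_state_rooms (room_id) VALUES (?)", (room_id,))
    txn.executemany(
        "INSERT INTO partial_state_rooms_servers (room_id, server_name) VALUES (?, ?)",
        [(room_id, s) for s in servers],
    )

store_partial_state_room_txn(conn, "!room:example.org", ["hs1.example.org", "hs2.example.org"])
print(conn.execute("SELECT * FROM partial_state_rooms_servers").fetchall())
# [('!room:example.org', 'hs1.example.org'), ('!room:example.org', 'hs2.example.org')]
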
