Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Process device list updates asynchronously #12365

Merged
merged 8 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12365.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Enable processing of device list updates asynchronously.
8 changes: 0 additions & 8 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,14 +680,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
config.get("use_account_validity_in_account_status") or False
)

# This is a temporary option that enables fully using the new
# `device_lists_changes_in_room` without the backwards compat code. This
# is primarily for testing. If enabled the server should *not* be
# downgraded, as it may lead to missing device list updates.
self.use_new_device_lists_changes_in_room = (
config.get("use_new_device_lists_changes_in_room") or False
)

self.rooms_to_exclude_from_sync: List[str] = (
config.get("exclude_rooms_from_sync") or []
)
Expand Down
28 changes: 0 additions & 28 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,6 @@ def __init__(self, hs: "HomeServer"):
# On start up check if there are any updates pending.
hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)

# Used to decide if we calculate outbound pokes up front or not. By
# default we do to allow safely downgrading Synapse.
self.use_new_device_lists_changes_in_room = (
hs.config.server.use_new_device_lists_changes_in_room
)

def _check_device_name_length(self, name: Optional[str]) -> None:
"""
Checks whether a device name is longer than the maximum allowed length.
Expand Down Expand Up @@ -490,23 +484,9 @@ async def notify_device_update(

room_ids = await self.store.get_rooms_for_user(user_id)

hosts: Optional[Set[str]] = None
if not self.use_new_device_lists_changes_in_room:
hosts = set()

if self.hs.is_mine_id(user_id):
for room_id in room_ids:
joined_users = await self.store.get_users_in_room(room_id)
hosts.update(get_domain_from_id(u) for u in joined_users)

set_tag("target_hosts", hosts)

hosts.discard(self.server_name)

position = await self.store.add_device_change_to_streams(
user_id,
device_ids,
hosts=hosts,
room_ids=room_ids,
)

Expand All @@ -528,14 +508,6 @@ async def notify_device_update(
# We may need to do some processing asynchronously.
self._handle_new_device_update_async()

if hosts:
logger.info(
"Sending device list update notif for %r to: %r", user_id, hosts
)
for host in hosts:
self.federation_sender.send_device_messages(host, immediate=False)
log_kv({"message": "sent device update to host", "host": host})

async def notify_user_signature_update(
self, from_user_id: str, user_ids: List[str]
) -> None:
Expand Down
61 changes: 9 additions & 52 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,6 @@ async def add_device_change_to_streams(
self,
user_id: str,
device_ids: Collection[str],
hosts: Optional[Collection[str]],
room_ids: Collection[str],
) -> Optional[int]:
"""Persist that a user's devices have been updated, and which hosts
Expand All @@ -1592,9 +1591,6 @@ async def add_device_change_to_streams(
user_id: The ID of the user whose device changed.
device_ids: The IDs of any changed devices. If empty, this function will
return None.
hosts: The remote destinations that should be notified of the change. If
None then the set of hosts have *not* been calculated, and will be
calculated later by a background task.
room_ids: The rooms that the user is in

Returns:
Expand All @@ -1606,58 +1602,30 @@ async def add_device_change_to_streams(

context = get_active_span_text_map()

def add_device_changes_txn(
txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes
):
def add_device_changes_txn(txn, stream_ids):
self._add_device_change_to_stream_txn(
txn,
user_id,
device_ids,
stream_ids_for_device_change,
stream_ids,
)

self._add_device_outbound_room_poke_txn(
txn,
user_id,
device_ids,
room_ids,
stream_ids_for_device_change,
context,
hosts_have_been_calculated=hosts is not None,
)

# If the set of hosts to send to has not been calculated yet (and so
# `hosts` is None) or there are no `hosts` to send to, then skip
# trying to persist them to the DB.
if not hosts:
return

self._add_device_outbound_poke_to_stream_txn(
txn,
user_id,
device_ids,
hosts,
stream_ids_for_outbound_pokes,
stream_ids,
context,
)

# `device_lists_stream` wants a stream ID per device update.
num_stream_ids = len(device_ids)

if hosts:
# `device_lists_outbound_pokes` wants a different stream ID for
# each row, which is a row per host per device update.
num_stream_ids += len(hosts) * len(device_ids)

async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids:
stream_ids_for_device_change = stream_ids[: len(device_ids)]
stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :]

async with self._device_list_id_gen.get_next_mult(
len(device_ids)
) as stream_ids:
await self.db_pool.runInteraction(
"add_device_change_to_stream",
add_device_changes_txn,
stream_ids_for_device_change,
stream_ids_for_outbound_pokes,
stream_ids,
)

return stream_ids[-1]
Expand Down Expand Up @@ -1752,19 +1720,8 @@ def _add_device_outbound_room_poke_txn(
room_ids: Collection[str],
stream_ids: List[str],
context: Dict[str, str],
hosts_have_been_calculated: bool,
) -> None:
"""Record the user in the room has updated their device.

Args:
hosts_have_been_calculated: True if `device_lists_outbound_pokes`
has been updated already with the updates.
"""

# We only need to convert to outbound pokes if they are our user.
converted_to_destinations = (
hosts_have_been_calculated or not self.hs.is_mine_id(user_id)
)
"""Record the user in the room has updated their device."""

encoded_context = json_encoder.encode(context)

Expand All @@ -1789,7 +1746,7 @@ def _add_device_outbound_room_poke_txn(
device_id,
room_id,
stream_id,
converted_to_destinations,
False,
encoded_context,
)
for room_id in room_ids
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@


SCHEMA_COMPAT_VERSION = (
# we now have `state_key` columns in both `events` and `state_events`, so
# now incompatible with synapses wth SCHEMA_VERSION < 66.
66
# We now assume that `device_lists_changes_in_room` has been filled out for
# recent device_list_updates.
69
)
"""Limit on how far the synapse codebase can be rolled back without breaking db compat

Expand Down
8 changes: 0 additions & 8 deletions tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from typing import Optional
from unittest.mock import Mock

from parameterized import parameterized_class
from signedjson import key, sign
from signedjson.types import BaseKey, SigningKey

Expand Down Expand Up @@ -155,12 +154,6 @@ def test_send_receipts_with_backoff(self):
)


@parameterized_class(
[
{"enable_room_poke_code_path": False},
{"enable_room_poke_code_path": True},
]
)
class FederationSenderDevicesTestCases(HomeserverTestCase):
servlets = [
admin.register_servlets,
Expand All @@ -175,7 +168,6 @@ def make_homeserver(self, reactor, clock):
def default_config(self):
c = super().default_config()
c["send_federation"] = True
c["use_new_device_lists_changes_in_room"] = self.enable_room_poke_code_path
return c

def prepare(self, reactor, clock, hs):
Expand Down
47 changes: 27 additions & 20 deletions tests/storage/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,29 @@ class DeviceStoreTestCase(HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastores().main

def add_device_change(self, user_id, device_ids, host):
"""Add a device list change for the given device to
`device_lists_outbound_pokes` table.
"""

for device_id in device_ids:
stream_id = self.get_success(
self.store.add_device_change_to_streams(
"user_id", [device_id], ["!some:room"]
)
)

self.get_success(
self.store.add_device_list_outbound_pokes(
user_id=user_id,
device_id=device_id,
room_id="!some:room",
stream_id=stream_id,
hosts=[host],
context={},
)
)

def test_store_new_device(self):
self.get_success(
self.store.store_device("user_id", "device_id", "display_name")
Expand Down Expand Up @@ -95,11 +118,7 @@ def test_get_device_updates_by_remote(self):
device_ids = ["device_id1", "device_id2"]

# Add two device updates with sequential `stream_id`s
self.get_success(
self.store.add_device_change_to_streams(
"user_id", device_ids, ["somehost"], ["!some:room"]
)
)
self.add_device_change("user_id", device_ids, "somehost")

# Get all device updates ever meant for this remote
now_stream_id, device_updates = self.get_success(
Expand All @@ -123,11 +142,7 @@ def test_get_device_updates_by_remote_can_limit_properly(self):
"device_id4",
"device_id5",
]
self.get_success(
self.store.add_device_change_to_streams(
"user_id", device_ids, ["somehost"], ["!some:room"]
)
)
self.add_device_change("user_id", device_ids, "somehost")

# Get device updates meant for this remote
next_stream_id, device_updates = self.get_success(
Expand All @@ -147,11 +162,7 @@ def test_get_device_updates_by_remote_can_limit_properly(self):

# Add some more device updates to ensure it still resumes properly
device_ids = ["device_id6", "device_id7"]
self.get_success(
self.store.add_device_change_to_streams(
"user_id", device_ids, ["somehost"], ["!some:room"]
)
)
self.add_device_change("user_id", device_ids, "somehost")

# Get the next batch of device updates
next_stream_id, device_updates = self.get_success(
Expand Down Expand Up @@ -224,11 +235,7 @@ def test_get_device_updates_by_remote_cross_signing_key_updates(
"fakeSelfSigning",
]

self.get_success(
self.store.add_device_change_to_streams(
"@user_id:test", device_ids, ["somehost"], ["!some:room"]
)
)
self.add_device_change("@user_id:test", device_ids, "somehost")

# Get device updates meant for this remote
next_stream_id, device_updates = self.get_success(
Expand Down