Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Only send out device list updates for our own users #12465

Merged
merged 4 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12465.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Enable processing of device list updates asynchronously.
10 changes: 7 additions & 3 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,9 +649,13 @@ async def _handle_new_device_update_async(self) -> None:
return

for user_id, device_id, room_id, stream_id, opentracing_context in rows:
joined_user_ids = await self.store.get_users_in_room(room_id)
hosts = {get_domain_from_id(u) for u in joined_user_ids}
hosts.discard(self.server_name)
hosts = set()

# Ignore any users that aren't ours
if self.hs.is_mine_id(user_id):
joined_user_ids = await self.store.get_users_in_room(room_id)
hosts = {get_domain_from_id(u) for u in joined_user_ids}
hosts.discard(self.server_name)

# Check if we've already sent this update to some hosts
if current_stream_id == stream_id:
Expand Down
4 changes: 3 additions & 1 deletion synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1703,7 +1703,9 @@ def _add_device_outbound_poke_to_stream_txn(
next(stream_id_iterator),
user_id,
device_id,
False,
not self.hs.is_mine_id(
user_id
), # We only need to send out update for *our* users
now,
encoded_context if whitelisted_homeserver(destination) else "{}",
)
Expand Down
43 changes: 42 additions & 1 deletion tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):

def make_homeserver(self, reactor, clock):
return self.setup_test_homeserver(
federation_transport_client=Mock(spec=["send_transaction"]),
federation_transport_client=Mock(
spec=["send_transaction", "query_user_devices"]
),
)

def default_config(self):
Expand Down Expand Up @@ -218,6 +220,45 @@ def test_send_device_updates(self):
self.assertEqual(len(self.edus), 1)
self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id)

def test_dont_send_device_updates_for_remote_users(self):
"""Check that we don't send device updates for remote users"""

# Send the server a device list EDU for the other user, this will cause
# it to try and resync the device lists.
self.hs.get_federation_transport_client().query_user_devices.return_value = (
defer.succeed(
{
"stream_id": "1",
"user_id": "@user2:host2",
"devices": [{"device_id": "D1"}],
}
)
)

self.get_success(
self.hs.get_device_handler().device_list_updater.incoming_device_list_update(
"host2",
{
"user_id": "@user2:host2",
"device_id": "D1",
"stream_id": "1",
"prev_ids": [],
},
)
)

self.reactor.advance(1)

# We shouldn't see an EDU for that update
self.assertEqual(self.edus, [])

# Check that we did successfully process the inbound EDU (otherwise this
# test would pass if we failed to process the EDU)
devices = self.get_success(
self.hs.get_datastores().main.get_cached_devices_for_user("@user2:host2")
)
self.assertIn("D1", devices)

def test_upload_signatures(self):
"""Uploading signatures on some devices should produce updates for that user"""

Expand Down
6 changes: 3 additions & 3 deletions tests/storage/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_get_device_updates_by_remote(self):
device_ids = ["device_id1", "device_id2"]

# Add two device updates with sequential `stream_id`s
self.add_device_change("user_id", device_ids, "somehost")
self.add_device_change("@user_id:test", device_ids, "somehost")

# Get all device updates ever meant for this remote
now_stream_id, device_updates = self.get_success(
Expand All @@ -142,7 +142,7 @@ def test_get_device_updates_by_remote_can_limit_properly(self):
"device_id4",
"device_id5",
]
self.add_device_change("user_id", device_ids, "somehost")
self.add_device_change("@user_id:test", device_ids, "somehost")

# Get device updates meant for this remote
next_stream_id, device_updates = self.get_success(
Expand All @@ -162,7 +162,7 @@ def test_get_device_updates_by_remote_can_limit_properly(self):

# Add some more device updates to ensure it still resumes properly
device_ids = ["device_id6", "device_id7"]
self.add_device_change("user_id", device_ids, "somehost")
self.add_device_change("@user_id:test", device_ids, "somehost")

# Get the next batch of device updates
next_stream_id, device_updates = self.get_success(
Expand Down