Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Generate real events when we reject invites #7804

Merged
merged 6 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7804.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix 'stuck invites' which happen when we are unable to reject a room invite received over federation.
23 changes: 16 additions & 7 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional, Tuple
from typing import TYPE_CHECKING, Optional, Tuple

from canonicaljson import encode_canonical_json, json

Expand Down Expand Up @@ -55,6 +55,9 @@

from ._base import BaseHandler

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -349,7 +352,7 @@ def _expire_event(self, event_id):


class EventCreationHandler(object):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
Expand Down Expand Up @@ -814,11 +817,17 @@ async def handle_new_client_event(
403, "This event is not allowed in this context", Codes.FORBIDDEN
)

try:
await self.auth.check_from_context(room_version, event, context)
except AuthError as err:
logger.warning("Denying new event %r because %s", event, err)
raise err
if event.internal_metadata.is_out_of_band_membership():
# the only sort of out-of-band-membership events we expect to see here
# are invite rejections we have generated ourselves.
assert event.type == EventTypes.Member
assert event.content["membership"] == Membership.LEAVE
else:
try:
await self.auth.check_from_context(room_version, event, context)
except AuthError as err:
logger.warning("Denying new event %r because %s", event, err)
raise err

# Ensure that we can round trip before trying to persist in db
try:
Expand Down
194 changes: 133 additions & 61 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2016-2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -18,17 +16,21 @@
import abc
import logging
from http import HTTPStatus
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Tuple, Union

from unpaddedbase64 import encode_base64

from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.api.room_versions import EventFormatVersions
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase
from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
from synapse.replication.http.membership import (
ReplicationLocallyRejectInviteRestServlet,
)
from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
from synapse.types import Collection, JsonDict, Requester, RoomAlias, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room

Expand Down Expand Up @@ -74,10 +76,6 @@ def __init__(self, hs):
)
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence
else:
self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
hs
)

# This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as
Expand Down Expand Up @@ -105,46 +103,28 @@ async def _remote_join(
raise NotImplementedError()

@abc.abstractmethod
async def _remote_reject_invite(
async def remote_reject_invite(
self,
invite_event_id: str,
txn_id: Optional[str],
requester: Requester,
remote_room_hosts: List[str],
room_id: str,
target: UserID,
content: dict,
content: JsonDict,
) -> Tuple[Optional[str], int]:
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.
"""
Rejects an out-of-band invite we have received from a remote server
Args:
requester
remote_room_hosts: List of servers to use to try and reject invite
room_id
target: The user rejecting the invite
content: The content for the rejection event
invite_event_id: ID of the invite to be rejected
txn_id: optional transaction ID supplied by the client
requester: user making the rejection request, according to the access token
content: additional content to include in the rejection event.
Normally an empty dict.
Returns:
A dictionary to be returned to the client, may
include event_id etc, or nothing if we locally rejected
event id, stream_id of the leave event
"""
raise NotImplementedError()

async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
"""Mark the invite has having been rejected even though we failed to
create a leave event for it.
"""
if self._is_on_event_persistence_instance:
return await self.persist_event_storage.locally_reject_invite(
user_id, room_id
)
else:
result = await self._locally_reject_client(
instance_name=self._event_stream_writer_instance,
user_id=user_id,
room_id=room_id,
)
return result["stream_id"]

@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
Expand Down Expand Up @@ -485,22 +465,28 @@ async def _update_membership(
elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room:
# perhaps we've been invited
inviter = await self._get_inviter(target.to_string(), room_id)
if not inviter:
invite = await self.store.get_invite_for_local_user_in_room(
user_id=target.to_string(), room_id=room_id
) # type: Optional[RoomsForUser]
if not invite:
raise SynapseError(404, "Not a known room")

if self.hs.is_mine(inviter):
logger.info(
"%s rejects invite to %s from %s", target, room_id, invite.sender
)

if self.hs.is_mine_id(invite.sender):
# the inviter was on our server, but has now left. Carry on
# with the normal rejection codepath.
#
# This is a bit of a hack, because the room might still be
# active on other servers.
pass
else:
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
return await self._remote_reject_invite(
requester, remote_room_hosts, room_id, target, content,
# send the rejection to the inviter's HS (with fallback to
# local event)
return await self.remote_reject_invite(
invite.event_id, txn_id, requester, content,
)

return await self._local_membership_update(
Expand Down Expand Up @@ -1014,33 +1000,119 @@ async def _remote_join(

return event_id, stream_id

async def _remote_reject_invite(
async def remote_reject_invite(
self,
invite_event_id: str,
txn_id: Optional[str],
requester: Requester,
remote_room_hosts: List[str],
room_id: str,
target: UserID,
content: dict,
content: JsonDict,
) -> Tuple[Optional[str], int]:
"""Implements RoomMemberHandler._remote_reject_invite
"""
Rejects an out-of-band invite received from a remote user
Implements RoomMemberHandler.remote_reject_invite
"""
invite_event = await self.store.get_event(invite_event_id)
room_id = invite_event.room_id
target_user = invite_event.state_key

# first of all, try doing a rejection via the inviting server
fed_handler = self.federation_handler
try:
inviter_id = UserID.from_string(invite_event.sender)
event, stream_id = await fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string(), content=content,
[inviter_id.domain], room_id, target_user, content=content
)
return event.event_id, stream_id
except Exception as e:
# if we were unable to reject the exception, just mark
# it as rejected on our end and plough ahead.
# if we were unable to reject the invite, we will generate our own
# leave event.
#
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
logger.warning("Failed to reject invite: %s", e)

stream_id = await self.locally_reject_invite(target.to_string(), room_id)
return None, stream_id
return await self._locally_reject_invite(
invite_event, txn_id, requester, content
)

async def _locally_reject_invite(
self,
invite_event: EventBase,
txn_id: Optional[str],
requester: Requester,
content: JsonDict,
) -> Tuple[str, int]:
"""Generate a local invite rejection
This is called after we fail to reject an invite via a remote server. It
generates an out-of-band membership event locally.
Args:
invite_event: the invite to be rejected
txn_id: optional transaction ID supplied by the client
requester: user making the rejection request, according to the access token
content: additional content to include in the rejection event.
Normally an empty dict.
"""

room_id = invite_event.room_id
target_user = invite_event.state_key
room_version = await self.store.get_room_version(room_id)

content["membership"] = Membership.LEAVE

# the auth events for the new event are the same as that of the invite, plus
# the invite itself.
#
# the prev_events are just the invite.
invite_hash = invite_event.event_id # type: Union[str, Tuple]
if room_version.event_format == EventFormatVersions.V1:
alg, h = compute_event_reference_hash(invite_event)
invite_hash = (invite_event.event_id, {alg: encode_base64(h)})

auth_events = invite_event.auth_events + (invite_hash,)
prev_events = (invite_hash,)

# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(invite_event.depth + 1, MAX_DEPTH)

event_dict = {
"depth": depth,
"auth_events": auth_events,
"prev_events": prev_events,
"type": EventTypes.Member,
"room_id": room_id,
"sender": target_user,
"content": content,
"state_key": target_user,
}

event = create_local_event_from_event_dict(
clock=self.clock,
hostname=self.hs.hostname,
signing_key=self.hs.signing_key,
room_version=room_version,
event_dict=event_dict,
)
event.internal_metadata.outlier = True
event.internal_metadata.out_of_band_membership = True
if txn_id is not None:
event.internal_metadata.txn_id = txn_id
if requester.access_token_id is not None:
event.internal_metadata.token_id = requester.access_token_id

EventValidator().validate_new(event, self.config)

context = await self.state_handler.compute_event_context(event)
context.app_service = requester.app_service
stream_id = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
return event.event_id, stream_id

async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
Expand Down
17 changes: 9 additions & 8 deletions synapse/handlers/room_member_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,22 @@ async def _remote_join(

return ret["event_id"], ret["stream_id"]

async def _remote_reject_invite(
async def remote_reject_invite(
self,
invite_event_id: str,
txn_id: Optional[str],
requester: Requester,
remote_room_hosts: List[str],
room_id: str,
target: UserID,
content: dict,
) -> Tuple[Optional[str], int]:
"""Implements RoomMemberHandler._remote_reject_invite
"""
Rejects an out-of-band invite received from a remote user

Implements RoomMemberHandler.remote_reject_invite
"""
ret = await self._remote_reject_client(
invite_event_id=invite_event_id,
txn_id=txn_id,
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
user_id=target.to_string(),
content=content,
)
return ret["event_id"], ret["stream_id"]
Expand Down
Loading