Skip to content

Commit

Permalink
Revert "WPB-3916: Filtering out duplicate members when sending defede…
Browse files Browse the repository at this point in the history
…ration notifications (#3515)"

This reverts commit 13e3f09.
  • Loading branch information
fisx committed Sep 14, 2023
1 parent 1be1316 commit 1b4199f
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 94 deletions.
62 changes: 0 additions & 62 deletions integration/test/Test/Defederation.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
module Test.Defederation where

import API.BrigInternal
import API.BrigInternal qualified as Internal
import API.Galley (defProteus, getConversation, postConversation, qualifiedUsers)
import Control.Applicative
import Data.Aeson qualified as Aeson
import GHC.Stack
import SetupHelpers
import Testlib.Prelude
Expand All @@ -25,61 +21,3 @@ testDefederationRemoteNotifications = do
-- Begin the whole process at Brig, the same as an operator would.
void $ deleteFedConn OwnDomain remoteDomain
void $ awaitNMatches 2 3 (\n -> nPayload n %. "type" `isEqual` "federation.connectionRemoved") ws

testDefederationNonFullyConnectedGraph :: HasCallStack => App ()
testDefederationNonFullyConnectedGraph = do
let setFederationConfig =
setField "optSettings.setFederationStrategy" "allowDynamic"
>=> removeField "optSettings.setFederationDomainConfigs"
>=> setField "optSettings.setFederationDomainConfigsUpdateFreq" (Aeson.Number 1)
startDynamicBackends
[ def {brigCfg = setFederationConfig},
def {brigCfg = setFederationConfig},
def {brigCfg = setFederationConfig}
]
$ \dynDomains -> do
domains@[domainA, domainB, domainC] <- pure dynDomains
connectAllDomainsAndWaitToSync 1 domains

-- create a few extra users and connections to make sure that does not lead to any extra `connectionRemoved` notifications
[uA, uA2, _, _, uB, uC] <- createAndConnectUsers [domainA, domainA, domainA, domainA, domainB, domainC] >>= traverse objQidObject

-- create group conversation owned by domainA with users from domainB and domainC
convId <- bindResponse (postConversation uA (defProteus {qualifiedUsers = [uA2, uB, uC]})) $ \r -> do
r.status `shouldMatchInt` 201
r.json %. "qualified_id"

-- check conversation exists on all backends
checkConv convId uA [uB, uC, uA2]
checkConv convId uB [uA, uC, uA2]
checkConv convId uC [uA, uB, uA2]

withWebSocket uA $ \wsA -> do
-- one of the 2 non-conversation-owning domains (domainB and domainC)
-- defederate from the other non-conversation-owning domain
void $ Internal.deleteFedConn domainB domainC

-- assert that clients from domainA receive federation.connectionRemoved events
-- Notifications being delivered exactly twice
void $ awaitNMatches 2 20 (isConnectionRemoved [domainB, domainC]) wsA

-- remote members should be removed from local conversation eventually
retryT $ checkConv convId uA [uA2]
where
isConnectionRemoved :: [String] -> Value -> App Bool
isConnectionRemoved domains n = do
correctType <- nPayload n %. "type" `isEqual` "federation.connectionRemoved"
if correctType
then do
domsV <- nPayload n %. "domains" & asList
domsStr <- for domsV asString <&> sort
pure $ domsStr == sort domains
else pure False

checkConv :: Value -> Value -> [Value] -> App ()
checkConv convId user expectedOtherMembers = do
bindResponse (getConversation user convId) $ \r -> do
r.status `shouldMatchInt` 200
members <- r.json %. "members.others" & asList
qIds <- for members (\m -> m %. "qualified_id")
qIds `shouldMatchSet` expectedOtherMembers
40 changes: 9 additions & 31 deletions services/galley/src/Galley/Intra/Effects.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import Cassandra (ClientState, Consistency (LocalQuorum), Page (hasMore, nextPag
import Control.Lens ((.~))
import Data.Id (ProviderId, ServiceId, UserId)
import Data.Range (Range (fromRange))
import Data.Set qualified as Set
import Galley.API.Error
import Galley.API.Util (localBotsAndUsers)
import Galley.Cassandra.Conversation.Members (toMember)
Expand Down Expand Up @@ -142,13 +141,6 @@ interpretGundeckAccess = interpret $ \case
Push ps -> embedApp $ G.push ps
PushSlowly ps -> embedApp $ G.pushSlowly ps

-- FUTUREWORK:
-- This functions uses an in-memory set for tracking UserIds that we have already
-- sent notifications to. This set will only grow throughout the lifttime of this
-- function, and may cause memory & performance problems with millions of users.
-- How we are tracking which users have already been sent 0, 1, or 2 defederation
-- messages should be rethought to be more fault tollerant, e.g. this method doesn't
-- handle the server crashing and restarting.
interpretDefederationNotifications ::
forall r a.
( Member (Embed IO) r,
Expand All @@ -162,48 +154,34 @@ interpretDefederationNotifications ::
interpretDefederationNotifications = interpret $ \case
SendDefederationNotifications domain ->
getPage
>>= void . sendNotificationPage mempty (Federation.FederationDelete domain)
>>= void . sendNotificationPage (Federation.FederationDelete domain)
SendOnConnectionRemovedNotifications domainA domainB ->
getPage
>>= void . sendNotificationPage mempty (Federation.FederationConnectionRemoved (domainA, domainB))
>>= void . sendNotificationPage (Federation.FederationConnectionRemoved (domainA, domainB))
where
getPage :: Sem r (Page PageType)
getPage = do
maxPage <- inputs (fromRange . currentFanoutLimit . _options) -- This is based on the limits in removeIfLargeFanout
-- selectAllMembers will return duplicate members when they are in more than one chat
-- however we need the full row to build out the bot members to send notifications
-- to them. We have to do the duplicate filtering here.
embedClient $ paginate selectAllMembers (paramsP LocalQuorum () maxPage)
pushEvents :: Set UserId -> Federation.Event -> [LocalMember] -> Sem r (Set UserId)
pushEvents seenRecipients eventData results = do
pushEvents :: Federation.Event -> [LocalMember] -> Sem r ()
pushEvents eventData results = do
let (bots, mems) = localBotsAndUsers results
recipients = Intra.recipient <$> mems
event = Intra.FederationEvent eventData
filteredRecipients =
-- Deduplicate by UserId the page of recipients that we are working on
nubBy (\a b -> a._recipientUserId == b._recipientUserId)
-- Sort the remaining recipients by their IDs
$
sortBy (\a b -> a._recipientUserId `compare` b._recipientUserId)
-- Filter out any recipient that we have already seen in a previous page
$
filter (\r -> r._recipientUserId `notElem` seenRecipients) recipients
for_ (Intra.newPush ListComplete Nothing event filteredRecipients) $ \p -> do
for_ (Intra.newPush ListComplete Nothing event recipients) $ \p -> do
-- Futurework: Transient or not?
-- RouteAny is used as it will wake up mobile clients
-- and notify them of the changes to federation state.
push1 $ p & Intra.pushRoute .~ Intra.RouteAny
deliverAsync (bots `zip` repeat (G.pushEventJson event))
-- Add the users to the set of users we've sent messages to.
pure $ seenRecipients <> Set.fromList ((._recipientUserId) <$> filteredRecipients)
sendNotificationPage :: Set UserId -> Federation.Event -> Page PageType -> Sem r ()
sendNotificationPage seenRecipients eventData page = do
sendNotificationPage :: Federation.Event -> Page PageType -> Sem r ()
sendNotificationPage eventData page = do
let res = result page
mems = mapMaybe toMember res
seenRecipients' <- pushEvents seenRecipients eventData mems
pushEvents eventData mems
when (hasMore page) $ do
page' <- embedClient $ nextPage page
sendNotificationPage seenRecipients' eventData page'
sendNotificationPage eventData page'

type PageType =
( UserId,
Expand Down
2 changes: 1 addition & 1 deletion services/galley/src/Galley/Intra/Push/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ data RecipientBy user = Recipient
{ _recipientUserId :: user,
_recipientClients :: RecipientClients
}
deriving stock (Functor, Foldable, Traversable, Show, Ord, Eq)
deriving stock (Functor, Foldable, Traversable, Show)

makeLenses ''RecipientBy

Expand Down

0 comments on commit 1b4199f

Please sign in to comment.