diff --git a/changelog.d/3-bug-fixes/duplicate-member-notifications b/changelog.d/3-bug-fixes/duplicate-member-notifications new file mode 100644 index 00000000000..120b5bc7ebf --- /dev/null +++ b/changelog.d/3-bug-fixes/duplicate-member-notifications @@ -0,0 +1 @@ +Defederation notifications, federation.delete and federation.connectionRemoved, now deduplicate the user list so that we don't send them more notifications than required. \ No newline at end of file diff --git a/integration/test/Test/Defederation.hs b/integration/test/Test/Defederation.hs index 5712e33088c..73399d96280 100644 --- a/integration/test/Test/Defederation.hs +++ b/integration/test/Test/Defederation.hs @@ -1,10 +1,10 @@ module Test.Defederation where import API.BrigInternal --- import API.BrigInternal qualified as Internal --- import API.Galley (defProteus, getConversation, postConversation, qualifiedUsers) --- import Control.Applicative --- import Data.Aeson qualified as Aeson +import API.BrigInternal qualified as Internal +import API.Galley (defProteus, getConversation, postConversation, qualifiedUsers) +import Control.Applicative +import Data.Aeson qualified as Aeson import GHC.Stack import SetupHelpers import Testlib.Prelude @@ -26,54 +26,60 @@ testDefederationRemoteNotifications = do void $ deleteFedConn OwnDomain remoteDomain void $ awaitNMatches 2 3 (\n -> nPayload n %. 
"type" `isEqual` "federation.connectionRemoved") ws --- FUTUREWORK: temporarily disabled, enable when fixed on CI --- testDefederationNonFullyConnectedGraph :: HasCallStack => App () --- testDefederationNonFullyConnectedGraph = do --- let setFederationConfig = --- setField "optSettings.setFederationStrategy" "allowDynamic" --- >=> removeField "optSettings.setFederationDomainConfigs" --- >=> setField "optSettings.setFederationDomainConfigsUpdateFreq" (Aeson.Number 1) --- startDynamicBackends --- [ def {dbBrig = setFederationConfig}, --- def {dbBrig = setFederationConfig}, --- def {dbBrig = setFederationConfig} --- ] --- $ \dynDomains -> do --- domains@[domainA, domainB, domainC] <- pure dynDomains --- connectAllDomainsAndWaitToSync 1 domains --- [uA, uB, uC] <- createAndConnectUsers [domainA, domainB, domainC] --- -- create group conversation owned by domainA with users from domainB and domainC --- convId <- bindResponse (postConversation uA (defProteus {qualifiedUsers = [uB, uC]})) $ \r -> do --- r.status `shouldMatchInt` 201 --- r.json %. 
"qualified_id" +testDefederationNonFullyConnectedGraph :: HasCallStack => App () +testDefederationNonFullyConnectedGraph = do + let setFederationConfig = + setField "optSettings.setFederationStrategy" "allowDynamic" + >=> removeField "optSettings.setFederationDomainConfigs" + >=> setField "optSettings.setFederationDomainConfigsUpdateFreq" (Aeson.Number 1) + startDynamicBackends + [ def {dbBrig = setFederationConfig}, + def {dbBrig = setFederationConfig}, + def {dbBrig = setFederationConfig} + ] + $ \dynDomains -> do + domains@[domainA, domainB, domainC] <- pure dynDomains + connectAllDomainsAndWaitToSync 1 domains --- -- check conversation exists on all backends --- for [uB, uC] objQidObject >>= checkConv convId uA + -- create a few extra users and connections to make sure that does not lead to any extra `connectionRemoved` notifications + [uA, uA2, _, _, uB, uC] <- createAndConnectUsers [domainA, domainA, domainA, domainA, domainB, domainC] >>= traverse objQidObject --- -- one of the 2 non-conversation-owning domains (domainB and domainC) --- -- defederate from the other non-conversation-owning domain --- void $ Internal.deleteFedConn domainB domainC + -- create group conversation owned by domainA with users from domainB and domainC + convId <- bindResponse (postConversation uA (defProteus {qualifiedUsers = [uA2, uB, uC]})) $ \r -> do + r.status `shouldMatchInt` 201 + r.json %. "qualified_id" --- -- assert that clients from domainA receive federation.connectionRemoved events --- -- Notifications being delivered at least n times is what we want to ensure here, --- -- however they are often delivered more than once, so check that it doesn't happen --- -- hundreds of times. --- let isConnectionRemoved n = do --- correctType <- nPayload n %. "type" `isEqual` "federation.connectionRemoved" --- if correctType --- then do --- domsV <- nPayload n %. 
"domains" & asList --- domsStr <- for domsV asString <&> sort --- pure $ domsStr == sort [domainB, domainC] --- else pure False --- void $ awaitNToMMatches 2 10 20 isConnectionRemoved wsA + -- check conversation exists on all backends + checkConv convId uA [uB, uC, uA2] + checkConv convId uB [uA, uC, uA2] + checkConv convId uC [uA, uB, uA2] --- retryT $ checkConv convId uA [] --- where --- checkConv :: Value -> Value -> [Value] -> App () --- checkConv convId user expectedOtherMembers = do --- bindResponse (getConversation user convId) $ \r -> do --- r.status `shouldMatchInt` 200 --- members <- r.json %. "members.others" & asList --- qIds <- for members (\m -> m %. "qualified_id") --- qIds `shouldMatchSet` expectedOtherMembers + withWebSocket uA $ \wsA -> do + -- one of the 2 non-conversation-owning domains (domainB and domainC) + -- defederate from the other non-conversation-owning domain + void $ Internal.deleteFedConn domainB domainC + + -- assert that clients from domainA receive federation.connectionRemoved events + -- Notifications being delivered exactly twice + void $ awaitNMatches 2 20 (isConnectionRemoved [domainB, domainC]) wsA + + -- remote members should be removed from local conversation eventually + retryT $ checkConv convId uA [uA2] + where + isConnectionRemoved :: [String] -> Value -> App Bool + isConnectionRemoved domains n = do + correctType <- nPayload n %. "type" `isEqual` "federation.connectionRemoved" + if correctType + then do + domsV <- nPayload n %. "domains" & asList + domsStr <- for domsV asString <&> sort + pure $ domsStr == sort domains + else pure False + + checkConv :: Value -> Value -> [Value] -> App () + checkConv convId user expectedOtherMembers = do + bindResponse (getConversation user convId) $ \r -> do + r.status `shouldMatchInt` 200 + members <- r.json %. "members.others" & asList + qIds <- for members (\m -> m %. 
"qualified_id") + qIds `shouldMatchSet` expectedOtherMembers diff --git a/services/galley/src/Galley/Intra/Effects.hs b/services/galley/src/Galley/Intra/Effects.hs index 481c4ab5301..e6639c4ce96 100644 --- a/services/galley/src/Galley/Intra/Effects.hs +++ b/services/galley/src/Galley/Intra/Effects.hs @@ -28,6 +28,7 @@ import Cassandra (ClientState, Consistency (LocalQuorum), Page (hasMore, nextPag import Control.Lens ((.~)) import Data.Id (ProviderId, ServiceId, UserId) import Data.Range (Range (fromRange)) +import Data.Set qualified as Set import Galley.API.Error import Galley.API.Util (localBotsAndUsers) import Galley.Cassandra.Conversation.Members (toMember) @@ -141,6 +142,13 @@ interpretGundeckAccess = interpret $ \case Push ps -> embedApp $ G.push ps PushSlowly ps -> embedApp $ G.pushSlowly ps +-- FUTUREWORK: +-- This function uses an in-memory set for tracking UserIds that we have already +-- sent notifications to. This set will only grow throughout the lifetime of this +-- function, and may cause memory & performance problems with millions of users. +-- How we are tracking which users have already been sent 0, 1, or 2 defederation +-- messages should be rethought to be more fault tolerant, e.g. this method doesn't +-- handle the server crashing and restarting. interpretDefederationNotifications :: forall r a. ( Member (Embed IO) r, @@ -154,34 +162,48 @@ interpretDefederationNotifications :: interpretDefederationNotifications = interpret $ \case SendDefederationNotifications domain -> getPage - >>= void . sendNotificationPage (Federation.FederationDelete domain) + >>= void . sendNotificationPage mempty (Federation.FederationDelete domain) SendOnConnectionRemovedNotifications domainA domainB -> getPage - >>= void . sendNotificationPage (Federation.FederationConnectionRemoved (domainA, domainB)) + >>= void . 
sendNotificationPage mempty (Federation.FederationConnectionRemoved (domainA, domainB)) where getPage :: Sem r (Page PageType) getPage = do maxPage <- inputs (fromRange . currentFanoutLimit . _options) -- This is based on the limits in removeIfLargeFanout + -- selectAllMembers will return duplicate members when they are in more than one chat + -- however we need the full row to build out the bot members to send notifications + -- to them. We have to do the duplicate filtering here. embedClient $ paginate selectAllMembers (paramsP LocalQuorum () maxPage) - pushEvents :: Federation.Event -> [LocalMember] -> Sem r () - pushEvents eventData results = do + pushEvents :: Set UserId -> Federation.Event -> [LocalMember] -> Sem r (Set UserId) + pushEvents seenRecipients eventData results = do let (bots, mems) = localBotsAndUsers results recipients = Intra.recipient <$> mems event = Intra.FederationEvent eventData - for_ (Intra.newPush ListComplete Nothing event recipients) $ \p -> do + filteredRecipients = + -- Deduplicate by UserId the page of recipients that we are working on + nubBy (\a b -> a._recipientUserId == b._recipientUserId) + -- Sort the remaining recipients by their IDs + $ + sortBy (\a b -> a._recipientUserId `compare` b._recipientUserId) + -- Filter out any recipient that we have already seen in a previous page (Set.notMember is an O(log n) lookup; Foldable's notElem on a Set is a linear scan) + $ + filter (\r -> r._recipientUserId `Set.notMember` seenRecipients) recipients + for_ (Intra.newPush ListComplete Nothing event filteredRecipients) $ \p -> do -- Futurework: Transient or not? -- RouteAny is used as it will wake up mobile clients -- and notify them of the changes to federation state. push1 $ p & Intra.pushRoute .~ Intra.RouteAny deliverAsync (bots `zip` repeat (G.pushEventJson event)) - sendNotificationPage :: Federation.Event -> Page PageType -> Sem r () - sendNotificationPage eventData page = do + -- Add the users to the set of users we've sent messages to. 
+ pure $ seenRecipients <> Set.fromList ((._recipientUserId) <$> filteredRecipients) + sendNotificationPage :: Set UserId -> Federation.Event -> Page PageType -> Sem r () + sendNotificationPage seenRecipients eventData page = do let res = result page mems = mapMaybe toMember res - pushEvents eventData mems + seenRecipients' <- pushEvents seenRecipients eventData mems when (hasMore page) $ do page' <- embedClient $ nextPage page - sendNotificationPage eventData page' + sendNotificationPage seenRecipients' eventData page' type PageType = ( UserId, diff --git a/services/galley/src/Galley/Intra/Push/Internal.hs b/services/galley/src/Galley/Intra/Push/Internal.hs index 971c9e283a7..0f6deec5619 100644 --- a/services/galley/src/Galley/Intra/Push/Internal.hs +++ b/services/galley/src/Galley/Intra/Push/Internal.hs @@ -60,7 +60,7 @@ data RecipientBy user = Recipient { _recipientUserId :: user, _recipientClients :: RecipientClients } - deriving stock (Functor, Foldable, Traversable, Show) + deriving stock (Functor, Foldable, Traversable, Show, Ord, Eq) makeLenses ''RecipientBy