Skip to content

Commit

Permalink
Add return value to enqueueNotification
Browse files Browse the repository at this point in the history
  • Loading branch information
pcapriotti committed Sep 19, 2023
1 parent d175b16 commit 19e6fff
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ sendNotification env component path body =
runFederatorClient env . void $
clientIn (Proxy @BackendNotificationAPI) (Proxy @(FederatorClient c)) (withoutFirstSlash path) body

enqueue :: Q.Channel -> Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c () -> IO ()
enqueue :: Q.Channel -> Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c a -> IO a
enqueue channel originDomain targetDomain deliveryMode (FedQueueClient action) =
runReaderT action FedQueueEnv {..}

Expand Down
25 changes: 13 additions & 12 deletions services/galley/src/Galley/API/Action.hs
Original file line number Diff line number Diff line change
Expand Up @@ -804,16 +804,15 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do
-- because quid's backend will update local state and notify its users
-- itself using the ConversationUpdate returned by this function
if notifyOrigDomain || tDomain ruids /= qDomain quid
then (void $ fedQueueClient @'Galley @"on-conversation-updated" update, Nothing)
else (pure (), Just update)
let f = fromMaybe (mkUpdate []) . asum . map tUnqualified . rights
update = f updates
failedUpdates = lefts updates
for_ failedUpdates $
logError
"on-conversation-updated"
"An error occurred while communicating with federated server: "
pure update
then fedQueueClient @'Galley @"on-conversation-updated" update $> Nothing
else pure (Just update)
case partitionEithers updates of
(ls :: [Remote ([UserId], FederationError)], rs) -> do
for_ ls $
logError
"on-conversation-updated"
"An error occurred while communicating with federated server: "
pure $ fromMaybe (mkUpdate []) . asum . map tUnqualified $ rs

-- notify local participants and bots
pushConversationEvent con e (qualifyAs lcnv (bmLocals targets)) (bmBots targets)
Expand All @@ -822,10 +821,12 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do
-- to the originating domain (if it is remote)
pure $ LocalConversationUpdate e update
where
logError :: (Show a) => String -> String -> (a, FederationError) -> Sem r ()
logError :: String -> String -> Remote (a, FederationError) -> Sem r ()
logError field msg e =
P.warn $
Log.field "federation call" field . Log.msg (msg <> show e)
Log.field "federation call" field
. Log.field "domain" (_domainText (tDomain e))
. Log.msg (msg <> displayException (snd (tUnqualified e)))

-- | Update the local database with information on conversation members joining
-- or leaving. Finally, push out notifications to local users.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ data BackendNotificationQueueAccess m a where
KnownComponent c =>
Remote x ->
Q.DeliveryMode ->
FedQueueClient c () ->
BackendNotificationQueueAccess m (Either FederationError ())
FedQueueClient c a ->
BackendNotificationQueueAccess m (Either FederationError a)
EnqueueNotificationsConcurrently ::
(KnownComponent c, Foldable f, Functor f) =>
Q.DeliveryMode ->
f (Remote x) ->
(Remote [x] -> (FedQueueClient c (), b)) ->
BackendNotificationQueueAccess m [Either (Remote [x], FederationError) (Remote b)]
(Remote [x] -> FedQueueClient c a) ->
BackendNotificationQueueAccess m [Either (Remote ([x], FederationError)) (Remote a)]

makeSem ''BackendNotificationQueueAccess
16 changes: 9 additions & 7 deletions services/galley/src/Galley/Intra/BackendNotificationQueue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Galley.Intra.BackendNotificationQueue (interpretBackendNotificationQueueA
import Control.Lens (view)
import Control.Monad.Catch
import Control.Retry
import Data.Bifunctor
import Data.Domain
import Data.Qualified
import Galley.Effects.BackendNotificationQueueAccess (BackendNotificationQueueAccess (..))
Expand Down Expand Up @@ -32,7 +33,7 @@ interpretBackendNotificationQueueAccess = interpret $ \case
EnqueueNotificationsConcurrently m xs rpc -> do
embedApp $ enqueueNotificationsConcurrently m xs rpc

enqueueNotification :: Domain -> Q.DeliveryMode -> FedQueueClient c () -> App (Either FederationError ())
enqueueNotification :: Domain -> Q.DeliveryMode -> FedQueueClient c a -> App (Either FederationError a)
enqueueNotification remoteDomain deliveryMode action = do
mChanVar <- view rabbitmqChannel
ownDomain <- view (options . settings . federationDomain)
Expand Down Expand Up @@ -61,14 +62,15 @@ enqueueNotification remoteDomain deliveryMode action = do
enqueueNotificationsConcurrently ::
(Foldable f, Functor f) =>
Q.DeliveryMode ->
f (Remote a) ->
(Remote [a] -> (FedQueueClient c (), b)) ->
App [Either (Remote [a], FederationError) (Remote b)]
f (Remote x) ->
(Remote [x] -> FedQueueClient c a) ->
App [(Either (Remote ([x], FederationError)) (Remote a))]
enqueueNotificationsConcurrently m xs f =
pooledForConcurrentlyN 8 (bucketRemote xs) $ \r ->
let o = f r
in bimap (r,) (qualifyAs r . const (snd o))
<$> enqueueNotification (tDomain r) m (fst o)
bimap
(qualifyAs r . (tUnqualified r,))
(qualifyAs r)
<$> enqueueNotification (tDomain r) m (f r)

data NoRabbitMqChannel = NoRabbitMqChannel
deriving (Show)
Expand Down

0 comments on commit 19e6fff

Please sign in to comment.