Skip to content

Commit

Permalink
wire-subsystems: Make PushSlowly a NotificationSubsystem action
Browse files Browse the repository at this point in the history
This makes it easier to get the delay interval from the config
  • Loading branch information
akshaymankar committed Jan 8, 2024
1 parent 4783ed4 commit b115394
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 42 deletions.
17 changes: 17 additions & 0 deletions libs/polysemy-wire-zoo/src/Wire/Sem/Delay.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,20 @@ makeSem ''Delay
runDelay :: Member (Embed IO) r => Sem (Delay ': r) a -> Sem r a
runDelay = interpret $ \case
Delay i -> threadDelay i

runControlledDelay :: forall r a. (Member (Embed IO) r) => MVar Int -> Sem (Delay : r) a -> Sem r a
runControlledDelay tickSource = interpret $ \case
Delay n -> waitForTicks n
where
waitForTicks :: Int -> Sem r ()
waitForTicks 0 = pure ()
waitForTicks remaining0 = do
passedTicks <- takeMVar tickSource
let remaining = remaining0 - passedTicks
if remaining <= 0
then pure ()
else waitForTicks remaining

runDelayInstantly :: Sem (Delay : r) a -> Sem r a
runDelayInstantly = interpret $ \case
Delay _ -> pure ()
17 changes: 1 addition & 16 deletions libs/wire-subsystems/src/Wire/NotificationSubsystem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import Gundeck.Types hiding (Push (..), Recipient, newPush)
import Imports
import Polysemy
import Wire.Arbitrary
import Wire.Sem.Delay (Delay, delay)

data RecipientBy user = Recipient
{ _recipientUserId :: user,
Expand Down Expand Up @@ -41,24 +40,10 @@ type PushToUser = PushTo UserId

data NotificationSubsystem m a where
Push :: [PushToUser] -> NotificationSubsystem m ()
PushSlowly :: [PushToUser] -> NotificationSubsystem m ()

makeSem ''NotificationSubsystem

-- TODO: Test
pushSlowly ::
( Member NotificationSubsystem r,
Member Delay r
) =>
[PushToUser] ->
Sem r ()
pushSlowly ps = do
-- TODO this comes from the app configuration
let mmillies = 10000
d = 1000 * mmillies
for_ ps \p -> do
delay d
push [p]

newPush1 :: Maybe UserId -> Object -> NonEmpty Recipient -> PushToUser
newPush1 from e rr =
PushTo
Expand Down
22 changes: 20 additions & 2 deletions libs/wire-subsystems/src/Wire/NotificationSubsystem/Interpreter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@ import Polysemy.Input
import Wire.API.Team.Member
import Wire.GundeckAPIAccess
import Wire.NotificationSubsystem
import Wire.Sem.Delay

-- | We interpret this using 'GundeckAPIAccess' so we can mock it out for testing.
runNotificationSubsystemGundeck ::
( Member (GundeckAPIAccess) r,
Member Async r
Member Async r,
Member Delay r
) =>
NotificationSubsystemConfig ->
Sem (NotificationSubsystem : r) a ->
Sem r a
runNotificationSubsystemGundeck cfg = interpret $ \case
Push ps -> runInputConst cfg $ pushImpl ps
PushSlowly ps -> runInputConst cfg $ pushSlowlyImpl ps

data NotificationSubsystemConfig = NotificationSubsystemConfig
{ fanoutLimit :: Range 1 HardTruncationLimit Int32,
chunkSize :: Natural
chunkSize :: Natural,
-- | Microseconds
slowPushDelay :: Int
}

-- TODO: write a test for listtype
Expand Down Expand Up @@ -104,3 +109,16 @@ chunkPushes maxRecipients
splitPush n p =
let (r1, r2) = splitAt (fromIntegral n) (toList p._pushRecipients)
in (p {_pushRecipients = fromJust $ nonEmpty r1}, p {_pushRecipients = fromJust $ nonEmpty r2})

pushSlowlyImpl ::
( Member Delay r,
Member (Input NotificationSubsystemConfig) r,
Member GundeckAPIAccess r,
Member Async r
) =>
[PushToUser] ->
Sem r ()
pushSlowlyImpl ps =
for_ ps \p -> do
delay =<< inputs slowPushDelay
pushImpl [p]
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Wire.NotificationSubsystem.InterpreterSpec (spec) where

import Control.Concurrent.Async (async, wait)
import Data.Data (Proxy (Proxy))
import Data.List.NonEmpty (NonEmpty ((:|)), fromList)
import Data.List1 qualified as List1
Expand All @@ -9,15 +10,16 @@ import Gundeck.Types.Push.V2 qualified as V2
import Imports
import Numeric.Natural (Natural)
import Polysemy
import Polysemy.Async (asyncToIOFinal)
import Polysemy.Async (Async, asyncToIOFinal)
import Polysemy.Input
import Polysemy.Writer (tell, writerToIOFinal)
import System.Timeout (timeout)
import Test.Hspec
import Test.QuickCheck
import Test.QuickCheck.Instances ()
import Wire.GundeckAPIAccess
import Wire.NotificationSubsystem
import Wire.NotificationSubsystem.Interpreter
import Wire.Sem.Delay

spec :: Spec
spec = describe "NotificationSubsystem.Interpreter" do
Expand All @@ -26,7 +28,8 @@ spec = describe "NotificationSubsystem.Interpreter" do
let mockConfig =
NotificationSubsystemConfig
{ fanoutLimit = toRange $ Proxy @30,
chunkSize = 12
chunkSize = 12,
slowPushDelay = 0
}

connId2 <- generate arbitrary
Expand Down Expand Up @@ -68,12 +71,7 @@ spec = describe "NotificationSubsystem.Interpreter" do
largePush
]

actualPushes <-
runFinal
. asyncToIOFinal
. runGundeckAPIAccessMock
. runInputConst mockConfig
$ pushImpl pushes
(_, actualPushes) <- runMockStack mockConfig $ pushImpl pushes

let expectedPushes =
map toV2Push
Expand All @@ -87,7 +85,8 @@ spec = describe "NotificationSubsystem.Interpreter" do
let mockConfig =
NotificationSubsystemConfig
{ fanoutLimit = toRange $ Proxy @30,
chunkSize = 12
chunkSize = 12,
slowPushDelay = 0
}

connId2 <- generate arbitrary
Expand Down Expand Up @@ -122,12 +121,7 @@ spec = describe "NotificationSubsystem.Interpreter" do
pushSmallerThanFanoutLimit
]

actualPushes <-
runFinal
. asyncToIOFinal
. runGundeckAPIAccessMock
. runInputConst mockConfig
$ pushImpl pushes
(_, actualPushes) <- runMockStack mockConfig $ pushImpl pushes

let expectedPushes =
map toV2Push
Expand All @@ -137,6 +131,61 @@ spec = describe "NotificationSubsystem.Interpreter" do
chunkPushes mockConfig.chunkSize [pushSmallerThanFanoutLimit]
actualPushes `shouldBe` expectedPushes

describe "pushSlowlyImpl" do
it "sends each push one by one with a delay" do
let mockConfig =
NotificationSubsystemConfig
{ fanoutLimit = toRange $ Proxy @30,
chunkSize = 12,
slowPushDelay = 1
}

connId2 <- generate arbitrary
origin2 <- generate arbitrary
(user1, user21, user22) <- generate arbitrary
(payload1, payload2) <- generate $ resize 1 arbitrary
clients1 <- generate $ resize 3 arbitrary
let push1 =
PushTo
{ _pushConn = Nothing,
_pushTransient = True,
_pushRoute = V2.RouteDirect,
_pushNativePriority = Nothing,
pushOrigin = Nothing,
_pushRecipients = Recipient user1 (V2.RecipientClientsSome clients1) :| [],
pushJson = payload1
}
push2 =
PushTo
{ _pushConn = Just connId2,
_pushTransient = True,
_pushRoute = V2.RouteAny,
_pushNativePriority = Just V2.LowPriority,
pushOrigin = Just origin2,
_pushRecipients =
Recipient user21 V2.RecipientClientsAll
:| [Recipient user22 V2.RecipientClientsAll],
pushJson = payload2
}
pushes = [push1, push2]

actualPushesRef <- newIORef []
delayControl <- newEmptyMVar
slowPushThread <-
async $
runMockStackWithControlledDelay mockConfig delayControl actualPushesRef $
pushSlowlyImpl pushes

putMVar delayControl mockConfig.slowPushDelay
actualPushes1 <- timeout 100_000 $ (waitUntilPushes actualPushesRef 1)
actualPushes1 `shouldBe` Just [[toV2Push push1]]

putMVar delayControl mockConfig.slowPushDelay
actualPushes2 <- timeout 100_000 $ (waitUntilPushes actualPushesRef 2)
actualPushes2 `shouldBe` Just [[toV2Push push1], [toV2Push push2]]

timeout 100_000 (wait slowPushThread) `shouldReturn` Just ()

describe "toV2Push" do
it "does the transformation correctly" $ property \(pushToUser :: PushToUser) ->
let v2Push = toV2Push pushToUser
Expand Down Expand Up @@ -166,10 +215,44 @@ spec = describe "NotificationSubsystem.Interpreter" do
it "respects the chunkSize limit" $ property \limit (pushes :: [PushTo Int]) ->
all ((<= limit) . sizeOfChunks) (chunkPushes limit pushes)

runGundeckAPIAccessMock :: Member (Final IO) r => Sem (GundeckAPIAccess : r) a -> Sem r [[V2.Push]]
runGundeckAPIAccessMock =
fmap fst . writerToIOFinal . reinterpret \case
PushV2 pushes -> tell [pushes]
runMockStack :: NotificationSubsystemConfig -> Sem [Input NotificationSubsystemConfig, Delay, GundeckAPIAccess, Embed IO, Async, Final IO] a -> IO (a, [[V2.Push]])
runMockStack mockConfig action = do
actualPushesRef <- newIORef []
x <-
runFinal
. asyncToIOFinal
. embedToFinal @IO
. runGundeckAPIAccessIORef actualPushesRef
. runDelayInstantly
. runInputConst mockConfig
$ action
(x,) <$> readIORef actualPushesRef

runMockStackWithControlledDelay ::
NotificationSubsystemConfig ->
MVar Int ->
IORef [[V2.Push]] ->
Sem [Input NotificationSubsystemConfig, Delay, GundeckAPIAccess, Embed IO, Async, Final IO] a ->
IO a
runMockStackWithControlledDelay mockConfig delayControl actualPushesRef = do
runFinal
. asyncToIOFinal
. embedToFinal @IO
. runGundeckAPIAccessIORef actualPushesRef
. runControlledDelay delayControl
. runInputConst mockConfig

runGundeckAPIAccessIORef :: Member (Embed IO) r => IORef [[V2.Push]] -> Sem (GundeckAPIAccess : r) a -> Sem r a
runGundeckAPIAccessIORef pushesRef =
interpret \case
PushV2 pushes -> modifyIORef pushesRef (<> [pushes])

waitUntilPushes :: IORef [a] -> Int -> IO [a]
waitUntilPushes pushesRef n = do
ps <- readIORef pushesRef
if length ps >= n
then pure ps
else threadDelay 1000 >> waitUntilPushes pushesRef n

normalisePush :: PushTo a -> [PushTo a]
normalisePush p =
Expand Down
1 change: 1 addition & 0 deletions libs/wire-subsystems/wire-subsystems.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ test-suite wire-subsystems-tests
build-tool-depends: hspec-discover:hspec-discover
build-depends:
, aeson
, async
, base
, bytestring
, containers
Expand Down
4 changes: 1 addition & 3 deletions services/galley/src/Galley/API/Teams.hs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ import Wire.API.User.Identity (UserSSOId (UserSSOId))
import Wire.API.User.RichInfo (RichInfo)
import Wire.NotificationSubsystem
import Wire.NotificationSubsystem qualified as NotificationSubsystem
import Wire.Sem.Delay
import Wire.Sem.Paging qualified as E
import Wire.Sem.Paging.Cassandra

Expand Down Expand Up @@ -411,8 +410,7 @@ uncheckedDeleteTeam ::
Member LegalHoldStore r,
Member MemberStore r,
Member SparAccess r,
Member TeamStore r,
Member Delay r
Member TeamStore r
) =>
Local UserId ->
Maybe ConnId ->
Expand Down
3 changes: 2 additions & 1 deletion services/galley/src/Galley/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,6 @@ notificationSubssystemConfig :: Env -> NotificationSubsystemConfig
notificationSubssystemConfig env =
NotificationSubsystemConfig
{ chunkSize = 128,
fanoutLimit = currentFanoutLimit env._options
fanoutLimit = currentFanoutLimit env._options,
slowPushDelay = 1000 * fromMaybe defDeleteConvThrottleMillis (env ^. options . O.settings . deleteConvThrottleMillis)
}

0 comments on commit b115394

Please sign in to comment.