diff --git a/prototypes/ScheduledMerges.hs b/prototypes/ScheduledMerges.hs index 56cb319d2..89b7bee5b 100644 --- a/prototypes/ScheduledMerges.hs +++ b/prototypes/ScheduledMerges.hs @@ -1,7 +1,3 @@ -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE EmptyCase #-} -{-# LANGUAGE ScopedTypeVariables #-} - -- | A prototype of an LSM with explicitly scheduled incremental merges. -- -- The scheduled incremental merges is about ensuring that the merging @@ -46,6 +42,7 @@ module ScheduledMerges ( import Prelude hiding (lookup) import Data.Bits +import Data.Foldable (for_, toList) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.STRef @@ -53,7 +50,7 @@ import Data.STRef import Control.Exception (assert) import Control.Monad.ST import Control.Tracer (Tracer, contramap, traceWith) -import GHC.Stack (HasCallStack) +import GHC.Stack (HasCallStack, callStack) import Database.LSMTree.Normal (LookupResult (..), Update (..)) @@ -136,17 +133,20 @@ tieringRunSize n = 4^n levellingRunSize :: Int -> Int levellingRunSize n = 4^(n+1) -tieringRunSizeToLevel :: Run -> Int -tieringRunSizeToLevel r +tieringLevel :: Int -> Int +tieringLevel s | s <= bufferSize = 1 -- level numbers start at 1 | otherwise = 1 + (finiteBitSize s - countLeadingZeros (s-1) - 1) `div` 2 - where - s = Map.size r + +levellingLevel :: Int -> Int +levellingLevel s = max 1 (tieringLevel s - 1) -- level numbers start at 1 + +tieringRunSizeToLevel :: Run -> Int +tieringRunSizeToLevel = tieringLevel . Map.size levellingRunSizeToLevel :: Run -> Int -levellingRunSizeToLevel r = - max 1 (tieringRunSizeToLevel r - 1) -- level numbers start at 1 +levellingRunSizeToLevel = levellingLevel . Map.size bufferSize :: Int bufferSize = tieringRunSize 1 -- 4 @@ -163,11 +163,11 @@ mergeLastForLevel _ = MergeMidLevel -- | Note that the invariants rely on the fact that levelling is only used on -- the last level. -- -invariant :: forall s. Levels s -> ST s Bool +invariant :: forall s. Levels s -> ST s () invariant = go 1 where - go :: Int -> [Level s] -> ST s Bool - go !_ [] = return True + go :: Int -> [Level s] -> ST s () + go !_ [] = return () go !ln (Level mr rs : ls) = do @@ -175,20 +175,19 @@ invariant = go 1 SingleRun r -> return (CompletedMerge r) MergingRun _ _ ref -> readSTRef ref - assert (case mr of - SingleRun{} -> True - MergingRun mp ml _ -> mergePolicyForLevel ln ls == mp - && mergeLastForLevel ls == ml) - assert (length rs <= 3) $ - assert (expectedRunLengths ln rs ls) $ - assert (expectedMergingRunLengths ln mr mrs ls) $ - return () + assertST $ case mr of + SingleRun{} -> True + MergingRun mp ml _ -> mergePolicyForLevel ln ls == mp + && mergeLastForLevel ls == ml + assertST $ length rs <= 3 + expectedRunLengths ln rs ls + expectedMergingRunLengths ln mr mrs ls go (ln+1) ls -- All runs within a level "proper" (as opposed to the incoming runs -- being merged) should be of the correct size for the level. - expectedRunLengths :: Int -> [Run] -> [Level s] -> Bool + expectedRunLengths :: Int -> [Run] -> [Level s] -> ST s () expectedRunLengths ln rs ls = case mergePolicyForLevel ln ls of -- Levels using levelling have only one run, and that single run is @@ -196,68 +195,81 @@ invariant = go 1 -- other "normal" runs. The exception is when a levelling run becomes -- too large and is promoted, in that case initially there's no merge, -- but it is still represented as a 'MergingRun', using 'SingleRun'. - MergePolicyLevelling -> null rs - MergePolicyTiering -> all (\r -> tieringRunSizeToLevel r == ln) rs + MergePolicyLevelling -> assertST $ null rs + -- Runs in tiering levels fit that size. + MergePolicyTiering -> assertST $ all (\r -> tieringRunSizeToLevel r == ln) rs -- Incoming runs being merged also need to be of the right size, but the -- conditions are more complicated. expectedMergingRunLengths :: Int -> MergingRun s -> MergingRunState - -> [Level s] -> Bool + -> [Level s] -> ST s () expectedMergingRunLengths ln mr mrs ls = case mergePolicyForLevel ln ls of MergePolicyLevelling -> + assert (mergeLastForLevel ls == MergeLastLevel) $ case (mr, mrs) of -- A single incoming run (which thus didn't need merging) must be -- of the expected size range already - (SingleRun r, CompletedMerge{}) -> - assert (levellingRunSizeToLevel r == ln) True + (SingleRun r, m) -> do + assertST $ case m of CompletedMerge{} -> True + OngoingMerge{} -> False + assertST $ levellingRunSizeToLevel r == ln -- A completed merge for levelling can be of almost any size at all! -- It can be smaller, due to deletions in the last level. But it -- can't be bigger than would fit into the next level. (_, CompletedMerge r) -> - assert (levellingRunSizeToLevel r <= ln+1) True + assertST $ levellingRunSizeToLevel r <= ln+1 -- An ongoing merge for levelling should have 4 incoming runs of - -- the right size for the level below, and 1 run from this level, - -- but the run from this level can be of almost any size for the - -- same reasons as above. Although if this is the first merge for - -- a new level, it'll have only 4 runs. - (_, OngoingMerge _ rs _) -> - assert (length rs == 4 || length rs == 5) True - && assert (all (\r -> tieringRunSizeToLevel r == ln-1) (take 4 rs)) True - && assert (all (\r -> levellingRunSizeToLevel r <= ln+1) (drop 4 rs)) True + -- the right size for the level below (or slightly smaller), + -- and 1 run from this level, but the run from this level can be of + -- almost any size for the same reasons as above. + -- Although if this is the first merge for a new level, it'll have + -- only 4 runs. + (_, OngoingMerge _ rs _) -> do + let incoming = take 4 rs + let resident = drop 4 rs + assertST $ length incoming == 4 + assertST $ length resident <= 1 + assertST $ all (\r -> tieringRunSizeToLevel r == ln-1) incoming + assertST $ all (\r -> levellingRunSizeToLevel r <= ln+1) resident MergePolicyTiering -> case (mr, mrs, mergeLastForLevel ls) of -- A single incoming run (which thus didn't need merging) must be -- of the expected size already - (SingleRun r, CompletedMerge{}, _) -> - tieringRunSizeToLevel r == ln + (SingleRun r, m, _) -> do + assertST $ case m of CompletedMerge{} -> True + OngoingMerge{} -> False + assertST $ tieringRunSizeToLevel r == ln -- A completed last level run can be of almost any smaller size due -- to deletions, but it can't be bigger than the next level down. -- Note that tiering on the last level only occurs when there is -- a single level only. - (_, CompletedMerge r, MergeLastLevel) -> - ln == 1 - && tieringRunSizeToLevel r <= ln+1 + (_, CompletedMerge r, MergeLastLevel) -> do + assertST $ ln == 1 + assertST $ tieringRunSizeToLevel r <= ln -- A completed mid level run is usually of the size for the -- level it is entering, but can also be one smaller (in which case -- it'll be held back and merged again). (_, CompletedMerge r, MergeMidLevel) -> - rln == ln || rln == ln+1 - where - rln = tieringRunSizeToLevel r + assertST $ tieringRunSizeToLevel r `elem` [ln-1, ln] -- An ongoing merge for tiering should have 4 incoming runs of -- the right size for the level below, and at most 1 run held back -- due to being too small (which would thus also be of the size of -- the level below). - (_, OngoingMerge _ rs _, _) -> - (length rs == 4 || length rs == 5) - && all (\r -> tieringRunSizeToLevel r == ln-1) rs + (_, OngoingMerge _ rs _, _) -> do + assertST $ length rs == 4 || length rs == 5 + assertST $ all (\r -> tieringRunSizeToLevel r == ln-1) rs + +-- 'callStack' just ensures that the 'HasCallStack' constraint is not redundant +-- when compiling with debug assertions disabled. +assertST :: HasCallStack => Bool -> ST s () +assertST p = assert p $ return (const () callStack) ------------------------------------------------------------------------------- @@ -276,18 +288,21 @@ newMerge tr level mergepolicy mergelast rs = do mergeCost = cost, mergeRunsSize = map Map.size rs } - assert (let l = length rs in l >= 2 && l <= 5) $ - MergingRun mergepolicy mergelast <$> newSTRef (OngoingMerge debt rs r) + assert (length rs `elem` [4, 5]) $ + assert (mergeDebtLeft debt >= cost) $ + MergingRun mergepolicy mergelast <$> newSTRef (OngoingMerge debt rs r) where cost = sum (map Map.size rs) -- How much we need to discharge before the merge can be guaranteed - -- complete. + -- complete. More precisely, this is the maximum amount a merge at this + -- level could need. This overestimation means that merges will only + -- complete at the last possible moment. -- Note that for levelling this is includes the single run in the current -- level. - debt = case mergepolicy of - MergePolicyLevelling -> newMergeDebt (4 * tieringRunSize (level-1) - + levellingRunSize level) - MergePolicyTiering -> newMergeDebt (4 * tieringRunSize (level-1)) + debt = newMergeDebt $ case mergepolicy of + MergePolicyLevelling -> 4 * tieringRunSize (level-1) + + levellingRunSize level + MergePolicyTiering -> 4 * tieringRunSize (level-1) -- deliberately lazy: r = case mergelast of MergeMidLevel -> (mergek rs) @@ -352,6 +367,10 @@ data MergeDebt = newMergeDebt :: Debt -> MergeDebt newMergeDebt d = MergeDebt 0 d +mergeDebtLeft :: MergeDebt -> Int +mergeDebtLeft (MergeDebt c d) = + assert (c < d) $ d - c + -- | As credits are paid, debt is reduced in batches when sufficient credits have accumulated. data MergeDebtPaydown = -- | This remaining merge debt is fully paid off with credits. @@ -425,8 +444,7 @@ supply (LSMHandle scr lsmr) credits = do LSMContent _ ls <- readSTRef lsmr modifySTRef' scr (+1) supplyCredits credits ls - ok <- invariant ls - assert ok $ return () + invariant ls lookups :: LSM s -> [Key] -> ST s [(Key, LookupResult Value Blob)] lookups lsm = mapM (\k -> (k,) <$> lookup lsm k) @@ -450,20 +468,27 @@ bufferToRun = id supplyCredits :: Credit -> Levels s -> ST s () supplyCredits n ls = sequence_ - [ supplyMergeCredits (n * creditsForMerge mr) mr | Level mr _rs <- ls ] + [ supplyMergeCredits (ceiling (fromIntegral n * creditsForMerge mr)) mr + | Level mr _rs <- ls + ] -- | The general case (and thus worst case) of how many merge credits we need -- for a level. This is based on the merging policy at the level. -- -creditsForMerge :: MergingRun s -> Credit +creditsForMerge :: MergingRun s -> Float creditsForMerge SingleRun{} = 0 --- A levelling merge is 5x the cost of a tiering merge. --- That's because for levelling one of the runs as an input to the merge --- is the one levelling run which is (up to) 4x bigger than the others put --- together, so it's 1 + 4. -creditsForMerge (MergingRun MergePolicyLevelling _ _) = 5 -creditsForMerge (MergingRun MergePolicyTiering _ _) = 1 +-- A levelling merge has 1 input run and one resident run, which is (up to) 4x +-- bigger than the others. +-- It needs to be completed before another run comes in. +creditsForMerge (MergingRun MergePolicyLevelling _ _) = (1 + 4) / 1 + +-- A tiering merge has 4 runs at most (once could be held back to merged again) +-- and must be completed before the level is full (once 3 more runs come in, +-- as it could have started out with an additional refused run). +-- TODO: We could only increase the merging speed for the merges where this +-- applies, which should be rare. +creditsForMerge (MergingRun MergePolicyTiering _ _) = 4 / 3 type Event = EventAt EventDetail data EventAt e = EventAt { @@ -495,69 +520,99 @@ data EventDetail = increment :: forall s. Tracer (ST s) Event -> Counter -> Run -> Levels s -> ST s (Levels s) increment tr sc = \r ls -> do - ls' <- go 1 [r] ls - ok <- invariant ls' - assert ok (return ls') + (ls', refused) <- go 1 [r] ls + assertST $ null refused + invariant ls' + return ls' where - go :: Int -> [Run] -> Levels s -> ST s (Levels s) - go !ln rs [] = do + go, go' :: Int -> [Run] -> Levels s -> ST s (Levels s, Maybe Run) + go !ln incoming ls = do + case incoming of + [r] -> do + assertST $ tieringRunSizeToLevel r `elem` [ln, ln+1] -- +1 from levelling + _ -> do + assertST $ length incoming == 4 + assertST $ all (\r -> tieringRunSizeToLevel r == ln-1) incoming + assertST $ tieringLevel (sum (map Map.size incoming)) == ln + (ls', refused) <- go' ln incoming ls + for_ refused $ assertST . (== head incoming) + return (ls', refused) + + go' !ln incoming [] = do let mergepolicy = mergePolicyForLevel ln [] traceWith tr' AddLevelEvent - mr <- newMerge tr' ln mergepolicy MergeLastLevel rs - return (Level mr [] : []) + mr <- newMerge tr' ln mergepolicy MergeLastLevel incoming + return (Level mr [] : [], Nothing) where tr' = contramap (EventAt sc ln) tr - go !ln rs' (Level mr rs : ls) = do + go' !ln incoming (Level mr rs : ls) = do r <- expectCompletedMerge tr' mr + let resident = r:rs case mergePolicyForLevel ln ls of -- If r is still too small for this level then keep it and merge again - -- with the incoming runs. - MergePolicyTiering | tieringRunSizeToLevel r < ln -> do - let mergelast = mergeLastForLevel ls - mr' <- newMerge tr' ln MergePolicyTiering mergelast (rs' ++ [r]) - return (Level mr' rs : ls) + -- with the incoming runs, but only if the resulting run is guaranteed + -- not to be too large for this level. + -- If it might become too large, only create a 4-way merge and refuse + -- the most recent of the incoming runs. + MergePolicyTiering | tieringRunSizeToLevel r < ln -> + if sum (map Map.size (r : incoming)) <= tieringRunSize ln + then do + let mergelast = mergeLastForLevel ls + mr' <- newMerge tr' ln MergePolicyTiering mergelast (incoming ++ [r]) + return (Level mr' rs : ls, Nothing) + else do + -- TODO: comment + let mergelast = mergeLastForLevel ls + mr' <- newMerge tr' ln MergePolicyTiering mergelast (tail incoming ++ [r]) + return (Level mr' rs : ls, Just (head incoming)) -- This tiering level is now full. We take the completed merged run -- (the previous incoming runs), plus all the other runs on this level -- as a bundle and move them down to the level below. We start a merge -- for the new incoming runs. This level is otherwise empty. - MergePolicyTiering | levelIsFull rs -> do - mr' <- newMerge tr' ln MergePolicyTiering MergeMidLevel rs' - ls' <- go (ln+1) (r:rs) ls - return (Level mr' [] : ls') + MergePolicyTiering | tieringLevelIsFull ln incoming resident -> do + mr' <- newMerge tr' ln MergePolicyTiering MergeMidLevel incoming + (ls', refused) <- go (ln+1) resident ls + return (Level mr' (toList refused) : ls', Nothing) -- This tiering level is not yet full. We move the completed merged run -- into the level proper, and start the new merge for the incoming runs. MergePolicyTiering -> do let mergelast = mergeLastForLevel ls - mr' <- newMerge tr' ln MergePolicyTiering mergelast rs' - traceWith tr' (AddRunEvent (length (r:rs))) - return (Level mr' (r:rs) : ls) + mr' <- newMerge tr' ln MergePolicyTiering mergelast incoming + traceWith tr' (AddRunEvent (length resident)) + return (Level mr' resident : ls, Nothing) -- The final level is using levelling. If the existing completed merge -- run is too large for this level, we promote the run to the next -- level and start merging the incoming runs into this (otherwise -- empty) level . - MergePolicyLevelling | levellingRunSizeToLevel r > ln -> do + MergePolicyLevelling | levellingLevelIsFull ln incoming r -> do assert (null rs && null ls) $ return () - mr' <- newMerge tr' ln MergePolicyTiering MergeMidLevel rs' - ls' <- go (ln+1) [r] [] - return (Level mr' [] : ls') + mr' <- newMerge tr' ln MergePolicyTiering MergeMidLevel incoming + (ls', refused) <- go (ln+1) [r] [] + return (Level mr' (toList refused) : ls', Nothing) -- Otherwise we start merging the incoming runs into the run. MergePolicyLevelling -> do assert (null rs && null ls) $ return () mr' <- newMerge tr' ln MergePolicyLevelling MergeLastLevel - (rs' ++ [r]) - return (Level mr' [] : []) + (incoming ++ [r]) + return (Level mr' [] : [], Nothing) where tr' = contramap (EventAt sc ln) tr -levelIsFull :: [Run] -> Bool -levelIsFull rs = length rs + 1 >= 4 +-- | Only based on run count, not their sizes. +tieringLevelIsFull :: Int -> [Run] -> [Run] -> Bool +tieringLevelIsFull _ln _incoming resident = length resident >= 4 + +-- | The level is only considered full once the resident run is /too large/ for +-- the level. +levellingLevelIsFull :: Int -> [Run] -> Run -> Bool +levellingLevelIsFull ln _incoming resident = levellingRunSizeToLevel resident > ln duplicate :: LSM s -> ST s (LSM s) duplicate (LSMHandle _scr lsmr) = do diff --git a/prototypes/ScheduledMergesTestQLS.hs b/prototypes/ScheduledMergesTestQLS.hs index 22250f59b..a763200da 100644 --- a/prototypes/ScheduledMergesTestQLS.hs +++ b/prototypes/ScheduledMergesTestQLS.hs @@ -1,11 +1,4 @@ -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE StandaloneDeriving #-} -{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE TypeFamilies #-} module ScheduledMergesTestQLS (tests) where @@ -15,10 +8,12 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Constraint (Dict (..)) +import Data.Foldable (traverse_) import Data.Proxy import Data.STRef import Control.Exception +import Control.Monad (replicateM_) import Control.Monad.ST import Control.Tracer (Tracer (Tracer), nullTracer) import qualified Control.Tracer as Tracer @@ -41,8 +36,10 @@ import Test.Tasty.QuickCheck (testProperty) tests :: TestTree tests = testGroup "ScheduledMerges" [ - testProperty "ScheduledMerges vs model" prop_LSM + testProperty "ScheduledMerges vs model" $ mapSize (*10) prop_LSM -- still <10s , testCase "regression_empty_run" test_regression_empty_run + , testCase "merge_again_with_incoming" test_merge_again_with_incoming + , testCase "merge_again_with_incoming'" test_merge_again_with_incoming' ] prop_LSM :: Actions (Lockstep Model) -> Property @@ -84,6 +81,48 @@ test_regression_empty_run = -- finish merge LSM.supply lsm 16 +-- | Covers the case where a run ends up too small for a level, so it gets +-- merged again with the next incoming runs. +-- That merge gets completed by supplying credits. +test_merge_again_with_incoming :: IO () +test_merge_again_with_incoming = + runWithTracer $ \tracer -> do + stToIO $ do + lsm <- LSM.new + let ins k = LSM.insert tracer lsm k 0 + -- get something to 3rd level (so 2nd level is not levelling) + -- (needs 5 runs to go to level 2 so the resulting run becomes too big) + traverse_ ins [101..100+(5*16)] + -- get a very small run (4 elements) to 2nd level + replicateM_ 4 $ + traverse_ ins [201..200+4] + -- get another run to 2nd level, which the small run can be merged with + traverse_ ins [301..300+16] + -- complete the merge + LSM.supply lsm 32 + +-- | Covers the case where a run ends up too small for a level, so it gets +-- merged again with the next incoming runs. +-- That merge gets completed and becomes part of another merge. +test_merge_again_with_incoming' :: IO () +test_merge_again_with_incoming' = + runWithTracer $ \tracer -> do + stToIO $ do + lsm <- LSM.new + let ins k = LSM.insert tracer lsm k 0 + -- get something to 3rd level (so 2nd level is not levelling) + -- (needs 5 runs to go to level 2 so the resulting run becomes too big) + traverse_ ins [101..100+(5*16)] + -- get a very small run (4 elements) to 2nd level + replicateM_ 4 $ + traverse_ ins [201..200+4] + -- get another run to 2nd level, which the small run can be merged with + traverse_ ins [301..300+16] + -- get 3 more to 2nd level, so the merge above is expected to complete + -- (actually more, as runs only move once a fifth run arrives...) + traverse_ ins [401..400+(6*16)] + + -- | Provides a tracer and will add the log of traced events to the reported -- failure. runWithTracer :: (Tracer (ST RealWorld) Event -> IO a) -> IO a @@ -170,6 +209,13 @@ instance StateModel (Lockstep Model) where ADuplicate :: ModelVar Model (LSM RealWorld) -> Action (Lockstep Model) (LSM RealWorld) + -- | Without this, the prototype only completes tiering merges when the next + -- merging run on this level is created, so a level would never contain a + -- completed merge. + ASupply :: ModelVar Model (LSM RealWorld) + -> Int + -> Action (Lockstep Model) () + ADump :: ModelVar Model (LSM RealWorld) -> Action (Lockstep Model) (Map Key Value) @@ -210,6 +256,7 @@ instance InLockstep Model where usedVars (ALookup v evk) = SomeGVar v : case evk of Left vk -> [SomeGVar vk]; _ -> [] usedVars (ADuplicate v) = [SomeGVar v] + usedVars (ASupply v _) = [SomeGVar v] usedVars (ADump v) = [SomeGVar v] modelNextState = runModel @@ -258,10 +305,13 @@ instance InLockstep Model where , not (null kvars) ] ++ [ (1, fmap Some $ - ADump <$> elements vars) + ADuplicate <$> elements vars) ] ++ [ (1, fmap Some $ - ADuplicate <$> elements vars) + ASupply <$> elements vars <*> (getSmall . getPositive <$> arbitrary)) + ] + ++ [ (1, fmap Some $ + ADump <$> elements vars) ] shrinkWithVars _findVars _model (AInsert var (Right k) v) = @@ -286,15 +336,17 @@ instance RunLockstep Model IO where (AInsert{}, x) -> OId x (ADelete{}, x) -> OId x (ALookup{}, x) -> OId x - (ADump{}, x) -> OId x (ADuplicate{}, _) -> ORef + (ASupply{}, x) -> OId x + (ADump{}, x) -> OId x showRealResponse _ ANew = Nothing showRealResponse _ AInsert{} = Just Dict showRealResponse _ ADelete{} = Just Dict showRealResponse _ ALookup{} = Just Dict - showRealResponse _ ADump{} = Just Dict showRealResponse _ ADuplicate{} = Nothing + showRealResponse _ ASupply{} = Nothing + showRealResponse _ ADump{} = Just Dict deriving stock instance Show (Action (Lockstep Model) a) deriving stock instance Show (Observable Model a) @@ -319,6 +371,7 @@ runActionIO action lookUp = ALookup var evk -> lookupResultValue <$> lookup (lookUpVar var) k where k = either lookUpVar id evk ADuplicate var -> duplicate (lookUpVar var) + ASupply var n -> supply (lookUpVar var) n ADump var -> logicalValue (lookUpVar var) where lookUpVar :: ModelVar Model a -> a @@ -354,6 +407,8 @@ runModel action lookUp m = ADuplicate var -> (MLSM mlsm', m') where (mlsm', m') = modelDuplicate (lookUpLsMVar var) m + ASupply _ _ -> (MUnit (), m) -- noop + ADump var -> (MDump mapping, m) where (mapping, _) = modelDump (lookUpLsMVar var) m where