diff --git a/bench/macro/lsm-tree-bench-lookups.hs b/bench/macro/lsm-tree-bench-lookups.hs index cb4ad902e..7ada00299 100644 --- a/bench/macro/lsm-tree-bench-lookups.hs +++ b/bench/macro/lsm-tree-bench-lookups.hs @@ -133,7 +133,7 @@ benchSalt :: Bloom.Salt benchSalt = 4 benchmarks :: Run.RunDataCaching -> IO () -benchmarks !caching = withFS $ \hfs hbio -> do +benchmarks !caching = withFS $ \hfs hbio refCtx -> do #ifdef NO_IGNORE_ASSERTS putStrLn "WARNING: Benchmarking in debug mode." putStrLn " To benchmark in release mode, pass:" @@ -163,7 +163,7 @@ benchmarks !caching = withFS $ \hfs hbio -> do -- instead of sequentially. let keyRng0 = mkStdGen 17 - (!runs, !blooms, !indexes, !handles) <- lookupsEnv runSizes keyRng0 hfs hbio caching + (!runs, !blooms, !indexes, !handles) <- lookupsEnv runSizes keyRng0 hfs hbio refCtx caching putStrLn "" traceMarkerIO "Computing statistics for generated runs" @@ -210,7 +210,7 @@ benchmarks !caching = withFS $ \hfs hbio -> do "Calculate batches of keys, and perform disk lookups for each batch. This is roughly doing the same as benchPrepLookups, but also performing the disk I/O and resolving values. Net time/allocation is the result of subtracting the cost of benchGenKeyBatches." (\n -> do let wb_unused = WB.empty - bracket (WBB.new hfs (FS.mkFsPath ["wbblobs_unused"])) releaseRef $ \wbblobs_unused -> + bracket (WBB.new hfs refCtx (FS.mkFsPath ["wbblobs_unused"])) releaseRef $ \wbblobs_unused -> benchLookupsIO hbio arenaManager benchmarkResolveSerialisedValue wb_unused wbblobs_unused runs blooms indexes handles keyRng0 n) @@ -308,13 +308,14 @@ totalNumEntriesSanityCheck l1 runSizes = sum [ 2^l1 * sizeFactor | (_, sizeFactor) <- runSizes ] withFS :: - (FS.HasFS IO FS.HandleIO -> FS.HasBlockIO IO FS.HandleIO -> IO a) + (FS.HasFS IO FS.HandleIO -> FS.HasBlockIO IO FS.HandleIO -> RefCtx -> IO a) -> IO a withFS action = + withRefCtx $ \refCtx -> FS.withIOHasBlockIO (FS.MountPoint "_bench_lookups") FS.defaultIOCtxParams $ \hfs hbio -> do exists <- FS.doesDirectoryExist hfs (FS.mkFsPath [""]) unless exists $ error ("_bench_lookups directory does not exist") - action hfs hbio + action hfs hbio refCtx -- | Input environment for benchmarking lookup functions. -- @@ -336,13 +337,14 @@ lookupsEnv :: -> StdGen -- ^ Key RNG -> FS.HasFS IO FS.HandleIO -> FS.HasBlockIO IO FS.HandleIO + -> RefCtx -> Run.RunDataCaching -> IO ( V.Vector (Ref (Run IO FS.HandleIO)) , V.Vector (Bloom SerialisedKey) , V.Vector Index , V.Vector (FS.Handle FS.HandleIO) ) -lookupsEnv runSizes keyRng0 hfs hbio caching = do +lookupsEnv runSizes keyRng0 hfs hbio refCtx caching = do -- create the vector of initial keys (mvec :: VUM.MVector RealWorld UTxOKey) <- VUM.unsafeNew (totalNumEntries runSizes) !keyRng1 <- vectorOfUniforms mvec keyRng0 @@ -381,7 +383,7 @@ lookupsEnv runSizes keyRng0 hfs hbio caching = do putStr "DONE" -- return runs - runs <- V.fromList <$> mapM Run.fromBuilder rbs + runs <- V.fromList <$> mapM (Run.fromBuilder refCtx) rbs let blooms = V.map (\(DeRef r) -> Run.runFilter r) runs indexes = V.map (\(DeRef r) -> Run.runIndex r) runs handles = V.map (\(DeRef r) -> Run.runKOpsFile r) runs diff --git a/bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs b/bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs index 5ca58854b..e3bc3d80b 100644 --- a/bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs +++ b/bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs @@ -90,7 +90,7 @@ benchSalt = 4 benchLookups :: Config -> Benchmark benchLookups conf@Config{name} = - withEnv $ \ ~(_dir, arenaManager, _hasFS, hasBlockIO, wbblobs, rs, ks) -> + withEnv $ \ ~(_dir, arenaManager, _hasFS, hasBlockIO, _refCtx, wbblobs, rs, ks) -> env ( pure ( V.map (\(DeRef r) -> Run.runFilter r) rs , V.map (\(DeRef r) -> Run.runIndex r) rs , V.map (\(DeRef r) -> Run.runKOpsFile r) rs @@ -182,6 +182,7 @@ lookupsInBatchesEnv :: , ArenaManager RealWorld , FS.HasFS IO FS.HandleIO , FS.HasBlockIO IO FS.HandleIO + , RefCtx , Ref (WBB.WriteBufferBlobs IO FS.HandleIO) , V.Vector (Ref (Run IO FS.HandleIO)) , V.Vector SerialisedKey @@ -192,10 +193,11 @@ lookupsInBatchesEnv Config {..} = do benchTmpDir <- createTempDirectory sysTmpDir "lookupsInBatchesEnv" (storedKeys, lookupKeys) <- lookupsEnv (mkStdGen 17) nentries npos nneg (hasFS, hasBlockIO) <- FS.ioHasBlockIO (FS.MountPoint benchTmpDir) (fromMaybe FS.defaultIOCtxParams ioctxps) - wbblobs <- WBB.new hasFS (FS.mkFsPath ["0.wbblobs"]) + refCtx <- newRefCtx + wbblobs <- WBB.new hasFS refCtx (FS.mkFsPath ["0.wbblobs"]) wb <- WB.fromMap <$> traverse (traverse (WBB.addBlob hasFS wbblobs)) storedKeys let fsps = RunFsPaths (FS.mkFsPath []) (RunNumber 0) - r <- Run.fromWriteBuffer hasFS hasBlockIO benchSalt runParams fsps wb wbblobs + r <- Run.fromWriteBuffer hasFS hasBlockIO refCtx benchSalt runParams fsps wb wbblobs let NumEntries nentriesReal = Run.size r assertEqual nentriesReal nentries $ pure () -- 42 to 43 entries per page @@ -204,6 +206,7 @@ lookupsInBatchesEnv Config {..} = do , arenaManager , hasFS , hasBlockIO + , refCtx , wbblobs , V.singleton r , lookupKeys @@ -222,16 +225,18 @@ lookupsInBatchesCleanup :: , ArenaManager RealWorld , FS.HasFS IO FS.HandleIO , FS.HasBlockIO IO FS.HandleIO + , RefCtx , Ref (WBB.WriteBufferBlobs IO FS.HandleIO) , V.Vector (Ref (Run IO FS.HandleIO)) , V.Vector SerialisedKey ) -> IO () -lookupsInBatchesCleanup (tmpDir, _arenaManager, _hasFS, hasBlockIO, wbblobs, rs, _) = do +lookupsInBatchesCleanup (tmpDir, _arenaManager, _hasFS, hasBlockIO, refCtx, wbblobs, rs, _) = do FS.close hasBlockIO forM_ rs releaseRef releaseRef wbblobs removeDirectoryRecursive tmpDir + closeRefCtx refCtx -- | Generate keys to store and keys to lookup lookupsEnv :: diff --git a/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs b/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs index 7028a48ad..d21de7dc9 100644 --- a/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs +++ b/bench/micro/Bench/Database/LSMTree/Internal/Merge.hs @@ -234,7 +234,7 @@ runParams = benchMerge :: Config -> Benchmark benchMerge conf@Config{name} = - withEnv $ \ ~(_dir, hasFS, hasBlockIO, runs) -> + withEnv $ \ ~(_dir, hasFS, hasBlockIO, refCtx, runs) -> bgroup name [ bench "merge" $ -- We'd like to do: `whnfAppIO (runs' -> ...) runs`. @@ -252,7 +252,7 @@ benchMerge conf@Config{name} = Cr.perRunEnvWithCleanup ((runs,) <$> newIORef Nothing) (releaseRun . snd) $ \(runs', ref) -> do - !run <- merge hasFS hasBlockIO conf outputRunPaths runs' + !run <- merge hasFS hasBlockIO refCtx conf outputRunPaths runs' writeIORef ref $ Just $ releaseRef run ] where @@ -270,15 +270,16 @@ benchMerge conf@Config{name} = merge :: FS.HasFS IO FS.HandleIO -> FS.HasBlockIO IO FS.HandleIO + -> RefCtx -> Config -> Run.RunFsPaths -> InputRuns -> IO (Ref (Run IO FS.HandleIO)) -merge fs hbio Config {..} targetPaths runs = do +merge fs hbio refCtx Config {..} targetPaths runs = do let f = fromMaybe const mergeResolve m <- fromMaybe (error "empty inputs, no merge created") <$> Merge.new fs hbio benchSalt runParams mergeType f targetPaths runs - Merge.stepsToCompletion m stepSize + Merge.stepsToCompletion refCtx m stepSize fsPath :: FS.FsPath fsPath = FS.mkFsPath [] @@ -368,39 +369,44 @@ mergeEnv :: -> IO ( FilePath -- ^ Temporary directory , FS.HasFS IO FS.HandleIO , FS.HasBlockIO IO FS.HandleIO + , RefCtx , InputRuns ) mergeEnv config = do sysTmpDir <- getCanonicalTemporaryDirectory benchTmpDir <- createTempDirectory sysTmpDir "mergeEnv" (hasFS, hasBlockIO) <- FS.ioHasBlockIO (FS.MountPoint benchTmpDir) FS.defaultIOCtxParams - runs <- randomRuns hasFS hasBlockIO config (mkStdGen 17) - pure (benchTmpDir, hasFS, hasBlockIO, runs) + refCtx <- newRefCtx + runs <- randomRuns hasFS hasBlockIO refCtx config (mkStdGen 17) + pure (benchTmpDir, hasFS, hasBlockIO, refCtx, runs) mergeEnvCleanup :: ( FilePath -- ^ Temporary directory , FS.HasFS IO FS.HandleIO , FS.HasBlockIO IO FS.HandleIO + , RefCtx , InputRuns ) -> IO () -mergeEnvCleanup (tmpDir, _hasFS, hasBlockIO, runs) = do +mergeEnvCleanup (tmpDir, _hasFS, hasBlockIO, refCtx, runs) = do traverse_ releaseRef runs removeDirectoryRecursive tmpDir FS.close hasBlockIO + checkForgottenRefs refCtx -- | Generate keys and entries to insert into the write buffer. -- They are already serialised to exclude the cost from the benchmark. randomRuns :: FS.HasFS IO FS.HandleIO -> FS.HasBlockIO IO FS.HandleIO + -> RefCtx -> Config -> StdGen -> IO InputRuns -randomRuns hasFS hasBlockIO config@Config {..} rng0 = do +randomRuns hasFS hasBlockIO refCtx config@Config {..} rng0 = do counter <- inputRunPathsCounter fmap V.fromList $ - mapM (unsafeCreateRun hasFS hasBlockIO benchSalt runParams fsPath counter) $ + mapM (unsafeCreateRun hasFS hasBlockIO refCtx benchSalt runParams fsPath counter) $ zipWith (randomRunData config) nentries diff --git a/src-control/Control/RefCount.hs b/src-control/Control/RefCount.hs index f123ba7c4..874f777ca 100644 --- a/src-control/Control/RefCount.hs +++ b/src-control/Control/RefCount.hs @@ -32,6 +32,12 @@ module Control.RefCount ( , ignoreForgottenRefs , enableForgottenRefChecks , disableForgottenRefChecks + + -- * Reference context + , RefCtx + , withRefCtx + , newRefCtx + , closeRefCtx ) where import Control.DeepSeq @@ -39,6 +45,7 @@ import Control.Exception (assert) import Control.Monad (void, when) import Control.Monad.Class.MonadThrow import Control.Monad.Primitive +import Data.Kind (Type) import Data.Primitive.PrimVar import GHC.Show (appPrec) import GHC.Stack (CallStack, prettyCallStack) @@ -47,7 +54,7 @@ import GHC.Stack (CallStack, prettyCallStack) import Control.Concurrent (yield) import Data.IORef import GHC.Stack (HasCallStack, callStack) -import System.IO.Unsafe (unsafeDupablePerformIO, unsafePerformIO) +import System.IO.Unsafe (unsafeDupablePerformIO) import System.Mem.Weak hiding (deRefWeak) #if MIN_VERSION_base(4,20,0) import System.Mem (performBlockingMajorGC) @@ -209,7 +216,8 @@ class RefCounted m obj | obj -> m where newRef :: RefCounted IO obj => HasCallStackIfDebug - => IO () + => RefCtx + -> IO () -> (RefCounter IO -> obj) -> IO (Ref obj) #-} @@ -221,14 +229,15 @@ class RefCounted m obj | obj -> m where newRef :: (RefCounted m obj, PrimMonad m) => HasCallStackIfDebug - => m () + => RefCtx + -> m () -> (RefCounter m -> obj) -> m (Ref obj) -newRef finaliser mkObject = do +newRef refCtx finaliser mkObject = do rc <- newRefCounter finaliser let !obj = mkObject rc assert (countVar (getRefCounter obj) == countVar rc) $ - newRefWithTracker obj + newRefWithTracker refCtx obj -- | Release a reference to an object that will no longer be used (via this -- reference). @@ -247,7 +256,7 @@ releaseRef :: -> m () releaseRef ref@Ref{refobj} = do assertNoDoubleRelease ref - assertNoForgottenRefs + assertNoForgottenRefs (getRefCtx ref) releaseRefTracker ref decrementRefCounter (getRefCounter refobj) @@ -288,7 +297,7 @@ withRef :: -> m a withRef ref@Ref{refobj} f = do assertNoUseAfterRelease ref - assertNoForgottenRefs + assertNoForgottenRefs (getRefCtx ref) f refobj #ifndef NO_IGNORE_ASSERTS where @@ -311,9 +320,9 @@ dupRef :: -> m (Ref obj) dupRef ref@Ref{refobj} = do assertNoUseAfterRelease ref - assertNoForgottenRefs + assertNoForgottenRefs (getRefCtx ref) incrementRefCounter (getRefCounter refobj) - newRefWithTracker refobj + newRefWithTracker (getRefCtx ref) refobj #ifndef NO_IGNORE_ASSERTS where _unused = throwIO @m @SomeException @@ -343,7 +352,8 @@ mkWeakRefFromRaw obj = WeakRef obj deRefWeak :: RefCounted IO obj => HasCallStackIfDebug - => WeakRef obj + => RefCtx + -> WeakRef obj -> IO (Maybe (Ref obj)) #-} -- | If the object is still alive, obtain a /new/ normal reference. The normal @@ -352,22 +362,23 @@ mkWeakRefFromRaw obj = WeakRef obj deRefWeak :: (RefCounted m obj, PrimMonad m) => HasCallStackIfDebug - => WeakRef obj + => RefCtx + -> WeakRef obj -> m (Maybe (Ref obj)) -deRefWeak (WeakRef obj) = do +deRefWeak refCtx (WeakRef obj) = do success <- tryIncrementRefCounter (getRefCounter obj) - if success then Just <$> newRefWithTracker obj + if success then Just <$> newRefWithTracker refCtx obj else pure Nothing {-# INLINE newRefWithTracker #-} #ifndef NO_IGNORE_ASSERTS -newRefWithTracker :: PrimMonad m => obj -> m (Ref obj) -newRefWithTracker obj = +newRefWithTracker :: PrimMonad m => RefCtx -> obj -> m (Ref obj) +newRefWithTracker _ obj = pure $! Ref obj #else -newRefWithTracker :: (PrimMonad m, HasCallStack) => obj -> m (Ref obj) -newRefWithTracker obj = do - reftracker' <- newRefTracker callStack +newRefWithTracker :: (PrimMonad m, HasCallStack) => RefCtx -> obj -> m (Ref obj) +newRefWithTracker refCtx obj = do + reftracker' <- newRefTracker refCtx callStack pure $! Ref obj reftracker' #endif @@ -412,8 +423,8 @@ releaseRefTracker :: PrimMonad m => Ref a -> m () releaseRefTracker _ = pure () {-# INLINE assertNoForgottenRefs #-} -assertNoForgottenRefs :: PrimMonad m => m () -assertNoForgottenRefs = pure () +assertNoForgottenRefs :: PrimMonad m => RefCtx -> m () +assertNoForgottenRefs _ = pure () {-# INLINE assertNoUseAfterRelease #-} assertNoUseAfterRelease :: PrimMonad m => Ref a -> m () @@ -446,16 +457,7 @@ data RefTracker = RefTracker !RefId !(Weak (IORef (IORef (Maybe CallStack)))) !(IORef (IORef (Maybe CallStack))) -- ^ Release site !CallStack -- ^ Allocation site - -{-# NOINLINE globalRefIdSupply #-} -globalRefIdSupply :: PrimVar RealWorld Int -globalRefIdSupply = unsafePerformIO $ newPrimVar 0 - -data Enabled a = Enabled !a | Disabled - -{-# NOINLINE globalForgottenRef #-} -globalForgottenRef :: IORef (Enabled (Maybe (RefId, CallStack))) -globalForgottenRef = unsafePerformIO $ newIORef (Enabled Nothing) + !RefCtx -- | This version of 'unsafeIOToPrim' is strict in the result of the argument -- action. @@ -468,24 +470,24 @@ unsafeIOToPrimStrict k = do !x <- unsafeIOToPrim k pure x -newRefTracker :: PrimMonad m => CallStack -> m RefTracker -newRefTracker allocSite = unsafeIOToPrimStrict $ do +newRefTracker :: PrimMonad m => RefCtx -> CallStack -> m RefTracker +newRefTracker refCtx@RefCtx{..} allocSite = unsafeIOToPrimStrict $ do inner <- newIORef Nothing outer <- newIORef inner refid <- fetchAddInt globalRefIdSupply 1 weak <- mkWeakIORef outer $ - finaliserRefTracker inner (RefId refid) allocSite - pure (RefTracker (RefId refid) weak outer allocSite) + finaliserRefTracker refCtx inner (RefId refid) allocSite + pure (RefTracker (RefId refid) weak outer allocSite refCtx) releaseRefTracker :: (HasCallStack, PrimMonad m) => Ref a -> m () -releaseRefTracker Ref { reftracker = RefTracker _refid _weak outer _ } = +releaseRefTracker Ref { reftracker = RefTracker _refid _weak outer _ _ } = unsafeIOToPrimStrict $ do inner <- readIORef outer let releaseSite = callStack writeIORef inner (Just releaseSite) -finaliserRefTracker :: IORef (Maybe CallStack) -> RefId -> CallStack -> IO () -finaliserRefTracker inner refid allocSite = do +finaliserRefTracker :: RefCtx -> IORef (Maybe CallStack) -> RefId -> CallStack -> IO () +finaliserRefTracker RefCtx{..} inner refid allocSite = do released <- readIORef inner case released of Just _releaseSite -> pure () @@ -504,8 +506,8 @@ finaliserRefTracker inner refid allocSite = do Enabled (Just (refid', _)) | refid < refid' -> pure () Enabled _ -> writeIORef globalForgottenRef (Enabled (Just (refid, allocSite))) -assertNoForgottenRefs :: (PrimMonad m, MonadThrow m) => m () -assertNoForgottenRefs = do +assertNoForgottenRefs :: (PrimMonad m, MonadThrow m) => RefCtx -> m () +assertNoForgottenRefs RefCtx{..} = do mrefs <- unsafeIOToPrimStrict $ readIORef globalForgottenRef case mrefs of Disabled -> pure () @@ -521,7 +523,7 @@ assertNoForgottenRefs = do assertNoUseAfterRelease :: (PrimMonad m, MonadThrow m, HasCallStack) => Ref a -> m () -assertNoUseAfterRelease Ref { reftracker = RefTracker refid _weak outer allocSite } = do +assertNoUseAfterRelease Ref { reftracker = RefTracker refid _weak outer allocSite _ } = do released <- unsafeIOToPrimStrict (readIORef =<< readIORef outer) case released of Nothing -> pure () @@ -535,7 +537,7 @@ assertNoUseAfterRelease Ref { reftracker = RefTracker refid _weak outer allocSit #endif assertNoDoubleRelease :: (PrimMonad m, MonadThrow m, HasCallStack) => Ref a -> m () -assertNoDoubleRelease Ref { reftracker = RefTracker refid _weak outer allocSite } = do +assertNoDoubleRelease Ref { reftracker = RefTracker refid _weak outer allocSite _ } = do released <- unsafeIOToPrimStrict (readIORef =<< readIORef outer) case released of Nothing -> pure () @@ -556,11 +558,12 @@ assertNoDoubleRelease Ref { reftracker = RefTracker refid _weak outer allocSite -- Note however that this is not the only place where 'RefNeverReleased' -- exceptions can be thrown. All Ref operations poll for forgotten refs. -- -checkForgottenRefs :: forall m. (PrimMonad m, MonadThrow m) => m () -checkForgottenRefs = do +checkForgottenRefs :: forall m. (PrimMonad m, MonadThrow m) => RefCtx -> m () +checkForgottenRefs = #ifndef NO_IGNORE_ASSERTS - pure () + \_ -> pure () #else + \refCtx -> do -- The hope is that by combining `performMajorGC` with `yield` that the -- former starts the finalizer threads for all dropped weak references and -- the latter suspends the current process and puts it at the end of the @@ -574,7 +577,7 @@ checkForgottenRefs = do yield performMajorGCWithBlockingIfAvailable yield - assertNoForgottenRefs + assertNoForgottenRefs refCtx #endif where _unused = throwIO @m @SomeException @@ -584,8 +587,8 @@ checkForgottenRefs = do -- -- This is especillay important in QC tests with shrinking which otherwise -- leads to confusion. -ignoreForgottenRefs :: (PrimMonad m, MonadCatch m) => m () -ignoreForgottenRefs = void $ try @_ @SomeException $ checkForgottenRefs +ignoreForgottenRefs :: (PrimMonad m, MonadCatch m) => RefCtx -> m () +ignoreForgottenRefs refCtx = void $ try @_ @SomeException $ checkForgottenRefs refCtx #ifdef NO_IGNORE_ASSERTS performMajorGCWithBlockingIfAvailable :: IO () @@ -597,25 +600,92 @@ performMajorGCWithBlockingIfAvailable = do #endif #endif --- | Enable forgotten reference checks. -enableForgottenRefChecks :: IO () +{------------------------------------------------------------------------------- + Reference context +-------------------------------------------------------------------------------} --- | Disable forgotten reference checks. This will error if there are already --- forgotten references while we are trying to disable the checks. -disableForgottenRefChecks :: IO () +-- | A 'RefCtx' defines the scope within which 'Ref's should exist. +-- +-- In debug mode (when using CPP define @NO_IGNORE_ASSERTS@), the 'RefCtx' +-- records forgotten references. +type RefCtx :: Type + +-- | Run an action with a local 'RefCtx'. +withRefCtx :: (PrimMonad m, MonadThrow m) => (RefCtx -> m a) -> m a +withRefCtx = bracket newRefCtx closeRefCtx + +-- | Create a new 'RefCtx'. +-- +-- It is preferable to use 'withRefCtx'. +newRefCtx :: PrimMonad m => m RefCtx + +-- | Close a 'RefCtx'. +-- +-- It is preferable to use 'withRefCtx'. +closeRefCtx :: forall m. (PrimMonad m, MonadThrow m) => RefCtx -> m () + +-- | In debug mode, enable forgotten reference checks. +enableForgottenRefChecks :: PrimMonad m => RefCtx -> m () + +-- | In debug mode, disable forgotten reference checks. +disableForgottenRefChecks :: PrimMonad m => RefCtx -> m () + +-- | Return the 'RefCtx' that the given 'Ref' lives in. +getRefCtx :: Ref a -> RefCtx #ifdef NO_IGNORE_ASSERTS -enableForgottenRefChecks = + +data RefCtx = RefCtx { + globalForgottenRef :: !(IORef (Enabled (Maybe (RefId, CallStack)))) + , globalRefIdSupply :: !(PrimVar RealWorld Int) + } + +data Enabled a = Enabled !a | Disabled + +instance NFData RefCtx where + rnf (RefCtx a b) = rnf a `seq` rwhnf b + +newRefCtx = unsafeIOToPrimStrict $ do + globalForgottenRef <- newIORef $ Enabled Nothing + globalRefIdSupply <- newPrimVar 0 + pure $! RefCtx globalForgottenRef globalRefIdSupply + +closeRefCtx = checkForgottenRefs + +enableForgottenRefChecks refCtx@RefCtx{..} = + unsafeIOToPrimStrict $ do + checkForgottenRefs refCtx modifyIORef globalForgottenRef $ \case Disabled -> Enabled Nothing - Enabled _ -> error "enableForgottenRefChecks: already enabled" + Enabled x -> Enabled x -disableForgottenRefChecks = +disableForgottenRefChecks refCtx@RefCtx{..} = + unsafeIOToPrimStrict $ do + checkForgottenRefs refCtx modifyIORef globalForgottenRef $ \case - Disabled -> error "disableForgottenRefChecks: already disabled" + Disabled -> Disabled Enabled Nothing -> Disabled Enabled _ -> error "disableForgottenRefChecks: can not disable when there are forgotten references" + +getRefCtx (Ref _ (RefTracker _ _ _ _ refCtx)) = refCtx + #else -enableForgottenRefChecks = pure () -disableForgottenRefChecks = pure () + +data RefCtx = RefCtx + +instance NFData RefCtx where + rnf RefCtx = () + +newRefCtx = pure RefCtx + +closeRefCtx _ = pure () + where + _unused = throwIO @m (userError "unused") + +enableForgottenRefChecks _ = pure () + +disableForgottenRefChecks _ = pure () + +getRefCtx _ = RefCtx + #endif diff --git a/src-extras/Database/LSMTree/Extras/MergingRunData.hs b/src-extras/Database/LSMTree/Extras/MergingRunData.hs index a3b3a7caf..c89affccb 100644 --- a/src-extras/Database/LSMTree/Extras/MergingRunData.hs +++ b/src-extras/Database/LSMTree/Extras/MergingRunData.hs @@ -46,6 +46,7 @@ withMergingRun :: MR.IsMergeType t => HasFS IO h -> HasBlockIO IO h + -> RefCtx -> ResolveSerialisedValue -> Bloom.Salt -> RunBuilder.RunParams @@ -54,9 +55,9 @@ withMergingRun :: -> SerialisedMergingRunData t -> (Ref (MergingRun t IO h) -> IO a) -> IO a -withMergingRun hfs hbio resolve salt runParams path counter mrd = do +withMergingRun hfs hbio refCtx resolve salt runParams path counter mrd = do bracket - (unsafeCreateMergingRun hfs hbio resolve salt runParams path counter mrd) + (unsafeCreateMergingRun hfs hbio refCtx resolve salt runParams path counter mrd) releaseRef -- | Flush serialised merging run data to disk. @@ -69,6 +70,7 @@ unsafeCreateMergingRun :: MR.IsMergeType t => HasFS IO h -> HasBlockIO IO h + -> RefCtx -> ResolveSerialisedValue -> Bloom.Salt -> RunBuilder.RunParams @@ -76,19 +78,19 @@ unsafeCreateMergingRun :: -> UniqCounter IO -> SerialisedMergingRunData t -> IO (Ref (MergingRun t IO h)) -unsafeCreateMergingRun hfs hbio resolve salt runParams path counter = \case +unsafeCreateMergingRun hfs hbio refCtx resolve salt runParams path counter = \case CompletedMergeData _ rd -> do - withRun hfs hbio salt runParams path counter rd $ \run -> do + withRun hfs hbio refCtx salt runParams path counter rd $ \run -> do -- slightly hacky, generally it's larger let totalDebt = MR.numEntriesToMergeDebt (Run.size run) - MR.newCompleted totalDebt run + MR.newCompleted refCtx totalDebt run OngoingMergeData mergeType rds -> do - withRuns hfs hbio salt runParams path counter (toRunData <$> rds) + withRuns hfs hbio refCtx salt runParams path counter (toRunData <$> rds) $ \runs -> do n <- incrUniqCounter counter let fsPaths = RunFsPaths path (RunNumber (uniqueToInt n)) - MR.new hfs hbio resolve salt runParams mergeType + MR.new hfs hbio refCtx resolve salt runParams mergeType fsPaths (V.fromList runs) {------------------------------------------------------------------------------- diff --git a/src-extras/Database/LSMTree/Extras/MergingTreeData.hs b/src-extras/Database/LSMTree/Extras/MergingTreeData.hs index 675de359e..850d73dfd 100644 --- a/src-extras/Database/LSMTree/Extras/MergingTreeData.hs +++ b/src-extras/Database/LSMTree/Extras/MergingTreeData.hs @@ -44,6 +44,7 @@ import Test.QuickCheck as QC withMergingTree :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> ResolveSerialisedValue -> Bloom.Salt -> RunParams @@ -52,9 +53,9 @@ withMergingTree :: -> SerialisedMergingTreeData -> (Ref (MergingTree IO h) -> IO a) -> IO a -withMergingTree hfs hbio resolve salt runParams path counter mrd = do +withMergingTree hfs hbio refCtx resolve salt runParams path counter mrd = do bracket - (unsafeCreateMergingTree hfs hbio resolve salt runParams path counter mrd) + (unsafeCreateMergingTree hfs hbio refCtx resolve salt runParams path counter mrd) releaseRef -- | Flush serialised merging tree data to disk. @@ -66,6 +67,7 @@ withMergingTree hfs hbio resolve salt runParams path counter mrd = do unsafeCreateMergingTree :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> ResolveSerialisedValue -> Bloom.Salt -> RunParams @@ -73,22 +75,22 @@ unsafeCreateMergingTree :: -> UniqCounter IO -> SerialisedMergingTreeData -> IO (Ref (MergingTree IO h)) -unsafeCreateMergingTree hfs hbio resolve salt runParams path counter = go +unsafeCreateMergingTree hfs hbio refCtx resolve salt runParams path counter = go where go = \case CompletedTreeMergeData rd -> - withRun hfs hbio salt runParams path counter rd $ \run -> - MT.newCompletedMerge run + withRun hfs hbio refCtx salt runParams path counter rd $ \run -> + MT.newCompletedMerge refCtx run OngoingTreeMergeData mrd -> - withMergingRun hfs hbio resolve salt runParams path counter mrd $ \mr -> - MT.newOngoingMerge mr + withMergingRun hfs hbio refCtx resolve salt runParams path counter mrd $ \mr -> + MT.newOngoingMerge refCtx mr PendingLevelMergeData prds mtd -> withPreExistingRuns prds $ \prs -> withMaybeTree mtd $ \mt -> - MT.newPendingLevelMerge prs mt + MT.newPendingLevelMerge refCtx prs mt PendingUnionMergeData mtds -> withTrees mtds $ \mts -> - MT.newPendingUnionMerge mts + MT.newPendingUnionMerge refCtx mts withTrees [] act = act [] withTrees (mtd:rest) act = @@ -103,11 +105,11 @@ unsafeCreateMergingTree hfs hbio resolve salt runParams path counter = go withPreExistingRuns [] act = act [] withPreExistingRuns (PreExistingRunData rd : rest) act = - withRun hfs hbio salt runParams path counter rd $ \r -> + withRun hfs hbio refCtx salt runParams path counter rd $ \r -> withPreExistingRuns rest $ \prs -> act (MT.PreExistingRun r : prs) withPreExistingRuns (PreExistingMergingRunData mrd : rest) act = - withMergingRun hfs hbio resolve salt runParams path counter mrd $ \mr -> + withMergingRun hfs hbio refCtx resolve salt runParams path counter mrd $ \mr -> withPreExistingRuns rest $ \prs -> act (MT.PreExistingMergingRun mr : prs) diff --git a/src-extras/Database/LSMTree/Extras/NoThunks.hs b/src-extras/Database/LSMTree/Extras/NoThunks.hs index 8e9d36857..2ac0f7f33 100644 --- a/src-extras/Database/LSMTree/Extras/NoThunks.hs +++ b/src-extras/Database/LSMTree/Extras/NoThunks.hs @@ -685,6 +685,9 @@ instance Typeable (PrimState m) => NoThunks (RefCounter m) where , noThunks ctx $ (OnlyCheckWhnfNamed b :: OnlyCheckWhnfNamed "finaliser" (m ())) ] +-- RefCtx constructor not exported +deriving via OnlyCheckWhnf RefCtx instance NoThunks RefCtx + -- Ref constructor not exported, cannot derive Generic, use DeRef instead. instance (NoThunks obj, Typeable obj) => NoThunks (Ref obj) where showTypeOf p@(_ :: Proxy (Ref obj)) = show $ typeRep p diff --git a/src-extras/Database/LSMTree/Extras/RunData.hs b/src-extras/Database/LSMTree/Extras/RunData.hs index b313ded60..9f2e438bf 100644 --- a/src-extras/Database/LSMTree/Extras/RunData.hs +++ b/src-extras/Database/LSMTree/Extras/RunData.hs @@ -76,6 +76,7 @@ import Test.QuickCheck withRun :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> Bloom.Salt -> RunParams -> FS.FsPath @@ -83,24 +84,25 @@ withRun :: -> SerialisedRunData -> (Ref (Run IO h) -> IO a) -> IO a -withRun hfs hbio salt runParams path counter rd = do +withRun hfs hbio refCtx salt runParams path counter rd = do bracket - (unsafeCreateRun hfs hbio salt runParams path counter rd) + (unsafeCreateRun hfs hbio refCtx salt runParams path counter rd) releaseRef -- | Create a temporary 'Run' using 'unsafeCreateRunAt'. withRunAt :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> Bloom.Salt -> RunParams -> RunFsPaths -> SerialisedRunData -> (Ref (Run IO h) -> IO a) -> IO a -withRunAt hfs hbio salt runParams path rd = do +withRunAt hfs hbio refCtx salt runParams path rd = do bracket - (unsafeCreateRunAt hfs hbio salt runParams path rd) + (unsafeCreateRunAt hfs hbio refCtx salt runParams path rd) releaseRef {-# INLINABLE withRuns #-} @@ -108,6 +110,7 @@ withRunAt hfs hbio salt runParams path rd = do withRuns :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> Bloom.Salt -> RunParams -> FS.FsPath @@ -115,11 +118,11 @@ withRuns :: -> [SerialisedRunData] -> ([Ref (Run IO h)] -> IO a) -> IO a -withRuns hfs hbio salt runParams path counter = go +withRuns hfs hbio refCtx salt runParams path counter = go where go [] act = act [] go (rd:rds) act = - withRun hfs hbio salt runParams path counter rd $ \r -> + withRun hfs hbio refCtx salt runParams path counter rd $ \r -> go rds $ \rs -> act (r:rs) @@ -128,16 +131,17 @@ withRuns hfs hbio salt runParams path counter = go unsafeCreateRun :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> Bloom.Salt -> RunParams -> FS.FsPath -> UniqCounter IO -> SerialisedRunData -> IO (Ref (Run IO h)) -unsafeCreateRun fs hbio salt runParams path counter rd = do +unsafeCreateRun fs hbio refCtx salt runParams path counter rd = do n <- incrUniqCounter counter let fsPaths = RunFsPaths path (uniqueToRunNumber n) - unsafeCreateRunAt fs hbio salt runParams fsPaths rd + unsafeCreateRunAt fs hbio refCtx salt runParams fsPaths rd -- | Flush serialised run data to disk as if it were a write buffer. -- @@ -148,19 +152,20 @@ unsafeCreateRun fs hbio salt runParams path counter rd = do unsafeCreateRunAt :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> Bloom.Salt -> RunParams -> RunFsPaths -> SerialisedRunData -> IO (Ref (Run IO h)) -unsafeCreateRunAt fs hbio salt runParams fsPaths (RunData m) = do +unsafeCreateRunAt fs hbio refCtx salt runParams fsPaths (RunData m) = do -- the WBB file path doesn't have to be at a specific place relative to -- the run we want to create, but fsPaths should already point to a unique -- location, so we just append something to not conflict with that. let blobpath = FS.addExtension (runBlobPath fsPaths) ".wb" - bracket (WBB.new fs blobpath) releaseRef $ \wbblobs -> do + bracket (WBB.new fs refCtx blobpath) releaseRef $ \wbblobs -> do wb <- WB.fromMap <$> traverse (traverse (WBB.addBlob fs wbblobs)) m - Run.fromWriteBuffer fs hbio salt runParams fsPaths wb wbblobs + Run.fromWriteBuffer fs hbio refCtx salt runParams fsPaths wb wbblobs -- | Create a 'RunFsPaths' using an empty 'FsPath'. The empty path corresponds -- to the "root" or "mount point" of a 'HasFS' instance. @@ -178,16 +183,17 @@ simplePaths ns = fmap simplePath ns -- | Use 'SerialisedRunData' to 'WriteBuffer' and 'WriteBufferBlobs'. withRunDataAsWriteBuffer :: FS.HasFS IO h + -> RefCtx -> ResolveSerialisedValue -> WriteBufferFsPaths -> SerialisedRunData -> (WB.WriteBuffer -> Ref (WBB.WriteBufferBlobs IO h) -> IO a) -> IO a -withRunDataAsWriteBuffer hfs f fsPaths rd action = do +withRunDataAsWriteBuffer hfs refCtx f fsPaths rd action = do let es = V.fromList . Map.toList $ unRunData rd let maxn = NumEntries $ V.length es let wbbPath = Paths.writeBufferBlobPath fsPaths - bracket (WBB.new hfs wbbPath) releaseRef $ \wbb -> do + bracket (WBB.new hfs refCtx wbbPath) releaseRef $ \wbb -> do (wb, _) <- addWriteBufferEntries hfs f wbb maxn WB.empty es action wb wbb diff --git a/src/Database/LSMTree/Internal/BlobFile.hs b/src/Database/LSMTree/Internal/BlobFile.hs index 81cc9ecde..7df2cafa5 100644 --- a/src/Database/LSMTree/Internal/BlobFile.hs +++ b/src/Database/LSMTree/Internal/BlobFile.hs @@ -59,15 +59,16 @@ instance NFData BlobSpan where -- -- ASYNC: this should be called with asynchronous exceptions masked because it -- allocates/creates resources. -{-# SPECIALISE openBlobFile :: HasCallStack => HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-} +{-# SPECIALISE openBlobFile :: HasCallStack => HasFS IO h -> RefCtx -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-} openBlobFile :: (PrimMonad m, MonadCatch m) => HasCallStack => HasFS m h + -> RefCtx -> FS.FsPath -> FS.OpenMode -> m (Ref (BlobFile m h)) -openBlobFile fs path mode = +openBlobFile fs refCtx path mode = bracketOnError (FS.hOpen fs path mode) (FS.hClose fs) $ \blobFileHandle -> do let finaliser = FS.hClose fs blobFileHandle `finally` @@ -76,7 +77,7 @@ openBlobFile fs path mode = -- surprise errors when the file is also deleted elsewhere. Maybe -- file paths should be guarded by 'Ref's as well? FS.removeFile fs (FS.handlePath blobFileHandle) - newRef finaliser $ \blobFileRefCounter -> + newRef refCtx finaliser $ \blobFileRefCounter -> BlobFile { blobFileHandle, blobFileRefCounter diff --git a/src/Database/LSMTree/Internal/BlobRef.hs b/src/Database/LSMTree/Internal/BlobRef.hs index 324e9d6d4..15f07991f 100644 --- a/src/Database/LSMTree/Internal/BlobRef.hs +++ b/src/Database/LSMTree/Internal/BlobRef.hs @@ -109,14 +109,16 @@ newtype WeakBlobRefInvalid = WeakBlobRefInvalid Int deriving anyclass (Exception) {-# SPECIALISE deRefWeakBlobRef :: - WeakBlobRef IO h + RefCtx + -> WeakBlobRef IO h -> IO (StrongBlobRef IO h) #-} deRefWeakBlobRef :: (MonadThrow m, PrimMonad m) - => WeakBlobRef m h + => RefCtx + -> WeakBlobRef m h -> m (StrongBlobRef m h) -deRefWeakBlobRef WeakBlobRef{weakBlobRefFile, weakBlobRefSpan} = do - mstrongBlobRefFile <- deRefWeak weakBlobRefFile +deRefWeakBlobRef refCtx WeakBlobRef{weakBlobRefFile, weakBlobRefSpan} = do + mstrongBlobRefFile <- deRefWeak refCtx weakBlobRefFile case mstrongBlobRefFile of Just strongBlobRefFile -> pure StrongBlobRef { @@ -126,17 +128,19 @@ deRefWeakBlobRef WeakBlobRef{weakBlobRefFile, weakBlobRefSpan} = do Nothing -> throwIO (WeakBlobRefInvalid 0) {-# SPECIALISE deRefWeakBlobRefs :: - V.Vector (WeakBlobRef IO h) + RefCtx + -> V.Vector (WeakBlobRef IO h) -> IO (V.Vector (StrongBlobRef IO h)) #-} deRefWeakBlobRefs :: forall m h. (MonadMask m, PrimMonad m) - => V.Vector (WeakBlobRef m h) + => RefCtx + -> V.Vector (WeakBlobRef m h) -> m (V.Vector (StrongBlobRef m h)) -deRefWeakBlobRefs wrefs = do +deRefWeakBlobRefs refCtx wrefs = do refs <- VM.new (V.length wrefs) V.iforM_ wrefs $ \i WeakBlobRef {weakBlobRefFile, weakBlobRefSpan} -> do - mstrongBlobRefFile <- deRefWeak weakBlobRefFile + mstrongBlobRefFile <- deRefWeak refCtx weakBlobRefFile case mstrongBlobRefFile of Just strongBlobRefFile -> VM.write refs i StrongBlobRef { @@ -162,25 +166,27 @@ readRawBlobRef :: readRawBlobRef fs RawBlobRef {rawBlobRefFile, rawBlobRefSpan} = BlobFile.readBlobRaw fs rawBlobRefFile rawBlobRefSpan -{-# SPECIALISE readWeakBlobRef :: HasFS IO h -> WeakBlobRef IO h -> IO SerialisedBlob #-} +{-# SPECIALISE readWeakBlobRef :: HasFS IO h -> RefCtx -> WeakBlobRef IO h -> IO SerialisedBlob #-} readWeakBlobRef :: (MonadMask m, PrimMonad m) => HasFS m h + -> RefCtx -> WeakBlobRef m h -> m SerialisedBlob -readWeakBlobRef fs wref = - bracket (deRefWeakBlobRef wref) releaseBlobRef $ +readWeakBlobRef fs refCtx wref = + bracket (deRefWeakBlobRef refCtx wref) releaseBlobRef $ \StrongBlobRef {strongBlobRefFile, strongBlobRefSpan} -> BlobFile.readBlob fs strongBlobRefFile strongBlobRefSpan -{-# SPECIALISE readWeakBlobRefs :: HasBlockIO IO h -> V.Vector (WeakBlobRef IO h) -> IO (V.Vector SerialisedBlob) #-} +{-# SPECIALISE readWeakBlobRefs :: HasBlockIO IO h -> RefCtx -> V.Vector (WeakBlobRef IO h) -> IO (V.Vector SerialisedBlob) #-} readWeakBlobRefs :: (MonadMask m, PrimMonad m) => HasBlockIO m h + -> RefCtx -> V.Vector (WeakBlobRef m h) -> m (V.Vector SerialisedBlob) -readWeakBlobRefs hbio wrefs = - bracket (deRefWeakBlobRefs wrefs) (V.mapM_ releaseBlobRef) $ \refs -> do +readWeakBlobRefs hbio refCtx wrefs = + bracket (deRefWeakBlobRefs refCtx wrefs) (V.mapM_ releaseBlobRef) $ \refs -> do -- Prepare the IOOps: -- We use a single large memory buffer, with appropriate offsets within -- the buffer. diff --git a/src/Database/LSMTree/Internal/IncomingRun.hs b/src/Database/LSMTree/Internal/IncomingRun.hs index 665a126f2..1d3e38469 100644 --- a/src/Database/LSMTree/Internal/IncomingRun.hs +++ b/src/Database/LSMTree/Internal/IncomingRun.hs @@ -182,7 +182,8 @@ nominalDebtAsCredits :: NominalDebt -> NominalCredits nominalDebtAsCredits (NominalDebt c) = NominalCredits c {-# SPECIALISE supplyCreditsIncomingRun :: - TableConfig + RefCtx + -> TableConfig -> LevelNo -> IncomingRun IO h -> NominalCredits @@ -191,13 +192,14 @@ nominalDebtAsCredits (NominalDebt c) = NominalCredits c -- This is a relative addition of credits, not a new absolute total value. supplyCreditsIncomingRun :: (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) - => TableConfig + => RefCtx + -> TableConfig -> LevelNo -> IncomingRun m h -> NominalCredits -> m () -supplyCreditsIncomingRun _ _ (Single _r) _ = pure () -supplyCreditsIncomingRun conf ln (Merging _ nominalDebt nominalCreditsVar mr) +supplyCreditsIncomingRun _ _ _ (Single _r) _ = pure () +supplyCreditsIncomingRun refCtx conf ln (Merging _ nominalDebt nominalCreditsVar mr) deposit = do (_nominalCredits, nominalCredits') <- depositNominalCredits nominalDebt nominalCreditsVar @@ -207,7 +209,7 @@ supplyCreditsIncomingRun conf ln (Merging _ nominalDebt nominalCreditsVar mr) nominalCredits' !thresh = creditThresholdForLevel conf ln (_suppliedCredits, - _suppliedCredits') <- MR.supplyCreditsAbsolute mr thresh mergeCredits' + _suppliedCredits') <- MR.supplyCreditsAbsolute refCtx mr thresh mergeCredits' pure () --TODO: currently each supplying credits action results in contributing -- credits to the underlying merge, but this need not be the case. We @@ -344,26 +346,28 @@ timesDivABC_fast (W# a) (W# b) (W# c) = (# q, _r #) -> W# q {-# SPECIALISE immediatelyCompleteIncomingRun :: - TableConfig + RefCtx + -> TableConfig -> LevelNo -> IncomingRun IO h -> IO (Ref (Run IO h)) #-} -- | Supply enough credits to complete the merge now. immediatelyCompleteIncomingRun :: (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) - => TableConfig + => RefCtx + -> TableConfig -> LevelNo -> IncomingRun m h -> m (Ref (Run m h)) -immediatelyCompleteIncomingRun conf ln ir = +immediatelyCompleteIncomingRun refCtx conf ln ir = case ir of Single r -> dupRef r Merging _ (NominalDebt nominalDebt) nominalCreditsVar mr -> do NominalCredits nominalCredits <- readPrimVar nominalCreditsVar let !deposit = NominalCredits (nominalDebt - nominalCredits) - supplyCreditsIncomingRun conf ln ir deposit + supplyCreditsIncomingRun refCtx conf ln ir deposit -- This ensures the merge is really completed. However, we don't -- release the merge yet, but we do return a new reference to the run. - MR.expectCompleted mr + MR.expectCompleted refCtx mr diff --git a/src/Database/LSMTree/Internal/Merge.hs b/src/Database/LSMTree/Internal/Merge.hs index edf51d491..1b26e453f 100644 --- a/src/Database/LSMTree/Internal/Merge.hs +++ b/src/Database/LSMTree/Internal/Merge.hs @@ -215,7 +215,8 @@ abort Merge {..} = do writeMutVar mergeState $! Closed {-# SPECIALISE complete :: - Merge t IO h + RefCtx + -> Merge t IO h -> IO (Ref (Run IO h)) #-} -- | Complete a 'Merge', returning a new 'Run' as the result of merging the -- input runs. @@ -235,21 +236,23 @@ abort Merge {..} = do -- complete :: (MonadSTM m, MonadST m, MonadMask m) - => Merge t m h + => RefCtx + -> Merge t m h -> m (Ref (Run m h)) -complete Merge{..} = do +complete refCtx Merge{..} = do readMutVar mergeState >>= \case Merging -> error "complete: Merge is not done" MergingDone -> do -- the readers are already drained, therefore closed - r <- Run.fromBuilder mergeBuilder + r <- Run.fromBuilder refCtx mergeBuilder writeMutVar mergeState $! Completed pure r Completed -> error "complete: Merge is already completed" Closed -> error "complete: Merge is closed" {-# SPECIALISE stepsToCompletion :: - Merge t IO h + RefCtx + -> Merge t IO h -> Int -> IO (Ref (Run IO h)) #-} -- | Like 'steps', but calling 'complete' once the merge is finished. @@ -257,18 +260,20 @@ complete Merge{..} = do -- Note: run with async exceptions masked. See 'complete'. stepsToCompletion :: (MonadMask m, MonadSTM m, MonadST m) - => Merge t m h + => RefCtx + -> Merge t m h -> Int -> m (Ref (Run m h)) -stepsToCompletion m stepBatchSize = go +stepsToCompletion refCtx m stepBatchSize = go where go = do steps m stepBatchSize >>= \case (_, MergeInProgress) -> go - (_, MergeDone) -> complete m + (_, MergeDone) -> complete refCtx m {-# SPECIALISE stepsToCompletionCounted :: - Merge t IO h + RefCtx + -> Merge t IO h -> Int -> IO (Int, Ref (Run IO h)) #-} -- | Like 'steps', but calling 'complete' once the merge is finished. @@ -276,16 +281,17 @@ stepsToCompletion m stepBatchSize = go -- Note: run with async exceptions masked. See 'complete'. stepsToCompletionCounted :: (MonadMask m, MonadSTM m, MonadST m) - => Merge t m h + => RefCtx + -> Merge t m h -> Int -> m (Int, Ref (Run m h)) -stepsToCompletionCounted m stepBatchSize = go 0 +stepsToCompletionCounted refCtx m stepBatchSize = go 0 where go !stepsSum = do steps m stepBatchSize >>= \case (n, MergeInProgress) -> go (stepsSum + n) (n, MergeDone) -> let !stepsSum' = stepsSum + n - in (stepsSum',) <$> complete m + in (stepsSum',) <$> complete refCtx m data StepResult = MergeInProgress | MergeDone deriving stock Eq diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index b951d7bf2..315147b3e 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -445,6 +445,7 @@ releaseUnionCache reg (UnionCache mt) = -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h + -> RefCtx -> SessionRoot -> Bloom.Salt -> UniqCounter IO @@ -484,6 +485,7 @@ updatesWithInterleavedFlushes :: -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h + -> RefCtx -> SessionRoot -> Bloom.Salt -> UniqCounter m @@ -491,7 +493,7 @@ updatesWithInterleavedFlushes :: -> ActionRegistry m -> TableContent m h -> m (TableContent m h) -updatesWithInterleavedFlushes tr conf resolve hfs hbio root salt uc es reg tc = do +updatesWithInterleavedFlushes tr conf resolve hfs hbio refCtx root salt uc es reg tc = do let wb = tableWriteBuffer tc wbblobs = tableWriteBufferBlobs tc (wb', es') <- addWriteBufferEntries hfs resolve wbblobs maxn wb es @@ -499,20 +501,20 @@ updatesWithInterleavedFlushes tr conf resolve hfs hbio root salt uc es reg tc = -- number of supplied credits is based on the size increase of the write -- buffer, not the number of processed entries @length es' - length es@. let numAdded = unNumEntries (WB.numEntries wb') - unNumEntries (WB.numEntries wb) - supplyCredits conf (NominalCredits numAdded) (tableLevels tc) + supplyCredits refCtx conf (NominalCredits numAdded) (tableLevels tc) let tc' = tc { tableWriteBuffer = wb' } if WB.numEntries wb' < maxn then do pure $! tc' -- If the write buffer did reach capacity, then we flush. else do - tc'' <- flushWriteBuffer tr conf resolve hfs hbio root salt uc reg tc' + tc'' <- flushWriteBuffer tr conf resolve hfs hbio refCtx root salt uc reg tc' -- In the fortunate case where we have already performed all the updates, -- return, if V.null es' then pure $! tc'' -- otherwise, keep going else - updatesWithInterleavedFlushes tr conf resolve hfs hbio root salt uc es' reg tc'' + updatesWithInterleavedFlushes tr conf resolve hfs hbio refCtx root salt uc es' reg tc'' where AllocNumEntries (NumEntries -> maxn) = confWriteBufferAlloc conf @@ -565,6 +567,7 @@ addWriteBufferEntries hfs f wbblobs maxn = -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h + -> RefCtx -> SessionRoot -> Bloom.Salt -> UniqCounter IO @@ -582,13 +585,14 @@ flushWriteBuffer :: -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h + -> RefCtx -> SessionRoot -> Bloom.Salt -> UniqCounter m -> ActionRegistry m -> TableContent m h -> m (TableContent m h) -flushWriteBuffer tr conf resolve hfs hbio root salt uc reg tc +flushWriteBuffer tr conf resolve hfs hbio refCtx root salt uc reg tc | WB.null (tableWriteBuffer tc) = pure tc | otherwise = do !uniq <- incrUniqCounter uc @@ -602,15 +606,15 @@ flushWriteBuffer tr conf resolve hfs hbio root salt uc reg tc TraceFlushWriteBuffer size (runNumber runPaths) runParams r <- withRollback reg (Run.fromWriteBuffer - hfs hbio salt + hfs hbio refCtx salt runParams runPaths (tableWriteBuffer tc) (tableWriteBufferBlobs tc)) releaseRef delayedCommit reg (releaseRef (tableWriteBufferBlobs tc)) - wbblobs' <- withRollback reg (WBB.new hfs (Paths.tableBlobPath root uniq)) + wbblobs' <- withRollback reg (WBB.new hfs refCtx (Paths.tableBlobPath root uniq)) releaseRef - levels' <- addRunToLevels tr conf resolve hfs hbio root salt uc r reg + levels' <- addRunToLevels tr conf resolve hfs hbio refCtx root salt uc r reg (tableLevels tc) (tableUnionLevel tc) tableCache' <- rebuildCache reg (tableCache tc) levels' @@ -629,6 +633,7 @@ flushWriteBuffer tr conf resolve hfs hbio root salt uc reg tc -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h + -> RefCtx -> SessionRoot -> Bloom.Salt -> UniqCounter IO @@ -649,6 +654,7 @@ addRunToLevels :: -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h + -> RefCtx -> SessionRoot -> Bloom.Salt -> UniqCounter m @@ -657,7 +663,7 @@ addRunToLevels :: -> Levels m h -> UnionLevel m h -> m (Levels m h) -addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root salt uc r0 reg levels ul = do +addRunToLevels tr conf@TableConfig{..} resolve hfs hbio refCtx root salt uc r0 reg levels ul = do go (LevelNo 1) (V.singleton r0) levels where -- NOTE: @go@ is based on the @increment@ function from the @@ -723,7 +729,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root salt uc r0 reg leve r <- case ir of Single r -> pure r Merging _ _ _ mr -> do - r <- withRollback reg (MR.expectCompleted mr) releaseRef + r <- withRollback reg (MR.expectCompleted refCtx mr) releaseRef delayedCommit reg (releaseRef mr) pure r traceWith tr $ AtLevel ln $ @@ -738,7 +744,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root salt uc r0 reg leve -> m (IncomingRun m h) newMerge mergePolicy mergeType ln rs = do ir <- withRollback reg - (newIncomingRunAtLevel tr hfs hbio + (newIncomingRunAtLevel tr hfs hbio refCtx root salt uc conf resolve mergePolicy mergeType ln rs) releaseIncomingRun @@ -750,7 +756,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root salt uc r0 reg leve Incremental -> pure () OneShot -> bracket - (immediatelyCompleteIncomingRun conf ln ir) + (immediatelyCompleteIncomingRun refCtx conf ln ir) releaseRef $ \r -> traceWith tr $ AtLevel ln $ @@ -762,6 +768,7 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root salt uc r0 reg leve Tracer IO (AtLevel MergeTrace) -> HasFS IO h -> HasBlockIO IO h + -> RefCtx -> SessionRoot -> Bloom.Salt -> UniqCounter IO @@ -777,6 +784,7 @@ newIncomingRunAtLevel :: => Tracer m (AtLevel MergeTrace) -> HasFS m h -> HasBlockIO m h + -> RefCtx -> SessionRoot -> Bloom.Salt -> UniqCounter m @@ -787,7 +795,7 @@ newIncomingRunAtLevel :: -> LevelNo -> V.Vector (Ref (Run m h)) -> m (IncomingRun m h) -newIncomingRunAtLevel tr hfs hbio +newIncomingRunAtLevel tr hfs hbio refCtx root salt uc conf resolve mergePolicy mergeType ln rs | Just (r, rest) <- V.uncons rs, V.null rest = do @@ -808,7 +816,7 @@ newIncomingRunAtLevel tr hfs hbio runParams mergePolicy mergeType bracket - (MR.new hfs hbio resolve salt runParams mergeType runPaths rs) + (MR.new hfs hbio refCtx resolve salt runParams mergeType runPaths rs) releaseRef $ \mr -> assert (MR.totalMergeDebt mr <= maxMergeDebt conf mergePolicy ln) $ let nominalDebt = nominalDebtForLevel conf ln in @@ -948,7 +956,8 @@ levelIsFull sr rs = V.length rs + 1 >= (sizeRatioInt sr) -} {-# SPECIALISE supplyCredits :: - TableConfig + RefCtx + -> TableConfig -> NominalCredits -> Levels IO h -> IO () @@ -957,13 +966,14 @@ levelIsFull sr rs = V.length rs + 1 >= (sizeRatioInt sr) -- This /may/ cause some merges to progress. supplyCredits :: (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) - => TableConfig + => RefCtx + -> TableConfig -> NominalCredits -> Levels m h -> m () -supplyCredits conf deposit levels = +supplyCredits refCtx conf deposit levels = iforLevelM_ levels $ \ln (Level ir _rs) -> - supplyCreditsIncomingRun conf ln ir deposit + supplyCreditsIncomingRun refCtx conf ln ir deposit --TODO: consider tracing supply of credits, -- supplyCreditsIncomingRun could easily return the supplied credits -- before & after, which may be useful for tracing. diff --git a/src/Database/LSMTree/Internal/MergingRun.hs b/src/Database/LSMTree/Internal/MergingRun.hs index 71000fba7..f87eab75f 100644 --- a/src/Database/LSMTree/Internal/MergingRun.hs +++ b/src/Database/LSMTree/Internal/MergingRun.hs @@ -132,6 +132,7 @@ instance NFData MergeKnownCompleted where Merge.IsMergeType t => HasFS IO h -> HasBlockIO IO h + -> RefCtx -> ResolveSerialisedValue -> Bloom.Salt -> RunParams @@ -150,6 +151,7 @@ new :: (Merge.IsMergeType t, MonadMVar m, MonadMask m, MonadSTM m, MonadST m) => HasFS m h -> HasBlockIO m h + -> RefCtx -> ResolveSerialisedValue -> Bloom.Salt -> RunParams @@ -157,7 +159,7 @@ new :: -> RunFsPaths -> V.Vector (Ref (Run m h)) -> m (Ref (MergingRun t m h)) -new hfs hbio resolve salt runParams ty runPaths inputRuns = +new hfs hbio refCtx resolve salt runParams ty runPaths inputRuns = assert (V.length inputRuns > 0) $ do -- there can be empty runs, which we don't want to include in the merge -- TODO: making runs non-empty would involve introducing a constructor @@ -175,8 +177,9 @@ new hfs hbio resolve salt runParams ty runPaths inputRuns = -- as we do in the prototype. but that would mean that the result -- doesn't follow the supplied @runParams@. -- TODO: decide whether that optimisation is okay - r <- Run.newEmpty hfs hbio salt runParams runPaths + r <- Run.newEmpty hfs hbio refCtx salt runParams runPaths unsafeNew + refCtx (MergeDebt 0) (SpentCredits 0) MergeKnownCompleted @@ -186,13 +189,15 @@ new hfs hbio resolve salt runParams ty runPaths inputRuns = merge <- fromMaybe (error "newMerge: merges can not be empty") <$> Merge.new hfs hbio salt runParams ty resolve runPaths rs unsafeNew + refCtx (numEntriesToMergeDebt (V.foldMap' Run.size rs)) (SpentCredits 0) MergeMaybeCompleted (OngoingMerge rs merge) {-# SPECIALISE newCompleted :: - MergeDebt + RefCtx + -> MergeDebt -> Ref (Run IO h) -> IO (Ref (MergingRun t IO h)) #-} -- | Create a merging run that is already in the completed state, returning a @@ -204,13 +209,15 @@ new hfs hbio resolve salt runParams ty runPaths inputRuns = -- failing after internal resources have already been created. newCompleted :: (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) - => MergeDebt -- ^ Since there are no longer any input runs, we need to be + => RefCtx + -> MergeDebt -- ^ Since there are no longer any input runs, we need to be -- told what the merge debt was. -> Ref (Run m h) -> m (Ref (MergingRun t m h)) -newCompleted mergeDebt inputRun = do +newCompleted refCtx mergeDebt inputRun = do bracketOnError (dupRef inputRun) releaseRef $ \run -> unsafeNew + refCtx mergeDebt (SpentCredits (mergeDebtAsCredits mergeDebt)) -- since it is completed MergeKnownCompleted @@ -225,22 +232,23 @@ data TableTooLargeError {-# INLINE unsafeNew #-} unsafeNew :: (MonadMVar m, MonadMask m, MonadSTM m, MonadST m) - => MergeDebt + => RefCtx + -> MergeDebt -> SpentCredits -> MergeKnownCompleted -> MergingRunState t m h -> m (Ref (MergingRun t m h)) -unsafeNew (MergeDebt mergeDebt) _ _ _ +unsafeNew _ (MergeDebt mergeDebt) _ _ _ | SpentCredits mergeDebt > maxBound = throwIO ErrTableTooLarge -unsafeNew mergeDebt (SpentCredits spentCredits) +unsafeNew refCtx mergeDebt (SpentCredits spentCredits) knownCompleted state = do let !credits = CreditsPair (SpentCredits spentCredits) (UnspentCredits 0) mergeCreditsVar <- CreditsVar <$> newPrimVar credits mergeKnownCompleted <- newMutVar knownCompleted mergeState <- newMVar $! state - newRef (finalise mergeState) $ \mergeRefCounter -> + newRef refCtx (finalise mergeState) $ \mergeRefCounter -> MergingRun { mergeDebt , mergeCreditsVar @@ -826,14 +834,15 @@ supplyChecked _query supply x credits = do -- supplyCreditsRelative :: forall t m h. (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) - => Ref (MergingRun t m h) + => RefCtx + -> Ref (MergingRun t m h) -> CreditThreshold -> MergeCredits -> m MergeCredits -supplyCreditsRelative = flip $ \th -> +supplyCreditsRelative refCtx = flip $ \th -> supplyChecked remainingMergeDebt $ \mr c -> do (_suppliedCredits, suppliedCredits', leftoverCredits) - <- supplyCredits mr th (SupplyMergeCredits SupplyRelative c) + <- supplyCredits refCtx mr th (SupplyMergeCredits SupplyRelative c) assert (suppliedCredits' == mergeDebtAsCredits (totalMergeDebt mr) || leftoverCredits == 0) $ @@ -862,20 +871,22 @@ supplyCreditsRelative = flip $ \th -> -- supplyCreditsAbsolute :: forall t m h. (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) - => Ref (MergingRun t m h) + => RefCtx + -> Ref (MergingRun t m h) -> CreditThreshold -> MergeCredits -> m (MergeCredits, MergeCredits) -- ^ (suppliedCredits, suppliedCredits') -supplyCreditsAbsolute mr th c = +supplyCreditsAbsolute refCtx mr th c = assert (0 <= c && c <= mergeDebtAsCredits (totalMergeDebt mr)) $ do (suppliedCredits, suppliedCredits', _leftoverCredits) - <- supplyCredits mr th (SupplyMergeCredits SupplyAbsolute c) + <- supplyCredits refCtx mr th (SupplyMergeCredits SupplyAbsolute c) assert (suppliedCredits' == max c suppliedCredits) $ pure (suppliedCredits, suppliedCredits') {-# SPECIALISE supplyCredits :: - Ref (MergingRun t IO h) + RefCtx + -> Ref (MergingRun t IO h) -> CreditThreshold -> SupplyMergeCredits -> IO (MergeCredits, MergeCredits, MergeCredits) #-} @@ -883,12 +894,14 @@ supplyCreditsAbsolute mr th c = -- ongoing merge to progress. supplyCredits :: forall t m h. (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) - => Ref (MergingRun t m h) + => RefCtx + -> Ref (MergingRun t m h) -> CreditThreshold -> SupplyMergeCredits -> m (MergeCredits, MergeCredits, MergeCredits) -- ^ (suppliedCredits, suppliedCredits', leftoverCredits) -supplyCredits (DeRef MergingRun { +supplyCredits refCtx + (DeRef MergingRun { mergeKnownCompleted, mergeDebt, mergeCreditsVar, @@ -931,7 +944,7 @@ supplyCredits (DeRef MergingRun { -- completion, then that is fine. The next supplyCredits will -- complete the merge. when weFinishedMerge $ - completeMerge mergeState mergeKnownCompleted + completeMerge refCtx mergeState mergeKnownCompleted assert ( 0 <= suppliedCredits) $ assert (suppliedCredits <= suppliedCredits') $ @@ -975,16 +988,18 @@ performMergeSteps mergeVar creditsVar credits = pure $ stepResult == MergeDone {-# SPECIALISE completeMerge :: - StrictMVar IO (MergingRunState t IO h) + RefCtx + -> StrictMVar IO (MergingRunState t IO h) -> MutVar RealWorld MergeKnownCompleted -> IO () #-} -- | Convert an 'OngoingMerge' to a 'CompletedMerge'. completeMerge :: (MonadSTM m, MonadST m, MonadMVar m, MonadMask m) - => StrictMVar m (MergingRunState t m h) + => RefCtx + -> StrictMVar m (MergingRunState t m h) -> MutVar (PrimState m) MergeKnownCompleted -> m () -completeMerge mergeVar mergeKnownCompletedVar = do +completeMerge refCtx mergeVar mergeKnownCompletedVar = do modifyMVarMasked_ mergeVar $ \case mrs@CompletedMerge{} -> pure $! mrs (OngoingMerge rs m) -> do @@ -993,21 +1008,22 @@ completeMerge mergeVar mergeKnownCompletedVar = do --TODO: Run.fromBuilder (used in Merge.complete) claims not to be -- exception safe so we should probably be using the resource registry -- and test for exception safety. - r <- Merge.complete m + r <- Merge.complete refCtx m V.forM_ rs releaseRef -- Cache the knowledge that we completed the merge writeMutVar mergeKnownCompletedVar MergeKnownCompleted pure $! CompletedMerge r {-# SPECIALISE expectCompleted :: - Ref (MergingRun t IO h) + RefCtx + -> Ref (MergingRun t IO h) -> IO (Ref (Run IO h)) #-} -- | This does /not/ release the reference, but allocates a new reference for -- the returned run, which must be released at some point. expectCompleted :: (MonadMVar m, MonadSTM m, MonadST m, MonadMask m) - => Ref (MergingRun t m h) -> m (Ref (Run m h)) -expectCompleted (DeRef MergingRun {..}) = do + => RefCtx -> Ref (MergingRun t m h) -> m (Ref (Run m h)) +expectCompleted refCtx (DeRef MergingRun {..}) = do knownCompleted <- readMutVar mergeKnownCompleted -- The merge is not guaranteed to be complete, so we do the remaining steps when (knownCompleted == MergeMaybeCompleted) $ do @@ -1022,7 +1038,7 @@ expectCompleted (DeRef MergingRun {..}) = do -- If an async exception happens before we get to perform the -- completion, then that is fine. The next 'expectCompleted' will -- complete the merge. - when weFinishedMerge $ completeMerge mergeState mergeKnownCompleted + when weFinishedMerge $ completeMerge refCtx mergeState mergeKnownCompleted withMVar mergeState $ \case CompletedMerge r -> dupRef r -- return a fresh reference to the run OngoingMerge{} -> do diff --git a/src/Database/LSMTree/Internal/MergingTree.hs b/src/Database/LSMTree/Internal/MergingTree.hs index 133a79189..e0606e560 100644 --- a/src/Database/LSMTree/Internal/MergingTree.hs +++ b/src/Database/LSMTree/Internal/MergingTree.hs @@ -147,28 +147,33 @@ data PreExistingRun m h = | PreExistingMergingRun !(Ref (MergingRun MR.LevelMergeType m h)) {-# SPECIALISE newCompletedMerge :: - Ref (Run IO h) + RefCtx + -> Ref (Run IO h) -> IO (Ref (MergingTree IO h)) #-} newCompletedMerge :: (MonadMVar m, PrimMonad m, MonadMask m) - => Ref (Run m h) + => RefCtx + -> Ref (Run m h) -> m (Ref (MergingTree m h)) -newCompletedMerge run = mkMergingTree . CompletedTreeMerge =<< dupRef run +newCompletedMerge refCtx run = mkMergingTree refCtx . CompletedTreeMerge =<< dupRef run {-# SPECIALISE newOngoingMerge :: - Ref (MergingRun MR.TreeMergeType IO h) + RefCtx + -> Ref (MergingRun MR.TreeMergeType IO h) -> IO (Ref (MergingTree IO h)) #-} -- | Create a new 'MergingTree' representing the merge of an ongoing run. -- The usage of this function is primarily to facilitate the reloading of an -- ongoing merge from a persistent snapshot. newOngoingMerge :: (MonadMVar m, PrimMonad m, MonadMask m) - => Ref (MergingRun MR.TreeMergeType m h) + => RefCtx + -> Ref (MergingRun MR.TreeMergeType m h) -> m (Ref (MergingTree m h)) -newOngoingMerge mr = mkMergingTree . OngoingTreeMerge =<< dupRef mr +newOngoingMerge refCtx mr = mkMergingTree refCtx . OngoingTreeMerge =<< dupRef mr {-# SPECIALISE newPendingLevelMerge :: - [PreExistingRun IO h] + RefCtx + -> [PreExistingRun IO h] -> Maybe (Ref (MergingTree IO h)) -> IO (Ref (MergingTree IO h)) #-} -- | Create a new 'MergingTree' representing the merge of a sequence of @@ -191,11 +196,12 @@ newOngoingMerge mr = mkMergingTree . OngoingTreeMerge =<< dupRef mr newPendingLevelMerge :: forall m h. (MonadMVar m, MonadMask m, PrimMonad m) - => [PreExistingRun m h] + => RefCtx + -> [PreExistingRun m h] -> Maybe (Ref (MergingTree m h)) -> m (Ref (MergingTree m h)) -newPendingLevelMerge [] (Just t) = dupRef t -newPendingLevelMerge [PreExistingRun r] Nothing = do +newPendingLevelMerge _ [] (Just t) = dupRef t +newPendingLevelMerge refCtx [PreExistingRun r] Nothing = do -- No need to create a pending merge here. -- -- We could do something similar for PreExistingMergingRun, but it's: @@ -207,15 +213,15 @@ newPendingLevelMerge [PreExistingRun r] Nothing = do r' <- dupRef r -- There are no interruption points here, and thus provided async -- exceptions are masked then there can be no async exceptions here at all. - mkMergingTree (CompletedTreeMerge r') + mkMergingTree refCtx (CompletedTreeMerge r') -newPendingLevelMerge prs mmt = do +newPendingLevelMerge refCtx prs mmt = do -- isStructurallyEmpty is an interruption point, and can receive async -- exceptions even when masked. So we use it first, *before* allocating -- new references. mmt' <- dupMaybeMergingTree mmt prs' <- traverse dupPreExistingRun (V.fromList prs) - mkMergingTree (PendingTreeMerge (PendingLevelMerge prs' mmt')) + mkMergingTree refCtx (PendingTreeMerge (PendingLevelMerge prs' mmt')) where dupPreExistingRun (PreExistingRun r) = PreExistingRun <$!> dupRef r @@ -232,7 +238,8 @@ newPendingLevelMerge prs mmt = do else Just <$!> dupRef mt {-# SPECIALISE newPendingUnionMerge :: - [Ref (MergingTree IO h)] + RefCtx + -> [Ref (MergingTree IO h)] -> IO (Ref (MergingTree IO h)) #-} -- | Create a new 'MergingTree' representing the union of one or more merging -- trees. This is for unioning the content of multiple tables (represented @@ -251,9 +258,10 @@ newPendingLevelMerge prs mmt = do -- allocates\/creates resources. newPendingUnionMerge :: (MonadMVar m, MonadMask m, PrimMonad m) - => [Ref (MergingTree m h)] + => RefCtx + -> [Ref (MergingTree m h)] -> m (Ref (MergingTree m h)) -newPendingUnionMerge mts = do +newPendingUnionMerge refCtx mts = do mts' <- V.filterM (fmap not . isStructurallyEmpty) (V.fromList mts) -- isStructurallyEmpty is interruptible even with async exceptions masked, -- but we use it before allocating new references. @@ -261,7 +269,7 @@ newPendingUnionMerge mts = do case V.uncons mts'' of Just (mt, x) | V.null x -> pure mt - _ -> mkMergingTree (PendingTreeMerge (PendingUnionMerge mts'')) + _ -> mkMergingTree refCtx (PendingTreeMerge (PendingUnionMerge mts'')) {-# SPECIALISE isStructurallyEmpty :: Ref (MergingTree IO h) -> IO Bool #-} -- | Test if a 'MergingTree' is \"obviously\" empty by virtue of its structure. @@ -281,7 +289,8 @@ isStructurallyEmptyState = \case -- a zero length runs as empty. {-# SPECIALISE mkMergingTree :: - MergingTreeState IO h + RefCtx + -> MergingTreeState IO h -> IO (Ref (MergingTree IO h)) #-} -- | Constructor helper. -- @@ -291,11 +300,12 @@ isStructurallyEmptyState = \case -- mkMergingTree :: (MonadMVar m, PrimMonad m, MonadMask m) - => MergingTreeState m h + => RefCtx + -> MergingTreeState m h -> m (Ref (MergingTree m h)) -mkMergingTree mergeTreeState = do +mkMergingTree refCtx mergeTreeState = do mergeState <- newMVar mergeTreeState - newRef (finalise mergeState) $ \mergeRefCounter -> + newRef refCtx (finalise mergeState) $ \mergeRefCounter -> MergingTree { mergeState , mergeRefCounter @@ -368,6 +378,7 @@ debtOfNestedMerge debts = {-# SPECIALISE supplyCredits :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> ResolveSerialisedValue -> Bloom.Salt -> Run.RunParams @@ -382,6 +393,7 @@ supplyCredits :: (MonadMVar m, MonadST m, MonadSTM m, MonadMask m) => HasFS m h -> HasBlockIO m h + -> RefCtx -> ResolveSerialisedValue -> Bloom.Salt -> Run.RunParams @@ -391,7 +403,7 @@ supplyCredits :: -> Ref (MergingTree m h) -> MR.MergeCredits -> m MR.MergeCredits -supplyCredits hfs hbio resolve salt runParams threshold root uc = \mt0 c0 -> do +supplyCredits hfs hbio refCtx resolve salt runParams threshold root uc = \mt0 c0 -> do if c0 <= 0 then pure 0 else supplyTree mt0 c0 @@ -417,13 +429,13 @@ supplyCredits hfs hbio resolve salt runParams threshold root uc = \mt0 c0 -> do pure (state, credits) OngoingTreeMerge mr -> do - leftovers <- MR.supplyCreditsRelative mr threshold credits + leftovers <- MR.supplyCreditsRelative refCtx mr threshold credits if leftovers <= 0 then pure (state, 0) else do -- complete ongoing merge - r <- withRollback reg (MR.expectCompleted mr) releaseRef + r <- withRollback reg (MR.expectCompleted refCtx mr) releaseRef delayedCommit reg (releaseRef mr) -- all work is done, we can't spend any more credits pure (CompletedTreeMerge r, leftovers) @@ -440,7 +452,7 @@ supplyCredits hfs hbio resolve salt runParams threshold root uc = \mt0 c0 -> do withRollback reg -- TODO: the builder's handles aren't cleaned up if we fail -- before fromBuilder closes them - (Run.newEmpty hfs hbio salt runParams runPaths) + (Run.newEmpty hfs hbio refCtx salt runParams runPaths) releaseRef pure (CompletedTreeMerge run, credits) @@ -469,7 +481,7 @@ supplyCredits hfs hbio resolve salt runParams threshold root uc = \mt0 c0 -> do supplyPreExisting c = \case PreExistingRun _r -> pure c -- no work to do, all leftovers - PreExistingMergingRun mr -> MR.supplyCreditsRelative mr threshold c + PreExistingMergingRun mr -> MR.supplyCreditsRelative refCtx mr threshold c -- supply credits left to right until they are used up leftToRight :: @@ -504,7 +516,7 @@ supplyCredits hfs hbio resolve salt runParams threshold root uc = \mt0 c0 -> do runPaths <- mkFreshRunPaths mr <- withRollback reg - (MR.new hfs hbio resolve salt runParams mergeType runPaths rs) + (MR.new hfs hbio refCtx resolve salt runParams mergeType runPaths rs) releaseRef -- no need for the runs anymore, 'MR.new' made duplicates traverse_ (\r -> delayedCommit reg (releaseRef r)) rs @@ -527,23 +539,24 @@ supplyCredits hfs hbio resolve salt runParams threshold root uc = \mt0 c0 -> do withRollback reg (dupRef r) releaseRef PreExistingMergingRun mr -> do delayedCommit reg (releaseRef mr) -- only released at the end - withRollback reg (MR.expectCompleted mr) releaseRef + withRollback reg (MR.expectCompleted refCtx mr) releaseRef rs2 <- V.forM mts $ \mt -> do delayedCommit reg (releaseRef mt) -- only released at the end - withRollback reg (expectCompleted mt) releaseRef + withRollback reg (expectCompleted refCtx mt) releaseRef pure (ty, rs1 <> rs2) -- | This does /not/ release the reference, but allocates a new reference for -- the returned run, which must be released at some point. {-# SPECIALISE expectCompleted :: - Ref (MergingTree IO h) + RefCtx + -> Ref (MergingTree IO h) -> IO (Ref (Run IO h)) #-} expectCompleted :: (MonadMVar m, MonadSTM m, MonadST m, MonadMask m) - => Ref (MergingTree m h) -> m (Ref (Run m h)) -expectCompleted (DeRef MergingTree {..}) = do + => RefCtx -> Ref (MergingTree m h) -> m (Ref (Run m h)) +expectCompleted refCtx (DeRef MergingTree {..}) = do withMVar mergeState $ \case CompletedTreeMerge r -> dupRef r -- return a fresh reference to the run - OngoingTreeMerge mr -> MR.expectCompleted mr + OngoingTreeMerge mr -> MR.expectCompleted refCtx mr PendingTreeMerge{} -> error "expectCompleted: expected a completed merging tree, but found a pending one" diff --git a/src/Database/LSMTree/Internal/Run.hs b/src/Database/LSMTree/Internal/Run.hs index 9dd77cba7..ad9f69dbc 100644 --- a/src/Database/LSMTree/Internal/Run.hs +++ b/src/Database/LSMTree/Internal/Run.hs @@ -186,6 +186,7 @@ setRunDataCaching hbio runKOpsFile NoCacheRunData = do {-# SPECIALISE newEmpty :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> Bloom.Salt -> RunParams -> RunFsPaths @@ -196,37 +197,42 @@ newEmpty :: (MonadST m, MonadSTM m, MonadMask m) => HasFS m h -> HasBlockIO m h + -> RefCtx -> Bloom.Salt -> RunParams -> RunFsPaths -> m (Ref (Run m h)) -newEmpty hfs hbio salt runParams runPaths = do +newEmpty hfs hbio refCtx salt runParams runPaths = do builder <- Builder.new hfs hbio salt runParams runPaths (NumEntries 0) - fromBuilder builder + fromBuilder refCtx builder {-# SPECIALISE fromBuilder :: - RunBuilder IO h + RefCtx + -> RunBuilder IO h -> IO (Ref (Run IO h)) #-} -- TODO: make exception safe fromBuilder :: (MonadST m, MonadSTM m, MonadMask m) - => RunBuilder m h + => RefCtx + -> RunBuilder m h -> m (Ref (Run m h)) -fromBuilder builder = do +fromBuilder refCtx builder = do (runHasFS, runHasBlockIO, runRunFsPaths, runFilter, runIndex, RunParams {runParamCaching = runRunDataCaching}, runNumEntries) <- Builder.unsafeFinalise builder runKOpsFile <- FS.hOpen runHasFS (runKOpsPath runRunFsPaths) FS.ReadMode -- TODO: openBlobFile should be called with exceptions masked - runBlobFile <- openBlobFile runHasFS (runBlobPath runRunFsPaths) FS.ReadMode + runBlobFile <- openBlobFile runHasFS refCtx (runBlobPath runRunFsPaths) FS.ReadMode setRunDataCaching runHasBlockIO runKOpsFile runRunDataCaching - newRef (finaliser runHasFS runKOpsFile runBlobFile runRunFsPaths) + newRef refCtx + (finaliser runHasFS runKOpsFile runBlobFile runRunFsPaths) (\runRefCounter -> Run { .. }) {-# SPECIALISE fromWriteBuffer :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> Bloom.Salt -> RunParams -> RunFsPaths @@ -247,18 +253,19 @@ fromWriteBuffer :: (MonadST m, MonadSTM m, MonadMask m) => HasFS m h -> HasBlockIO m h + -> RefCtx -> Bloom.Salt -> RunParams -> RunFsPaths -> WriteBuffer -> Ref (WriteBufferBlobs m h) -> m (Ref (Run m h)) -fromWriteBuffer fs hbio salt params fsPaths buffer blobs = do +fromWriteBuffer fs hbio refCtx salt params fsPaths buffer blobs = do builder <- Builder.new fs hbio salt params fsPaths (WB.numEntries buffer) for_ (WB.toList buffer) $ \(k, e) -> Builder.addKeyOp builder k (fmap (WBB.mkRawBlobRef blobs) e) --TODO: the fmap entry here reallocates even when there are no blobs - fromBuilder builder + fromBuilder refCtx builder {------------------------------------------------------------------------------- Snapshot @@ -267,6 +274,7 @@ fromWriteBuffer fs hbio salt params fsPaths buffer blobs = do {-# SPECIALISE openFromDisk :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> RunDataCaching -> IndexType -> Bloom.Salt @@ -293,13 +301,14 @@ openFromDisk :: (MonadSTM m, MonadMask m, PrimMonad m) => HasFS m h -> HasBlockIO m h + -> RefCtx -> RunDataCaching -> IndexType -> Bloom.Salt -- ^ Expected salt -> RunFsPaths -> m (Ref (Run m h)) -- TODO: make exception safe -openFromDisk fs hbio runRunDataCaching indexType expectedSalt runRunFsPaths = do +openFromDisk fs hbio refCtx runRunDataCaching indexType expectedSalt runRunFsPaths = do expectedChecksums <- CRC.expectValidFile fs (runChecksumsPath runRunFsPaths) CRC.FormatChecksumsFile . fromChecksumsFile @@ -323,9 +332,9 @@ openFromDisk fs hbio runRunDataCaching indexType expectedSalt runRunFsPaths = do runKOpsFile <- FS.hOpen fs (runKOpsPath runRunFsPaths) FS.ReadMode -- TODO: openBlobFile should be called with exceptions masked - runBlobFile <- openBlobFile fs (runBlobPath runRunFsPaths) FS.ReadMode + runBlobFile <- openBlobFile fs refCtx (runBlobPath runRunFsPaths) FS.ReadMode setRunDataCaching hbio runKOpsFile runRunDataCaching - newRef (finaliser fs runKOpsFile runBlobFile runRunFsPaths) $ \runRefCounter -> + newRef refCtx (finaliser fs runKOpsFile runBlobFile runRunFsPaths) $ \runRefCounter -> Run { runHasFS = fs , runHasBlockIO = hbio diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index a5681c9b9..d953040cd 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -232,6 +232,7 @@ instance NFData r => NFData (SnapPreExistingRun r) where {-# SPECIALISE fromSnapMergingTree :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> Bloom.Salt -> UniqCounter IO -> ResolveSerialisedValue @@ -247,6 +248,7 @@ fromSnapMergingTree :: forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) => HasFS m h -> HasBlockIO m h + -> RefCtx -> Bloom.Salt -> UniqCounter m -> ResolveSerialisedValue @@ -254,7 +256,7 @@ fromSnapMergingTree :: -> ActionRegistry m -> SnapMergingTree (Ref (Run m h)) -> m (Ref (MT.MergingTree m h)) -fromSnapMergingTree hfs hbio salt uc resolve dir = +fromSnapMergingTree hfs hbio refCtx salt uc resolve dir = go where -- Reference strategy: @@ -269,7 +271,7 @@ fromSnapMergingTree hfs hbio salt uc resolve dir = go reg (SnapMergingTree (SnapCompletedTreeMerge run)) = withRollback reg - (MT.newCompletedMerge run) + (MT.newCompletedMerge refCtx run) releaseRef go reg (SnapMergingTree (SnapPendingTreeMerge @@ -277,7 +279,7 @@ fromSnapMergingTree hfs hbio salt uc resolve dir = prs' <- traverse (fromSnapPreExistingRun reg) prs mmt' <- traverse (go reg) mmt mt <- withRollback reg - (MT.newPendingLevelMerge prs' mmt') + (MT.newPendingLevelMerge refCtx prs' mmt') releaseRef traverse_ (delayedCommit reg . releasePER) prs' traverse_ (delayedCommit reg . releaseRef) mmt' @@ -287,17 +289,17 @@ fromSnapMergingTree hfs hbio salt uc resolve dir = (SnapPendingUnionMerge mts))) = do mts' <- traverse (go reg) mts mt <- withRollback reg - (MT.newPendingUnionMerge mts') + (MT.newPendingUnionMerge refCtx mts') releaseRef traverse_ (delayedCommit reg . releaseRef) mts' pure mt go reg (SnapMergingTree (SnapOngoingTreeMerge smrs)) = do mr <- withRollback reg - (fromSnapMergingRun hfs hbio salt uc resolve dir smrs) + (fromSnapMergingRun hfs hbio refCtx salt uc resolve dir smrs) releaseRef mt <- withRollback reg - (MT.newOngoingMerge mr) + (MT.newOngoingMerge refCtx mr) releaseRef delayedCommit reg (releaseRef mr) pure mt @@ -312,7 +314,7 @@ fromSnapMergingTree hfs hbio salt uc resolve dir = fromSnapPreExistingRun reg (SnapPreExistingMergingRun smrs) = MT.PreExistingMergingRun <$> withRollback reg - (fromSnapMergingRun hfs hbio salt uc resolve dir smrs) + (fromSnapMergingRun hfs hbio refCtx salt uc resolve dir smrs) releaseRef releasePER (MT.PreExistingRun r) = releaseRef r @@ -487,6 +489,7 @@ snapshotWriteBuffer hfs hbio activeUc snapUc reg activeDir snapDir wb wbb = do -> ResolveSerialisedValue -> HasFS IO h -> HasBlockIO IO h + -> RefCtx -> UniqCounter IO -> ActiveDir -> WriteBufferFsPaths @@ -498,11 +501,12 @@ openWriteBuffer :: -> ResolveSerialisedValue -> HasFS m h -> HasBlockIO m h + -> RefCtx -> UniqCounter m -> ActiveDir -> WriteBufferFsPaths -> m (WriteBuffer, Ref (WriteBufferBlobs m h)) -openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths = do +openWriteBuffer reg resolve hfs hbio refCtx uc activeDir snapWriteBufferPaths = do -- Check the checksums -- TODO: This reads the blobfile twice: once to check the CRC and once more -- to copy it from the snapshot directory to the active directory. @@ -519,7 +523,7 @@ openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths = do copyFile hfs reg (writeBufferBlobPath snapWriteBufferPaths) activeWriteBufferBlobPath writeBufferBlobs <- withRollback reg - (WBB.open hfs activeWriteBufferBlobPath FS.AllowExisting) + (WBB.open hfs refCtx activeWriteBufferBlobPath FS.AllowExisting) releaseRef -- Read write buffer key/ops let kOpsPath = ForKOps (writeBufferKOpsPath snapWriteBufferPaths) @@ -582,6 +586,7 @@ snapshotRun hfs hbio snapUc reg (NamedSnapshotDir targetDir) run = do {-# SPECIALISE openRun :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> UniqCounter IO -> ActionRegistry IO -> NamedSnapshotDir @@ -600,6 +605,7 @@ openRun :: (MonadMask m, MonadSTM m, MonadST m) => HasFS m h -> HasBlockIO m h + -> RefCtx -> UniqCounter m -> ActionRegistry m -> NamedSnapshotDir @@ -607,7 +613,7 @@ openRun :: -> Bloom.Salt -> SnapshotRun -> m (Ref (Run m h)) -openRun hfs hbio uc reg +openRun hfs hbio refCtx uc reg (NamedSnapshotDir sourceDir) (ActiveDir targetDir) expectedSalt SnapshotRun { @@ -621,7 +627,7 @@ openRun hfs hbio uc reg hardLinkRunFiles hfs hbio reg sourcePaths targetPaths withRollback reg - (Run.openFromDisk hfs hbio caching indexType expectedSalt targetPaths) + (Run.openFromDisk hfs hbio refCtx caching indexType expectedSalt targetPaths) releaseRef {------------------------------------------------------------------------------- @@ -631,6 +637,7 @@ openRun hfs hbio uc reg {-# SPECIALISE fromSnapLevels :: HasFS IO h -> HasBlockIO IO h + -> RefCtx -> Bloom.Salt -> UniqCounter IO -> TableConfig @@ -645,6 +652,7 @@ fromSnapLevels :: forall m h. (MonadMask m, MonadMVar m, MonadSTM m, MonadST m) => HasFS m h -> HasBlockIO m h + -> RefCtx -> Bloom.Salt -> UniqCounter m -> TableConfig @@ -653,7 +661,7 @@ fromSnapLevels :: -> ActiveDir -> SnapLevels (Ref (Run m h)) -> m (Levels m h) -fromSnapLevels hfs hbio salt uc conf resolve reg dir (SnapLevels levels) = +fromSnapLevels hfs hbio refCtx salt uc conf resolve reg dir (SnapLevels levels) = V.iforM levels $ \i -> fromSnapLevel (LevelNo (i+1)) where -- TODO: we may wish to trace the merges created during snapshot restore: @@ -679,20 +687,21 @@ fromSnapLevels hfs hbio salt uc conf resolve reg dir (SnapLevels levels) = fromSnapIncomingRun ln (SnapIncomingMergingRun mergePolicy nominalDebt nominalCredits smrs) = bracket - (fromSnapMergingRun hfs hbio salt uc resolve dir smrs) + (fromSnapMergingRun hfs hbio refCtx salt uc resolve dir smrs) releaseRef $ \mr -> do ir <- newIncomingMergingRun mergePolicy nominalDebt mr -- This will set the correct nominal credits, but it will not do any -- more merging work because fromSnapMergingRun already supplies -- all the merging credits already. - supplyCreditsIncomingRun conf ln ir nominalCredits + supplyCreditsIncomingRun refCtx conf ln ir nominalCredits pure ir {-# SPECIALISE fromSnapMergingRun :: MR.IsMergeType t => HasFS IO h -> HasBlockIO IO h + -> RefCtx -> Bloom.Salt -> UniqCounter IO -> ResolveSerialisedValue @@ -703,21 +712,22 @@ fromSnapMergingRun :: (MonadMask m, MonadMVar m, MonadSTM m, MonadST m, MR.IsMergeType t) => HasFS m h -> HasBlockIO m h + -> RefCtx -> Bloom.Salt -> UniqCounter m -> ResolveSerialisedValue -> ActiveDir -> SnapMergingRun t (Ref (Run m h)) -> m (Ref (MR.MergingRun t m h)) -fromSnapMergingRun _ _ _ _ _ _ (SnapCompletedMerge mergeDebt r) = - MR.newCompleted mergeDebt r +fromSnapMergingRun _ _ refCtx _ _ _ _ (SnapCompletedMerge mergeDebt r) = + MR.newCompleted refCtx mergeDebt r -fromSnapMergingRun hfs hbio salt uc resolve dir +fromSnapMergingRun hfs hbio refCtx salt uc resolve dir (SnapOngoingMerge runParams mergeCredits rs mergeType) = do bracketOnError (do uniq <- incrUniqCounter uc let runPaths = runPath dir (uniqueToRunNumber uniq) - MR.new hfs hbio resolve salt runParams mergeType runPaths rs) + MR.new hfs hbio refCtx resolve salt runParams mergeType runPaths rs) releaseRef $ \mr -> do -- When a snapshot is created, merge progress is lost, so we have to -- redo merging work here. The MergeCredits in SnapMergingRun tracks @@ -726,7 +736,7 @@ fromSnapMergingRun hfs hbio salt uc resolve dir --TODO: the threshold should be stored with the MergingRun -- here we want to supply the credits now, so we can use a threshold of 1 let thresh = MR.CreditThreshold (MR.UnspentCredits 1) - _ <- MR.supplyCreditsAbsolute mr thresh mergeCredits + _ <- MR.supplyCreditsAbsolute refCtx mr thresh mergeCredits pure mr {------------------------------------------------------------------------------- diff --git a/src/Database/LSMTree/Internal/Unsafe.hs b/src/Database/LSMTree/Internal/Unsafe.hs index d39c7a40e..4afa73a38 100644 --- a/src/Database/LSMTree/Internal/Unsafe.hs +++ b/src/Database/LSMTree/Internal/Unsafe.hs @@ -428,6 +428,10 @@ data SessionEnv m h = SessionEnv { -- | Similarly to tables, open cursors are tracked so they can be closed -- once the session is closed. See 'sessionOpenTables'. , sessionOpenCursors :: !(StrictMVar m (Map CursorId (Cursor m h))) + + -- | The scope within which references exist. This context is created once a + -- new session is created, and closed once the session is closed. + , sessionRefCtx :: !RefCtx } {-# INLINE sessionId #-} @@ -742,11 +746,11 @@ closeSession :: -> m () closeSession Session{sessionState, sessionTracer} = do traceWith sessionTracer TraceCloseSession - modifyWithActionRegistry_ + mayRefCtx <- modifyWithActionRegistry (RW.unsafeAcquireWriteAccess sessionState) (atomically . RW.unsafeReleaseWriteAccess sessionState) $ \reg -> \case - SessionClosed -> pure SessionClosed + SessionClosed -> pure (SessionClosed, Nothing) SessionOpen seshEnv -> do -- Close tables and cursors first, so that we know none are open when we -- release session-wide resources. @@ -783,7 +787,11 @@ closeSession Session{sessionState, sessionTracer} = do -- message as late as possible. delayedCommit reg $ traceWith sessionTracer TraceClosedSession - pure SessionClosed + pure (SessionClosed, Just (sessionRefCtx seshEnv)) + + -- Check for forgotten references as the very last thing before returning + -- from 'closeSession'. + forM_ mayRefCtx closeRefCtx {-# SPECIALISE acquireSessionLock :: HasFS IO h @@ -840,6 +848,7 @@ mkSession :: -> m (Session m h) mkSession tr hfs hbio reg root@(SessionRoot dir) lockFile salt = do counterVar <- newUniqCounter 0 + refCtx <- newRefCtx openTablesVar <- newMVar Map.empty openCursorsVar <- newMVar Map.empty sessionVar <- RW.new $ SessionOpen $ SessionEnv { @@ -850,6 +859,7 @@ mkSession tr hfs hbio reg root@(SessionRoot dir) lockFile salt = do , sessionLockFile = lockFile , sessionOpenTables = openTablesVar , sessionOpenCursors = openCursorsVar + , sessionRefCtx = refCtx } -- Note: we're "abusing" the action registry to trace the success @@ -1051,7 +1061,7 @@ newEmptyTableContent uc seshEnv reg = do let tableWriteBuffer = WB.empty tableWriteBufferBlobs <- withRollback reg - (WBB.new (sessionHasFS seshEnv) blobpath) + (WBB.new (sessionHasFS seshEnv) (sessionRefCtx seshEnv) blobpath) releaseRef let tableLevels = V.empty tableCache <- mkLevelsCache reg tableLevels @@ -1274,6 +1284,7 @@ updates resolve es t = do resolve hfs (tableHasBlockIO tEnv) + (sessionRefCtx (tableSessionEnv tEnv)) (tableSessionRoot tEnv) (tableSessionSalt tEnv) (tableSessionUniqCounter t) @@ -1322,7 +1333,7 @@ retrieveBlobs sesh wrefs = do let hbio = sessionHasBlockIO seshEnv in handle (\(BlobRef.WeakBlobRefInvalid i) -> throwIO (ErrBlobRefInvalid i)) $ - BlobRef.readWeakBlobRefs hbio wrefs + BlobRef.readWeakBlobRefs hbio (sessionRefCtx seshEnv) wrefs {------------------------------------------------------------------------------- Cursors @@ -1772,15 +1783,15 @@ openTableFromSnapshot policyOveride sesh snap label resolve = do -- Read write buffer let snapWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer (tableWriteBuffer, tableWriteBufferBlobs) <- - openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths + openWriteBuffer reg resolve hfs hbio (sessionRefCtx seshEnv) uc activeDir snapWriteBufferPaths -- Hard link runs into the active directory, - snapLevels' <- traverse (openRun hfs hbio uc reg snapDir activeDir salt) snapLevels + snapLevels' <- traverse (openRun hfs hbio (sessionRefCtx seshEnv) uc reg snapDir activeDir salt) snapLevels unionLevel <- case mTreeOpt of Nothing -> pure NoUnion Just mTree -> do - snapTree <- traverse (openRun hfs hbio uc reg snapDir activeDir salt) mTree - mt <- fromSnapMergingTree hfs hbio salt uc resolve activeDir reg snapTree + snapTree <- traverse (openRun hfs hbio (sessionRefCtx seshEnv) uc reg snapDir activeDir salt) mTree + mt <- fromSnapMergingTree hfs hbio (sessionRefCtx seshEnv) salt uc resolve activeDir reg snapTree isStructurallyEmpty mt >>= \case True -> pure NoUnion @@ -1790,7 +1801,7 @@ openTableFromSnapshot policyOveride sesh snap label resolve = do pure (Union mt cache) -- Convert from the snapshot format, restoring merge progress in the process - tableLevels <- fromSnapLevels hfs hbio salt uc conf resolve reg activeDir snapLevels' + tableLevels <- fromSnapLevels hfs hbio (sessionRefCtx seshEnv) salt uc conf resolve reg activeDir snapLevels' traverse_ (delayedCommit reg . releaseRef) snapLevels' tableCache <- mkLevelsCache reg tableLevels @@ -2002,7 +2013,7 @@ unionsInOpenSession reg sesh seshEnv conf tr !tableId ts = do withRollback reg (tableContentToMergingTree (sessionUniqCounter sesh) seshEnv conf tc) releaseRef - mt <- withRollback reg (newPendingUnionMerge mts) releaseRef + mt <- withRollback reg (newPendingUnionMerge (sessionRefCtx seshEnv) mts) releaseRef -- The mts here is a temporary value, since newPendingUnionMerge -- will make its own references, so release mts at the end of @@ -2055,7 +2066,7 @@ tableContentToMergingTree uc seshEnv conf NoUnion -> Nothing Union mt _ -> Just mt -- we could reuse the cache, but it -- would complicate things - in newPendingLevelMerge runs unionmt + in newPendingLevelMerge (sessionRefCtx seshEnv) runs unionmt where levelToPreExistingRuns :: Level m h -> [PreExistingRun m h] levelToPreExistingRuns Level{incomingRun, residentRuns} = @@ -2083,7 +2094,8 @@ writeBufferToNewRun uc sessionRoot = root, sessionSalt = salt, sessionHasFS = hfs, - sessionHasBlockIO = hbio + sessionHasBlockIO = hbio, + sessionRefCtx = refCtx } conf TableContent{ @@ -2096,7 +2108,7 @@ writeBufferToNewRun uc let (!runParams, !runPaths) = mergingRunParamsForLevel (Paths.activeDir root) conf uniq (LevelNo 1) Run.fromWriteBuffer - hfs hbio salt + hfs hbio refCtx salt runParams runPaths tableWriteBuffer tableWriteBufferBlobs @@ -2193,6 +2205,7 @@ supplyUnionCredits resolve t credits = do MT.supplyCredits (tableHasFS tEnv) (tableHasBlockIO tEnv) + (sessionRefCtx (tableSessionEnv tEnv)) resolve (tableSessionSalt tEnv) (runParamsForLevel conf UnionLevel) diff --git a/src/Database/LSMTree/Internal/WriteBufferBlobs.hs b/src/Database/LSMTree/Internal/WriteBufferBlobs.hs index 72f3702aa..db16c59b1 100644 --- a/src/Database/LSMTree/Internal/WriteBufferBlobs.hs +++ b/src/Database/LSMTree/Internal/WriteBufferBlobs.hs @@ -124,7 +124,7 @@ instance NFData h => NFData (WriteBufferBlobs m h) where instance RefCounted m (WriteBufferBlobs m h) where getRefCounter = writeBufRefCounter -{-# SPECIALISE new :: HasFS IO h -> FS.FsPath -> IO (Ref (WriteBufferBlobs IO h)) #-} +{-# SPECIALISE new :: HasFS IO h -> RefCtx -> FS.FsPath -> IO (Ref (WriteBufferBlobs IO h)) #-} -- | Create a new 'WriteBufferBlobs' with a new file. -- -- REF: the resulting reference must be released once it is no longer used. @@ -134,11 +134,12 @@ instance RefCounted m (WriteBufferBlobs m h) where new :: (PrimMonad m, MonadMask m) => HasFS m h + -> RefCtx -> FS.FsPath -> m (Ref (WriteBufferBlobs m h)) -new fs blobFileName = open fs blobFileName FS.MustBeNew +new fs refCtx blobFileName = open fs refCtx blobFileName FS.MustBeNew -{-# SPECIALISE open :: HasFS IO h -> FS.FsPath -> FS.AllowExisting -> IO (Ref (WriteBufferBlobs IO h)) #-} +{-# SPECIALISE open :: HasFS IO h -> RefCtx -> FS.FsPath -> FS.AllowExisting -> IO (Ref (WriteBufferBlobs IO h)) #-} -- | Open a `WriteBufferBlobs` file and sets the file pointer to the end of the file. -- -- REF: the resulting reference must be released once it is no longer used. @@ -148,18 +149,19 @@ new fs blobFileName = open fs blobFileName FS.MustBeNew open :: (PrimMonad m, MonadMask m) => HasFS m h + -> RefCtx -> FS.FsPath -> FS.AllowExisting -> m (Ref (WriteBufferBlobs m h)) -open fs blobFileName blobFileAllowExisting = do +open fs refCtx blobFileName blobFileAllowExisting = do -- Must use read/write mode because we write blobs when adding, but -- we can also be asked to retrieve blobs at any time. bracketOnError - (openBlobFile fs blobFileName (FS.ReadWriteMode blobFileAllowExisting)) + (openBlobFile fs refCtx blobFileName (FS.ReadWriteMode blobFileAllowExisting)) releaseRef - (fromBlobFile fs) + (fromBlobFile fs refCtx) -{-# SPECIALISE fromBlobFile :: HasFS IO h -> Ref (BlobFile IO h) -> IO (Ref (WriteBufferBlobs IO h)) #-} +{-# SPECIALISE fromBlobFile :: HasFS IO h -> RefCtx -> Ref (BlobFile IO h) -> IO (Ref (WriteBufferBlobs IO h)) #-} -- | Make a `WriteBufferBlobs` from a `BlobFile` and set the file pointer to the -- end of the file. -- @@ -170,14 +172,15 @@ open fs blobFileName blobFileAllowExisting = do fromBlobFile :: (PrimMonad m, MonadMask m) => HasFS m h + -> RefCtx -> Ref (BlobFile m h) -> m (Ref (WriteBufferBlobs m h)) -fromBlobFile fs blobFile = do +fromBlobFile fs refCtx blobFile = do blobFilePointer <- newFilePointer -- Set the blob file pointer to the end of the file blobFileSize <- withRef blobFile $ FS.hGetSize fs . blobFileHandle void . updateFilePointer blobFilePointer . fromIntegral $ blobFileSize - newRef (releaseRef blobFile) $ \writeBufRefCounter -> + newRef refCtx (releaseRef blobFile) $ \writeBufRefCounter -> WriteBufferBlobs { blobFile, blobFilePointer, diff --git a/test-control/Test/Control/RefCount.hs b/test-control/Test/Control/RefCount.hs index c6180038d..c33e8279d 100644 --- a/test-control/Test/Control/RefCount.hs +++ b/test-control/Test/Control/RefCount.hs @@ -164,13 +164,13 @@ instance RefCounted m (TestObject2 m) where prop_ref_double_free :: (PrimMonad m, MonadMask m, MonadFail m) => m Property -prop_ref_double_free = do +prop_ref_double_free = withRefCtx $ \refCtx -> do finalised <- newMutVar False - ref <- newRef (writeMutVar finalised True) TestObject + ref <- newRef refCtx (writeMutVar finalised True) TestObject releaseRef ref True <- readMutVar finalised Left e@RefDoubleRelease{} <- try $ releaseRef ref - checkForgottenRefs + checkForgottenRefs refCtx -- Print the displayed exception as an example pure $ tabulate "displayException" [displayException e] () @@ -178,9 +178,9 @@ prop_ref_use_after_free :: (PrimMonad m, MonadMask m, MonadFail m) => Bool -- ^ Test the DeRef pattern -> m Property -prop_ref_use_after_free testDeRef = do +prop_ref_use_after_free testDeRef = withRefCtx $ \refCtx -> do finalised <- newMutVar False - ref <- newRef (writeMutVar finalised True) TestObject + ref <- newRef refCtx (writeMutVar finalised True) TestObject releaseRef ref True <- readMutVar finalised Left e@RefUseAfterRelease{} <- try $ withRef ref return @@ -188,18 +188,18 @@ prop_ref_use_after_free testDeRef = do Left RefUseAfterRelease{} <- try $ case ref of DeRef _ -> pure () pure () Left RefUseAfterRelease{} <- try $ dupRef ref - checkForgottenRefs + checkForgottenRefs refCtx -- Print the displayed exception as an example pure $ tabulate "displayException" [displayException e] () prop_ref_never_released0 :: (PrimMonad m, MonadMask m) => m () -prop_ref_never_released0 = do +prop_ref_never_released0 = withRefCtx $ \refCtx -> do finalised <- newMutVar False - ref <- newRef (writeMutVar finalised True) TestObject + ref <- newRef refCtx (writeMutVar finalised True) TestObject _ <- case ref of DeRef _ -> pure () - checkForgottenRefs + checkForgottenRefs refCtx -- ref is still being used, so check should not fail _ <- case ref of DeRef _ -> pure () releaseRef ref @@ -207,29 +207,29 @@ prop_ref_never_released0 = do prop_ref_never_released1 :: (PrimMonad m, MonadMask m) => m Property -prop_ref_never_released1 = +prop_ref_never_released1 = withRefCtx $ \refCtx -> do handle expectRefNeverReleased $ do finalised <- newMutVar False - ref <- newRef (writeMutVar finalised True) TestObject + ref <- newRef refCtx (writeMutVar finalised True) TestObject _ <- withRef ref return _ <- case ref of DeRef _ -> pure () -- ref is never released, so should fail - checkForgottenRefs + checkForgottenRefs refCtx pure (counterexample "no forgotten refs detected" $ property False) prop_ref_never_released2 :: (PrimMonad m, MonadMask m) => m Property -prop_ref_never_released2 = +prop_ref_never_released2 = withRefCtx $ \refCtx -> do handle expectRefNeverReleased $ do finalised <- newMutVar False - ref <- newRef (writeMutVar finalised True) TestObject + ref <- newRef refCtx (writeMutVar finalised True) TestObject ref2 <- dupRef ref releaseRef ref _ <- withRef ref2 return _ <- case ref2 of DeRef _ -> pure () -- ref2 is never released, so should fail - checkForgottenRefs + checkForgottenRefs refCtx pure (counterexample "no forgotten refs detected" $ property False) expectRefNeverReleased :: Monad m => RefException -> m Property @@ -243,10 +243,10 @@ expectRefNeverReleased e = prop_release_ref_exception :: (PrimMonad m, MonadMask m) => m () -prop_release_ref_exception = do +prop_release_ref_exception = withRefCtx $ \refCtx -> do finalised <- newMutVar False - ref <- newRef (writeMutVar finalised True >> throwIO (userError "oops")) TestObject + ref <- newRef refCtx (writeMutVar finalised True >> throwIO (userError "oops")) TestObject _ <- try @_ @SomeException (releaseRef ref) - checkForgottenRefs + checkForgottenRefs refCtx #endif diff --git a/test/Main.hs b/test/Main.hs index 6369428a1..f6c8c3bd2 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -2,8 +2,6 @@ -- module Main (main) where -import qualified Control.RefCount - import qualified Test.Database.LSMTree import qualified Test.Database.LSMTree.Class import qualified Test.Database.LSMTree.Generators @@ -98,16 +96,3 @@ main = do , Test.Database.LSMTree.UnitTests.tests , Test.FS.tests ] - Control.RefCount.checkForgottenRefs - -- This use of checkForgottenRefs is a last resort. Refs that are forgotten - -- before being released are detected by the first Ref operation after a - -- major GC. So they may be thrown during the run of individual tests (though - -- depending on GC timing this may be during a subsequent test to the one - -- that triggered the bug). As a last resort, checkForgottenRefs does a last - -- major GC and will trigger any forgotten refs. So this will reliably catch - -- the errors, but will not identify where they come from, not even which - -- test! - -- - -- If this exception occurs, it may be necessary to put proper use of - -- checkForgottenRefs into the tests suspected of being the culprit to - -- identiyfy which one is really failing. diff --git a/test/Test/Database/LSMTree/Generators.hs b/test/Test/Database/LSMTree/Generators.hs index e75ddce44..46a42957e 100644 --- a/test/Test/Database/LSMTree/Generators.hs +++ b/test/Test/Database/LSMTree/Generators.hs @@ -2,6 +2,7 @@ module Test.Database.LSMTree.Generators ( tests ) where +import Control.RefCount (withRefCtx) import Data.Bifoldable (bifoldMap) import Data.Coerce (Coercible, coerce) import qualified Data.Map.Strict as Map @@ -158,12 +159,12 @@ prop_withRunDoesntLeak :: -> FS.HasBlockIO IO h -> SerialisedRunData -> IO Property -prop_withRunDoesntLeak hfs hbio rd = do +prop_withRunDoesntLeak hfs hbio rd = withRefCtx $ \refCtx ->do let indexType = Index.Ordinary let path = FS.mkFsPath ["something-1"] let fsPaths = RunFsPaths path (RunNumber 0) FS.createDirectory hfs path - withRunAt hfs hbio testSalt (runParams indexType) fsPaths rd $ \_run -> do + withRunAt hfs hbio refCtx testSalt (runParams indexType) fsPaths rd $ \_run -> do pure (QC.property True) prop_withMergingRunDoesntLeak :: @@ -171,12 +172,12 @@ prop_withMergingRunDoesntLeak :: -> FS.HasBlockIO IO h -> SerialisedMergingRunData MR.LevelMergeType -> IO Property -prop_withMergingRunDoesntLeak hfs hbio mrd = do +prop_withMergingRunDoesntLeak hfs hbio mrd = withRefCtx $ \refCtx -> do let indexType = Index.Ordinary let path = FS.mkFsPath ["something-2"] FS.createDirectory hfs path counter <- newUniqCounter 0 - withMergingRun hfs hbio resolveVal testSalt (runParams indexType) path counter mrd $ + withMergingRun hfs hbio refCtx resolveVal testSalt (runParams indexType) path counter mrd $ \_mr -> do pure (QC.property True) @@ -187,12 +188,12 @@ prop_withMergingTreeDoesntLeak :: -> FS.HasBlockIO IO h -> SerialisedMergingTreeData -> IO Property -prop_withMergingTreeDoesntLeak hfs hbio mrd = do +prop_withMergingTreeDoesntLeak hfs hbio mrd = withRefCtx $ \refCtx -> do let indexType = Index.Ordinary let path = FS.mkFsPath ["something-3"] FS.createDirectory hfs path counter <- newUniqCounter 0 - withMergingTree hfs hbio resolveVal testSalt (runParams indexType) path counter mrd $ + withMergingTree hfs hbio refCtx resolveVal testSalt (runParams indexType) path counter mrd $ \_tree -> do pure (QC.property True) diff --git a/test/Test/Database/LSMTree/Internal.hs b/test/Test/Database/LSMTree/Internal.hs index dfaf33e2c..a655b009f 100644 --- a/test/Test/Database/LSMTree/Internal.hs +++ b/test/Test/Database/LSMTree/Internal.hs @@ -4,6 +4,7 @@ module Test.Database.LSMTree.Internal (tests) where +import Control.RefCount import Control.Tracer import Data.Coerce (coerce) import qualified Data.Map.Strict as Map @@ -59,13 +60,13 @@ prop_roundtripCursor :: -> Maybe SerialisedKey -- ^ Inclusive upper bound -> V.Vector (SerialisedKey, Entry SerialisedValue SerialisedBlob) -> Property -prop_roundtripCursor lb ub kops = ioProperty $ +prop_roundtripCursor lb ub kops = ioProperty $ withRefCtx $ \refCtx -> withTempIOHasBlockIO "prop_roundtripCursor" $ \hfs hbio -> do withOpenSession nullTracer hfs hbio testSalt (FS.mkFsPath []) $ \sesh -> do withTable sesh conf $ \t -> do updates resolve (coerce kops) t fromCursor <- withCursor resolve (toOffsetKey lb) t $ \c -> - fetchBlobs hfs =<< readCursorUntil ub c + fetchBlobs hfs refCtx =<< readCursorUntil ub c pure $ tabulate "duplicates" (show <$> Map.elems duplicates) $ tabulate "any blobs" [show (any (isJust . snd . snd) fromCursor)] $ @@ -73,10 +74,10 @@ prop_roundtripCursor lb ub kops = ioProperty $ where conf = testTableConfig - fetchBlobs :: FS.HasFS IO h + fetchBlobs :: FS.HasFS IO h -> RefCtx -> V.Vector (k, (v, Maybe (WeakBlobRef IO h))) -> IO (V.Vector (k, (v, Maybe SerialisedBlob))) - fetchBlobs hfs = traverse (traverse (traverse (traverse (readWeakBlobRef hfs)))) + fetchBlobs hfs refCtx = traverse (traverse (traverse (traverse (readWeakBlobRef hfs refCtx)))) toOffsetKey = maybe NoOffsetKey (OffsetKey . coerce) diff --git a/test/Test/Database/LSMTree/Internal/BlobFile/FS.hs b/test/Test/Database/LSMTree/Internal/BlobFile/FS.hs index da2f5c0c9..c96a2702b 100644 --- a/test/Test/Database/LSMTree/Internal/BlobFile/FS.hs +++ b/test/Test/Database/LSMTree/Internal/BlobFile/FS.hs @@ -29,11 +29,12 @@ prop_fault_openRelease doCreateFile om (NoCleanupErrors openErrors) (NoCleanupErrors releaseErrors) = ioProperty $ + withRefCtx $ \refCtx -> withSimErrorHasFS propPost MockFS.empty emptyErrors $ \hfs fsVar errsVar -> do when doCreateFile $ withFile hfs path (WriteMode MustBeNew) $ \_ -> pure () eith <- try @_ @FsError $ - bracket (acquire hfs errsVar) (release errsVar) $ \_blobFile -> do + bracket (acquire hfs refCtx errsVar) (release errsVar) $ \_blobFile -> do fs' <- atomically $ readTMVar fsVar pure $ propNumOpenHandles 1 fs' .&&. propNumDirEntries (mkFsPath []) 1 fs' pure $ case eith of @@ -45,8 +46,8 @@ prop_fault_openRelease doCreateFile om root = mkFsPath [] path = mkFsPath ["blobfile"] - acquire hfs errsVar = - withErrors errsVar openErrors $ openBlobFile hfs path om + acquire hfs refCtx errsVar = + withErrors errsVar openErrors $ openBlobFile hfs refCtx path om release errsVar blobFile = withErrors errsVar releaseErrors $ releaseRef blobFile diff --git a/test/Test/Database/LSMTree/Internal/Lookup.hs b/test/Test/Database/LSMTree/Internal/Lookup.hs index 5c0421cf8..ed573676e 100644 --- a/test/Test/Database/LSMTree/Internal/Lookup.hs +++ b/test/Test/Database/LSMTree/Internal/Lookup.hs @@ -322,8 +322,9 @@ prop_roundtripFromWriteBufferLookupIO :: -> Property prop_roundtripFromWriteBufferLookupIO (SmallList dats) = ioProperty $ + withRefCtx $ \refCtx -> withTempIOHasBlockIO "prop_roundtripFromWriteBufferLookupIO" $ \hfs hbio -> - withWbAndRuns hfs hbio Index.Ordinary dats $ \wb wbblobs runs -> do + withWbAndRuns hfs hbio refCtx Index.Ordinary dats $ \wb wbblobs runs -> do let model :: Map SerialisedKey (Entry SerialisedValue SerialisedBlob) model = Map.unionsWith (Entry.combine resolveV) (map runData dats) keys = V.fromList [ k | InMemLookupData{lookups} <- dats @@ -332,7 +333,7 @@ prop_roundtripFromWriteBufferLookupIO (SmallList dats) = modelres = V.map (\k -> Map.lookup k model) keys arenaManager <- newArenaManager realres <- - fetchBlobs hfs =<< -- retrieve blobs to match type of model result + fetchBlobs hfs refCtx =<< -- retrieve blobs to match type of model result lookupsIOWithWriteBuffer hbio arenaManager @@ -348,10 +349,10 @@ prop_roundtripFromWriteBufferLookupIO (SmallList dats) = where resolveV = \(SerialisedValue v1) (SerialisedValue v2) -> SerialisedValue (v1 <> v2) - fetchBlobs :: FS.HasFS IO h + fetchBlobs :: FS.HasFS IO h -> RefCtx -> (V.Vector (Maybe (Entry v (WeakBlobRef IO h)))) -> IO (V.Vector (Maybe (Entry v SerialisedBlob))) - fetchBlobs hfs = traverse (traverse (traverse (readWeakBlobRef hfs))) + fetchBlobs hfs refCtx = traverse (traverse (traverse (readWeakBlobRef hfs refCtx))) -- | Given a bunch of 'InMemLookupData', prepare the data into the form needed -- for 'lookupsIOWithWriteBuffer': a write buffer (and blobs) and a vector of @@ -360,6 +361,7 @@ prop_roundtripFromWriteBufferLookupIO (SmallList dats) = -- withWbAndRuns :: FS.HasFS IO h -> FS.HasBlockIO IO h + -> RefCtx -> IndexType -> [InMemLookupData SerialisedKey SerialisedValue SerialisedBlob] -> ( WB.WriteBuffer @@ -367,20 +369,20 @@ withWbAndRuns :: FS.HasFS IO h -> V.Vector (Ref (Run.Run IO h)) -> IO a) -> IO a -withWbAndRuns hfs _ _ [] action = +withWbAndRuns hfs _ refCtx _ [] action = bracket - (WBB.new hfs (FS.mkFsPath ["wbblobs"])) + (WBB.new hfs refCtx (FS.mkFsPath ["wbblobs"])) releaseRef (\wbblobs -> action WB.empty wbblobs V.empty) -withWbAndRuns hfs hbio indexType (wbdat:rundats) action = - bracket (WBB.new hfs (FS.mkFsPath ["wbblobs"])) releaseRef $ \wbblobs -> do +withWbAndRuns hfs hbio refCtx indexType (wbdat:rundats) action = + bracket (WBB.new hfs refCtx (FS.mkFsPath ["wbblobs"])) releaseRef $ \wbblobs -> do wbkops <- traverse (traverse (WBB.addBlob hfs wbblobs)) (runData wbdat) let wb = WB.fromMap wbkops let rds = map (RunData . runData) rundats counter <- newUniqCounter 1 - withRuns hfs hbio testSalt (runParams indexType) (FS.mkFsPath []) counter rds $ + withRuns hfs hbio refCtx testSalt (runParams indexType) (FS.mkFsPath []) counter rds $ \runs -> action wb wbblobs (V.fromList runs) diff --git a/test/Test/Database/LSMTree/Internal/Merge.hs b/test/Test/Database/LSMTree/Internal/Merge.hs index da72e6c18..8c8725a7c 100644 --- a/test/Test/Database/LSMTree/Internal/Merge.hs +++ b/test/Test/Database/LSMTree/Internal/Merge.hs @@ -78,16 +78,16 @@ prop_MergeDistributes :: StepSize -> SmallList (RunData SerialisedKey SerialisedValue SerialisedBlob) -> IO Property -prop_MergeDistributes fs hbio mergeType stepSize (SmallList rds) = do +prop_MergeDistributes fs hbio mergeType stepSize (SmallList rds) = withRefCtx $ \refCtx -> do let path = FS.mkFsPath [] counter <- newUniqCounter 0 - withRuns fs hbio testSalt runParams path counter rds' $ \runs -> do + withRuns fs hbio refCtx testSalt runParams path counter rds' $ \runs -> do let stepsNeeded = sum (map (Map.size . unRunData) rds) fsPathLhs <- RunFsPaths path . uniqueToRunNumber <$> incrUniqCounter counter (stepsDone, lhs) <- mergeRuns fs hbio mergeType stepSize fsPathLhs runs let runData = RunData $ mergeWriteBuffers mergeType $ fmap unRunData rds' - withRun fs hbio testSalt runParams path counter runData $ \rhs -> do + withRun fs hbio refCtx testSalt runParams path counter runData $ \rhs -> do (lhsSize, lhsFilter, lhsIndex, lhsKOps, lhsKOpsFileContent, lhsBlobFileContent) <- getRunContent lhs @@ -155,11 +155,11 @@ prop_AbortMerge :: StepSize -> SmallList (RunData SerialisedKey SerialisedValue SerialisedBlob) -> IO Property -prop_AbortMerge fs hbio mergeType (Positive stepSize) (SmallList wbs) = do +prop_AbortMerge fs hbio mergeType (Positive stepSize) (SmallList wbs) = withRefCtx $ \refCtx ->do let path = FS.mkFsPath [] let pathOut = RunFsPaths path (RunNumber 0) counter <- newUniqCounter 1 - withRuns fs hbio testSalt runParams path counter wbs' $ \runs -> do + withRuns fs hbio refCtx testSalt runParams path counter wbs' $ \runs -> do mergeToClose <- makeInProgressMerge pathOut runs traverse_ Merge.abort mergeToClose @@ -198,12 +198,12 @@ mergeRuns :: RunFsPaths -> [Ref (Run.Run IO h)] -> IO (Int, Ref (Run.Run IO h)) -mergeRuns fs hbio mergeType (Positive stepSize) fsPath runs = do +mergeRuns fs hbio mergeType (Positive stepSize) fsPath runs = withRefCtx $ \refCtx -> do Merge.new fs hbio testSalt runParams mergeType resolveVal fsPath (V.fromList runs) >>= \case - Just m -> Merge.stepsToCompletionCounted m stepSize - Nothing -> (,) 0 <$> unsafeCreateRunAt fs hbio testSalt runParams fsPath + Just m -> Merge.stepsToCompletionCounted refCtx m stepSize + Nothing -> (,) 0 <$> unsafeCreateRunAt fs hbio refCtx testSalt runParams fsPath (RunData Map.empty) type SerialisedEntry = Entry.Entry SerialisedValue SerialisedBlob diff --git a/test/Test/Database/LSMTree/Internal/MergingTree.hs b/test/Test/Database/LSMTree/Internal/MergingTree.hs index 25ea5659c..55fcf4885 100644 --- a/test/Test/Database/LSMTree/Internal/MergingTree.hs +++ b/test/Test/Database/LSMTree/Internal/MergingTree.hs @@ -70,8 +70,8 @@ testSalt = 4 -- prop_isStructurallyEmpty :: EmptyMergingTree -> Property prop_isStructurallyEmpty emt = - ioProperty $ - bracket (mkEmptyMergingTree emt) + ioProperty $ withRefCtx $ \refCtx -> + bracket (mkEmptyMergingTree refCtx emt) releaseRef isStructurallyEmpty @@ -102,17 +102,17 @@ instance Arbitrary EmptyMergingTree where : [ NonObviouslyEmptyUnionMerge mt' | mt' <- shrink mt ] -mkEmptyMergingTree :: EmptyMergingTree -> IO (Ref (MergingTree IO h)) -mkEmptyMergingTree ObviouslyEmptyLevelMerge = newPendingLevelMerge [] Nothing -mkEmptyMergingTree ObviouslyEmptyUnionMerge = newPendingUnionMerge [] -mkEmptyMergingTree (NonObviouslyEmptyLevelMerge emt) = do - mt <- mkEmptyMergingTree emt - mt' <- newPendingLevelMerge [] (Just mt) +mkEmptyMergingTree :: RefCtx -> EmptyMergingTree -> IO (Ref (MergingTree IO h)) +mkEmptyMergingTree refCtx ObviouslyEmptyLevelMerge = newPendingLevelMerge refCtx [] Nothing +mkEmptyMergingTree refCtx ObviouslyEmptyUnionMerge = newPendingUnionMerge refCtx [] +mkEmptyMergingTree refCtx (NonObviouslyEmptyLevelMerge emt) = do + mt <- mkEmptyMergingTree refCtx emt + mt' <- newPendingLevelMerge refCtx [] (Just mt) releaseRef mt pure mt' -mkEmptyMergingTree (NonObviouslyEmptyUnionMerge emts) = do - mts <- mapM mkEmptyMergingTree emts - mt' <- newPendingUnionMerge mts +mkEmptyMergingTree refCtx (NonObviouslyEmptyUnionMerge emts) = do + mts <- mapM (mkEmptyMergingTree refCtx) emts + mt' <- newPendingUnionMerge refCtx mts mapM_ releaseRef mts pure mt' @@ -127,21 +127,22 @@ prop_lookupTree :: -> V.Vector SerialisedKey -> MergingTreeData SerialisedKey SerialisedValue SerialisedBlob -> IO Property -prop_lookupTree hfs hbio keys mtd = do +prop_lookupTree hfs hbio keys mtd = withRefCtx $ \refCtx -> do let path = FS.mkFsPath [] counter <- newUniqCounter 0 - withMergingTree hfs hbio resolveVal testSalt runParams path counter mtd $ \tree -> do + withMergingTree hfs hbio refCtx resolveVal testSalt runParams path counter mtd $ \tree -> do arenaManager <- newArenaManager withActionRegistry $ \reg -> do - res <- fetchBlobs =<< lookupsIO reg arenaManager tree + res <- fetchBlobs refCtx =<< lookupsIO reg arenaManager tree pure $ normalise res === normalise (modelLookup (modelFoldMergingTree mtd) keys) where fetchBlobs :: - V.Vector (Maybe (Entry v (WeakBlobRef IO h))) + RefCtx + -> V.Vector (Maybe (Entry v (WeakBlobRef IO h))) -> IO (V.Vector (Maybe (Entry v SerialisedBlob))) - fetchBlobs = traverse (traverse (traverse (readWeakBlobRef hfs))) + fetchBlobs refCtx = traverse (traverse (traverse (readWeakBlobRef hfs refCtx))) -- the lookup accs might be different between implementation and model -- (Nothing vs. Just Delete, Insert vs. Mupsert), but this doesn't matter @@ -230,11 +231,11 @@ prop_supplyCredits :: -> NonEmpty MR.MergeCredits -> MergingTreeData SerialisedKey SerialisedValue SerialisedBlob -> IO Property -prop_supplyCredits hfs hbio threshold credits mtd = do +prop_supplyCredits hfs hbio threshold credits mtd = withRefCtx $ \refCtx -> do FS.createDirectory hfs setupPath FS.createDirectory hfs (FS.mkFsPath ["active"]) counter <- newUniqCounter 0 - withMergingTree hfs hbio resolveVal testSalt runParams setupPath counter mtd $ \tree -> do + withMergingTree hfs hbio refCtx resolveVal testSalt runParams setupPath counter mtd $ \tree -> do (MR.MergeDebt initialDebt, _) <- remainingMergeDebt tree props <- for credits $ \c -> do (MR.MergeDebt debt, _) <- remainingMergeDebt tree @@ -243,7 +244,7 @@ prop_supplyCredits hfs hbio threshold credits mtd = do pure $ property True else do leftovers <- - supplyCredits hfs hbio resolveVal testSalt runParams threshold root counter tree c + supplyCredits hfs hbio refCtx resolveVal testSalt runParams threshold root counter tree c (MR.MergeDebt debt', _) <- remainingMergeDebt tree pure $ -- semi-useful, but mainly tells us in how many steps we supplied diff --git a/test/Test/Database/LSMTree/Internal/Readers.hs b/test/Test/Database/LSMTree/Internal/Readers.hs index bdb2a4d05..dd0e833c4 100644 --- a/test/Test/Database/LSMTree/Internal/Readers.hs +++ b/test/Test/Database/LSMTree/Internal/Readers.hs @@ -53,9 +53,10 @@ tests :: TestTree tests = testGroup "Database.LSMTree.Internal.Readers" [ testProperty "prop_lockstep" $ Lockstep.runActionsBracket (Proxy @ReadersState) - mempty mempty $ \act () -> do + mempty mempty $ \act () -> + withRefCtx $ \refCtx -> do (prop, mockFS) <- FsSim.runSimHasBlockIO MockFS.empty $ \hfs hbio -> do - (prop, RealState _ mCtx) <- runRealMonad hfs hbio + (prop, RealState _ mCtx) <- runRealMonad hfs hbio refCtx (RealState 0 Nothing) act traverse_ closeReadersCtx mCtx -- close current readers pure prop @@ -371,15 +372,17 @@ trimap :: (a -> a') -> (b -> b') -> (c -> c') -> (a, b, c) -> (a', b', c') trimap f g h (a, b, c) = (f a, g b, h c) type RealMonad = ReaderT (FS.HasFS IO MockFS.HandleMock, - FS.HasBlockIO IO MockFS.HandleMock) + FS.HasBlockIO IO MockFS.HandleMock, + RefCtx) (StateT RealState IO) runRealMonad :: FS.HasFS IO MockFS.HandleMock -> FS.HasBlockIO IO MockFS.HandleMock + -> RefCtx -> RealState -> RealMonad a -> IO (a, RealState) -runRealMonad hfs hbio st = (`runStateT` st) . (`runReaderT` (hfs, hbio)) +runRealMonad hfs hbio refCtx st = (`runStateT` st) . (`runReaderT` (hfs, hbio, refCtx)) data RealState = RealState @@ -416,12 +419,12 @@ instance RunLockstep ReadersState RealMonad where runIO :: LockstepAction ReadersState a -> LookUp -> RealMonad a runIO act lu = case act of - New offset srcDatas -> ReaderT $ \(hfs, hbio) -> do + New offset srcDatas -> ReaderT $ \(hfs, hbio, refCtx) -> do RealState numRuns mCtx <- get -- if runs are still being read, they need to be cleaned up traverse_ (liftIO . closeReadersCtx) mCtx counter <- liftIO $ newUniqCounter numRuns - sources <- liftIO $ forM srcDatas (fromSourceData hfs hbio counter) + sources <- liftIO $ forM srcDatas (fromSourceData hfs hbio refCtx counter) newReaders <- liftIO $ do let offsetKey = maybe Readers.NoOffsetKey (Readers.OffsetKey . coerce) offset mreaders <- Readers.new resolve offsetKey sources @@ -447,18 +450,18 @@ runIO act lu = case act of (n, hasMore) <- Readers.dropWhileKey resolve r k pure (hasMore, (n, hasMore)) where - fromSourceData hfs hbio counter = \case + fromSourceData hfs hbio refCtx counter = \case FromWriteBufferData rd -> do n <- incrUniqCounter counter - wbblobs <- WBB.new hfs (FS.mkFsPath [show (uniqueToInt n) <> ".wb.blobs"]) + wbblobs <- WBB.new hfs refCtx (FS.mkFsPath [show (uniqueToInt n) <> ".wb.blobs"]) let kops = unRunData (serialiseRunData rd) wb <- WB.fromMap <$> traverse (traverse (WBB.addBlob hfs wbblobs)) kops pure $ Readers.FromWriteBuffer wb wbblobs FromRunData rd -> do - r <- unsafeCreateRun hfs hbio testSalt runParams (FS.mkFsPath []) counter $ serialiseRunData rd + r <- unsafeCreateRun hfs hbio refCtx testSalt runParams (FS.mkFsPath []) counter $ serialiseRunData rd pure $ Readers.FromRun r FromReadersData ty rds -> do - Readers.FromReaders ty <$> traverse (fromSourceData hfs hbio counter) rds + Readers.FromReaders ty <$> traverse (fromSourceData hfs hbio refCtx counter) rds pop = expectReaders $ \hfs r -> do (key, e, hasMore) <- Readers.pop resolve r @@ -469,7 +472,7 @@ runIO act lu = case act of (FS.HasFS IO MockFS.HandleMock -> Readers IO MockFS.HandleMock -> IO (HasMore, a)) -> RealMonad (Either () a) expectReaders f = - ReaderT $ \(hfs, _hbio) -> do + ReaderT $ \(hfs, _hbio, _refCtx) -> do get >>= \case RealState _ Nothing -> pure (Left ()) RealState n (Just (sources, readers)) -> do diff --git a/test/Test/Database/LSMTree/Internal/Run.hs b/test/Test/Database/LSMTree/Internal/Run.hs index 3056df92a..aa4ecc377 100644 --- a/test/Test/Database/LSMTree/Internal/Run.hs +++ b/test/Test/Database/LSMTree/Internal/Run.hs @@ -104,11 +104,12 @@ testSalt = 4 -- | Runs in IO, with a real file system. testSingleInsert :: FilePath -> SerialisedKey -> SerialisedValue -> Maybe SerialisedBlob -> IO () testSingleInsert sessionRoot key val mblob = + withRefCtx $ \refCtx -> FS.withIOHasBlockIO (FS.MountPoint sessionRoot) FS.defaultIOCtxParams $ \fs hbio -> do -- flush write buffer let e = case mblob of Nothing -> Insert val; Just blob -> InsertWithBlob val blob wb = Map.singleton key e - withRunAt fs hbio testSalt runParams (simplePath 42) (RunData wb) $ \_ -> do + withRunAt fs hbio refCtx testSalt runParams (simplePath 42) (RunData wb) $ \_ -> do -- check all files have been written let activeDir = sessionRoot bsKOps <- BS.readFile (activeDir "42.keyops") @@ -190,7 +191,8 @@ prop_WriteNumEntries :: -> RunData SerialisedKey SerialisedValue SerialisedBlob -> IO Property prop_WriteNumEntries fs hbio wb@(RunData m) = - withRunAt fs hbio testSalt runParams (simplePath 42) wb' $ \run -> do + withRefCtx $ \refCtx -> + withRunAt fs hbio refCtx testSalt runParams (simplePath 42) wb' $ \run -> do let !runSize = Run.size run pure . labelRunData wb' $ @@ -208,12 +210,13 @@ prop_WriteAndOpen :: -> RunData SerialisedKey SerialisedValue SerialisedBlob -> IO Property prop_WriteAndOpen fs hbio wb = - withRunAt fs hbio testSalt runParams (simplePath 1337) (serialiseRunData wb) $ \written -> + withRefCtx $ \refCtx -> + withRunAt fs hbio refCtx testSalt runParams (simplePath 1337) (serialiseRunData wb) $ \written -> withActionRegistry $ \reg -> do let paths = Run.runFsPaths written paths' = paths { runNumber = RunNumber 17} hardLinkRunFiles fs hbio reg paths paths' - loaded <- openFromDisk fs hbio (runParamCaching runParams) + loaded <- openFromDisk fs hbio refCtx (runParamCaching runParams) (runParamIndex runParams) testSalt (simplePath 17) @@ -241,12 +244,12 @@ prop_WriteAndOpenWriteBuffer :: -> FS.HasBlockIO IO h -> RunData SerialisedKey SerialisedValue SerialisedBlob -> IO Property -prop_WriteAndOpenWriteBuffer hfs hbio rd = do +prop_WriteAndOpenWriteBuffer hfs hbio rd = withRefCtx $ \refCtx -> do -- Serialise run data as write buffer: let srd = serialiseRunData rd let inPaths = WrapRunFsPaths $ simplePath 1111 let resolve (SerialisedValue x) (SerialisedValue y) = SerialisedValue (x <> y) - withRunDataAsWriteBuffer hfs resolve inPaths srd $ \wb wbb -> do + withRunDataAsWriteBuffer hfs refCtx resolve inPaths srd $ \wb wbb -> do -- Write write buffer to disk: let wbPaths = WrapRunFsPaths $ simplePath 1312 withSerialisedWriteBuffer hfs hbio wbPaths wb wbb $ do @@ -265,17 +268,17 @@ prop_WriteRunEqWriteWriteBuffer :: -> FS.HasBlockIO IO h -> RunData SerialisedKey SerialisedValue SerialisedBlob -> IO Property -prop_WriteRunEqWriteWriteBuffer hfs hbio rd = do +prop_WriteRunEqWriteWriteBuffer hfs hbio rd = withRefCtx $ \refCtx -> do -- Serialise run data as run: let srd = serialiseRunData rd let rdPaths = simplePath 1337 let rdKOpsFile = Paths.runKOpsPath rdPaths let rdBlobFile = Paths.runBlobPath rdPaths - withRunAt hfs hbio testSalt runParams rdPaths srd $ \_run -> do + withRunAt hfs hbio refCtx testSalt runParams rdPaths srd $ \_run -> do -- Serialise run data as write buffer: let f (SerialisedValue x) (SerialisedValue y) = SerialisedValue (x <> y) let inPaths = WrapRunFsPaths $ simplePath 1111 - withRunDataAsWriteBuffer hfs f inPaths srd $ \wb wbb -> do + withRunDataAsWriteBuffer hfs refCtx f inPaths srd $ \wb wbb -> do let wbPaths = WrapRunFsPaths $ simplePath 1312 let wbKOpsPath = Paths.writeBufferKOpsPath wbPaths let wbBlobPath = Paths.writeBufferBlobPath wbPaths diff --git a/test/Test/Database/LSMTree/Internal/RunReader.hs b/test/Test/Database/LSMTree/Internal/RunReader.hs index a80709b5c..6322c610c 100644 --- a/test/Test/Database/LSMTree/Internal/RunReader.hs +++ b/test/Test/Database/LSMTree/Internal/RunReader.hs @@ -93,7 +93,8 @@ prop_readAtOffset :: -> Maybe BiasedKey -> IO Property prop_readAtOffset fs hbio rd offsetKey = - withRunAt fs hbio testSalt runParams (simplePath 42) rd' $ \run -> do + withRefCtx $ \refCtx -> + withRunAt fs hbio refCtx testSalt runParams (simplePath 42) rd' $ \run -> do rhs <- readKOps (coerce offsetKey) run pure . labelRunData rd' $ @@ -137,7 +138,8 @@ prop_readAtOffsetIdempotence :: -> Maybe BiasedKey -> IO Property prop_readAtOffsetIdempotence fs hbio rd offsetKey = - withRunAt fs hbio testSalt runParams (simplePath 42) rd' $ \run -> do + withRefCtx $ \refCtx -> + withRunAt fs hbio refCtx testSalt runParams (simplePath 42) rd' $ \run -> do lhs <- readKOps (coerce offsetKey) run rhs <- readKOps (coerce offsetKey) run @@ -161,7 +163,8 @@ prop_readAtOffsetReadHead :: -> RunData BiasedKey SerialisedValue SerialisedBlob -> IO Property prop_readAtOffsetReadHead fs hbio rd = - withRunAt fs hbio testSalt runParams (simplePath 42) rd' $ \run -> do + withRefCtx $ \refCtx -> + withRunAt fs hbio refCtx testSalt runParams (simplePath 42) rd' $ \run -> do lhs <- readKOps Nothing run rhs <- case lhs of [] -> pure [] diff --git a/test/Test/Database/LSMTree/Internal/WriteBufferBlobs/FS.hs b/test/Test/Database/LSMTree/Internal/WriteBufferBlobs/FS.hs index 5a4bf3ace..06116aad6 100644 --- a/test/Test/Database/LSMTree/Internal/WriteBufferBlobs/FS.hs +++ b/test/Test/Database/LSMTree/Internal/WriteBufferBlobs/FS.hs @@ -40,11 +40,12 @@ prop_fault_WriteBufferBlobs doCreateFile ae (NoCleanupErrors releaseErrors) b1 b2 = ioProperty $ + withRefCtx $ \refCtx -> withSimErrorHasFS propPost MockFS.empty emptyErrors $ \hfs fsVar errsVar -> do when doCreateFile $ withFile hfs path (WriteMode MustBeNew) $ \_ -> pure () eith <- try @_ @FsError $ - bracket (acquire hfs errsVar) (release errsVar) $ \wbb -> do + bracket (acquire hfs refCtx errsVar) (release errsVar) $ \wbb -> do fs' <- atomically $ readTMVar fsVar let prop = propNumOpenHandles 1 fs' .&&. propNumDirEntries root 1 fs' props <- blobRoundtrips hfs errsVar wbb @@ -58,7 +59,7 @@ prop_fault_WriteBufferBlobs doCreateFile ae root = mkFsPath [] path = mkFsPath ["wbb"] - acquire hfs errsVar = withErrors errsVar openErrors $ open hfs path ae + acquire hfs refCtx errsVar = withErrors errsVar openErrors $ open hfs refCtx path ae -- Test that we can roundtrip blobs blobRoundtrips hfs errsVar wbb = withErrors errsVar errs $ do diff --git a/test/Test/Database/LSMTree/Internal/WriteBufferReader/FS.hs b/test/Test/Database/LSMTree/Internal/WriteBufferReader/FS.hs index 2316af3c0..b7f035e9f 100644 --- a/test/Test/Database/LSMTree/Internal/WriteBufferReader/FS.hs +++ b/test/Test/Database/LSMTree/Internal/WriteBufferReader/FS.hs @@ -38,8 +38,9 @@ prop_fault_WriteBufferReader :: -> Property prop_fault_WriteBufferReader (NoCleanupErrors readErrors) rdata = ioProperty $ + withRefCtx $ \refCtx -> withSimErrorHasBlockIO propPost MockFS.empty emptyErrors $ \hfs hbio fsVar errsVar -> - withRunDataAsWriteBuffer hfs resolve inPath rdata $ \wb wbb -> + withRunDataAsWriteBuffer hfs refCtx resolve inPath rdata $ \wb wbb -> withSerialisedWriteBuffer hfs hbio outPath wb wbb $ do fsBefore <- atomically $ readTMVar fsVar eith <- diff --git a/test/Test/Database/LSMTree/StateMachine.hs b/test/Test/Database/LSMTree/StateMachine.hs index d41656c10..f9da09889 100644 --- a/test/Test/Database/LSMTree/StateMachine.hs +++ b/test/Test/Database/LSMTree/StateMachine.hs @@ -61,8 +61,8 @@ import Control.Monad.Class.MonadThrow (Exception (..), Handler (..), import Control.Monad.IOSim import Control.Monad.Primitive import Control.Monad.Reader (ReaderT (..)) -import Control.RefCount (RefException, checkForgottenRefs, - ignoreForgottenRefs) +import Control.RefCount (disableForgottenRefChecks, + enableForgottenRefChecks) import Control.Tracer (Tracer, nullTracer) import Data.Bifunctor (Bifunctor (..)) import Data.Constraint (Dict (..)) @@ -174,7 +174,6 @@ propLockstep_ModelIOImpl = runActionsBracket (Proxy @(ModelState IO ModelIO.Table)) CheckCleanup - NoCheckRefs -- there are no references to check for in the ModelIO implementation acquire release (\r (session, errsVar, logVar) -> do @@ -293,7 +292,6 @@ propLockstep_RealImpl_RealFS_IO tr (QC.Fixed salt) = runActionsBracket (Proxy @(ModelState IO R.Table)) CheckCleanup - CheckRefs acquire release (\r (_, session, _, errsVar, logVar) -> do @@ -317,6 +315,7 @@ propLockstep_RealImpl_RealFS_IO tr (QC.Fixed salt) = acquire = do (tmpDir, hasFS, hasBlockIO) <- createSystemTempDirectory "prop_lockstepIO_RealImpl_RealFS" session <- R.openSession tr hasFS hasBlockIO salt (mkFsPath []) + toggleForgottenRefChecksSession CheckRefs session errsVar <- newTVarIO FSSim.emptyErrors logVar <- newTVarIO emptyLog pure (tmpDir, session, hasBlockIO, errsVar, logVar) @@ -341,8 +340,7 @@ propLockstep_RealImpl_MockFS_IO tr cleanupFlag fsFlag refsFlag (QC.Fixed salt) = runActionsBracket (Proxy @(ModelState IO R.Table)) cleanupFlag - refsFlag - (acquire_RealImpl_MockFS tr salt) + (acquire_RealImpl_MockFS tr refsFlag salt) (release_RealImpl_MockFS fsFlag) (\r (_, session, errsVar, logVar) -> do faultsVar <- newMutVar [] @@ -373,8 +371,7 @@ propLockstep_RealImpl_MockFS_IOSim tr cleanupFlag fsFlag refsFlag (QC.Fixed salt runActionsBracket (Proxy @(ModelState (IOSim RealWorld) R.Table)) cleanupFlag - refsFlag - (acquire_RealImpl_MockFS tr salt) + (acquire_RealImpl_MockFS tr refsFlag salt) (release_RealImpl_MockFS fsFlag) (\r (_, session, errsVar, logVar) -> do faultsVar <- newMutVar [] @@ -396,14 +393,16 @@ propLockstep_RealImpl_MockFS_IOSim tr cleanupFlag fsFlag refsFlag (QC.Fixed salt acquire_RealImpl_MockFS :: R.IOLike m => Tracer m R.LSMTreeTrace + -> CheckRefs -> R.Salt -> m (StrictTMVar m MockFS, Class.Session R.Table m, StrictTVar m Errors, StrictTVar m ErrorsLog) -acquire_RealImpl_MockFS tr salt = do +acquire_RealImpl_MockFS tr refsFlag salt = do fsVar <- newTMVarIO MockFS.empty errsVar <- newTVarIO FSSim.emptyErrors logVar <- newTVarIO emptyLog (hfs, hbio) <- simErrorHasBlockIOLogged fsVar errsVar logVar session <- R.openSession tr hfs hbio salt (mkFsPath []) + toggleForgottenRefChecksSession refsFlag session pure (fsVar, session, errsVar, logVar) -- | Flag that turns on\/off file system checks. @@ -2852,29 +2851,16 @@ runActionsBracket :: ) => Proxy state -> CheckCleanup - -> CheckRefs -> n st -> (st -> n prop) -> (m QC.Property -> st -> n QC.Property) -> (Lockstep state -> [(String, [FinalTag])]) -> Actions (Lockstep state) -> QC.Property -runActionsBracket p cleanupFlag refsFlag init cleanup runner tagger actions = +runActionsBracket p cleanupFlag init cleanup runner tagger actions = tagFinalState actions tagger $ QLS.runActionsBracket p init cleanup' runner actions where - cleanup' st = do - -- We want to run forgotten reference checks after cleanup, since cleanup - -- itself may lead to forgotten refs. The reference checks have the - -- crucial side effect of resetting the forgotten refs state. If we don't - -- do this then the next test run (e.g. during shrinking) will encounter a - -- false/stale forgotten refs exception. But we also have to make sure - -- that if cleanup itself fails, that we still run the reference checks. - -- 'propCleanup' will make sure to catch any exceptions that are thrown by - -- the 'cleanup' action. 'propRefs' will then definitely run afterwards so - -- that the forgotten reference checks are definitely performed. - x <- propCleanup cleanupFlag $ cleanup st - y <- propRefs refsFlag - pure (x QC..&&. y) + cleanup' st = propCleanup cleanupFlag $ cleanup st tagFinalState :: forall state. StateModel (Lockstep state) @@ -2928,10 +2914,9 @@ checkCleanupM flag cleanupAction = do -- exceptions are ignored. data CheckRefs = CheckRefs | NoCheckRefs -propRefs :: (PrimMonad m, MonadCatch m) => CheckRefs -> m Property -propRefs flag = propException "Reference exception: " <$> checkRefsM flag - -checkRefsM :: (PrimMonad m, MonadCatch m) => CheckRefs -> m (Either RefException ()) -checkRefsM flag = case flag of - CheckRefs -> try checkForgottenRefs - NoCheckRefs -> Right <$> ignoreForgottenRefs +toggleForgottenRefChecksSession :: R.IOLike m => CheckRefs -> R.Session m -> m () +toggleForgottenRefChecksSession flag (R.Types.Session session) = + R.Unsafe.withKeepSessionOpen session $ \seshEnv -> + case flag of + CheckRefs -> enableForgottenRefChecks (R.Unsafe.sessionRefCtx seshEnv) + NoCheckRefs -> disableForgottenRefChecks (R.Unsafe.sessionRefCtx seshEnv) diff --git a/test/Test/Database/LSMTree/StateMachine/DL.hs b/test/Test/Database/LSMTree/StateMachine/DL.hs index b09ed841e..f2b8e9507 100644 --- a/test/Test/Database/LSMTree/StateMachine/DL.hs +++ b/test/Test/Database/LSMTree/StateMachine/DL.hs @@ -8,7 +8,6 @@ module Test.Database.LSMTree.StateMachine.DL ( ) where import Control.Monad (void) -import Control.RefCount import Control.Tracer import qualified Data.Map.Strict as Map import Data.Typeable (Typeable) @@ -32,7 +31,7 @@ import qualified Test.QuickCheck.Random as QC import Test.QuickCheck.StateModel.Lockstep import qualified Test.QuickCheck.StateModel.Lockstep.Defaults as QLS import Test.QuickCheck.StateModel.Variables -import Test.Tasty (TestTree, testGroup, withResource) +import Test.Tasty (TestTree, testGroup) import qualified Test.Tasty.QuickCheck as QC import Test.Util.FS import Test.Util.PrettyProxy @@ -40,7 +39,12 @@ import Test.Util.PrettyProxy tests :: TestTree tests = testGroup "Test.Database.LSMTree.StateMachine.DL" [ QC.testProperty "prop_example" prop_example - , test_noSwallowedExceptions + , QC.testProperty "prop_noSwallowedExceptions" $ + -- TODO: see #781. I observed a timeout when this test was running, + -- which might have to do with shrinking taking too long. For now, we'll + -- turn off shrinking until we see a test failure that might explain + -- what's going wrong. + noShrinking prop_noSwallowedExceptions ] instance DynLogicModel (Lockstep (ModelState IO R.Table)) @@ -102,18 +106,6 @@ dl_example = do Swallowed exceptions -------------------------------------------------------------------------------} --- | See 'prop_noSwallowedExceptions'. --- --- Forgotten reference checks are disabled completely, because we allow bugs --- (like forgotten references) in exception unsafe code where we inject disk --- faults. -test_noSwallowedExceptions :: TestTree -test_noSwallowedExceptions = - withResource - (checkForgottenRefs >> disableForgottenRefChecks) - (\_ -> enableForgottenRefChecks) $ \ !_ -> - QC.testProperty "prop_noSwallowedExceptions" prop_noSwallowedExceptions - -- | Test that the @lsm-tree@ library does not swallow exceptions. -- -- A functional requirement for the @lsm-tree@ library is that all exceptions