Skip to content

Commit

Permalink
Fix race condition in memoiseWithCycleDetection
Browse files Browse the repository at this point in the history
Keep track of the threadids that are waiting for the result of a query
and make sure that they are removed from the deps map _before_ the
result of the query is written to the result MVar.

As it was before the threadids were sometimes removed too late from the
deps map, which could mean false positive `Cyclic` exceptions.
  • Loading branch information
ollef committed Nov 16, 2021
1 parent 7a9e657 commit bdb4cff
Showing 1 changed file with 45 additions and 32 deletions.
77 changes: 45 additions & 32 deletions src/Rock/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ newtype Cyclic f = Cyclic (Some f)
instance (GShow f, Typeable f) => Exception (Cyclic f)

data MemoEntry a
= Started !ThreadId !(MVar (Maybe a))
= Started !ThreadId !(MVar (Maybe a)) !(MVar (Maybe [ThreadId]))
| Done !a

-- | Like 'memoise', but throw @'Cyclic' f@ if a query depends on itself, directly or
Expand All @@ -219,29 +219,40 @@ memoiseWithCycleDetection startedVar depsVar rules =
rules'
where
rules' (key :: f a) = do
maybeEntry <- DHashMap.lookup key <$> liftIO (readIORef startedVar)
maybeEntry <- DHashMap.lookup key <$> readIORef startedVar
case maybeEntry of
Nothing -> do
threadId <- liftIO myThreadId
valueVar <- liftIO newEmptyMVar
join $ liftIO $ atomicModifyIORef startedVar $ \started ->
case DHashMap.alterLookup (Just . fromMaybe (Started threadId valueVar)) key started of
threadId <- myThreadId
valueVar <- newEmptyMVar
waitVar <- newMVar $ Just []
join $ atomicModifyIORef startedVar $ \started ->
case DHashMap.alterLookup (Just . fromMaybe (Started threadId valueVar waitVar)) key started of
(Nothing, started') ->
( started'
, (do
value <- rules key
liftIO $ do
atomicModifyIORef startedVar $ \started'' ->
(DHashMap.insert key (Done value) started'', ())
putMVar valueVar $ Just value
return value
) `catch` \(e :: Cyclic f) ->
(liftIO $ do
join $ modifyMVar waitVar $ \maybeWaitingThreads -> do
case maybeWaitingThreads of
Nothing ->
error "impossible"

Just waitingThreads ->
return
( Nothing
, atomicModifyIORef depsVar $ \deps ->
( foldl' (flip HashMap.delete) deps waitingThreads
, ()
)
)
atomicModifyIORef startedVar $ \started'' ->
(DHashMap.insert key (Done value) started'', ())
putMVar valueVar $ Just value
return value
) `catch` \(e :: Cyclic f) -> do
atomicModifyIORef startedVar $ \started'' ->
(DHashMap.delete key started'', ())
putMVar valueVar Nothing
throwIO e
)
)

(Just entry, _started') ->
Expand All @@ -252,24 +263,26 @@ memoiseWithCycleDetection startedVar depsVar rules =
where
waitFor entry =
case entry of
Started onThread valueVar -> do
threadId <- liftIO myThreadId
join $ liftIO $ atomicModifyIORef depsVar $ \deps -> do
let
deps' =
HashMap.insert threadId onThread deps

if detectCycle threadId deps' then
( deps
, throwIO $ Cyclic $ Some key
)
else
( deps'
, do
maybeValue <- liftIO $ readMVar valueVar
liftIO $ atomicModifyIORef depsVar $ \deps'' -> (HashMap.delete threadId deps'', ())
maybe (rules' key) return maybeValue
)
Started onThread valueVar waitVar -> do
threadId <- myThreadId
modifyMVar_ waitVar $ \maybeWaitingThreads -> do
case maybeWaitingThreads of
Nothing ->
return maybeWaitingThreads
Just waitingThreads -> do
join $ atomicModifyIORef depsVar $ \deps -> do
let deps' = HashMap.insert threadId onThread deps
if detectCycle threadId deps' then
( deps
, throwIO $ Cyclic $ Some key
)
else
( deps'
, return ()
)
return $ Just $ threadId : waitingThreads
maybeValue <- readMVar valueVar
maybe (rules' key) return maybeValue

Done value ->
return value
Expand Down

0 comments on commit bdb4cff

Please sign in to comment.