Skip to content

Commit

Permalink
Do multiple updates in a single transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
kderme committed Aug 27, 2024
1 parent 1aea6f6 commit 61d22d3
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 37 deletions.
57 changes: 29 additions & 28 deletions cardano-db-sync/src/Cardano/DbSync/Fix/ConsumedBy.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{-# LANGUAGE OverloadedStrings #-}

module Cardano.DbSync.Fix.ConsumedBy (fixConsumedBy) where
module Cardano.DbSync.Fix.ConsumedBy (FixEntry, fixConsumedBy, fixEntriesConsumed) where

import Cardano.BM.Trace (Trace, logWarning)
import qualified Cardano.Chain.Block as Byron hiding (blockHash)
Expand All @@ -12,44 +12,45 @@ import Cardano.DbSync.Era.Byron.Util (blockPayload, unTxHash)
import Cardano.DbSync.Era.Util
import Cardano.DbSync.Error
import Cardano.DbSync.Types
import Cardano.Prelude hiding (length)
import Cardano.Prelude hiding (length, (.))
import Database.Persist.SqlBackend.Internal
import Ouroboros.Consensus.Byron.Ledger (ByronBlock (..))
import Ouroboros.Consensus.Cardano.Block (HardForkBlock (..))

fixConsumedBy :: SqlBackend -> Trace IO Text -> Integer -> CardanoBlock -> IO (Integer, Bool)
fixConsumedBy backend tracer lastSize cblk = case cblk of
BlockByron blk -> (\(n, bl) -> (n + lastSize, bl)) <$> fixBlock backend tracer blk
_ -> pure (lastSize, True)
type FixEntry = (DB.TxOutId, DB.TxId)

fixBlock :: SqlBackend -> Trace IO Text -> ByronBlock -> IO (Integer, Bool)
-- | Nothing when the syncing must stop.
fixConsumedBy :: SqlBackend -> Trace IO Text -> CardanoBlock -> IO (Maybe [FixEntry])
fixConsumedBy backend tracer cblk = case cblk of
BlockByron blk -> fixBlock backend tracer blk
_ -> pure Nothing

fixBlock :: SqlBackend -> Trace IO Text -> ByronBlock -> IO (Maybe [FixEntry])
fixBlock backend tracer bblk = case byronBlockRaw bblk of
Byron.ABOBBoundary _ -> pure (0, False)
Byron.ABOBBoundary _ -> pure $ Just []
Byron.ABOBBlock blk -> do
runReaderT (fix 0 (blockPayload blk)) backend
where
fix totalSize [] = pure (totalSize, False)
fix totalSize (tx : txs) = do
mn <- runExceptT $ fixTx tx
case mn of
Right n -> fix (totalSize + n) txs
Left err -> do
liftIO $
logWarning tracer $
mconcat
[ "While fixing tx "
, textShow tx
, ", encountered error "
, textShow err
]
pure (totalSize, True)
mEntries <- runReaderT (runExceptT $ mapM fixTx (blockPayload blk)) backend
case mEntries of
Right newEntries -> pure $ Just $ concat newEntries
Left err -> do
liftIO $
logWarning tracer $
mconcat
[ "While fixing block "
, textShow bblk
, ", encountered error "
, textShow err
]
pure Nothing

fixTx :: MonadIO m => Byron.TxAux -> ExceptT SyncNodeError (ReaderT SqlBackend m) Integer
fixTx :: MonadIO m => Byron.TxAux -> ExceptT SyncNodeError (ReaderT SqlBackend m) [FixEntry]
fixTx tx = do
txId <- liftLookupFail "resolving tx" $ DB.queryTxId txHash
resolvedInputs <- mapM resolveTxInputs (toList $ Byron.txInputs (Byron.taTx tx))
lift $ DB.updateListTxOutConsumedByTxId (prepUpdate txId <$> resolvedInputs)
pure $ fromIntegral $ length resolvedInputs
pure (prepUpdate txId <$> resolvedInputs)
where
txHash = unTxHash $ Crypto.serializeCborHash (Byron.taTx tx)
prepUpdate txId (_, _, txOutId, _) = (txOutId, txId)

fixEntriesConsumed :: SqlBackend -> Trace IO Text -> [FixEntry] -> IO ()
fixEntriesConsumed backend tracer = DB.runDbIohkLogging backend tracer . DB.updateListTxOutConsumedByTxId
32 changes: 23 additions & 9 deletions cardano-db-sync/src/Cardano/DbSync/Sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -473,26 +473,40 @@ chainSyncClientFixConsumed backend tracer wrongTotalSize = Client.ChainSyncClien
{ Client.recvMsgIntersectFound = \_blk _tip ->
Client.ChainSyncClient $
pure $
Client.SendMsgRequestNext (pure ()) (clientStNext 0)
Client.SendMsgRequestNext (pure ()) (clientStNext (0, (0, [])))
, Client.recvMsgIntersectNotFound = \_tip ->
panic "Failed to find intersection with genesis."
}

clientStNext :: Integer -> Client.ClientStNext CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO Integer
clientStNext lastSize =
clientStNext :: (Integer, (Integer, [[FixEntry]])) -> Client.ClientStNext CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO Integer
clientStNext (sizeFixedTotal, (sizeFixEntries, fixEntries)) =
Client.ClientStNext
{ Client.recvMsgRollForward = \blk _tip -> Client.ChainSyncClient $ do
(lastSize', ended) <- fixConsumedBy backend tracer lastSize blk
logSize lastSize lastSize'
if ended
then pure $ Client.SendMsgDone lastSize'
else pure $ Client.SendMsgRequestNext (pure ()) (clientStNext lastSize')
mNewEntries <- fixConsumedBy backend tracer blk
case mNewEntries of
Nothing -> do
fixAccumulatedEntries fixEntries
pure $ Client.SendMsgDone (sizeFixedTotal + sizeFixEntries)
Just newEntries -> do
let sizeNewEntries = fromIntegral (length newEntries)
(sizeNewFixed, sizeUnfixed, unfixedEntries) <-
fixAccumulatedEntriesMaybe (sizeFixEntries + sizeNewEntries, newEntries : fixEntries)
let sizeNewFixedTotal = sizeFixedTotal + sizeNewFixed
logSize sizeFixedTotal sizeNewFixedTotal
pure $ Client.SendMsgRequestNext (pure ()) (clientStNext (sizeNewFixedTotal, (sizeUnfixed, unfixedEntries)))
, Client.recvMsgRollBackward = \_point _tip ->
Client.ChainSyncClient $
pure $
Client.SendMsgRequestNext (pure ()) (clientStNext lastSize)
Client.SendMsgRequestNext (pure ()) (clientStNext (sizeFixedTotal, (sizeFixEntries, fixEntries)))
}

fixAccumulatedEntries = fixEntriesConsumed backend tracer . concat . reverse

fixAccumulatedEntriesMaybe :: (Integer, [[FixEntry]]) -> IO (Integer, Integer, [[FixEntry]])
fixAccumulatedEntriesMaybe (n, entries)
| n >= 10_000 = fixAccumulatedEntries entries >> pure (n, 0, [])
| otherwise = pure (0, n, entries)

logSize :: Integer -> Integer -> IO ()
logSize lastSize newSize = do
when (newSize `div` 200_000 > lastSize `div` 200_000) $
Expand Down

0 comments on commit 61d22d3

Please sign in to comment.