Skip to content

Commit

Permalink
Merge pull request #1635 from IntersectMBO/cmdv-refactor-shelley-insert
Browse files Browse the repository at this point in the history
Refactor Shelley/Insert.hs to "Universal" used by multiple eras
  • Loading branch information
Cmdv authored Mar 5, 2024
2 parents 15656e4 + 68dbf7b commit 77cd794
Show file tree
Hide file tree
Showing 19 changed files with 2,693 additions and 2,016 deletions.
21 changes: 14 additions & 7 deletions cardano-db-sync/cardano-db-sync.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,16 @@ library
Cardano.DbSync.Era.Byron.Util
Cardano.DbSync.Era.Cardano.Insert
Cardano.DbSync.Era.Cardano.Util
Cardano.DbSync.Era.Shelley.Adjust
Cardano.DbSync.Era.Shelley.Generic
Cardano.DbSync.Era.Shelley.Generic.Block
Cardano.DbSync.Era.Shelley.Generic.EpochUpdate
Cardano.DbSync.Era.Shelley.Generic.Metadata
Cardano.DbSync.Era.Shelley.Generic.ParamProposal
Cardano.DbSync.Era.Shelley.Generic.ProtoParams
Cardano.DbSync.Era.Shelley.Generic.Rewards
Cardano.DbSync.Era.Shelley.Generic.Script
Cardano.DbSync.Era.Shelley.Generic.ScriptData
Cardano.DbSync.Era.Shelley.Generic.StakeDist
Cardano.DbSync.Era.Shelley.Generic.Metadata
Cardano.DbSync.Era.Shelley.Generic.ParamProposal
Cardano.DbSync.Era.Shelley.Generic.Tx
Cardano.DbSync.Era.Shelley.Generic.Tx.Allegra
Cardano.DbSync.Era.Shelley.Generic.Tx.Alonzo
Expand All @@ -85,11 +84,19 @@ library
Cardano.DbSync.Era.Shelley.Generic.Util
Cardano.DbSync.Era.Shelley.Generic.Witness
Cardano.DbSync.Era.Shelley.Genesis
Cardano.DbSync.Era.Shelley.Insert
Cardano.DbSync.Era.Shelley.Insert.Epoch
Cardano.DbSync.Era.Shelley.Insert.Grouped
Cardano.DbSync.Era.Shelley.Query
Cardano.DbSync.Era.Shelley.Validate
Cardano.DbSync.Era.Universal.Adjust
Cardano.DbSync.Era.Universal.Block
Cardano.DbSync.Era.Universal.Epoch
Cardano.DbSync.Era.Universal.Validate
Cardano.DbSync.Era.Universal.Insert.Certificate
Cardano.DbSync.Era.Universal.Insert.GovAction
Cardano.DbSync.Era.Universal.Insert.Grouped
Cardano.DbSync.Era.Universal.Insert.LedgerEvent
Cardano.DbSync.Era.Universal.Insert.Other
Cardano.DbSync.Era.Universal.Insert.Pool
Cardano.DbSync.Era.Universal.Insert.Tx


-- Temporary debugging validation
Cardano.DbSync.Era.Shelley.ValidateWithdrawal
Expand Down
6 changes: 3 additions & 3 deletions cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import Cardano.DbSync.Api.Types
import Cardano.DbSync.Era.Shelley.Generic.Tx.Babbage (fromTxOut)
import Cardano.DbSync.Era.Shelley.Generic.Tx.Types (DBPlutusScript)
import qualified Cardano.DbSync.Era.Shelley.Generic.Util as Generic
import qualified Cardano.DbSync.Era.Shelley.Insert as Insert
import Cardano.DbSync.Era.Shelley.Insert.Grouped
import Cardano.DbSync.Era.Universal.Insert.Grouped
import Cardano.DbSync.Era.Universal.Insert.Tx (insertTxOut)
import Cardano.DbSync.Era.Util
import Cardano.DbSync.Error
import Cardano.DbSync.Ledger.State
Expand Down Expand Up @@ -171,7 +171,7 @@ prepareTxOut syncEnv txCache (TxIn txHash (TxIx index), txOut) = do
let txHashByteString = Generic.safeHashToByteString $ unTxId txHash
let genTxOut = fromTxOut index txOut
txId <- queryTxIdWithCache txCache txHashByteString
Insert.prepareTxOut trce cache iopts (txId, txHashByteString) genTxOut
insertTxOut trce cache iopts (txId, txHashByteString) genTxOut
where
trce = getTrace syncEnv
cache = envCache syncEnv
Expand Down
127 changes: 17 additions & 110 deletions cardano-db-sync/src/Cardano/DbSync/Default.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,15 @@ import qualified Cardano.Db as DB
import Cardano.DbSync.Api
import Cardano.DbSync.Api.Ledger
import Cardano.DbSync.Api.Types (ConsistentLevel (..), InsertOptions (..), LedgerEnv (..), SyncEnv (..), SyncOptions (..))
import Cardano.DbSync.Cache.Types (textShowStats)
import Cardano.DbSync.Epoch (epochHandler)
import Cardano.DbSync.Era.Byron.Insert (insertByronBlock)
import Cardano.DbSync.Era.Cardano.Insert (insertEpochSyncTime)
import Cardano.DbSync.Era.Shelley.Adjust (adjustEpochRewards)
import qualified Cardano.DbSync.Era.Shelley.Generic as Generic
import Cardano.DbSync.Era.Shelley.Insert (insertShelleyBlock, mkAdaPots)
import Cardano.DbSync.Era.Shelley.Insert.Epoch (insertInstantRewards, insertPoolDepositRefunds, insertRewards)
import Cardano.DbSync.Era.Shelley.Validate (validateEpochRewards)
import Cardano.DbSync.Era.Universal.Block (insertBlockUniversal)
import Cardano.DbSync.Era.Universal.Epoch (hasEpochStartEvent, hasNewEpochEvent)
import Cardano.DbSync.Era.Universal.Insert.Certificate (mkAdaPots)
import Cardano.DbSync.Era.Universal.Insert.LedgerEvent (insertBlockLedgerEvents)
import Cardano.DbSync.Error
import Cardano.DbSync.Fix.EpochStake
import Cardano.DbSync.Ledger.Event (LedgerEvent (..))
import Cardano.DbSync.Ledger.State (applyBlockAndSnapshot, defaultApplyResult)
import Cardano.DbSync.Ledger.Types (ApplyResult (..))
import Cardano.DbSync.LocalStateQuery
Expand All @@ -37,17 +34,15 @@ import Cardano.DbSync.Util
import Cardano.DbSync.Util.Constraint (addConstraintsIfNotExist)
import qualified Cardano.Ledger.Alonzo.Scripts as Ledger
import Cardano.Ledger.Shelley.AdaPots as Shelley
import Cardano.Node.Configuration.Logging (Trace)
import Cardano.Prelude
import Cardano.Slotting.Slot (EpochNo (..), SlotNo)
import Control.Monad.Logger (LoggingT)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Except.Extra (newExceptT)
import qualified Data.ByteString.Short as SBS
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Data.Strict.Maybe as Strict
import Database.Persist.SqlBackend.Internal
import Database.Persist.SqlBackend.Internal.StatementCache
import Ouroboros.Consensus.Cardano.Block (HardForkBlock (..))
import qualified Ouroboros.Consensus.HardFork.Combinator as Consensus
import Ouroboros.Network.Block (blockHash, blockNo, getHeaderFields, headerFieldBlockNo, unBlockNo)
Expand All @@ -59,15 +54,16 @@ insertListBlocks ::
insertListBlocks synEnv blocks = do
DB.runDbIohkLogging (envBackend synEnv) tracer
. runExceptT
$ traverse_ (applyAndInsertBlockMaybe synEnv) blocks
$ traverse_ (applyAndInsertBlockMaybe synEnv tracer) blocks
where
tracer = getTrace synEnv

applyAndInsertBlockMaybe ::
SyncEnv ->
Trace IO Text ->
CardanoBlock ->
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
applyAndInsertBlockMaybe syncEnv cblk = do
applyAndInsertBlockMaybe syncEnv tracer cblk = do
bl <- liftIO $ isConsistent syncEnv
(!applyRes, !tookSnapshot) <- liftIO (mkApplyResult bl)
if bl
Expand Down Expand Up @@ -100,8 +96,6 @@ applyAndInsertBlockMaybe syncEnv cblk = do
liftIO $ logInfo tracer $ "Reached " <> textShow epochNo
_ -> pure ()
where
tracer = getTrace syncEnv

mkApplyResult :: Bool -> IO (ApplyResult, Bool)
mkApplyResult isCons = do
case envLedgerEnv syncEnv of
Expand Down Expand Up @@ -135,12 +129,12 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
let !details = apSlotDetails applyResult
let !withinTwoMin = isWithinTwoMin details
let !withinHalfHour = isWithinHalfHour details
insertLedgerEvents syncEnv (sdEpochNo details) (apEvents applyResult)
insertBlockLedgerEvents syncEnv (sdEpochNo details) (apEvents applyResult)
let isNewEpochEvent = hasNewEpochEvent (apEvents applyResult)
let isStartEventOrRollback = hasEpochStartEvent (apEvents applyResult) || firstAfterRollback
let isMember poolId = Set.member poolId (apPoolsRegistered applyResult)
let insertShelley blk =
insertShelleyBlock
let insertBlockUniversal' blk =
insertBlockUniversal
syncEnv
isStartEventOrRollback
withinTwoMin
Expand All @@ -158,27 +152,27 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
insertByronBlock syncEnv isStartEventOrRollback blk details
BlockShelley blk ->
newExceptT $
insertShelley $
insertBlockUniversal' $
Generic.fromShelleyBlock blk
BlockAllegra blk ->
newExceptT $
insertShelley $
insertBlockUniversal' $
Generic.fromAllegraBlock blk
BlockMary blk ->
newExceptT $
insertShelley $
insertBlockUniversal' $
Generic.fromMaryBlock blk
BlockAlonzo blk ->
newExceptT $
insertShelley $
insertBlockUniversal' $
Generic.fromAlonzoBlock (ioPlutusExtra iopts) (getPrices applyResult) blk
BlockBabbage blk ->
newExceptT $
insertShelley $
insertBlockUniversal' $
Generic.fromBabbageBlock (ioPlutusExtra iopts) (getPrices applyResult) blk
BlockConway blk ->
newExceptT $
insertShelley $
insertBlockUniversal' $
Generic.fromConwayBlock (ioPlutusExtra iopts) (getPrices applyResult) blk
-- update the epoch
updateEpoch details isNewEpochEvent
Expand Down Expand Up @@ -231,90 +225,3 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
isWithinHalfHour sd = isSyncedWithinSeconds sd 1800 == SyncFollowing

blkNo = headerFieldBlockNo $ getHeaderFields cblk

-- -------------------------------------------------------------------------------------------------

insertLedgerEvents ::
(MonadBaseControl IO m, MonadIO m) =>
SyncEnv ->
EpochNo ->
[LedgerEvent] ->
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
insertLedgerEvents syncEnv currentEpochNo@(EpochNo curEpoch) =
mapM_ handler
where
tracer = getTrace syncEnv
cache = envCache syncEnv
ntw = getNetwork syncEnv

subFromCurrentEpoch :: Word64 -> EpochNo
subFromCurrentEpoch m =
if unEpochNo currentEpochNo >= m
then EpochNo $ unEpochNo currentEpochNo - m
else EpochNo 0

toSyncState :: SyncState -> DB.SyncState
toSyncState SyncLagging = DB.SyncLagging
toSyncState SyncFollowing = DB.SyncFollowing

handler ::
(MonadBaseControl IO m, MonadIO m) =>
LedgerEvent ->
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
handler ev =
case ev of
LedgerNewEpoch en ss -> do
lift $
insertEpochSyncTime en (toSyncState ss) (envEpochSyncTime syncEnv)
sqlBackend <- lift ask
persistantCacheSize <- liftIO $ statementCacheSize $ connStmtMap sqlBackend
liftIO . logInfo tracer $ "Persistant SQL Statement Cache size is " <> textShow persistantCacheSize
stats <- liftIO $ textShowStats cache
liftIO . logInfo tracer $ stats
liftIO . logInfo tracer $ "Starting epoch " <> textShow (unEpochNo en)
LedgerStartAtEpoch en ->
-- This is different from the previous case in that the db-sync started
-- in this epoch, for example after a restart, instead of after an epoch boundary.
liftIO . logInfo tracer $ "Starting at epoch " <> textShow (unEpochNo en)
LedgerDeltaRewards _e rwd -> do
let rewards = Map.toList $ Generic.unRewards rwd
insertRewards syncEnv ntw (subFromCurrentEpoch 2) currentEpochNo cache (Map.toList $ Generic.unRewards rwd)
-- This event is only created when it's not empty, so we don't need to check for null here.
liftIO . logInfo tracer $ "Inserted " <> show (length rewards) <> " Delta rewards"
LedgerIncrementalRewards _ rwd -> do
let rewards = Map.toList $ Generic.unRewards rwd
insertRewards syncEnv ntw (subFromCurrentEpoch 1) (EpochNo $ curEpoch + 1) cache rewards
LedgerRestrainedRewards e rwd creds ->
lift $ adjustEpochRewards tracer ntw cache e rwd creds
LedgerTotalRewards _e rwd ->
lift $ validateEpochRewards tracer ntw (subFromCurrentEpoch 2) currentEpochNo rwd
LedgerAdaPots _ ->
pure () -- These are handled separately by insertBlock
LedgerMirDist rwd -> do
unless (Map.null rwd) $ do
let rewards = Map.toList rwd
insertInstantRewards ntw (subFromCurrentEpoch 1) currentEpochNo cache rewards
liftIO . logInfo tracer $ "Inserted " <> show (length rewards) <> " Mir rewards"
LedgerPoolReap en drs ->
unless (Map.null $ Generic.unRewards drs) $ do
insertPoolDepositRefunds syncEnv en drs
LedgerDeposits {} -> pure ()

hasEpochStartEvent :: [LedgerEvent] -> Bool
hasEpochStartEvent = any isNewEpoch
where
isNewEpoch :: LedgerEvent -> Bool
isNewEpoch le =
case le of
LedgerNewEpoch {} -> True
LedgerStartAtEpoch {} -> True
_otherwise -> False

hasNewEpochEvent :: [LedgerEvent] -> Bool
hasNewEpochEvent = any isNewEpoch
where
isNewEpoch :: LedgerEvent -> Bool
isNewEpoch le =
case le of
LedgerNewEpoch {} -> True
_otherwise -> False
Loading

0 comments on commit 77cd794

Please sign in to comment.