diff --git a/docs/network-spec/miniprotocols.tex b/docs/network-spec/miniprotocols.tex index 6908d9bd65..2a48d6fd26 100644 --- a/docs/network-spec/miniprotocols.tex +++ b/docs/network-spec/miniprotocols.tex @@ -1811,6 +1811,10 @@ \section{Local Tx-Monitor mini-protocol} \newcommand{\MsgGetSizes}{\msg{MsgGetSizes}} \newcommand{\MsgReplyGetSizes}{\msg{MsgReplyGetSizes}} +\newcommand{\GetMeasures}{\state{GetMeasures}} +\newcommand{\MsgGetMeasures}{\msg{MsgGetMeasures}} +\newcommand{\MsgReplyGetMeasures}{\msg{MsgReplyGetMeasures}} + \subsection{Description} A mini-protocol which allows to monitor transactions in the local mempool. This @@ -1822,25 +1826,28 @@ \subsection{State machine} \begin{figure}[h] \begin{tikzpicture}[->,shorten >=1pt,auto,node distance=4cm, semithick] \tikzstyle{every state}=[fill=red,draw=none,text=white] - \node[state, mygreen, initial] (Idle) {\StIdle}; - \node[state, myblue, right of=Idle] (Acquiring) {\StAcquiring}; - \node[state, mygreen, right of=Acquiring] (Acquired) {\StAcquired}; - \node[state, myblue, right of=Acquired] (BusyHasTx) {\StBusy\ \HasTx}; - \node[state, myblue, above of=BusyHasTx] (BusyNextTx) {\StBusy\ \NextTx}; - \node[state, myblue, below of=BusyHasTx] (BusyGetSizes) {\StBusy\ \GetSizes}; - \node[state, below of=Idle] (Done) {\StDone}; - - \draw (Idle) edge node{\MsgAcquire} (Acquiring); - \draw (Acquiring) edge[bend left=45] node{\MsgAcquired} (Acquired); - \draw (Acquired) edge[bend left=45] node{\MsgAwaitAcquire} (Acquiring); - \draw (Acquired) edge[bend left=45] node{\MsgRelease} (Idle); - \draw (Acquired) edge[bend left=20] node{\MsgNextTx} (BusyNextTx); - \draw (BusyNextTx) edge[bend left=20] node{\MsgReplyNextTx} (Acquired); - \draw (Acquired) edge[bend left=20] node{\MsgHasTx} (BusyHasTx); - \draw (BusyHasTx) edge[bend left=20] node{\MsgReplyHasTx} (Acquired); - \draw (Acquired) edge[bend left=20] node{\MsgGetSizes} (BusyGetSizes); - \draw (BusyGetSizes) edge[bend left=20] node{\MsgReplyGetSizes} (Acquired); - \draw (Idle) edge node{\MsgDone} (Done); + \node[state, mygreen, initial] (Idle) {\StIdle}; + \node[state, myblue, right of=Idle] (Acquiring) {\StAcquiring}; + \node[state, mygreen, right of=Acquiring] (Acquired) {\StAcquired}; + \node[state, myblue, right of=Acquired] (BusyHasTx) {\StBusy\ \HasTx}; + \node[state, myblue, above of=BusyHasTx] (BusyNextTx) {\StBusy\ \NextTx}; + \node[state, myblue, below of=BusyHasTx] (BusyGetSizes) {\StBusy\ \GetSizes}; + \node[state, myblue, below of=BusyHasTx] (BusyGetMeasures) {\StBusy\ \GetMeasures}; + \node[state, below of=Idle] (Done) {\StDone}; + + \draw (Idle) edge node{\MsgAcquire} (Acquiring); + \draw (Acquiring) edge[bend left=45] node{\MsgAcquired} (Acquired); + \draw (Acquired) edge[bend left=45] node{\MsgAwaitAcquire} (Acquiring); + \draw (Acquired) edge[bend left=45] node{\MsgRelease} (Idle); + \draw (Acquired) edge[bend left=20] node{\MsgNextTx} (BusyNextTx); + \draw (BusyNextTx) edge[bend left=20] node{\MsgReplyNextTx} (Acquired); + \draw (Acquired) edge[bend left=20] node{\MsgHasTx} (BusyHasTx); + \draw (BusyHasTx) edge[bend left=20] node{\MsgReplyHasTx} (Acquired); + \draw (Acquired) edge[bend left=20] node{\MsgGetSizes} (BusyGetSizes); + \draw (Acquired) edge[bend left=20] node{\MsgGetMeasures} (BusyGetMeasures); + \draw (BusyGetSizes) edge[bend left=20] node{\MsgReplyGetSizes} (Acquired); + \draw (BusyGetMeasures) edge[bend left=20] node{\MsgReplyGetMeasures} (Acquired); + \draw (Idle) edge node{\MsgDone} (Done); \end{tikzpicture} \caption{State machine of the Local Tx-Monitor mini-protocol.} \end{figure} @@ -1890,22 +1897,33 @@ \subsection{State machine} mempool; \item[number of transactions] the number of transactions in the mempool. \end{description} + \item[\MsgGetMeasures{}] The client asks the server for information on the mempool's measures. + \item[\MsgReplyGetMeasures{} (Word32, Map Text (Integer, Integer))] The server responds with + the total number of transactions currently in the mempool, and a map of the measures known to + the mempool. The keys of this map are textual labels of the measure names, which should + typically be considered stable for a given node version, and the values are a pair of integers + representing the current size and maximum capacity respectively for that measure. The maximum + capacity should not be considered fixed and is likely to change due to mempool conditions. The + size should always be less than or equal to the capacity. + \end{description} \end{description} \begin{figure}[h] \begin{tabular}{l|l|l|l} - \header{from state} & \header{message} & \header{parameters} & \header{to state} \\ \hline - \StIdle & \MsgAcquire & & \StAcquiring \\ - \StAcquiring & \MsgAcquired & SlotNo & \StAcquired \\ - \StAcquired & \MsgAwaitAcquire & & \StAcquiring \\ - \StAcquired & \MsgRelease & & \StIdle \\ - \StAcquired & \MsgNextTx & & \StBusy\ \NextTx\\ - \StBusy\ \NextTx & \MsgReplyNextTx & (\textbf{Nothing} | \textbf{Just} $tx$) & \StAcquired\\ - \StAcquired & \MsgHasTx & & \StBusy\ \HasTx\\ - \StBusy\ \HasTx & \MsgReplyNextTx & Bool & \StAcquired\\ - \StAcquired & \MsgGetSizes & & \StBusy\ \GetSizes\\ - \StBusy\ \GetSizes & \MsgReplyGetSizes & Word32,Word32,Word32 & \StAcquired\\ - \StIdle & \MsgDone & & \StDone\\ + \header{from state} & \header{message} & \header{parameters} & \header{to state} \\ \hline + \StIdle & \MsgAcquire & & \StAcquiring \\ + \StAcquiring & \MsgAcquired & SlotNo & \StAcquired \\ + \StAcquired & \MsgAwaitAcquire & & \StAcquiring \\ + \StAcquired & \MsgRelease & & \StIdle \\ + \StAcquired & \MsgNextTx & & \StBusy\ \NextTx\\ + \StBusy\ \NextTx & \MsgReplyNextTx & (\textbf{Nothing} | \textbf{Just} $tx$) & \StAcquired\\ + \StAcquired & \MsgHasTx & & \StBusy\ \HasTx\\ + \StBusy\ \HasTx & \MsgReplyNextTx & Bool & \StAcquired\\ + \StAcquired & \MsgGetSizes & & \StBusy\ \GetSizes\\ + \StBusy\ \GetSizes & \MsgReplyGetSizes & Word32,Word32,Word32 & \StAcquired\\ + \StAcquired & \MsgGetMeasures & & \StBusy\ \GetMeasures\\ + \StBusy\ \GetMeasures & \MsgReplyMeasures & Word32,Map Text (Integer,Integer) & \StAcquired\\ + \StIdle & \MsgDone & & \StDone\\ \end{tabular} \caption{Local Transaction Monitor mini-protocol messages.} \label{fig:ltxm-messages} diff --git a/ouroboros-network-protocols/CHANGELOG.md b/ouroboros-network-protocols/CHANGELOG.md index 8234017c93..f69e59402c 100644 --- a/ouroboros-network-protocols/CHANGELOG.md +++ b/ouroboros-network-protocols/CHANGELOG.md @@ -12,6 +12,8 @@ * `LocalTxMonitor`, * `LocalTxSubmission`. +* Added new `GetMeasures` message to `LocalTxMonitor` + ### Non-breaking changes ## 0.13.0.0 -- 2025-01-02 diff --git a/ouroboros-network-protocols/bench-cddl/Main.hs b/ouroboros-network-protocols/bench-cddl/Main.hs index 4d5767cce7..ba4e45c005 100644 --- a/ouroboros-network-protocols/bench-cddl/Main.hs +++ b/ouroboros-network-protocols/bench-cddl/Main.hs @@ -147,12 +147,13 @@ main = defaultMain , bgroup "LocalTxMonitor Codec" $ let printMsg :: AnyMessage (LocalTxMonitor TxId Tx SlotNo) -> String printMsg msg = case msg of - AnyMessageAndAgency tok (LocalTxMonitor.MsgAcquired _) -> show tok ++ " MsgAcquired" - AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyNextTx _) -> show tok ++ " MsgReplyNextTx" - AnyMessageAndAgency tok (LocalTxMonitor.MsgHasTx _) -> show tok ++ " MsgHasTx" - AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyHasTx _) -> show tok ++ " MsgReplyHasTx" - AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyGetSizes _) -> show tok ++ " MsgReplyGetSizes" - AnyMessageAndAgency tok message -> show tok ++ " " ++ show message + AnyMessageAndAgency tok (LocalTxMonitor.MsgAcquired _) -> show tok ++ " MsgAcquired" + AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyNextTx _) -> show tok ++ " MsgReplyNextTx" + AnyMessageAndAgency tok (LocalTxMonitor.MsgHasTx _) -> show tok ++ " MsgHasTx" + AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyHasTx _) -> show tok ++ " MsgReplyHasTx" + AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyGetSizes _) -> show tok ++ " MsgReplyGetSizes" + AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyGetMeasures _) -> show tok ++ " MsgReplyGetMeasures" + AnyMessageAndAgency tok message -> show tok ++ " " ++ show message in concat [ benchmarkCodec ("LocalTxSubmission " ++ printMsg msg) (const localTxMonitorCodec) maxBound msg diff --git a/ouroboros-network-protocols/cddl/Main.hs b/ouroboros-network-protocols/cddl/Main.hs index dc00fcbfb2..92945e53f8 100644 --- a/ouroboros-network-protocols/cddl/Main.hs +++ b/ouroboros-network-protocols/cddl/Main.hs @@ -915,6 +915,7 @@ unit_decodeLocalTxMonitor spec = , SomeAgency (LocalTxMonitor.SingBusy LocalTxMonitor.SingNextTx) , SomeAgency (LocalTxMonitor.SingBusy LocalTxMonitor.SingHasTx) , SomeAgency (LocalTxMonitor.SingBusy LocalTxMonitor.SingGetSizes) + , SomeAgency (LocalTxMonitor.SingBusy LocalTxMonitor.SingGetMeasures) ] 100 diff --git a/ouroboros-network-protocols/cddl/specs/local-tx-monitor.cddl b/ouroboros-network-protocols/cddl/specs/local-tx-monitor.cddl index d8490db910..21f3b197da 100644 --- a/ouroboros-network-protocols/cddl/specs/local-tx-monitor.cddl +++ b/ouroboros-network-protocols/cddl/specs/local-tx-monitor.cddl @@ -15,18 +15,22 @@ localTxMonitorMessage / msgReplyHasTx / msgGetSizes / msgReplyGetSizes + / msgGetMeasures + / msgReplyGetMeasures / msgRelease -msgDone = [0] +msgDone = [0] -msgAcquire = [1] -msgAcquired = [2, slotNo] +msgAcquire = [1] +msgAcquired = [2, slotNo] -msgAwaitAcquire = msgAcquire -msgRelease = [3] -msgNextTx = [5] -msgReplyNextTx = [6] / [6, tx] -msgHasTx = [7, txId] -msgReplyHasTx = [8, bool] -msgGetSizes = [9] -msgReplyGetSizes = [10, [word32, word32, word32]] +msgAwaitAcquire = msgAcquire +msgRelease = [3] +msgNextTx = [5] +msgReplyNextTx = [6] / [6, tx] +msgHasTx = [7, txId] +msgReplyHasTx = [8, bool] +msgGetSizes = [9] +msgReplyGetSizes = [10, [word32, word32, word32]] +msgGetMeasures = [11] +msgReplyGetMeasures = [12, word32, {* text => [integer, integer]}] diff --git a/ouroboros-network-protocols/ouroboros-network-protocols.cabal b/ouroboros-network-protocols/ouroboros-network-protocols.cabal index 74d49208b7..d055e33679 100644 --- a/ouroboros-network-protocols/ouroboros-network-protocols.cabal +++ b/ouroboros-network-protocols/ouroboros-network-protocols.cabal @@ -100,6 +100,7 @@ library base >=4.14 && <4.21, bytestring >=0.10 && <0.13, cborg >=0.2.1 && <0.3, + containers, deepseq, io-classes ^>=1.5.0, nothunks, @@ -108,6 +109,7 @@ library serialise, si-timers, singletons, + text, typed-protocols ^>=0.3, typed-protocols-cborg ^>=0.3, typed-protocols-stateful, diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalStateQuery/Type.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalStateQuery/Type.hs index 2397c2b01e..2f257c1bc5 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalStateQuery/Type.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalStateQuery/Type.hs @@ -34,7 +34,7 @@ import Ouroboros.Network.Util.ShowProxy (ShowProxy (..)) -- | The kind of the local state query protocol, and the types of -- the states in the protocol state machine. -- --- It is parametrised over the type of block (for points), the type of queries +-- It is parameterised over the type of block (for points), the type of queries -- and query results. -- type LocalStateQuery :: Type -- ^ block diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Client.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Client.hs index b3efd2ad4a..647b81d813 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Client.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Client.hs @@ -91,6 +91,11 @@ data ClientStAcquired txid tx slot m a where :: (MempoolSizeAndCapacity -> m (ClientStAcquired txid tx slot m a)) -> ClientStAcquired txid tx slot m a + -- | Ask the server about the current mempool's measures + SendMsgGetMeasures + :: (MempoolMeasures -> m (ClientStAcquired txid tx slot m a)) + -> ClientStAcquired txid tx slot m a + -- | Await for a new snapshot and acquire it. -- SendMsgAwaitAcquire @@ -145,6 +150,11 @@ localTxMonitorClientPeer (LocalTxMonitorClient mClient) = Await $ \case MsgReplyGetSizes sizes -> Effect $ handleStAcquired <$> stAcquired sizes + SendMsgGetMeasures stAcquired -> + Yield MsgGetMeasures $ + Await $ \case + MsgReplyGetMeasures measures -> + Effect $ handleStAcquired <$> stAcquired measures SendMsgAwaitAcquire stAcquired -> Yield MsgAwaitAcquire $ Await $ \case diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Codec.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Codec.hs index 424e6e82a0..9f7a26334a 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Codec.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Codec.hs @@ -14,12 +14,15 @@ module Ouroboros.Network.Protocol.LocalTxMonitor.Codec , codecLocalTxMonitorId ) where +import Control.Monad import Control.Monad.Class.MonadST import Network.TypedProtocol.Codec.CBOR import Network.TypedProtocol.Core import Data.ByteString.Lazy (ByteString) +import Data.Map.Strict (Map) +import Data.Map.Strict qualified as Map import Codec.CBOR.Decoding qualified as CBOR import Codec.CBOR.Encoding qualified as CBOR @@ -64,6 +67,8 @@ codecLocalTxMonitor encodeTxId decodeTxId CBOR.encodeListLen 2 <> CBOR.encodeWord 7 <> encodeTxId txid MsgGetSizes -> CBOR.encodeListLen 1 <> CBOR.encodeWord 9 + MsgGetMeasures -> + CBOR.encodeListLen 1 <> CBOR.encodeWord 11 MsgAcquired slot -> CBOR.encodeListLen 2 <> CBOR.encodeWord 2 <> encodeSlot slot MsgReplyNextTx Nothing -> @@ -79,6 +84,11 @@ codecLocalTxMonitor encodeTxId decodeTxId <> CBOR.encodeWord32 (capacityInBytes sz) <> CBOR.encodeWord32 (sizeInBytes sz) <> CBOR.encodeWord32 (numberOfTxs sz) + MsgReplyGetMeasures measures -> + CBOR.encodeListLen 3 + <> CBOR.encodeWord 12 + <> CBOR.encodeWord32 (txCount measures) + <> encodeMeasureMap (measuresMap measures) decode :: forall s (st :: ptcl). @@ -105,6 +115,8 @@ codecLocalTxMonitor encodeTxId decodeTxId return (SomeMessage (MsgHasTx txid)) (SingAcquired, 1, 9) -> return (SomeMessage MsgGetSizes) + (SingAcquired, 1, 11) -> + return (SomeMessage MsgGetMeasures) (SingAcquiring, 2, 2) -> do slot <- decodeSlot @@ -128,12 +140,52 @@ codecLocalTxMonitor encodeTxId decodeTxId let sizes = MempoolSizeAndCapacity { capacityInBytes, sizeInBytes, numberOfTxs } return (SomeMessage (MsgReplyGetSizes sizes)) + (SingBusy SingGetMeasures, 3, 12) -> do + txCount <- CBOR.decodeWord32 + measuresMap <- decodeMeasureMap + let measures = MempoolMeasures { txCount, measuresMap } + pure (SomeMessage (MsgReplyGetMeasures measures)) + (SingDone, _, _) -> notActiveState stok (_, _, _) -> fail (printf "codecLocalTxMonitor (%s, %s) unexpected key (%d, %d)" (show (activeAgency :: ActiveAgency st)) (show stok) key len) +encodeMeasureMap :: Map MeasureName (SizeAndCapacity Integer) -> CBOR.Encoding +encodeMeasureMap m = + CBOR.encodeMapLen (fromIntegral (Map.size m)) <> + Map.foldMapWithKey f m + where + f mn sc = + encodeMeasureName mn <> encodeSizeAndCapacity sc + +decodeMeasureMap :: CBOR.Decoder s (Map MeasureName (SizeAndCapacity Integer)) +decodeMeasureMap = do + len <- CBOR.decodeMapLen + mapContents <- replicateM len $ + (,) <$> decodeMeasureName <*> decodeSizeAndCapacity + pure $ Map.fromList mapContents + +encodeMeasureName :: MeasureName -> CBOR.Encoding +encodeMeasureName (MeasureName t) = CBOR.encodeString t + +decodeMeasureName :: CBOR.Decoder s MeasureName +decodeMeasureName = MeasureName <$> CBOR.decodeString + +encodeSizeAndCapacity :: SizeAndCapacity Integer -> CBOR.Encoding +encodeSizeAndCapacity sc = + CBOR.encodeListLen 2 <> + CBOR.encodeInteger (size sc) <> + CBOR.encodeInteger (capacity sc) + +decodeSizeAndCapacity :: CBOR.Decoder s (SizeAndCapacity Integer) +decodeSizeAndCapacity = do + _len <- CBOR.decodeListLen + size <- CBOR.decodeInteger + capacity <- CBOR.decodeInteger + pure SizeAndCapacity { size, capacity } + -- | An identity 'Codec' for the 'LocalTxMonitor' protocol. It does not do -- any serialisation. It keeps the typed messages, wrapped in 'AnyMessage'. -- @@ -164,15 +216,19 @@ codecLocalTxMonitorId = => Message ptcl st st' -> m (DecodeStep bytes failure m (SomeMessage st)) res msg = return (DecodeDone (SomeMessage msg) Nothing) in return $ DecodePartial $ \bytes -> case (stok, bytes) of - (SingIdle, Just (AnyMessage msg@MsgAcquire{})) -> res msg - (SingIdle, Just (AnyMessage msg@MsgDone{})) -> res msg - (SingAcquired, Just (AnyMessage msg@MsgAwaitAcquire{})) -> res msg - (SingAcquired, Just (AnyMessage msg@MsgNextTx{})) -> res msg - (SingAcquired, Just (AnyMessage msg@MsgHasTx{})) -> res msg - (SingAcquired, Just (AnyMessage msg@MsgRelease{})) -> res msg - (SingAcquiring, Just (AnyMessage msg@MsgAcquired{})) -> res msg - (SingBusy SingNextTx, Just (AnyMessage msg@MsgReplyNextTx{})) -> res msg - (SingBusy SingHasTx, Just (AnyMessage msg@MsgReplyHasTx{})) -> res msg + (SingIdle, Just (AnyMessage msg@MsgAcquire{})) -> res msg + (SingIdle, Just (AnyMessage msg@MsgDone{})) -> res msg + (SingAcquired, Just (AnyMessage msg@MsgAwaitAcquire{})) -> res msg + (SingAcquired, Just (AnyMessage msg@MsgNextTx{})) -> res msg + (SingAcquired, Just (AnyMessage msg@MsgHasTx{})) -> res msg + (SingAcquired, Just (AnyMessage msg@MsgRelease{})) -> res msg + (SingAcquired, Just (AnyMessage msg@MsgGetSizes{})) -> res msg + (SingAcquired, Just (AnyMessage msg@MsgGetMeasures{})) -> res msg + (SingAcquiring, Just (AnyMessage msg@MsgAcquired{})) -> res msg + (SingBusy SingNextTx, Just (AnyMessage msg@MsgReplyNextTx{})) -> res msg + (SingBusy SingHasTx, Just (AnyMessage msg@MsgReplyHasTx{})) -> res msg + (SingBusy SingGetSizes, Just (AnyMessage msg@MsgReplyGetSizes{})) -> res msg + (SingBusy SingGetMeasures, Just (AnyMessage msg@MsgReplyGetMeasures{})) -> res msg (SingDone, _) -> notActiveState stok diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Server.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Server.hs index 7dc359deff..43f756ba1d 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Server.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Server.hs @@ -69,6 +69,7 @@ data ServerStAcquired txid tx slot m a = ServerStAcquired { recvMsgNextTx :: m (ServerStBusy NextTx txid tx slot m a) , recvMsgHasTx :: txid -> m (ServerStBusy HasTx txid tx slot m a) , recvMsgGetSizes :: m (ServerStBusy GetSizes txid tx slot m a) + , recvMsgGetMeasures :: m (ServerStBusy GetMeasures txid tx slot m a) , recvMsgAwaitAcquire :: m (ServerStAcquiring txid tx slot m a) , recvMsgRelease :: m (ServerStIdle txid tx slot m a) } @@ -95,6 +96,11 @@ data ServerStBusy (kind :: StBusyKind) txid tx slot m a where -> ServerStAcquired txid tx slot m a -> ServerStBusy GetSizes txid tx slot m a + SendMsgReplyGetMeasures + :: MempoolMeasures + -> ServerStAcquired txid tx slot m a + -> ServerStBusy GetMeasures txid tx slot m a + -- | Interpret a 'LocalTxMonitorServer' action sequence as a 'Peer' on the -- client-side of the 'LocalTxMonitor' protocol. -- @@ -134,6 +140,7 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) = { recvMsgNextTx , recvMsgHasTx , recvMsgGetSizes + , recvMsgGetMeasures , recvMsgAwaitAcquire , recvMsgRelease } -> Await $ \case @@ -143,6 +150,8 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) = Effect $ handleHasTx <$> recvMsgHasTx txid MsgGetSizes -> Effect $ handleGetSizes <$> recvMsgGetSizes + MsgGetMeasures -> + Effect $ handleGetMeasures <$> recvMsgGetMeasures MsgAwaitAcquire -> Effect $ handleStAcquiring <$> recvMsgAwaitAcquire MsgRelease -> @@ -171,3 +180,11 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) = SendMsgReplyGetSizes sizes serverStAcquired -> Yield (MsgReplyGetSizes sizes) $ handleStAcquired serverStAcquired + + handleGetMeasures :: + ServerStBusy GetMeasures txid tx slot m a + -> Server (LocalTxMonitor txid tx slot) NonPipelined (StBusy GetMeasures) m a + handleGetMeasures = \case + SendMsgReplyGetMeasures measures serverStAcquired -> + Yield (MsgReplyGetMeasures measures) $ + handleStAcquired serverStAcquired diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Type.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Type.hs index e531e32503..c641bcf026 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Type.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Type.hs @@ -20,35 +20,39 @@ -- already sent to the client. -- -- @ --- START --- ⇓ --- ┌───────────────┐ --- ┌──────▶│ Idle │⇒ DONE --- │ └───┬───────────┘ --- │ │ --- │ Acquire │ --- │ ▼ --- │ ┌───────────────┐ --- Release │ │ Acquiring │ --- │ └───┬───────────┘ --- │ │ ▲ --- │ Acquired │ │ AwaitAcquire --- │ ▼ │ --- │ ┌───────────┴───┐ --- └───────┤ Acquired │ --- └───┬───────────┘ --- │ ▲ --- HasTx|NextTx|GetSizes │ │ Reply (HasTx|NextTx|GetSizes) --- ▼ │ --- ┌───────────┴───┐ --- │ Busy │ --- └───────────────┘ +-- START +-- ⇓ +-- ┌───────────────┐ +-- ┌──────▶│ Idle │⇒ DONE +-- │ └───┬───────────┘ +-- │ │ +-- │ Acquire │ +-- │ ▼ +-- │ ┌───────────────┐ +-- Release │ │ Acquiring │ +-- │ └───┬───────────┘ +-- │ │ ▲ +-- │ Acquired │ │ AwaitAcquire +-- │ ▼ │ +-- │ ┌───────────┴───┐ +-- └───────┤ Acquired │ +-- └───┬───────────┘ +-- │ ▲ +-- HasTx|NextTx| │ │ Reply (HasTx|NextTx +-- GetSizes|GetMeasures │ │ |GetSizes|GetMeasures) +-- │ │ +-- ▼ │ +-- ┌───────────┴───┐ +-- │ Busy │ +-- └───────────────┘ -- @ module Ouroboros.Network.Protocol.LocalTxMonitor.Type where import Data.Kind +import Data.Map.Strict (Map) import Data.Singletons +import Data.Text (Text) import Data.Word import GHC.Generics (Generic) @@ -60,7 +64,7 @@ import Ouroboros.Network.Util.ShowProxy -- | The kind of the local transaction monitoring protocol, and the types of -- the states in the protocol state machine. -- --- It is parametrised over the type of transactions. +-- It is parameterised over the type of transactions. -- type LocalTxMonitor :: Type -> Type -> Type -> Type data LocalTxMonitor txid tx slot where @@ -132,17 +136,22 @@ data StBusyKind where -- | The server is busy looking for the current size and max capacity of the -- mempool GetSizes :: StBusyKind + -- | The server is busy looking for the current size and max capacity of the + -- mempool + GetMeasures :: StBusyKind type SingBusyKind :: StBusyKind -> Type data SingBusyKind st where - SingNextTx :: SingBusyKind NextTx - SingHasTx :: SingBusyKind HasTx - SingGetSizes :: SingBusyKind GetSizes + SingNextTx :: SingBusyKind NextTx + SingHasTx :: SingBusyKind HasTx + SingGetSizes :: SingBusyKind GetSizes + SingGetMeasures :: SingBusyKind GetMeasures type instance Sing = SingBusyKind -instance SingI NextTx where sing = SingNextTx -instance SingI HasTx where sing = SingHasTx -instance SingI GetSizes where sing = SingGetSizes +instance SingI NextTx where sing = SingNextTx +instance SingI HasTx where sing = SingHasTx +instance SingI GetSizes where sing = SingGetSizes +instance SingI GetMeasures where sing = SingGetMeasures deriving instance Show (SingBusyKind st) @@ -157,6 +166,22 @@ data MempoolSizeAndCapacity = MempoolSizeAndCapacity -- ^ The number of transactions in the mempool } deriving (Generic, Eq, Show, NFData) +data SizeAndCapacity a = SizeAndCapacity + { size :: !a + , capacity :: !a + } deriving (Generic, Eq, Show, NFData) + +instance Functor SizeAndCapacity where + fmap f (SizeAndCapacity s c) = SizeAndCapacity (f s) (f c) + +newtype MeasureName = MeasureName Text + deriving (Generic, Eq, Ord, Show, NFData) + +data MempoolMeasures = MempoolMeasures + { txCount :: !Word32 + , measuresMap :: !(Map MeasureName (SizeAndCapacity Integer)) + } deriving (Generic, Eq, Show, NFData) + instance Protocol (LocalTxMonitor txid tx slot) where -- | The messages in the transaction monitoring protocol. @@ -237,6 +262,16 @@ instance Protocol (LocalTxMonitor txid tx slot) where :: MempoolSizeAndCapacity -> Message (LocalTxMonitor txid tx slot) (StBusy GetSizes) StAcquired + -- | The client asks the server about the current mempool measures + -- + MsgGetMeasures + :: Message (LocalTxMonitor txid tx slot) StAcquired (StBusy GetMeasures) + + -- | The server responds with the current mempool measures + MsgReplyGetMeasures + :: MempoolMeasures + -> Message (LocalTxMonitor txid tx slot) (StBusy GetMeasures) StAcquired + -- | Release the acquired snapshot, in order to loop back to the idle state. -- MsgRelease @@ -260,27 +295,19 @@ instance ( NFData txid , NFData tx , NFData slot ) => NFData (Message (LocalTxMonitor txid tx slot) from to) where - rnf MsgAcquire = () - rnf (MsgAcquired slot) = rnf slot - rnf MsgAwaitAcquire = () - rnf MsgNextTx = () - rnf (MsgReplyNextTx mbTx) = rnf mbTx - rnf (MsgHasTx txid) = rnf txid - rnf (MsgReplyHasTx b) = rnf b - rnf MsgGetSizes = () - rnf (MsgReplyGetSizes msc) = rnf msc - rnf MsgRelease = () - rnf MsgDone = () - -data TokBusyKind (k :: StBusyKind) where - TokNextTx :: TokBusyKind NextTx - TokHasTx :: TokBusyKind HasTx - TokGetSizes :: TokBusyKind GetSizes - -instance NFData (TokBusyKind k) where - rnf TokNextTx = () - rnf TokHasTx = () - rnf TokGetSizes = () + rnf MsgAcquire = () + rnf (MsgAcquired slot) = rnf slot + rnf MsgAwaitAcquire = () + rnf MsgNextTx = () + rnf (MsgReplyNextTx mbTx) = rnf mbTx + rnf (MsgHasTx txid) = rnf txid + rnf (MsgReplyHasTx b) = rnf b + rnf MsgGetSizes = () + rnf (MsgReplyGetSizes msc) = rnf msc + rnf MsgGetMeasures = () + rnf (MsgReplyGetMeasures msc) = rnf msc + rnf MsgRelease = () + rnf MsgDone = () deriving instance (Show txid, Show tx, Show slot) => Show (Message (LocalTxMonitor txid tx slot) from to) diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxSubmission/Type.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxSubmission/Type.hs index 8f9719d1ed..fabc5844ac 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxSubmission/Type.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxSubmission/Type.hs @@ -27,7 +27,7 @@ import Ouroboros.Network.Util.ShowProxy -- | The kind of the local transaction-submission protocol, and the types of -- the states in the protocol state machine. -- --- It is parametrised over the type of transactions and the type of reasons +-- It is parameterised over the type of transactions and the type of reasons -- used when rejecting a transaction. -- type LocalTxSubmission :: Type -> Type -> Type diff --git a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Direct.hs b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Direct.hs index 528f9983e9..5722acad71 100644 --- a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Direct.hs +++ b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Direct.hs @@ -47,6 +47,7 @@ direct (LocalTxMonitorClient mClient) (LocalTxMonitorServer mServer) = do , recvMsgNextTx , recvMsgHasTx , recvMsgGetSizes + , recvMsgGetMeasures } = \case SendMsgRelease mClientStIdle -> do serverStIdle <- recvMsgRelease @@ -71,3 +72,8 @@ direct (LocalTxMonitorClient mClient) (LocalTxMonitorServer mServer) = do SendMsgReplyGetSizes result serverStAcquired -> do clientStAcquired <- mClientStAcquired result directAcquired serverStAcquired clientStAcquired + SendMsgGetMeasures mClientStAcquired -> do + recvMsgGetMeasures >>= \case + SendMsgReplyGetMeasures result serverStAcquired -> do + clientStAcquired <- mClientStAcquired result + directAcquired serverStAcquired clientStAcquired diff --git a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Examples.hs b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Examples.hs index 7223411a6e..f87cd22cfe 100644 --- a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Examples.hs +++ b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Examples.hs @@ -2,6 +2,7 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} module Ouroboros.Network.Protocol.LocalTxMonitor.Examples @@ -10,6 +11,7 @@ module Ouroboros.Network.Protocol.LocalTxMonitor.Examples ) where import Data.List (find) +import Data.Map qualified as Map import Data.Maybe (isJust) import Ouroboros.Network.Protocol.LocalTxMonitor.Client @@ -99,4 +101,13 @@ localTxMonitorServer txId (slot, allTxs) = , numberOfTxs = fromIntegral (length allTxs) } in pure $ SendMsgReplyGetSizes sizes (serverStAcquired txs) + , recvMsgGetMeasures = + let measures = MempoolMeasures + { txCount = fromIntegral (length allTxs) + , measuresMap = Map.fromList + [ (MeasureName "tx_size_bytes", SizeAndCapacity { size = 192, capacity = 1024 }) + , (MeasureName "ref_scripts_size_bytes", SizeAndCapacity { size = 1024, capacity = 4096 }) + ] + } + in pure $ SendMsgReplyGetMeasures measures (serverStAcquired txs) } diff --git a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Test.hs b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Test.hs index 95ce2789cb..bbbbcae7c4 100644 --- a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Test.hs +++ b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Test.hs @@ -35,6 +35,7 @@ import Ouroboros.Network.Protocol.LocalTxMonitor.Examples import Ouroboros.Network.Protocol.LocalTxMonitor.Server import Ouroboros.Network.Protocol.LocalTxMonitor.Type +import Data.Text qualified as Text import Test.ChainGenerators () import Test.Ouroboros.Network.Protocol.Utils (prop_codec_cborM, prop_codec_valid_cbor_encoding, splits2, splits3) @@ -51,6 +52,8 @@ tests = testGroup "Ouroboros.Network.Protocol" , testProperty "codec cborM" prop_codec_cborM_LocalTxMonitor , testProperty "codec valid cbor encoding" prop_codec_valid_cbor_encoding_LocalTxMonitor + , testProperty "codecIdM" prop_codecIdM_LocalTxMonitor + , testProperty "direct" prop_direct , testProperty "connect" prop_connect @@ -73,6 +76,12 @@ codec = codecLocalTxMonitor S.encode S.decode S.encode S.decode +codecId :: + ( MonadST m + ) + => Codec (LocalTxMonitor TxId Tx SlotNo) CodecFailure m (AnyMessage (LocalTxMonitor TxId Tx SlotNo)) +codecId = codecLocalTxMonitorId + -- -- Properties -- @@ -102,6 +111,12 @@ prop_codec_valid_cbor_encoding_LocalTxMonitor :: prop_codec_valid_cbor_encoding_LocalTxMonitor = prop_codec_valid_cbor_encoding codec +prop_codecIdM_LocalTxMonitor :: + AnyMessage (LocalTxMonitor TxId Tx SlotNo) + -> Bool +prop_codecIdM_LocalTxMonitor msg = + ST.runST $ prop_codecM codecId msg + -- -- Protocol Executions -- @@ -210,10 +225,27 @@ instance (Arbitrary txid, Arbitrary tx, Arbitrary slot) , AnyMessage . MsgReplyHasTx <$> arbitrary , pure $ AnyMessage MsgGetSizes , AnyMessage . MsgReplyGetSizes <$> arbitrary + , pure $ AnyMessage MsgGetMeasures + , AnyMessage . MsgReplyGetMeasures <$> arbitrary , pure $ AnyMessage MsgRelease , pure $ AnyMessage MsgDone ] +instance Arbitrary MempoolMeasures where + arbitrary = + MempoolMeasures + <$> arbitrary + <*> arbitrary + +instance Arbitrary MeasureName where + arbitrary = MeasureName . Text.pack <$> arbitrary + +instance Arbitrary (SizeAndCapacity Integer) where + arbitrary = do + Positive c <- arbitrary + s <- chooseInteger (0, c) + pure $ SizeAndCapacity s c + instance Arbitrary MempoolSizeAndCapacity where arbitrary = MempoolSizeAndCapacity @@ -224,15 +256,17 @@ instance Arbitrary MempoolSizeAndCapacity where instance (Eq txid, Eq tx, Eq slot) => Eq (AnyMessage (LocalTxMonitor txid tx slot)) where - AnyMessage MsgAcquire == AnyMessage MsgAcquire = True - AnyMessage (MsgAcquired a) == AnyMessage (MsgAcquired b) = a == b - AnyMessage MsgAwaitAcquire == AnyMessage MsgAwaitAcquire = True - AnyMessage MsgNextTx == AnyMessage MsgNextTx = True - AnyMessage (MsgReplyNextTx a) == AnyMessage (MsgReplyNextTx b) = a == b - AnyMessage (MsgHasTx a) == AnyMessage (MsgHasTx b) = a == b - AnyMessage (MsgReplyHasTx a) == AnyMessage (MsgReplyHasTx b) = a == b - AnyMessage MsgGetSizes == AnyMessage MsgGetSizes = True - AnyMessage (MsgReplyGetSizes a) == AnyMessage (MsgReplyGetSizes b) = a == b - AnyMessage MsgRelease == AnyMessage MsgRelease = True - AnyMessage MsgDone == AnyMessage MsgDone = True - AnyMessage _ == AnyMessage _ = False + AnyMessage MsgAcquire == AnyMessage MsgAcquire = True + AnyMessage (MsgAcquired a) == AnyMessage (MsgAcquired b) = a == b + AnyMessage MsgAwaitAcquire == AnyMessage MsgAwaitAcquire = True + AnyMessage MsgNextTx == AnyMessage MsgNextTx = True + AnyMessage (MsgReplyNextTx a) == AnyMessage (MsgReplyNextTx b) = a == b + AnyMessage (MsgHasTx a) == AnyMessage (MsgHasTx b) = a == b + AnyMessage (MsgReplyHasTx a) == AnyMessage (MsgReplyHasTx b) = a == b + AnyMessage MsgGetSizes == AnyMessage MsgGetSizes = True + AnyMessage (MsgReplyGetSizes a) == AnyMessage (MsgReplyGetSizes b) = a == b + AnyMessage MsgGetMeasures == AnyMessage MsgGetMeasures = True + AnyMessage (MsgReplyGetMeasures a) == AnyMessage (MsgReplyGetMeasures b) = a == b + AnyMessage MsgRelease == AnyMessage MsgRelease = True + AnyMessage MsgDone == AnyMessage MsgDone = True + AnyMessage _ == AnyMessage _ = False