Skip to content

Commit

Permalink
- ouroboros-network-protocols: add support for GetMeasures
Browse files Browse the repository at this point in the history
- add a property test `codecIdM` for `codecLocalTxMonitorId`

- "parametrised" -> "parameterised"
  • Loading branch information
fraser-iohk committed Jan 27, 2025
1 parent 73e48c8 commit 788630d
Show file tree
Hide file tree
Showing 15 changed files with 311 additions and 122 deletions.
80 changes: 49 additions & 31 deletions docs/network-spec/miniprotocols.tex
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,10 @@ \section{Local Tx-Monitor mini-protocol}
\newcommand{\MsgGetSizes}{\msg{MsgGetSizes}}
\newcommand{\MsgReplyGetSizes}{\msg{MsgReplyGetSizes}}

\newcommand{\GetMeasures}{\state{GetMeasures}}
\newcommand{\MsgGetMeasures}{\msg{MsgGetMeasures}}
\newcommand{\MsgReplyGetMeasures}{\msg{MsgReplyGetMeasures}}

\subsection{Description}

A mini-protocol which allows to monitor transactions in the local mempool. This
Expand All @@ -1822,25 +1826,28 @@ \subsection{State machine}
\begin{figure}[h]
\begin{tikzpicture}[->,shorten >=1pt,auto,node distance=4cm, semithick]
\tikzstyle{every state}=[fill=red,draw=none,text=white]
\node[state, mygreen, initial] (Idle) {\StIdle};
\node[state, myblue, right of=Idle] (Acquiring) {\StAcquiring};
\node[state, mygreen, right of=Acquiring] (Acquired) {\StAcquired};
\node[state, myblue, right of=Acquired] (BusyHasTx) {\StBusy\ \HasTx};
\node[state, myblue, above of=BusyHasTx] (BusyNextTx) {\StBusy\ \NextTx};
\node[state, myblue, below of=BusyHasTx] (BusyGetSizes) {\StBusy\ \GetSizes};
\node[state, below of=Idle] (Done) {\StDone};

\draw (Idle) edge node{\MsgAcquire} (Acquiring);
\draw (Acquiring) edge[bend left=45] node{\MsgAcquired} (Acquired);
\draw (Acquired) edge[bend left=45] node{\MsgAwaitAcquire} (Acquiring);
\draw (Acquired) edge[bend left=45] node{\MsgRelease} (Idle);
\draw (Acquired) edge[bend left=20] node{\MsgNextTx} (BusyNextTx);
\draw (BusyNextTx) edge[bend left=20] node{\MsgReplyNextTx} (Acquired);
\draw (Acquired) edge[bend left=20] node{\MsgHasTx} (BusyHasTx);
\draw (BusyHasTx) edge[bend left=20] node{\MsgReplyHasTx} (Acquired);
\draw (Acquired) edge[bend left=20] node{\MsgGetSizes} (BusyGetSizes);
\draw (BusyGetSizes) edge[bend left=20] node{\MsgReplyGetSizes} (Acquired);
\draw (Idle) edge node{\MsgDone} (Done);
\node[state, mygreen, initial] (Idle) {\StIdle};
\node[state, myblue, right of=Idle] (Acquiring) {\StAcquiring};
\node[state, mygreen, right of=Acquiring] (Acquired) {\StAcquired};
\node[state, myblue, right of=Acquired] (BusyHasTx) {\StBusy\ \HasTx};
\node[state, myblue, above of=BusyHasTx] (BusyNextTx) {\StBusy\ \NextTx};
\node[state, myblue, below of=BusyHasTx] (BusyGetSizes) {\StBusy\ \GetSizes};
\node[state, myblue, below of=BusyHasTx] (BusyGetMeasures) {\StBusy\ \GetMeasures};
\node[state, below of=Idle] (Done) {\StDone};

\draw (Idle) edge node{\MsgAcquire} (Acquiring);
\draw (Acquiring) edge[bend left=45] node{\MsgAcquired} (Acquired);
\draw (Acquired) edge[bend left=45] node{\MsgAwaitAcquire} (Acquiring);
\draw (Acquired) edge[bend left=45] node{\MsgRelease} (Idle);
\draw (Acquired) edge[bend left=20] node{\MsgNextTx} (BusyNextTx);
\draw (BusyNextTx) edge[bend left=20] node{\MsgReplyNextTx} (Acquired);
\draw (Acquired) edge[bend left=20] node{\MsgHasTx} (BusyHasTx);
\draw (BusyHasTx) edge[bend left=20] node{\MsgReplyHasTx} (Acquired);
\draw (Acquired) edge[bend left=20] node{\MsgGetSizes} (BusyGetSizes);
\draw (Acquired) edge[bend left=20] node{\MsgGetMeasures} (BusyGetMeasures);
\draw (BusyGetSizes) edge[bend left=20] node{\MsgReplyGetSizes} (Acquired);
\draw (BusyGetMeasures) edge[bend left=20] node{\MsgReplyGetMeasures} (Acquired);
\draw (Idle) edge node{\MsgDone} (Done);
\end{tikzpicture}
\caption{State machine of the Local Tx-Monitor mini-protocol.}
\end{figure}
Expand Down Expand Up @@ -1890,22 +1897,33 @@ \subsection{State machine}
mempool;
\item[number of transactions] the number of transactions in the mempool.
\end{description}
\item[\MsgGetMeasures{}] The client asks the server for information on the mempool's measures.
\item[\MsgReplyGetMeasures{} (Word32, Map Text (Integer, Integer))] The server responds with
the total number of transactions currently in the mempool, and a map of the measures known to
the mempool. The keys of this map are textual labels of the measure names, which should
typically be considered stable for a given node version, and the values are a pair of integers
representing the current size and maximum capacity respectively for that measure. The maximum
capacity should not be considered fixed and is likely to change due to mempool conditions. The
size should always be less than or equal to the capacity.
\end{description}
\end{description}

\begin{figure}[h]
\begin{tabular}{l|l|l|l}
\header{from state} & \header{message} & \header{parameters} & \header{to state} \\ \hline
\StIdle & \MsgAcquire & & \StAcquiring \\
\StAcquiring & \MsgAcquired & SlotNo & \StAcquired \\
\StAcquired & \MsgAwaitAcquire & & \StAcquiring \\
\StAcquired & \MsgRelease & & \StIdle \\
\StAcquired & \MsgNextTx & & \StBusy\ \NextTx\\
\StBusy\ \NextTx & \MsgReplyNextTx & (\textbf{Nothing} | \textbf{Just} $tx$) & \StAcquired\\
\StAcquired & \MsgHasTx & & \StBusy\ \HasTx\\
\StBusy\ \HasTx & \MsgReplyNextTx & Bool & \StAcquired\\
\StAcquired & \MsgGetSizes & & \StBusy\ \GetSizes\\
\StBusy\ \GetSizes & \MsgReplyGetSizes & Word32,Word32,Word32 & \StAcquired\\
\StIdle & \MsgDone & & \StDone\\
\header{from state} & \header{message} & \header{parameters} & \header{to state} \\ \hline
\StIdle & \MsgAcquire & & \StAcquiring \\
\StAcquiring & \MsgAcquired & SlotNo & \StAcquired \\
\StAcquired & \MsgAwaitAcquire & & \StAcquiring \\
\StAcquired & \MsgRelease & & \StIdle \\
\StAcquired & \MsgNextTx & & \StBusy\ \NextTx\\
\StBusy\ \NextTx & \MsgReplyNextTx & (\textbf{Nothing} | \textbf{Just} $tx$) & \StAcquired\\
\StAcquired & \MsgHasTx & & \StBusy\ \HasTx\\
\StBusy\ \HasTx & \MsgReplyNextTx & Bool & \StAcquired\\
\StAcquired & \MsgGetSizes & & \StBusy\ \GetSizes\\
\StBusy\ \GetSizes & \MsgReplyGetSizes & Word32,Word32,Word32 & \StAcquired\\
\StAcquired & \MsgGetMeasures & & \StBusy\ \GetMeasures\\
\StBusy\ \GetMeasures & \MsgReplyMeasures & Word32,Map Text (Integer,Integer) & \StAcquired\\
\StIdle & \MsgDone & & \StDone\\
\end{tabular}
\caption{Local Transaction Monitor mini-protocol messages.}
\label{fig:ltxm-messages}
Expand Down
2 changes: 2 additions & 0 deletions ouroboros-network-protocols/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
* `LocalTxMonitor`,
* `LocalTxSubmission`.

* Added new `GetMeasures` message to `LocalTxMonitor`

### Non-breaking changes

## 0.13.0.0 -- 2025-01-02
Expand Down
13 changes: 7 additions & 6 deletions ouroboros-network-protocols/bench-cddl/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,13 @@ main = defaultMain
, bgroup "LocalTxMonitor Codec" $
let printMsg :: AnyMessage (LocalTxMonitor TxId Tx SlotNo) -> String
printMsg msg = case msg of
AnyMessageAndAgency tok (LocalTxMonitor.MsgAcquired _) -> show tok ++ " MsgAcquired"
AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyNextTx _) -> show tok ++ " MsgReplyNextTx"
AnyMessageAndAgency tok (LocalTxMonitor.MsgHasTx _) -> show tok ++ " MsgHasTx"
AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyHasTx _) -> show tok ++ " MsgReplyHasTx"
AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyGetSizes _) -> show tok ++ " MsgReplyGetSizes"
AnyMessageAndAgency tok message -> show tok ++ " " ++ show message
AnyMessageAndAgency tok (LocalTxMonitor.MsgAcquired _) -> show tok ++ " MsgAcquired"
AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyNextTx _) -> show tok ++ " MsgReplyNextTx"
AnyMessageAndAgency tok (LocalTxMonitor.MsgHasTx _) -> show tok ++ " MsgHasTx"
AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyHasTx _) -> show tok ++ " MsgReplyHasTx"
AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyGetSizes _) -> show tok ++ " MsgReplyGetSizes"
AnyMessageAndAgency tok (LocalTxMonitor.MsgReplyGetMeasures _) -> show tok ++ " MsgReplyGetMeasures"
AnyMessageAndAgency tok message -> show tok ++ " " ++ show message
in concat
[ benchmarkCodec ("LocalTxSubmission " ++ printMsg msg)
(const localTxMonitorCodec) maxBound msg
Expand Down
1 change: 1 addition & 0 deletions ouroboros-network-protocols/cddl/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ unit_decodeLocalTxMonitor spec =
, SomeAgency (LocalTxMonitor.SingBusy LocalTxMonitor.SingNextTx)
, SomeAgency (LocalTxMonitor.SingBusy LocalTxMonitor.SingHasTx)
, SomeAgency (LocalTxMonitor.SingBusy LocalTxMonitor.SingGetSizes)
, SomeAgency (LocalTxMonitor.SingBusy LocalTxMonitor.SingGetMeasures)
]
100

Expand Down
26 changes: 15 additions & 11 deletions ouroboros-network-protocols/cddl/specs/local-tx-monitor.cddl
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@ localTxMonitorMessage
/ msgReplyHasTx
/ msgGetSizes
/ msgReplyGetSizes
/ msgGetMeasures
/ msgReplyGetMeasures
/ msgRelease

msgDone = [0]
msgDone = [0]

msgAcquire = [1]
msgAcquired = [2, slotNo]
msgAcquire = [1]
msgAcquired = [2, slotNo]

msgAwaitAcquire = msgAcquire
msgRelease = [3]
msgNextTx = [5]
msgReplyNextTx = [6] / [6, tx]
msgHasTx = [7, txId]
msgReplyHasTx = [8, bool]
msgGetSizes = [9]
msgReplyGetSizes = [10, [word32, word32, word32]]
msgAwaitAcquire = msgAcquire
msgRelease = [3]
msgNextTx = [5]
msgReplyNextTx = [6] / [6, tx]
msgHasTx = [7, txId]
msgReplyHasTx = [8, bool]
msgGetSizes = [9]
msgReplyGetSizes = [10, [word32, word32, word32]]
msgGetMeasures = [11]
msgReplyGetMeasures = [12, word32, {* text => [integer, integer]}]
2 changes: 2 additions & 0 deletions ouroboros-network-protocols/ouroboros-network-protocols.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ library
base >=4.14 && <4.21,
bytestring >=0.10 && <0.13,
cborg >=0.2.1 && <0.3,
containers,
deepseq,
io-classes ^>=1.5.0,
nothunks,
Expand All @@ -108,6 +109,7 @@ library
serialise,
si-timers,
singletons,
text,
typed-protocols ^>=0.3,
typed-protocols-cborg ^>=0.3,
typed-protocols-stateful,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import Ouroboros.Network.Util.ShowProxy (ShowProxy (..))
-- | The kind of the local state query protocol, and the types of
-- the states in the protocol state machine.
--
-- It is parametrised over the type of block (for points), the type of queries
-- It is parameterised over the type of block (for points), the type of queries
-- and query results.
--
type LocalStateQuery :: Type -- ^ block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ data ClientStAcquired txid tx slot m a where
:: (MempoolSizeAndCapacity -> m (ClientStAcquired txid tx slot m a))
-> ClientStAcquired txid tx slot m a

-- | Ask the server about the current mempool's measures
SendMsgGetMeasures
:: (MempoolMeasures -> m (ClientStAcquired txid tx slot m a))
-> ClientStAcquired txid tx slot m a

-- | Await for a new snapshot and acquire it.
--
SendMsgAwaitAcquire
Expand Down Expand Up @@ -145,6 +150,11 @@ localTxMonitorClientPeer (LocalTxMonitorClient mClient) =
Await $ \case
MsgReplyGetSizes sizes ->
Effect $ handleStAcquired <$> stAcquired sizes
SendMsgGetMeasures stAcquired ->
Yield MsgGetMeasures $
Await $ \case
MsgReplyGetMeasures measures ->
Effect $ handleStAcquired <$> stAcquired measures
SendMsgAwaitAcquire stAcquired ->
Yield MsgAwaitAcquire $
Await $ \case
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ module Ouroboros.Network.Protocol.LocalTxMonitor.Codec
, codecLocalTxMonitorId
) where

import Control.Monad
import Control.Monad.Class.MonadST

import Network.TypedProtocol.Codec.CBOR
import Network.TypedProtocol.Core

import Data.ByteString.Lazy (ByteString)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map

import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
Expand Down Expand Up @@ -64,6 +67,8 @@ codecLocalTxMonitor encodeTxId decodeTxId
CBOR.encodeListLen 2 <> CBOR.encodeWord 7 <> encodeTxId txid
MsgGetSizes ->
CBOR.encodeListLen 1 <> CBOR.encodeWord 9
MsgGetMeasures ->
CBOR.encodeListLen 1 <> CBOR.encodeWord 11
MsgAcquired slot ->
CBOR.encodeListLen 2 <> CBOR.encodeWord 2 <> encodeSlot slot
MsgReplyNextTx Nothing ->
Expand All @@ -79,6 +84,11 @@ codecLocalTxMonitor encodeTxId decodeTxId
<> CBOR.encodeWord32 (capacityInBytes sz)
<> CBOR.encodeWord32 (sizeInBytes sz)
<> CBOR.encodeWord32 (numberOfTxs sz)
MsgReplyGetMeasures measures ->
CBOR.encodeListLen 3
<> CBOR.encodeWord 12
<> CBOR.encodeWord32 (txCount measures)
<> encodeMeasureMap (measuresMap measures)

decode ::
forall s (st :: ptcl).
Expand All @@ -105,6 +115,8 @@ codecLocalTxMonitor encodeTxId decodeTxId
return (SomeMessage (MsgHasTx txid))
(SingAcquired, 1, 9) ->
return (SomeMessage MsgGetSizes)
(SingAcquired, 1, 11) ->
return (SomeMessage MsgGetMeasures)

(SingAcquiring, 2, 2) -> do
slot <- decodeSlot
Expand All @@ -128,12 +140,52 @@ codecLocalTxMonitor encodeTxId decodeTxId
let sizes = MempoolSizeAndCapacity { capacityInBytes, sizeInBytes, numberOfTxs }
return (SomeMessage (MsgReplyGetSizes sizes))

(SingBusy SingGetMeasures, 3, 12) -> do
txCount <- CBOR.decodeWord32
measuresMap <- decodeMeasureMap
let measures = MempoolMeasures { txCount, measuresMap }
pure (SomeMessage (MsgReplyGetMeasures measures))

(SingDone, _, _) -> notActiveState stok

(_, _, _) ->
fail (printf "codecLocalTxMonitor (%s, %s) unexpected key (%d, %d)"
(show (activeAgency :: ActiveAgency st)) (show stok) key len)

encodeMeasureMap :: Map MeasureName (SizeAndCapacity Integer) -> CBOR.Encoding
encodeMeasureMap m =
CBOR.encodeMapLen (fromIntegral (Map.size m)) <>
Map.foldMapWithKey f m
where
f mn sc =
encodeMeasureName mn <> encodeSizeAndCapacity sc

decodeMeasureMap :: CBOR.Decoder s (Map MeasureName (SizeAndCapacity Integer))
decodeMeasureMap = do
len <- CBOR.decodeMapLen
mapContents <- replicateM len $
(,) <$> decodeMeasureName <*> decodeSizeAndCapacity
pure $ Map.fromList mapContents

encodeMeasureName :: MeasureName -> CBOR.Encoding
encodeMeasureName (MeasureName t) = CBOR.encodeString t

decodeMeasureName :: CBOR.Decoder s MeasureName
decodeMeasureName = MeasureName <$> CBOR.decodeString

encodeSizeAndCapacity :: SizeAndCapacity Integer -> CBOR.Encoding
encodeSizeAndCapacity sc =
CBOR.encodeListLen 2 <>
CBOR.encodeInteger (size sc) <>
CBOR.encodeInteger (capacity sc)

decodeSizeAndCapacity :: CBOR.Decoder s (SizeAndCapacity Integer)
decodeSizeAndCapacity = do
_len <- CBOR.decodeListLen
size <- CBOR.decodeInteger
capacity <- CBOR.decodeInteger
pure SizeAndCapacity { size, capacity }

-- | An identity 'Codec' for the 'LocalTxMonitor' protocol. It does not do
-- any serialisation. It keeps the typed messages, wrapped in 'AnyMessage'.
--
Expand Down Expand Up @@ -164,15 +216,19 @@ codecLocalTxMonitorId =
=> Message ptcl st st' -> m (DecodeStep bytes failure m (SomeMessage st))
res msg = return (DecodeDone (SomeMessage msg) Nothing)
in return $ DecodePartial $ \bytes -> case (stok, bytes) of
(SingIdle, Just (AnyMessage msg@MsgAcquire{})) -> res msg
(SingIdle, Just (AnyMessage msg@MsgDone{})) -> res msg
(SingAcquired, Just (AnyMessage msg@MsgAwaitAcquire{})) -> res msg
(SingAcquired, Just (AnyMessage msg@MsgNextTx{})) -> res msg
(SingAcquired, Just (AnyMessage msg@MsgHasTx{})) -> res msg
(SingAcquired, Just (AnyMessage msg@MsgRelease{})) -> res msg
(SingAcquiring, Just (AnyMessage msg@MsgAcquired{})) -> res msg
(SingBusy SingNextTx, Just (AnyMessage msg@MsgReplyNextTx{})) -> res msg
(SingBusy SingHasTx, Just (AnyMessage msg@MsgReplyHasTx{})) -> res msg
(SingIdle, Just (AnyMessage msg@MsgAcquire{})) -> res msg
(SingIdle, Just (AnyMessage msg@MsgDone{})) -> res msg
(SingAcquired, Just (AnyMessage msg@MsgAwaitAcquire{})) -> res msg
(SingAcquired, Just (AnyMessage msg@MsgNextTx{})) -> res msg
(SingAcquired, Just (AnyMessage msg@MsgHasTx{})) -> res msg
(SingAcquired, Just (AnyMessage msg@MsgRelease{})) -> res msg
(SingAcquired, Just (AnyMessage msg@MsgGetSizes{})) -> res msg
(SingAcquired, Just (AnyMessage msg@MsgGetMeasures{})) -> res msg
(SingAcquiring, Just (AnyMessage msg@MsgAcquired{})) -> res msg
(SingBusy SingNextTx, Just (AnyMessage msg@MsgReplyNextTx{})) -> res msg
(SingBusy SingHasTx, Just (AnyMessage msg@MsgReplyHasTx{})) -> res msg
(SingBusy SingGetSizes, Just (AnyMessage msg@MsgReplyGetSizes{})) -> res msg
(SingBusy SingGetMeasures, Just (AnyMessage msg@MsgReplyGetMeasures{})) -> res msg

(SingDone, _) -> notActiveState stok

Expand Down
Loading

0 comments on commit 788630d

Please sign in to comment.