Skip to content

Commit

Permalink
add a writeMany option for batch sending of SDUs
Browse files Browse the repository at this point in the history
Permit sending of multiple SDUs through a single call to sendMany for
Socket bearers
Bearers without vector IO support emulate it through multiple calls to
write.
  • Loading branch information
karknu committed Feb 11, 2025
1 parent 68e7433 commit 63f6e67
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 55 deletions.
115 changes: 88 additions & 27 deletions network-mux/bench/socket_read_write/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@

import Control.Exception (bracket)
import Control.Concurrent.Class.MonadSTM.Strict
import Data.Functor (void)
import Control.Monad (forever, replicateM_)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer
import Data.Int
import qualified Network.Socket as Socket
import Network.Socket qualified as Socket
import Network.Socket (Socket)
import qualified Data.ByteString.Lazy as BL
import Data.ByteString.Lazy qualified as BL
import Test.Tasty.Bench

import Network.Mux.Bearer
import Network.Mux
import Network.Mux.Types
import Network.Mux.Channel

import Network.Mux.Timeout (withTimeoutSerial)

sduTimeout :: DiffTime
sduTimeout = 10

Expand All @@ -28,6 +31,9 @@ numberOfPackets = 100000
totalPayloadLen :: Int64 -> Int64
totalPayloadLen sndSize = sndSize * numberOfPackets

-- | Run a client that connects to the specified addr.
-- Signals the message sndSize to the server by writing it
-- in the provided TMVar.
readBenchmark :: StrictTMVar IO Int64 -> Int64 -> Socket.SockAddr -> IO ()
readBenchmark sndSizeV sndSize addr = do
bracket
Expand All @@ -51,48 +57,103 @@ readBenchmark sndSizeV sndSize addr = do
doRead maxData chan (cnt + BL.length msg)
Nothing -> error "doRead: nullread"


-- Start the server in a separate thread
-- | Run a server that accept connections on `ad`.
startServer :: StrictTMVar IO Int64 -> Socket -> IO ()
startServer sndSizeV ad = forever $ do

(sd, _) <- Socket.accept ad
bearer <- getBearer makeSocketBearer sduTimeout nullTracer sd
sndSize <- atomically $ takeTMVar sndSizeV

let chan = muxBearerAsChannel bearer (MiniProtocolNum 42) ResponderDir
payload = BL.replicate sndSize 0xa5
-- maxData = totalPayloadLen bearer
maxData = totalPayloadLen sndSize
numberOfSdus = fromIntegral $ maxData `div` sndSize
replicateM_ numberOfSdus $ do
send chan payload

-- | Like startServer but it uses the `writeMany` function
-- for vector IO.
startServerMany :: StrictTMVar IO Int64 -> Socket -> IO ()
startServerMany sndSizeV ad = forever $ do
(sd, _) <- Socket.accept ad
bearer <- getBearer makeSocketBearer sduTimeout nullTracer sd
sndSize <- atomically $ takeTMVar sndSizeV

let maxData = totalPayloadLen sndSize
numberOfSdus = fromIntegral $ maxData `div` sndSize
numberOfCalls = numberOfSdus `div` 10
runtSdus = numberOfSdus `mod` 10

withTimeoutSerial $ \timeoutFn -> do
replicateM_ numberOfCalls $ do
let sdus = replicate 10 $ wrap $ BL.replicate sndSize 0xa5
void $ writeMany bearer timeoutFn sdus
if runtSdus > 0
then do
let sdus = replicate runtSdus $ wrap $ BL.replicate sndSize 0xa5
void $ writeMany bearer timeoutFn sdus
else return ()

where
-- wrap a 'ByteString' as 'MuxSDU'
wrap :: BL.ByteString -> MuxSDU
wrap blob = MuxSDU {
-- it will be filled when the 'MuxSDU' is send by the 'bearer'
msHeader = MuxSDUHeader {
mhTimestamp = RemoteClockModel 0,
mhNum = MiniProtocolNum 42,
mhDir = ResponderDir,
mhLength = fromIntegral $ BL.length blob
},
msBlob = blob
}

setupServer :: Socket -> IO Socket.SockAddr
setupServer ad = do
muxAddress:_ <- Socket.getAddrInfo Nothing (Just "127.0.0.1") (Just "0")
Socket.setSocketOption ad Socket.ReuseAddr 1
Socket.bind ad (Socket.addrAddress muxAddress)
addr <- Socket.getSocketName ad
Socket.listen ad 3

return addr

-- Main function to run the benchmarks
main :: IO ()
main = do
-- Start the server in a separate thread

bracket
(Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol)
Socket.close
(\ad -> do
sndSizeV <- newEmptyTMVarIO
muxAddress:_ <- Socket.getAddrInfo Nothing (Just "127.0.0.1") (Just "0")
Socket.setSocketOption ad Socket.ReuseAddr 1
Socket.bind ad (Socket.addrAddress muxAddress)
addr <- Socket.getSocketName ad
Socket.listen ad 3

withAsync (startServer sndSizeV ad) $ \said -> do
(do
ad1 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol
ad2 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol

defaultMain [
-- Suggested Max SDU size for Socket bearer
bench "Read/Write Benchmark 12288 byte SDUs" $ nfIO $ readBenchmark sndSizeV 12288 addr
-- Payload size for ChainSync's RequestNext
, bench "Read/Write Benchmark 914 byte SDUs" $ nfIO $ readBenchmark sndSizeV 914 addr
return (ad1, ad2)
)
(\(ad1, ad2) -> do
Socket.close ad1
Socket.close ad2
)
(\(ad1, ad2) -> do
sndSizeV <- newEmptyTMVarIO
sndSizeMV <- newEmptyTMVarIO
addr <- setupServer ad1
addrM <- setupServer ad2

withAsync (startServer sndSizeV ad1) $ \said -> do
withAsync (startServerMany sndSizeMV ad2) $ \saidM -> do

defaultMain [
-- Suggested Max SDU size for Socket bearer
bench "Read/Write Benchmark 12288 byte SDUs" $ nfIO $ readBenchmark sndSizeV 12288 addr
-- Payload size for ChainSync's RequestNext
, bench "Read/Write Benchmark 914 byte SDUs" $ nfIO $ readBenchmark sndSizeV 914 addr
-- Payload size for ChainSync's RequestNext
, bench "Read/Write Benchmark 10 byte SDUs" $ nfIO $ readBenchmark sndSizeV 10 addr
]
cancel said
, bench "Read/Write Benchmark 10 byte SDUs" $ nfIO $ readBenchmark sndSizeV 10 addr

-- Send batches of SDUs at the same time
, bench "Read/Write-Many Benchmark 12288 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 12288 addrM
, bench "Read/Write-Many Benchmark 914 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 914 addrM
, bench "Read/Write-Many Benchmark 10 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 10 addrM
]
cancel said
cancel saidM
)
7 changes: 4 additions & 3 deletions network-mux/src/Network/Mux/Bearer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import Control.Monad.Class.MonadTime.SI
import Control.Tracer (Tracer)

import Data.ByteString.Lazy qualified as BL
import Network.Socket (Socket)
import Network.Socket (getSocketOption, SocketOption (..), Socket)
#if defined(mingw32_HOST_OS)
import System.Win32 (HANDLE)
#endif
Expand Down Expand Up @@ -58,10 +58,11 @@ pureBearer f = \sduTimeout tr fd -> pure (f sduTimeout tr fd)
makeSocketBearer :: MakeBearer IO Socket
makeSocketBearer = MakeBearer $ (\sduTimeout tr fd -> do
readBuffer <- newTVarIO BL.empty
return $ socketAsMuxBearer size readBuffer bufSize sduTimeout tr fd)
batch <- getSocketOption fd SendBuffer
return $ socketAsMuxBearer size batch readBuffer bufSize sduTimeout tr fd)
where
size = SDUSize 12_288
bufSize = 16*1024
bufSize = 16_384

makePipeChannelBearer :: MakeBearer IO PipeChannel
makePipeChannelBearer = MakeBearer $ pureBearer (\_ -> pipeAsMuxBearer size)
Expand Down
12 changes: 10 additions & 2 deletions network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,10 @@ attenuationChannelAsMuxBearer :: forall m.
-> MuxBearer m
attenuationChannelAsMuxBearer sduSize sduTimeout muxTracer chan =
MuxBearer {
read = readMux,
write = writeMux,
read = readMux,
write = writeMux,
writeMany = writeMuxMany,
batchSize = fromIntegral $ getSDUSize sduSize,
sduSize
}
where
Expand Down Expand Up @@ -297,6 +299,12 @@ attenuationChannelAsMuxBearer sduSize sduTimeout muxTracer chan =
traceWith muxTracer MuxTraceSendEnd
return ts

writeMuxMany :: TimeoutFn m -> [MuxSDU] -> m Time
writeMuxMany timeoutFn sdus = do
ts <- getMonotonicTime
mapM_ (writeMux timeoutFn) sdus
return ts

--
-- Trace
--
Expand Down
14 changes: 11 additions & 3 deletions network-mux/src/Network/Mux/Bearer/NamedPipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ namedPipeAsBearer :: Mx.SDUSize
-> MuxBearer IO
namedPipeAsBearer sduSize tracer h =
Mx.MuxBearer {
Mx.read = readNamedPipe,
Mx.write = writeNamedPipe,
Mx.sduSize = sduSize
Mx.read = readNamedPipe,
Mx.write = writeNamedPipe,
Mx.writeMany = writeNamedPipeMany,
Mx.sduSize = sduSize
Mx.batchSize = fromIntegral $ getSDUSize sduSize
}
where
readNamedPipe :: Mx.TimeoutFn IO -> IO (Mx.MuxSDU, Time)
Expand Down Expand Up @@ -81,3 +83,9 @@ namedPipeAsBearer sduSize tracer h =
`catch` Mx.handleIOException "writeHandle errored"
traceWith tracer Mx.MuxTraceSendEnd
return ts

writeNamedPipeMany :: Mx.TimeoutFn IO -> [Mx.MuxSDU] -> IO Time
writeNamedPipeMany timeoutFn sdus = do
ts <- getMonotonicTime
mapM_ (writeNamedPipe timeoutFn) sdus
return ts
14 changes: 11 additions & 3 deletions network-mux/src/Network/Mux/Bearer/Pipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ pipeAsMuxBearer
-> MuxBearer IO
pipeAsMuxBearer sduSize tracer channel =
Mx.MuxBearer {
Mx.read = readPipe,
Mx.write = writePipe,
Mx.sduSize = sduSize
Mx.read = readPipe,
Mx.write = writePipe,
Mx.writeMany = writePipeMany,
Mx.sduSize = sduSize,
Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize
}
where
readPipe :: Mx.TimeoutFn IO -> IO (Mx.MuxSDU, Time)
Expand Down Expand Up @@ -117,3 +119,9 @@ pipeAsMuxBearer sduSize tracer channel =
traceWith tracer Mx.MuxTraceSendEnd
return ts

writePipeMany :: Mx.TimeoutFn IO -> [Mx.MuxSDU] -> IO Time
writePipeMany timeoutFn sdus = do
ts <- getMonotonicTime
mapM_ (writePipe timeoutFn) sdus
return ts

10 changes: 9 additions & 1 deletion network-mux/src/Network/Mux/Bearer/Queues.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ queueChannelAsMuxBearer sduSize tracer QueueChannel { writeQueue, readQueue } =
Mx.MuxBearer {
Mx.read = readMux,
Mx.write = writeMux,
Mx.sduSize = sduSize
Mx.writeMany = writeMuxMany,
Mx.sduSize = sduSize,
Mx.batchSize = 2 * (fromIntegral $ Mx.getSDUSize sduSize)
}
where
readMux :: Mx.TimeoutFn m -> m (Mx.MuxSDU, Time)
Expand All @@ -69,3 +71,9 @@ queueChannelAsMuxBearer sduSize tracer QueueChannel { writeQueue, readQueue } =
traceWith tracer Mx.MuxTraceSendEnd
return ts

writeMuxMany :: Mx.TimeoutFn m -> [Mx.MuxSDU] -> m Time
writeMuxMany timeoutFn sdus = do
ts <- getMonotonicTime
mapM_ (writeMux timeoutFn) sdus
return ts

43 changes: 39 additions & 4 deletions network-mux/src/Network/Mux/Bearer/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import Control.Monad.Class.MonadTimer.SI hiding (timeout)
import Network.Socket qualified as Socket
#if !defined(mingw32_HOST_OS)
import Network.Socket.ByteString.Lazy qualified as Socket (recv, sendAll)
import Network.Socket.ByteString qualified as Socket (sendMany)
#else
import System.Win32.Async.Socket.ByteString.Lazy qualified as Win32.Async
#endif
Expand Down Expand Up @@ -46,17 +47,20 @@ import Network.Mux.TCPInfo (SocketOption (TCPInfoSocketOption))
--
socketAsMuxBearer
:: Mx.SDUSize
-> Int
-> StrictTVar IO BL.ByteString
-> Int64
-> DiffTime
-> Tracer IO Mx.MuxTrace
-> Socket.Socket
-> MuxBearer IO
socketAsMuxBearer sduSize readBuffer readBufferSize sduTimeout tracer sd =
socketAsMuxBearer sduSize batchSize readBuffer readBufferSize sduTimeout tracer sd =
Mx.MuxBearer {
Mx.read = readSocket,
Mx.write = writeSocket,
Mx.sduSize = sduSize
Mx.read = readSocket,
Mx.write = writeSocket,
Mx.writeMany = writeSocketMany,
Mx.sduSize = sduSize,
Mx.batchSize = batchSize
}
where
hdrLenght = 8
Expand Down Expand Up @@ -166,3 +170,34 @@ socketAsMuxBearer sduSize readBuffer readBufferSize sduTimeout tracer sd =
#endif
return ts

writeSocketMany :: Mx.TimeoutFn IO -> [Mx.MuxSDU] -> IO Time
#if defined(mingw32_HOST_OS)
writeSocketMany timeout sdus = do
ts <- getMonotonicTime
mapM_ (writeSocket timeout) sdus
return ts
#else
writeSocketMany timeout sdus = do
ts <- getMonotonicTime
let ts32 = Mx.timestampMicrosecondsLow32Bits ts
buf = map (Mx.encodeMuxSDU .
(\sdu -> Mx.setTimestamp sdu (Mx.RemoteClockModel ts32))) sdus
r <- timeout ((fromIntegral $ length sdus) * sduTimeout) $
Socket.sendMany sd (concatMap BL.toChunks buf)
`catch` Mx.handleIOException "sendAll errored"
case r of
Nothing -> do
traceWith tracer Mx.MuxTraceSDUWriteTimeoutException
throwIO $ Mx.MuxError Mx.MuxSDUWriteTimeout "Mux SDU Timeout"
Just _ -> do
traceWith tracer Mx.MuxTraceSendEnd
#if defined(linux_HOST_OS) && defined(MUX_TRACE_TCPINFO)
-- If it was possible to detect if the TraceTCPInfo was
-- enable we wouldn't have to hide the getSockOpt
-- syscall in this ifdef. Instead we would only call it if
-- we knew that the information would be traced.
tcpi <- Socket.getSockOpt sd TCPInfoSocketOption
traceWith tracer $ Mx.TraceTCPInfo tcpi (sum $ map (Mx.mhLength . Mx.msHeader) sdus)
#endif
return ts
#endif
Loading

0 comments on commit 63f6e67

Please sign in to comment.