Skip to content

Commit

Permalink
Add perf protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed Mar 1, 2023
1 parent 746e2ea commit b76cc80
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 0 deletions.
133 changes: 133 additions & 0 deletions src/perf/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import { logger } from '@libp2p/logger'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Startable } from '@libp2p/interfaces/startable'
import type { AbortOptions } from '@libp2p/interfaces'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'

export const PROTOCOL = '/perf/1.0.0'

const log = logger('libp2p:perf')

const writeBlockSize = BigInt(64 << 10)
const maxStreams = 1 << 10

export interface PerfComponents {
registrar: Registrar
connectionManager: ConnectionManager
}

export class PerfService implements Startable {
public readonly protocol: string
private readonly components: PerfComponents
private started: boolean
private readonly databuf: ArrayBuffer

constructor (components: PerfComponents) {
this.components = components
this.started = false
this.protocol = PROTOCOL
this.databuf = new ArrayBuffer(Number(writeBlockSize))
}

async start () {
await this.components.registrar.handle(this.protocol, (data: IncomingStreamData) => { void this.handleMessage(data) }, {
maxInboundStreams: maxStreams,
maxOutboundStreams: maxStreams
})
this.started = true
}

async stop () {
await this.components.registrar.unhandle(this.protocol)
this.started = false
}

isStarted () {
return this.started
}

async handleMessage (data: IncomingStreamData) {
const { stream } = data

let bytesToSendBack: bigint | null = null
for await (const buf of stream.source) {
if (bytesToSendBack === null) {
bytesToSendBack = BigInt(buf.getBigUint64(0, false))
}
// Ingest all the bufs and wait for the read side to close
}

const uint8Buf = new Uint8Array(this.databuf)

if (bytesToSendBack === null) {
throw new Error('bytesToSendBack was null')
}
await stream.sink(async function * () {
while (bytesToSendBack > 0n) {
let toSend: bigint = writeBlockSize
if (toSend > bytesToSendBack) {
toSend = bytesToSendBack
}
bytesToSendBack = bytesToSendBack - toSend
yield uint8Buf.slice(0, Number(toSend))
}
}())
}

async startPerfOnStream (peer: PeerId, sendBytes: bigint, recvBytes: bigint, options: AbortOptions = {}): Promise<void> {
log('dialing %s to %p', this.protocol, peer)

const uint8Buf = new Uint8Array(this.databuf)

const connection = await this.components.connectionManager.openConnection(peer, options)
const signal = options.signal
const stream = await connection.newStream([this.protocol], {
signal
})

// Convert sendBytes to uint64 big endian buffer
const view = new DataView(this.databuf)
view.setBigInt64(0, recvBytes, false)

await stream.sink((async function * () {
// Send the number of bytes to receive
yield uint8Buf.slice(0, 8)
// Send the number of bytes to send
while (sendBytes > 0n) {
let toSend: bigint = writeBlockSize
if (toSend > sendBytes) {
toSend = sendBytes
}
sendBytes = sendBytes - toSend
yield uint8Buf.slice(0, Number(toSend))
}
})())

// Read the received bytes
let actualRecvdBytes = BigInt(0)
for await (const buf of stream.source) {
actualRecvdBytes += BigInt(buf.length)
}

if (actualRecvdBytes !== recvBytes) {
throw new Error(`Expected to receive ${recvBytes} bytes, but received ${actualRecvdBytes}`)
}

stream.close()
}

// measureDownloadBandwidth returns the measured bandwidth in bits per second
async measureDownloadBandwidth (peer: PeerId, size: bigint) {
const now = Date.now()
await this.startPerfOnStream(peer, 0n, size)
return Number((8000n * size) / BigInt(Date.now() - now))
}

// measureUploadBandwidth returns the measured bandwidth in bit per second
async measureUploadBandwidth (peer: PeerId, size: bigint) {
const now = Date.now()
await this.startPerfOnStream(peer, size, 0n)
return Number((8000n * size) / BigInt(Date.now() - now))
}
}
93 changes: 93 additions & 0 deletions test/perf/index.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/* eslint-env mocha */

import { expect } from 'aegir/chai'
import Peers from '../fixtures/peers.js'
import { PerfService } from '../../src/perf/index.js'
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-mocks'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { start, stop } from '@libp2p/interfaces/startable'
import { CustomEvent } from '@libp2p/interfaces/events'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { MemoryDatastore } from 'datastore-core'
import { DefaultComponents } from '../../src/components.js'

async function createComponents (index: number): Promise<DefaultComponents> {
const peerId = await createFromJSON(Peers[index])

const components = new DefaultComponents({
peerId,
registrar: mockRegistrar(),
upgrader: mockUpgrader(),
datastore: new MemoryDatastore()
})
components.peerStore = new PersistentPeerStore(components)
components.connectionManager = new DefaultConnectionManager(components, {
minConnections: 50,
maxConnections: 1000,
autoDialInterval: 1000,
inboundUpgradeTimeout: 1000
})

return components
}

describe('perf', () => {
let localComponents: DefaultComponents
let remoteComponents: DefaultComponents

beforeEach(async () => {
localComponents = await createComponents(0)
remoteComponents = await createComponents(1)

await Promise.all([
start(localComponents),
start(remoteComponents)
])
})

afterEach(async () => {
await Promise.all([
stop(localComponents),
stop(remoteComponents)
])
})

it('should run perf', async () => {
const client = new PerfService(localComponents)
const server = new PerfService(remoteComponents)

await start(client)
await start(server)

// simulate connection between nodes
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
localComponents.upgrader.dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
remoteComponents.upgrader.dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))

// Run Perf
await expect(client.startPerfOnStream(remoteComponents.peerId, 1n << 10n, 1n << 10n)).to.eventually.be.fulfilled()
})

it('local benchmark', async () => {
const client = new PerfService(localComponents)
const server = new PerfService(remoteComponents)

await start(client)
await start(server)

// simulate connection between nodes
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
localComponents.upgrader.dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
remoteComponents.upgrader.dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))

// Run Perf
const downloadBandwidth = await client.measureDownloadBandwidth(remoteComponents.peerId, 10n << 20n)
// eslint-disable-next-line no-console
console.log('Download bandwidth: ', downloadBandwidth >> 10, ' kiB/s')

const uploadBandwidth = await client.measureDownloadBandwidth(remoteComponents.peerId, 10n << 20n)
// eslint-disable-next-line no-console
console.log('Upload bandwidth: ', uploadBandwidth >> 10, ' kiB/s')
})
})

0 comments on commit b76cc80

Please sign in to comment.