Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

chore: complete pubsub tests #81

Merged
merged 3 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/pubsub/tests/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')

const pDefer = require('p-defer')
Expand Down
306 changes: 306 additions & 0 deletions src/pubsub/tests/connection-handlers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const pDefer = require('p-defer')
const pWaitFor = require('p-wait-for')
const uint8ArrayToString = require('uint8arrays/to-string')

const { expectSet } = require('./utils')

module.exports = (common) => {
describe('pubsub connection handlers', () => {
let psA, psB

describe('nodes send state on connection', () => {
// Create pubsub nodes and connect them
before(async () => {
[psA, psB] = await common.setup(2)

expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)

// Start pubsub
psA.start()
psB.start()
})

// Make subscriptions prior to nodes connected
before(() => {
psA.subscribe('Za')
psB.subscribe('Zb')

expect(psA.peers.size).to.equal(0)
expectSet(psA.subscriptions, ['Za'])
expect(psB.peers.size).to.equal(0)
expectSet(psB.subscriptions, ['Zb'])
})

after(async () => {
sinon.restore()
await common.teardown()
})

it('existing subscriptions are sent upon peer connection', async function () {
this.timeout(10e3)

await Promise.all([
psA._libp2p.dial(psB.peerId),
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
new Promise((resolve) => psB.once('pubsub:subscription-change', resolve))
])

expect(psA.peers.size).to.equal(1)
expect(psB.peers.size).to.equal(1)

expectSet(psA.subscriptions, ['Za'])
expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()])

expectSet(psB.subscriptions, ['Zb'])
expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()])
})
})

describe('pubsub started before connect', () => {
// Create pubsub nodes and start them
beforeEach(async () => {
[psA, psB] = await common.setup(2)

psA.start()
psB.start()
})

afterEach(async () => {
sinon.restore()

await common.teardown()
})

it('should get notified of connected peers on dial', async () => {
const connection = await psA._libp2p.dial(psB.peerId)
expect(connection).to.exist()

return Promise.all([
pWaitFor(() => psA.peers.size === 1),
pWaitFor(() => psB.peers.size === 1)
])
})

it('should receive pubsub messages', async () => {
const defer = pDefer()
const topic = 'test-topic'
const data = 'hey!'

await psA._libp2p.dial(psB.peerId)

let subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.not.include(topic)

psA.on(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
defer.resolve()
})
psA.subscribe(topic)

subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.include(topic)

// wait for psB to know about psA subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psA.peerId.toB58String())
})
psB.publish(topic, data)

await defer.promise
})
})

describe('pubsub started after connect', () => {
// Create pubsub nodes
beforeEach(async () => {
[psA, psB] = await common.setup(2)
})

afterEach(async () => {
sinon.restore()

psA && psA.stop()
psB && psB.stop()

await common.teardown()
})

it('should get notified of connected peers after starting', async () => {
const connection = await psA._libp2p.dial(psB.peerId)
expect(connection).to.exist()
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)

psA.start()
psB.start()

return Promise.all([
pWaitFor(() => psA.peers.size === 1),
pWaitFor(() => psB.peers.size === 1)
])
})

it('should receive pubsub messages', async () => {
const defer = pDefer()
const topic = 'test-topic'
const data = 'hey!'

await psA._libp2p.dial(psB.peerId)

psA.start()
psB.start()

await Promise.all([
pWaitFor(() => psA.peers.size === 1),
pWaitFor(() => psB.peers.size === 1)
])

let subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.not.include(topic)

psA.on(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
defer.resolve()
})
psA.subscribe(topic)

subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.include(topic)

// wait for psB to know about psA subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psA.peerId.toB58String())
})
psB.publish(topic, data)

await defer.promise
})
})

describe('pubsub with intermittent connections', () => {
// Create pubsub nodes and start them
beforeEach(async () => {
[psA, psB] = await common.setup(2)

psA.start()
psB.start()
})

afterEach(async () => {
sinon.restore()

psA && psA.stop()
psB && psB.stop()

await common.teardown()
})

it('should receive pubsub messages after a node restart', async function () {
this.timeout(10e3)
const topic = 'test-topic'
const data = 'hey!'
const psAid = psA.peerId.toB58String()

let counter = 0
const defer1 = pDefer()
const defer2 = pDefer()

await psA._libp2p.dial(psB.peerId)

let subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.not.include(topic)

psA.on(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
counter++
counter === 1 ? defer1.resolve() : defer2.resolve()
})
psA.subscribe(topic)

subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.include(topic)

// wait for psB to know about psA subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psAid)
})
psB.publish(topic, data)

await defer1.promise

psB.stop()
await psB._libp2p.stop()
await pWaitFor(() => !psA._libp2p.connectionManager.get(psB.peerId) && !psB._libp2p.connectionManager.get(psA.peerId))
await psB._libp2p.start()
psB.start()

psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs)
await psA._libp2p.dial(psB.peerId)

// wait for remoteLibp2p to know about libp2p subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psAid)
})

psB.publish(topic, data)

await defer2.promise
})

it('should handle quick reconnects with a delayed disconnect', async () => {
// Subscribe on both
const handlerSpy = sinon.spy()
const topic = 'reconnect-channel'

psA.on(topic, handlerSpy)
psB.on(topic, handlerSpy)
await Promise.all([
psA.subscribe(topic),
psB.subscribe(topic)
])

// Create two connections to the remote peer
const originalConnection = await psA._libp2p.dialer.connectToPeer(psB.peerId)
// second connection
await psA._libp2p.dialer.connectToPeer(psB.peerId)
expect(psA._libp2p.connections.get(psB.peerId.toB58String())).to.have.length(2)

// Wait for subscriptions to occur
await pWaitFor(() => {
return psA.getSubscribers(topic).includes(psB.peerId.toB58String()) &&
psB.getSubscribers(topic).includes(psA.peerId.toB58String())
})

// Verify messages go both ways
psA.publish(topic, 'message1')
psB.publish(topic, 'message2')
await pWaitFor(() => handlerSpy.callCount >= 2)
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message1', 'message2'])

// Disconnect the first connection (this acts as a delayed reconnect)
const psAConnUpdateSpy = sinon.spy(psA._libp2p.connectionManager.connections, 'set')

await originalConnection.close()
await pWaitFor(() => psAConnUpdateSpy.callCount === 1)

// Verify messages go both ways after the disconnect
handlerSpy.resetHistory()
psA.publish(topic, 'message3')
psB.publish(topic, 'message4')
await pWaitFor(() => handlerSpy.callCount >= 2)
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message3', 'message4'])
})
})
})
}
3 changes: 1 addition & 2 deletions src/pubsub/tests/emit-self.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')

const uint8ArrayFromString = require('uint8arrays/from-string')
Expand Down
2 changes: 2 additions & 0 deletions src/pubsub/tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
const apiTest = require('./api')
const emitSelfTest = require('./emit-self')
const messagesTest = require('./messages')
const connectionHandlersTest = require('./connection-handlers')
const twoNodesTest = require('./two-nodes')
const multipleNodesTest = require('./multiple-nodes')

Expand All @@ -13,6 +14,7 @@ module.exports = (common) => {
apiTest(common)
emitSelfTest(common)
messagesTest(common)
connectionHandlersTest(common)
twoNodesTest(common)
multipleNodesTest(common)
})
Expand Down
3 changes: 1 addition & 2 deletions src/pubsub/tests/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')

const PeerId = require('peer-id')
Expand Down
3 changes: 1 addition & 2 deletions src/pubsub/tests/multiple-nodes.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
/* eslint max-nested-callbacks: ["error", 6] */
'use strict'

const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')

const delay = require('delay')
Expand Down
Loading