Skip to content

Commit

Permalink
Merge branch 'master' into feat/dcutr
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain authored Aug 10, 2023
2 parents c7ccd63 + a1fbb7e commit 7d1c021
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 333 deletions.
20 changes: 8 additions & 12 deletions packages/transport-websockets/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AbortError } from '@libp2p/interface/errors'
import { AbortError, CodeError } from '@libp2p/interface/errors'
import { type Transport, type MultiaddrFilter, symbol, type CreateListenerOptions, type DialOptions, type Listener } from '@libp2p/interface/transport'
import { logger } from '@libp2p/logger'
import { multiaddrToUri as toUri } from '@multiformats/multiaddr-to-uri'
Expand Down Expand Up @@ -55,19 +55,15 @@ class WebSockets implements Transport {
log('dialing %s:%s', cOpts.host, cOpts.port)

const errorPromise = pDefer()
const errfn = (err: any): void => {
const rawSocket = connect(toUri(ma), this.init)
rawSocket.socket.addEventListener('error', () => {
// the WebSocket.ErrorEvent type doesn't actually give us any useful
// information about what happened
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event
const err = new CodeError(`Could not connect to ${ma.toString()}`, 'ERR_CONNECTION_FAILED')
log.error('connection error:', err)

errorPromise.reject(err)
}

const rawSocket = connect(toUri(ma), this.init)

if (rawSocket.socket.on != null) {
rawSocket.socket.on('error', errfn)
} else {
rawSocket.socket.onerror = errfn
}
})

if (options.signal == null) {
await Promise.race([rawSocket.connected(), errorPromise.promise])
Expand Down
3 changes: 2 additions & 1 deletion packages/transport-webtransport/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
"@multiformats/multiaddr": "^12.1.3",
"it-stream-types": "^2.0.1",
"multiformats": "^12.0.1",
"uint8arraylist": "^2.4.3"
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.6"
},
"devDependencies": {
"aegir": "^40.0.1",
Expand Down
307 changes: 7 additions & 300 deletions packages/transport-webtransport/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { noise } from '@chainsafe/libp2p-noise'
import { type Transport, symbol, type CreateListenerOptions, type DialOptions, type Listener } from '@libp2p/interface/transport'
import { logger } from '@libp2p/logger'
import { peerIdFromString } from '@libp2p/peer-id'
import { type Multiaddr, protocols, type AbortOptions } from '@multiformats/multiaddr'
import { bases, digest } from 'multiformats/basics'
import { Uint8ArrayList } from 'uint8arraylist'
import type { Connection, Direction, MultiaddrConnection, Stream } from '@libp2p/interface/connection'
import { type Multiaddr, type AbortOptions } from '@multiformats/multiaddr'
import { webtransportBiDiStreamToStream } from './stream.js'
import { inertDuplex } from './utils/inert-duplex.js'
import { isSubset } from './utils/is-subset.js'
import { parseMultiaddr } from './utils/parse-multiaddr.js'
import type { Connection, MultiaddrConnection, Stream } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { StreamMuxerFactory, StreamMuxerInit, StreamMuxer } from '@libp2p/interface/stream-muxer'
import type { Duplex, Source } from 'it-stream-types'
import type { Source } from 'it-stream-types'
import type { MultihashDigest } from 'multiformats/hashes/interface'

declare global {
Expand All @@ -17,300 +18,6 @@ declare global {

const log = logger('libp2p:webtransport')

// @ts-expect-error - Not easy to combine these types.
const multibaseDecoder = Object.values(bases).map(b => b.decoder).reduce((d, b) => d.or(b))

function decodeCerthashStr (s: string): MultihashDigest {
return digest.decode(multibaseDecoder.decode(s))
}

// Duplex that does nothing. Needed to fulfill the interface
function inertDuplex (): Duplex<any, any, any> {
return {
source: {
[Symbol.asyncIterator] () {
return {
async next () {
// This will never resolve
return new Promise(() => { })
}
}
}
},
sink: async (source: Source<any>) => {
// This will never resolve
return new Promise(() => { })
}
}
}

async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string, direction: Direction, activeStreams: Stream[], onStreamEnd: undefined | ((s: Stream) => void)): Promise<Stream> {
const writer = bidiStream.writable.getWriter()
const reader = bidiStream.readable.getReader()
await writer.ready

function cleanupStreamFromActiveStreams (): void {
const index = activeStreams.findIndex(s => s === stream)
if (index !== -1) {
activeStreams.splice(index, 1)
stream.timeline.close = Date.now()
onStreamEnd?.(stream)
}
}

let writerClosed = false
let readerClosed = false;
(async function () {
const err: Error | undefined = await writer.closed.catch((err: Error) => err)
if (err != null) {
const msg = err.message
if (!(msg.includes('aborted by the remote server') || msg.includes('STOP_SENDING'))) {
log.error(`WebTransport writer closed unexpectedly: streamId=${streamId} err=${err.message}`)
}
}
writerClosed = true
if (writerClosed && readerClosed) {
cleanupStreamFromActiveStreams()
}
})().catch(() => {
log.error('WebTransport failed to cleanup closed stream')
});

(async function () {
const err: Error | undefined = await reader.closed.catch((err: Error) => err)
if (err != null) {
log.error(`WebTransport reader closed unexpectedly: streamId=${streamId} err=${err.message}`)
}
readerClosed = true
if (writerClosed && readerClosed) {
cleanupStreamFromActiveStreams()
}
})().catch(() => {
log.error('WebTransport failed to cleanup closed stream')
})

let sinkSunk = false
const stream: Stream = {
id: streamId,
status: 'open',
writeStatus: 'ready',
readStatus: 'ready',
abort (err: Error) {
if (!writerClosed) {
writer.abort()
writerClosed = true
}
stream.abort(err)
readerClosed = true

this.status = 'aborted'
this.writeStatus = 'closed'
this.readStatus = 'closed'

this.timeline.reset =
this.timeline.close =
this.timeline.closeRead =
this.timeline.closeWrite = Date.now()

cleanupStreamFromActiveStreams()
},
async close (options?: AbortOptions) {
this.status = 'closing'

await Promise.all([
stream.closeRead(options),
stream.closeWrite(options)
])

cleanupStreamFromActiveStreams()

this.status = 'closed'
this.timeline.close = Date.now()
},

async closeRead (options?: AbortOptions) {
if (!readerClosed) {
this.readStatus = 'closing'

try {
await reader.cancel()
} catch (err: any) {
if (err.toString().includes('RESET_STREAM') === true) {
writerClosed = true
}
}

this.timeline.closeRead = Date.now()
this.readStatus = 'closed'

readerClosed = true
}

if (writerClosed) {
cleanupStreamFromActiveStreams()
}
},

async closeWrite (options?: AbortOptions) {
if (!writerClosed) {
writerClosed = true

this.writeStatus = 'closing'

try {
await writer.close()
} catch (err: any) {
if (err.toString().includes('RESET_STREAM') === true) {
readerClosed = true
}
}

this.timeline.closeWrite = Date.now()
this.writeStatus = 'closed'
}

if (readerClosed) {
cleanupStreamFromActiveStreams()
}
},
direction,
timeline: { open: Date.now() },
metadata: {},
source: (async function * () {
while (true) {
const val = await reader.read()
if (val.done === true) {
readerClosed = true
if (writerClosed) {
cleanupStreamFromActiveStreams()
}
return
}

yield new Uint8ArrayList(val.value)
}
})(),
sink: async function (source: Source<Uint8Array | Uint8ArrayList>) {
if (sinkSunk) {
throw new Error('sink already called on stream')
}
sinkSunk = true
try {
this.writeStatus = 'writing'

for await (const chunks of source) {
if (chunks instanceof Uint8Array) {
await writer.write(chunks)
} else {
for (const buf of chunks) {
await writer.write(buf)
}
}
}

this.writeStatus = 'done'
} finally {
this.timeline.closeWrite = Date.now()
this.writeStatus = 'closed'

await stream.closeWrite()
}
}
}

return stream
}

function parseMultiaddr (ma: Multiaddr): { url: string, certhashes: MultihashDigest[], remotePeer?: PeerId } {
const parts = ma.stringTuples()

// This is simpler to have inline than extract into a separate function
// eslint-disable-next-line complexity
const { url, certhashes, remotePeer } = parts.reduce((state: { url: string, certhashes: MultihashDigest[], seenHost: boolean, seenPort: boolean, remotePeer?: PeerId }, [proto, value]) => {
switch (proto) {
case protocols('ip6').code:
// @ts-expect-error - ts error on switch fallthrough
case protocols('dns6').code:
if (value?.includes(':') === true) {
/**
* This resolves cases where `new globalThis.WebTransport` fails to construct because of an invalid URL being passed.
*
* `new URL('https://::1:4001/blah')` will throw a `TypeError: Failed to construct 'URL': Invalid URL`
* `new URL('https://[::1]:4001/blah')` is valid and will not.
*
* @see https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
*/
value = `[${value}]`
}
// eslint-disable-next-line no-fallthrough
case protocols('ip4').code:
case protocols('dns4').code:
if (state.seenHost || state.seenPort) {
throw new Error('Invalid multiaddr, saw host and already saw the host or port')
}
return {
...state,
url: `${state.url}${value ?? ''}`,
seenHost: true
}
case protocols('quic').code:
case protocols('quic-v1').code:
case protocols('webtransport').code:
if (!state.seenHost || !state.seenPort) {
throw new Error("Invalid multiaddr, Didn't see host and port, but saw quic/webtransport")
}
return state
case protocols('udp').code:
if (state.seenPort) {
throw new Error('Invalid multiaddr, saw port but already saw the port')
}
return {
...state,
url: `${state.url}:${value ?? ''}`,
seenPort: true
}
case protocols('certhash').code:
if (!state.seenHost || !state.seenPort) {
throw new Error('Invalid multiaddr, saw the certhash before seeing the host and port')
}
return {
...state,
certhashes: state.certhashes.concat([decodeCerthashStr(value ?? '')])
}
case protocols('p2p').code:
return {
...state,
remotePeer: peerIdFromString(value ?? '')
}
default:
throw new Error(`unexpected component in multiaddr: ${proto} ${protocols(proto).name} ${value ?? ''} `)
}
},
// All webtransport urls are https
{ url: 'https://', seenHost: false, seenPort: false, certhashes: [] })

return { url, certhashes, remotePeer }
}

// Determines if `maybeSubset` is a subset of `set`. This means that all byte arrays in `maybeSubset` are present in `set`.
export function isSubset (set: Uint8Array[], maybeSubset: Uint8Array[]): boolean {
const intersection = maybeSubset.filter(byteArray => {
return Boolean(set.find((otherByteArray: Uint8Array) => {
if (byteArray.length !== otherByteArray.length) {
return false
}

for (let index = 0; index < byteArray.length; index++) {
if (otherByteArray[index] !== byteArray[index]) {
return false
}
}
return true
}))
})
return (intersection.length === maybeSubset.length)
}

export interface WebTransportInit {
maxInboundStreams?: number
}
Expand Down
Loading

0 comments on commit 7d1c021

Please sign in to comment.