Skip to content

Commit

Permalink
fix: graceful shutdown (#1994)
Browse files Browse the repository at this point in the history
* Shutdown to revoke connection retries

* SocksClient to use an existing connection to tor

* ConnextClient disconnect to abort pending requests

* Track reachability verification peer, for shutdown purposes

* Pool disconnection to announce 'shutdown' to all peers

* Fix jest test

* Revert npm-shrinkwrap.json

* Use Set instead of Map
  • Loading branch information
LePremierHomme authored Nov 18, 2020
1 parent 43e7c8e commit cde3203
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 11 deletions.
22 changes: 21 additions & 1 deletion lib/connextclient/ConnextClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class ConnextClient extends SwapClient {
private outboundAmounts = new Map<string, number>();
private inboundAmounts = new Map<string, number>();

private pendingRequests = new Set<http.ClientRequest>();
private criticalRequestPaths = ['/hashlock-resolve', '/hashlock-transfer'];

/** The minimum incremental quantity that we may use for collateral requests. */
private static MIN_COLLATERAL_REQUEST_SIZES: { [key: string]: number | undefined } = {
ETH: 0.1 * 10 ** 8,
Expand Down Expand Up @@ -866,6 +869,16 @@ class ConnextClient extends SwapClient {
/** Connext client specific cleanup. */
protected disconnect = async () => {
this.setStatus(ClientStatus.Disconnected);

for (const req of this.pendingRequests) {
if (this.criticalRequestPaths.includes(req.path)) {
this.logger.warn(`critical request is pending: ${req.path}`);
continue;
}

this.logger.info(`aborting pending request: ${req.path}`);
req.destroy();
}
}

/**
Expand Down Expand Up @@ -893,7 +906,11 @@ class ConnextClient extends SwapClient {
}

this.logger.trace(`sending request to ${endpoint}${payloadStr ? `: ${payloadStr}` : ''}`);
const req = http.request(options, async (res) => {

let req: http.ClientRequest;
req = http.request(options, async (res) => {
this.pendingRequests.delete(req);

let err: XudError | undefined;
let body;
switch (res.statusCode) {
Expand Down Expand Up @@ -935,6 +952,7 @@ class ConnextClient extends SwapClient {
});

req.on('error', async (err: any) => {
this.pendingRequests.delete(req);
if (err.code === 'ECONNREFUSED') {
await this.disconnect();
}
Expand All @@ -945,7 +963,9 @@ class ConnextClient extends SwapClient {
if (payloadStr) {
req.write(payloadStr);
}

req.end();
this.pendingRequests.add(req);
});
}
}
Expand Down
11 changes: 8 additions & 3 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ class Peer extends EventEmitter {
}

this.status = PeerStatus.Closed;
this.revokeConnectionRetries();

if (this.socket) {
if (!this.socket.destroyed) {
Expand Down Expand Up @@ -521,6 +522,8 @@ class Peer extends EventEmitter {
this.connectionRetriesRevoked = false;

const connectViaProxy = () => {
this.socket = net.connect(torport, 'localhost');

const proxyOptions: SocksClientOptions = {
proxy: {
host: 'localhost',
Expand All @@ -532,11 +535,11 @@ class Peer extends EventEmitter {
host: this.address.host,
port: this.address.port,
},
existing_socket: this.socket,
};
SocksClient.createConnection(proxyOptions)
.then((info) => {
// a raw net.Socket that is established to the destination host through the given proxy server
this.socket = info.socket;
assert(this.socket === info.socket);
onConnect();
})
.catch(onError);
Expand Down Expand Up @@ -614,7 +617,9 @@ class Peer extends EventEmitter {
}

private initStall = (): void => {
assert(this.status !== PeerStatus.Closed);
if (this.status !== PeerStatus.Closed) {
return;
}
assert(!this.stallTimer);

this.stallTimer = setInterval(this.checkTimeout, Peer.STALL_INTERVAL);
Expand Down
20 changes: 14 additions & 6 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ class Pool extends EventEmitter {
if (this.loadingNodesPromise) {
await this.loadingNodesPromise;
}
await Promise.all([this.unlisten(), this.closePendingConnections(), this.closePeers()]);
await Promise.all([
this.unlisten(),
this.closePendingConnections(DisconnectionReason.Shutdown),
this.closePeers(DisconnectionReason.Shutdown)],
);
this.connected = false;
this.disconnecting = false;
}
Expand All @@ -303,13 +307,17 @@ class Pool extends EventEmitter {
this.logger.debug(`Verifying reachability of advertised address: ${externalAddress}`);
try {
const peer = new Peer(Logger.DISABLED_LOGGER, address, this.network);

this.pendingOutboundPeers.set(this.nodePubKey, peer);
await peer.beginOpen({
ownNodeState: this.nodeState,
ownNodeKey: this.nodeKey,
ownVersion: this.version,
expectedNodePubKey: this.nodePubKey,
torport: this.config.torport,
});
this.pendingOutboundPeers.delete(this.nodePubKey);

await peer.close();
assert.fail();
} catch (err) {
Expand Down Expand Up @@ -1002,21 +1010,21 @@ class Pool extends EventEmitter {
}
}

private closePeers = () => {
private closePeers = (reason?: DisconnectionReason) => {
const closePromises = [];
for (const peer of this.peers.values()) {
closePromises.push(peer.close(DisconnectionReason.Shutdown));
closePromises.push(peer.close(reason));
}
return Promise.all(closePromises);
}

private closePendingConnections = () => {
private closePendingConnections = (reason?: DisconnectionReason) => {
const closePromises = [];
for (const peer of this.pendingOutboundPeers.values()) {
closePromises.push(peer.close());
closePromises.push(peer.close(reason));
}
for (const peer of this.pendingInboundPeers) {
closePromises.push(peer.close());
closePromises.push(peer.close(reason));
}
return Promise.all(closePromises);
}
Expand Down
30 changes: 29 additions & 1 deletion test/jest/Connext.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,20 @@ jest.mock('http', () => {
statusCode: 404,
});
}

let errorCb: any;
return {
path: options.path,
write: jest.fn(),
on: jest.fn(),
end: jest.fn(),
on: jest.fn().mockImplementation((event, cb) => {
if (event === 'error') {
errorCb = cb;
}
}),
destroy: jest.fn().mockImplementation(() => {
errorCb();
}),
};
}),
};
Expand All @@ -55,6 +65,8 @@ describe('ConnextClient', () => {
logger.trace = jest.fn();
logger.error = jest.fn();
logger.debug = jest.fn();
logger.warn = jest.fn();
logger.info = jest.fn();
const currencyInstances = [
{
id: 'ETH',
Expand Down Expand Up @@ -427,4 +439,20 @@ describe('ConnextClient', () => {
expect(connext['sendRequest']).toHaveBeenCalledTimes(0);
});
});

describe('disconnect', () => {
it('aborts pending requests, except critical ones', async () => {
expect(connext['pendingRequests'].size).toEqual(0);

connext['sendRequest'](connext['criticalRequestPaths'][0], '', {});
connext['sendRequest']('/path1', '', {});
connext['sendRequest']('/path1', '', {});
connext['sendRequest']('/path2', '', {});
connext['sendRequest'](connext['criticalRequestPaths'][1], '', {});
expect(connext['pendingRequests'].size).toEqual(5);

connext['disconnect']();
expect(connext['pendingRequests'].size).toEqual(2);
});
});
});

0 comments on commit cde3203

Please sign in to comment.