From 71a99bffd890613b1d6a199cafd1829359927d48 Mon Sep 17 00:00:00 2001 From: platfowner Date: Mon, 15 Apr 2024 13:43:15 +0900 Subject: [PATCH 1/5] Upgrade ws package --- package.json | 2 +- yarn.lock | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 6860319ea..4a43769c5 100644 --- a/package.json +++ b/package.json @@ -99,7 +99,7 @@ "web3-eth-accounts": "^1.6.1", "winston": "^3.3.3", "winston-daily-rotate-file": "^4.4.2", - "ws": "^7.4.6" + "ws": "^8.16.0" }, "devDependencies": { "chai": "^4.2.0", diff --git a/yarn.lock b/yarn.lock index d2ef49a54..663fbf777 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6603,11 +6603,16 @@ write@1.0.3: dependencies: mkdirp "^0.5.1" -ws@^7.4.5, ws@^7.4.6: +ws@^7.4.5: version "7.5.7" resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.7.tgz#9e0ac77ee50af70d58326ecff7e85eb3fa375e67" integrity sha512-KMvVuFzpKBuiIXW3E4u3mySRO2/mCHSyZDJQM5NQ9Q9KHWHWh0NHgfbRMLLrceUK5qAL4ytALJbpRMjixFZh8A== +ws@^8.16.0: + version "8.16.0" + resolved "https://registry.yarnpkg.com/ws/-/ws-8.16.0.tgz#d1cd774f36fbc07165066a60e40323eab6446fd4" + integrity sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ== + xdg-basedir@^4.0.0: version "4.0.0" resolved "https://registry.yarnpkg.com/xdg-basedir/-/xdg-basedir-4.0.0.tgz#4bc8d9984403696225ef83a1573cbbcb4e79db13" From f827128e60a581a1bd5f7c71bd716145ff7be669 Mon Sep 17 00:00:00 2001 From: platfowner Date: Mon, 15 Apr 2024 13:43:46 +0900 Subject: [PATCH 2/5] Add ping-pong message types --- common/constants.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/common/constants.js b/common/constants.js index 0a538512c..1445346d7 100644 --- a/common/constants.js +++ b/common/constants.js @@ -681,6 +681,9 @@ const BlockchainEventMessageTypes = { DEREGISTER_FILTER: 'DEREGISTER_FILTER', EMIT_EVENT: 'EMIT_EVENT', EMIT_ERROR: 'EMIT_ERROR', + // NOTE(platfowner): Message types for custom ping-pong (see https://github.com/ainblockchain/ain-js/issues/171). + PING: 'PING', + PONG: 'PONG', }; const ValueChangedEventSources = { From 37cd8a1d503a1d8c0adb8781547be538f548b18b Mon Sep 17 00:00:00 2001 From: platfowner Date: Mon, 15 Apr 2024 15:35:27 +0900 Subject: [PATCH 3/5] Implement heartbeat with custom ping-pong for event handler --- event-handler/event-channel-manager.js | 91 ++++++++++++++++---------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/event-handler/event-channel-manager.js b/event-handler/event-channel-manager.js index 11535fe6d..a57d1245e 100644 --- a/event-handler/event-channel-manager.js +++ b/event-handler/event-channel-manager.js @@ -82,17 +82,39 @@ class EventChannelManager { // TODO(cshcomcom): Handle MAX connections. logger.info(`[${LOG_HEADER}] New connection (${channelId})`); + webSocket.on('message', (message) => { - this.handleMessage(channel, message); + try { + const parsedMessage = JSON.parse(message); + const messageType = parsedMessage.type; + if (!messageType) { + throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_TYPE_IN_MSG, + `No message type in (${JSON.stringify(message)})`); + } + const messageData = parsedMessage.data; + if (!messageData) { + throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_DATA_IN_MSG, + `No message data in (${JSON.stringify(message)})`); + } + // NOTE(platfowner): A custom ping-pong (see https://github.com/ainblockchain/ain-js/issues/171). + if (messageType === BlockchainEventMessageTypes.PONG) { + this.handlePong(webSocket); + } else { + this.handleMessage(channel, messageType, messageData); + } + } catch (err) { + logger.error(`[${LOG_HEADER}] Error while process message ` + + `(message: ${JSON.stringify(message, null, 2)}, ` + + `error message: ${err.message})`); + this.handleEventError(channel, err); + } }); + webSocket.on('close', (_) => { this.closeChannel(channel); }); - // Heartbeat - webSocket.on('pong', (_) => { - webSocket.isAlive = true; - }) + webSocket.isAlive = true; } catch (err) { webSocket.terminate(); @@ -222,36 +244,28 @@ class EventChannelManager { } } - handleMessage(channel, message) { // TODO(cshcomcom): Manage EVENT_PROTOCOL_VERSION. - const LOG_HEADER = 'handleMessage'; - try { - const parsedMessage = JSON.parse(message); - const messageType = parsedMessage.type; - if (!messageType) { - throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_TYPE_IN_MSG, - `Can't find type from message (${JSON.stringify(message)})`); - } - const messageData = parsedMessage.data; - if (!messageData) { - throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_DATA_IN_MSG, - `Can't find data from message (${JSON.stringify(message)})`); - } - switch (messageType) { - case BlockchainEventMessageTypes.REGISTER_FILTER: - this.handleRegisterFilterMessage(channel, messageData); - break; - case BlockchainEventMessageTypes.DEREGISTER_FILTER: - this.handleDeregisterFilterMessage(channel, messageData); - break; - default: - throw new EventHandlerError(EventHandlerErrorCode.INVALID_MESSAGE_TYPE, - `Invalid message type (${messageType})`); - } - } catch (err) { - logger.error(`[${LOG_HEADER}] Error while process message ` + - `(message: ${JSON.stringify(message, null, 2)}, ` + - `error message: ${err.message})`); - this.handleEventError(channel, err); + /** + * Handles a pong message. + */ + handlePong(webSocket) { + webSocket.isAlive = true; + } + + /** + * Handles a (non-pong) message from the channel. + */ + // TODO(cshcomcom): Manage EVENT_PROTOCOL_VERSION. + handleMessage(channel, messageType, messageData) { + switch (messageType) { + case BlockchainEventMessageTypes.REGISTER_FILTER: + this.handleRegisterFilterMessage(channel, messageData); + break; + case BlockchainEventMessageTypes.DEREGISTER_FILTER: + this.handleDeregisterFilterMessage(channel, messageData); + break; + default: + throw new EventHandlerError(EventHandlerErrorCode.INVALID_MESSAGE_TYPE, + `Invalid message type (${messageType})`); } } @@ -325,11 +339,16 @@ class EventChannelManager { return ws.terminate(); } ws.isAlive = false; - ws.ping(); + this.sendPing(ws); }); }, NodeConfigs.EVENT_HANDLER_HEARTBEAT_INTERVAL_MS || 15000); } + sendPing(webSocket) { + const pingMessage = this.makeMessage(BlockchainEventMessageTypes.PING, {}); + webSocket.send(JSON.stringify(pingMessage)); + } + stopHeartbeat() { clearInterval(this.heartbeatInterval); } From 5b7cd1a1e97f838caa1e7f1d76f3971ff6aa4d92 Mon Sep 17 00:00:00 2001 From: platfowner Date: Mon, 15 Apr 2024 15:36:32 +0900 Subject: [PATCH 4/5] Fix tests broken by the side effect of custom ping-pong --- test/integration/event_handler.test.js | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/test/integration/event_handler.test.js b/test/integration/event_handler.test.js index 8a7569ca1..7a9d8b2d6 100644 --- a/test/integration/event_handler.test.js +++ b/test/integration/event_handler.test.js @@ -217,15 +217,19 @@ describe('Event Handler Test', function() { it('Wait BLOCK_FINALIZED events', function(done) { this.timeout(3 * epochMs); - wsClient.once('message', (message) => { + function messageHandler(message) { const parsedMessage = JSON.parse(message); const messageType = parsedMessage.type; const eventType = _.get(parsedMessage, 'data.type'); if (messageType === BlockchainEventMessageTypes.EMIT_EVENT && eventType === BlockchainEventTypes.BLOCK_FINALIZED) { done(); + // NOTE(platfowner): Avoid test failure with "done() called multiple times". + wsClient.removeListener('message', messageHandler); } - }); + } + // NOTE(platfowner): Use 'on' instead of 'once' due to heartbeats with custom ping-pong. + wsClient.on('message', messageHandler); }); it('Deregister filter & check number of filters === 0', function(done) { @@ -324,7 +328,8 @@ describe('Event Handler Test', function() { block_number: null, }; registerFilter(wsClient, filterId, BlockchainEventTypes.BLOCK_FINALIZED, config); - wsClient.once('message', (message) => { + // NOTE(platfowner): Use 'on' instead of 'once' due to heartbeats with custom ping-pong. + wsClient.on('message', (message) => { const parsedMessage = JSON.parse(message); const messageType = parsedMessage.type; const eventType = _.get(parsedMessage, 'data.type'); @@ -343,7 +348,8 @@ describe('Event Handler Test', function() { path: targetPath, }; registerFilter(wsClient, filterId, BlockchainEventTypes.VALUE_CHANGED, config); - wsClient.once('message', (message) => { + // NOTE(platfowner): Use 'on' instead of 'once' due to heartbeats with custom ping-pong. + wsClient.on('message', (message) => { const parsedMessage = JSON.parse(message); const messageType = parsedMessage.type; const eventType = _.get(parsedMessage, 'data.type'); From 34ce7a5d37ab10e0f81a4369ade247164a119183 Mon Sep 17 00:00:00 2001 From: platfowner Date: Wed, 17 Apr 2024 14:38:32 +0900 Subject: [PATCH 5/5] Upgrade package version to v1.2.0 --- client/protocol_versions.json | 3 +++ package.json | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/client/protocol_versions.json b/client/protocol_versions.json index 68cf1c450..191dc587b 100644 --- a/client/protocol_versions.json +++ b/client/protocol_versions.json @@ -137,5 +137,8 @@ }, "1.1.4": { "min": "1.0.0" + }, + "1.2.0": { + "min": "1.0.0" } } \ No newline at end of file diff --git a/package.json b/package.json index 4a43769c5..79632ffe1 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "ain-blockchain", "description": "AI Network Blockchain", - "version": "1.1.4", + "version": "1.2.0", "private": true, "license": "MIT", "author": "dev@ainetwork.ai",