From 94547ce8fd610b6d374b4c8c7bba70476f91eb4c Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Sun, 24 Sep 2023 18:02:49 -0300 Subject: [PATCH 01/15] add logs --- workers/deserializer.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/workers/deserializer.ts b/workers/deserializer.ts index 16c4ee70..93266041 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -624,7 +624,14 @@ export default class MainDSWorker extends HyperionWorker { const pool_queue = `${this.chain}:ds_pool:${selected_q}`; if (this.ch_ready) { // console.log('selected_q', pool_queue); - this.ch.sendToQueue(pool_queue, bufferFromJson(trace, true), {headers}); + const enqueueResult = this.ch.sendToQueue(pool_queue, bufferFromJson(trace, true), {headers}); + if(!enqueueResult) { + hLog("Failed to send trace!"); + hLog('-----TRACE-----'); + hLog(trace); + hLog('-----HEADERS-----'); + hLog(headers); + } return true; } else { return false; From 44062fbe0e3da6be80797d9aed966bf63dc52b6d Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Sun, 24 Sep 2023 18:28:07 -0300 Subject: [PATCH 02/15] add logs --- workers/deserializer.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/workers/deserializer.ts b/workers/deserializer.ts index 93266041..94fe5317 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -625,12 +625,9 @@ export default class MainDSWorker extends HyperionWorker { if (this.ch_ready) { // console.log('selected_q', pool_queue); const enqueueResult = this.ch.sendToQueue(pool_queue, bufferFromJson(trace, true), {headers}); - if(!enqueueResult) { + if (!enqueueResult) { hLog("Failed to send trace!"); - hLog('-----TRACE-----'); - hLog(trace); - hLog('-----HEADERS-----'); - hLog(headers); + console.log("Header size: " + JSON.stringify(headers).length); } return true; } else { From ce6f307bbe574ad88b213b73ad704104a7b22f4b Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Sun, 24 Sep 2023 18:32:15 -0300 Subject: [PATCH 03/15] add logs --- workers/deserializer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/deserializer.ts b/workers/deserializer.ts index 94fe5317..561ea2e6 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -623,11 +623,11 @@ export default class MainDSWorker extends HyperionWorker { const pool_queue = `${this.chain}:ds_pool:${selected_q}`; if (this.ch_ready) { - // console.log('selected_q', pool_queue); const enqueueResult = this.ch.sendToQueue(pool_queue, bufferFromJson(trace, true), {headers}); if (!enqueueResult) { hLog("Failed to send trace!"); console.log("Header size: " + JSON.stringify(headers).length); + console.log(headers); } return true; } else { From d259008cb42a858c476cd747ea34b96d1726f549 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Sun, 24 Sep 2023 19:13:08 -0300 Subject: [PATCH 04/15] add logs --- connections/amqp.ts | 4 ++-- workers/deserializer.ts | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/connections/amqp.ts b/connections/amqp.ts index 7cef2b67..7d21ea01 100644 --- a/connections/amqp.ts +++ b/connections/amqp.ts @@ -23,11 +23,11 @@ export function getAmpqUrl(config): string { } const u = encodeURIComponent(config.user); const p = encodeURIComponent(config.pass); - const v = encodeURIComponent(config.vhost) + const v = encodeURIComponent(config.vhost); + console.log(`max frame: ${frameMaxValue}`); return `amqp://${u}:${p}@${config.host}/${v}?frameMax=${frameMaxValue}`; } - async function createChannels(connection) { try { const channel = await connection.createChannel(); diff --git a/workers/deserializer.ts b/workers/deserializer.ts index 561ea2e6..02f1e342 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -529,6 +529,8 @@ export default class MainDSWorker extends HyperionWorker { routeToPool(trace, headers) { + console.log(headers.block_num, headers.signatures); + let first_action; if (trace['action_traces'][0] && trace['action_traces'][0].length === 2) { first_action = trace['action_traces'][0][1]; From 29ed9d372d5bfcf6356c900e06a6dba6ab891a23 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Sun, 24 Sep 2023 19:14:52 -0300 Subject: [PATCH 05/15] add logs --- workers/deserializer.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/workers/deserializer.ts b/workers/deserializer.ts index 02f1e342..e6ca2b1b 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -529,7 +529,9 @@ export default class MainDSWorker extends HyperionWorker { routeToPool(trace, headers) { - console.log(headers.block_num, headers.signatures); + if(headers.signatures.length > 5) { + console.log(headers.block_num, headers.signatures); + } let first_action; if (trace['action_traces'][0] && trace['action_traces'][0].length === 2) { From bc98cecb527c99ceb0ae65d520f40915af7a3b66 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Sun, 24 Sep 2023 19:30:40 -0300 Subject: [PATCH 06/15] send signatures on payload --- workers/deserializer.ts | 8 ++------ workers/ds-pool.ts | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/workers/deserializer.ts b/workers/deserializer.ts index e6ca2b1b..9803b8e8 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -478,6 +478,7 @@ export default class MainDSWorker extends HyperionWorker { hLog(`${block_num} was filtered with ${inline_count} actions!`); } try { + trace[1].signatures = signatures; this.routeToPool(trace[1], { block_num, block_id, @@ -485,8 +486,7 @@ export default class MainDSWorker extends HyperionWorker { ts, inline_count, filtered, - live: process.env['live_mode'], - signatures + live: process.env['live_mode'] }); } catch (e) { hLog(e); @@ -529,10 +529,6 @@ export default class MainDSWorker extends HyperionWorker { routeToPool(trace, headers) { - if(headers.signatures.length > 5) { - console.log(headers.block_num, headers.signatures); - } - let first_action; if (trace['action_traces'][0] && trace['action_traces'][0].length === 2) { first_action = trace['action_traces'][0][1]; diff --git a/workers/ds-pool.ts b/workers/ds-pool.ts index a51f2ceb..f25dbd19 100644 --- a/workers/ds-pool.ts +++ b/workers/ds-pool.ts @@ -454,8 +454,8 @@ export default class DSPoolWorker extends HyperionWorker { } async processTraces(transaction_trace, extra) { - const {cpu_usage_us, net_usage_words} = transaction_trace; - const {block_num, block_id, producer, ts, inline_count, filtered, live, signatures} = extra; + const {cpu_usage_us, net_usage_words, signatures} = transaction_trace; + const {block_num, block_id, producer, ts, inline_count, filtered, live} = extra; if (transaction_trace.status === 0) { let action_count = 0; From b8faf2ef701a96c053cf6b33d06db11cefd53c35 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Sun, 24 Sep 2023 19:33:55 -0300 Subject: [PATCH 07/15] package updates --- package-lock.json | 40 ++++++++++++++++------------------------ package.json | 6 +++--- 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/package-lock.json b/package-lock.json index a86b65ef..8fd54b0b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -44,11 +44,11 @@ "socket.io-redis": "^6.1.1", "telegraf": "^4.12.3-canary.1", "typescript": "^4.5.2", - "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.24.0", - "ws": "^8.12.1" + "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.31.0", + "ws": "^8.14.2" }, "devDependencies": { - "@types/amqplib": "^0.10.1", + "@types/amqplib": "^0.10.2", "@types/async": "^3.2.18", "@types/global-agent": "^2.1.1", "@types/ioredis": "^4.28.10", @@ -242,9 +242,9 @@ } }, "node_modules/@pm2/io": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/@pm2/io/-/io-5.0.0.tgz", - "integrity": "sha512-3rToDVJaRoob5Lq8+7Q2TZFruoEkdORxwzFpZaqF4bmH6Bkd7kAbdPrI/z8X6k1Meq5rTtScM7MmDgppH6aLlw==", + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/@pm2/io/-/io-5.0.2.tgz", + "integrity": "sha512-XAvrNoQPKOyO/jJyCu8jPhLzlyp35MEf7w/carHXmWKddPzeNOFSEpSEqMzPDawsvpxbE+i918cNN+MwgVsStA==", "dependencies": { "@opencensus/core": "0.0.9", "@opencensus/propagation-b3": "0.0.8", @@ -252,7 +252,7 @@ "debug": "~4.3.1", "eventemitter2": "^6.3.1", "require-in-the-middle": "^5.0.0", - "semver": "6.3.0", + "semver": "~7.5.4", "shimmer": "^1.2.0", "signal-exit": "^3.0.3", "tslib": "1.9.3" @@ -269,14 +269,6 @@ "lodash": "^4.17.14" } }, - "node_modules/@pm2/io/node_modules/semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", - "bin": { - "semver": "bin/semver.js" - } - }, "node_modules/@sindresorhus/is": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/@sindresorhus/is/-/is-4.6.0.tgz", @@ -305,9 +297,9 @@ } }, "node_modules/@types/amqplib": { - "version": "0.10.1", - "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.10.1.tgz", - "integrity": "sha512-j6ANKT79ncUDnAs/+9r9eDujxbeJoTjoVu33gHHcaPfmLQaMhvfbH2GqSe8KUM444epAp1Vl3peVOQfZk3UIqA==", + "version": "0.10.2", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.10.2.tgz", + "integrity": "sha512-K0qC2YR0ZcQWWMOifg4yvCAu5wi/V6wY6MnMS4LSvqx6qwXBAhxno6OBN8D76FIaajLNfgHKOXobZBL/uAwXAQ==", "dev": true, "dependencies": { "@types/node": "*" @@ -2518,9 +2510,9 @@ "integrity": "sha512-6aU+Rwsezw7VR8/nyvKTx8QpWH9FrcYiXXlqC4z5d5XQBDRqtbfsRjnwGyqbi3gddNtWHuEk9OANUotL26qKUw==" }, "node_modules/semver": { - "version": "7.5.3", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.3.tgz", - "integrity": "sha512-QBlUtyVk/5EeHbi7X0fw6liDZc7BBmEaSYn01fMU1OUYbf6GPsbTtd8WmnqbI20SeycoHSeiybkE/q1Q+qlThQ==", + "version": "7.5.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", + "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", "dependencies": { "lru-cache": "^6.0.0" }, @@ -2987,9 +2979,9 @@ "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==" }, "node_modules/ws": { - "version": "8.13.0", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.13.0.tgz", - "integrity": "sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==", + "version": "8.14.2", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.14.2.tgz", + "integrity": "sha512-wEBG1ftX4jcglPxgFCMJmZ2PLtSbJ2Peg6TmpJFTbe9GZYOQCDPdMYu/Tm0/bGZkw8paZnJY45J4K2PZrLYq8g==", "engines": { "node": ">=10.0.0" }, diff --git a/package.json b/package.json index 3d610459..8fd8cc6d 100644 --- a/package.json +++ b/package.json @@ -60,11 +60,11 @@ "socket.io-redis": "^6.1.1", "telegraf": "^4.12.3-canary.1", "typescript": "^4.5.2", - "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.24.0", - "ws": "^8.12.1" + "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.31.0", + "ws": "^8.14.2" }, "devDependencies": { - "@types/amqplib": "^0.10.1", + "@types/amqplib": "^0.10.2", "@types/async": "^3.2.18", "@types/global-agent": "^2.1.1", "@types/ioredis": "^4.28.10", From 0579da6613076591f32f3ebcab176ab5b4967e6c Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Mon, 25 Sep 2023 00:29:07 -0300 Subject: [PATCH 08/15] include amqp-connection-manager --- connections/amqp.ts | 162 ++++++----- connections/manager.class.ts | 3 +- package-lock.json | 21 ++ package.json | 1 + tests/dsp-consumer.js | 13 +- tests/dsp-consumer.js.map | 2 +- tests/dsp-consumer.ts | 18 +- workers/delta-updater.ts | 8 +- workers/deserializer.ts | 7 +- workers/ds-pool.ts | 14 +- workers/hyperionWorker.ts | 539 ++++++++++++++++++----------------- workers/indexer.ts | 8 +- 12 files changed, 419 insertions(+), 377 deletions(-) diff --git a/connections/amqp.ts b/connections/amqp.ts index 7d21ea01..e9ba1d9f 100644 --- a/connections/amqp.ts +++ b/connections/amqp.ts @@ -1,91 +1,99 @@ import {debugLog, hLog} from "../helpers/common_functions"; import got, {HTTPError} from "got"; -import {connect, Connection} from 'amqplib'; +import amqp, {ChannelWrapper} from "amqp-connection-manager"; +import {IAmqpConnectionManager} from "amqp-connection-manager/dist/types/AmqpConnectionManager"; -export async function createConnection(config): Promise { - try { - const amqp_url = getAmpqUrl(config); - const conn: Connection = await connect(amqp_url); - debugLog("[AMQP] connection established"); - return conn; - } catch (e) { - hLog("[AMQP] failed to connect!"); - hLog(e.message); - await new Promise(resolve => setTimeout(resolve, 5000)); - return await createConnection(config); - } +export async function createConnection(config): Promise { + try { + const amqp_url = getAmpqUrl(config); + // const conn: Connection = await connect(amqp_url); + const conn: IAmqpConnectionManager = amqp.connect(amqp_url); + debugLog("[AMQP] connection established"); + return conn; + } catch (e) { + hLog("[AMQP] failed to connect!"); + hLog(e.message); + await new Promise(resolve => setTimeout(resolve, 5000)); + return await createConnection(config); + } } export function getAmpqUrl(config): string { - let frameMaxValue = '0x10000'; - if (config.frameMax) { - frameMaxValue = config.frameMax; - } - const u = encodeURIComponent(config.user); - const p = encodeURIComponent(config.pass); - const v = encodeURIComponent(config.vhost); - console.log(`max frame: ${frameMaxValue}`); - return `amqp://${u}:${p}@${config.host}/${v}?frameMax=${frameMaxValue}`; + let frameMaxValue = '0x10000'; + if (config.frameMax) { + frameMaxValue = config.frameMax; + } + const u = encodeURIComponent(config.user); + const p = encodeURIComponent(config.pass); + const v = encodeURIComponent(config.vhost); + console.log(`max frame: ${frameMaxValue}`); + return `amqp://${u}:${p}@${config.host}/${v}?frameMax=${frameMaxValue}`; } -async function createChannels(connection) { - try { - const channel = await connection.createChannel(); - const confirmChannel = await connection.createConfirmChannel(); - return [channel, confirmChannel]; - } catch (e) { - hLog("[AMQP] failed to create channels"); - hLog(e); - return null; - } +async function createChannels(connection: IAmqpConnectionManager) { + try { + // const channel = await connection.createChannel(); + const channel = connection.createChannel({ + confirm: false + }); + // const confirmChannel = await connection.createConfirmChannel(); + const confirmChannel = connection.createChannel({ + confirm: true + }) + return [channel, confirmChannel]; + } catch (e) { + hLog("[AMQP] failed to create channels"); + hLog(e); + return null; + } } -export async function amqpConnect(onReconnect, config, onClose) { - let connection = await createConnection(config); - if (connection) { - const channels = await createChannels(connection); - if (channels) { - connection.on('error', (err) => { - hLog(err.message); - }); - connection.on('close', () => { - hLog('Connection closed!'); - onClose(); - setTimeout(async () => { - hLog('Retrying in 5 seconds...'); - const _channels = await amqpConnect(onReconnect, config, onClose); - onReconnect(_channels); - return _channels; - }, 5000); - }); - return channels; - } else { - return null; - } - } else { - return null; - } +export async function amqpConnect(onReconnect, config, onClose): Promise { + let connection = await createConnection(config); + if (connection) { + const channels = await createChannels(connection); + if (channels) { + connection.on('error', (err) => { + hLog(err.message); + }); + connection.on('close', () => { + hLog('Connection closed!'); + onClose(); + setTimeout(async () => { + hLog('Retrying in 5 seconds...'); + const _channels = await amqpConnect(onReconnect, config, onClose); + onReconnect(_channels); + return _channels; + }, 5000); + }); + return channels; + } else { + return null; + } + } else { + return null; + } } export async function checkQueueSize(q_name, config) { - try { - const v = encodeURIComponent(config.vhost); - const apiUrl = `${config.protocol}://${config.api}/api/queues/${v}/${encodeURIComponent(q_name)}`; - const opts = { - username: config.user, - password: config.pass - }; - const data = await got.get(apiUrl, opts).json() as any; - return data.messages; - } catch (e) { - hLog(`[WARNING] Checking queue size failed! - ${e.message}`); - if (e.response && e.response.body) { - if (e instanceof HTTPError) { - hLog(e.response.body); - } else { - hLog(JSON.stringify(e.response.body, null, 2)); - } - } - return 0; - } + try { + const v = encodeURIComponent(config.vhost); + const apiUrl = `${config.protocol}://${config.api}/api/queues/${v}/${encodeURIComponent(q_name)}`; + const opts = { + username: config.user, + password: config.pass + }; + const data = await got.get(apiUrl, opts).json() as any; + return data.messages; + } catch (e) { + hLog(`[WARNING] Checking queue size failed! - ${e.message}`); + if (e.response && e.response.body) { + if (e instanceof HTTPError) { + hLog(e.response.body); + } else { + hLog(JSON.stringify(e.response.body, null, 2)); + } + } + return 0; + } } diff --git a/connections/manager.class.ts b/connections/manager.class.ts index a0c3e79f..c892617a 100644 --- a/connections/manager.class.ts +++ b/connections/manager.class.ts @@ -9,6 +9,7 @@ import {StateHistorySocket} from "./state-history"; import fetch from 'cross-fetch'; import {exec} from "child_process"; import {hLog} from "../helpers/common_functions"; +import {ChannelWrapper} from "amqp-connection-manager"; export class ConnectionManager { @@ -131,7 +132,7 @@ export class ConnectionManager { } } - async createAMQPChannels(onReconnect, onClose) { + async createAMQPChannels(onReconnect, onClose): Promise { return await amqpConnect(onReconnect, this.conn.amqp, onClose); } diff --git a/package-lock.json b/package-lock.json index 8fd54b0b..e27dc396 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,6 +19,7 @@ "@fastify/redis": "^5.0.0", "@fastify/swagger": "6.1.0", "@pm2/io": "^5.0.0", + "amqp-connection-manager": "^4.1.14", "amqplib": "^0.10.3", "async": "^3.2.4", "base-x": "^4.0.0", @@ -443,6 +444,21 @@ "url": "https://github.com/sponsors/epoberezkin" } }, + "node_modules/amqp-connection-manager": { + "version": "4.1.14", + "resolved": "https://registry.npmjs.org/amqp-connection-manager/-/amqp-connection-manager-4.1.14.tgz", + "integrity": "sha512-1km47dIvEr0HhMUazqovSvNwIlSvDX2APdUpULaINtHpiki1O+cLRaTeXb/jav4OLtH+k6GBXx5gsKOT9kcGKQ==", + "dependencies": { + "promise-breaker": "^6.0.0" + }, + "engines": { + "node": ">=10.0.0", + "npm": ">5.0.0" + }, + "peerDependencies": { + "amqplib": "*" + } + }, "node_modules/amqplib": { "version": "0.10.3", "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.3.tgz", @@ -2247,6 +2263,11 @@ "resolved": "https://registry.npmjs.org/process-warning/-/process-warning-1.0.0.tgz", "integrity": "sha512-du4wfLyj4yCZq1VupnVSZmRsPJsNuxoDQFdCFHLaYiEbFBD7QE0a+I4D7hOxrVnh78QE/YipFAj9lXHiXocV+Q==" }, + "node_modules/promise-breaker": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/promise-breaker/-/promise-breaker-6.0.0.tgz", + "integrity": "sha512-BthzO9yTPswGf7etOBiHCVuugs2N01/Q/94dIPls48z2zCmrnDptUUZzfIb+41xq0MnYZ/BzmOd6ikDR4ibNZA==" + }, "node_modules/proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", diff --git a/package.json b/package.json index 8fd8cc6d..46d666c4 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "@fastify/redis": "^5.0.0", "@fastify/swagger": "6.1.0", "@pm2/io": "^5.0.0", + "amqp-connection-manager": "^4.1.14", "amqplib": "^0.10.3", "async": "^3.2.4", "base-x": "^4.0.0", diff --git a/tests/dsp-consumer.js b/tests/dsp-consumer.js index 72e87b24..f9530df3 100644 --- a/tests/dsp-consumer.js +++ b/tests/dsp-consumer.js @@ -16,20 +16,21 @@ class DspEventConsumer { console.log('Starting DSP Consumer...'); [this.ch] = await this.manager.createAMQPChannels((channels) => { [this.ch] = channels; - this.onConnect(); + this.onConnect().catch(console.log); }, () => { this.ch_ready = false; }); - this.onConnect(); + this.onConnect().catch(console.log); } - onConnect() { + async onConnect() { if (this.conf.settings.dsp_parser) { const q = `${this.manager.chain}:dsp`; console.log(q); - this.ch.prefetch(100); - this.ch.assertQueue(q, { durable: true }); - this.ch.consume(q, (data) => { + await this.ch.assertQueue(q, { durable: true }); + await this.ch.consume(q, (data) => { this.onMessage(data); + }, { + prefetch: 100 }); } } diff --git a/tests/dsp-consumer.js.map b/tests/dsp-consumer.js.map index a45d7356..f5cd15ea 100644 --- a/tests/dsp-consumer.js.map +++ b/tests/dsp-consumer.js.map @@ -1 +1 @@ -{"version":3,"file":"dsp-consumer.js","sourceRoot":"","sources":["dsp-consumer.ts"],"names":[],"mappings":";;AAAA,8CAAsD;AACtD,gEAA+D;AAI/D,MAAM,gBAAgB;IAUlB;QALQ,aAAQ,GAAY,KAAK,CAAC;QAC1B,cAAS,GAAG,CAAC,CAAC;QACd,WAAM,GAAG,CAAC,CAAC;QACX,iBAAY,GAAuB,IAAI,GAAG,EAAE,CAAC;QAGjD,MAAM,EAAE,GAAG,IAAI,4BAAmB,EAAE,CAAC;QACrC,IAAI,CAAC,IAAI,GAAG,EAAE,CAAC,MAAM,CAAC;QACtB,IAAI,CAAC,OAAO,GAAG,IAAI,iCAAiB,CAAC,EAAE,CAAC,CAAC;IAC7C,CAAC;IAED,KAAK,CAAC,GAAG;QACL,OAAO,CAAC,GAAG,CAAC,0BAA0B,CAAC,CAAC;QACxC,CAAC,IAAI,CAAC,EAAE,CAAC,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,kBAAkB,CAAC,CAAC,QAAQ,EAAE,EAAE;YAC3D,CAAC,IAAI,CAAC,EAAE,CAAC,GAAG,QAAQ,CAAC;YACrB,IAAI,CAAC,SAAS,EAAE,CAAC;QACrB,CAAC,EAAE,GAAG,EAAE;YACJ,IAAI,CAAC,QAAQ,GAAG,KAAK,CAAC;QAC1B,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,SAAS,EAAE,CAAC;IACrB,CAAC;IAEO,SAAS;QACb,IAAI,IAAI,CAAC,IAAI,CAAC,QAAQ,CAAC,UAAU,EAAE;YAC/B,MAAM,CAAC,GAAG,GAAG,IAAI,CAAC,OAAO,CAAC,KAAK,MAAM,CAAC;YACtC,OAAO,CAAC,GAAG,CAAC,CAAC,CAAC,CAAC;YACf,IAAI,CAAC,EAAE,CAAC,QAAQ,CAAC,GAAG,CAAC,CAAC;YACtB,IAAI,CAAC,EAAE,CAAC,WAAW,CAAC,CAAC,EAAE,EAAC,OAAO,EAAE,IAAI,EAAC,CAAC,CAAC;YACxC,IAAI,CAAC,EAAE,CAAC,OAAO,CAAC,CAAC,EAAE,CAAC,IAAI,EAAE,EAAE;gBACxB,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,CAAC;YACzB,CAAC,CAAC,CAAC;SACN;IACL,CAAC;IAED,aAAa,CAAC,SAAiB;QAC3B,UAAU,CAAC,GAAG,EAAE;YACZ,IAAI,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,SAAS,CAAC,EAAE;gBAClC,MAAM,aAAa,GAAG,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,EAAE,EAAE,CAAC,CAAC,CAAC,eAAe,GAAG,CAAC,CAAC,eAAe,CAAC,CAAC;gBAC7G,KAAK,MAAM,MAAM,IAAI,aAAa,EAAE;oBAChC,OAAO,CAAC,GAAG,CAAC,MAAM,CAAC,YAAY,CAAC,EAAE,MAAM,CAAC,SAAS,EAAE,MAAM,CAAC,eAAe,CAAC,CAAC;iBAC/E;gBACD,IAAI,CAAC,YAAY,CAAC,MAAM,CAAC,SAAS,CAAC,CAAC;aACvC;QACL,CAAC,EAAE,GAAG,CAAC,CAAC;IACZ,CAAC;IAED,SAAS,CAAC,GAAY;QAClB,IAAI;YACA,MAAM,OAAO,GAAQ,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,OAAO,CAAC,QAAQ,EAAE,CAAC,CAAC;YACxD,OAAO,CAAC,GAAG,CAAC,OAAO,CAAC,CAAC;YACrB,IAAI,OAAO,CAAC,SAAS,GAAG,IAAI,CAAC,SAAS,EAAE;gBACpC,IAAI,CAAC,SAAS,GAAG,OAAO,CAAC,SAAS,CAAC;gBACnC,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,OAAO,CAAC,SAAS,EAAE,CAAC,OAAO,CAAC,CAAC,CAAC;aACvD;iBAAM,IAAI,OAAO,CAAC,SAAS,KAAK,IAAI,CAAC,SAAS,EAAE;gBAC7C,IAAI,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAI,CAAC,SAAS,CAAC,EAAE;oBACvC,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;iBACvD;aACJ;iBAAM;gBACH,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,OAAO,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;aAC1D;YACD,IAAI,CAAC,aAAa,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YACnC,IAAI,CAAC,EAAE,CAAC,GAAG,CAAC,GAAG,CAAC,CAAC;SACpB;QAAC,OAAO,CAAC,EAAE;YACR,IAAI,CAAC,EAAE,CAAC,IAAI,CAAC,GAAG,CAAC,CAAC;SACrB;IACL,CAAC;CAEJ;AAED,IAAI,gBAAgB,EAAE,CAAC,GAAG,EAAE,CAAC,KAAK,CAAC,OAAO,CAAC,KAAK,CAAC,CAAC"} \ No newline at end of file +{"version":3,"file":"dsp-consumer.js","sourceRoot":"","sources":["dsp-consumer.ts"],"names":[],"mappings":";;AAAA,8CAAsD;AACtD,gEAA+D;AAK/D,MAAM,gBAAgB;IAUlB;QALQ,aAAQ,GAAY,KAAK,CAAC;QAC1B,cAAS,GAAG,CAAC,CAAC;QACd,WAAM,GAAG,CAAC,CAAC;QACX,iBAAY,GAAuB,IAAI,GAAG,EAAE,CAAC;QAGjD,MAAM,EAAE,GAAG,IAAI,4BAAmB,EAAE,CAAC;QACrC,IAAI,CAAC,IAAI,GAAG,EAAE,CAAC,MAAM,CAAC;QACtB,IAAI,CAAC,OAAO,GAAG,IAAI,iCAAiB,CAAC,EAAE,CAAC,CAAC;IAC7C,CAAC;IAED,KAAK,CAAC,GAAG;QACL,OAAO,CAAC,GAAG,CAAC,0BAA0B,CAAC,CAAC;QACxC,CAAC,IAAI,CAAC,EAAE,CAAC,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,kBAAkB,CAAC,CAAC,QAAQ,EAAE,EAAE;YAC3D,CAAC,IAAI,CAAC,EAAE,CAAC,GAAG,QAAQ,CAAC;YACrB,IAAI,CAAC,SAAS,EAAE,CAAC,KAAK,CAAC,OAAO,CAAC,GAAG,CAAC,CAAC;QACxC,CAAC,EAAE,GAAG,EAAE;YACJ,IAAI,CAAC,QAAQ,GAAG,KAAK,CAAC;QAC1B,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,SAAS,EAAE,CAAC,KAAK,CAAC,OAAO,CAAC,GAAG,CAAC,CAAC;IACxC,CAAC;IAEO,KAAK,CAAC,SAAS;QACnB,IAAI,IAAI,CAAC,IAAI,CAAC,QAAQ,CAAC,UAAU,EAAE;YAC/B,MAAM,CAAC,GAAG,GAAG,IAAI,CAAC,OAAO,CAAC,KAAK,MAAM,CAAC;YACtC,OAAO,CAAC,GAAG,CAAC,CAAC,CAAC,CAAC;YACf,MAAM,IAAI,CAAC,EAAE,CAAC,WAAW,CAAC,CAAC,EAAE,EAAC,OAAO,EAAE,IAAI,EAAC,CAAC,CAAC;YAC9C,MAAM,IAAI,CAAC,EAAE,CAAC,OAAO,CAAC,CAAC,EAAE,CAAC,IAAI,EAAE,EAAE;gBAC9B,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,CAAC;YACzB,CAAC,EAAE;gBACC,QAAQ,EAAE,GAAG;aAChB,CAAC,CAAC;SACN;IACL,CAAC;IAED,aAAa,CAAC,SAAiB;QAC3B,UAAU,CAAC,GAAG,EAAE;YACZ,IAAI,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,SAAS,CAAC,EAAE;gBAClC,MAAM,aAAa,GAAG,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,EAAE,EAAE,CAAC,CAAC,CAAC,eAAe,GAAG,CAAC,CAAC,eAAe,CAAC,CAAC;gBAC7G,KAAK,MAAM,MAAM,IAAI,aAAa,EAAE;oBAChC,OAAO,CAAC,GAAG,CAAC,MAAM,CAAC,YAAY,CAAC,EAAE,MAAM,CAAC,SAAS,EAAE,MAAM,CAAC,eAAe,CAAC,CAAC;iBAC/E;gBACD,IAAI,CAAC,YAAY,CAAC,MAAM,CAAC,SAAS,CAAC,CAAC;aACvC;QACL,CAAC,EAAE,GAAG,CAAC,CAAC;IACZ,CAAC;IAED,SAAS,CAAC,GAAY;QAClB,IAAI;YACA,MAAM,OAAO,GAAQ,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,OAAO,CAAC,QAAQ,EAAE,CAAC,CAAC;YACxD,OAAO,CAAC,GAAG,CAAC,OAAO,CAAC,CAAC;YACrB,IAAI,OAAO,CAAC,SAAS,GAAG,IAAI,CAAC,SAAS,EAAE;gBACpC,IAAI,CAAC,SAAS,GAAG,OAAO,CAAC,SAAS,CAAC;gBACnC,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,OAAO,CAAC,SAAS,EAAE,CAAC,OAAO,CAAC,CAAC,CAAC;aACvD;iBAAM,IAAI,OAAO,CAAC,SAAS,KAAK,IAAI,CAAC,SAAS,EAAE;gBAC7C,IAAI,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAI,CAAC,SAAS,CAAC,EAAE;oBACvC,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;iBACvD;aACJ;iBAAM;gBACH,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,OAAO,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;aAC1D;YACD,IAAI,CAAC,aAAa,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YACnC,IAAI,CAAC,EAAE,CAAC,GAAG,CAAC,GAAG,CAAC,CAAC;SACpB;QAAC,OAAO,CAAC,EAAE;YACR,IAAI,CAAC,EAAE,CAAC,IAAI,CAAC,GAAG,CAAC,CAAC;SACrB;IACL,CAAC;CAEJ;AAED,IAAI,gBAAgB,EAAE,CAAC,GAAG,EAAE,CAAC,KAAK,CAAC,OAAO,CAAC,KAAK,CAAC,CAAC"} \ No newline at end of file diff --git a/tests/dsp-consumer.ts b/tests/dsp-consumer.ts index eec8de6a..237aa121 100644 --- a/tests/dsp-consumer.ts +++ b/tests/dsp-consumer.ts @@ -1,13 +1,14 @@ import {ConfigurationModule} from "../modules/config"; import {ConnectionManager} from "../connections/manager.class"; import {HyperionConfig} from "../interfaces/hyperionConfig"; -import {Channel, Message} from "amqplib/callback_api"; +import {Message} from "amqplib/callback_api"; +import {ChannelWrapper} from "amqp-connection-manager"; class DspEventConsumer { private conf: HyperionConfig; private manager: ConnectionManager; - private ch: Channel; + private ch: ChannelWrapper; private ch_ready: boolean = false; private lastBlock = 0; private lastGS = 0; @@ -23,21 +24,22 @@ class DspEventConsumer { console.log('Starting DSP Consumer...'); [this.ch] = await this.manager.createAMQPChannels((channels) => { [this.ch] = channels; - this.onConnect(); + this.onConnect().catch(console.log); }, () => { this.ch_ready = false; }); - this.onConnect(); + this.onConnect().catch(console.log); } - private onConnect() { + private async onConnect(): Promise { if (this.conf.settings.dsp_parser) { const q = `${this.manager.chain}:dsp`; console.log(q); - this.ch.prefetch(100); - this.ch.assertQueue(q, {durable: true}); - this.ch.consume(q, (data) => { + await this.ch.assertQueue(q, {durable: true}); + await this.ch.consume(q, (data) => { this.onMessage(data); + }, { + prefetch: 100 }); } } diff --git a/workers/delta-updater.ts b/workers/delta-updater.ts index d8d8a018..bc8b0fee 100644 --- a/workers/delta-updater.ts +++ b/workers/delta-updater.ts @@ -29,9 +29,11 @@ export default class MainDSWorker extends HyperionWorker { if (this.ch) { this.queueName = this.chain + ":delta_rm"; hLog(`Launched delta updater, consuming from ${this.queueName}`); - this.ch.assertQueue(this.queueName, {durable: true}); - this.ch.prefetch(1); - this.ch.consume(this.queueName, this.onConsume.bind(this)); + this.ch.assertQueue(this.queueName, {durable: true}).then(() => { + this.ch.consume(this.queueName, this.onConsume.bind(this),{ + prefetch: 1 + }).catch(console.log); + }); } } diff --git a/workers/deserializer.ts b/workers/deserializer.ts index 9803b8e8..0e69ebda 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -286,10 +286,11 @@ export default class MainDSWorker extends HyperionWorker { private initConsumer() { if (this.ch_ready) { - this.ch.prefetch(this.conf.prefetch.block); this.ch.consume(process.env['worker_queue'], (data) => { this.consumerQueue.push(data).catch(console.log); - }); + }, { + prefetch: this.conf.prefetch.block + }).catch(console.log); } } @@ -625,7 +626,7 @@ export default class MainDSWorker extends HyperionWorker { if (this.ch_ready) { const enqueueResult = this.ch.sendToQueue(pool_queue, bufferFromJson(trace, true), {headers}); if (!enqueueResult) { - hLog("Failed to send trace!"); + hLog("Backpressure"); console.log("Header size: " + JSON.stringify(headers).length); console.log(headers); } diff --git a/workers/ds-pool.ts b/workers/ds-pool.ts index f25dbd19..ab3e10ef 100644 --- a/workers/ds-pool.ts +++ b/workers/ds-pool.ts @@ -625,12 +625,11 @@ export default class DSPoolWorker extends HyperionWorker { initConsumer() { if (this.ch_ready) { - this.ch.prefetch(this.conf.prefetch.block); this.ch.consume(this.local_queue, (data) => { this.consumerQueue.push(data); - }, {}, (err, ok) => { - hLog(err, ok); - }); + }, { + prefetch: this.conf.prefetch.block + }).catch(console.log); debugLog(`started consuming from ${this.local_queue}`); } } @@ -694,13 +693,14 @@ export default class DSPoolWorker extends HyperionWorker { this.ch_ready = true; this.ch.assertQueue(this.local_queue, { durable: true - }); - this.initConsumer(); + }).then(() => { + this.initConsumer(); + }).catch(console.log); } if (this.conf.settings.dsp_parser) { this.ch.assertQueue(`${queue_prefix}:dsp`, { durable: true - }); + }).catch(console.log); } } diff --git a/workers/hyperionWorker.ts b/workers/hyperionWorker.ts index 3f8129f4..046464a6 100644 --- a/workers/hyperionWorker.ts +++ b/workers/hyperionWorker.ts @@ -11,277 +11,280 @@ import {HeapInfo} from "v8"; import {debugLog, hLog} from "../helpers/common_functions"; import {StateHistorySocket} from "../connections/state-history"; import * as AbiEOS from "@eosrio/node-abieos"; +import {ChannelWrapper} from "amqp-connection-manager"; export abstract class HyperionWorker { - conf: HyperionConfig; - manager: ConnectionManager; - mLoader: HyperionModuleLoader; - chain: string; - chainId: string; - - // AMQP Channels - ch: Channel; - cch: ConfirmChannel; - - rpc: JsonRpc; - client: Client; - ship: StateHistorySocket; - - txEnc = new TextEncoder(); - txDec = new TextDecoder(); - cch_ready = false; - ch_ready = false; - - events: EventEmitter; - - filters: Filters; - - failedAbiMap: Map> = new Map(); - - protected constructor() { - this.checkDebugger(); - const cm = new ConfigurationModule(); - this.conf = cm.config; - this.filters = cm.filters; - this.manager = new ConnectionManager(cm); - this.mLoader = new HyperionModuleLoader(cm); - this.chain = this.conf.settings.chain; - this.chainId = this.manager.conn.chains[this.chain].chain_id; - this.rpc = this.manager.nodeosJsonRPC; - this.client = this.manager.elasticsearchClient; - this.ship = this.manager.shipClient; - this.events = new EventEmitter(); - - this.mLoader.init().then(() => { - // Connect to RabbitMQ (amqplib) - this.events.emit('loader_ready'); - this.connectAMQP().then(() => { - this.onConnect(); - }).catch(console.log); - }); - - const bytesToString = (bytes: number) => { - const e = Math.log(bytes) / Math.log(1024) | 0; - const n = (bytes / Math.pow(1024, e)).toFixed(2); - return n + ' ' + (e == 0 ? 'bytes' : (['KB', 'MB', 'GB', 'TB'])[e - 1]); - }; - - - // handle ipc messages - process.on('message', (msg: any) => { - switch (msg.event) { - case 'request_v8_heap_stats': { - const report: HeapInfo = v8.getHeapStatistics(); - const used_pct = report.used_heap_size / report.heap_size_limit; - process.send({ - event: 'v8_heap_report', - id: process.env.worker_role + ':' + process.env.worker_id, - data: { - heap_usage: (used_pct * 100).toFixed(2) + "%", - ...report - } - }); - break; - } - case 'request_memory_usage': { - const report = process.memoryUsage(); - process.send({ - event: 'memory_report', - id: process.env.worker_role + ':' + process.env.worker_id, - data: { - resident: bytesToString(report.rss), - } - }); - break; - } - default: { - this.onIpcMessage(msg); - } - } - }); - } - - async connectAMQP() { - [this.ch, this.cch] = await this.manager.createAMQPChannels((channels) => { - [this.ch, this.cch] = channels; - hLog('AMQP Reconnecting...'); - this.onConnect(); - }, () => { - this.ch_ready = false; - this.cch_ready = false; - }); - } - - onConnect() { - this.ch_ready = true; - this.cch_ready = true; - this.assertQueues(); - this.ch.on('close', () => { - this.ch_ready = false; - }); - this.cch.on('close', () => { - this.cch_ready = false; - }); - this.events.emit('ready'); - } - - checkDebugger() { - if (/--inspect/.test(process.execArgv.join(' '))) { - const inspector = require('inspector'); - hLog('DEBUGGER ATTACHED', inspector.url()); - } - } - - private anyFromCode(act: any) { - return this.chain + '::' + act.account + '::*' - } - - private anyFromName(act: any) { - return this.chain + '::*::' + act.name; - } - - private codeActionPair(act: any) { - return this.chain + '::' + act.account + '::' + act.name; - } - - private anyFromDeltaCode(delta: any) { - return this.chain + '::' + delta.code + '::*' - } - - private anyFromDeltaTable(delta: any) { - return this.chain + '::*::' + delta.table; - } - - private codeDeltaPair(delta: any) { - return this.chain + '::' + delta.code + '::' + delta.table; - } - - protected checkBlacklist(act) { - - // test for chain::code::* - if (this.filters.action_blacklist.has(this.anyFromCode(act))) { - return true; - } - - // test for chain::*::name - if (this.filters.action_blacklist.has(this.anyFromName(act))) { - return true; - } - - // test for chain::code::name - return this.filters.action_blacklist.has(this.codeActionPair(act)); - } - - protected checkWhitelist(act) { - - // test for chain::code::* - if (this.filters.action_whitelist.has(this.anyFromCode(act))) { - return true; - } - - // test for chain::*::name - if (this.filters.action_whitelist.has(this.anyFromName(act))) { - return true; - } - - // test for chain::code::name - return this.filters.action_whitelist.has(this.codeActionPair(act)); - } - - protected checkDeltaBlacklist(delta) { - - // test blacklist for chain::code::* - if (this.filters.delta_blacklist.has(this.anyFromDeltaCode(delta))) { - return true; - } - - // test blacklist for chain::*::table - if (this.filters.delta_blacklist.has(this.anyFromDeltaTable(delta))) { - return true; - } - - // test blacklist for chain::code::table - return this.filters.delta_blacklist.has(this.codeDeltaPair(delta)); - } - - protected checkDeltaWhitelist(delta) { - - // test whitelist for chain::code::* - if (this.filters.delta_whitelist.has(this.anyFromDeltaCode(delta))) { - return true; - } - - // test whitelist for chain::*::table - if (this.filters.delta_whitelist.has(this.anyFromDeltaTable(delta))) { - return true; - } - - // test whitelist for chain::code::table - return this.filters.delta_whitelist.has(this.codeDeltaPair(delta)); - } - - loadAbiHex(contract, block_num, abi_hex) { - // check local blacklist for corrupted abis that failed to load before - let _status; - if (this.failedAbiMap.has(contract) && this.failedAbiMap.get(contract).has(block_num)) { - _status = false; - debugLog('ignore saved abi for', contract, block_num); - } else { - _status = AbiEOS.load_abi_hex(contract, abi_hex); - if (!_status) { - hLog(`AbiEOS.load_abi_hex error for ${contract} at ${block_num}`); - if (this.failedAbiMap.has(contract)) { - this.failedAbiMap.get(contract).add(block_num); - } else { - this.failedAbiMap.set(contract, new Set([block_num])); - } - } else { - this.removeFromFailed(contract); - } - } - return _status; - } - - removeFromFailed(contract) { - if (this.failedAbiMap.has(contract)) { - this.failedAbiMap.delete(contract); - hLog(`${contract} was removed from the failed map!`); - } - } - - async loadCurrentAbiHex(contract) { - let _status; - if (this.failedAbiMap.has(contract) && this.failedAbiMap.get(contract).has(-1)) { - _status = false; - debugLog('ignore current abi for', contract); - } else { - const currentAbi = await this.rpc.getRawAbi(contract); - if (currentAbi.abi.byteLength > 0) { - const abi_hex = Buffer.from(currentAbi.abi).toString('hex'); - _status = AbiEOS.load_abi_hex(contract, abi_hex); - if (!_status) { - hLog(`AbiEOS.load_abi_hex error for ${contract} at head`); - if (this.failedAbiMap.has(contract)) { - this.failedAbiMap.get(contract).add(-1); - } else { - this.failedAbiMap.set(contract, new Set([-1])); - } - } else { - this.removeFromFailed(contract); - } - } else { - _status = false; - } - } - return _status; - } - - abstract run(): Promise - - abstract assertQueues(): void - - abstract onIpcMessage(msg: any): void + conf: HyperionConfig; + manager: ConnectionManager; + mLoader: HyperionModuleLoader; + chain: string; + chainId: string; + + // AMQP Channels + ch: ChannelWrapper; + cch: ChannelWrapper; + + rpc: JsonRpc; + client: Client; + ship: StateHistorySocket; + + txEnc = new TextEncoder(); + txDec = new TextDecoder(); + cch_ready = false; + ch_ready = false; + + events: EventEmitter; + + filters: Filters; + + failedAbiMap: Map> = new Map(); + + protected constructor() { + this.checkDebugger(); + const cm = new ConfigurationModule(); + this.conf = cm.config; + this.filters = cm.filters; + this.manager = new ConnectionManager(cm); + this.mLoader = new HyperionModuleLoader(cm); + this.chain = this.conf.settings.chain; + this.chainId = this.manager.conn.chains[this.chain].chain_id; + this.rpc = this.manager.nodeosJsonRPC; + this.client = this.manager.elasticsearchClient; + this.ship = this.manager.shipClient; + this.events = new EventEmitter(); + + this.mLoader.init().then(() => { + // Connect to RabbitMQ (amqplib) + this.events.emit('loader_ready'); + this.connectAMQP().then(() => { + this.onConnect(); + }).catch(console.log); + }); + + const bytesToString = (bytes: number) => { + const e = Math.log(bytes) / Math.log(1024) | 0; + const n = (bytes / Math.pow(1024, e)).toFixed(2); + return n + ' ' + (e == 0 ? 'bytes' : (['KB', 'MB', 'GB', 'TB'])[e - 1]); + }; + + + // handle ipc messages + process.on('message', (msg: any) => { + switch (msg.event) { + case 'request_v8_heap_stats': { + const report: HeapInfo = v8.getHeapStatistics(); + const used_pct = report.used_heap_size / report.heap_size_limit; + process.send({ + event: 'v8_heap_report', + id: process.env.worker_role + ':' + process.env.worker_id, + data: { + heap_usage: (used_pct * 100).toFixed(2) + "%", + ...report + } + }); + break; + } + case 'request_memory_usage': { + const report = process.memoryUsage(); + process.send({ + event: 'memory_report', + id: process.env.worker_role + ':' + process.env.worker_id, + data: { + resident: bytesToString(report.rss), + } + }); + break; + } + default: { + this.onIpcMessage(msg); + } + } + }); + } + + async connectAMQP() { + [this.ch, this.cch] = await this.manager.createAMQPChannels((channels: ChannelWrapper[]) => { + if (channels) { + [this.ch, this.cch] = channels; + hLog('AMQP Reconnecting...'); + this.onConnect(); + } + }, () => { + this.ch_ready = false; + this.cch_ready = false; + }); + } + + onConnect() { + this.ch_ready = true; + this.cch_ready = true; + this.assertQueues(); + this.ch.on('close', () => { + this.ch_ready = false; + }); + this.cch.on('close', () => { + this.cch_ready = false; + }); + this.events.emit('ready'); + } + + checkDebugger() { + if (/--inspect/.test(process.execArgv.join(' '))) { + const inspector = require('inspector'); + hLog('DEBUGGER ATTACHED', inspector.url()); + } + } + + private anyFromCode(act: any) { + return this.chain + '::' + act.account + '::*' + } + + private anyFromName(act: any) { + return this.chain + '::*::' + act.name; + } + + private codeActionPair(act: any) { + return this.chain + '::' + act.account + '::' + act.name; + } + + private anyFromDeltaCode(delta: any) { + return this.chain + '::' + delta.code + '::*' + } + + private anyFromDeltaTable(delta: any) { + return this.chain + '::*::' + delta.table; + } + + private codeDeltaPair(delta: any) { + return this.chain + '::' + delta.code + '::' + delta.table; + } + + protected checkBlacklist(act) { + + // test for chain::code::* + if (this.filters.action_blacklist.has(this.anyFromCode(act))) { + return true; + } + + // test for chain::*::name + if (this.filters.action_blacklist.has(this.anyFromName(act))) { + return true; + } + + // test for chain::code::name + return this.filters.action_blacklist.has(this.codeActionPair(act)); + } + + protected checkWhitelist(act) { + + // test for chain::code::* + if (this.filters.action_whitelist.has(this.anyFromCode(act))) { + return true; + } + + // test for chain::*::name + if (this.filters.action_whitelist.has(this.anyFromName(act))) { + return true; + } + + // test for chain::code::name + return this.filters.action_whitelist.has(this.codeActionPair(act)); + } + + protected checkDeltaBlacklist(delta) { + + // test blacklist for chain::code::* + if (this.filters.delta_blacklist.has(this.anyFromDeltaCode(delta))) { + return true; + } + + // test blacklist for chain::*::table + if (this.filters.delta_blacklist.has(this.anyFromDeltaTable(delta))) { + return true; + } + + // test blacklist for chain::code::table + return this.filters.delta_blacklist.has(this.codeDeltaPair(delta)); + } + + protected checkDeltaWhitelist(delta) { + + // test whitelist for chain::code::* + if (this.filters.delta_whitelist.has(this.anyFromDeltaCode(delta))) { + return true; + } + + // test whitelist for chain::*::table + if (this.filters.delta_whitelist.has(this.anyFromDeltaTable(delta))) { + return true; + } + + // test whitelist for chain::code::table + return this.filters.delta_whitelist.has(this.codeDeltaPair(delta)); + } + + loadAbiHex(contract, block_num, abi_hex) { + // check local blacklist for corrupted abis that failed to load before + let _status; + if (this.failedAbiMap.has(contract) && this.failedAbiMap.get(contract).has(block_num)) { + _status = false; + debugLog('ignore saved abi for', contract, block_num); + } else { + _status = AbiEOS.load_abi_hex(contract, abi_hex); + if (!_status) { + hLog(`AbiEOS.load_abi_hex error for ${contract} at ${block_num}`); + if (this.failedAbiMap.has(contract)) { + this.failedAbiMap.get(contract).add(block_num); + } else { + this.failedAbiMap.set(contract, new Set([block_num])); + } + } else { + this.removeFromFailed(contract); + } + } + return _status; + } + + removeFromFailed(contract) { + if (this.failedAbiMap.has(contract)) { + this.failedAbiMap.delete(contract); + hLog(`${contract} was removed from the failed map!`); + } + } + + async loadCurrentAbiHex(contract) { + let _status; + if (this.failedAbiMap.has(contract) && this.failedAbiMap.get(contract).has(-1)) { + _status = false; + debugLog('ignore current abi for', contract); + } else { + const currentAbi = await this.rpc.getRawAbi(contract); + if (currentAbi.abi.byteLength > 0) { + const abi_hex = Buffer.from(currentAbi.abi).toString('hex'); + _status = AbiEOS.load_abi_hex(contract, abi_hex); + if (!_status) { + hLog(`AbiEOS.load_abi_hex error for ${contract} at head`); + if (this.failedAbiMap.has(contract)) { + this.failedAbiMap.get(contract).add(-1); + } else { + this.failedAbiMap.set(contract, new Set([-1])); + } + } else { + this.removeFromFailed(contract); + } + } else { + _status = false; + } + } + return _status; + } + + abstract run(): Promise + + abstract assertQueues(): void + + abstract onIpcMessage(msg: any): void } diff --git a/workers/indexer.ts b/workers/indexer.ts index 9ab55615..acf71a43 100644 --- a/workers/indexer.ts +++ b/workers/indexer.ts @@ -48,9 +48,11 @@ export default class IndexerWorker extends HyperionWorker { this.indexQueue.pause(); this.ch_ready = false; }); - this.ch.assertQueue(process.env.queue, {durable: true}); - this.ch.prefetch(this.conf.prefetch.index); - this.ch.consume(process.env.queue, this.indexQueue.push); + this.ch.assertQueue(process.env.queue, {durable: true}).then(r => { + this.ch.consume(process.env.queue, this.indexQueue.push, { + prefetch: this.conf.prefetch.index + }).catch(console.log); + }); } } catch (e) { console.error('rabbitmq error!'); From 4dcb607fcc4ea655623a0848f6080b70d9ebd673 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Mon, 25 Sep 2023 00:45:23 -0300 Subject: [PATCH 09/15] include amqp-connection-manager --- workers/state-reader.ts | 53 +++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/workers/state-reader.ts b/workers/state-reader.ts index 38ea93d7..adb25086 100644 --- a/workers/state-reader.ts +++ b/workers/state-reader.ts @@ -297,23 +297,15 @@ export default class StateReader extends HyperionWorker { this.completionSignaled = true; debugLog(`Reader completion signal - ${this.range_size} - ${this.local_distributed_count}`); this.local_distributed_count = 0; + this.completionMonitoring = setInterval(() => { + + let pending = 0; - const unconfirmed = this.cch['unconfirmed']; - if (unconfirmed.length > 0) { - unconfirmed.forEach((elem) => { - if (elem) { - pending++; - } - }); - if (!(pending === this.lastPendingCount && pending > 0)) { - this.lastPendingCount = pending; - } - } - if (pending === 0) { + + if (this.cch.queueLength() === 0) { debugLog(`Reader completed - ${this.range_size} - ${this.local_distributed_count}`); clearInterval(this.completionMonitoring); - // check if there are any pending ranges, then signal completion to master if (this.internalPendingRanges.length === 0) { process.send({ @@ -325,7 +317,42 @@ export default class StateReader extends HyperionWorker { const next = this.internalPendingRanges.shift(); this.newRange(next); } + } else { + pending = this.cch.queueLength(); + if (!(pending === this.lastPendingCount && pending > 0)) { + this.lastPendingCount = pending; + } } + + // const unconfirmed = this.cch['unconfirmed']; + // if (unconfirmed.length > 0) { + // unconfirmed.forEach((elem) => { + // if (elem) { + // pending++; + // } + // }); + // if (!(pending === this.lastPendingCount && pending > 0)) { + // this.lastPendingCount = pending; + // } + // } + // + // if (pending === 0) { + // debugLog(`Reader completed - ${this.range_size} - ${this.local_distributed_count}`); + // clearInterval(this.completionMonitoring); + // + // // check if there are any pending ranges, then signal completion to master + // if (this.internalPendingRanges.length === 0) { + // process.send({ + // event: 'completed', + // id: process.env['worker_id'] + // }); + // } else { + // // process next range in queue + // const next = this.internalPendingRanges.shift(); + // this.newRange(next); + // } + // } + }, 200); } } From d1eca214aee0dfcb65cfc9d0dfaf9c31462f1863 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Mon, 25 Sep 2023 00:54:41 -0300 Subject: [PATCH 10/15] log cleanup --- workers/deserializer.ts | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/workers/deserializer.ts b/workers/deserializer.ts index 0e69ebda..812de5d3 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -624,12 +624,7 @@ export default class MainDSWorker extends HyperionWorker { const pool_queue = `${this.chain}:ds_pool:${selected_q}`; if (this.ch_ready) { - const enqueueResult = this.ch.sendToQueue(pool_queue, bufferFromJson(trace, true), {headers}); - if (!enqueueResult) { - hLog("Backpressure"); - console.log("Header size: " + JSON.stringify(headers).length); - console.log(headers); - } + this.ch.sendToQueue(pool_queue, bufferFromJson(trace, true), {headers}).catch(console.log); return true; } else { return false; @@ -1165,7 +1160,7 @@ export default class MainDSWorker extends HyperionWorker { let jsonRow = await this.processContractRowNative(payload, block_num); if (jsonRow?.value && !jsonRow['_blacklisted']) { - console.log(jsonRow); + debugLog(jsonRow); debugLog('Delta DS failed ->>', jsonRow); jsonRow = await this.processContractRowNative(payload, block_num - 1); debugLog('Retry with previous ABI ->>', jsonRow); @@ -1500,7 +1495,7 @@ export default class MainDSWorker extends HyperionWorker { } catch (e) { hLog(`Delta struct [${key}] processing error: ${e.message}`); hLog(e); - console.log(data[1]); + hLog(data[1]); } } } From 5d3b248bcd338986b7c0cae6a7423e79e339d3a7 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Mon, 25 Sep 2023 01:03:04 -0300 Subject: [PATCH 11/15] Revert "include amqp-connection-manager" This reverts commit 4dcb607fcc4ea655623a0848f6080b70d9ebd673. --- workers/state-reader.ts | 53 ++++++++++------------------------------- 1 file changed, 13 insertions(+), 40 deletions(-) diff --git a/workers/state-reader.ts b/workers/state-reader.ts index adb25086..38ea93d7 100644 --- a/workers/state-reader.ts +++ b/workers/state-reader.ts @@ -297,15 +297,23 @@ export default class StateReader extends HyperionWorker { this.completionSignaled = true; debugLog(`Reader completion signal - ${this.range_size} - ${this.local_distributed_count}`); this.local_distributed_count = 0; - this.completionMonitoring = setInterval(() => { - - let pending = 0; - - if (this.cch.queueLength() === 0) { + const unconfirmed = this.cch['unconfirmed']; + if (unconfirmed.length > 0) { + unconfirmed.forEach((elem) => { + if (elem) { + pending++; + } + }); + if (!(pending === this.lastPendingCount && pending > 0)) { + this.lastPendingCount = pending; + } + } + if (pending === 0) { debugLog(`Reader completed - ${this.range_size} - ${this.local_distributed_count}`); clearInterval(this.completionMonitoring); + // check if there are any pending ranges, then signal completion to master if (this.internalPendingRanges.length === 0) { process.send({ @@ -317,42 +325,7 @@ export default class StateReader extends HyperionWorker { const next = this.internalPendingRanges.shift(); this.newRange(next); } - } else { - pending = this.cch.queueLength(); - if (!(pending === this.lastPendingCount && pending > 0)) { - this.lastPendingCount = pending; - } } - - // const unconfirmed = this.cch['unconfirmed']; - // if (unconfirmed.length > 0) { - // unconfirmed.forEach((elem) => { - // if (elem) { - // pending++; - // } - // }); - // if (!(pending === this.lastPendingCount && pending > 0)) { - // this.lastPendingCount = pending; - // } - // } - // - // if (pending === 0) { - // debugLog(`Reader completed - ${this.range_size} - ${this.local_distributed_count}`); - // clearInterval(this.completionMonitoring); - // - // // check if there are any pending ranges, then signal completion to master - // if (this.internalPendingRanges.length === 0) { - // process.send({ - // event: 'completed', - // id: process.env['worker_id'] - // }); - // } else { - // // process next range in queue - // const next = this.internalPendingRanges.shift(); - // this.newRange(next); - // } - // } - }, 200); } } From e804aff53c566f6ad60f9d5222ce92c22f23c11f Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Mon, 25 Sep 2023 01:03:35 -0300 Subject: [PATCH 12/15] log cleanup --- connections/amqp.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/connections/amqp.ts b/connections/amqp.ts index e9ba1d9f..e14e2bf1 100644 --- a/connections/amqp.ts +++ b/connections/amqp.ts @@ -26,7 +26,6 @@ export function getAmpqUrl(config): string { const u = encodeURIComponent(config.user); const p = encodeURIComponent(config.pass); const v = encodeURIComponent(config.vhost); - console.log(`max frame: ${frameMaxValue}`); return `amqp://${u}:${p}@${config.host}/${v}?frameMax=${frameMaxValue}`; } From e5d0126d0875267158c49afb99caa9f3786d3aa9 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Mon, 25 Sep 2023 01:05:33 -0300 Subject: [PATCH 13/15] Revert "include amqp-connection-manager" This reverts commit 0579da66 --- connections/amqp.ts | 144 +++++----- connections/manager.class.ts | 3 +- ecosystem.config.js | 7 + package-lock.json | 21 -- package.json | 1 - tests/dsp-consumer.js | 13 +- tests/dsp-consumer.js.map | 2 +- tests/dsp-consumer.ts | 18 +- workers/delta-updater.ts | 8 +- workers/deserializer.ts | 14 +- workers/ds-pool.ts | 14 +- workers/hyperionWorker.ts | 539 +++++++++++++++++------------------ workers/indexer.ts | 8 +- 13 files changed, 381 insertions(+), 411 deletions(-) diff --git a/connections/amqp.ts b/connections/amqp.ts index e14e2bf1..582b46ba 100644 --- a/connections/amqp.ts +++ b/connections/amqp.ts @@ -1,21 +1,19 @@ import {debugLog, hLog} from "../helpers/common_functions"; import got, {HTTPError} from "got"; -import amqp, {ChannelWrapper} from "amqp-connection-manager"; -import {IAmqpConnectionManager} from "amqp-connection-manager/dist/types/AmqpConnectionManager"; +import {connect, Connection} from 'amqplib'; -export async function createConnection(config): Promise { - try { - const amqp_url = getAmpqUrl(config); - // const conn: Connection = await connect(amqp_url); - const conn: IAmqpConnectionManager = amqp.connect(amqp_url); - debugLog("[AMQP] connection established"); - return conn; - } catch (e) { - hLog("[AMQP] failed to connect!"); - hLog(e.message); - await new Promise(resolve => setTimeout(resolve, 5000)); - return await createConnection(config); - } +export async function createConnection(config): Promise { + try { + const amqp_url = getAmpqUrl(config); + const conn: Connection = await connect(amqp_url); + debugLog("[AMQP] connection established"); + return conn; + } catch (e) { + hLog("[AMQP] failed to connect!"); + hLog(e.message); + await new Promise(resolve => setTimeout(resolve, 5000)); + return await createConnection(config); + } } export function getAmpqUrl(config): string { @@ -29,70 +27,64 @@ export function getAmpqUrl(config): string { return `amqp://${u}:${p}@${config.host}/${v}?frameMax=${frameMaxValue}`; } -async function createChannels(connection: IAmqpConnectionManager) { - try { - // const channel = await connection.createChannel(); - const channel = connection.createChannel({ - confirm: false - }); - // const confirmChannel = await connection.createConfirmChannel(); - const confirmChannel = connection.createChannel({ - confirm: true - }) - return [channel, confirmChannel]; - } catch (e) { - hLog("[AMQP] failed to create channels"); - hLog(e); - return null; - } +async function createChannels(connection) { + try { + const channel = await connection.createChannel(); + const confirmChannel = await connection.createConfirmChannel(); + return [channel, confirmChannel]; + } catch (e) { + hLog("[AMQP] failed to create channels"); + hLog(e); + return null; + } } -export async function amqpConnect(onReconnect, config, onClose): Promise { - let connection = await createConnection(config); - if (connection) { - const channels = await createChannels(connection); - if (channels) { - connection.on('error', (err) => { - hLog(err.message); - }); - connection.on('close', () => { - hLog('Connection closed!'); - onClose(); - setTimeout(async () => { - hLog('Retrying in 5 seconds...'); - const _channels = await amqpConnect(onReconnect, config, onClose); - onReconnect(_channels); - return _channels; - }, 5000); - }); - return channels; - } else { - return null; - } - } else { - return null; - } +export async function amqpConnect(onReconnect, config, onClose) { + let connection = await createConnection(config); + if (connection) { + const channels = await createChannels(connection); + if (channels) { + connection.on('error', (err) => { + hLog(err.message); + }); + connection.on('close', () => { + hLog('Connection closed!'); + onClose(); + setTimeout(async () => { + hLog('Retrying in 5 seconds...'); + const _channels = await amqpConnect(onReconnect, config, onClose); + onReconnect(_channels); + return _channels; + }, 5000); + }); + return channels; + } else { + return null; + } + } else { + return null; + } } export async function checkQueueSize(q_name, config) { - try { - const v = encodeURIComponent(config.vhost); - const apiUrl = `${config.protocol}://${config.api}/api/queues/${v}/${encodeURIComponent(q_name)}`; - const opts = { - username: config.user, - password: config.pass - }; - const data = await got.get(apiUrl, opts).json() as any; - return data.messages; - } catch (e) { - hLog(`[WARNING] Checking queue size failed! - ${e.message}`); - if (e.response && e.response.body) { - if (e instanceof HTTPError) { - hLog(e.response.body); - } else { - hLog(JSON.stringify(e.response.body, null, 2)); - } - } - return 0; - } + try { + const v = encodeURIComponent(config.vhost); + const apiUrl = `${config.protocol}://${config.api}/api/queues/${v}/${encodeURIComponent(q_name)}`; + const opts = { + username: config.user, + password: config.pass + }; + const data = await got.get(apiUrl, opts).json() as any; + return data.messages; + } catch (e) { + hLog(`[WARNING] Checking queue size failed! - ${e.message}`); + if (e.response && e.response.body) { + if (e instanceof HTTPError) { + hLog(e.response.body); + } else { + hLog(JSON.stringify(e.response.body, null, 2)); + } + } + return 0; + } } diff --git a/connections/manager.class.ts b/connections/manager.class.ts index c892617a..a0c3e79f 100644 --- a/connections/manager.class.ts +++ b/connections/manager.class.ts @@ -9,7 +9,6 @@ import {StateHistorySocket} from "./state-history"; import fetch from 'cross-fetch'; import {exec} from "child_process"; import {hLog} from "../helpers/common_functions"; -import {ChannelWrapper} from "amqp-connection-manager"; export class ConnectionManager { @@ -132,7 +131,7 @@ export class ConnectionManager { } } - async createAMQPChannels(onReconnect, onClose): Promise { + async createAMQPChannels(onReconnect, onClose) { return await amqpConnect(onReconnect, this.conn.amqp, onClose); } diff --git a/ecosystem.config.js b/ecosystem.config.js index 40f050c6..5478a257 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -20,4 +20,11 @@ readdirSync(chainsRoot) } }); +apps.push({ + name: 'hyperion-governor', + namespace: 'hyperion', + script: 'governor/server/index.js', + watch: false, +}); + module.exports = {apps}; diff --git a/package-lock.json b/package-lock.json index e27dc396..8fd54b0b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,7 +19,6 @@ "@fastify/redis": "^5.0.0", "@fastify/swagger": "6.1.0", "@pm2/io": "^5.0.0", - "amqp-connection-manager": "^4.1.14", "amqplib": "^0.10.3", "async": "^3.2.4", "base-x": "^4.0.0", @@ -444,21 +443,6 @@ "url": "https://github.com/sponsors/epoberezkin" } }, - "node_modules/amqp-connection-manager": { - "version": "4.1.14", - "resolved": "https://registry.npmjs.org/amqp-connection-manager/-/amqp-connection-manager-4.1.14.tgz", - "integrity": "sha512-1km47dIvEr0HhMUazqovSvNwIlSvDX2APdUpULaINtHpiki1O+cLRaTeXb/jav4OLtH+k6GBXx5gsKOT9kcGKQ==", - "dependencies": { - "promise-breaker": "^6.0.0" - }, - "engines": { - "node": ">=10.0.0", - "npm": ">5.0.0" - }, - "peerDependencies": { - "amqplib": "*" - } - }, "node_modules/amqplib": { "version": "0.10.3", "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.3.tgz", @@ -2263,11 +2247,6 @@ "resolved": "https://registry.npmjs.org/process-warning/-/process-warning-1.0.0.tgz", "integrity": "sha512-du4wfLyj4yCZq1VupnVSZmRsPJsNuxoDQFdCFHLaYiEbFBD7QE0a+I4D7hOxrVnh78QE/YipFAj9lXHiXocV+Q==" }, - "node_modules/promise-breaker": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/promise-breaker/-/promise-breaker-6.0.0.tgz", - "integrity": "sha512-BthzO9yTPswGf7etOBiHCVuugs2N01/Q/94dIPls48z2zCmrnDptUUZzfIb+41xq0MnYZ/BzmOd6ikDR4ibNZA==" - }, "node_modules/proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", diff --git a/package.json b/package.json index 46d666c4..8fd8cc6d 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,6 @@ "@fastify/redis": "^5.0.0", "@fastify/swagger": "6.1.0", "@pm2/io": "^5.0.0", - "amqp-connection-manager": "^4.1.14", "amqplib": "^0.10.3", "async": "^3.2.4", "base-x": "^4.0.0", diff --git a/tests/dsp-consumer.js b/tests/dsp-consumer.js index f9530df3..72e87b24 100644 --- a/tests/dsp-consumer.js +++ b/tests/dsp-consumer.js @@ -16,21 +16,20 @@ class DspEventConsumer { console.log('Starting DSP Consumer...'); [this.ch] = await this.manager.createAMQPChannels((channels) => { [this.ch] = channels; - this.onConnect().catch(console.log); + this.onConnect(); }, () => { this.ch_ready = false; }); - this.onConnect().catch(console.log); + this.onConnect(); } - async onConnect() { + onConnect() { if (this.conf.settings.dsp_parser) { const q = `${this.manager.chain}:dsp`; console.log(q); - await this.ch.assertQueue(q, { durable: true }); - await this.ch.consume(q, (data) => { + this.ch.prefetch(100); + this.ch.assertQueue(q, { durable: true }); + this.ch.consume(q, (data) => { this.onMessage(data); - }, { - prefetch: 100 }); } } diff --git a/tests/dsp-consumer.js.map b/tests/dsp-consumer.js.map index f5cd15ea..a45d7356 100644 --- a/tests/dsp-consumer.js.map +++ b/tests/dsp-consumer.js.map @@ -1 +1 @@ -{"version":3,"file":"dsp-consumer.js","sourceRoot":"","sources":["dsp-consumer.ts"],"names":[],"mappings":";;AAAA,8CAAsD;AACtD,gEAA+D;AAK/D,MAAM,gBAAgB;IAUlB;QALQ,aAAQ,GAAY,KAAK,CAAC;QAC1B,cAAS,GAAG,CAAC,CAAC;QACd,WAAM,GAAG,CAAC,CAAC;QACX,iBAAY,GAAuB,IAAI,GAAG,EAAE,CAAC;QAGjD,MAAM,EAAE,GAAG,IAAI,4BAAmB,EAAE,CAAC;QACrC,IAAI,CAAC,IAAI,GAAG,EAAE,CAAC,MAAM,CAAC;QACtB,IAAI,CAAC,OAAO,GAAG,IAAI,iCAAiB,CAAC,EAAE,CAAC,CAAC;IAC7C,CAAC;IAED,KAAK,CAAC,GAAG;QACL,OAAO,CAAC,GAAG,CAAC,0BAA0B,CAAC,CAAC;QACxC,CAAC,IAAI,CAAC,EAAE,CAAC,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,kBAAkB,CAAC,CAAC,QAAQ,EAAE,EAAE;YAC3D,CAAC,IAAI,CAAC,EAAE,CAAC,GAAG,QAAQ,CAAC;YACrB,IAAI,CAAC,SAAS,EAAE,CAAC,KAAK,CAAC,OAAO,CAAC,GAAG,CAAC,CAAC;QACxC,CAAC,EAAE,GAAG,EAAE;YACJ,IAAI,CAAC,QAAQ,GAAG,KAAK,CAAC;QAC1B,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,SAAS,EAAE,CAAC,KAAK,CAAC,OAAO,CAAC,GAAG,CAAC,CAAC;IACxC,CAAC;IAEO,KAAK,CAAC,SAAS;QACnB,IAAI,IAAI,CAAC,IAAI,CAAC,QAAQ,CAAC,UAAU,EAAE;YAC/B,MAAM,CAAC,GAAG,GAAG,IAAI,CAAC,OAAO,CAAC,KAAK,MAAM,CAAC;YACtC,OAAO,CAAC,GAAG,CAAC,CAAC,CAAC,CAAC;YACf,MAAM,IAAI,CAAC,EAAE,CAAC,WAAW,CAAC,CAAC,EAAE,EAAC,OAAO,EAAE,IAAI,EAAC,CAAC,CAAC;YAC9C,MAAM,IAAI,CAAC,EAAE,CAAC,OAAO,CAAC,CAAC,EAAE,CAAC,IAAI,EAAE,EAAE;gBAC9B,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,CAAC;YACzB,CAAC,EAAE;gBACC,QAAQ,EAAE,GAAG;aAChB,CAAC,CAAC;SACN;IACL,CAAC;IAED,aAAa,CAAC,SAAiB;QAC3B,UAAU,CAAC,GAAG,EAAE;YACZ,IAAI,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,SAAS,CAAC,EAAE;gBAClC,MAAM,aAAa,GAAG,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,EAAE,EAAE,CAAC,CAAC,CAAC,eAAe,GAAG,CAAC,CAAC,eAAe,CAAC,CAAC;gBAC7G,KAAK,MAAM,MAAM,IAAI,aAAa,EAAE;oBAChC,OAAO,CAAC,GAAG,CAAC,MAAM,CAAC,YAAY,CAAC,EAAE,MAAM,CAAC,SAAS,EAAE,MAAM,CAAC,eAAe,CAAC,CAAC;iBAC/E;gBACD,IAAI,CAAC,YAAY,CAAC,MAAM,CAAC,SAAS,CAAC,CAAC;aACvC;QACL,CAAC,EAAE,GAAG,CAAC,CAAC;IACZ,CAAC;IAED,SAAS,CAAC,GAAY;QAClB,IAAI;YACA,MAAM,OAAO,GAAQ,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,OAAO,CAAC,QAAQ,EAAE,CAAC,CAAC;YACxD,OAAO,CAAC,GAAG,CAAC,OAAO,CAAC,CAAC;YACrB,IAAI,OAAO,CAAC,SAAS,GAAG,IAAI,CAAC,SAAS,EAAE;gBACpC,IAAI,CAAC,SAAS,GAAG,OAAO,CAAC,SAAS,CAAC;gBACnC,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,OAAO,CAAC,SAAS,EAAE,CAAC,OAAO,CAAC,CAAC,CAAC;aACvD;iBAAM,IAAI,OAAO,CAAC,SAAS,KAAK,IAAI,CAAC,SAAS,EAAE;gBAC7C,IAAI,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAI,CAAC,SAAS,CAAC,EAAE;oBACvC,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;iBACvD;aACJ;iBAAM;gBACH,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,OAAO,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;aAC1D;YACD,IAAI,CAAC,aAAa,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YACnC,IAAI,CAAC,EAAE,CAAC,GAAG,CAAC,GAAG,CAAC,CAAC;SACpB;QAAC,OAAO,CAAC,EAAE;YACR,IAAI,CAAC,EAAE,CAAC,IAAI,CAAC,GAAG,CAAC,CAAC;SACrB;IACL,CAAC;CAEJ;AAED,IAAI,gBAAgB,EAAE,CAAC,GAAG,EAAE,CAAC,KAAK,CAAC,OAAO,CAAC,KAAK,CAAC,CAAC"} \ No newline at end of file +{"version":3,"file":"dsp-consumer.js","sourceRoot":"","sources":["dsp-consumer.ts"],"names":[],"mappings":";;AAAA,8CAAsD;AACtD,gEAA+D;AAI/D,MAAM,gBAAgB;IAUlB;QALQ,aAAQ,GAAY,KAAK,CAAC;QAC1B,cAAS,GAAG,CAAC,CAAC;QACd,WAAM,GAAG,CAAC,CAAC;QACX,iBAAY,GAAuB,IAAI,GAAG,EAAE,CAAC;QAGjD,MAAM,EAAE,GAAG,IAAI,4BAAmB,EAAE,CAAC;QACrC,IAAI,CAAC,IAAI,GAAG,EAAE,CAAC,MAAM,CAAC;QACtB,IAAI,CAAC,OAAO,GAAG,IAAI,iCAAiB,CAAC,EAAE,CAAC,CAAC;IAC7C,CAAC;IAED,KAAK,CAAC,GAAG;QACL,OAAO,CAAC,GAAG,CAAC,0BAA0B,CAAC,CAAC;QACxC,CAAC,IAAI,CAAC,EAAE,CAAC,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,kBAAkB,CAAC,CAAC,QAAQ,EAAE,EAAE;YAC3D,CAAC,IAAI,CAAC,EAAE,CAAC,GAAG,QAAQ,CAAC;YACrB,IAAI,CAAC,SAAS,EAAE,CAAC;QACrB,CAAC,EAAE,GAAG,EAAE;YACJ,IAAI,CAAC,QAAQ,GAAG,KAAK,CAAC;QAC1B,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,SAAS,EAAE,CAAC;IACrB,CAAC;IAEO,SAAS;QACb,IAAI,IAAI,CAAC,IAAI,CAAC,QAAQ,CAAC,UAAU,EAAE;YAC/B,MAAM,CAAC,GAAG,GAAG,IAAI,CAAC,OAAO,CAAC,KAAK,MAAM,CAAC;YACtC,OAAO,CAAC,GAAG,CAAC,CAAC,CAAC,CAAC;YACf,IAAI,CAAC,EAAE,CAAC,QAAQ,CAAC,GAAG,CAAC,CAAC;YACtB,IAAI,CAAC,EAAE,CAAC,WAAW,CAAC,CAAC,EAAE,EAAC,OAAO,EAAE,IAAI,EAAC,CAAC,CAAC;YACxC,IAAI,CAAC,EAAE,CAAC,OAAO,CAAC,CAAC,EAAE,CAAC,IAAI,EAAE,EAAE;gBACxB,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,CAAC;YACzB,CAAC,CAAC,CAAC;SACN;IACL,CAAC;IAED,aAAa,CAAC,SAAiB;QAC3B,UAAU,CAAC,GAAG,EAAE;YACZ,IAAI,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,SAAS,CAAC,EAAE;gBAClC,MAAM,aAAa,GAAG,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,CAAC,CAAC,EAAE,CAAC,EAAE,EAAE,CAAC,CAAC,CAAC,eAAe,GAAG,CAAC,CAAC,eAAe,CAAC,CAAC;gBAC7G,KAAK,MAAM,MAAM,IAAI,aAAa,EAAE;oBAChC,OAAO,CAAC,GAAG,CAAC,MAAM,CAAC,YAAY,CAAC,EAAE,MAAM,CAAC,SAAS,EAAE,MAAM,CAAC,eAAe,CAAC,CAAC;iBAC/E;gBACD,IAAI,CAAC,YAAY,CAAC,MAAM,CAAC,SAAS,CAAC,CAAC;aACvC;QACL,CAAC,EAAE,GAAG,CAAC,CAAC;IACZ,CAAC;IAED,SAAS,CAAC,GAAY;QAClB,IAAI;YACA,MAAM,OAAO,GAAQ,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,OAAO,CAAC,QAAQ,EAAE,CAAC,CAAC;YACxD,OAAO,CAAC,GAAG,CAAC,OAAO,CAAC,CAAC;YACrB,IAAI,OAAO,CAAC,SAAS,GAAG,IAAI,CAAC,SAAS,EAAE;gBACpC,IAAI,CAAC,SAAS,GAAG,OAAO,CAAC,SAAS,CAAC;gBACnC,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,OAAO,CAAC,SAAS,EAAE,CAAC,OAAO,CAAC,CAAC,CAAC;aACvD;iBAAM,IAAI,OAAO,CAAC,SAAS,KAAK,IAAI,CAAC,SAAS,EAAE;gBAC7C,IAAI,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAI,CAAC,SAAS,CAAC,EAAE;oBACvC,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;iBACvD;aACJ;iBAAM;gBACH,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,OAAO,CAAC,SAAS,CAAC,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;aAC1D;YACD,IAAI,CAAC,aAAa,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YACnC,IAAI,CAAC,EAAE,CAAC,GAAG,CAAC,GAAG,CAAC,CAAC;SACpB;QAAC,OAAO,CAAC,EAAE;YACR,IAAI,CAAC,EAAE,CAAC,IAAI,CAAC,GAAG,CAAC,CAAC;SACrB;IACL,CAAC;CAEJ;AAED,IAAI,gBAAgB,EAAE,CAAC,GAAG,EAAE,CAAC,KAAK,CAAC,OAAO,CAAC,KAAK,CAAC,CAAC"} \ No newline at end of file diff --git a/tests/dsp-consumer.ts b/tests/dsp-consumer.ts index 237aa121..eec8de6a 100644 --- a/tests/dsp-consumer.ts +++ b/tests/dsp-consumer.ts @@ -1,14 +1,13 @@ import {ConfigurationModule} from "../modules/config"; import {ConnectionManager} from "../connections/manager.class"; import {HyperionConfig} from "../interfaces/hyperionConfig"; -import {Message} from "amqplib/callback_api"; -import {ChannelWrapper} from "amqp-connection-manager"; +import {Channel, Message} from "amqplib/callback_api"; class DspEventConsumer { private conf: HyperionConfig; private manager: ConnectionManager; - private ch: ChannelWrapper; + private ch: Channel; private ch_ready: boolean = false; private lastBlock = 0; private lastGS = 0; @@ -24,22 +23,21 @@ class DspEventConsumer { console.log('Starting DSP Consumer...'); [this.ch] = await this.manager.createAMQPChannels((channels) => { [this.ch] = channels; - this.onConnect().catch(console.log); + this.onConnect(); }, () => { this.ch_ready = false; }); - this.onConnect().catch(console.log); + this.onConnect(); } - private async onConnect(): Promise { + private onConnect() { if (this.conf.settings.dsp_parser) { const q = `${this.manager.chain}:dsp`; console.log(q); - await this.ch.assertQueue(q, {durable: true}); - await this.ch.consume(q, (data) => { + this.ch.prefetch(100); + this.ch.assertQueue(q, {durable: true}); + this.ch.consume(q, (data) => { this.onMessage(data); - }, { - prefetch: 100 }); } } diff --git a/workers/delta-updater.ts b/workers/delta-updater.ts index bc8b0fee..d8d8a018 100644 --- a/workers/delta-updater.ts +++ b/workers/delta-updater.ts @@ -29,11 +29,9 @@ export default class MainDSWorker extends HyperionWorker { if (this.ch) { this.queueName = this.chain + ":delta_rm"; hLog(`Launched delta updater, consuming from ${this.queueName}`); - this.ch.assertQueue(this.queueName, {durable: true}).then(() => { - this.ch.consume(this.queueName, this.onConsume.bind(this),{ - prefetch: 1 - }).catch(console.log); - }); + this.ch.assertQueue(this.queueName, {durable: true}); + this.ch.prefetch(1); + this.ch.consume(this.queueName, this.onConsume.bind(this)); } } diff --git a/workers/deserializer.ts b/workers/deserializer.ts index 812de5d3..bc6fbff9 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -286,11 +286,10 @@ export default class MainDSWorker extends HyperionWorker { private initConsumer() { if (this.ch_ready) { + this.ch.prefetch(this.conf.prefetch.block); this.ch.consume(process.env['worker_queue'], (data) => { this.consumerQueue.push(data).catch(console.log); - }, { - prefetch: this.conf.prefetch.block - }).catch(console.log); + }); } } @@ -624,7 +623,12 @@ export default class MainDSWorker extends HyperionWorker { const pool_queue = `${this.chain}:ds_pool:${selected_q}`; if (this.ch_ready) { - this.ch.sendToQueue(pool_queue, bufferFromJson(trace, true), {headers}).catch(console.log); + const enqueueResult = this.ch.sendToQueue(pool_queue, bufferFromJson(trace, true), {headers}); + if (!enqueueResult) { + hLog("Backpressure"); + console.log("Header size: " + JSON.stringify(headers).length); + console.log(headers); + } return true; } else { return false; @@ -1160,7 +1164,7 @@ export default class MainDSWorker extends HyperionWorker { let jsonRow = await this.processContractRowNative(payload, block_num); if (jsonRow?.value && !jsonRow['_blacklisted']) { - debugLog(jsonRow); + console.log(jsonRow); debugLog('Delta DS failed ->>', jsonRow); jsonRow = await this.processContractRowNative(payload, block_num - 1); debugLog('Retry with previous ABI ->>', jsonRow); diff --git a/workers/ds-pool.ts b/workers/ds-pool.ts index ab3e10ef..f25dbd19 100644 --- a/workers/ds-pool.ts +++ b/workers/ds-pool.ts @@ -625,11 +625,12 @@ export default class DSPoolWorker extends HyperionWorker { initConsumer() { if (this.ch_ready) { + this.ch.prefetch(this.conf.prefetch.block); this.ch.consume(this.local_queue, (data) => { this.consumerQueue.push(data); - }, { - prefetch: this.conf.prefetch.block - }).catch(console.log); + }, {}, (err, ok) => { + hLog(err, ok); + }); debugLog(`started consuming from ${this.local_queue}`); } } @@ -693,14 +694,13 @@ export default class DSPoolWorker extends HyperionWorker { this.ch_ready = true; this.ch.assertQueue(this.local_queue, { durable: true - }).then(() => { - this.initConsumer(); - }).catch(console.log); + }); + this.initConsumer(); } if (this.conf.settings.dsp_parser) { this.ch.assertQueue(`${queue_prefix}:dsp`, { durable: true - }).catch(console.log); + }); } } diff --git a/workers/hyperionWorker.ts b/workers/hyperionWorker.ts index 046464a6..3f8129f4 100644 --- a/workers/hyperionWorker.ts +++ b/workers/hyperionWorker.ts @@ -11,280 +11,277 @@ import {HeapInfo} from "v8"; import {debugLog, hLog} from "../helpers/common_functions"; import {StateHistorySocket} from "../connections/state-history"; import * as AbiEOS from "@eosrio/node-abieos"; -import {ChannelWrapper} from "amqp-connection-manager"; export abstract class HyperionWorker { - conf: HyperionConfig; - manager: ConnectionManager; - mLoader: HyperionModuleLoader; - chain: string; - chainId: string; - - // AMQP Channels - ch: ChannelWrapper; - cch: ChannelWrapper; - - rpc: JsonRpc; - client: Client; - ship: StateHistorySocket; - - txEnc = new TextEncoder(); - txDec = new TextDecoder(); - cch_ready = false; - ch_ready = false; - - events: EventEmitter; - - filters: Filters; - - failedAbiMap: Map> = new Map(); - - protected constructor() { - this.checkDebugger(); - const cm = new ConfigurationModule(); - this.conf = cm.config; - this.filters = cm.filters; - this.manager = new ConnectionManager(cm); - this.mLoader = new HyperionModuleLoader(cm); - this.chain = this.conf.settings.chain; - this.chainId = this.manager.conn.chains[this.chain].chain_id; - this.rpc = this.manager.nodeosJsonRPC; - this.client = this.manager.elasticsearchClient; - this.ship = this.manager.shipClient; - this.events = new EventEmitter(); - - this.mLoader.init().then(() => { - // Connect to RabbitMQ (amqplib) - this.events.emit('loader_ready'); - this.connectAMQP().then(() => { - this.onConnect(); - }).catch(console.log); - }); - - const bytesToString = (bytes: number) => { - const e = Math.log(bytes) / Math.log(1024) | 0; - const n = (bytes / Math.pow(1024, e)).toFixed(2); - return n + ' ' + (e == 0 ? 'bytes' : (['KB', 'MB', 'GB', 'TB'])[e - 1]); - }; - - - // handle ipc messages - process.on('message', (msg: any) => { - switch (msg.event) { - case 'request_v8_heap_stats': { - const report: HeapInfo = v8.getHeapStatistics(); - const used_pct = report.used_heap_size / report.heap_size_limit; - process.send({ - event: 'v8_heap_report', - id: process.env.worker_role + ':' + process.env.worker_id, - data: { - heap_usage: (used_pct * 100).toFixed(2) + "%", - ...report - } - }); - break; - } - case 'request_memory_usage': { - const report = process.memoryUsage(); - process.send({ - event: 'memory_report', - id: process.env.worker_role + ':' + process.env.worker_id, - data: { - resident: bytesToString(report.rss), - } - }); - break; - } - default: { - this.onIpcMessage(msg); - } - } - }); - } - - async connectAMQP() { - [this.ch, this.cch] = await this.manager.createAMQPChannels((channels: ChannelWrapper[]) => { - if (channels) { - [this.ch, this.cch] = channels; - hLog('AMQP Reconnecting...'); - this.onConnect(); - } - }, () => { - this.ch_ready = false; - this.cch_ready = false; - }); - } - - onConnect() { - this.ch_ready = true; - this.cch_ready = true; - this.assertQueues(); - this.ch.on('close', () => { - this.ch_ready = false; - }); - this.cch.on('close', () => { - this.cch_ready = false; - }); - this.events.emit('ready'); - } - - checkDebugger() { - if (/--inspect/.test(process.execArgv.join(' '))) { - const inspector = require('inspector'); - hLog('DEBUGGER ATTACHED', inspector.url()); - } - } - - private anyFromCode(act: any) { - return this.chain + '::' + act.account + '::*' - } - - private anyFromName(act: any) { - return this.chain + '::*::' + act.name; - } - - private codeActionPair(act: any) { - return this.chain + '::' + act.account + '::' + act.name; - } - - private anyFromDeltaCode(delta: any) { - return this.chain + '::' + delta.code + '::*' - } - - private anyFromDeltaTable(delta: any) { - return this.chain + '::*::' + delta.table; - } - - private codeDeltaPair(delta: any) { - return this.chain + '::' + delta.code + '::' + delta.table; - } - - protected checkBlacklist(act) { - - // test for chain::code::* - if (this.filters.action_blacklist.has(this.anyFromCode(act))) { - return true; - } - - // test for chain::*::name - if (this.filters.action_blacklist.has(this.anyFromName(act))) { - return true; - } - - // test for chain::code::name - return this.filters.action_blacklist.has(this.codeActionPair(act)); - } - - protected checkWhitelist(act) { - - // test for chain::code::* - if (this.filters.action_whitelist.has(this.anyFromCode(act))) { - return true; - } - - // test for chain::*::name - if (this.filters.action_whitelist.has(this.anyFromName(act))) { - return true; - } - - // test for chain::code::name - return this.filters.action_whitelist.has(this.codeActionPair(act)); - } - - protected checkDeltaBlacklist(delta) { - - // test blacklist for chain::code::* - if (this.filters.delta_blacklist.has(this.anyFromDeltaCode(delta))) { - return true; - } - - // test blacklist for chain::*::table - if (this.filters.delta_blacklist.has(this.anyFromDeltaTable(delta))) { - return true; - } - - // test blacklist for chain::code::table - return this.filters.delta_blacklist.has(this.codeDeltaPair(delta)); - } - - protected checkDeltaWhitelist(delta) { - - // test whitelist for chain::code::* - if (this.filters.delta_whitelist.has(this.anyFromDeltaCode(delta))) { - return true; - } - - // test whitelist for chain::*::table - if (this.filters.delta_whitelist.has(this.anyFromDeltaTable(delta))) { - return true; - } - - // test whitelist for chain::code::table - return this.filters.delta_whitelist.has(this.codeDeltaPair(delta)); - } - - loadAbiHex(contract, block_num, abi_hex) { - // check local blacklist for corrupted abis that failed to load before - let _status; - if (this.failedAbiMap.has(contract) && this.failedAbiMap.get(contract).has(block_num)) { - _status = false; - debugLog('ignore saved abi for', contract, block_num); - } else { - _status = AbiEOS.load_abi_hex(contract, abi_hex); - if (!_status) { - hLog(`AbiEOS.load_abi_hex error for ${contract} at ${block_num}`); - if (this.failedAbiMap.has(contract)) { - this.failedAbiMap.get(contract).add(block_num); - } else { - this.failedAbiMap.set(contract, new Set([block_num])); - } - } else { - this.removeFromFailed(contract); - } - } - return _status; - } - - removeFromFailed(contract) { - if (this.failedAbiMap.has(contract)) { - this.failedAbiMap.delete(contract); - hLog(`${contract} was removed from the failed map!`); - } - } - - async loadCurrentAbiHex(contract) { - let _status; - if (this.failedAbiMap.has(contract) && this.failedAbiMap.get(contract).has(-1)) { - _status = false; - debugLog('ignore current abi for', contract); - } else { - const currentAbi = await this.rpc.getRawAbi(contract); - if (currentAbi.abi.byteLength > 0) { - const abi_hex = Buffer.from(currentAbi.abi).toString('hex'); - _status = AbiEOS.load_abi_hex(contract, abi_hex); - if (!_status) { - hLog(`AbiEOS.load_abi_hex error for ${contract} at head`); - if (this.failedAbiMap.has(contract)) { - this.failedAbiMap.get(contract).add(-1); - } else { - this.failedAbiMap.set(contract, new Set([-1])); - } - } else { - this.removeFromFailed(contract); - } - } else { - _status = false; - } - } - return _status; - } - - abstract run(): Promise - - abstract assertQueues(): void - - abstract onIpcMessage(msg: any): void + conf: HyperionConfig; + manager: ConnectionManager; + mLoader: HyperionModuleLoader; + chain: string; + chainId: string; + + // AMQP Channels + ch: Channel; + cch: ConfirmChannel; + + rpc: JsonRpc; + client: Client; + ship: StateHistorySocket; + + txEnc = new TextEncoder(); + txDec = new TextDecoder(); + cch_ready = false; + ch_ready = false; + + events: EventEmitter; + + filters: Filters; + + failedAbiMap: Map> = new Map(); + + protected constructor() { + this.checkDebugger(); + const cm = new ConfigurationModule(); + this.conf = cm.config; + this.filters = cm.filters; + this.manager = new ConnectionManager(cm); + this.mLoader = new HyperionModuleLoader(cm); + this.chain = this.conf.settings.chain; + this.chainId = this.manager.conn.chains[this.chain].chain_id; + this.rpc = this.manager.nodeosJsonRPC; + this.client = this.manager.elasticsearchClient; + this.ship = this.manager.shipClient; + this.events = new EventEmitter(); + + this.mLoader.init().then(() => { + // Connect to RabbitMQ (amqplib) + this.events.emit('loader_ready'); + this.connectAMQP().then(() => { + this.onConnect(); + }).catch(console.log); + }); + + const bytesToString = (bytes: number) => { + const e = Math.log(bytes) / Math.log(1024) | 0; + const n = (bytes / Math.pow(1024, e)).toFixed(2); + return n + ' ' + (e == 0 ? 'bytes' : (['KB', 'MB', 'GB', 'TB'])[e - 1]); + }; + + + // handle ipc messages + process.on('message', (msg: any) => { + switch (msg.event) { + case 'request_v8_heap_stats': { + const report: HeapInfo = v8.getHeapStatistics(); + const used_pct = report.used_heap_size / report.heap_size_limit; + process.send({ + event: 'v8_heap_report', + id: process.env.worker_role + ':' + process.env.worker_id, + data: { + heap_usage: (used_pct * 100).toFixed(2) + "%", + ...report + } + }); + break; + } + case 'request_memory_usage': { + const report = process.memoryUsage(); + process.send({ + event: 'memory_report', + id: process.env.worker_role + ':' + process.env.worker_id, + data: { + resident: bytesToString(report.rss), + } + }); + break; + } + default: { + this.onIpcMessage(msg); + } + } + }); + } + + async connectAMQP() { + [this.ch, this.cch] = await this.manager.createAMQPChannels((channels) => { + [this.ch, this.cch] = channels; + hLog('AMQP Reconnecting...'); + this.onConnect(); + }, () => { + this.ch_ready = false; + this.cch_ready = false; + }); + } + + onConnect() { + this.ch_ready = true; + this.cch_ready = true; + this.assertQueues(); + this.ch.on('close', () => { + this.ch_ready = false; + }); + this.cch.on('close', () => { + this.cch_ready = false; + }); + this.events.emit('ready'); + } + + checkDebugger() { + if (/--inspect/.test(process.execArgv.join(' '))) { + const inspector = require('inspector'); + hLog('DEBUGGER ATTACHED', inspector.url()); + } + } + + private anyFromCode(act: any) { + return this.chain + '::' + act.account + '::*' + } + + private anyFromName(act: any) { + return this.chain + '::*::' + act.name; + } + + private codeActionPair(act: any) { + return this.chain + '::' + act.account + '::' + act.name; + } + + private anyFromDeltaCode(delta: any) { + return this.chain + '::' + delta.code + '::*' + } + + private anyFromDeltaTable(delta: any) { + return this.chain + '::*::' + delta.table; + } + + private codeDeltaPair(delta: any) { + return this.chain + '::' + delta.code + '::' + delta.table; + } + + protected checkBlacklist(act) { + + // test for chain::code::* + if (this.filters.action_blacklist.has(this.anyFromCode(act))) { + return true; + } + + // test for chain::*::name + if (this.filters.action_blacklist.has(this.anyFromName(act))) { + return true; + } + + // test for chain::code::name + return this.filters.action_blacklist.has(this.codeActionPair(act)); + } + + protected checkWhitelist(act) { + + // test for chain::code::* + if (this.filters.action_whitelist.has(this.anyFromCode(act))) { + return true; + } + + // test for chain::*::name + if (this.filters.action_whitelist.has(this.anyFromName(act))) { + return true; + } + + // test for chain::code::name + return this.filters.action_whitelist.has(this.codeActionPair(act)); + } + + protected checkDeltaBlacklist(delta) { + + // test blacklist for chain::code::* + if (this.filters.delta_blacklist.has(this.anyFromDeltaCode(delta))) { + return true; + } + + // test blacklist for chain::*::table + if (this.filters.delta_blacklist.has(this.anyFromDeltaTable(delta))) { + return true; + } + + // test blacklist for chain::code::table + return this.filters.delta_blacklist.has(this.codeDeltaPair(delta)); + } + + protected checkDeltaWhitelist(delta) { + + // test whitelist for chain::code::* + if (this.filters.delta_whitelist.has(this.anyFromDeltaCode(delta))) { + return true; + } + + // test whitelist for chain::*::table + if (this.filters.delta_whitelist.has(this.anyFromDeltaTable(delta))) { + return true; + } + + // test whitelist for chain::code::table + return this.filters.delta_whitelist.has(this.codeDeltaPair(delta)); + } + + loadAbiHex(contract, block_num, abi_hex) { + // check local blacklist for corrupted abis that failed to load before + let _status; + if (this.failedAbiMap.has(contract) && this.failedAbiMap.get(contract).has(block_num)) { + _status = false; + debugLog('ignore saved abi for', contract, block_num); + } else { + _status = AbiEOS.load_abi_hex(contract, abi_hex); + if (!_status) { + hLog(`AbiEOS.load_abi_hex error for ${contract} at ${block_num}`); + if (this.failedAbiMap.has(contract)) { + this.failedAbiMap.get(contract).add(block_num); + } else { + this.failedAbiMap.set(contract, new Set([block_num])); + } + } else { + this.removeFromFailed(contract); + } + } + return _status; + } + + removeFromFailed(contract) { + if (this.failedAbiMap.has(contract)) { + this.failedAbiMap.delete(contract); + hLog(`${contract} was removed from the failed map!`); + } + } + + async loadCurrentAbiHex(contract) { + let _status; + if (this.failedAbiMap.has(contract) && this.failedAbiMap.get(contract).has(-1)) { + _status = false; + debugLog('ignore current abi for', contract); + } else { + const currentAbi = await this.rpc.getRawAbi(contract); + if (currentAbi.abi.byteLength > 0) { + const abi_hex = Buffer.from(currentAbi.abi).toString('hex'); + _status = AbiEOS.load_abi_hex(contract, abi_hex); + if (!_status) { + hLog(`AbiEOS.load_abi_hex error for ${contract} at head`); + if (this.failedAbiMap.has(contract)) { + this.failedAbiMap.get(contract).add(-1); + } else { + this.failedAbiMap.set(contract, new Set([-1])); + } + } else { + this.removeFromFailed(contract); + } + } else { + _status = false; + } + } + return _status; + } + + abstract run(): Promise + + abstract assertQueues(): void + + abstract onIpcMessage(msg: any): void } diff --git a/workers/indexer.ts b/workers/indexer.ts index acf71a43..9ab55615 100644 --- a/workers/indexer.ts +++ b/workers/indexer.ts @@ -48,11 +48,9 @@ export default class IndexerWorker extends HyperionWorker { this.indexQueue.pause(); this.ch_ready = false; }); - this.ch.assertQueue(process.env.queue, {durable: true}).then(r => { - this.ch.consume(process.env.queue, this.indexQueue.push, { - prefetch: this.conf.prefetch.index - }).catch(console.log); - }); + this.ch.assertQueue(process.env.queue, {durable: true}); + this.ch.prefetch(this.conf.prefetch.index); + this.ch.consume(process.env.queue, this.indexQueue.push); } } catch (e) { console.error('rabbitmq error!'); From 69bcbaad186065da772089ab30358d424fb3f178 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Mon, 25 Sep 2023 01:25:40 -0300 Subject: [PATCH 14/15] flow control --- package.json | 2 +- workers/deserializer.ts | 43 ++++++++++++++++++++++++++++++++++------- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/package.json b/package.json index 8fd8cc6d..a288ff2b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hyperion-history", - "version": "3.3.9-7", + "version": "3.3.9-8", "description": "Scalable Full History API Solution for EOSIO based blockchains", "main": "launcher.js", "scripts": { diff --git a/workers/deserializer.ts b/workers/deserializer.ts index bc6fbff9..aaf66559 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -8,6 +8,7 @@ import {Type} from "../addons/eosjs-native/eosjs-serialize"; import {debugLog, hLog} from "../helpers/common_functions"; import {createHash} from "crypto"; import flatstr from 'flatstr'; +import {Options} from "amqplib"; const FJS = require('fast-json-stringify'); @@ -124,6 +125,9 @@ export default class MainDSWorker extends HyperionWorker { allowedDynamicContracts: Set = new Set(); + backpressureQueue: any[] = []; + waitToSend = false; + constructor() { super(); @@ -290,6 +294,16 @@ export default class MainDSWorker extends HyperionWorker { this.ch.consume(process.env['worker_queue'], (data) => { this.consumerQueue.push(data).catch(console.log); }); + this.ch.on('drain', args => { + this.waitToSend = false; + while (this.backpressureQueue.length > 0) { + const msg = this.backpressureQueue.shift(); + const status = this.controlledSendToQueue(msg.queue, msg.payload, msg.options); + if (!status) { + break; + } + } + }); } } @@ -622,19 +636,34 @@ export default class MainDSWorker extends HyperionWorker { } const pool_queue = `${this.chain}:ds_pool:${selected_q}`; - if (this.ch_ready) { - const enqueueResult = this.ch.sendToQueue(pool_queue, bufferFromJson(trace, true), {headers}); - if (!enqueueResult) { - hLog("Backpressure"); - console.log("Header size: " + JSON.stringify(headers).length); - console.log(headers); + const payload = bufferFromJson(trace, true); + + if (!this.waitToSend) { + if (this.ch_ready) { + this.controlledSendToQueue(pool_queue, payload, {headers}); + return true; + } else { + return false; } - return true; } else { + this.backpressureQueue.push({ + queue: pool_queue, + payload: payload, + options: {headers} + }); return false; } } + controlledSendToQueue(pool_queue: string, payload: Buffer, options: Options.Publish): boolean { + const enqueueResult = this.ch.sendToQueue(pool_queue, payload, options); + if (!enqueueResult) { + this.waitToSend = true; + hLog("Backpressure"); + } + return enqueueResult; + } + createSerialBuffer(inputArray) { return new Serialize.SerialBuffer({textEncoder: this.txEnc, textDecoder: this.txDec, array: inputArray}); } From 76807c6c863dc4d046061467a3395b80c16dc3a1 Mon Sep 17 00:00:00 2001 From: Igor Lins e Silva Date: Mon, 25 Sep 2023 01:29:53 -0300 Subject: [PATCH 15/15] cleanup --- modules/parsers/base-parser.ts | 1 - workers/deserializer.ts | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/modules/parsers/base-parser.ts b/modules/parsers/base-parser.ts index 0aeabf9d..2dd6d1ad 100644 --- a/modules/parsers/base-parser.ts +++ b/modules/parsers/base-parser.ts @@ -192,7 +192,6 @@ export abstract class BaseParser { if (ds_act) { if (ds_act.account && ds_act.name && ds_act.authorization) { - console.log(ds_act); action.act.data = ds_act.data; } diff --git a/workers/deserializer.ts b/workers/deserializer.ts index aaf66559..d4f45d65 100644 --- a/workers/deserializer.ts +++ b/workers/deserializer.ts @@ -659,7 +659,6 @@ export default class MainDSWorker extends HyperionWorker { const enqueueResult = this.ch.sendToQueue(pool_queue, payload, options); if (!enqueueResult) { this.waitToSend = true; - hLog("Backpressure"); } return enqueueResult; } @@ -1193,7 +1192,7 @@ export default class MainDSWorker extends HyperionWorker { let jsonRow = await this.processContractRowNative(payload, block_num); if (jsonRow?.value && !jsonRow['_blacklisted']) { - console.log(jsonRow); + debugLog(jsonRow); debugLog('Delta DS failed ->>', jsonRow); jsonRow = await this.processContractRowNative(payload, block_num - 1); debugLog('Retry with previous ABI ->>', jsonRow);