diff --git a/modules/master.ts b/modules/master.ts index a3b8a769..f134159b 100644 --- a/modules/master.ts +++ b/modules/master.ts @@ -287,6 +287,16 @@ export class HyperionMaster { this.total_range = this.head - msg.block_num; } }, + 'kill_worker': (msg: any) => { + for (let workersKey in cluster.workers) { + const w = cluster.workers[workersKey]; + if (w.id === parseInt(msg.id)) { + const idx = this.workerMap.findIndex(value => value.worker_id === w.id); + this.workerMap.splice(idx, 1); + w.kill(); + } + } + }, 'completed': (msg: any) => { if (msg.id === this.doctorId.toString()) { hLog('repair worker completed', msg); @@ -645,7 +655,7 @@ export class HyperionMaster { index: new_index }); } catch (e) { - console.log(e); + hLog(e); process.exit(1); } @@ -657,7 +667,7 @@ export class HyperionMaster { name: `${queue_prefix}-${index.type}` }); } catch (e) { - console.log(e); + hLog(e); process.exit(1); } @@ -1500,7 +1510,11 @@ export class HyperionMaster { hLog(log_msg.join(' | ')); } - if (this.indexedObjects === 0 && this.deserializedActions === 0 && this.consumedBlocks === 0 && !this.mode_transition) { + if(this.liveConsumedBlocks > 0 && this.consumedBlocks === 0 && this.conf.indexer.abi_scan_mode) { + hLog('Warning: Live reading on ABI SCAN mode') + } + + if (this.liveConsumedBlocks + this.indexedObjects + this.deserializedActions + this.consumedBlocks === 0 && !this.mode_transition) { // Report completed range (parallel reading) if (this.total_blocks === this.total_range && !this.range_completed) { @@ -1549,6 +1563,13 @@ export class HyperionMaster { } } else { if (!this.shutdownStarted) { + const readers = this.workerMap.filter(value => { + return value.worker_role === 'reader' || value.worker_role === 'continuous_reader'; + }); + if (readers.length === 0) { + hLog(`No more active workers, stopping now...`); + process.exit(); + } const idleMsg = 'No blocks are being processed, please check your state-history node!'; if (this.idle_count === 2) { this.emitAlert('warning', idleMsg); @@ -1715,7 +1736,16 @@ export class HyperionMaster { this.ioRedisClient = new IORedis(this.manager.conn.redis); // Remove first indexed block from cache (v2/health) - await this.ioRedisClient.del(`${this.manager.chain}::fib`) + await this.ioRedisClient.del(`${this.manager.chain}::fib`); + + // check nodeos + try { + const info = await this.rpc.get_info(); + hLog(`Nodeos version: ${info.server_version_string}`); + } catch (e) { + hLog(`Chain API Error: ${e.message}`); + process.exit(); + } // Elasticsearch this.client = this.manager.elasticsearchClient; @@ -1822,9 +1852,9 @@ export class HyperionMaster { // handle worker disconnection events cluster.on('disconnect', (worker) => { if (!this.mode_transition && !this.shutdownStarted) { - hLog(`The worker #${worker.id} has disconnected, attempting to re-launch in 5 seconds...`); const workerReference = this.workerMap.find(value => value.worker_id === worker.id); if (workerReference) { + hLog(`The worker #${worker.id} has disconnected, attempting to re-launch in 5 seconds...`); workerReference.wref = null; workerReference.failures++; hLog(`New worker defined: ${workerReference.worker_role} for ${workerReference.worker_queue}`); @@ -1840,7 +1870,7 @@ export class HyperionMaster { }, 1000); }, 5000); } else { - console.log(`Worker #${worker.id} not found in map!`); + hLog(`The worker #${worker.id} has disconnected`); } } }); @@ -2216,9 +2246,7 @@ export class HyperionMaster { await this.setupIndexers(); await this.setupStreaming(); await this.setupDSPool(); - this.addWorker({ - worker_role: "delta_updater" - }); + this.addWorker({worker_role: "delta_updater"}); } private async findRange() { @@ -2244,8 +2272,7 @@ export class HyperionMaster { try { this.chain_data = await this.rpc.get_info(); } catch (e) { - console.log(e.message); - console.error('failed to connect to chain api'); + hLog('Failed to connect to chain api: ' + e.message); process.exit(1); } this.head = this.chain_data.head_block_num; diff --git a/workers/ds-pool.ts b/workers/ds-pool.ts index 70af15ee..a51f2ceb 100644 --- a/workers/ds-pool.ts +++ b/workers/ds-pool.ts @@ -602,20 +602,24 @@ export default class DSPoolWorker extends HyperionWorker { pushToActionStreamingQueue(payload, uniqueAction) { if (this.allowStreaming && this.conf.features['streaming'].traces) { - const notifArray = new Set(); - uniqueAction.act.authorization.forEach(auth => { - notifArray.add(auth.actor); - }); - uniqueAction.notified.forEach(acc => { - notifArray.add(acc); - }); - const headers = { - event: 'trace', - account: uniqueAction['act']['account'], - name: uniqueAction['act']['name'], - notified: [...notifArray].join(",") - }; - this.ch.publish('', this.chain + ':stream', payload, {headers}); + try { + const notifArray = new Set(); + uniqueAction.act.authorization.forEach(auth => { + notifArray.add(auth.actor); + }); + uniqueAction.receipts.forEach(rec => { + notifArray.add(rec.receiver); + }); + const headers = { + event: 'trace', + account: uniqueAction['act']['account'], + name: uniqueAction['act']['name'], + notified: [...notifArray].join(",") + }; + this.ch.publish('', this.chain + ':stream', payload, {headers}); + } catch (e) { + hLog(e); + } } } diff --git a/workers/state-reader.ts b/workers/state-reader.ts index baf98fce..c4b8debf 100644 --- a/workers/state-reader.ts +++ b/workers/state-reader.ts @@ -317,46 +317,55 @@ export default class StateReader extends HyperionWorker { // NORMAL OPERATION MODE if (!this.recovery) { + const result = deserialize('result', data, this.txEnc, this.txDec, this.types); + + // ship status message if (result[0] === 'get_status_result_v0') { this.shipInitStatus = result[1]; hLog(`\n| SHIP Status Report\n| Init block: ${this.shipInitStatus['chain_state_begin_block']}\n| Head block: ${this.shipInitStatus['chain_state_end_block']}`); const chain_state_begin_block = this.shipInitStatus['chain_state_begin_block']; if (!this.conf.indexer.disable_reading) { + switch (process.env['worker_role']) { + + // range reader setup case 'reader': { if (chain_state_begin_block > process.env.first_block) { + + // skip snapshot block until a solution to parse it is found + const nextBlock = chain_state_begin_block + 2; hLog(`First saved block is ahead of requested range! - Req: ${process.env.first_block} | First: ${chain_state_begin_block}`); - hLog('Requesting a single block'); - if (this.conf.settings.ignore_snapshot) { - this.local_block_num = chain_state_begin_block; - process.send({event: 'update_init_block', block_num: chain_state_begin_block + 2,}); - this.newRange({ - first_block: chain_state_begin_block + 1, - last_block: chain_state_begin_block + this.conf.scaling.batch_size - }); - } else { - process.send({event: 'update_init_block', block_num: chain_state_begin_block + 1}); - this.newRange({ - first_block: chain_state_begin_block, - last_block: chain_state_begin_block + 1 - }); - } + this.local_block_num = nextBlock - 1; + process.send({ + event: 'update_init_block', + block_num: nextBlock + }); + this.newRange({ + first_block: nextBlock, + last_block: nextBlock + this.conf.scaling.batch_size + }); } else { this.requestBlocks(0); } break; } + + // live reader setup case 'continuous_reader': { this.requestBlocks(parseInt(process.env['worker_last_processed_block'], 10)); break; } + } } else { this.ship.close(); process.exit(1); } + } else { + + // ship block message const res = result[1]; if (res['this_block']) { @@ -440,7 +449,12 @@ export default class StateReader extends HyperionWorker { } } } else { - return 0; + hLog(`Reader is idle! - Head at: ${result[1].head.block_num}`); + this.ship.close(); + process.send({ + event: 'kill_worker', + id: process.env['worker_id'] + }); } }