Skip to content

Commit

Permalink
v3.3.8
Browse files Browse the repository at this point in the history
  • Loading branch information
igorls committed Dec 9, 2022
1 parent da295aa commit 2ee60bc
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 41 deletions.
49 changes: 38 additions & 11 deletions modules/master.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,16 @@ export class HyperionMaster {
this.total_range = this.head - msg.block_num;
}
},
'kill_worker': (msg: any) => {
for (let workersKey in cluster.workers) {
const w = cluster.workers[workersKey];
if (w.id === parseInt(msg.id)) {
const idx = this.workerMap.findIndex(value => value.worker_id === w.id);
this.workerMap.splice(idx, 1);
w.kill();
}
}
},
'completed': (msg: any) => {
if (msg.id === this.doctorId.toString()) {
hLog('repair worker completed', msg);
Expand Down Expand Up @@ -645,7 +655,7 @@ export class HyperionMaster {
index: new_index
});
} catch (e) {
console.log(e);
hLog(e);
process.exit(1);
}

Expand All @@ -657,7 +667,7 @@ export class HyperionMaster {
name: `${queue_prefix}-${index.type}`
});
} catch (e) {
console.log(e);
hLog(e);
process.exit(1);
}

Expand Down Expand Up @@ -1500,7 +1510,11 @@ export class HyperionMaster {
hLog(log_msg.join(' | '));
}

if (this.indexedObjects === 0 && this.deserializedActions === 0 && this.consumedBlocks === 0 && !this.mode_transition) {
if(this.liveConsumedBlocks > 0 && this.consumedBlocks === 0 && this.conf.indexer.abi_scan_mode) {
hLog('Warning: Live reading on ABI SCAN mode')
}

if (this.liveConsumedBlocks + this.indexedObjects + this.deserializedActions + this.consumedBlocks === 0 && !this.mode_transition) {

// Report completed range (parallel reading)
if (this.total_blocks === this.total_range && !this.range_completed) {
Expand Down Expand Up @@ -1549,6 +1563,13 @@ export class HyperionMaster {
}
} else {
if (!this.shutdownStarted) {
const readers = this.workerMap.filter(value => {
return value.worker_role === 'reader' || value.worker_role === 'continuous_reader';
});
if (readers.length === 0) {
hLog(`No more active workers, stopping now...`);
process.exit();
}
const idleMsg = 'No blocks are being processed, please check your state-history node!';
if (this.idle_count === 2) {
this.emitAlert('warning', idleMsg);
Expand Down Expand Up @@ -1715,7 +1736,16 @@ export class HyperionMaster {
this.ioRedisClient = new IORedis(this.manager.conn.redis);

// Remove first indexed block from cache (v2/health)
await this.ioRedisClient.del(`${this.manager.chain}::fib`)
await this.ioRedisClient.del(`${this.manager.chain}::fib`);

// check nodeos
try {
const info = await this.rpc.get_info();
hLog(`Nodeos version: ${info.server_version_string}`);
} catch (e) {
hLog(`Chain API Error: ${e.message}`);
process.exit();
}

// Elasticsearch
this.client = this.manager.elasticsearchClient;
Expand Down Expand Up @@ -1822,9 +1852,9 @@ export class HyperionMaster {
// handle worker disconnection events
cluster.on('disconnect', (worker) => {
if (!this.mode_transition && !this.shutdownStarted) {
hLog(`The worker #${worker.id} has disconnected, attempting to re-launch in 5 seconds...`);
const workerReference = this.workerMap.find(value => value.worker_id === worker.id);
if (workerReference) {
hLog(`The worker #${worker.id} has disconnected, attempting to re-launch in 5 seconds...`);
workerReference.wref = null;
workerReference.failures++;
hLog(`New worker defined: ${workerReference.worker_role} for ${workerReference.worker_queue}`);
Expand All @@ -1840,7 +1870,7 @@ export class HyperionMaster {
}, 1000);
}, 5000);
} else {
console.log(`Worker #${worker.id} not found in map!`);
hLog(`The worker #${worker.id} has disconnected`);
}
}
});
Expand Down Expand Up @@ -2216,9 +2246,7 @@ export class HyperionMaster {
await this.setupIndexers();
await this.setupStreaming();
await this.setupDSPool();
this.addWorker({
worker_role: "delta_updater"
});
this.addWorker({worker_role: "delta_updater"});
}

private async findRange() {
Expand All @@ -2244,8 +2272,7 @@ export class HyperionMaster {
try {
this.chain_data = await this.rpc.get_info();
} catch (e) {
console.log(e.message);
console.error('failed to connect to chain api');
hLog('Failed to connect to chain api: ' + e.message);
process.exit(1);
}
this.head = this.chain_data.head_block_num;
Expand Down
32 changes: 18 additions & 14 deletions workers/ds-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -602,20 +602,24 @@ export default class DSPoolWorker extends HyperionWorker {

pushToActionStreamingQueue(payload, uniqueAction) {
if (this.allowStreaming && this.conf.features['streaming'].traces) {
const notifArray = new Set();
uniqueAction.act.authorization.forEach(auth => {
notifArray.add(auth.actor);
});
uniqueAction.notified.forEach(acc => {
notifArray.add(acc);
});
const headers = {
event: 'trace',
account: uniqueAction['act']['account'],
name: uniqueAction['act']['name'],
notified: [...notifArray].join(",")
};
this.ch.publish('', this.chain + ':stream', payload, {headers});
try {
const notifArray = new Set();
uniqueAction.act.authorization.forEach(auth => {
notifArray.add(auth.actor);
});
uniqueAction.receipts.forEach(rec => {
notifArray.add(rec.receiver);
});
const headers = {
event: 'trace',
account: uniqueAction['act']['account'],
name: uniqueAction['act']['name'],
notified: [...notifArray].join(",")
};
this.ch.publish('', this.chain + ':stream', payload, {headers});
} catch (e) {
hLog(e);
}
}
}

Expand Down
46 changes: 30 additions & 16 deletions workers/state-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,46 +317,55 @@ export default class StateReader extends HyperionWorker {

// NORMAL OPERATION MODE
if (!this.recovery) {

const result = deserialize('result', data, this.txEnc, this.txDec, this.types);

// ship status message
if (result[0] === 'get_status_result_v0') {
this.shipInitStatus = result[1];
hLog(`\n| SHIP Status Report\n| Init block: ${this.shipInitStatus['chain_state_begin_block']}\n| Head block: ${this.shipInitStatus['chain_state_end_block']}`);
const chain_state_begin_block = this.shipInitStatus['chain_state_begin_block'];
if (!this.conf.indexer.disable_reading) {

switch (process.env['worker_role']) {

// range reader setup
case 'reader': {
if (chain_state_begin_block > process.env.first_block) {

// skip snapshot block until a solution to parse it is found
const nextBlock = chain_state_begin_block + 2;
hLog(`First saved block is ahead of requested range! - Req: ${process.env.first_block} | First: ${chain_state_begin_block}`);
hLog('Requesting a single block');
if (this.conf.settings.ignore_snapshot) {
this.local_block_num = chain_state_begin_block;
process.send({event: 'update_init_block', block_num: chain_state_begin_block + 2,});
this.newRange({
first_block: chain_state_begin_block + 1,
last_block: chain_state_begin_block + this.conf.scaling.batch_size
});
} else {
process.send({event: 'update_init_block', block_num: chain_state_begin_block + 1});
this.newRange({
first_block: chain_state_begin_block,
last_block: chain_state_begin_block + 1
});
}
this.local_block_num = nextBlock - 1;
process.send({
event: 'update_init_block',
block_num: nextBlock
});
this.newRange({
first_block: nextBlock,
last_block: nextBlock + this.conf.scaling.batch_size
});
} else {
this.requestBlocks(0);
}
break;
}

// live reader setup
case 'continuous_reader': {
this.requestBlocks(parseInt(process.env['worker_last_processed_block'], 10));
break;
}

}
} else {
this.ship.close();
process.exit(1);
}

} else {

// ship block message
const res = result[1];

if (res['this_block']) {
Expand Down Expand Up @@ -440,7 +449,12 @@ export default class StateReader extends HyperionWorker {
}
}
} else {
return 0;
hLog(`Reader is idle! - Head at: ${result[1].head.block_num}`);
this.ship.close();
process.send({
event: 'kill_worker',
id: process.env['worker_id']
});
}
}

Expand Down

0 comments on commit 2ee60bc

Please sign in to comment.