Skip to content

Commit

Permalink
v3.3.8 stable
Browse files Browse the repository at this point in the history
- fix readers stopping before having finished
  • Loading branch information
igorls committed Jan 18, 2023
1 parent 2ee60bc commit 2827f85
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 114 deletions.
14 changes: 11 additions & 3 deletions connections/state-history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ export class StateHistorySocket {
private ws;
private readonly shipUrl;
private readonly max_payload_mb;
retryOnDisconnect = true;
connected = false;

constructor(ship_url, max_payload_mb) {
this.shipUrl = ship_url;
Expand All @@ -17,29 +19,35 @@ export class StateHistorySocket {
}

connect(onMessage, onDisconnect, onError, onConnected) {

debugLog(`Connecting to ${this.shipUrl}...`);
this.ws = new WebSocket(this.shipUrl, {
perMessageDeflate: false,
maxPayload: this.max_payload_mb * 1024 * 1024,
});
this.ws.on('open', () => {
this.connected = true;
hLog('Websocket connected!');
if (onConnected) {
onConnected();
}
});
this.ws.on('message', (data) => onMessage(data));
this.ws.on('close', () => {
this.connected = false;
hLog('Websocket disconnected!');
onDisconnect();
if(this.retryOnDisconnect) {
onDisconnect();
}
});
this.ws.on('error', (err) => {
hLog(`${this.shipUrl} :: ${err.message}`);
});
}

close() {
close(graceful: boolean) {
if(graceful) {
this.retryOnDisconnect = false;
}
this.ws.close();
}

Expand Down
42 changes: 37 additions & 5 deletions modules/master.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ export class HyperionMaster {
tx: msg.trx_ids
});

this.lastProducedBlockNum = msg.block_num;

if (this.conf.settings.bp_monitoring && !this.conf.indexer.abi_scan_mode) {
this.liveBlockQueue.push(msg).catch(reason => {
hLog('Error handling consumed_block:', reason);
Expand Down Expand Up @@ -326,17 +328,38 @@ export class HyperionMaster {
if (end > this.head) {
end = this.head;
}
this.lastAssignedBlock += this.maxBatchSize;
const def = {
first_block: start,
last_block: end
};
this.lastAssignedBlock = def.last_block;
this.activeReadersCount++;
messageAllWorkers(cluster, {
event: 'new_range',
target: msg.id,
data: def
});
} else {
if (this.lastAssignedBlock >= this.head) {
hLog(`Parallel readers finished the requested range`);
const readers = this.workerMap.filter(value => value.worker_role === 'reader');
this.workerMap = this.workerMap.filter(value => value.worker_role !== 'reader');
readers.forEach(value => value.wref.kill());
// for (let hyperionWorkerDef of this.workerMap) {
// if (hyperionWorkerDef.worker_role === 'reader') {
// hyperionWorkerDef.wref.kill();
// }
// }
// for (let workersKey in cluster.workers) {
// const w = cluster.workers[workersKey];
// console.log(w);
// if (w.id === parseInt(msg.id)) {
// const idx = this.workerMap.findIndex(value => value.worker_id === w.id);
// this.workerMap.splice(idx, 1);
// w.kill();
// }
// }
}
}
}
},
Expand Down Expand Up @@ -1461,6 +1484,12 @@ export class HyperionMaster {
avg_consume_rate = consume_rate;
}
const log_msg = [];

// print current head for live reading
if (this.lastProducedBlockNum > 0) {
log_msg.push(`#${this.lastProducedBlockNum}`);
}

log_msg.push(`W:${_workers}`);

const _r = (this.pushedBlocks + this.livePushedBlocks) / tScale;
Expand All @@ -1484,9 +1513,12 @@ export class HyperionMaster {
this.metrics.indexingRate?.set(_ir);

if (this.total_blocks < this.total_range && !this.conf.indexer.live_only_mode) {
const remaining = this.total_range - this.total_blocks;
const estimated_time = Math.round(remaining / avg_consume_rate);
const time_string = moment().add(estimated_time, 'seconds').fromNow(false);
let time_string = 'waiting for indexer';
if (avg_consume_rate > 0) {
const remaining = this.total_range - this.total_blocks;
const estimated_time = Math.round(remaining / avg_consume_rate);
time_string = moment().add(estimated_time, 'seconds').fromNow(false);
}
const pct_parsed = ((this.total_blocks / this.total_range) * 100).toFixed(1);
const pct_read = ((this.total_read / this.total_range) * 100).toFixed(1);
log_msg.push(`${this.total_blocks}/${this.total_read}/${this.total_range}`);
Expand All @@ -1510,7 +1542,7 @@ export class HyperionMaster {
hLog(log_msg.join(' | '));
}

if(this.liveConsumedBlocks > 0 && this.consumedBlocks === 0 && this.conf.indexer.abi_scan_mode) {
if (this.liveConsumedBlocks > 0 && this.consumedBlocks === 0 && this.conf.indexer.abi_scan_mode) {
hLog('Warning: Live reading on ABI SCAN mode')
}

Expand Down
Loading

0 comments on commit 2827f85

Please sign in to comment.