From c6af92bc5cd9b25b5287063ef035ce1a2c692d28 Mon Sep 17 00:00:00 2001 From: dskvr Date: Sat, 16 Mar 2024 02:17:11 +0100 Subject: [PATCH] fix race conditions and concurrency issues --- apps/nocapd/src/classes/Worker.js | 45 +++++++++++++++---------------- apps/nocapd/src/daemon.js | 3 +-- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/apps/nocapd/src/classes/Worker.js b/apps/nocapd/src/classes/Worker.js index 19d4aab4..2615c04d 100644 --- a/apps/nocapd/src/classes/Worker.js +++ b/apps/nocapd/src/classes/Worker.js @@ -51,8 +51,8 @@ export class NWWorker { setupNocapOpts(){ this.nocapOpts = { timeout: this.timeout, - checked_by: this.pubkey, - rejectOnConnectFailure: true + checked_by: this.pubkey + // rejectOnConnectFailure: true } } @@ -98,41 +98,38 @@ export class NWWorker { async on_completed(job, rvalue){ this.log.debug(`on_completed(): ${job.id}: ${JSON.stringify(rvalue)}`) const { result } = rvalue - const offline = result?.open?.data !== true - if(!result || offline){ - this.on_failed(job, new Error(`Nocap.check('${this.checks}'): failed for ${job.data.relay}`)) - return - } + let fail = result?.open?.data? false: true + if(fail) + await this.on_fail(result) + else + await this.on_success(result) + await this.after_completed( result, fail ) + } + + async on_success(result){ + this.log.debug(`on_success(): ${result.url}`) this.progressMessage(result.url, result) if(this.config?.publisher?.kinds?.includes(30066) ){ const publish30066 = new Publish.Kind30066() await publish30066.one( result ) } - if(this.config?.publisher?.kinds?.includes(30166) ){ const publish30166 = new Publish.Kind30166() - await publish30166.one( result ) + await publish30166.one( result ) } - await this.after_completed( result ) } - async after_completed(result){ - this.log.debug(`after_completed(): ${result.url}`) - this.processed++ - await this.updateRelayCache( { ...result} ) - await this.retry.setRetries( result.url, true ) - await this.setLastChecked( result.url, Date.now() ) + async on_fail(result){ + this.log.debug(`on_fail(): ${result.url}`) + this.progressMessage(result.url, null, true) } - async on_failed(job, err){ - this.log.debug(`on_failed(): ${job.id}`) - const { relay:url } = job.data - const retry_id = await this.retry.setRetries( url, false ) - const lastChecked_id = await this.setLastChecked( url, Date.now() ) - const relay_id = await this.updateRelayCache({ url, open: { data: false }} ) - this.log?.debug(`Websocket check failed for ${job.data.relay}: ${JSON.stringify(err)}, retry_id: ${retry_id}, lastChecked_id: ${lastChecked_id}, relay_id: ${relay_id}`) - this.progressMessage(url, null, true) + async after_completed(result, error=false){ + this.log.debug(`after_completed(): ${result.url}`) this.processed++ + await this.updateRelayCache( { ...result } ) + await this.retry.setRetries( result.url, !error ) + await this.setLastChecked( result.url, Date.now() ) } async on_drained(){ diff --git a/apps/nocapd/src/daemon.js b/apps/nocapd/src/daemon.js index ecab21de..87f056fb 100644 --- a/apps/nocapd/src/daemon.js +++ b/apps/nocapd/src/daemon.js @@ -36,7 +36,7 @@ const maybeAnnounce = async () => { conf.frequency = timestring(conf.frequency, 's').toString() const announce = new AnnounceMonitor(conf) announce.generate() - announce.sign( process.env.DAEMON_PRIVKEY ) + announce.sign( process.env.DAEMON_PRIVKEY ) await announce.publish( conf.relays ) } @@ -71,7 +71,6 @@ const syncRelaysIn = async () => { const initWorker = async () => { const connection = RedisConnectionDetails() - console.log(connection) const concurrency = config?.nocapd?.bullmq?.worker?.concurrency? config.nocapd.bullmq.worker.concurrency: 1 const $q = new NocapdQueues({ pubkey: PUBKEY, logger: new Logger('NocapdQueues') }) const ncdq = NocapdQueue(`nocapd/${config?.monitor?.slug}` || null)