Skip to content

Commit

Permalink
fix race conditions and concurrency issues
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Mar 16, 2024
1 parent 0d9ee15 commit c6af92b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 26 deletions.
45 changes: 21 additions & 24 deletions apps/nocapd/src/classes/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ export class NWWorker {
setupNocapOpts(){
this.nocapOpts = {
timeout: this.timeout,
checked_by: this.pubkey,
rejectOnConnectFailure: true
checked_by: this.pubkey
// rejectOnConnectFailure: true
}
}

Expand Down Expand Up @@ -98,41 +98,38 @@ export class NWWorker {
async on_completed(job, rvalue){
this.log.debug(`on_completed(): ${job.id}: ${JSON.stringify(rvalue)}`)
const { result } = rvalue
const offline = result?.open?.data !== true
if(!result || offline){
this.on_failed(job, new Error(`Nocap.check('${this.checks}'): failed for ${job.data.relay}`))
return
}
let fail = result?.open?.data? false: true
if(fail)
await this.on_fail(result)
else
await this.on_success(result)
await this.after_completed( result, fail )
}

async on_success(result){
this.log.debug(`on_success(): ${result.url}`)
this.progressMessage(result.url, result)
if(this.config?.publisher?.kinds?.includes(30066) ){
const publish30066 = new Publish.Kind30066()
await publish30066.one( result )
}

if(this.config?.publisher?.kinds?.includes(30166) ){
const publish30166 = new Publish.Kind30166()
await publish30166.one( result )
await publish30166.one( result )
}
await this.after_completed( result )
}

async after_completed(result){
this.log.debug(`after_completed(): ${result.url}`)
this.processed++
await this.updateRelayCache( { ...result} )
await this.retry.setRetries( result.url, true )
await this.setLastChecked( result.url, Date.now() )
async on_fail(result){
this.log.debug(`on_fail(): ${result.url}`)
this.progressMessage(result.url, null, true)
}

async on_failed(job, err){
this.log.debug(`on_failed(): ${job.id}`)
const { relay:url } = job.data
const retry_id = await this.retry.setRetries( url, false )
const lastChecked_id = await this.setLastChecked( url, Date.now() )
const relay_id = await this.updateRelayCache({ url, open: { data: false }} )
this.log?.debug(`Websocket check failed for ${job.data.relay}: ${JSON.stringify(err)}, retry_id: ${retry_id}, lastChecked_id: ${lastChecked_id}, relay_id: ${relay_id}`)
this.progressMessage(url, null, true)
async after_completed(result, error=false){
this.log.debug(`after_completed(): ${result.url}`)
this.processed++
await this.updateRelayCache( { ...result } )
await this.retry.setRetries( result.url, !error )
await this.setLastChecked( result.url, Date.now() )
}

async on_drained(){
Expand Down
3 changes: 1 addition & 2 deletions apps/nocapd/src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const maybeAnnounce = async () => {
conf.frequency = timestring(conf.frequency, 's').toString()
const announce = new AnnounceMonitor(conf)
announce.generate()
announce.sign( process.env.DAEMON_PRIVKEY )
announce.sign( process.env.DAEMON_PRIVKEY )
await announce.publish( conf.relays )
}

Expand Down Expand Up @@ -71,7 +71,6 @@ const syncRelaysIn = async () => {

const initWorker = async () => {
const connection = RedisConnectionDetails()
console.log(connection)
const concurrency = config?.nocapd?.bullmq?.worker?.concurrency? config.nocapd.bullmq.worker.concurrency: 1
const $q = new NocapdQueues({ pubkey: PUBKEY, logger: new Logger('NocapdQueues') })
const ncdq = NocapdQueue(`nocapd/${config?.monitor?.slug}` || null)
Expand Down

0 comments on commit c6af92b

Please sign in to comment.