Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forced exit support #204

Open
wants to merge 3 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions zp-relayer/configs/relayerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ const config = {
[TxType.WITHDRAWAL]: toBN(process.env.RELAYER_BASE_TX_GAS_WITHDRAWAL || '650000'),
nativeConvertOverhead: toBN(process.env.RELAYER_BASE_TX_GAS_NATIVE_CONVERT || '200000'),
},
forcedExitStartBlock: parseInt(process.env.RELAYER_FORCED_EXIT_START_BLOCK || '0'),
forcedExitPollingInterval: parseInt(process.env.RELAYER_FORCED_EXIT_POLLING_INTERVAL || '600000'),
forcedExitBlockConfirmations: parseInt(process.env.RELAYER_FORCED_EXIT_BLOCK_CONFIRMATIONS || '1'),
}

export default config
35 changes: 0 additions & 35 deletions zp-relayer/direct-deposit/utils.ts

This file was deleted.

115 changes: 48 additions & 67 deletions zp-relayer/direct-deposit/watcher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
// Reference implementation:
// https://github.com/omni/tokenbridge/blob/master/oracle/src/watcher.js
import type Web3 from 'web3'
import type { AbiItem } from 'web3-utils'
import type { DirectDeposit } from '@/queue/poolTxQueue'
import { web3 } from '@/services/web3'
Expand All @@ -9,81 +6,65 @@ import DirectDepositQueueAbi from '@/abi/direct-deposit-queue-abi.json'
import config from '@/configs/watcherConfig'
import { logger } from '@/services/appLogger'
import { redis } from '@/services/redisClient'
import { lastProcessedBlock, getLastProcessedBlock, updateLastProcessedBlock, parseDirectDepositEvent } from './utils'
import { BatchCache } from './BatchCache'
import { validateDirectDeposit } from '@/validation/tx/validateDirectDeposit'
import { getBlockNumber, getEvents } from '@/utils/web3'
import { directDepositQueue } from '@/queue/directDepositQueue'
import { EventWatcher } from '../services/EventWatcher'

const PoolInstance = new web3.eth.Contract(PoolAbi as AbiItem[], config.poolAddress)
const DirectDepositQueueInstance = new web3.eth.Contract(DirectDepositQueueAbi as AbiItem[])

const eventName = 'SubmitDirectDeposit'
export function parseDirectDepositEvent(o: Record<string, any>): DirectDeposit {
const dd: DirectDeposit = {
sender: o.sender,
nonce: o.nonce,
fallbackUser: o.fallbackUser,
zkAddress: {
diversifier: o.zkAddress.diversifier,
pk: o.zkAddress.pk,
},
deposit: o.deposit,
}

const batch = new BatchCache<DirectDeposit>(
config.directDepositBatchSize,
config.directDepositBatchTtl,
ds => {
logger.info('Adding direct-deposit events to queue', { count: ds.length })
directDepositQueue.add('', ds)
},
dd => validateDirectDeposit(dd, DirectDepositQueueInstance),
redis
)
return dd
}

async function init() {
await getLastProcessedBlock()
await batch.init()
const PoolInstance = new web3.eth.Contract(PoolAbi as AbiItem[], config.poolAddress)
const queueAddress = await PoolInstance.methods.direct_deposit_queue().call()
DirectDepositQueueInstance.options.address = queueAddress
runWatcher()
}
const DirectDepositQueueInstance = new web3.eth.Contract(DirectDepositQueueAbi as AbiItem[], queueAddress)

async function getLastBlockToProcess(web3: Web3) {
const lastBlockNumber = await getBlockNumber(web3)
return lastBlockNumber - config.blockConfirmations
}

async function watch() {
const lastBlockToProcess = await getLastBlockToProcess(web3)

if (lastBlockToProcess <= lastProcessedBlock) {
logger.debug('All blocks already processed')
return
}
const batch = new BatchCache<DirectDeposit>(
config.directDepositBatchSize,
config.directDepositBatchTtl,
ds => {
logger.info('Adding direct-deposit events to queue', { count: ds.length })
directDepositQueue.add('', ds)
},
dd => validateDirectDeposit(dd, DirectDepositQueueInstance),
redis
)
await batch.init()

const fromBlock = lastProcessedBlock + 1
const rangeEndBlock = fromBlock + config.eventsProcessingBatchSize
let toBlock = Math.min(lastBlockToProcess, rangeEndBlock)
const watcher = new EventWatcher({
name: 'direct-deposit',
startBlock: config.startBlock,
blockConfirmations: config.blockConfirmations,
eventName: 'SubmitDirectDeposit',
eventPollingInterval: config.eventPollingInterval,
eventsProcessingBatchSize: config.eventsProcessingBatchSize,
redis,
web3,
contract: DirectDepositQueueInstance,
callback: async events => {
const directDeposits: [string, DirectDeposit][] = []
for (let event of events) {
const dd = parseDirectDepositEvent(event.returnValues)
directDeposits.push([dd.nonce, dd])
}

let events = await getEvents(DirectDepositQueueInstance, eventName, {
fromBlock,
toBlock,
await batch.add(directDeposits)
},
})
logger.info(`Found ${events.length} direct-deposit events`)

const directDeposits: [string, DirectDeposit][] = []
for (let event of events) {
const dd = parseDirectDepositEvent(event.returnValues)
directDeposits.push([dd.nonce, dd])
}

await batch.add(directDeposits)

logger.debug('Updating last processed block', { lastProcessedBlock: toBlock.toString() })
await updateLastProcessedBlock(toBlock)
}

async function runWatcher() {
try {
await watch()
} catch (e) {
logger.error(e)
}

setTimeout(() => {
runWatcher()
}, config.eventPollingInterval)
await watcher.init()
return watcher
}

init()
init().then(w => w.run())
38 changes: 38 additions & 0 deletions zp-relayer/init.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type Web3 from 'web3'
import { toBN } from 'web3-utils'
import { Mutex } from 'async-mutex'
import { Params } from 'libzkbob-rs-node'
import { pool } from './pool'
Expand All @@ -17,6 +18,9 @@ import { FeeManagerType, FeeManager, StaticFeeManager, DynamicFeeManager, Optimi
import type { IPriceFeed } from './services/price-feed/IPriceFeed'
import type { IWorkerBaseConfig } from './workers/workerTypes'
import { NativePriceFeed, OneInchPriceFeed, PriceFeedType } from './services/price-feed'
import { createForcedExitWorker } from './workers/forcedExitWorker'
import { EventWatcher } from './services/EventWatcher'
import { forcedExitQueue } from './queue/forcedExitQueue'

function buildProver<T extends Circuit>(circuit: T, type: ProverType, path: string): IProver<T> {
if (type === ProverType.Local) {
Expand Down Expand Up @@ -107,6 +111,37 @@ export async function init() {
const feeManager = buildFeeManager(config.feeManagerType, priceFeed, gasPriceService, web3)
await feeManager.start()

const forcedExitWatcher = new EventWatcher({
name: 'forced-exit',
startBlock: config.forcedExitStartBlock,
blockConfirmations: config.forcedExitBlockConfirmations,
eventName: 'CommitForcedExit',
eventPollingInterval: config.forcedExitPollingInterval,
eventsProcessingBatchSize: config.eventsProcessingBatchSize,
redis,
web3,
contract: pool.PoolInstance,
callback: async events => {
for (let event of events) {
const nullifier = event.returnValues.nullifier as string
await pool.state.nullifiers.add([nullifier])

const exitEnd = toBN(event.returnValues.exitEnd)
const now = toBN(Math.floor(Date.now() / 1000))
await forcedExitQueue.add(
nullifier,
{ nullifier },
{
// add a 10 minute buffer
delay: exitEnd.sub(now).addn(600).muln(1000).toNumber(),
}
)
}
},
})
await forcedExitWatcher.init()
forcedExitWatcher.run()

const workerPromises = [
createPoolTxWorker({
...baseConfig,
Expand All @@ -125,6 +160,9 @@ export async function init() {
...baseConfig,
directDepositProver,
}),
createForcedExitWorker({
...baseConfig,
}),
]

const workers = await Promise.all(workerPromises)
Expand Down
11 changes: 11 additions & 0 deletions zp-relayer/queue/forcedExitQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Queue } from 'bullmq'
import { FORCED_EXIT_QUEUE_NAME } from '@/utils/constants'
import { redis } from '@/services/redisClient'

export interface ForcedExitPayload {
nullifier: string
}

export const forcedExitQueue = new Queue<ForcedExitPayload>(FORCED_EXIT_QUEUE_NAME, {
connection: redis,
})
86 changes: 86 additions & 0 deletions zp-relayer/services/EventWatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Reference implementation:
// https://github.com/omni/tokenbridge/blob/master/oracle/src/watcher.js
import { logger } from '@/services/appLogger'
import { getBlockNumber, getEvents } from '@/utils/web3'
import type { Redis } from 'ioredis'
import type Web3 from 'web3'
import type { Contract, EventData } from 'web3-eth-contract'

interface IWatcherConfig {
name: string
startBlock: number
blockConfirmations: number
eventName: string
eventPollingInterval: number
eventsProcessingBatchSize: number
redis: Redis
web3: Web3
contract: Contract
callback: (events: EventData[]) => Promise<void>
}

export class EventWatcher {
lastProcessedBlock: number
lastBlockRedisKey: string

constructor(private config: IWatcherConfig) {
this.lastBlockRedisKey = `${config.name}:lastProcessedBlock`
this.lastProcessedBlock = Math.max(config.startBlock - 1, 0)
}

async init() {
await this.getLastProcessedBlock()
}

private async getLastProcessedBlock() {
const result = await this.config.redis.get(this.lastBlockRedisKey)
logger.debug('Last Processed block obtained', { fromRedis: result, fromConfig: this.lastProcessedBlock })
this.lastProcessedBlock = result ? parseInt(result, 10) : this.lastProcessedBlock
}

private updateLastProcessedBlock(lastBlockNumber: number) {
this.lastProcessedBlock = lastBlockNumber
return this.config.redis.set(this.lastBlockRedisKey, this.lastProcessedBlock)
}

private async getLastBlockToProcess(web3: Web3) {
const lastBlockNumber = await getBlockNumber(web3)
return lastBlockNumber - this.config.blockConfirmations
}

private async watch() {
const lastBlockToProcess = await this.getLastBlockToProcess(this.config.web3)

if (lastBlockToProcess <= this.lastProcessedBlock) {
logger.debug('All blocks already processed')
return
}

const fromBlock = this.lastProcessedBlock + 1
const rangeEndBlock = fromBlock + this.config.eventsProcessingBatchSize
let toBlock = Math.min(lastBlockToProcess, rangeEndBlock)

let events = await getEvents(this.config.contract, this.config.eventName, {
fromBlock,
toBlock,
})
logger.info(`Found ${events.length} events`)

await this.config.callback(events)

logger.debug('Updating last processed block', { lastProcessedBlock: toBlock.toString() })
await this.updateLastProcessedBlock(toBlock)
}

async run() {
try {
await this.watch()
} catch (e) {
logger.error(e)
}

setTimeout(() => {
this.run()
}, this.config.eventPollingInterval)
}
}
1 change: 1 addition & 0 deletions zp-relayer/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const constants = {
TX_QUEUE_NAME: 'tx',
SENT_TX_QUEUE_NAME: 'sent',
DIRECT_DEPOSIT_QUEUE_NAME: 'dd-prove',
FORCED_EXIT_QUEUE_NAME: 'forced-exit',
DIRECT_DEPOSIT_SET_NAME: 'dd:cache',
DIRECT_DEPOSIT_REPROCESS_NAME: 'dd:reprocess',
DIRECT_DEPOSIT_REPROCESS_INTERVAL: 60 * 1000,
Expand Down
44 changes: 44 additions & 0 deletions zp-relayer/workers/forcedExitWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { toBN } from 'web3-utils'
import { Job, Worker } from 'bullmq'
import { logger } from '@/services/appLogger'
import { contractCallRetry, withErrorLog } from '@/utils/helpers'
import { FORCED_EXIT_QUEUE_NAME } from '@/utils/constants'
import type { IForcedExitWorkerConfig } from './workerTypes'
import { ForcedExitPayload } from '@/queue/forcedExitQueue'
import { pool } from '@/pool'

export async function createForcedExitWorker({ redis }: IForcedExitWorkerConfig) {
const workerLogger = logger.child({ worker: 'forced-exit' })
const WORKER_OPTIONS = {
autorun: false,
connection: redis,
concurrency: 1,
}

const forcedExitWorkerProcessor = async (job: Job<ForcedExitPayload>) => {
const jobLogger = workerLogger.child({ jobId: job.id })

const nullifier = job.data.nullifier

const isPresent = await contractCallRetry(pool.PoolInstance, 'nullifiers', [nullifier]).then(toBN)
if (isPresent.isZero()) {
jobLogger.info('User has not finalized forced exit, removing it', { nullifier })
await pool.state.nullifiers.remove([nullifier])
} else {
jobLogger.info('User successfully finalized forced exit', { nullifier })

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
jobLogger.info('User successfully finalized forced exit', { nullifier })
jobLogger.info('Forced exit was outdated', { nullifier })

It's not pretty correct message. User also may cancel FE during 10-min buffer time interval in forced exit queue

}
return
}

const forcedExitWorker = new Worker<ForcedExitPayload>(
FORCED_EXIT_QUEUE_NAME,
job => withErrorLog(() => forcedExitWorkerProcessor(job)),
WORKER_OPTIONS
)

forcedExitWorker.on('error', e => {
workerLogger.info('DIRECT-DEPOSIT_WORKER ERR: %o', e)
})

return forcedExitWorker
}
Loading