Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: check for missing payload #77

Merged
merged 19 commits into from
Feb 7, 2025
4 changes: 3 additions & 1 deletion backend/bin/deal-observer-backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, observeBuil
import { indexPieces } from '../lib/piece-indexer.js'
import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js'
import { getDealPayloadCid } from '../lib/piece-indexer-service.js'
import NodeCache from 'node-cache'

const {
INFLUXDB_TOKEN,
Expand Down Expand Up @@ -128,12 +129,13 @@ const sparkApiSubmitDealsLoop = async (pgPool, { sparkApiBaseUrl, sparkApiToken,

export const pieceIndexerLoop = async (makeRpcRequest, getDealPayloadCid, pgPool) => {
const LOOP_NAME = 'Piece Indexer'
const payloadsCache = new NodeCache()
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
while (true) {
const start = Date.now()
// Maximum number of deals to index in one loop iteration
const maxDeals = 1000
try {
await indexPieces(makeRpcRequest, getDealPayloadCid, pgPool, maxDeals)
await indexPieces(makeRpcRequest, getDealPayloadCid, pgPool, maxDeals, payloadsCache)
} catch (e) {
console.error(e)
Sentry.captureException(e)
Expand Down
52 changes: 45 additions & 7 deletions backend/lib/piece-indexer.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,74 @@
import { loadDeals } from './deal-observer.js'
import * as util from 'node:util'
import { getMinerPeerId } from './rpc-service/service.js'
import assert from 'node:assert'

/** @import {Queryable} from '@filecoin-station/deal-observer-db' */
/** @import { Static } from '@sinclair/typebox' */
/** @import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' */
/** @import * as NodeCache from 'node-cache' */

const EIGHT_HOURS_IN_MILLISECONDS = 1000 * 60 * 60 * 8
const MAX_RETRIES = 4

/**
*
* @param {function} makeRpcRequest
* @param {function} getDealPayloadCid
* @param {Queryable} pgPool
* @param {number} maxDeals
* @param {NodeCache} cache
* @returns {Promise<void>}
*/
export const indexPieces = async (makeRpcRequest, getDealPayloadCid, pgPool, maxDeals) => {
export const indexPieces = async (makeRpcRequest, getDealPayloadCid, pgPool, maxDeals, cache, now = Date.now()) => {
  for (const deal of await fetchDealsWithNoPayloadCid(pgPool, maxDeals)) {
    const minerPeerId = await getMinerPeerId(deal.miner_id, makeRpcRequest)
    // Cache key identifies a (provider, piece) pair whose payload lookup failed before.
    const cacheKey = JSON.stringify({ minerPeerId, pieceCid: deal.piece_cid })
    if (cache.has(cacheKey)) {
      // A previous lookup failed for this pair — apply the retry/backoff policy.
      await checkCacheForRetrievablePayloads(minerPeerId, deal, getDealPayloadCid, cache, now)
    } else {
      // First attempt for this pair.
      deal.payload_cid = await getDealPayloadCid(minerPeerId, deal.piece_cid)
      if (!deal.payload_cid) {
        // Lookup failed — remember it so future iterations retry with backoff.
        cache.set(cacheKey, { retriesLeft: MAX_RETRIES, lastRetry: now })
      } else {
        deal.payload_unretrievable = false
      }
    }
    // Persist only when we learned something: a payload CID, or a final
    // "unretrievable" verdict. Deals still pending retries are left untouched.
    if (deal.payload_cid || deal.payload_unretrievable) {
      await updatePayloadInActiveDeal(pgPool, deal)
    }
  }
}

/**
 * Re-attempts the payload-CID lookup for a deal whose previous lookup failed.
 * Mutates `deal` in place: sets `payload_cid` on success, or
 * `payload_unretrievable = true` once all retries are exhausted.
 *
 * @param {string} minerPeerId - peer ID of the deal's storage provider
 * @param {object} deal - active deal DB entry (snake_case columns), mutated in place
 * @param {function} getDealPayloadCid - async (minerPeerId, pieceCid) => payloadCid|null
 * @param {NodeCache} cache - tracks { retriesLeft, lastRetry } per (peer, piece) pair
 * @param {number} now - current timestamp in milliseconds
 * @returns {Promise<void>}
 */
export async function checkCacheForRetrievablePayloads (minerPeerId, deal, getDealPayloadCid, cache, now) {
  const cacheKey = JSON.stringify({ minerPeerId, pieceCid: deal.piece_cid })
  let { retriesLeft, lastRetry } = cache.get(cacheKey)
  assert(retriesLeft > 0)
  // We retry to fetch a payload if there are retries left and the last retry was more than 8 hours ago
  if (lastRetry <= now - EIGHT_HOURS_IN_MILLISECONDS) {
    // Fixed: was `deal.pieceCid` (always undefined — the DB entry uses
    // snake_case `piece_cid`), so retries could never succeed.
    deal.payload_cid = await getDealPayloadCid(minerPeerId, deal.piece_cid)
    if (deal.payload_cid) {
      deal.payload_unretrievable = false
      cache.del(cacheKey)
    } else {
      retriesLeft = retriesLeft - 1
      if (retriesLeft === 0) {
        // Retries exhausted: mark the payload permanently unretrievable.
        deal.payload_unretrievable = true
        cache.del(cacheKey)
      } else {
        cache.set(cacheKey, { retriesLeft, lastRetry: now })
      }
    }
  }
}

/**
* @param {Queryable} pgPool
* @param {number} maxDeals
* @returns {Promise<Array<Static< typeof ActiveDealDbEntry>>>}
*/
/**
 * Loads active deals that still need a payload CID, oldest first.
 * Deals already marked `payload_unretrievable` are excluded so we stop
 * re-processing permanently failed lookups.
 *
 * @param {Queryable} pgPool
 * @param {number} maxDeals - upper bound on rows returned
 * @returns {Promise<Array<Static<typeof ActiveDealDbEntry>>>}
 */
export async function fetchDealsWithNoPayloadCid (pgPool, maxDeals) {
  const query = 'SELECT * FROM active_deals WHERE payload_cid IS NULL AND (payload_unretrievable IS NULL OR payload_unretrievable = FALSE) ORDER BY activated_at_epoch ASC LIMIT $1'
  return await loadDeals(pgPool, query, [maxDeals])
}

Expand All @@ -43,12 +80,13 @@ export async function fetchDealsWithNoPayloadCid (pgPool, maxDeals) {
async function updatePayloadInActiveDeal (pgPool, deal) {
const updateQuery = `
UPDATE active_deals
SET payload_cid = $1
WHERE activated_at_epoch = $2 AND miner_id = $3 AND client_id = $4 AND piece_cid = $5 AND piece_size = $6 AND term_start_epoch = $7 AND term_min = $8 AND term_max = $9 AND sector_id = $10
SET payload_cid = $1, payload_unretrievable = $2
WHERE activated_at_epoch = $3 AND miner_id = $4 AND client_id = $5 AND piece_cid = $6 AND piece_size = $7 AND term_start_epoch = $8 AND term_min = $9 AND term_max = $10 AND sector_id = $11
`
try {
await pgPool.query(updateQuery, [
deal.payload_cid,
deal.payload_unretrievable,
deal.activated_at_epoch,
deal.miner_id,
deal.client_id,
Expand Down
3 changes: 2 additions & 1 deletion backend/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export function convertBlockEventToActiveDealDbEntry (blockEvent) {
term_min: blockEvent.event.termMin,
term_max: blockEvent.event.termMax,
sector_id: blockEvent.event.sector,
payload_cid: undefined
payload_cid: undefined,
payload_unretrievable: undefined
})
}
1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"@sinclair/typebox": "^0.34.14",
"debug": "^4.4.0",
"multiformats": "^13.3.1",
"node-cache": "^5.1.2",
"p-retry": "^6.2.1",
"pg": "^8.13.1",
"pg-cursor": "^2.12.1",
Expand Down
6 changes: 4 additions & 2 deletions backend/test/deal-observer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ describe('deal-observer-backend', () => {
term_min: eventData.termMin,
term_max: eventData.termMax,
sector_id: eventData.sector,
payload_cid: undefined
payload_cid: undefined,
payload_unretrievable: undefined
}
assert.deepStrictEqual(actualData, [expectedData])
})
Expand All @@ -63,7 +64,8 @@ describe('deal-observer-backend', () => {
termMin: 12340,
termMax: 12340,
sector: 6n,
payload_cid: undefined
payload_cid: undefined,
payload_unretrievable: undefined
}
const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' })
const dbEntry = convertBlockEventToActiveDealDbEntry(event)
Expand Down
142 changes: 139 additions & 3 deletions backend/test/piece-indexer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ import { before, beforeEach, it, describe, after } from 'node:test'
import { rawActorEventTestData } from './test_data/rawActorEvent.js'
import { chainHeadTestData } from './test_data/chainHead.js'
import { parse } from '@ipld/dag-json'
import { observeBuiltinActorEvents } from '../lib/deal-observer.js'
import { loadDeals, observeBuiltinActorEvents, storeActiveDeals } from '../lib/deal-observer.js'
import assert from 'assert'
import { minerPeerIds } from './test_data/minerInfo.js'
import { payloadCIDs } from './test_data/payloadCIDs.js'
import { indexPieces } from '../lib/piece-indexer.js'
import { checkCacheForRetrievablePayloads, indexPieces } from '../lib/piece-indexer.js'
import NodeCache from 'node-cache'
import { getMinerPeerId } from '../lib/rpc-service/service.js'
import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js'
import { Value } from '@sinclair/typebox/value'

describe('deal-observer-backend piece indexer', () => {
const makeRpcRequest = async (method, params) => {
Expand Down Expand Up @@ -46,6 +50,7 @@ describe('deal-observer-backend piece indexer', () => {

it('piece indexer loop function fetches deals where there exists no payload yet and updates the database entry', async (t) => {
const getDealPayloadCidCalls = []
const payloadsCache = new NodeCache()
const getDealPayloadCid = async (providerId, pieceCid) => {
getDealPayloadCidCalls.push({ providerId, pieceCid })
const payloadCid = payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid }))
Expand All @@ -56,11 +61,142 @@ describe('deal-observer-backend piece indexer', () => {
(await pgPool.query('SELECT * FROM active_deals WHERE payload_cid IS NULL')).rows.length,
336
)
await indexPieces(makeRpcRequest, getDealPayloadCid, pgPool, 10000)
await indexPieces(makeRpcRequest, getDealPayloadCid, pgPool, 10000, payloadsCache)
assert.strictEqual(getDealPayloadCidCalls.length, 336)
assert.strictEqual(
(await pgPool.query('SELECT * FROM active_deals WHERE payload_cid IS NULL')).rows.length,
85 // Not all deals have a payload CID in the test data
)
})

// Fixed typo in the test title: "pyloads" -> "payloads".
it('piece indexer checks cache for missing payloads', async (t) => {
  const getDealPayloadCidCalls = []
  const payloadsCache = new NodeCache()
  const now = Date.now()
  const getDealPayloadCid = async (providerId, pieceCid) => {
    getDealPayloadCidCalls.push({ providerId, pieceCid })
    return payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid }))?.payloadCid
  }

  await indexPieces(makeRpcRequest, getDealPayloadCid, pgPool, 10000, payloadsCache, now)
  const dealsWithNoPayloadCid = await loadDeals(pgPool, 'SELECT * FROM active_deals WHERE payload_cid IS NULL')
  const numDealsCached = payloadsCache.keys().length
  // Every deal that failed its first lookup must have exactly one cache entry.
  assert.strictEqual(dealsWithNoPayloadCid.length, numDealsCached)
  // Make sure that each one of the deals that has no payload is cached
  for (const deal of dealsWithNoPayloadCid) {
    const minerPeerId = await getMinerPeerId(deal.miner_id, makeRpcRequest)
    assert.deepStrictEqual(payloadsCache.get(JSON.stringify({ minerPeerId, pieceCid: deal.piece_cid })), { retriesLeft: 4, lastRetry: now })
  }
})

it('piece indexer cache retries fetching payloads correctly', async (t) => {
  const cache = new NodeCache()
  const payloadCid = 'PAYLOAD_CID'
  let returnPayload = false
  let payloadsCalled = 0
  const getDealPayloadCid = async () => {
    payloadsCalled++
    return returnPayload ? payloadCid : null
  }
  const minerPeerId = 'MINER_PEER_ID'
  const pieceCid = 'PIECE_CID'
  const now = Date.now()
  const deal = Value.Parse(ActiveDealDbEntry, {
    miner_id: 1,
    piece_cid: pieceCid,
    client_id: 1,
    activated_at_epoch: 1,
    piece_size: 1000,
    term_start_epoch: 1,
    term_min: 1,
    term_max: 1,
    sector_id: 1,
    payload_cid: undefined,
    payload_unretrievable: undefined
  })
  cache.set(JSON.stringify({ minerPeerId, pieceCid }), { retriesLeft: 2, lastRetry: now })
  // Fixed: this call was not awaited, so the assertion below raced the
  // async lookup instead of reliably observing its outcome.
  await checkCacheForRetrievablePayloads(minerPeerId, deal, getDealPayloadCid, cache, now)
  // Payloads should not be fetched as the last retry was less than 8 hours ago
  assert.strictEqual(payloadsCalled, 0)
  cache.set(JSON.stringify({ minerPeerId, pieceCid }), { retriesLeft: 2, lastRetry: now - 9 * 60 * 60 * 1000 })
  await checkCacheForRetrievablePayloads(minerPeerId, deal, getDealPayloadCid, cache, now)
  // Payloads should be fetched now as the last retry was more than 8 hours ago
  assert.strictEqual(payloadsCalled, 1)
  // After fetching the payload, the cache should be updated with the new retries left and last retry time
  // (deepStrictEqual for consistency with the rest of this file)
  assert.deepStrictEqual(cache.get(JSON.stringify({ minerPeerId, pieceCid })), { retriesLeft: 1, lastRetry: now })

  // If we try again and there is only one retry left, the cache should delete the payload entry and the deal should be marked to have an unretrievable payload
  cache.set(JSON.stringify({ minerPeerId, pieceCid }), { retriesLeft: 1, lastRetry: now - 9 * 60 * 60 * 1000 })
  await checkCacheForRetrievablePayloads(minerPeerId, deal, getDealPayloadCid, cache, now)
  assert.strictEqual(payloadsCalled, 2)
  assert.strictEqual(cache.get(JSON.stringify({ minerPeerId, pieceCid })), undefined)
  assert.strictEqual(deal.payload_unretrievable, true)
  assert.strictEqual(deal.payload_cid, null)

  // If we set the payload to be retrievable the entry should be marked to not be unretrievable and it should be removed from the cache
  cache.set(JSON.stringify({ minerPeerId, pieceCid }), { retriesLeft: 2, lastRetry: now - 9 * 60 * 60 * 1000 })
  returnPayload = true
  await checkCacheForRetrievablePayloads(minerPeerId, deal, getDealPayloadCid, cache, now)
  assert.strictEqual(payloadsCalled, 3)
  assert.strictEqual(deal.payload_unretrievable, false)
  assert.strictEqual(deal.payload_cid, payloadCid)
  assert.strictEqual(cache.get(JSON.stringify({ minerPeerId, pieceCid })), undefined)
})

it('piece indexer updates deals that were set to be retrievable or not retrievable', async (t) => {
const cache = new NodeCache()
const payloadCid = 'PAYLOAD_CID'
let returnPayload = false
let payloadsCalled = 0
const minerPeerId = 'MINER_PEER_ID'
// Stub payload lookup: counts calls, succeeds only when returnPayload is flipped on.
const getDealPayloadCid = async () => {
payloadsCalled++
return returnPayload ? payloadCid : null
}
// Stub RPC: indexPieces resolves the miner's peer ID through this.
const fetchMinerId = async () => {
return { PeerId: minerPeerId }
}
const pieceCid = 'PIECE_CID'
const now = Date.now()
const deal = Value.Parse(ActiveDealDbEntry, {
miner_id: 1,
piece_cid: pieceCid,
client_id: 1,
activated_at_epoch: 1,
piece_size: 1000,
term_start_epoch: 1,
term_min: 1,
term_max: 1,
sector_id: 1,
payload_cid: undefined,
payload_unretrievable: undefined
})
// We start with a clean table and insert a single deal with no payload
await pgPool.query('DELETE FROM active_deals')
await storeActiveDeals([deal], pgPool)
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals'))[0], deal)
// The payload is unretrievable on the first try and should be stored in the cache
await indexPieces(fetchMinerId, getDealPayloadCid, pgPool, 10000, cache, now)
assert.strictEqual(payloadsCalled, 1)
assert.deepStrictEqual(cache.get(JSON.stringify({ minerPeerId, pieceCid })), { retriesLeft: 4, lastRetry: now })

// If the last retry is more than 9 hours ago we should retry and if it is the last retry the entry should be removed from the cache and marked as unretrievable
cache.set(JSON.stringify({ minerPeerId, pieceCid }), { retriesLeft: 1, lastRetry: now - 9 * 60 * 60 * 1000 })
await indexPieces(fetchMinerId, getDealPayloadCid, pgPool, 10000, cache, now)
assert.strictEqual(payloadsCalled, 2)
// Mutate the local expectation to match what the indexer should have persisted.
deal.payload_unretrievable = true
// The deal should be marked as unretrievable and reflected in the database
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals WHERE payload_unretrievable = TRUE'))[0], deal)

// Payloads that can be retrieved on some retries should be marked as not unretrievable and the cache should remove the entry
await pgPool.query('DELETE FROM active_deals')
await storeActiveDeals([deal], pgPool)
cache.set(JSON.stringify({ minerPeerId, pieceCid }), { retriesLeft: 1, lastRetry: now - 9 * 60 * 60 * 1000 })
returnPayload = true
await indexPieces(fetchMinerId, getDealPayloadCid, pgPool, 10000, cache, now)
assert.strictEqual(payloadsCalled, 3)
deal.payload_unretrievable = false
deal.payload_cid = payloadCid
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals WHERE payload_unretrievable = FALSE'))[0], deal)
})
})
3 changes: 2 additions & 1 deletion db/lib/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const ActiveDealDbEntry = Type.Object({
term_min: Type.Number(),
term_max: Type.Number(),
sector_id: Type.BigInt(),
payload_cid: Type.Optional(Type.String())
payload_cid: Type.Optional(Type.String()),
payload_unretrievable: Type.Optional(Type.Boolean())
})

export { ActiveDealDbEntry }
2 changes: 2 additions & 0 deletions db/migrations/009.do.add-payload-retrievability-column.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE active_deals
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
ADD COLUMN payload_unretrievable BOOLEAN;
22 changes: 22 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.