Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: check for missing payload #77

Merged
merged 19 commits into from
Feb 7, 2025
13 changes: 10 additions & 3 deletions backend/lib/deal-observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ export async function storeActiveDeals (activeDeals, pgPool) {
term_start_epoch,
term_min,
term_max,
sector_id
sector_id,
payload_unretrievable,
last_payload_retrieval
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
)
VALUES (
unnest($1::int[]),
Expand All @@ -71,7 +73,10 @@ export async function storeActiveDeals (activeDeals, pgPool) {
unnest($6::int[]),
unnest($7::int[]),
unnest($8::int[]),
unnest($9::bigint[])
unnest($9::bigint[]),
unnest($10::boolean[]),
unnest($11::timestamp[])

)
`
await pgPool.query(insertQuery, [
Expand All @@ -83,7 +88,9 @@ export async function storeActiveDeals (activeDeals, pgPool) {
activeDeals.map(deal => deal.term_start_epoch),
activeDeals.map(deal => deal.term_min),
activeDeals.map(deal => deal.term_max),
activeDeals.map(deal => deal.sector_id)
activeDeals.map(deal => deal.sector_id),
activeDeals.map(deal => deal.payload_unretrievable),
activeDeals.map(deal => deal.last_payload_retrieval)
])
} catch (error) {
// If any error occurs, roll back the transaction
Expand Down
28 changes: 17 additions & 11 deletions backend/lib/piece-indexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { getMinerPeerId } from './rpc-service/service.js'
/** @import { Static } from '@sinclair/typebox' */
/** @import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' */

const THREE_DAYS_IN_MILLISECONDS = 1000 * 60 * 60 * 24 * 3

/**
*
* @param {function} makeRpcRequest
Expand All @@ -14,25 +16,27 @@ import { getMinerPeerId } from './rpc-service/service.js'
* @param {number} maxDeals
* @returns {Promise<void>}
*/
export const indexPieces = async (makeRpcRequest, getDealPayloadCid, pgPool, maxDeals) => {
for (const deal of await fetchDealsWithNoPayloadCid(pgPool, maxDeals)) {
export const indexPieces = async (makeRpcRequest, getDealPayloadCid, pgPool, maxDeals, now = Date.now()) => {
for (const deal of await fetchDealsWithNoPayloadCid(pgPool, maxDeals, new Date(now - THREE_DAYS_IN_MILLISECONDS))) {
pyropy marked this conversation as resolved.
Show resolved Hide resolved
const minerPeerId = await getMinerPeerId(deal.miner_id, makeRpcRequest)
const payloadCid = await getDealPayloadCid(minerPeerId, deal.piece_cid)
if (payloadCid) {
deal.payload_cid = payloadCid
await updatePayloadInActiveDeal(pgPool, deal)
deal.payload_cid = await getDealPayloadCid(minerPeerId, deal.piece_cid)
if (!deal.payload_cid && deal.last_payload_retrieval) {
deal.payload_unretrievable = true
}
deal.last_payload_retrieval = new Date(now)
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
await updatePayloadInActiveDeal(pgPool, deal)
}
}

/**
* @param {Queryable} pgPool
* @param {number} maxDeals
* @param {Date} now
* @returns {Promise<Array<Static< typeof ActiveDealDbEntry>>>}
*/
export async function fetchDealsWithNoPayloadCid (pgPool, maxDeals) {
const query = 'SELECT * FROM active_deals WHERE payload_cid IS NULL ORDER BY activated_at_epoch ASC LIMIT $1'
return await loadDeals(pgPool, query, [maxDeals])
export async function fetchDealsWithNoPayloadCid (pgPool, maxDeals, now) {
const query = 'SELECT * FROM active_deals WHERE payload_cid IS NULL AND payload_unretrievable IS DISTINCT FROM TRUE AND (last_payload_retrieval IS NULL OR last_payload_retrieval < $1) ORDER BY activated_at_epoch ASC LIMIT $2'
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
return await loadDeals(pgPool, query, [now, maxDeals])
}

/**
Expand All @@ -43,12 +47,14 @@ export async function fetchDealsWithNoPayloadCid (pgPool, maxDeals) {
async function updatePayloadInActiveDeal (pgPool, deal) {
const updateQuery = `
UPDATE active_deals
SET payload_cid = $1
WHERE activated_at_epoch = $2 AND miner_id = $3 AND client_id = $4 AND piece_cid = $5 AND piece_size = $6 AND term_start_epoch = $7 AND term_min = $8 AND term_max = $9 AND sector_id = $10
SET payload_cid = $1, payload_unretrievable = $2, last_payload_retrieval = $3
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
WHERE activated_at_epoch = $4 AND miner_id = $5 AND client_id = $6 AND piece_cid = $7 AND piece_size = $8 AND term_start_epoch = $9 AND term_min = $10 AND term_max = $11 AND sector_id = $12
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
`
try {
await pgPool.query(updateQuery, [
deal.payload_cid,
deal.payload_unretrievable,
deal.last_payload_retrieval,
deal.activated_at_epoch,
deal.miner_id,
deal.client_id,
Expand Down
4 changes: 3 additions & 1 deletion backend/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ export function convertBlockEventToActiveDealDbEntry (blockEvent) {
term_min: blockEvent.event.termMin,
term_max: blockEvent.event.termMax,
sector_id: blockEvent.event.sector,
payload_cid: undefined
payload_cid: undefined,
payload_unretrievable: undefined,
last_payload_retrieval: undefined
})
}
8 changes: 6 additions & 2 deletions backend/test/deal-observer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ describe('deal-observer-backend', () => {
term_min: eventData.termMin,
term_max: eventData.termMax,
sector_id: eventData.sector,
payload_cid: undefined
payload_cid: undefined,
payload_unretrievable: undefined,
last_payload_retrieval: undefined
}
assert.deepStrictEqual(actualData, [expectedData])
})
Expand All @@ -63,7 +65,9 @@ describe('deal-observer-backend', () => {
termMin: 12340,
termMax: 12340,
sector: 6n,
payload_cid: undefined
payload_cid: undefined,
payload_unretrievable: undefined,
last_payload_retrieval: undefined
}
const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' })
const dbEntry = convertBlockEventToActiveDealDbEntry(event)
Expand Down
134 changes: 133 additions & 1 deletion backend/test/piece-indexer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import { before, beforeEach, it, describe, after } from 'node:test'
import { rawActorEventTestData } from './test_data/rawActorEvent.js'
import { chainHeadTestData } from './test_data/chainHead.js'
import { parse } from '@ipld/dag-json'
import { observeBuiltinActorEvents } from '../lib/deal-observer.js'
import { loadDeals, observeBuiltinActorEvents, storeActiveDeals } from '../lib/deal-observer.js'
import assert from 'assert'
import { minerPeerIds } from './test_data/minerInfo.js'
import { payloadCIDs } from './test_data/payloadCIDs.js'
import { indexPieces } from '../lib/piece-indexer.js'
import { Value } from '@sinclair/typebox/value'
import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js'

describe('deal-observer-backend piece indexer', () => {
const makeRpcRequest = async (method, params) => {
Expand Down Expand Up @@ -64,3 +66,133 @@ describe('deal-observer-backend piece indexer', () => {
)
})
})

describe('deal-observer-backend piece indexer payload retrieval', () => {
let pgPool
const payloadCid = 'PAYLOAD_CID'
const minerPeerId = 'MINER_PEER_ID'
const pieceCid = 'PIECE_CID'
const now = Date.now()
const fetchMinerId = async () => {
return { PeerId: minerPeerId }
}

before(async () => {
pgPool = await createPgPool()
await migrateWithPgClient(pgPool)
})

after(async () => {
await pgPool.end()
})

beforeEach(async () => {
await pgPool.query('DELETE FROM active_deals')
})
it('piece indexer does not retry to fetch missing payloads if the last retrieval was too recent', async (t) => {
const returnPayload = false
let payloadsCalled = 0
const getDealPayloadCid = async () => {
payloadsCalled++
return returnPayload ? payloadCid : null
}

const deal = Value.Parse(ActiveDealDbEntry, {
miner_id: 1,
piece_cid: pieceCid,
client_id: 1,
activated_at_epoch: 1,
piece_size: 1000,
term_start_epoch: 1,
term_min: 1,
term_max: 1,
sector_id: 1,
payload_cid: undefined,
payload_unretrievable: undefined,
last_payload_retrieval: undefined
})

await storeActiveDeals([deal], pgPool)
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals'))[0], deal)
// The payload is unretrievable and the last retrieval timestamp should be updated
await indexPieces(fetchMinerId, getDealPayloadCid, pgPool, 10000, now)
// The timestamp on when the last retrieval of the payload was, was not yet set, so the piece indexer will try to fetch the payload
assert.strictEqual(payloadsCalled, 1)
deal.last_payload_retrieval = new Date(now)
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals'))[0], deal)
// If we retry now without changing the field last_payload_retrieval the function for calling payload should not be called
await indexPieces(fetchMinerId, getDealPayloadCid, pgPool, 10000, now)
assert.strictEqual(payloadsCalled, 1)
})

it('piece indexer sets the payload to be unretrievable if the second attempt fails', async (t) => {
const returnPayload = false
let payloadsCalled = 0
const getDealPayloadCid = async () => {
payloadsCalled++
return returnPayload ? payloadCid : null
}
// If we set the last_payload_retrieval to a value more than three days ago the piece indexer should try again to fetch the payload CID
const deal = Value.Parse(ActiveDealDbEntry, {
miner_id: 1,
piece_cid: pieceCid,
client_id: 1,
activated_at_epoch: 1,
piece_size: 1000,
term_start_epoch: 1,
term_min: 1,
term_max: 1,
sector_id: 1,
payload_cid: undefined,
payload_unretrievable: undefined,
last_payload_retrieval: new Date(now - 1000 * 60 * 60 * 24 * 4)
})

await storeActiveDeals([deal], pgPool)
await indexPieces(fetchMinerId, getDealPayloadCid, pgPool, 10000, now)
assert.strictEqual(payloadsCalled, 1)
// This is the second attempt that failed to fetch the payload CID so the deal should be marked as unretrievable
deal.payload_unretrievable = true
deal.last_payload_retrieval = new Date(now)
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals'))[0], deal)
// Now the piece indexer should no longer call the payload request for this deal
await indexPieces(fetchMinerId, getDealPayloadCid, pgPool, 10000, now)
assert.strictEqual(payloadsCalled, 1)
})

it('piece indexer correctly udpates the payloads if the retry succeeeds', async (t) => {
const returnPayload = true
let payloadsCalled = 0
const getDealPayloadCid = async () => {
payloadsCalled++
return returnPayload ? payloadCid : null
}
// If we set the last_payload_retrieval to a value more than three days ago the piece indexer should try again to fetch the payload CID
const deal = Value.Parse(ActiveDealDbEntry, {
miner_id: 1,
piece_cid: pieceCid,
client_id: 1,
activated_at_epoch: 1,
piece_size: 1000,
term_start_epoch: 1,
term_min: 1,
term_max: 1,
sector_id: 1,
payload_cid: undefined,
payload_unretrievable: undefined,
last_payload_retrieval: new Date(now - 1000 * 60 * 60 * 24 * 4)
})

await storeActiveDeals([deal], pgPool)
await indexPieces(fetchMinerId, getDealPayloadCid, pgPool, 10000, now)
assert.strictEqual(payloadsCalled, 1)
deal.last_payload_retrieval = new Date(now)
deal.payload_cid = payloadCid
// The second attempt at retrieving the payload cid was successful and this should be reflected in the database entry
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals'))[0], deal)

// Now the piece indexer should no longer call the payload request for this deal
await indexPieces(fetchMinerId, getDealPayloadCid, pgPool, 10000, now)
assert.strictEqual(payloadsCalled, 1)
})
})
4 changes: 3 additions & 1 deletion db/lib/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ const ActiveDealDbEntry = Type.Object({
term_min: Type.Number(),
term_max: Type.Number(),
sector_id: Type.BigInt(),
payload_cid: Type.Optional(Type.String())
payload_cid: Type.Optional(Type.String()),
payload_unretrievable: Type.Optional(Type.Boolean()),
last_payload_retrieval: Type.Optional(Type.Date())
})

export { ActiveDealDbEntry }
3 changes: 3 additions & 0 deletions db/migrations/009.do.add-payload-retrievability-column.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE active_deals
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
ADD COLUMN payload_unretrievable BOOLEAN,
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
ADD COLUMN last_payload_retrieval TIMESTAMP WITH TIME ZONE;
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 4 additions & 0 deletions db/migrations/010.do.index-missing-payloads.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE INDEX CONCURRENTLY IF NOT EXISTS missing_payloads_idx
ON active_deals (activated_at_epoch)
WHERE payload_cid IS NULL
AND payload_unretrievable IS DISTINCT FROM TRUE;