diff --git a/backend/lib/deal-observer.js b/backend/lib/deal-observer.js index 77363a2..aebd32e 100644 --- a/backend/lib/deal-observer.js +++ b/backend/lib/deal-observer.js @@ -1,10 +1,11 @@ /** @import {Queryable} from '@filecoin-station/deal-observer-db' */ /** @import { Static } from '@sinclair/typebox' */ +/** @import { ActiveDeal } from '@filecoin-station/deal-observer-db/lib/types.js' */ import { getActorEvents, getActorEventsFilter, getChainHead } from './rpc-service/service.js' import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' import { Value } from '@sinclair/typebox/value' -import { convertBlockEventToActiveDealDbEntry } from './utils.js' +import { convertBlockEventToActiveDeal } from './utils.js' /** * @param {Queryable} pgPool @@ -35,7 +36,7 @@ export const fetchAndStoreActiveDeals = async (blockHeight, pgPool, makeRpcReque const eventType = 'claim' const blockEvents = await getActorEvents(getActorEventsFilter(blockHeight, eventType), makeRpcRequest) console.log(`Fetched ${blockEvents.length} ${eventType} events from block ${blockHeight}`) - await storeActiveDeals(blockEvents.map((event) => convertBlockEventToActiveDealDbEntry(event)), pgPool) + await storeActiveDeals(blockEvents.map((event) => convertBlockEventToActiveDeal(event)), pgPool) } /** @@ -59,7 +60,7 @@ export async function countStoredActiveDeals (pgPool) { } /** - * @param {Static[]} activeDeals + * @param {Static[]} activeDeals * @param {Queryable} pgPool * @returns {Promise} * */ diff --git a/backend/lib/look-up-payload-cids.js b/backend/lib/look-up-payload-cids.js index f1b24ac..2851a34 100644 --- a/backend/lib/look-up-payload-cids.js +++ b/backend/lib/look-up-payload-cids.js @@ -67,22 +67,14 @@ async function updatePayloadCidInActiveDeal (pgPool, deal, newPayloadRetrievalSt const updateQuery = ` UPDATE active_deals SET payload_cid = $1, payload_retrievability_state = $2, last_payload_retrieval_attempt = $3 - WHERE activated_at_epoch = $4 AND miner_id = $5 AND client_id = $6 AND piece_cid = $7 AND piece_size = $8 AND term_start_epoch = $9 AND term_min = $10 AND term_max = $11 AND sector_id = $12 + WHERE id = $4 ` try { await pgPool.query(updateQuery, [ newPayloadCid, newPayloadRetrievalState, lastRetrievalAttemptTimestamp, - deal.activated_at_epoch, - deal.miner_id, - deal.client_id, - deal.piece_cid, - deal.piece_size, - deal.term_start_epoch, - deal.term_min, - deal.term_max, - deal.sector_id + deal.id ]) } catch (error) { throw Error(util.format('Error updating payload of deal: ', deal), { cause: error }) diff --git a/backend/lib/utils.js b/backend/lib/utils.js index 4d5d72d..f1da573 100644 --- a/backend/lib/utils.js +++ b/backend/lib/utils.js @@ -1,15 +1,15 @@ /** @import { BlockEvent } from './rpc-service/data-types.js' */ -import { ActiveDealDbEntry, PayloadRetrievabilityState } from '@filecoin-station/deal-observer-db/lib/types.js' +import { ActiveDeal, PayloadRetrievabilityState } from '@filecoin-station/deal-observer-db/lib/types.js' import { Value } from '@sinclair/typebox/value' /** @import { Static } from '@sinclair/typebox' */ /** * * @param {Static } blockEvent - * @returns { Static < typeof ActiveDealDbEntry> } + * @returns { Static < typeof ActiveDeal> } */ -export function convertBlockEventToActiveDealDbEntry (blockEvent) { - return Value.Parse(ActiveDealDbEntry, { +export function convertBlockEventToActiveDeal (blockEvent) { + return Value.Parse(ActiveDeal, { activated_at_epoch: blockEvent.height, miner_id: blockEvent.event.provider, client_id: blockEvent.event.client, diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js index efec2df..c87616c 100644 --- a/backend/test/deal-observer.test.js +++ b/backend/test/deal-observer.test.js @@ -4,7 +4,7 @@ import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observ import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, loadDeals, storeActiveDeals, observeBuiltinActorEvents } from '../lib/deal-observer.js' import { Value } from '@sinclair/typebox/value' import { BlockEvent } from '../lib/rpc-service/data-types.js' -import { convertBlockEventToActiveDealDbEntry } from '../lib/utils.js' +import { convertBlockEventToActiveDeal } from '../lib/utils.js' import { PayloadRetrievabilityState } from '@filecoin-station/deal-observer-db/lib/types.js' import { chainHeadTestData } from './test_data/chainHead.js' import { rawActorEventTestData } from './test_data/rawActorEvent.js' @@ -23,6 +23,7 @@ describe('deal-observer-backend', () => { beforeEach(async () => { await pgPool.query('DELETE FROM active_deals') + await pgPool.query('ALTER SEQUENCE active_deals_id_seq RESTART WITH 1') }) it('adds new FIL+ deals from built-in actor events to storage', async () => { @@ -39,10 +40,11 @@ describe('deal-observer-backend', () => { payload_cid: undefined } const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' }) - const dbEntry = convertBlockEventToActiveDealDbEntry(event) - await storeActiveDeals([dbEntry], pgPool) + const activeDeal = convertBlockEventToActiveDeal(event) + await storeActiveDeals([activeDeal], pgPool) const actualData = await loadDeals(pgPool, 'SELECT * FROM active_deals') const expectedData = { + id: 1, activated_at_epoch: event.height, miner_id: eventData.provider, client_id: eventData.client, @@ -74,8 +76,8 @@ describe('deal-observer-backend', () => { last_payload_retrieval_attempt: undefined } const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' }) - const dbEntry = convertBlockEventToActiveDealDbEntry(event) - await storeActiveDeals([dbEntry], pgPool) + const activeDeal = convertBlockEventToActiveDeal(event) + await storeActiveDeals([activeDeal], pgPool) const expected = await loadDeals(pgPool, 'SELECT * FROM active_deals') const actual = await fetchDealWithHighestActivatedEpoch(pgPool) assert.deepStrictEqual(expected, [actual]) @@ -84,8 +86,8 @@ describe('deal-observer-backend', () => { it('check number of stored deals', async () => { const storeBlockEvent = async (eventData) => { const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' }) - const dbEntry = convertBlockEventToActiveDealDbEntry(event) - await storeActiveDeals([dbEntry], pgPool) + const activeDeal = convertBlockEventToActiveDeal(event) + await storeActiveDeals([activeDeal], pgPool) } const data = { id: 1, @@ -132,6 +134,7 @@ describe('deal-observer-backend built in actor event observer', () => { beforeEach(async () => { await pgPool.query('DELETE FROM active_deals') + await pgPool.query('ALTER SEQUENCE active_deals_id_seq RESTART WITH 1') }) it('stores all retrievable active deals if database is empty', async () => { await observeBuiltinActorEvents(pgPool, makeRpcRequest, 10, 0) diff --git a/backend/test/look-up-payload-cids.test.js b/backend/test/look-up-payload-cids.test.js index 0fddb76..36393e5 100644 --- a/backend/test/look-up-payload-cids.test.js +++ b/backend/test/look-up-payload-cids.test.js @@ -8,7 +8,7 @@ import assert from 'assert' import { minerPeerIds } from './test_data/minerInfo.js' import { payloadCIDs } from './test_data/payloadCIDs.js' import { Value } from '@sinclair/typebox/value' -import { ActiveDealDbEntry, PayloadRetrievabilityState } from '@filecoin-station/deal-observer-db/lib/types.js' +import { ActiveDeal, PayloadRetrievabilityState } from '@filecoin-station/deal-observer-db/lib/types.js' import { countStoredActiveDealsWithUnresolvedPayloadCid, lookUpPayloadCids } from '../lib/look-up-payload-cids.js' describe('deal-observer-backend look up payload CIDs', () => { @@ -36,6 +36,7 @@ describe('deal-observer-backend look up payload CIDs', () => { beforeEach(async () => { await pgPool.query('DELETE FROM active_deals') + await pgPool.query('ALTER SEQUENCE active_deals_id_seq RESTART WITH 1') const startEpoch = 4622129 for (let blockHeight = startEpoch; blockHeight < startEpoch + 10; blockHeight++) { await fetchAndStoreActiveDeals(blockHeight, pgPool, makeRpcRequest) @@ -103,6 +104,7 @@ describe('deal-observer-backend piece indexer payload retrieval', () => { beforeEach(async () => { await pgPool.query('DELETE FROM active_deals') + await pgPool.query('ALTER SEQUENCE active_deals_id_seq RESTART WITH 1') }) it('piece indexer does not retry to fetch missing payloads if the last retrieval was too recent', async (t) => { const returnPayload = false @@ -112,7 +114,7 @@ describe('deal-observer-backend piece indexer payload retrieval', () => { return returnPayload ? payloadCid : null } - const deal = Value.Parse(ActiveDealDbEntry, { + const deal = Value.Parse(ActiveDeal, { miner_id: 1, piece_cid: pieceCid, client_id: 1, @@ -128,14 +130,15 @@ describe('deal-observer-backend piece indexer payload retrieval', () => { }) await storeActiveDeals([deal], pgPool) - assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals'))[0], deal) + const expectedDealDbEntry = { id: 1, ...deal } + assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals')), [expectedDealDbEntry]) // The payload is unretrievable and the last retrieval timestamp should be updated await lookUpPayloadCids(fetchMinerId, getDealPayloadCid, pgPool, 10000, now) // The timestamp on when the last retrieval of the payload was, was not yet set, so the piece indexer will try to fetch the payload assert.strictEqual(payloadsCalled, 1) - deal.last_payload_retrieval_attempt = new Date(now) - deal.payload_retrievability_state = PayloadRetrievabilityState.Unresolved - assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals'))[0], deal) + expectedDealDbEntry.last_payload_retrieval_attempt = new Date(now) + expectedDealDbEntry.payload_retrievability_state = PayloadRetrievabilityState.Unresolved + assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals')), [expectedDealDbEntry]) // If we retry now without changing the field last_payload_retrieval_attempt the function for calling payload should not be called await lookUpPayloadCids(fetchMinerId, getDealPayloadCid, pgPool, 10000, now) assert.strictEqual(payloadsCalled, 1) @@ -149,7 +152,7 @@ describe('deal-observer-backend piece indexer payload retrieval', () => { return returnPayload ? payloadCid : null } // If we set the last_payload_retrieval_attempt to a value more than three days ago the piece indexer should try again to fetch the payload CID - const deal = Value.Parse(ActiveDealDbEntry, { + const deal = Value.Parse(ActiveDeal, { miner_id: 1, piece_cid: pieceCid, client_id: 1, @@ -168,9 +171,10 @@ describe('deal-observer-backend piece indexer payload retrieval', () => { await lookUpPayloadCids(fetchMinerId, getDealPayloadCid, pgPool, 10000, now) assert.strictEqual(payloadsCalled, 1) // This is the second attempt that failed to fetch the payload CID so the deal should be marked as unretrievable - deal.payload_retrievability_state = PayloadRetrievabilityState.TerminallyUnretrievable - deal.last_payload_retrieval_attempt = new Date(now) - assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals'))[0], deal) + const expectedDealDbEntry = { id: 1, ...deal } + expectedDealDbEntry.payload_retrievability_state = PayloadRetrievabilityState.TerminallyUnretrievable + expectedDealDbEntry.last_payload_retrieval_attempt = new Date(now) + assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals')), [expectedDealDbEntry]) // Now the piece indexer should no longer call the payload request for this deal await lookUpPayloadCids(fetchMinerId, getDealPayloadCid, pgPool, 10000, now) assert.strictEqual(payloadsCalled, 1) @@ -184,7 +188,7 @@ describe('deal-observer-backend piece indexer payload retrieval', () => { return returnPayload ? payloadCid : null } // If we set the last_payload_retrieval_attempt to a value more than three days ago the piece indexer should try again to fetch the payload CID - const deal = Value.Parse(ActiveDealDbEntry, { + const deal = Value.Parse(ActiveDeal, { miner_id: 1, piece_cid: pieceCid, client_id: 1, @@ -202,11 +206,12 @@ describe('deal-observer-backend piece indexer payload retrieval', () => { await storeActiveDeals([deal], pgPool) await lookUpPayloadCids(fetchMinerId, getDealPayloadCid, pgPool, 10000, now) assert.strictEqual(payloadsCalled, 1) - deal.last_payload_retrieval_attempt = new Date(now) - deal.payload_cid = payloadCid - deal.payload_retrievability_state = PayloadRetrievabilityState.Resolved + const expectedDealDbEntry = { id: 1, ...deal } + expectedDealDbEntry.last_payload_retrieval_attempt = new Date(now) + expectedDealDbEntry.payload_cid = payloadCid + expectedDealDbEntry.payload_retrievability_state = PayloadRetrievabilityState.Resolved // The second attempt at retrieving the payload cid was successful and this should be reflected in the database entry - assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals'))[0], deal) + assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals')), [expectedDealDbEntry]) // Now the piece indexer should no longer call the payload request for this deal await lookUpPayloadCids(fetchMinerId, getDealPayloadCid, pgPool, 10000, now) diff --git a/db/lib/types.js b/db/lib/types.js index ff2ad3c..e758fce 100644 --- a/db/lib/types.js +++ b/db/lib/types.js @@ -1,4 +1,4 @@ -import { Type } from '@sinclair/typebox' +import { Omit, Type } from '@sinclair/typebox' const PayloadRetrievabilityState = { NotQueried: 'PAYLOAD_CID_NOT_QUERIED_YET', @@ -10,6 +10,7 @@ const PayloadRetrievabilityState = { const PayloadRetrievabilityStateType = Type.Enum(PayloadRetrievabilityState) const ActiveDealDbEntry = Type.Object({ + id: Type.Number(), activated_at_epoch: Type.Number(), miner_id: Type.Number(), client_id: Type.Number(), @@ -24,4 +25,8 @@ const ActiveDealDbEntry = Type.Object({ last_payload_retrieval_attempt: Type.Optional(Type.Date()) }) -export { ActiveDealDbEntry, PayloadRetrievabilityState, PayloadRetrievabilityStateType } +// The content of an active deal that is to be stored in the database +// It does not contain the ID field, which is auto-generated by the database. +const ActiveDeal = Omit(ActiveDealDbEntry, ['id']) + +export { ActiveDealDbEntry, ActiveDeal, PayloadRetrievabilityState, PayloadRetrievabilityStateType } diff --git a/db/migrations/011.do.add-serial-index-to-active-deals.sql b/db/migrations/011.do.add-serial-index-to-active-deals.sql new file mode 100644 index 0000000..f0ddf4c --- /dev/null +++ b/db/migrations/011.do.add-serial-index-to-active-deals.sql @@ -0,0 +1 @@ +ALTER TABLE active_deals ADD COLUMN id SERIAL PRIMARY KEY; \ No newline at end of file