Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch payload CIDs from piece indexer #31

Merged
merged 163 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from 137 commits
Commits
Show all changes
163 commits
Select commit Hold shift + click to select a range
0d40c1e
update README
Jan 14, 2025
22cf09a
formatting
Jan 14, 2025
0f4ee8a
poc claim retrieval
Jan 14, 2025
4d9c274
update filter
Jan 15, 2025
a5e94cb
add transformer for claim events
Jan 15, 2025
57d3177
formatting
Jan 15, 2025
70dfc02
add lotus service
Jan 15, 2025
37b7eae
add deal observer
Jan 15, 2025
1cf5de0
remove logs
Jan 15, 2025
5772bf3
test transformer
Jan 15, 2025
00e7742
merged with main
Jan 15, 2025
ff8b8cb
merged with main
Jan 15, 2025
6ef67c2
formatting
Jan 15, 2025
4579b79
package.json
Jan 15, 2025
57cfa42
formatting
Jan 15, 2025
1a5e1e2
formatting
Jan 15, 2025
0944082
add packages
Jan 15, 2025
4193352
remove unused imports
Jan 15, 2025
a2ab839
remove unused imports
Jan 15, 2025
dd86cad
filter for unique events
Jan 15, 2025
114357d
fix event read
Jan 15, 2025
23ce715
fix parsing
Jan 15, 2025
80af619
package.json
Jan 15, 2025
d3ce078
formatting
Jan 15, 2025
20e41a2
add test data
Jan 15, 2025
5f62533
add type for rawactorevent
Jan 15, 2025
296c237
add js imports
Jan 15, 2025
a81408f
remove schema test
Jan 15, 2025
b2137ea
add mock server
Jan 16, 2025
ee27e7c
feat: `active_deals` table
bajtos Jan 16, 2025
d824ac4
refactor lotus
Jan 16, 2025
a29cf0a
tests for service
Jan 16, 2025
4da1941
add utils
Jan 16, 2025
a00e61b
package json
Jan 16, 2025
c2ed0ad
formatting
Jan 16, 2025
cf379aa
formatting
Jan 16, 2025
2eb8f7a
rename formatter
Jan 16, 2025
2e79edc
rename service
Jan 16, 2025
9c49c62
rename validator file
Jan 16, 2025
6cdc0f8
formatting
Jan 16, 2025
e536643
namimg and type checking
Jan 16, 2025
fc88c78
one block per query
Jan 16, 2025
9a238da
observer loop implemented
Jan 17, 2025
59667de
URL name
Jan 17, 2025
77de363
added deal caching
Jan 20, 2025
e9062cb
add api endpoints
Jan 21, 2025
a388110
add tests for storing deals
Jan 21, 2025
301b6cc
add test for db type
Jan 21, 2025
43f6f4d
formatting
Jan 21, 2025
f5e855e
resolved merge conflicts
Jan 21, 2025
1bceb81
added pix service
Jan 23, 2025
dcfcb8e
Update backend/bin/deal-observer-backend.js
NikolasHaimerl Jan 23, 2025
2e92416
Update backend/test/rpc-client.test.js
NikolasHaimerl Jan 23, 2025
50d6940
Update backend/test/rpc-client.test.js
NikolasHaimerl Jan 23, 2025
d869d28
Update backend/lib/rpc-service/service.js
NikolasHaimerl Jan 23, 2025
932d3ea
Update backend/lib/rpc-service/utils.js
NikolasHaimerl Jan 23, 2025
cce73df
address PR threads
Jan 23, 2025
94cec11
add db types
Jan 23, 2025
a56d9bd
add db types
Jan 23, 2025
6d9f4b2
remove service
Jan 23, 2025
0e7fed3
fix migration script
Jan 23, 2025
2670c74
Update backend/lib/deal-observer.js
NikolasHaimerl Jan 23, 2025
2006cdc
Update backend/lib/deal-observer.js
NikolasHaimerl Jan 23, 2025
4eceb9c
Update backend/lib/deal-observer.js
NikolasHaimerl Jan 23, 2025
e8efc3e
Update backend/lib/deal-observer.js
NikolasHaimerl Jan 23, 2025
370f6f3
Update backend/lib/rpc-service/service.js
NikolasHaimerl Jan 23, 2025
03b99da
fix typing
Jan 23, 2025
7aafd0a
formatting
Jan 23, 2025
f4b5c18
merged with main
Jan 23, 2025
308a8c8
merged with main
Jan 23, 2025
373b18d
add piece indexer call
Jan 24, 2025
ee30693
Update backend/lib/deal-observer.js
NikolasHaimerl Jan 24, 2025
f1c2072
convert piece id to string
Jan 24, 2025
773cf66
formatting
Jan 24, 2025
ec29f3b
resolved merge conflicts
Jan 24, 2025
40e5c49
fix lock file
Jan 24, 2025
1b3331c
add unique constraint
Jan 24, 2025
21675d7
Update backend/bin/deal-observer-backend.js
NikolasHaimerl Jan 24, 2025
0f781ef
Update backend/lib/deal-observer.js
NikolasHaimerl Jan 24, 2025
197cbbd
Update backend/bin/deal-observer-backend.js
NikolasHaimerl Jan 24, 2025
3ec7a0d
Update backend/bin/deal-observer-backend.js
NikolasHaimerl Jan 24, 2025
0faeb9f
Update backend/bin/deal-observer-backend.js
NikolasHaimerl Jan 24, 2025
ac9e5d9
resolved PR threads
Jan 24, 2025
63ebd68
add piece indexer binary
Jan 25, 2025
28d582c
fix: set undefined height
Jan 25, 2025
c9f7c94
Update backend/bin/deal-observer-backend.js
NikolasHaimerl Jan 27, 2025
0a6badc
Update backend/bin/deal-observer-backend.js
NikolasHaimerl Jan 27, 2025
0caa839
formatting
Jan 27, 2025
dc1cbbc
Update backend/lib/rpc-service/utils.js
NikolasHaimerl Jan 27, 2025
1e5e2a5
Use Static<typeof BlockEvent> to avoid TS errors (#27)
bajtos Jan 27, 2025
ee95dc9
address pr threads
Jan 27, 2025
a081ddf
address pr threads
Jan 27, 2025
7841b47
address pr threads
Jan 27, 2025
b55d514
address pr threads
Jan 27, 2025
4c425bd
merged with main
Jan 27, 2025
f2d68f5
Update backend/bin/deal-observer-backend.js
NikolasHaimerl Jan 27, 2025
2aac6ca
merged with main
Jan 27, 2025
7fad024
add test for fetching peer id
Jan 27, 2025
a6c9046
add logging for deal observer
Jan 27, 2025
9881fe2
add log for write time
Jan 27, 2025
a666cd1
implement cache
Jan 27, 2025
7df7bf0
add test for infinite loop
Jan 27, 2025
e36b897
remove vscode file
Jan 27, 2025
a01ea28
fix lock file
Jan 27, 2025
2b62aee
Update backend/bin/deal-observer-backend.js
NikolasHaimerl Jan 28, 2025
833ff2a
resolve PR threads
Jan 28, 2025
f1b2fe7
merged with main
Jan 28, 2025
1f25ff8
refactoring
Jan 28, 2025
66c1c44
remove outsourced PRs
Jan 29, 2025
494d2ea
address pr threads
Jan 29, 2025
47c421a
handle parsing error
Jan 29, 2025
51bba17
renamed params function
Jan 29, 2025
ea3239a
merged with main
Jan 29, 2025
ab71c4b
fmt
Jan 29, 2025
9c43dad
fmt
Jan 29, 2025
894c0eb
add test for piece indexer loop
Jan 29, 2025
6e043b5
add test for piece indexer loop
Jan 29, 2025
cb1f50b
Merge branch 'main' into nhaimerl-piece-indexer
Jan 30, 2025
7ae1d39
merged with main
Jan 30, 2025
47a17d2
improve error handling
Jan 30, 2025
1f44762
improve error handling
Jan 30, 2025
a7e46d4
fmt
Jan 30, 2025
24a8c86
fix parallel insertion during tests
Jan 31, 2025
ffe44da
resolved PR threads
Jan 31, 2025
351daf8
Merge branch 'main' into nhaimerl-piece-indexer
juliangruber Jan 31, 2025
7cfc261
fix db async
Jan 31, 2025
ecab504
merged with main
Jan 31, 2025
78e04da
add lock utils
Jan 31, 2025
1cf0dbe
Merge branch 'main' into nhaimerl-piece-indexer
juliangruber Jan 31, 2025
a0f5b66
fix merge
juliangruber Jan 31, 2025
c7c84e4
fix types
juliangruber Jan 31, 2025
ad92c7b
fix merge
juliangruber Jan 31, 2025
77ced6d
fix merge
juliangruber Jan 31, 2025
8490e3b
fix test
juliangruber Jan 31, 2025
7a40f9a
log
juliangruber Jan 31, 2025
719a4c8
`parseDeals` -> `loadDeals`
juliangruber Jan 31, 2025
9a26941
docs
juliangruber Jan 31, 2025
9e60d66
fix fn name, remove unused export
juliangruber Jan 31, 2025
6d0f871
refactor loop
juliangruber Jan 31, 2025
f4dd437
`getDealPayloadCid` can return `null`
juliangruber Jan 31, 2025
68bebba
test concurrency 0
juliangruber Jan 31, 2025
5369842
fix remove dependency
juliangruber Jan 31, 2025
8dcef45
add missing pix request implementation
juliangruber Jan 31, 2025
691a709
fix lint
juliangruber Jan 31, 2025
76270c2
clean up
juliangruber Jan 31, 2025
0081f78
Merge branch 'main' into nhaimerl-piece-indexer
juliangruber Jan 31, 2025
21ab6bd
refactor
juliangruber Jan 31, 2025
9993620
remove assert (we use types for this)
juliangruber Jan 31, 2025
e5035e6
`queryLimit` -> `maxDeals`
juliangruber Jan 31, 2025
6e6372f
clean up
juliangruber Jan 31, 2025
b8e8dd1
move main function to top
juliangruber Jan 31, 2025
ab4132d
`fetchDealsWithNoPayloadCid()` always returns an array
juliangruber Jan 31, 2025
7922b7e
refactor
juliangruber Jan 31, 2025
86d9596
reorder functions
juliangruber Jan 31, 2025
482cd93
make query safer
juliangruber Jan 31, 2025
663a200
`makePixRequest` -> `getDealPayloadCid`
juliangruber Jan 31, 2025
3e8aa30
refactor
juliangruber Jan 31, 2025
97550db
refactor
juliangruber Jan 31, 2025
98ed48e
style
juliangruber Jan 31, 2025
6ef1de2
test_data: add missing miner
juliangruber Jan 31, 2025
523815f
fix test options
juliangruber Jan 31, 2025
f0930fc
update tests
juliangruber Jan 31, 2025
6bb2c71
test fn call count
juliangruber Jan 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions backend/bin/deal-observer-backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import '../lib/instrument.js'
import { createInflux } from '../lib/telemetry.js'
import { getChainHead, rpcRequest } from '../lib/rpc-service/service.js'
import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, observeBuiltinActorEvents } from '../lib/deal-observer.js'
import { indexPieces } from '../lib/piece-indexer.js'
import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js'

const {
Expand All @@ -23,8 +24,7 @@ if (!INFLUXDB_TOKEN) {
assert(SPARK_API_BASE_URL, 'SPARK_API_BASE_URL required')
assert(SPARK_API_TOKEN, 'SPARK_API_TOKEN required')

const OBSERVE_ACTOR_EVENTS_LOOP_INTERVAL = 10 * 1000
const SPARK_API_SUBMIT_DEALS_LOOP_INTERVAL = 10 * 1000
const LOOP_INTERVAL = 10 * 1000
juliangruber marked this conversation as resolved.
Show resolved Hide resolved

// Filecoin will need some epochs to reach finality.
// We do not want to fetch deals that are newer than the current chain head - 940 epochs.
Expand All @@ -34,6 +34,7 @@ const maxPastEpochs = 1999
assert(finalityEpochs <= maxPastEpochs)

const pgPool = await createPgPool()
const queryLimit = 1000
const { recordTelemetry } = createInflux(INFLUXDB_TOKEN)

const observeActorEventsLoop = async (makeRpcRequest, pgPool) => {
Expand Down Expand Up @@ -69,12 +70,12 @@ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => {

if (INFLUXDB_TOKEN) {
recordTelemetry(`loop_${slug(LOOP_NAME, '_')}`, point => {
point.intField('interval_ms', OBSERVE_ACTOR_EVENTS_LOOP_INTERVAL)
point.intField('interval_ms', LOOP_INTERVAL)
point.intField('duration_ms', dt)
})
}
if (dt < OBSERVE_ACTOR_EVENTS_LOOP_INTERVAL) {
await timers.setTimeout(OBSERVE_ACTOR_EVENTS_LOOP_INTERVAL - dt)
if (dt < LOOP_INTERVAL) {
await timers.setTimeout(LOOP_INTERVAL - dt)
}
}
}
Expand Down Expand Up @@ -107,17 +108,45 @@ const sparkApiSubmitDealsLoop = async (pgPool, { sparkApiBaseUrl, sparkApiToken,

if (INFLUXDB_TOKEN) {
recordTelemetry(`loop_${slug(LOOP_NAME, '_')}`, point => {
point.intField('interval_ms', SPARK_API_SUBMIT_DEALS_LOOP_INTERVAL)
point.intField('interval_ms', LOOP_INTERVAL)
point.intField('duration_ms', dt)
})
}
if (dt < OBSERVE_ACTOR_EVENTS_LOOP_INTERVAL) {
await timers.setTimeout(SPARK_API_SUBMIT_DEALS_LOOP_INTERVAL - dt)
if (dt < LOOP_INTERVAL) {
await timers.setTimeout(LOOP_INTERVAL - dt)
}
}
}

export const pieceIndexerLoop = async (makeRpcRequest, makePixRequest, pgPool) => {
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
const LOOP_NAME = 'Piece Indexer'
while (true) {
const start = Date.now()
try {
indexPieces(makeRpcRequest, makePixRequest, pgPool, queryLimit)
} catch (e) {
console.error(e)
Sentry.captureException(e)
}
const dt = Date.now() - start
console.log(`Loop "${LOOP_NAME}" took ${dt}ms`)

// For local monitoring and debugging, we can omit sending data to InfluxDB
if (INFLUXDB_TOKEN) {
recordTelemetry(`loop_${slug(LOOP_NAME, '_')}`, point => {
point.intField('interval_ms', LOOP_INTERVAL)
point.intField('duration_ms', dt)
})
}
if (dt < LOOP_INTERVAL) {
await timers.setTimeout(LOOP_INTERVAL - dt)
}
}
}

await Promise.all([
// TODO: Define `pixRequest`
pieceIndexerLoop(rpcRequest, () => {}, pgPool),
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
observeActorEventsLoop(rpcRequest, pgPool),
sparkApiSubmitDealsLoop(pgPool, {
sparkApiBaseUrl: SPARK_API_BASE_URL,
Expand Down
4 changes: 3 additions & 1 deletion backend/lib/config.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const {
RPC_URLS = 'https://api.node.glif.io/rpc/v0',
PIECE_INDEXER_URL = 'https://pix.filspark.com',
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
GLIF_TOKEN
} = process.env

Expand All @@ -15,5 +16,6 @@ if (RPC_URL.includes('glif') && GLIF_TOKEN) {

export {
RPC_URL,
rpcHeaders
rpcHeaders,
PIECE_INDEXER_URL
}
71 changes: 29 additions & 42 deletions backend/lib/deal-observer.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
/** @import {Queryable} from '@filecoin-station/deal-observer-db' */
/** @import { BlockEvent } from './rpc-service/data-types.js' */
/** @import { Static } from '@sinclair/typebox' */

import assert from 'node:assert'
import { getActorEvents, getActorEventsFilter } from './rpc-service/service.js'
import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js'
import { Value } from '@sinclair/typebox/value'
import { convertBlockEventToActiveDealDbEntry } from './utils.js'

/**
* @param {number} blockHeight
Expand All @@ -15,10 +14,9 @@ import { Value } from '@sinclair/typebox/value'
*/
export async function observeBuiltinActorEvents (blockHeight, pgPool, makeRpcRequest) {
const eventType = 'claim'
const activeDeals = await getActorEvents(getActorEventsFilter(blockHeight, eventType), makeRpcRequest)
assert(activeDeals !== undefined, `No ${eventType} events found in block ${blockHeight}`)
console.log(`Observed ${activeDeals.length} ${eventType} events in block ${blockHeight}`)
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
await storeActiveDeals(activeDeals, pgPool)
const blockEvents = await getActorEvents(getActorEventsFilter(blockHeight, eventType), makeRpcRequest)
console.log(`Observed ${blockEvents.length} ${eventType} events in block ${blockHeight}`)
await storeActiveDeals(blockEvents.map((event) => convertBlockEventToActiveDealDbEntry(event)), pgPool)
}

/**
Expand All @@ -42,26 +40,14 @@ export async function countStoredActiveDeals (pgPool) {
}

/**
* @param {Static<typeof BlockEvent>[]} activeDeals
* @param {Static<typeof ActiveDealDbEntry >[]} activeDeals
* @param {Queryable} pgPool
* @returns {Promise<void>}
* */
export async function storeActiveDeals (activeDeals, pgPool) {
const transformedDeals = activeDeals.map((deal) => (
{
activated_at_epoch: deal.height,
miner_id: deal.event.provider,
client_id: deal.event.client,
piece_cid: deal.event.pieceCid,
piece_size: deal.event.pieceSize,
term_start_epoch: deal.event.termStart,
term_min: deal.event.termMin,
term_max: deal.event.termMax,
sector_id: deal.event.sector,
payload_cid: null
}))

const startInserting = Date.now()
if (activeDeals.length === 0) {
return
}
try {
// Insert deals in a batch
const insertQuery = `
Expand Down Expand Up @@ -89,34 +75,35 @@ export async function storeActiveDeals (activeDeals, pgPool) {
)
`
await pgPool.query(insertQuery, [
transformedDeals.map(deal => deal.activated_at_epoch),
transformedDeals.map(deal => deal.miner_id),
transformedDeals.map(deal => deal.client_id),
transformedDeals.map(deal => deal.piece_cid),
transformedDeals.map(deal => deal.piece_size),
transformedDeals.map(deal => deal.term_start_epoch),
transformedDeals.map(deal => deal.term_min),
transformedDeals.map(deal => deal.term_max),
transformedDeals.map(deal => deal.sector_id)
activeDeals.map(deal => deal.activated_at_epoch),
activeDeals.map(deal => deal.miner_id),
activeDeals.map(deal => deal.client_id),
activeDeals.map(deal => deal.piece_cid),
activeDeals.map(deal => deal.piece_size),
activeDeals.map(deal => deal.term_start_epoch),
activeDeals.map(deal => deal.term_min),
activeDeals.map(deal => deal.term_max),
activeDeals.map(deal => deal.sector_id)
])

// Commit the transaction if all inserts are successful
console.log(`Inserting ${activeDeals.length} deals took ${Date.now() - startInserting}ms`)
} catch (error) {
// If any error occurs, roll back the transaction
// TODO: Add sentry entry for this error
// https://github.com/filecoin-station/deal-observer/issues/28
console.error('Error inserting deals:', error.message)
throw Error('Error inserting deals', { cause: error })
}
}

/**
* @param {Queryable} pgPool
* @param {string} query
* @returns {Promise<Array<Static<typeof ActiveDealDbEntry>>>}
*/
async function loadDeals (pgPool, query) {
* @param {Queryable} pgPool
* @param {string} query
* @returns {Promise<Array<Static <typeof ActiveDealDbEntry>>>}
*/
export async function loadDeals (pgPool, query) {
NikolasHaimerl marked this conversation as resolved.
Show resolved Hide resolved
const result = (await pgPool.query(query)).rows.map(deal => {
// SQL used null, typebox needs undefined for null values
Object.keys(deal).forEach(key => {
if (deal[key] === null) {
deal[key] = undefined
}
})
return Value.Parse(ActiveDealDbEntry, deal)
}
)
Expand Down
82 changes: 82 additions & 0 deletions backend/lib/piece-indexer.js
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have piece-indexer - see https://github.com/CheckerNetwork/piece-indexer

How about calling this file piece-indexer-client.js?

(Not blocking this PR from landing.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, I found that confusing as well. Will fix

Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { fetchPayloadCid } from './pix-service/service.js'
import { loadDeals } from './deal-observer.js'
import assert from 'node:assert'
import * as util from 'node:util'

/** @import {Queryable} from '@filecoin-station/deal-observer-db' */
/** @import { Static } from '@sinclair/typebox' */
/** @import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' */

/**
* @param {Queryable} pgPool
* @param {function} makeRpcRequest
* @param {function} makePixRequest
* @returns {Promise<void>}
*/
export async function updatePayloadCids (pgPool, makeRpcRequest, activeDeals, makePixRequest) {
const updatedDeals = []
for (const deal of activeDeals) {
const payloadCid = await fetchPayloadCid(deal.miner_id, deal.piece_cid, makeRpcRequest, makePixRequest)
deal.payload_cid = payloadCid
updatedDeals.push(deal)
}
await updatePayloadInActiveDeal(pgPool, updatedDeals)
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
}

/**
*
* @param {function} makeRpcRequest
* @param {function} makePixRequest
* @param {Queryable} pgPool
* @param {number} queryLimit
* @returns {Promise<void>}
*/
export const indexPieces = async (makeRpcRequest, makePixRequest, pgPool, queryLimit) => {
// TODO: handle payloads which cannot be retrieved from the piece CID indexer
const dealsWithMissingPayloadCid = await fetchDealsWithNoPayloadCid(pgPool, queryLimit)
if (dealsWithMissingPayloadCid !== null && dealsWithMissingPayloadCid) {
await updatePayloadCids(pgPool, makeRpcRequest, dealsWithMissingPayloadCid, makePixRequest)
}
}

/**
* @param {Queryable} pgPool
* @param {number} limit
* @returns {Promise<Array<Static< typeof ActiveDealDbEntry>>>}
*/
export async function fetchDealsWithNoPayloadCid (pgPool, limit) {
assert(typeof limit === 'number', 'limit must be a number')
const query = `SELECT * FROM active_deals WHERE payload_cid IS NULL ORDER BY activated_at_epoch ASC LIMIT ${limit}`
return await loadDeals(pgPool, query)
}

/**
* @param {Queryable} pgPool
* @param {Array<Static<typeof ActiveDealDbEntry>>} deals
* @returns { Promise<void>}
*/
export async function updatePayloadInActiveDeal (pgPool, deals) {
const updateQuery = `
UPDATE active_deals
SET payload_cid = $1
WHERE activated_at_epoch = $2 AND miner_id = $3 AND client_id = $4 AND piece_cid = $5 AND piece_size = $6 AND term_start_epoch = $7 AND term_min = $8 AND term_max = $9 AND sector_id = $10
`
for (const deal of deals) {
try {
await pgPool.query(updateQuery, [
deal.payload_cid,
deal.activated_at_epoch,
deal.miner_id,
deal.client_id,
deal.piece_cid,
deal.piece_size,
deal.term_start_epoch,
deal.term_min,
deal.term_max,
deal.sector_id
])
} catch (error) {
throw Error(util.format('Error updating payload of deal: ', deal), { cause: error })
}
}
}
10 changes: 10 additions & 0 deletions backend/lib/pix-service/data-types.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Type } from '@sinclair/typebox'

// The response type from the piece indexer api
const PixResponse = Type.Object({
samples: Type.Array(Type.String())
})

export {
PixResponse
}
43 changes: 43 additions & 0 deletions backend/lib/pix-service/service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { Value } from '@sinclair/typebox/value'
import { PIECE_INDEXER_URL } from '../config.js'
import { getMinerPeerId } from '../rpc-service/service.js'
import { PixResponse } from './data-types.js'
import pRetry from 'p-retry'

/**
* @param {string} providerId
* @param {string} pieceCid
* @returns {Promise<string>}
*/
export const getDealPayloadCid = async (providerId, pieceCid) => {
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
const url = PIECE_INDEXER_URL + '/sample/' + providerId + '/' + pieceCid
try {
const response = await pRetry(async () => await fetch(url, {
method: 'GET',
headers: { 'content-type': 'application/json' }
}), { retries: 5 })
const json = await response.json()
try {
const parsedPixResponse = Value.Parse(PixResponse, json)
if (parsedPixResponse.samples.length === 0) {
throw new Error('No samples found in the response.')
}
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
return parsedPixResponse.samples[0]
} catch (e) {
throw new Error(`Failed to parse response from piece indexer. The response was : ${JSON.stringify(json)}`, { cause: e })
}
} catch (e) {
throw new Error('Failed to make RPC request.', { cause: e })
}
}

/**
* @param {number} providerId
* @param {string} pieceCid
* @returns {Promise<string>}
*/
export async function fetchPayloadCid (providerId, pieceCid, makeRpcRequest, makePixRequest) {
const minerPeerId = await getMinerPeerId(providerId, makeRpcRequest)
const payloadCid = await makePixRequest(minerPeerId, pieceCid)
return payloadCid
}
Loading
Loading