Skip to content

Commit

Permalink
feat: use pickup from cron pins job
Browse files Browse the repository at this point in the history
pickup pulls dags into E-IPFS, so this PR updates the api and cron jobs to reflect that in our db.

- Update pins and pins-failed cron jobs to check pin status in pickup instead of cluster

Fixes #2309

TODO

- [x] add `PICKUP_BASIC_AUTH_TOKEN` as a secret to the repo for cron jobs.
- [ ] update `CLUSTER-*` api env vars.

License: MIT
Signed-off-by: Oli Evans <[email protected]>
  • Loading branch information
olizilla committed Mar 7, 2023
1 parent 6dad78d commit 183d837
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/cron-pins-failed.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ jobs:
strategy:
matrix:
env: ['production']
include:
- env: production
pickup_url: http://pickup.dag.haus
steps:
- uses: actions/checkout@v2
with:
Expand All @@ -41,5 +44,7 @@ jobs:
CLUSTER2_BASIC_AUTH_TOKEN: ${{ secrets.CLUSTER2_BASIC_AUTH_TOKEN }}
CLUSTER3_API_URL: ${{ secrets.CLUSTER3_API_URL }}
CLUSTER3_BASIC_AUTH_TOKEN: ${{ secrets.CLUSTER3_BASIC_AUTH_TOKEN }}
PICKUP_URL: ${{ matrix.pickup_url }}
PICKUP_BASIC_AUTH_TOKEN: ${{ secrets.PICKUP_BASIC_AUTH_TOKEN }}
AFTER: ${{ github.event.inputs.after }}
run: yarn --cwd packages/cron start:pins-failed
7 changes: 7 additions & 0 deletions .github/workflows/cron-pins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ jobs:
strategy:
matrix:
env: ['staging', 'production']
include:
- env: production
pickup_url: http://pickup.dag.haus
- env: staging
pickup_url: http://staging.pickup.dag.haus
timeout-minutes: 60
steps:
- uses: actions/checkout@v2
Expand All @@ -37,6 +42,8 @@ jobs:
CLUSTER2_BASIC_AUTH_TOKEN: ${{ secrets.CLUSTER2_BASIC_AUTH_TOKEN }}
CLUSTER3_API_URL: ${{ secrets.CLUSTER3_API_URL }}
CLUSTER3_BASIC_AUTH_TOKEN: ${{ secrets.CLUSTER3_BASIC_AUTH_TOKEN }}
PICKUP_URL: ${{ matrix.pickup_url }}
PICKUP_BASIC_AUTH_TOKEN: ${{ secrets.PICKUP_BASIC_AUTH_TOKEN }}
run: yarn --cwd packages/cron start:pins
- name: Heartbeat
if: ${{ success() }}
Expand Down
18 changes: 16 additions & 2 deletions packages/cron/src/bin/pins-failed.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ import { fileURLToPath } from 'url'
import dotenv from 'dotenv'
import fetch from '@web-std/fetch'
import { checkFailedPinStatuses } from '../jobs/pins.js'
import { getPg, getCluster1, getCluster2, getCluster3 } from '../lib/utils.js'
import {
getPg,
getCluster1,
getCluster2,
getCluster3,
getPickup,
} from '../lib/utils.js'

const __dirname = path.dirname(fileURLToPath(import.meta.url))
global.fetch = fetch
Expand All @@ -21,11 +27,19 @@ async function main() {
const cluster1 = getCluster1(process.env)
const cluster2 = getCluster2(process.env)
const cluster3 = getCluster3(process.env)
const pickup = getPickup(process.env)
const after = process.env.AFTER
? new Date(process.env.AFTER)
: oneMonthAgo()

await checkFailedPinStatuses({ pg, cluster1, cluster2, cluster3, after })
await checkFailedPinStatuses({
pg,
cluster1,
cluster2,
cluster3,
pickup,
after,
})
} finally {
await pg.end()
}
Expand Down
11 changes: 9 additions & 2 deletions packages/cron/src/bin/pins.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ import { fileURLToPath } from 'url'
import dotenv from 'dotenv'
import fetch from '@web-std/fetch'
import { updatePendingPinStatuses } from '../jobs/pins.js'
import { getPg, getCluster1, getCluster2, getCluster3 } from '../lib/utils.js'
import {
getPg,
getCluster1,
getCluster2,
getCluster3,
getPickup,
} from '../lib/utils.js'

const __dirname = path.dirname(fileURLToPath(import.meta.url))
global.fetch = fetch
Expand All @@ -18,8 +24,9 @@ async function main() {
const cluster1 = getCluster1(process.env)
const cluster2 = getCluster2(process.env)
const cluster3 = getCluster3(process.env)
const pickup = getPickup(process.env)

await updatePendingPinStatuses({ pg, cluster1, cluster2, cluster3 })
await updatePendingPinStatuses({ pg, cluster1, cluster2, cluster3, pickup })
} finally {
await pg.end()
}
Expand Down
12 changes: 8 additions & 4 deletions packages/cron/src/jobs/pins.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ const CONCURRENCY = 5
* http://nginx.org/en/docs/http/ngx_http_core_module.html#large_client_header_buffers
*/
const MAX_CLUSTER_STATUS_CIDS = 120
const CLUSTERS = ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3']
/**
* @typedef {import('../../../api/src/utils/db-types').definitions} definitions
* @type Array<definitions["pin"]["service"]>
**/
const CLUSTERS = ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3', 'ElasticIpfs']

/**
* @typedef {import('pg').Client} Client
Expand All @@ -17,8 +21,8 @@ const CLUSTERS = ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3']
* cluster1: import('@nftstorage/ipfs-cluster').Cluster
* cluster2: import('@nftstorage/ipfs-cluster').Cluster
* cluster3: import('@nftstorage/ipfs-cluster').Cluster
* pickup: import('@nftstorage/ipfs-cluster').Cluster
* }} Config
* @typedef {import('../../../api/src/utils/db-types').definitions} definitions
* @typedef {Pick<definitions['pin'], 'id'|'status'|'service'|'inserted_at'|'updated_at'> & { source_cid: string }} Pin
* @typedef {import('@supabase/postgrest-js').PostgrestQueryBuilder<Pin>} PinQuery
*/
Expand Down Expand Up @@ -145,7 +149,7 @@ UPDATE pin AS p
* }} config
*/
async function updatePinStatuses(config) {
const { countPins, fetchPins, pg, cluster3 } = config
const { countPins, fetchPins, pg, pickup } = config
if (!log.enabled) {
console.log('ℹ️ Enable logging by setting DEBUG=pins:updatePinStatuses')
}
Expand Down Expand Up @@ -182,7 +186,7 @@ async function updatePinStatuses(config) {
/** @type {Pin[]} */
const updatedPins = []
const cids = pins.map((p) => p.source_cid)
const statuses = await cluster3.statusAll({ cids })
const statuses = await pickup.statusAll({ cids })
const statusByCid = Object.fromEntries(statuses.map((s) => [s.cid, s]))

for (const pin of pins) {
Expand Down
15 changes: 15 additions & 0 deletions packages/cron/src/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ export function getCluster3(env) {
})
}

/**
* Create a new IPFS Cluster instance from the passed environment variables.
* @param {Record<string, string|undefined>} env
*/
export function getPickup(env) {
const pickupUrl = env.PICKUP_URL
if (!pickupUrl) throw new Error('PICKUP_URL must be set in env')
const basicAuthToken = env.PICKUP_BASIC_AUTH_TOKEN
if (!basicAuthToken)
throw new Error('PICKUP_BASIC_AUTH_TOKEN must be set in env')
return new Cluster(pickupUrl, {
headers: { authorization: `Basic ${basicAuthToken}` },
})
}

/**
* Create a new DBClient instance from the passed environment variables.
* @param {Record<string, string|undefined>} env
Expand Down

0 comments on commit 183d837

Please sign in to comment.