feat: use 🛻 pickup from api #2310

Merged · 6 commits · Mar 22, 2023
Changes from 1 commit
5 changes: 5 additions & 0 deletions .github/workflows/cron-pins-failed.yml
@@ -17,6 +17,9 @@ jobs:
strategy:
matrix:
env: ['production']
include:
- env: production
pickup_url: http://pickup.dag.haus
steps:
- uses: actions/checkout@v2
with:
@@ -41,5 +44,7 @@
CLUSTER2_BASIC_AUTH_TOKEN: ${{ secrets.CLUSTER2_BASIC_AUTH_TOKEN }}
CLUSTER3_API_URL: ${{ secrets.CLUSTER3_API_URL }}
CLUSTER3_BASIC_AUTH_TOKEN: ${{ secrets.CLUSTER3_BASIC_AUTH_TOKEN }}
PICKUP_URL: ${{ matrix.pickup_url }}
PICKUP_BASIC_AUTH_TOKEN: ${{ secrets.PICKUP_BASIC_AUTH_TOKEN }}
AFTER: ${{ github.event.inputs.after }}
run: yarn --cwd packages/cron start:pins-failed
7 changes: 7 additions & 0 deletions .github/workflows/cron-pins.yml
@@ -12,6 +12,11 @@ jobs:
strategy:
matrix:
env: ['staging', 'production']
include:
- env: production
pickup_url: http://pickup.dag.haus
- env: staging
pickup_url: http://staging.pickup.dag.haus
timeout-minutes: 60
steps:
- uses: actions/checkout@v2
@@ -37,6 +42,8 @@
CLUSTER2_BASIC_AUTH_TOKEN: ${{ secrets.CLUSTER2_BASIC_AUTH_TOKEN }}
CLUSTER3_API_URL: ${{ secrets.CLUSTER3_API_URL }}
CLUSTER3_BASIC_AUTH_TOKEN: ${{ secrets.CLUSTER3_BASIC_AUTH_TOKEN }}
PICKUP_URL: ${{ matrix.pickup_url }}
PICKUP_BASIC_AUTH_TOKEN: ${{ secrets.PICKUP_BASIC_AUTH_TOKEN }}
run: yarn --cwd packages/cron start:pins
- name: Heartbeat
if: ${{ success() }}
6 changes: 6 additions & 0 deletions packages/api/src/routes/pins-add.js
@@ -51,6 +51,12 @@ export async function pinsAdd(event, ctx) {
})

const upload = await db.createUpload({
pins: [
{
status: 'PinQueued',
service: 'ElasticIpfs', // via pickup
},
],
Comment on lines +54 to +59 (Contributor Author):

We do this to override the default pin info that createUpload would set if we didn't provide it. New pins made via pickup are provided by E-IPFS. (A hypothetical sketch of that default follows this file's diff.)

type: 'Remote',
content_cid: cid.contentCid,
source_cid: cid.sourceCid,
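For context on the review comment above, here is a minimal sketch of the fallback it describes. This is not code from this PR: the exact defaults inside createUpload are an assumption, with the service names taken from the CLUSTERS list in packages/cron/src/jobs/pins.js, and `resolvePins` is a hypothetical helper name.

```js
// Hypothetical sketch (not from this PR): how a createUpload-style helper
// might fall back to cluster pins when no `pins` override is provided.
// The default service names are assumed from the cron job's CLUSTERS list.
const DEFAULT_PIN_SERVICES = ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3']

/**
 * @param {{ status: string, service: string }[]} [pins] optional override,
 *   as now passed explicitly in pins-add.js and pins-replace.js
 */
export function resolvePins(pins) {
  // An explicit override (e.g. the ElasticIpfs pin added in this PR) wins.
  if (pins && pins.length > 0) return pins
  // Otherwise, assume one queued pin row per cluster service.
  return DEFAULT_PIN_SERVICES.map((service) => ({
    status: 'PinQueued',
    service,
  }))
}
```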
6 changes: 6 additions & 0 deletions packages/api/src/routes/pins-replace.js
@@ -73,6 +73,12 @@ export async function pinsReplace(event, ctx) {
})

const upload = await db.createUpload({
pins: [
{
status: 'PinQueued',
service: 'ElasticIpfs', // via pickup
},
],
type: 'Remote',
content_cid: cid.contentCid,
source_cid: cid.sourceCid,
18 changes: 16 additions & 2 deletions packages/cron/src/bin/pins-failed.js
@@ -5,7 +5,13 @@ import { fileURLToPath } from 'url'
import dotenv from 'dotenv'
import fetch from '@web-std/fetch'
import { checkFailedPinStatuses } from '../jobs/pins.js'
import { getPg, getCluster1, getCluster2, getCluster3 } from '../lib/utils.js'
import {
getPg,
getCluster1,
getCluster2,
getCluster3,
getPickup,
} from '../lib/utils.js'

const __dirname = path.dirname(fileURLToPath(import.meta.url))
global.fetch = fetch
@@ -21,11 +27,19 @@ async function main() {
const cluster1 = getCluster1(process.env)
const cluster2 = getCluster2(process.env)
const cluster3 = getCluster3(process.env)
const pickup = getPickup(process.env)
const after = process.env.AFTER
? new Date(process.env.AFTER)
: oneMonthAgo()

await checkFailedPinStatuses({ pg, cluster1, cluster2, cluster3, after })
await checkFailedPinStatuses({
pg,
cluster1,
cluster2,
cluster3,
pickup,
after,
})
} finally {
await pg.end()
}
11 changes: 9 additions & 2 deletions packages/cron/src/bin/pins.js
@@ -5,7 +5,13 @@ import { fileURLToPath } from 'url'
import dotenv from 'dotenv'
import fetch from '@web-std/fetch'
import { updatePendingPinStatuses } from '../jobs/pins.js'
import { getPg, getCluster1, getCluster2, getCluster3 } from '../lib/utils.js'
import {
getPg,
getCluster1,
getCluster2,
getCluster3,
getPickup,
} from '../lib/utils.js'

const __dirname = path.dirname(fileURLToPath(import.meta.url))
global.fetch = fetch
@@ -18,8 +24,9 @@ async function main() {
const cluster1 = getCluster1(process.env)
const cluster2 = getCluster2(process.env)
const cluster3 = getCluster3(process.env)
const pickup = getPickup(process.env)

await updatePendingPinStatuses({ pg, cluster1, cluster2, cluster3 })
await updatePendingPinStatuses({ pg, cluster1, cluster2, cluster3, pickup })
} finally {
await pg.end()
}
12 changes: 8 additions & 4 deletions packages/cron/src/jobs/pins.js
@@ -8,7 +8,11 @@ const CONCURRENCY = 5
* http://nginx.org/en/docs/http/ngx_http_core_module.html#large_client_header_buffers
*/
const MAX_CLUSTER_STATUS_CIDS = 120
const CLUSTERS = ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3']
/**
* @typedef {import('../../../api/src/utils/db-types').definitions} definitions
* @type Array<definitions["pin"]["service"]>
**/
const CLUSTERS = ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3', 'ElasticIpfs']

/**
* @typedef {import('pg').Client} Client
@@ -17,8 +21,8 @@ const CLUSTERS = ['IpfsCluster', 'IpfsCluster2', 'IpfsCluster3']
* cluster1: import('@nftstorage/ipfs-cluster').Cluster
* cluster2: import('@nftstorage/ipfs-cluster').Cluster
* cluster3: import('@nftstorage/ipfs-cluster').Cluster
* pickup: import('@nftstorage/ipfs-cluster').Cluster
* }} Config
* @typedef {import('../../../api/src/utils/db-types').definitions} definitions
* @typedef {Pick<definitions['pin'], 'id'|'status'|'service'|'inserted_at'|'updated_at'> & { source_cid: string }} Pin
* @typedef {import('@supabase/postgrest-js').PostgrestQueryBuilder<Pin>} PinQuery
*/
@@ -145,7 +149,7 @@ UPDATE pin AS p
* }} config
*/
async function updatePinStatuses(config) {
const { countPins, fetchPins, pg, cluster3 } = config
const { countPins, fetchPins, pg, pickup } = config
if (!log.enabled) {
console.log('ℹ️ Enable logging by setting DEBUG=pins:updatePinStatuses')
}
@@ -182,7 +186,7 @@ async function updatePinStatuses(config) {
/** @type {Pin[]} */
const updatedPins = []
const cids = pins.map((p) => p.source_cid)
const statuses = await cluster3.statusAll({ cids })
const statuses = await pickup.statusAll({ cids })
const statusByCid = Object.fromEntries(statuses.map((s) => [s.cid, s]))

for (const pin of pins) {
15 changes: 15 additions & 0 deletions packages/cron/src/lib/utils.js
@@ -44,6 +44,21 @@ export function getCluster3(env) {
})
}

/**
* Create a new IPFS Cluster instance from the passed environment variables.
* @param {Record<string, string|undefined>} env
*/
export function getPickup(env) {
const pickupUrl = env.PICKUP_URL
if (!pickupUrl) throw new Error('PICKUP_URL must be set in env')
const basicAuthToken = env.PICKUP_BASIC_AUTH_TOKEN
if (!basicAuthToken)
throw new Error('PICKUP_BASIC_AUTH_TOKEN must be set in env')
return new Cluster(pickupUrl, {
headers: { authorization: `Basic ${basicAuthToken}` },
})
}

/**
* Create a new DBClient instance from the passed environment variables.
* @param {Record<string, string|undefined>} env
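For reference, a minimal usage sketch of the new helper, assuming only what this diff shows: the `@nftstorage/ipfs-cluster` `Cluster` client returned by `getPickup` and the `statusAll` method the pins job already calls. The import path assumes a script alongside the existing bins, and the CID is a placeholder.

```js
import fetch from '@web-std/fetch'
import { getPickup } from '../lib/utils.js'

// The cron bins set a global fetch the same way.
global.fetch = fetch

// Requires PICKUP_URL and PICKUP_BASIC_AUTH_TOKEN in the environment,
// as wired up by the cron workflow changes above.
const pickup = getPickup(process.env)

// Same Cluster API the pins job uses for the other clusters, now pointed
// at pickup instead of cluster3. The CID below is only a placeholder.
const statuses = await pickup.statusAll({ cids: ['bafy-placeholder-cid'] })
console.log(statuses)
```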