Skip to content

Commit

Permalink
Migration 8-9: migrate CIDsv1 to raw multihash
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan committed Jan 31, 2020
1 parent 012d6a8 commit 4658cb0
Show file tree
Hide file tree
Showing 9 changed files with 2,079 additions and 2 deletions.
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
module github.com/ipfs/fs-repo-migrations

go 1.13

replace github.com/ipfs/fs-repo-migrations/ipfs-8-to-9/migration => ./ipfs-8-to-9/migration

require (
github.com/dgraph-io/badger v1.6.0
github.com/ipfs/fs-repo-migrations/ipfs-8-to-9/migration v0.0.0-00010101000000-000000000000
github.com/mitchellh/go-homedir v1.1.0
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
)
835 changes: 835 additions & 0 deletions go.sum

Large diffs are not rendered by default.

Binary file added ipfs-8-to-9/ipfs-8-to-9
Binary file not shown.
11 changes: 11 additions & 0 deletions ipfs-8-to-9/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package main

import (
migrate "github.com/ipfs/fs-repo-migrations/go-migrate"
mg8 "github.com/ipfs/fs-repo-migrations/ipfs-8-to-9/migration"
)

func main() {
m := mg8.Migration{}
migrate.Main(&m)
}
21 changes: 21 additions & 0 deletions ipfs-8-to-9/migration/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module github.com/ipfs/fs-repo-migrations/ipfs-8-to-9/migration

go 1.13

require (
github.com/hsanjuan/go-libp2p-http v0.0.2 // indirect
github.com/ipfs/dir-index-html v1.0.3 // indirect
github.com/ipfs/fs-repo-migrations v1.4.0
github.com/ipfs/go-cid v0.0.4
github.com/ipfs/go-datastore v0.3.1
github.com/ipfs/go-ds-flatfs v0.3.0
github.com/ipfs/go-ipfs v0.4.22-0.20200130064341-6750ee973e2a
github.com/ipfs/go-ipfs-addr v0.0.1 // indirect
github.com/ipfs/go-ipfs-ds-help v0.0.1
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/multiformats/go-multicodec v0.1.6 // indirect
github.com/multiformats/go-multihash v0.0.10
github.com/prometheus/tsdb v0.7.1 // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d // indirect
)
922 changes: 922 additions & 0 deletions ipfs-8-to-9/migration/go.sum

Large diffs are not rendered by default.

110 changes: 110 additions & 0 deletions ipfs-8-to-9/migration/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// package mg8 contains the code to perform 8-9 repository migration in
// go-ipfs. This performs a switch to raw multihashes for all keys in the
// go-ipfs datastore (https://github.com/ipfs/go-ipfs/issues/6815).
package mg8

import (
"errors"
"fmt"

migrate "github.com/ipfs/fs-repo-migrations/go-migrate"
lock "github.com/ipfs/fs-repo-migrations/ipfs-1-to-2/repolock"
"github.com/ipfs/go-datastore/namespace"

mfsr "github.com/ipfs/fs-repo-migrations/mfsr"
log "github.com/ipfs/fs-repo-migrations/stump"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-ipfs/plugin/loader"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
)

// Migration implements the migration described above.
type Migration struct{}

// Versions returns the current version string for this migration.
func (m Migration) Versions() string {
return "8-to-9"
}

// Reversible returns false. This migration cannot be reverted, as we do not
// know which raw hashes were actually CIDv1s. However, things should work all
// the same as they will be treated as CIDv0s in old versions anyways.
func (m Migration) Reversible() bool {
return false
}

// Apply runs the migration.
func (m Migration) Apply(opts migrate.Options) error {
log.Verbose = opts.Verbose
log.Log("applying %s repo migration", m.Versions())

log.VLog("locking repo at %q", opts.Path)
lk, err := lock.Lock2(opts.Path)
if err != nil {
return err
}
defer lk.Close()

repo := mfsr.RepoPath(opts.Path)

log.VLog(" - verifying version is '8'")
if err := repo.CheckVersion("8"); err != nil {
return err
}

log.VLog(" - loading repo configurations")
plugins, err := loader.NewPluginLoader(opts.Path)
if err != nil {
return fmt.Errorf("error loading plugins: %s", err)
}

if err := plugins.Initialize(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}

if err := plugins.Inject(); err != nil {
return fmt.Errorf("error injecting plugins: %s", err)
}

cfg, err := fsrepo.ConfigAt(opts.Path)
if err != nil {
return err
}

dsc, err := fsrepo.AnyDatastoreConfig(cfg.Datastore.Spec)
if err != nil {
return err
}

dstore, err := dsc.Create(opts.Path)
if err != nil {
return err
}
defer dstore.Close()

// TODO: assuming the user has not modified this
blocks := namespace.Wrap(dstore, ds.NewKey("/blocks"))

log.VLog(" - starting CIDv1 to raw multihash block migration")
cidSwapper := CidSwapper{blocks}
total, err := cidSwapper.Run()
if err != nil {
log.Error(err)
return err
}

log.Log("%d CIDv1 keys swapped to raw multihashes", total)
if err := repo.WriteVersion("9"); err != nil {
log.Error("failed to write version file")
return err
}
log.Log("updated version file")

return nil
}

// Revert attempts to undo the migration.
func (m Migration) Revert(opts migrate.Options) error {
// TODO: Consider a no-op revert though
return errors.New("This migration cannot be reverted")
}
165 changes: 165 additions & 0 deletions ipfs-8-to-9/migration/swapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package mg8

import (
"errors"
"sync"
"sync/atomic"

log "github.com/ipfs/fs-repo-migrations/stump"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/multiformats/go-multihash"
)

// SyncSize specifies how much we batch data before committing and syncing
// Increasing this number may result
var SyncSize uint64 = 10 * 1024 * 1024 // 1MiB

// NWorkers sets the number of batching threads to run
var NWorkers int = 4

// CidSwapper reads all the keys in a datastore and replaces
// them with their raw multihash.
type CidSwapper struct {
Store ds.Batching // the datastore to migrate.
}

// Run lists all the keys in the datastore and triggers a swap operation for
// those corresponding to CIDv1s (replacing them by their raw multihash).
//
// Run returns the total number of keys swapped.
func (cswap *CidSwapper) Run() (uint64, error) {
// Always perform a final sync
defer cswap.Store.Sync(ds.NewKey("/"))
// Query all keys. We will loop all keys
// and swap those that can be parsed as CIDv1.
queryAll := query.Query{
KeysOnly: true,
}

results, err := cswap.Store.Query(queryAll)
if err != nil {
return 0, err
}
defer results.Close()
resultsCh := results.Next()

var total uint64
var nErrors uint64
var wg sync.WaitGroup
wg.Add(NWorkers)
for i := 0; i < NWorkers; i++ {
go func() {
defer wg.Done()
n, e := cswap.swapWorker(resultsCh)
atomic.AddUint64(&total, n)
atomic.AddUint64(&nErrors, e)
}()
}
wg.Wait()
if nErrors > 0 {
return total, errors.New("errors happened during the migration. Consider running it again")
}

return total, nil
}

// swapWorkers reads query results from a channel and renames CIDv1 keys to
// raw multihashes by reading the blocks and storing them with the new
// key. Returns the number of keys swapped and the number of errors.
func (cswap *CidSwapper) swapWorker(resultsCh <-chan query.Result) (uint64, uint64) {
var swapped uint64
var errored uint64
var curSyncSize uint64

// Process keys from the results channel
for res := range resultsCh {
if res.Error != nil {
log.Error(res.Error)
errored++
continue
}

oldKey := ds.NewKey(res.Key)
c, err := dsKeyToCid(oldKey)
if err != nil {
// complain if we find anything that is not a CID but
// leave it as it is.
log.Log("could not parse %s as a Cid", oldKey)
continue
}
if c.Version() == 0 { // CidV0 are multihashes, leave them.
continue
}

// Cid Version > 0
newKey := multihashToDsKey(c.Hash())
size, err := cswap.swap(oldKey, newKey)
if err != nil {
log.Error("swapping %s for %s: %s", oldKey, newKey, err)
errored++
continue
}
swapped++
curSyncSize += size

// Commit and Sync if we reached SyncSize
if curSyncSize >= SyncSize {
curSyncSize = 0
err = cswap.Store.Sync(ds.NewKey("/"))
if err != nil {
log.Error(err)
errored++
continue
}
}
}
return swapped, errored
}

// swap swaps the old for the new key in a single transaction.
func (cswap *CidSwapper) swap(old, new ds.Key) (uint64, error) {
// Unfortunately grouping multiple swaps in larger batches usually
// results in "too many open files" errors in flatfs (many really
// small files) . So we do small batches instead and Sync at larger
// intervals.
// Note flatfs will not clear up the batch after committing, so
// the object cannot be-reused.
batcher, err := cswap.Store.Batch()
if err != nil {
return 0, err
}

v, err := cswap.Store.Get(old)
vLen := uint64(len(v))
if err != nil {
return vLen, err
}
if err := batcher.Put(new, v); err != nil {
return vLen, err
}
if err := batcher.Delete(old); err != nil {
return vLen, err
}
return vLen, batcher.Commit()
}

// Copied from go-ipfs-ds-help as that one is gone.
func dsKeyToCid(dsKey datastore.Key) (cid.Cid, error) {
kb, err := dshelp.BinaryFromDsKey(dsKey)
if err != nil {
return cid.Cid{}, err
}
return cid.Cast(kb)
}

// multihashToDsKey creates a Key from the given Multihash.
// here to avoid dependency on newer dshelp function.
// TODO: can be removed if https://github.com/ipfs/go-ipfs-ds-help/pull/18
// is merged.
func multihashToDsKey(k multihash.Multihash) datastore.Key {
return dshelp.NewKeyFromBinary(k)
}
8 changes: 6 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ import (
gomigrate "github.com/ipfs/fs-repo-migrations/go-migrate"
mg0 "github.com/ipfs/fs-repo-migrations/ipfs-0-to-1/migration"
mg1 "github.com/ipfs/fs-repo-migrations/ipfs-1-to-2/migration"
homedir "github.com/ipfs/fs-repo-migrations/ipfs-2-to-3/Godeps/_workspace/src/github.com/mitchellh/go-homedir"
mg2 "github.com/ipfs/fs-repo-migrations/ipfs-2-to-3/migration"
mg3 "github.com/ipfs/fs-repo-migrations/ipfs-3-to-4/migration"
mg4 "github.com/ipfs/fs-repo-migrations/ipfs-4-to-5/migration"
mg5 "github.com/ipfs/fs-repo-migrations/ipfs-5-to-6/migration"
mg6 "github.com/ipfs/fs-repo-migrations/ipfs-6-to-7/migration"
mg7 "github.com/ipfs/fs-repo-migrations/ipfs-7-to-8/migration"
mg8 "github.com/ipfs/fs-repo-migrations/ipfs-8-to-9/migration"
mfsr "github.com/ipfs/fs-repo-migrations/mfsr"
homedir "github.com/mitchellh/go-homedir"
)

var CurrentVersion = 7
var CurrentVersion = 9

var migrations = []gomigrate.Migration{
&mg0.Migration{},
Expand All @@ -29,6 +31,8 @@ var migrations = []gomigrate.Migration{
&mg4.Migration{},
&mg5.Migration{},
&mg6.Migration{},
&mg7.Migration{}, // update bootstrappers
&mg8.Migration{}, // converts CIDv1s to raw multihashes
}

func GetIpfsDir() (string, error) {
Expand Down

0 comments on commit 4658cb0

Please sign in to comment.