Skip to content

Commit

Permalink
Merge pull request #5667 from ipfs/fix/simplify-dag-add
Browse files Browse the repository at this point in the history
simplify dag put and correctly take pin lock
  • Loading branch information
Stebalien authored Oct 27, 2018
2 parents 831ed47 + a0c0355 commit ccd7f2c
Showing 1 changed file with 37 additions and 74 deletions.
111 changes: 37 additions & 74 deletions core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"math"

"github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs/core/commands/e"
"github.com/ipfs/go-ipfs/core/coredag"
"github.com/ipfs/go-ipfs/pin"

cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
path "gx/ipfs/QmRKuTyCzg7HFBcV1YUhzStroGtJSb8iWgyxfsDCwFhWTS/go-path"
cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds"
files "gx/ipfs/QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC/go-ipfs-files"
ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format"
cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
)
Expand Down Expand Up @@ -87,81 +85,60 @@ into an object of the specified format.
}
}

outChan := make(chan interface{}, 8)
cids := cid.NewSet()
b := ipld.NewBatch(req.Context, nd.DAG)

addAllAndPin := func(f files.File) error {
cids := cid.NewSet()
b := ipld.NewBatch(req.Context, nd.DAG)

for {
file, err := f.NextFile()
if err == io.EOF {
// Finished the list of files.
break
} else if err != nil {
return err
}

nds, err := coredag.ParseInputs(ienc, format, file, mhType, -1)
if err != nil {
return err
}
if len(nds) == 0 {
return fmt.Errorf("no node returned from ParseInputs")
}

for _, nd := range nds {
err := b.Add(nd)
if err != nil {
return err
}
}

cid := nds[0].Cid()
cids.Add(cid)
if dopin {
defer nd.Blockstore.PinLock().Unlock()
}

select {
case outChan <- &OutputObject{Cid: cid}:
case <-req.Context.Done():
return nil
}
for {
file, err := req.Files.NextFile()
if err == io.EOF {
// Finished the list of files.
break
} else if err != nil {
return err
}

if err := b.Commit(); err != nil {
nds, err := coredag.ParseInputs(ienc, format, file, mhType, -1)
if err != nil {
return err
}
if len(nds) == 0 {
return fmt.Errorf("no node returned from ParseInputs")
}

if dopin {
defer nd.Blockstore.PinLock().Unlock()

cids.ForEach(func(c cid.Cid) error {
nd.Pinning.PinWithMode(c, pin.Recursive)
return nil
})

err := nd.Pinning.Flush()
for _, nd := range nds {
err := b.Add(nd)
if err != nil {
return err
}
}

return nil
cid := nds[0].Cid()
cids.Add(cid)
if err := res.Emit(&OutputObject{Cid: cid}); err != nil {
return err
}
}

errC := make(chan error)
go func() {
var err error
defer func() { errC <- err }()
defer close(outChan)
err = addAllAndPin(req.Files)
}()

err = res.Emit(outChan)
if err != nil {
if err := b.Commit(); err != nil {
return err
}

return <-errC
if dopin {
cids.ForEach(func(c cid.Cid) error {
nd.Pinning.PinWithMode(c, pin.Recursive)
return nil
})

err := nd.Pinning.Flush()
if err != nil {
return err
}
}
return nil
},
Type: OutputObject{},
Encoders: cmds.EncoderMap{
Expand Down Expand Up @@ -260,17 +237,3 @@ var DagResolveCmd = &cmds.Command{
},
Type: ResolveOutput{},
}

// copy+pasted from ../commands.go
func unwrapOutput(i interface{}) (interface{}, error) {
var (
ch <-chan interface{}
ok bool
)

if ch, ok = i.(<-chan interface{}); !ok {
return nil, e.TypeErr(ch, i)
}

return <-ch, nil
}

0 comments on commit ccd7f2c

Please sign in to comment.