diff --git a/batch.go b/batch.go
new file mode 100644
index 0000000..754490b
--- /dev/null
+++ b/batch.go
@@ -0,0 +1,177 @@
+package format
+
+import (
+	"context"
+	"errors"
+	"runtime"
+)
+
+// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
+// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
+// devices, and CPUs to find the right value/formula.
+var ParallelBatchCommits = runtime.NumCPU() * 2
+
+// ErrNotCommited is meant for closing a batch that hasn't been successfully
+// committed. NOTE(review): nothing in batch.go returns it yet.
+var ErrNotCommited = errors.New("error: batch not commited")
+
+// ErrClosed is meant for operations on a closed batch; unused in batch.go so far.
+var ErrClosed = errors.New("error: batch closed")
+
+// NewBatch returns a node buffer (Batch) that buffers nodes internally and
+// commits them to the underlying DAGService in batches. Use this if you intend
+// to add a lot of nodes all at once.
+//
+// If the passed context is canceled, any in-progress commits are aborted.
+func NewBatch(ctx context.Context, ds DAGService) *Batch {
+	ctx, cancel := context.WithCancel(ctx)
+	return &Batch{
+		ds:            ds,
+		ctx:           ctx,
+		cancel:        cancel,
+		commitResults: make(chan error, ParallelBatchCommits),
+		MaxSize:       8 << 20, // 8 MiB of raw node data
+
+		// By default, only batch up to 128 nodes at a time.
+		// The current implementation of flatfs opens this many file
+		// descriptors at the same time for the optimized batch write.
+		MaxNodes: 128,
+	}
+}
+
+// Batch is a buffer for batching adds to a dag.
+type Batch struct {
+	ds DAGService // receives each batch through AddMany
+
+	ctx    context.Context // shared by every commit goroutine
+	cancel func()          // cancels ctx; invoked by setError
+
+	activeCommits int        // commits started whose result is not yet received
+	err           error      // sticky error; once set, Add and Commit return it
+	commitResults chan error // results of the AddMany calls
+
+	nodes []Node // nodes buffered since the last commit
+	size  int    // sum of len(RawData()) over nodes
+
+	MaxSize  int // commit once the buffered size exceeds this
+	MaxNodes int // commit once this many nodes are buffered
+}
+
+func (t *Batch) processResults() {
+	for t.activeCommits > 0 {
+		select {
+		case err := <-t.commitResults:
+			t.activeCommits--
+			if err != nil {
+				t.setError(err)
+				return
+			}
+		default:
+			return
+		}
+	}
+}
+
+// asyncCommit hands the buffered nodes to ds.AddMany in a new goroutine, first
+// waiting for a free slot when ParallelBatchCommits commits are in flight.
+func (t *Batch) asyncCommit() {
+	numBlocks := len(t.nodes)
+	if numBlocks == 0 {
+		return
+	}
+	if t.activeCommits >= ParallelBatchCommits {
+		select {
+		case err := <-t.commitResults:
+			t.activeCommits--
+
+			if err != nil {
+				t.setError(err)
+				return
+			}
+		case <-t.ctx.Done():
+			t.setError(t.ctx.Err())
+			return
+		}
+	}
+	go func(ctx context.Context, b []Node, result chan error, ds DAGService) {
+		select {
+		case result <- ds.AddMany(ctx, b):
+		case <-ctx.Done():
+		}
+	}(t.ctx, t.nodes, t.commitResults, t.ds)
+
+	t.activeCommits++
+	t.nodes = make([]Node, 0, numBlocks)
+	t.size = 0
+}
+
+// Add buffers nd and starts a commit once MaxSize is exceeded or MaxNodes is reached.
+func (t *Batch) Add(nd Node) error {
+	if t.err != nil {
+		return t.err
+	}
+	// Not strictly necessary but allows us to catch errors early.
+	t.processResults()
+
+	if t.err != nil {
+		return t.err
+	}
+
+	t.nodes = append(t.nodes, nd)
+	t.size += len(nd.RawData())
+
+	if t.size > t.MaxSize || len(t.nodes) >= t.MaxNodes {
+		t.asyncCommit()
+	}
+	return t.err
+}
+
+// Commit commits the buffered nodes and waits for all in-flight commits.
+func (t *Batch) Commit() error {
+	if t.err != nil {
+		return t.err
+	}
+
+	t.asyncCommit()
+
+loop:
+	for t.activeCommits > 0 {
+		select {
+		case err := <-t.commitResults:
+			t.activeCommits--
+			if err != nil {
+				t.setError(err)
+				break loop
+			}
+		case <-t.ctx.Done():
+			t.setError(t.ctx.Err())
+			break loop
+		}
+	}
+
+	return t.err
+}
+
+func (t *Batch) setError(err error) {
+	t.err = err
+	// Cancel the context shared with every commit goroutine.
+	t.cancel()
+
+	// Drain as much as we can without blocking.
+loop:
+	for {
+		select {
+		case <-t.commitResults:
+		default:
+			break loop
+		}
+	}
+
+	// Be nice and cleanup. These can take a *lot* of memory.
+	t.commitResults = nil
+	t.ds = nil
+	t.ctx = nil
+	t.nodes = nil
+	t.size = 0
+	t.activeCommits = 0
+}
diff --git a/batch_test.go b/batch_test.go
new file mode 100644
index 0000000..71972cb
--- /dev/null
+++ b/batch_test.go
@@ -0,0 +1,109 @@
+package format
+
+import (
+	"context"
+	"sync"
+	"testing"
+
+	cid "github.com/ipfs/go-cid"
+)
+
+// testDag is an in-memory, mutex-guarded DAGService used by the tests.
+type testDag struct {
+	mu    sync.Mutex
+	nodes map[string]Node
+}
+
+func newTestDag() *testDag {
+	return &testDag{nodes: make(map[string]Node)}
+}
+
+func (d *testDag) Get(ctx context.Context, c *cid.Cid) (Node, error) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if n, ok := d.nodes[c.KeyString()]; ok {
+		return n, nil
+	}
+	return nil, ErrNotFound
+}
+
+func (d *testDag) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOption {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	out := make(chan *NodeOption, len(cids))
+	for _, c := range cids {
+		if n, ok := d.nodes[c.KeyString()]; ok {
+			out <- &NodeOption{Node: n}
+		} else {
+			out <- &NodeOption{Err: ErrNotFound}
+		}
+	}
+	return out
+}
+
+func (d *testDag) Add(ctx context.Context, node Node) error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	d.nodes[node.Cid().KeyString()] = node
+	return nil
+}
+
+func (d *testDag) AddMany(ctx context.Context, nodes []Node) error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	for _, n := range nodes {
+		d.nodes[n.Cid().KeyString()] = n
+	}
+	return nil
+}
+
+func (d *testDag) Remove(ctx context.Context, c *cid.Cid) error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	delete(d.nodes, c.KeyString())
+	return nil
+}
+
+func (d *testDag) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	for _, c := range cids {
+		delete(d.nodes, c.KeyString())
+	}
+	return nil
+}
+
+var _ DAGService = new(testDag)
+
+func TestBatch(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	d := newTestDag()
+	b := NewBatch(ctx, d)
+	for i := 0; i < 1000; i++ {
+		// It would be great if we could use *many* different nodes here
+		// but we can't add any dependencies and I don't feel like adding
+		// any more testing code.
+		if err := b.Add(new(EmptyNode)); err != nil {
+			t.Fatal(err)
+		}
+	}
+	if err := b.Commit(); err != nil {
+		t.Fatal(err)
+	}
+
+	n, err := d.Get(ctx, new(EmptyNode).Cid())
+	if err != nil {
+		t.Fatal(err)
+	}
+	switch n.(type) {
+	case *EmptyNode:
+	default:
+		t.Fatal("expected the node to exist in the dag")
+	}
+
+	if len(d.nodes) != 1 {
+		t.Fatal("should have one node")
+	}
+}
diff --git a/daghelpers.go b/daghelpers.go
new file mode 100644
index 0000000..fd72e49
--- /dev/null
+++ b/daghelpers.go
@@ -0,0 +1,99 @@
+package format
+
+import (
+	"context"
+
+	cid "github.com/ipfs/go-cid"
+)
+
+// GetLinks returns the links to the children of the given node. Prefer this
+// method over looking up the node itself and calling `Links()` on it as this
+// method may be able to use a link cache.
+func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) {
+	if c.Type() == cid.Raw {
+		return nil, nil
+	}
+	if gl, ok := ng.(LinkGetter); ok {
+		return gl.GetLinks(ctx, c)
+	}
+	node, err := ng.Get(ctx, c)
+	if err != nil {
+		return nil, err
+	}
+	return node.Links(), nil
+}
+
+// GetDAG will fill out all of the links of the given Node.
+// It returns an array of NodePromise with the linked nodes all in the proper
+// order.
+func GetDAG(ctx context.Context, ds NodeGetter, root Node) []*NodePromise {
+	var cids []*cid.Cid
+	for _, lnk := range root.Links() {
+		cids = append(cids, lnk.Cid)
+	}
+
+	return GetNodes(ctx, ds, cids)
+}
+
+// GetNodes returns an array of NodePromise values, with each corresponding
+// to the key with the same index as the passed in keys.
+func GetNodes(ctx context.Context, ds NodeGetter, keys []*cid.Cid) []*NodePromise {
+
+	// Early out if no work to do
+	if len(keys) == 0 {
+		return nil
+	}
+
+	promises := make([]*NodePromise, len(keys))
+	for i := range keys {
+		promises[i] = NewNodePromise(ctx)
+	}
+
+	dedupedKeys := dedupeKeys(keys)
+	go func() {
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
+		nodechan := ds.GetMany(ctx, dedupedKeys)
+
+		for count := 0; count < len(keys); {
+			select {
+			case opt, ok := <-nodechan:
+				if !ok {
+					for _, p := range promises {
+						p.Fail(ErrNotFound)
+					}
+					return
+				}
+
+				if opt.Err != nil {
+					for _, p := range promises {
+						p.Fail(opt.Err)
+					}
+					return
+				}
+
+				nd := opt.Node
+				c := nd.Cid()
+				for i, lnkC := range keys {
+					if c.Equals(lnkC) {
+						count++
+						promises[i].Send(nd)
+					}
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	return promises
+}
+
+// dedupeKeys removes duplicates from a list of keys.
+func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
+	set := cid.NewSet()
+	for _, c := range cids {
+		set.Add(c)
+	}
+	return set.Keys()
+}
diff --git a/format.go b/format.go
index 1e2f1f0..699c8c1 100644
--- a/format.go
+++ b/format.go
@@ -19,6 +19,10 @@ type Resolver interface {
 	Tree(path string, depth int) []string
 }
 
+// Node is the base interface all IPLD nodes must implement.
+//
+// Nodes are **Immutable** and all methods defined on the interface are
+// **Thread Safe**.
 type Node interface {
 	blocks.Block
 	Resolver
@@ -40,10 +44,6 @@ type Node interface {
 	Size() (uint64, error)
 }
 
-type NodeGetter interface {
-	Get(context.Context, *cid.Cid) (Node, error)
-}
-
 // Link represents an IPFS Merkle DAG Link between Nodes.
 type Link struct {
 	// utf string name. should be unique per object
diff --git a/merkledag.go b/merkledag.go
new file mode 100644
index 0000000..058fb8f
--- /dev/null
+++ b/merkledag.go
@@ -0,0 +1,64 @@
+package format
+
+import (
+	"context"
+	"fmt"
+
+	cid "github.com/ipfs/go-cid"
+)
+
+// ErrNotFound is the error used to report that a node was not found.
+var ErrNotFound = fmt.Errorf("merkledag: not found")
+
+// NodeOption is either a node or an error.
+type NodeOption struct {
+	Node Node
+	Err  error
+}
+
+// NodeGetter is the basic Node resolution service.
+type NodeGetter interface {
+	// Get retrieves nodes by CID. Depending on the NodeGetter
+	// implementation, this may involve fetching the Node from a remote
+	// machine; consider setting a deadline in the context.
+	Get(context.Context, *cid.Cid) (Node, error)
+
+	// GetMany returns a channel of NodeOptions given a set of CIDs.
+	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
+}
+
+// LinkGetter is an optional interface that NodeGetters can implement to make
+// finding linked objects faster.
+type LinkGetter interface {
+	NodeGetter
+
+	// TODO(ipfs/go-ipld-format#9): This should return []*cid.Cid
+
+	// GetLinks returns the children of the node referred to by the given
+	// CID.
+	GetLinks(ctx context.Context, nd *cid.Cid) ([]*Link, error)
+}
+
+// DAGService is an IPFS Merkle DAG service.
+type DAGService interface {
+	NodeGetter
+
+	// Add adds a node to this DAG.
+	Add(context.Context, Node) error
+
+	// Remove removes a node from this DAG.
+	//
+	// Remove returns no error if the requested node is not present in this DAG.
+	Remove(context.Context, *cid.Cid) error
+
+	// AddMany adds many nodes to this DAG.
+	//
+	// Consider using NewBatch instead of calling this directly if you need
+	// to add an unbounded number of nodes to avoid buffering too much.
+	AddMany(context.Context, []Node) error
+
+	// RemoveMany removes many nodes from this DAG.
+	//
+	// It returns success even if the nodes were not present in the DAG.
+	RemoveMany(context.Context, []*cid.Cid) error
+}
diff --git a/promise.go b/promise.go
new file mode 100644
index 0000000..02743b0
--- /dev/null
+++ b/promise.go
@@ -0,0 +1,77 @@
+package format
+
+import (
+	"context"
+)
+
+// NewNodePromise returns an unfilled NodePromise bound to ctx; Get on the
+// returned promise also returns once ctx is done.
+func NewNodePromise(ctx context.Context) *NodePromise {
+	return &NodePromise{
+		done: make(chan struct{}),
+		ctx:  ctx,
+	}
+}
+
+// NodePromise provides a promise-like interface for a dag Node: the first
+// call to Get blocks until the promise is filled via Send or Fail, and
+// subsequent calls return the cached result.
+//
+// Thread Safety: This is multiple-consumer/single-producer safe.
+type NodePromise struct {
+	value Node
+	err   error
+	done  chan struct{}
+
+	ctx context.Context
+}
+
+// Fail fails this promise with err.
+//
+// Once a promise has been failed or fulfilled, further attempts to fail it will
+// be silently dropped.
+func (np *NodePromise) Fail(err error) {
+	if np.err != nil || np.value != nil {
+		// Already filled.
+		return
+	}
+	np.err = err
+	close(np.done)
+}
+
+// Send fulfills this promise with nd.
+//
+// Once a promise has been fulfilled or failed, calling this function will
+// panic.
+func (np *NodePromise) Send(nd Node) {
+	// A promise may only be filled once; filling it again is a caller bug.
+	if np.err != nil || np.value != nil {
+		panic("already filled")
+	}
+	np.value = nd
+	close(np.done)
+}
+
+// Get returns the value of this promise, blocking until it is filled or until
+// either the promise's own context or ctx is done.
+//
+// This function is safe to call concurrently from any number of goroutines.
+func (np *NodePromise) Get(ctx context.Context) (Node, error) {
+	// Prefer an already-filled promise: select picks randomly among ready
+	// cases, so without this check a filled promise could still report a
+	// context error.
+	select {
+	case <-np.done:
+		return np.value, np.err
+	default:
+	}
+
+	select {
+	case <-np.done:
+		return np.value, np.err
+	case <-np.ctx.Done():
+		return nil, np.ctx.Err()
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}