diff --git a/merkledag/batch.go b/merkledag/batch.go
new file mode 100644
index 00000000000..5879ad99b2d
--- /dev/null
+++ b/merkledag/batch.go
@@ -0,0 +1,133 @@
+package merkledag
+
+import (
+	"runtime"
+
+	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
+	node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
+	blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
+)
+
+// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
+// TODO: Experiment with multiple datastores, storage devices, and CPUs to find
+// the right value/formula.
+var ParallelBatchCommits = runtime.NumCPU() * 2
+
+// Batch is a buffer for batching adds to a dag.
+//
+// A Batch does no locking of its own; Add and Commit read and write its
+// bookkeeping fields directly.
+type Batch struct {
+	ds *dagService
+
+	// activeCommits counts started commits whose result has not yet been
+	// received from commitResults.
+	activeCommits int
+	// commitError is the first error received from a commit. Once it is set,
+	// no further commits are started.
+	commitError error
+	// commitResults receives one value per commit goroutine. Its capacity is
+	// used as the limit on in-flight commits.
+	commitResults chan error
+
+	blocks []blocks.Block
+	size   int
+
+	MaxSize   int
+	MaxBlocks int
+}
+
+// processResults records the results of finished commits without blocking.
+// It returns as soon as no result is immediately available, nothing is in
+// flight, or an error has been recorded.
+func (t *Batch) processResults() {
+	for t.activeCommits > 0 && t.commitError == nil {
+		select {
+		case err := <-t.commitResults:
+			t.activeCommits--
+			if err != nil {
+				t.commitError = err
+			}
+		default:
+			return
+		}
+	}
+}
+
+// asyncCommit hands the buffered blocks to a background AddBlocks call and
+// resets the buffer. It does nothing when the buffer is empty or an earlier
+// commit has failed. When the in-flight limit is reached, it first blocks
+// until one running commit reports its result.
+func (t *Batch) asyncCommit() {
+	if len(t.blocks) == 0 || t.commitError != nil {
+		return
+	}
+	if t.commitResults == nil {
+		// Keep a Batch that was built without a result channel usable:
+		// channel operations on a nil channel block forever.
+		n := ParallelBatchCommits
+		if n < 1 {
+			n = 1
+		}
+		t.commitResults = make(chan error, n)
+	}
+	// Limit by the channel's capacity rather than the current value of the
+	// mutable ParallelBatchCommits so that every in-flight commit has a
+	// buffer slot for its result. The activeCommits > 0 check avoids waiting
+	// for a result when no commit is running.
+	if t.activeCommits > 0 && t.activeCommits >= cap(t.commitResults) {
+		err := <-t.commitResults
+		t.activeCommits--
+
+		if err != nil {
+			t.commitError = err
+			return
+		}
+	}
+	go func(b []blocks.Block) {
+		_, err := t.ds.Blocks.AddBlocks(b)
+		t.commitResults <- err
+	}(t.blocks)
+
+	t.activeCommits++
+	t.blocks = nil
+	t.size = 0
+}
+
+// Add adds a node to the batch and commits the batch if necessary.
+//
+// Commits run in the background, so an error returned here may come from
+// nodes passed to an earlier Add. Once a commit has failed, Add stops
+// buffering nodes and returns a nil Cid together with that error.
+func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
+	// Not strictly necessary but allows us to catch errors early.
+	t.processResults()
+	if t.commitError != nil {
+		return nil, t.commitError
+	}
+
+	t.blocks = append(t.blocks, nd)
+	t.size += len(nd.RawData())
+	if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
+		t.asyncCommit()
+	}
+	return nd.Cid(), t.commitError
+}
+
+// Commit commits batched nodes.
+//
+// It starts a commit for any buffered nodes, then blocks until every
+// in-flight commit has reported its result or one of them has failed, and
+// returns the first recorded error (nil if there was none).
+func (t *Batch) Commit() error {
+	t.asyncCommit()
+	for t.activeCommits > 0 && t.commitError == nil {
+		err := <-t.commitResults
+		t.activeCommits--
+		if err != nil {
+			t.commitError = err
+		}
+	}
+
+	return t.commitError
+}
diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go
index 214fbbd8407..92cb5fa866f 100644
--- a/merkledag/merkledag.go
+++ b/merkledag/merkledag.go
@@ -11,7 +11,6 @@ import (
 
 	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
 	node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
-	blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
 	ipldcbor "gx/ipfs/QmWCs8kMecJwCPK8JThue8TjgM2ieJ2HjTLDu7Cv2NEmZi/go-ipld-cbor"
 )
 
@@ -75,8 +74,9 @@ func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
 
 func (n *dagService) Batch() *Batch {
 	return &Batch{
-		ds:      n,
-		MaxSize: 8 << 20,
+		ds:            n,
+		commitResults: make(chan error, ParallelBatchCommits),
+		MaxSize:       8 << 20,
 
 		// By default, only batch up to 128 nodes at a time.
 		// The current implementation of flatfs opens this many file
@@ -389,31 +389,6 @@ func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
 	}
 }
 
-type Batch struct {
-	ds *dagService
-
-	blocks    []blocks.Block
-	size      int
-	MaxSize   int
-	MaxBlocks int
-}
-
-func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
-	t.blocks = append(t.blocks, nd)
-	t.size += len(nd.RawData())
-	if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
-		return nd.Cid(), t.Commit()
-	}
-	return nd.Cid(), nil
-}
-
-func (t *Batch) Commit() error {
-	_, err := t.ds.Blocks.AddBlocks(t.blocks)
-	t.blocks = nil
-	t.size = 0
-	return err
-}
-
 type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)
 
 // EnumerateChildren will walk the dag below the given root node and add all