Skip to content

Commit

Permalink
port async batch commit code from ipfs
Browse files Browse the repository at this point in the history
(ipfs/kubo#4296)

1. Modern storage devices (i.e., SSDs) tend to be highly parallel.
2. Allows us to read and write at the same time (avoids pausing while flushing).

fixes ipfs/kubo#898 (comment)
  • Loading branch information
Stebalien committed Oct 16, 2017
1 parent 5225978 commit b14c461
Showing 1 changed file with 79 additions and 18 deletions.
97 changes: 79 additions & 18 deletions batch.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package format

import (
"runtime"

cid "github.com/ipfs/go-cid"
)

// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// Scaled off CPU count because commits are I/O-bound goroutines; the multiplier is a guess.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2

// NewBatch returns a node buffer (Batch) that buffers nodes internally and
// commits them to the underlying DAGService in batches. Use this if you intend
// to add a lot of nodes all at once.
Expand All @@ -15,37 +22,91 @@ func NewBatch(ds DAGService) *Batch {
// By default, only batch up to 128 nodes at a time.
// The current implementation of flatfs opens this many file
// descriptors at the same time for the optimized batch write.
MaxBlocks: 128,
MaxNodes: 128,
}
}

// Batch is a buffer for batching adds to a dag.
//
// Nodes are accumulated in `nodes` and flushed to the underlying DAGService
// asynchronously; commit bookkeeping lives in the activeCommits/commitError/
// commitResults trio. Batch is NOT safe for concurrent use.
type Batch struct {
	ds DAGService

	// activeCommits is the number of in-flight AddMany goroutines.
	activeCommits int
	// commitError records the first commit failure; once set, the batch is dead.
	commitError error
	// commitResults receives one error (possibly nil) per finished commit.
	commitResults chan error

	// TODO: try to re-use memory.
	nodes []Node
	size  int

	// MaxSize is the maximum buffered byte size before an async commit is triggered.
	MaxSize int
	// MaxNodes is the maximum buffered node count before an async commit is triggered.
	MaxNodes int
}

// processResults drains any already-completed commit results without
// blocking, decrementing the in-flight count and latching the first error.
func (t *Batch) processResults() {
	for {
		if t.activeCommits == 0 || t.commitError != nil {
			return
		}
		select {
		case res := <-t.commitResults:
			t.activeCommits--
			if res != nil {
				t.commitError = res
			}
		default:
			// Nothing ready yet; don't block.
			return
		}
	}
}

// asyncCommit hands the currently buffered nodes to a background goroutine
// that writes them via ds.AddMany, then resets the buffer. If the number of
// in-flight commits has reached ParallelBatchCommits, it first blocks waiting
// for one to finish. No-op if the buffer is empty or a previous commit failed.
func (t *Batch) asyncCommit() {
	numBlocks := len(t.nodes)
	if numBlocks == 0 || t.commitError != nil {
		return
	}
	if t.activeCommits >= ParallelBatchCommits {
		// Too many commits in flight; block until one completes.
		err := <-t.commitResults
		t.activeCommits--

		if err != nil {
			t.commitError = err
			return
		}
	}
	go func(b []Node) {
		_, err := t.ds.AddMany(b)
		t.commitResults <- err
	}(t.nodes)

	t.activeCommits++
	// Hand off the old slice to the goroutine; start a fresh buffer sized
	// to the previous batch as a capacity hint.
	t.nodes = make([]Node, 0, numBlocks)
	t.size = 0
}

// Add a node to this batch of nodes, potentially committing the set of batched
// nodes to the underlying DAGService.
// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(nd Node) (*cid.Cid, error) {
// Not strictly necessary but allows us to catch errors early.
t.processResults()
if t.commitError != nil {
return nil, t.commitError
}

t.nodes = append(t.nodes, nd)
t.size += len(nd.RawData())
if t.size > t.MaxSize || len(t.nodes) > t.MaxBlocks {
return nd.Cid(), t.Commit()
if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes {
t.asyncCommit()
}
return nd.Cid(), nil
return nd.Cid(), t.commitError
}

// Commit commits batched nodes.
//
// It flushes the current buffer and then blocks until every in-flight
// async commit has completed, returning the first error encountered.
// Make sure to call this after you're done adding nodes to the batch to
// ensure that they're actually added to the DAGService.
func (t *Batch) Commit() error {
	t.asyncCommit()
	// Drain all outstanding commits; stop early once an error is latched.
	for t.activeCommits > 0 && t.commitError == nil {
		err := <-t.commitResults
		t.activeCommits--
		if err != nil {
			t.commitError = err
		}
	}

	return t.commitError
}

0 comments on commit b14c461

Please sign in to comment.