Skip to content
This repository has been archived by the owner on Apr 29, 2020. It is now read-only.

Commit

Permalink
perf: concurrent file import (#41)
Browse files Browse the repository at this point in the history
* perf: concurrent file import

Adds two new options:

`fileImportConcurrency`

This controls the number of files that are imported concurrently.
You may wish to set this high if you are importing lots of small
files.

`blockWriteConcurrency`

This controls how many blocks from each file we write to disk at
the same time.  Setting this high when writing large files will
significantly increase import speed, though having it high when
`fileImportConcurrency` is also high can swamp the process.

It also:

1. Flattens module options because validating deep objects was
  clunky and the separation of access to config sub objects within
  this module isn't very good
1. Replaces `superstruct` and `deep-extend` with `merge-options`
  which is better suited for merging options and is smaller
1. Replaces `async-iterator-*` modules with the more zeitgeisty
  `it-*` namespace

Supersedes #38, sort of. No batching but atomicity guarantees are 
maintained and performance gains are broadly similar with the right
tuning.
  • Loading branch information
achingbrain committed Nov 27, 2019
1 parent b5e5b5a commit 68ac8cc
Show file tree
Hide file tree
Showing 22 changed files with 124 additions and 159 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,26 +124,26 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
- `chunker` (string, defaults to `"fixed"`): the chunking strategy. Supports:
- `fixed`
- `rabin`
- `chunkerOptions` (object, optional): the options for the chunker. Defaults to an object with the following properties:
- `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only)
- `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only)
- `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size
- `avgChunkSize` (positive integer, defaults to `262144`): the average chunk size (rabin chunker only)
- `minChunkSize` (positive integer): the minimum chunk size (rabin chunker only)
- `maxChunkSize` (positive integer, defaults to `262144`): the maximum chunk size
- `strategy` (string, defaults to `"balanced"`): the DAG builder strategy name. Supports:
- `flat`: flat list of chunks
- `balanced`: builds a balanced tree
- `trickle`: builds [a trickle tree](https://github.com/ipfs/specs/pull/57#issuecomment-265205384)
- `maxChildrenPerNode` (positive integer, defaults to `174`): the maximum children per node for the `balanced` and `trickle` DAG builder strategies
- `layerRepeat` (positive integer, defaults to 4): (only applicable to the `trickle` DAG builder strategy). The maximum repetition of parent nodes for each layer of the tree.
- `reduceSingleLeafToSelf` (boolean, defaults to `true`): optimization for, when reducing a set of nodes with one node, reduce it to that node.
- `dirBuilder` (object): the options for the directory builder
- `hamt` (object): the options for the HAMT sharded directory builder
- bits (positive integer, defaults to `8`): the number of bits at each bucket of the HAMT
- `hamtHashFn` (async function(string) Buffer): a function that hashes file names to create HAMT shards
- `hamtBucketBits` (positive integer, defaults to `8`): the number of bits at each bucket of the HAMT
- `progress` (function): a function that will be called with the byte length of chunks as a file is added to ipfs.
- `onlyHash` (boolean, defaults to false): Only chunk and hash - do not write to disk
- `hashAlg` (string): multihash hashing algorithm to use
- `cidVersion` (integer, default 0): the CID version to use when storing the data (storage keys are based on the CID, _including_ it's version)
- `rawLeaves` (boolean, defaults to false): When a file would span multiple DAGNodes, if this is true the leaf nodes will not be wrapped in `UnixFS` protobufs and will instead contain the raw file bytes
- `leafType` (string, defaults to `'file'`) what type of UnixFS node leaves should be - can be `'file'` or `'raw'` (ignored when `rawLeaves` is `true`)
- `blockWriteConcurrency` (positive integer, defaults to 10) How many blocks to hash and write to the block store concurrently. For small numbers of large files this should be high (e.g. 50).
- `fileImportConcurrency` (number, defaults to 50) How many files to import concurrently. For large numbers of small files this should be high (e.g. 50).

[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
Expand Down
17 changes: 9 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,34 @@
"homepage": "https://github.com/ipfs/js-ipfs-unixfs-importer#readme",
"devDependencies": {
"aegir": "^20.0.0",
"async-iterator-buffer-stream": "^1.0.0",
"async-iterator-last": "^1.0.0",
"chai": "^4.2.0",
"cids": "~0.7.1",
"deep-extend": "~0.6.0",
"detect-node": "^2.0.4",
"dirty-chai": "^2.0.1",
"ipfs-unixfs-exporter": "^0.39.0",
"ipld": "^0.25.0",
"ipld-in-memory": "^3.0.0",
"it-buffer-stream": "^1.0.0",
"it-last": "^1.0.0",
"multihashes": "~0.4.14",
"nyc": "^14.0.0",
"sinon": "^7.1.0"
},
"dependencies": {
"async-iterator-all": "^1.0.0",
"async-iterator-batch": "~0.0.1",
"async-iterator-first": "^1.0.0",
"bl": "^4.0.0",
"deep-extend": "~0.6.0",
"err-code": "^2.0.0",
"hamt-sharding": "~0.0.2",
"ipfs-unixfs": "^0.2.0",
"ipld-dag-pb": "^0.18.0",
"it-all": "^1.0.1",
"it-batch": "^1.0.3",
"it-first": "^1.0.1",
"it-parallel-batch": "1.0.2",
"merge-options": "^2.0.0",
"multicodec": "~0.5.1",
"multihashing-async": "^0.8.0",
"rabin-wasm": "~0.0.8",
"superstruct": "^0.8.2"
"rabin-wasm": "~0.0.8"
},
"contributors": [
"Alan Shaw <alan.shaw@protocol.ai>",
Expand Down
2 changes: 1 addition & 1 deletion src/dag-builder/file/balanced.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const batch = require('async-iterator-batch')
const batch = require('it-batch')

async function * balanced (source, reduce, options) {
yield await reduceToParents(source, reduce, options)
Expand Down
10 changes: 2 additions & 8 deletions src/dag-builder/file/flat.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
'use strict'

const batch = require('async-iterator-batch')
const all = require('it-all')

module.exports = async function * (source, reduce) {
const roots = []

for await (const chunk of batch(source, Infinity)) {
roots.push(await reduce(chunk))
}

yield roots[0]
yield await reduce(await all(source))
}
70 changes: 39 additions & 31 deletions src/dag-builder/file/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,62 @@ const {
DAGNode,
DAGLink
} = require('ipld-dag-pb')
const all = require('async-iterator-all')
const all = require('it-all')
const parallelBatch = require('it-parallel-batch')

const dagBuilders = {
flat: require('./flat'),
balanced: require('./balanced'),
trickle: require('./trickle')
}

async function * buildFile (file, source, ipld, options) {
let count = -1
let previous

async function * importBuffer (file, source, ipld, options) {
for await (const buffer of source) {
count++
options.progress(buffer.length)
let node
let unixfs
yield async () => {
options.progress(buffer.length)
let node
let unixfs

const opts = {
...options
}
const opts = {
...options
}

if (options.rawLeaves) {
node = buffer
if (options.rawLeaves) {
node = buffer

opts.codec = 'raw'
opts.cidVersion = 1
} else {
unixfs = new UnixFS(options.leafType, buffer)
opts.codec = 'raw'
opts.cidVersion = 1
} else {
unixfs = new UnixFS(options.leafType, buffer)

if (file.mtime) {
unixfs.mtime = file.mtime
}
if (file.mtime) {
unixfs.mtime = file.mtime
}

if (file.mode) {
unixfs.mode = file.mode
}

if (file.mode) {
unixfs.mode = file.mode
node = new DAGNode(unixfs.marshal())
}

node = new DAGNode(unixfs.marshal())
const cid = await persist(node, ipld, opts)

return {
cid: cid,
unixfs,
node
}
}
}
}

const cid = await persist(node, ipld, opts)
async function * buildFileBatch (file, source, ipld, options) {
let count = -1
let previous

const entry = {
cid: cid,
unixfs,
node
}
for await (const entry of parallelBatch(importBuffer(file, source, ipld, options), options.blockWriteConcurrency)) {
count++

if (count === 0) {
previous = entry
Expand Down Expand Up @@ -149,7 +157,7 @@ const fileBuilder = async (file, source, ipld, options) => {
throw errCode(new Error(`Unknown importer build strategy name: ${options.strategy}`), 'ERR_BAD_STRATEGY')
}

const roots = await all(dagBuilder(buildFile(file, source, ipld, options), reduce(file, ipld, options), options.builderOptions))
const roots = await all(dagBuilder(buildFileBatch(file, source, ipld, options), reduce(file, ipld, options), options))

if (roots.length > 1) {
throw errCode(new Error('expected a maximum of 1 roots and got ' + roots.length), 'ETOOMANYROOTS')
Expand Down
2 changes: 1 addition & 1 deletion src/dag-builder/file/trickle.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const batch = require('async-iterator-batch')
const batch = require('it-batch')

module.exports = function * trickleReduceToRoot (source, reduce, options) {
yield trickleStream(source, reduce, options)
Expand Down
6 changes: 3 additions & 3 deletions src/dag-builder/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ async function * dagBuilder (source, ipld, options) {
}
}

const chunker = createChunker(options.chunker, validateChunks(source), options.chunkerOptions)
const chunker = createChunker(options.chunker, validateChunks(source), options)

// item is a file
yield fileBuilder(entry, chunker, ipld, options)
yield () => fileBuilder(entry, chunker, ipld, options)
} else {
// item is a directory
yield dirBuilder(entry, ipld, options)
yield () => dirBuilder(entry, ipld, options)
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions src/dir-sharded.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const multihashing = require('multihashing-async')
const Dir = require('./dir')
const persist = require('./utils/persist')
const Bucket = require('hamt-sharding')
const extend = require('deep-extend')
const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })

const hashFn = async function (value) {
const hash = await multihashing(Buffer.from(value, 'utf8'), 'murmur3-128')
Expand All @@ -31,16 +31,20 @@ const hashFn = async function (value) {
hashFn.code = 0x22 // TODO: get this from multihashing-async?

const defaultOptions = {
hashFn: hashFn
hamtHashFn: hashFn,
hamtBucketBits: 8
}

class DirSharded extends Dir {
constructor (props, options) {
options = extend({}, defaultOptions, options)
options = mergeOptions(defaultOptions, options)

super(props, options)

this._bucket = Bucket(options)
this._bucket = Bucket({
hashFn: options.hamtHashFn,
bits: options.hamtBucketBits
})
}

async put (name, value) {
Expand Down Expand Up @@ -139,7 +143,7 @@ async function * flush (path, bucket, ipld, shardRoot, options) {
const data = Buffer.from(children.bitField().reverse())
const dir = new UnixFS('hamt-sharded-directory', data)
dir.fanout = bucket.tableSize()
dir.hashType = options.hashFn.code
dir.hashType = options.hamtHashFn.code

if (shardRoot && shardRoot.mtime) {
dir.mtime = shardRoot.mtime
Expand Down
89 changes: 26 additions & 63 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,78 +1,41 @@
'use strict'

const { superstruct } = require('superstruct')
const dagBuilder = require('./dag-builder')
const treeBuilder = require('./tree-builder')
const mh = require('multihashes')
const parallelBatch = require('it-parallel-batch')
const mergeOptions = require('merge-options').bind({ ignoreUndefined: true })

const struct = superstruct({
types: {
codec: v => ['dag-pb', 'dag-cbor', 'raw'].includes(v),
hashAlg: v => Object.keys(mh.names).includes(v),
leafType: v => ['file', 'raw'].includes(v)
}
})

const ChunkerOptions = struct({
minChunkSize: 'number?',
maxChunkSize: 'number?',
avgChunkSize: 'number?',
window: 'number?',
polynomial: 'number?'
}, {
maxChunkSize: 262144,
avgChunkSize: 262144,
window: 16,
polynomial: 17437180132763653 // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
})

const BuilderOptions = struct({
maxChildrenPerNode: 'number?',
layerRepeat: 'number?'
}, {
maxChildrenPerNode: 174,
layerRepeat: 4
})

const Options = struct({
chunker: struct.enum(['fixed', 'rabin']),
rawLeaves: 'boolean?',
hashOnly: 'boolean?',
strategy: struct.enum(['balanced', 'flat', 'trickle']),
reduceSingleLeafToSelf: 'boolean?',
codec: 'codec?',
format: 'codec?',
hashAlg: 'hashAlg?',
leafType: 'leafType?',
cidVersion: 'number?',
progress: 'function?',
wrapWithDirectory: 'boolean?',
shardSplitThreshold: 'number?',
onlyHash: 'boolean?',
chunkerOptions: ChunkerOptions,
builderOptions: BuilderOptions,

wrap: 'boolean?',
pin: 'boolean?',
recursive: 'boolean?',
ignore: 'array?',
hidden: 'boolean?',
preload: 'boolean?'
}, {
const defaultOptions = {
chunker: 'fixed',
strategy: 'balanced',
strategy: 'balanced', // 'flat', 'trickle'
rawLeaves: false,
onlyHash: false,
reduceSingleLeafToSelf: true,
codec: 'dag-pb',
hashAlg: 'sha2-256',
leafType: 'file',
leafType: 'file', // 'raw'
cidVersion: 0,
progress: () => () => {},
shardSplitThreshold: 1000
})
shardSplitThreshold: 1000,
fileImportConcurrency: 50,
blockWriteConcurrency: 10,
minChunkSize: 262144,
maxChunkSize: 262144,
avgChunkSize: 262144,
window: 16,
polynomial: 17437180132763653, // https://github.com/ipfs/go-ipfs-chunker/blob/d0125832512163708c0804a3cda060e21acddae4/rabin.go#L11
maxChildrenPerNode: 174,
layerRepeat: 4,
wrapWithDirectory: false,
pin: true,
recursive: false,
ignore: null, // []
hidden: false,
preload: true
}

module.exports = async function * (source, ipld, options = {}) {
const opts = Options(options)
const opts = mergeOptions(defaultOptions, options)

if (options.cidVersion > 0 && options.rawLeaves === undefined) {
// if the cid version is 1 or above, use raw leaves as this is
Expand All @@ -93,10 +56,10 @@ module.exports = async function * (source, ipld, options = {}) {
}

if (options.format) {
options.codec = options.format
opts.codec = options.format
}

for await (const entry of treeBuilder(dagBuilder(source, ipld, opts), ipld, opts)) {
for await (const entry of treeBuilder(parallelBatch(dagBuilder(source, ipld, opts), opts.fileImportConcurrency), ipld, opts)) {
yield {
cid: entry.cid,
path: entry.path,
Expand Down
Loading

0 comments on commit 68ac8cc

Please sign in to comment.