Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
feat: send files HTTP request should stream (#629)
Browse files Browse the repository at this point in the history
* files add multipart HTTP request streams for real

* fixed dangling multipart stream

* multipart: better backpressure: waiting for drain before resuming file content

* src/utils/request-api.js renamed to send-request.js

* only do file backpressure on node because browser HTTP
  • Loading branch information
pgte authored and daviddias committed Nov 20, 2017
1 parent d4b8ed1 commit dae62cb
Show file tree
Hide file tree
Showing 20 changed files with 444 additions and 215 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
"lru-cache": "^4.1.1",
"multiaddr": "^3.0.1",
"multihashes": "~0.4.12",
"multipart-stream": "^2.0.1",
"ndjson": "^1.5.0",
"once": "^1.4.0",
"peer-id": "~0.10.2",
Expand Down
29 changes: 15 additions & 14 deletions src/block/put.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,35 @@
const promisify = require('promisify-es6')
const Block = require('ipfs-block')
const CID = require('cids')
const once = require('once')
const SendOneFile = require('../utils/send-one-file')

module.exports = (send) => {
return promisify((block, cid, callback) => {
const sendOneFile = SendOneFile(send, 'block/put')

return promisify((block, cid, _callback) => {
// TODO this needs to be adjusted with the new go-ipfs http-api
if (typeof cid === 'function') {
callback = cid
_callback = cid
cid = {}
}

const callback = once(_callback)

if (Array.isArray(block)) {
const err = new Error('block.put() only accepts 1 file')
return callback(err)
return callback(new Error('block.put accepts only one block'))
}

if (typeof block === 'object' && block.data) {
block = block.data
}

const request = {
path: 'block/put',
files: block
}

// Transform the response to a Block
const transform = (info, callback) => {
callback(null, new Block(block, new CID(info.Key)))
}
sendOneFile(block, {}, (err, result) => {
if (err) {
return callback(err) // early
}

send.andTransform(request, transform, callback)
callback(null, new Block(block, new CID(result.Key)))
})
})
}
30 changes: 3 additions & 27 deletions src/files/add-pull-stream.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,6 @@
'use strict'

const addCmd = require('./add.js')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const SendFilesStream = require('../utils/send-files-stream')
const toPull = require('stream-to-pull-stream')

module.exports = (send) => {
const add = addCmd(send)

return (options) => {
options = options || {}

const source = pushable()
const sink = pull.collect((err, tuples) => {
if (err) { return source.end(err) }

add(tuples, options, (err, filesAdded) => {
if (err) { return source.end(err) }

filesAdded.forEach((file) => source.push(file))
source.end()
})
})

return {
sink: sink,
source: source
}
}
}
module.exports = (send) => (options) => toPull(SendFilesStream(send, 'add')(options))
30 changes: 2 additions & 28 deletions src/files/add-readable-stream.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,5 @@
'use strict'

const addCmd = require('./add.js')
const Duplex = require('readable-stream').Duplex
const SendFilesStream = require('../utils/send-files-stream')

module.exports = (send) => {
const add = addCmd(send)

return (options) => {
options = options || {}

const tuples = []

const ds = new Duplex({ objectMode: true })
ds._read = (n) => {}

ds._write = (file, enc, next) => {
tuples.push(file)
next()
}

ds.end = () => add(tuples, options, (err, res) => {
if (err) { return ds.emit('error', err) }

res.forEach((tuple) => ds.push(tuple))
ds.push(null)
})

return ds
}
}
module.exports = (send) => SendFilesStream(send, 'add')
59 changes: 25 additions & 34 deletions src/files/add.js
Original file line number Diff line number Diff line change
@@ -1,51 +1,42 @@
'use strict'

const isStream = require('is-stream')
const promisify = require('promisify-es6')
const ProgressStream = require('../utils/progress-stream')
const converter = require('../utils/converter')
const ConcatStream = require('concat-stream')
const once = require('once')
const isStream = require('is-stream')
const SendFilesStream = require('../utils/send-files-stream')

module.exports = (send) => {
return promisify((files, opts, callback) => {
if (typeof opts === 'function') {
callback = opts
opts = {}
}
const createAddStream = SendFilesStream(send, 'add')

opts = opts || {}

const ok = Buffer.isBuffer(files) ||
isStream.readable(files) ||
Array.isArray(files)

if (!ok) {
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
return promisify((_files, options, _callback) => {
if (typeof options === 'function') {
_callback = options
options = null
}

const qs = {}
const callback = once(_callback)

if (opts['cid-version'] != null) {
qs['cid-version'] = opts['cid-version']
} else if (opts.cidVersion != null) {
qs['cid-version'] = opts.cidVersion
if (!options) {
options = {}
}

if (opts['raw-leaves'] != null) {
qs['raw-leaves'] = opts['raw-leaves']
} else if (opts.rawLeaves != null) {
qs['raw-leaves'] = opts.rawLeaves
}
const ok = Buffer.isBuffer(_files) ||
isStream.readable(_files) ||
Array.isArray(_files)

if (opts.hash != null) {
qs.hash = opts.hash
} else if (opts.hashAlg != null) {
qs.hash = opts.hashAlg
if (!ok) {
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
}

const request = { path: 'add', files: files, qs: qs, progress: opts.progress }
const files = [].concat(_files)

const stream = createAddStream(options)
const concat = ConcatStream((result) => callback(null, result))
stream.once('error', callback)
stream.pipe(concat)

send.andTransform(request, (response, cb) => {
converter(ProgressStream.fromStream(opts.progress, response), cb)
}, callback)
files.forEach((file) => stream.write(file))
stream.end()
})
}
34 changes: 24 additions & 10 deletions src/files/write.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
'use strict'

const promisify = require('promisify-es6')
const concatStream = require('concat-stream')
const once = require('once')
const SendFilesStream = require('../utils/send-files-stream')

module.exports = (send) => {
return promisify((pathDst, files, opts, callback) => {
const sendFilesStream = SendFilesStream(send, 'files/write')

return promisify((pathDst, _files, opts, _callback) => {
if (typeof opts === 'function' &&
!callback) {
callback = opts
!_callback) {
_callback = opts
opts = {}
}

// opts is the real callback --
// 'callback' is being injected by promisify
if (typeof opts === 'function' &&
typeof callback === 'function') {
callback = opts
typeof _callback === 'function') {
_callback = opts
opts = {}
}

send({
path: 'files/write',
const files = [].concat(_files)
const callback = once(_callback)

const options = {
args: pathDst,
qs: opts,
files: files
}, callback)
qs: opts
}

const stream = sendFilesStream(options)
const concat = concatStream((result) => callback(null, result))
stream.once('error', callback)
stream.pipe(concat)

files.forEach((file) => stream.write(file))
stream.end()
})
}
4 changes: 2 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const multiaddr = require('multiaddr')
const loadCommands = require('./utils/load-commands')
const getConfig = require('./utils/default-config')
const getRequestAPI = require('./utils/request-api')
const sendRequest = require('./utils/send-request')

function IpfsAPI (hostOrMultiaddr, port, opts) {
const config = getConfig()
Expand Down Expand Up @@ -35,7 +35,7 @@ function IpfsAPI (hostOrMultiaddr, port, opts) {
config.port = split[1]
}

const requestAPI = getRequestAPI(config)
const requestAPI = sendRequest(config)
const cmds = loadCommands(requestAPI)
cmds.send = requestAPI
cmds.Buffer = Buffer
Expand Down
15 changes: 8 additions & 7 deletions src/object/appendData.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
'use strict'

const promisify = require('promisify-es6')
const once = require('once')
const cleanMultihash = require('../utils/clean-multihash')
const SendOneFile = require('../utils/send-one-file')

module.exports = (send) => {
const objectGet = require('./get')(send)
const sendOneFile = SendOneFile(send, 'object/patch/append-data')

return promisify((multihash, data, opts, callback) => {
return promisify((multihash, data, opts, _callback) => {
if (typeof opts === 'function') {
callback = opts
_callback = opts
opts = {}
}
const callback = once(_callback)
if (!opts) {
opts = {}
}
Expand All @@ -21,14 +25,11 @@ module.exports = (send) => {
return callback(err)
}

send({
path: 'object/patch/append-data',
args: [multihash],
files: data
}, (err, result) => {
sendOneFile(data, { args: [multihash] }, (err, result) => {
if (err) {
return callback(err)
}

objectGet(result.Hash, { enc: 'base58' }, callback)
})
})
Expand Down
23 changes: 15 additions & 8 deletions src/object/put.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ const lruOptions = {
}

const cache = LRU(lruOptions)
const SendOneFile = require('../utils/send-one-file')
const once = require('once')

module.exports = (send) => {
return promisify((obj, options, callback) => {
const sendOneFile = SendOneFile(send, 'object/put')

return promisify((obj, options, _callback) => {
if (typeof options === 'function') {
callback = options
_callback = options
options = {}
}

const callback = once(_callback)

if (!options) {
options = {}
}
Expand Down Expand Up @@ -56,13 +63,13 @@ module.exports = (send) => {
}
const enc = options.enc || 'json'

send({
path: 'object/put',
qs: { inputenc: enc },
files: buf
}, (err, result) => {
const sendOptions = {
qs: { inputenc: enc }
}

sendOneFile(buf, sendOptions, (err, result) => {
if (err) {
return callback(err)
return callback(err) // early
}

if (Buffer.isBuffer(obj)) {
Expand Down
14 changes: 7 additions & 7 deletions src/object/setData.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
'use strict'

const promisify = require('promisify-es6')
const once = require('once')
const cleanMultihash = require('../utils/clean-multihash')
const SendOneFile = require('../utils/send-one-file')

module.exports = (send) => {
const objectGet = require('./get')(send)
const sendOneFile = SendOneFile(send, 'object/patch/set-data')

return promisify((multihash, data, opts, callback) => {
return promisify((multihash, data, opts, _callback) => {
if (typeof opts === 'function') {
callback = opts
_callback = opts
opts = {}
}
const callback = once(_callback)
if (!opts) {
opts = {}
}
Expand All @@ -21,11 +25,7 @@ module.exports = (send) => {
return callback(err)
}

send({
path: 'object/patch/set-data',
args: [multihash],
files: data
}, (err, result) => {
sendOneFile(data, { args: [multihash] }, (err, result) => {
if (err) {
return callback(err)
}
Expand Down
Loading

0 comments on commit dae62cb

Please sign in to comment.