Skip to content

Commit

Permalink
refactor: async routing (#489)
Browse files Browse the repository at this point in the history
* feat: async routing

* chore: put dht extra api commands under content routing

* chore: add default option to createPeerInfo

Co-Authored-By: Jacob Heun <jacobheun@gmail.com>

* chore: address review

* chore: rm dlv
  • Loading branch information
vasco-santos authored and jacobheun committed Jan 24, 2020
1 parent f77ce39 commit a020db1
Show file tree
Hide file tree
Showing 24 changed files with 804 additions and 227 deletions.
9 changes: 6 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"dependencies": {
"abort-controller": "^3.0.0",
"async": "^2.6.2",
"async-iterator-all": "^1.0.0",
"bignumber.js": "^9.0.0",
"class-is": "^1.1.0",
"debug": "^4.1.1",
Expand All @@ -62,6 +63,7 @@
"multiaddr": "^7.2.1",
"multistream-select": "^0.15.0",
"once": "^1.4.0",
"p-any": "^2.1.0",
"p-map": "^3.0.0",
"p-queue": "^6.1.1",
"p-settle": "^3.1.0",
Expand Down Expand Up @@ -90,11 +92,11 @@
"interface-datastore": "^0.6.0",
"it-pair": "^1.0.0",
"libp2p-bootstrap": "^0.10.3",
"libp2p-delegated-content-routing": "^0.2.2",
"libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-delegated-content-routing": "^0.4.1",
"libp2p-delegated-peer-routing": "^0.4.0",
"libp2p-floodsub": "^0.19.0",
"libp2p-gossipsub": "^0.1.0",
"libp2p-kad-dht": "~0.17.0",
"libp2p-kad-dht": "^0.18.0",
"libp2p-mdns": "^0.13.0",
"libp2p-mplex": "^0.9.1",
"libp2p-pnet": "~0.1.0",
Expand All @@ -105,6 +107,7 @@
"lodash.times": "^4.3.2",
"nock": "^10.0.6",
"p-defer": "^3.0.0",
"p-times": "^2.1.0",
"p-wait-for": "^3.1.0",
"portfinder": "^1.0.20",
"pull-goodbye": "0.0.2",
Expand Down
125 changes: 77 additions & 48 deletions src/content-routing.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
'use strict'

const tryEach = require('async/tryEach')
const parallel = require('async/parallel')
const errCode = require('err-code')
const promisify = require('promisify-es6')
const { messages, codes } = require('./errors')

const all = require('async-iterator-all')
const pAny = require('p-any')

module.exports = (node) => {
const routers = node._modules.contentRouting || []
const dht = node._dht

// If we have the dht, make it first
if (node._dht) {
routers.unshift(node._dht)
if (dht) {
routers.unshift(dht)
}

return {
Expand All @@ -19,66 +21,93 @@ module.exports = (node) => {
* Once a content router succeeds, iteration will stop.
*
* @param {CID} key The CID key of the content to find
* @param {object} options
* @param {number} options.maxTimeout How long the query should run
* @param {number} options.maxNumProviders - maximum number of providers to find
* @param {function(Error, Result<Array>)} callback
* @returns {void}
* @param {object} [options]
* @param {number} [options.timeout] How long the query should run
* @param {number} [options.maxNumProviders] - maximum number of providers to find
* @returns {AsyncIterable<PeerInfo>}
*/
findProviders: promisify((key, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
} else if (typeof options === 'number') { // This can be deprecated in a future release
options = {
maxTimeout: options
}
}

async * findProviders (key, options) {
if (!routers.length) {
return callback(errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE'))
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
}

const tasks = routers.map((router) => {
return (cb) => router.findProviders(key, options, (err, results) => {
if (err) {
return cb(err)
}
const result = await pAny(
routers.map(async (router) => {
const provs = await all(router.findProviders(key, options))

// If we don't have any results, we need to provide an error to keep trying
if (!results || Object.keys(results).length === 0) {
return cb(errCode(new Error('not found'), 'NOT_FOUND'), null)
if (!provs || !provs.length) {
throw errCode(new Error('not found'), 'NOT_FOUND')
}

cb(null, results)
return provs
})
})
)

tryEach(tasks, (err, results) => {
if (err && err.code !== 'NOT_FOUND') {
return callback(err)
}
results = results || []
callback(null, results)
})
}),
for (const pInfo of result) {
yield pInfo
}
},

/**
* Iterates over all content routers in parallel to notify it is
* a provider of the given key.
*
* @param {CID} key The CID key of the content to find
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
provide: promisify((key, callback) => {
async provide (key) { // eslint-disable-line require-await
if (!routers.length) {
return callback(errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE'))
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
}

return Promise.all(routers.map((router) => router.provide(key)))
},

/**
* Store the given key/value pair in the DHT.
* @param {Buffer} key
* @param {Buffer} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
async put (key, value, options) { // eslint-disable-line require-await
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

return dht.put(key, value, options)
},

/**
* Get the value to the given key.
* Times out after 1 minute by default.
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<{from: PeerId, val: Buffer}>}
*/
async get (key, options) { // eslint-disable-line require-await
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

return dht.get(key, options)
},

/**
* Get the `n` values to the given key without sorting.
* @param {Buffer} key
* @param {number} nVals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
*/
async getMany (key, nVals, options) { // eslint-disable-line require-await
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

parallel(routers.map((router) => {
return (cb) => router.provide(key, cb)
}), callback)
})
return dht.getMany(key, nVals, options)
}
}
}
72 changes: 0 additions & 72 deletions src/dht.js

This file was deleted.

13 changes: 10 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const multiaddr = require('multiaddr')

const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const dht = require('./dht')
const pubsub = require('./pubsub')
const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info')
const { validate: validateConfig } = require('./config')
Expand Down Expand Up @@ -124,7 +123,15 @@ class Libp2p extends EventEmitter {

// dht provided components (peerRouting, contentRouting, dht)
if (this._modules.dht) {
this._dht = dht(this, this._modules.dht, this._config.dht)
const DHT = this._modules.dht
this._dht = new DHT({
dialer: this.dialer,
peerInfo: this.peerInfo,
peerStore: this.peerStore,
registrar: this.registrar,
datastore: this.datastore,
...this._config.dht
})
}

// start pubsub
Expand Down Expand Up @@ -333,7 +340,7 @@ class Libp2p extends EventEmitter {

// TODO: this should be modified once random-walk is used as
// the other discovery modules
this._dht._dht.on('peer', this._peerDiscovered)
this._dht.on('peer', this._peerDiscovered)
}
}

Expand Down
49 changes: 15 additions & 34 deletions src/peer-routing.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
'use strict'

const tryEach = require('async/tryEach')
const errCode = require('err-code')
const promisify = require('promisify-es6')
const pAny = require('p-any')

module.exports = (node) => {
const routers = node._modules.peerRouting || []
Expand All @@ -17,43 +16,25 @@ module.exports = (node) => {
* Iterates over all peer routers in series to find the given peer.
*
* @param {String} id The id of the peer to find
* @param {object} options
* @param {number} options.maxTimeout How long the query should run
* @param {function(Error, Result<Array>)} callback
* @returns {void}
* @param {object} [options]
* @param {number} [options.timeout] How long the query should run
* @returns {Promise<PeerInfo>}
*/
findPeer: promisify((id, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

findPeer: async (id, options) => { // eslint-disable-line require-await
if (!routers.length) {
callback(errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE'))
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
}

const tasks = routers.map((router) => {
return (cb) => router.findPeer(id, options, (err, result) => {
if (err) {
return cb(err)
}

// If we don't have a result, we need to provide an error to keep trying
if (!result || Object.keys(result).length === 0) {
return cb(errCode(new Error('not found'), 'NOT_FOUND'), null)
}
return pAny(routers.map(async (router) => {
const result = await router.findPeer(id, options)

cb(null, result)
})
})

tryEach(tasks, (err, results) => {
if (err) {
return callback(err)
// If we don't have a result, we need to provide an error to keep trying
if (!result || Object.keys(result).length === 0) {
throw errCode(new Error('not found'), 'NOT_FOUND')
}
results = results || []
callback(null, results)
})
})

return result
}))
}
}
}
Loading

0 comments on commit a020db1

Please sign in to comment.