Skip to content

Commit

Permalink
feat: add token based dialer
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun committed Dec 12, 2019
1 parent 0cacfe2 commit f8540fa
Show file tree
Hide file tree
Showing 14 changed files with 611 additions and 138 deletions.
1 change: 1 addition & 0 deletions doc/DIALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* As tokens are limited, DialRequests should be given a prioritized list of Multiaddrs to minimize the potential request time.
* Once a single Multiaddr Dial has succeeded, all pending dials in that Dial Request should be aborted. All tokens should be immediately released to the Dialer.
* If all Multiaddr Dials fail, or the DIAL_TIMEOUT max is reached for the entire DialRequest, all in progress dials for that DialRequest should be aborted. All tokens should immediately be released to the Dialer.
* If a Multiaddr Dial fails and there are no more dials to use its token, that token should be immediately released to the Dialer.

## Multiaddr Confidence

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
},
"dependencies": {
"abort-controller": "^3.0.0",
"aggregate-error": "^3.0.1",
"async": "^2.6.2",
"async-iterator-all": "^1.0.0",
"bignumber.js": "^9.0.0",
Expand All @@ -67,6 +68,7 @@
"p-map": "^3.0.0",
"p-queue": "^6.1.1",
"p-settle": "^3.1.0",
"paramap-it": "^0.1.1",
"peer-id": "^0.13.4",
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
Expand Down
1 change: 1 addition & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module.exports = {
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
PER_PEER_LIMIT: 4, // Allowed parallel dials per DialRequest
QUARTER_HOUR: 15 * 60e3,
PRIORITY_HIGH: 10,
PRIORITY_LOW: 20
Expand Down
161 changes: 95 additions & 66 deletions src/dialer.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
'use strict'

const nextTick = require('async/nextTick')
const multiaddr = require('multiaddr')
const errCode = require('err-code')
const { default: PQueue } = require('p-queue')
const AbortController = require('abort-controller')
const delay = require('delay')
const debug = require('debug')
const log = debug('libp2p:dialer')
log.error = debug('libp2p:dialer:error')
const PeerId = require('peer-id')
const { DialRequest } = require('./dialer/dial-request')
const { anySignal } = require('./util')

const { codes } = require('./errors')
const {
DIAL_TIMEOUT,
MAX_PARALLEL_DIALS,
DIAL_TIMEOUT
PER_PEER_LIMIT
} = require('./constants')

class Dialer {
Expand All @@ -29,106 +30,134 @@ class Dialer {
transportManager,
peerStore,
concurrency = MAX_PARALLEL_DIALS,
timeout = DIAL_TIMEOUT
timeout = DIAL_TIMEOUT,
perPeerLimit = PER_PEER_LIMIT
}) {
this.transportManager = transportManager
this.peerStore = peerStore
this.concurrency = concurrency
this.timeout = timeout
this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true })
this.perPeerLimit = perPeerLimit
this.tokens = [...new Array(concurrency)].map((_, index) => index)

/**
* @property {IdentifyService}
*/
this._identifyService = null
}

set identifyService (service) {
this._identifyService = service
}

/**
* @type {IdentifyService}
*/
get identifyService () {
return this._identifyService
this.releaseToken = this.releaseToken.bind(this)
}

/**
* Connects to a given `Multiaddr`. `addr` should include the id of the peer being
* dialed, it will be used for encryption verification.
*
* @async
* @param {Multiaddr} addr The address to dial
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToMultiaddr (addr, options = {}) {
connectToMultiaddr (addr, options = {}) {
addr = multiaddr(addr)
let conn
let controller

if (!options.signal) {
controller = new AbortController()
options.signal = controller.signal
}
return this.connectToMultiaddrs([addr], options)
}

/**
* Connects to the first success of a given list of `Multiaddr`. `addrs` should
* include the id of the peer being dialed, it will be used for encryption verification.
*
* @param {Array<Multiaddr>} addrs
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToMultiaddrs (addrs, options = {}) {
const dialAction = (addr, options) => this.transportManager.dial(addr, options)
const dialRequest = new DialRequest({
addrs,
dialAction,
dialer: this
})

// Combine the timeout signal and options.signal, if provided
const timeoutController = new AbortController()
const signals = [timeoutController.signal]
options.signal && signals.push(options.signal)
const signal = anySignal(signals)
const timeoutPromise = delay.reject(this.timeout, {
value: errCode(new Error('Dial timed out'), codes.ERR_TIMEOUT)
})

try {
conn = await this.queue.add(() => this.transportManager.dial(addr, options))
// Race the dial request and the timeout
const dialResult = await Promise.race([
dialRequest.run({
...options,
signal
}),
timeoutPromise
])
timeoutPromise.clear()
return dialResult
} catch (err) {
if (err.name === 'TimeoutError') {
controller.abort()
err.code = codes.ERR_TIMEOUT
}
log.error('Error dialing address %s,', addr, err)
log.error(err)
timeoutController.abort()
throw err
}

// Perform a delayed Identify handshake
if (this.identifyService) {
nextTick(async () => {
try {
await this.identifyService.identify(conn, conn.remotePeer)
} catch (err) {
log.error(err)
}
})
}

return conn
}

/**
* Connects to a given `PeerInfo` or `PeerId` by dialing all of its known addresses.
* The dial to the first address that is successfully able to upgrade a connection
* will be used.
*
* @async
* @param {PeerInfo|PeerId} peer The remote peer to dial
* @param {PeerId} peerId The remote peer id to dial
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToPeer (peer, options = {}) {
if (PeerId.isPeerId(peer)) {
peer = this.peerStore.get(peer.toB58String())
}
connectToPeer (peerId, options = {}) {
const addrs = this.peerStore.multiaddrsForPeer(peerId)

const addrs = peer.multiaddrs.toArray()
for (const addr of addrs) {
try {
return await this.connectToMultiaddr(addr, options)
} catch (_) {
// The error is already logged, just move to the next addr
continue
}
}
// TODO: ensure the peer id is on the multiaddr

return this.connectToMultiaddrs(addrs, options)
}

const err = errCode(new Error('Could not dial peer, all addresses failed'), codes.ERR_CONNECTION_FAILED)
log.error(err)
throw err
getTokens (num) {
const total = Math.min(num, this.perPeerLimit, this.tokens.length)
const tokens = this.tokens.splice(0, total)
log('%d tokens request, returning %d, %d remaining', num, total, this.tokens.length)
return tokens
}

releaseToken (token) {
log('token %d released', token)
this.tokens.push(token)
}
}

module.exports = Dialer

// class ActionLimiter {
// constructor(actions, options = {}) {
// this.actions = actions
// this.limit = options.limit || 4
// this.controller = options.controller || new AbortController()
// }
// async abort () {
// this.controller.abort()
// }
// async run () {
// const limit = pLimit(this.limit)
// let result
// try {
// result = await pAny(this.actions.map(action => limit(action)))
// } catch (err) {
// console.log(err)
// if (!err.code) err.code = codes.ERR_CONNECTION_FAILED
// log.error(err)
// throw err
// } finally {
// console.log('RES', result)
// this.controller.abort()
// }
// return result
// }
// }
Loading

0 comments on commit f8540fa

Please sign in to comment.