diff --git a/package.json b/package.json index 844abe1..6d9f330 100644 --- a/package.json +++ b/package.json @@ -68,6 +68,7 @@ "peer-id": "^0.7.0", "peer-info": "^0.7.0", "protocol-buffers": "^3.1.6", + "pull-stream": "^3.4.3", "run-parallel": "^1.1.6" }, "contributors": [ diff --git a/src/connection.js b/src/connection.js index 1a67b4b..d9f02b3 100644 --- a/src/connection.js +++ b/src/connection.js @@ -1,8 +1,10 @@ 'use strict' -const protocolMuxer = require('./protocol-muxer') const identify = require('libp2p-identify') const multistream = require('multistream-select') +const pull = require('pull-stream') + +const protocolMuxer = require('./protocol-muxer') module.exports = function connection (swarm) { return { @@ -14,7 +16,7 @@ module.exports = function connection (swarm) { // for listening swarm.handle(muxer.multicodec, (conn) => { - const muxedConn = muxer(conn, true) + const muxedConn = muxer.listen(conn) muxedConn.on('stream', (conn) => { protocolMuxer(swarm.protocols, conn) @@ -35,7 +37,7 @@ module.exports = function connection (swarm) { ms.select(identify.multicodec, (err, conn) => { if (err) { return cb(err) } - identify.exec(conn, (err, peerInfo, observedAddrs) => { + identify.listen(conn, (err, peerInfo, observedAddrs) => { if (err) { return cb(err) } observedAddrs.forEach((oa) => { @@ -57,10 +59,13 @@ module.exports = function connection (swarm) { } swarm.emit('peer-mux-established', peerInfo) - muxedConn.on('close', () => { - delete swarm.muxedConns[peerInfo.id.toB58String()] - swarm.emit('peer-mux-closed', peerInfo) - }) + pull( + muxedConn, + pull.onEnd(() => { + delete swarm.muxedConns[peerInfo.id.toB58String()] + swarm.emit('peer-mux-closed', peerInfo) + }) + ) }) } }) @@ -68,7 +73,9 @@ module.exports = function connection (swarm) { reuse () { swarm.identify = true - swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo)) + swarm.handle(identify.multicodec, (conn) => { + identify.dial(conn, swarm._peerInfo) + }) } } } diff --git a/src/dial.js b/src/dial.js index b4d05b4..c4c1ae1 100644 --- a/src/dial.js +++ b/src/dial.js @@ -2,6 +2,8 @@ const multistream = require('multistream-select') const Connection = require('interface-connection').Connection +const debug = require('debug') +const log = debug('libp2p:swarm:dial') const protocolMuxer = require('./protocol-muxer') const secio = require('./secio') @@ -21,6 +23,7 @@ module.exports = function dial (swarm) { const proxyConn = new Connection() const b58Id = pi.id.toB58String() + log('dialing %s', b58Id) if (!swarm.muxedConns[b58Id]) { if (!swarm.conns[b58Id]) { @@ -45,8 +48,10 @@ module.exports = function dial (swarm) { return proxyConn function gotWarmedUpConn (conn) { + if (!conn.setPeerInfo) { + conn = new Connection(conn) + } conn.setPeerInfo(pi) - attemptMuxerUpgrade(conn, (err, muxer) => { if (!protocol) { if (err) { @@ -142,6 +147,7 @@ module.exports = function dial (swarm) { if (err) { return callback(new Error('multistream not supported')) } + log('selecting %s', key) ms.select(key, (err, conn) => { if (err) { if (muxers.length === 0) { @@ -152,7 +158,7 @@ module.exports = function dial (swarm) { return } - const muxedConn = swarm.muxers[key](conn, false) + const muxedConn = swarm.muxers[key].dial(conn) swarm.muxedConns[b58Id] = {} swarm.muxedConns[b58Id].muxer = muxedConn // should not be needed anymore - swarm.muxedConns[b58Id].conn = conn @@ -161,7 +167,6 @@ module.exports = function dial (swarm) { muxedConn.once('close', () => { delete swarm.muxedConns[pi.id.toB58String()] - conn.end() swarm.emit('peer-mux-closed', pi) }) diff --git a/src/secio.js b/src/secio.js index a4723dc..42a4590 100644 --- a/src/secio.js +++ b/src/secio.js @@ -6,5 +6,5 @@ exports = module.exports exports.create = (local, insecure) => { const session = new SecureSession(local, local.privKey, insecure) - return session.secureStream() + return session.secure } diff --git a/src/transport.js b/src/transport.js index 15b6871..56be25d 100644 --- a/src/transport.js +++ b/src/transport.js @@ -2,6 +2,9 @@ const Connection = require('interface-connection').Connection const parallel = require('run-parallel') +const pull = require('pull-stream') +const debug = require('debug') +const log = debug('libp2p:swarm:transport') const protocolMuxer = require('./protocol-muxer') @@ -14,7 +17,7 @@ module.exports = function (swarm) { } if (!callback) { callback = noop } - + log('adding %s', key) if (swarm.transports[key]) { throw new Error('There is already a transport with this key') } @@ -32,7 +35,7 @@ module.exports = function (swarm) { if (!Array.isArray(multiaddrs)) { multiaddrs = [multiaddrs] } - + log('dialing %s', key, multiaddrs.map((m) => m.toString())) // a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) multiaddrs = dialables(t, multiaddrs) @@ -40,18 +43,8 @@ module.exports = function (swarm) { // transport, otherwise, create a passthrough if (multiaddrs.length === 1) { const conn = t.dial(multiaddrs.shift()) - - conn.once('error', connectError) - - conn.once('connect', () => { - conn.removeListener('error', connectError) - callback(null, conn) - }) - - return conn - } - function connectError () { - callback(new Error('failed to connect to every multiaddr')) + callback(null, new Connection(conn)) + return } // c) multiaddrs should already be a filtered list @@ -60,23 +53,9 @@ module.exports = function (swarm) { next(multiaddrs.shift()) - return proxyConn - // TODO improve in the future to make all the dials in paralell function next (multiaddr) { - const conn = t.dial(multiaddr) - - conn.once('error', connectError) - - function connectError () { - if (multiaddrs.length === 0) { - return callback(new Error('failed to connect to every multiaddr')) - } - next(multiaddrs.shift()) - } - - conn.once('connect', () => { - conn.removeListener('error', connectError) + const conn = t.dial(multiaddr, () => { proxyConn.setInnerConn(conn) callback(null, proxyConn) }) @@ -102,15 +81,14 @@ module.exports = function (swarm) { const createListeners = multiaddrs.map((ma) => { return (cb) => { const listener = transport.createListener(handler) - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - if (err) { - return cb(err) - } - freshMultiaddrs = freshMultiaddrs.concat(addrs) - transport.listeners.push(listener) - cb() - }) + listener.listen(ma) + listener.getAddrs((err, addrs) => { + if (err) { + return cb(err) + } + freshMultiaddrs = freshMultiaddrs.concat(addrs) + transport.listeners.push(listener) + cb() }) } }) @@ -139,13 +117,7 @@ module.exports = function (swarm) { } function dialables (tp, multiaddrs) { - return tp.filter(multiaddrs.map((addr) => { - // webrtc-star needs the /ipfs/QmHash - if (addr.toString().indexOf('webrtc-star') > 0) { - return addr - } - - return addr - })) + return tp.filter(multiaddrs) } + function noop () {} diff --git a/test/06-conn-upgrade-secio.node.js b/test/06-conn-upgrade-secio.node.js index 6e1ef86..c998509 100644 --- a/test/06-conn-upgrade-secio.node.js +++ b/test/06-conn-upgrade-secio.node.js @@ -8,10 +8,11 @@ const multiaddr = require('multiaddr') const Peer = require('peer-info') const TCP = require('libp2p-tcp') const multiplex = require('libp2p-spdy') +const pull = require('pull-stream') const Swarm = require('../src') -describe.skip('secio conn upgrade (on TCP)', function () { +describe.only('secio conn upgrade (on TCP)', function () { this.timeout(60 * 1000) var swarmA @@ -73,20 +74,17 @@ describe.skip('secio conn upgrade (on TCP)', function () { it('handle + dial on protocol', (done) => { swarmB.handle('/abacaxi/1.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) swarmA.dial(peerB, '/abacaxi/1.0.0', (err, conn) => { expect(err).to.not.exist expect(Object.keys(swarmA.muxedConns).length).to.equal(1) - conn.end() - - conn.on('data', () => {}) // let it flow.. let it flooooow - conn.on('end', done) + pull(pull.empty(), conn, pull.onEnd(done)) }) }) - it.skip('dial to warm conn', (done) => { + it('dial to warm conn', (done) => { swarmB.dial(peerA, (err) => { expect(err).to.not.exist expect(Object.keys(swarmB.conns).length).to.equal(0) @@ -95,20 +93,16 @@ describe.skip('secio conn upgrade (on TCP)', function () { }) }) - it.skip('dial on protocol, reuse warmed conn', (done) => { + it('dial on protocol, reuse warmed conn', (done) => { swarmA.handle('/papaia/1.0.0', (conn) => { - conn.pipe(conn) - conn.on('error', (err) => { throw err }) + pull(conn, conn) }) swarmB.dial(peerA, '/papaia/1.0.0', (err, conn) => { expect(err).to.not.exist expect(Object.keys(swarmB.conns).length).to.equal(0) expect(Object.keys(swarmB.muxedConns).length).to.equal(1) - conn.end() - conn.on('error', (err) => { throw err }) - conn.on('data', () => {}) // let it flow.. let it flooooow - conn.on('end', done) + pull(pull.empty(), conn, pull.onEnd(done)) }) })