From dca4727112ebfda5c9749374741c5715ae088d5f Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 1 Nov 2019 20:10:40 +0100 Subject: [PATCH 1/9] feat: peer-store v0 --- test/peer-store/peer-store.node.js | 68 ++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 test/peer-store/peer-store.node.js diff --git a/test/peer-store/peer-store.node.js b/test/peer-store/peer-store.node.js new file mode 100644 index 0000000000..54b8de36da --- /dev/null +++ b/test/peer-store/peer-store.node.js @@ -0,0 +1,68 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') + +const mergeOptions = require('merge-options') + +const multiaddr = require('multiaddr') +const Libp2p = require('../../src') + +const baseOptions = require('../utils/base-options') +const peerUtils = require('../utils/creators/peer') +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('peer-store on dial', () => { + let peerInfo + let remotePeerInfo + let libp2p + let remoteLibp2p + let remoteAddr + + before(async () => { + [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) + remoteLibp2p = new Libp2p(mergeOptions(baseOptions, { + peerInfo: remotePeerInfo + })) + + await remoteLibp2p.transportManager.listen([listenAddr]) + remoteAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + after(async () => { + sinon.restore() + await remoteLibp2p.stop() + libp2p && await libp2p.stop() + }) + + it('should put the remote peerInfo after dial and emit event', async () => { + // TODO: needs crypto PR fix + // const remoteId = remotePeerInfo.id.toB58String() + const remoteId = peerInfo.id.toB58String() + + libp2p = new Libp2p(mergeOptions(baseOptions, { + peerInfo + })) + + sinon.spy(libp2p.peerStore, 'put') + sinon.spy(libp2p.peerStore, 'add') + sinon.spy(libp2p.peerStore, 'update') + + const connection = await libp2p.dial(remoteAddr) + await connection.close() + + expect(libp2p.peerStore.put.callCount).to.equal(1) + expect(libp2p.peerStore.add.callCount).to.equal(1) + expect(libp2p.peerStore.update.callCount).to.equal(0) + + const storedPeer = libp2p.peerStore.get(remoteId) + expect(storedPeer).to.exist() + }) +}) + +describe('peer-store on discovery', () => { + // TODO: implement with discovery +}) From ec5319a50ea71c42ebab0e9a76f6b16208a5b1eb Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 4 Nov 2019 14:23:23 +0100 Subject: [PATCH 2/9] chore: apply suggestions from code review Co-Authored-By: Jacob Heun --- src/peer-store/index.js | 2 + test/peer-store/peer-store.node.js | 68 ------------------------------ 2 files changed, 2 insertions(+), 68 deletions(-) delete mode 100644 test/peer-store/peer-store.node.js diff --git a/src/peer-store/index.js b/src/peer-store/index.js index 1e91b926f0..af5a40a258 100644 --- a/src/peer-store/index.js +++ b/src/peer-store/index.js @@ -148,6 +148,8 @@ class PeerStore extends EventEmitter { if (!recorded.id.pubKey && peerInfo.id.pubKey) { recorded.id.pubKey = peerInfo.id.pubKey } + + // this.peers.set(id, recorded) } /** diff --git a/test/peer-store/peer-store.node.js b/test/peer-store/peer-store.node.js deleted file mode 100644 index 54b8de36da..0000000000 --- a/test/peer-store/peer-store.node.js +++ /dev/null @@ -1,68 +0,0 @@ -'use strict' -/* eslint-env mocha */ - -const chai = require('chai') -chai.use(require('dirty-chai')) -const { expect } = chai -const sinon = require('sinon') - -const mergeOptions = require('merge-options') - -const multiaddr = require('multiaddr') -const Libp2p = require('../../src') - -const baseOptions = require('../utils/base-options') -const peerUtils = require('../utils/creators/peer') -const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') - -describe('peer-store on dial', () => { - let peerInfo - let remotePeerInfo - let libp2p - let remoteLibp2p - let remoteAddr - - before(async () => { - [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) - remoteLibp2p = new Libp2p(mergeOptions(baseOptions, { - peerInfo: remotePeerInfo - })) - - await remoteLibp2p.transportManager.listen([listenAddr]) - remoteAddr = remoteLibp2p.transportManager.getAddrs()[0] - }) - - after(async () => { - sinon.restore() - await remoteLibp2p.stop() - libp2p && await libp2p.stop() - }) - - it('should put the remote peerInfo after dial and emit event', async () => { - // TODO: needs crypto PR fix - // const remoteId = remotePeerInfo.id.toB58String() - const remoteId = peerInfo.id.toB58String() - - libp2p = new Libp2p(mergeOptions(baseOptions, { - peerInfo - })) - - sinon.spy(libp2p.peerStore, 'put') - sinon.spy(libp2p.peerStore, 'add') - sinon.spy(libp2p.peerStore, 'update') - - const connection = await libp2p.dial(remoteAddr) - await connection.close() - - expect(libp2p.peerStore.put.callCount).to.equal(1) - expect(libp2p.peerStore.add.callCount).to.equal(1) - expect(libp2p.peerStore.update.callCount).to.equal(0) - - const storedPeer = libp2p.peerStore.get(remoteId) - expect(storedPeer).to.exist() - }) -}) - -describe('peer-store on discovery', () => { - // TODO: implement with discovery -}) From 751bc00f1bb3d6c80caac5731c688bb4315c8af2 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 5 Nov 2019 11:46:00 +0100 Subject: [PATCH 3/9] chore: address review --- src/peer-store/index.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/peer-store/index.js b/src/peer-store/index.js index af5a40a258..1e91b926f0 100644 --- a/src/peer-store/index.js +++ b/src/peer-store/index.js @@ -148,8 +148,6 @@ class PeerStore extends EventEmitter { if (!recorded.id.pubKey && peerInfo.id.pubKey) { recorded.id.pubKey = peerInfo.id.pubKey } - - // this.peers.set(id, recorded) } /** From dcd127b7a5f8503827e61f86a87a416ac18fb989 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 22 Oct 2019 14:11:51 +0200 Subject: [PATCH 4/9] refactor: pubsub subsystem --- README.md | 14 +-- package.json | 6 +- src/index.js | 39 ++++--- src/pubsub.js | 126 ++++---------------- src/upgrader.js | 6 +- test/pubsub/configuration.node.js | 93 +++++++++++++++ test/pubsub/implementations.node.js | 101 ++++++++++++++++ test/pubsub/operation.node.js | 174 ++++++++++++++++++++++++++++ test/pubsub/utils.js | 29 +++++ 9 files changed, 457 insertions(+), 131 deletions(-) create mode 100644 test/pubsub/configuration.node.js create mode 100644 test/pubsub/implementations.node.js create mode 100644 test/pubsub/operation.node.js create mode 100644 test/pubsub/utils.js diff --git a/README.md b/README.md index bcd8f540c0..8c868dc4fe 100644 --- a/README.md +++ b/README.md @@ -211,22 +211,18 @@ class Node extends Libp2p { **IMPORTANT NOTE**: All the methods listed in the API section that take a callback are also now Promisified. Libp2p is migrating away from callbacks to async/await, and in a future release (that will be announced in advance), callback support will be removed entirely. You can follow progress of the async/await endeavor at https://github.com/ipfs/js-ipfs/issues/1670. -#### Create a Node - `Libp2p.createLibp2p(options, callback)` +#### Create a Node - `Libp2p.create(options)` > Behaves exactly like `new Libp2p(options)`, but doesn't require a PeerInfo. One will be generated instead ```js -const { createLibp2p } = require('libp2p') -createLibp2p(options, (err, libp2p) => { - if (err) throw err - libp2p.start((err) => { - if (err) throw err - }) -}) +const { create } = require('libp2p') +const libp2p = await create(options) + +await libp2p.start() ``` - `options`: Object of libp2p configuration options -- `callback`: Function with signature `function (Error, Libp2p) {}` #### Create a Node alternative - `new Libp2p(options)` diff --git a/package.json b/package.json index 16986a97ff..7ea9febbb9 100644 --- a/package.json +++ b/package.json @@ -62,6 +62,7 @@ "multiaddr": "^7.1.0", "multistream-select": "^0.15.0", "once": "^1.4.0", + "p-map": "^3.0.0", "p-queue": "^6.1.1", "p-settle": "^3.1.0", "peer-id": "^0.13.3", @@ -90,8 +91,8 @@ "libp2p-bootstrap": "^0.9.7", "libp2p-delegated-content-routing": "^0.2.2", "libp2p-delegated-peer-routing": "^0.2.2", - "libp2p-floodsub": "~0.17.0", - "libp2p-gossipsub": "~0.0.4", + "libp2p-floodsub": "libp2p/js-libp2p-floodsub#refactor/async", + "libp2p-gossipsub": "ChainSafe/gossipsub-js#refactor/async", "libp2p-kad-dht": "^0.15.3", "libp2p-mdns": "^0.12.3", "libp2p-mplex": "^0.9.1", @@ -103,6 +104,7 @@ "lodash.times": "^4.3.2", "nock": "^10.0.6", "p-defer": "^3.0.0", + "p-wait-for": "^3.1.0", "portfinder": "^1.0.20", "pull-goodbye": "0.0.2", "pull-length-prefixed": "^1.3.3", diff --git a/src/index.js b/src/index.js index 45648db0fa..5efcc8febd 100644 --- a/src/index.js +++ b/src/index.js @@ -1,7 +1,7 @@ 'use strict' const FSM = require('fsm-event') -const EventEmitter = require('events').EventEmitter +const { EventEmitter } = require('events') const debug = require('debug') const log = debug('libp2p') log.error = debug('libp2p:error') @@ -9,7 +9,6 @@ const errCode = require('err-code') const promisify = require('promisify-es6') const each = require('async/each') -const nextTick = require('async/nextTick') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') @@ -66,6 +65,8 @@ class Libp2p extends EventEmitter { this._transport = [] // Transport instances/references this._discovery = [] // Discovery service instances/references + this.peerStore = new PeerStore() + // create the switch, and listen for errors this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch) @@ -147,7 +148,7 @@ class Libp2p extends EventEmitter { } // start pubsub - if (this._modules.pubsub && this._config.pubsub.enabled !== false) { + if (this._modules.pubsub) { this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub) } @@ -251,6 +252,7 @@ class Libp2p extends EventEmitter { this.state('stop') try { + this.pubsub && await this.pubsub.stop() await this.transportManager.close() await this._switch.stop() } catch (err) { @@ -385,10 +387,16 @@ class Libp2p extends EventEmitter { const multiaddrs = this.peerInfo.multiaddrs.toArray() // Start parallel tasks + const tasks = [ + this.transportManager.listen(multiaddrs) + ] + + if (this._config.pubsub.enabled) { + this.pubsub && this.pubsub.start() + } + try { - await Promise.all([ - this.transportManager.listen(multiaddrs) - ]) + await Promise.all(tasks) } catch (err) { log.error(err) this.emit('error', err) @@ -483,16 +491,15 @@ module.exports = Libp2p * Like `new Libp2p(options)` except it will create a `PeerInfo` * instance if one is not provided in options. * @param {object} options Libp2p configuration options - * @param {function(Error, Libp2p)} callback - * @returns {void} + * @returns {Libp2p} */ -module.exports.createLibp2p = promisify((options, callback) => { +module.exports.create = async (options = {}) => { if (options.peerInfo) { - return nextTick(callback, null, new Libp2p(options)) + return new Libp2p(options) } - PeerInfo.create((err, peerInfo) => { - if (err) return callback(err) - options.peerInfo = peerInfo - callback(null, new Libp2p(options)) - }) -}) + + const peerInfo = await PeerInfo.create() + + options.peerInfo = peerInfo + return new Libp2p(options) +} diff --git a/src/pubsub.js b/src/pubsub.js index 2246a2d1e9..344c2f65af 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -1,52 +1,21 @@ 'use strict' -const nextTick = require('async/nextTick') -const { messages, codes } = require('./errors') -const promisify = require('promisify-es6') - const errCode = require('err-code') +const { messages, codes } = require('./errors') module.exports = (node, Pubsub, config) => { - const pubsub = new Pubsub(node, config) + const pubsub = new Pubsub(node.peerInfo, node.registrar, config) return { /** * Subscribe the given handler to a pubsub topic - * * @param {string} topic * @param {function} handler The handler to subscribe - * @param {object|null} [options] - * @param {function} [callback] An optional callback - * - * @returns {Promise|void} A promise is returned if no callback is provided - * - * @example Subscribe a handler to a topic - * - * // `null` must be passed for options until subscribe is no longer using promisify - * const handler = (message) => { } - * await libp2p.subscribe(topic, handler, null) - * - * @example Use a callback instead of the Promise api - * - * // `options` may be passed or omitted when supplying a callback - * const handler = (message) => { } - * libp2p.subscribe(topic, handler, callback) + * @returns {void} */ - subscribe: (topic, handler, options, callback) => { - // can't use promisify because it thinks the handler is a callback - if (typeof options === 'function') { - callback = options - options = {} - } - + subscribe: (topic, handler) => { if (!node.isStarted() && !pubsub.started) { - const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - - if (callback) { - return nextTick(() => callback(err)) - } - - return Promise.reject(err) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } if (pubsub.listenerCount(topic) === 0) { @@ -54,46 +23,16 @@ module.exports = (node, Pubsub, config) => { } pubsub.on(topic, handler) - - if (callback) { - return nextTick(() => callback()) - } - - return Promise.resolve() }, /** * Unsubscribes from a pubsub topic - * * @param {string} topic - * @param {function|null} handler The handler to unsubscribe from - * @param {function} [callback] An optional callback - * - * @returns {Promise|void} A promise is returned if no callback is provided - * - * @example Unsubscribe a topic for all handlers - * - * // `null` must be passed until unsubscribe is no longer using promisify - * await libp2p.unsubscribe(topic, null) - * - * @example Unsubscribe a topic for 1 handler - * - * await libp2p.unsubscribe(topic, handler) - * - * @example Use a callback instead of the Promise api - * - * libp2p.unsubscribe(topic, handler, callback) + * @param {function} [handler] The handler to unsubscribe from */ - unsubscribe: (topic, handler, callback) => { - // can't use promisify because it thinks the handler is a callback + unsubscribe: (topic, handler) => { if (!node.isStarted() && !pubsub.started) { - const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - - if (callback) { - return nextTick(() => callback(err)) - } - - return Promise.reject(err) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } if (!handler) { @@ -105,61 +44,46 @@ module.exports = (node, Pubsub, config) => { if (pubsub.listenerCount(topic) === 0) { pubsub.unsubscribe(topic) } - - if (callback) { - return nextTick(() => callback()) - } - - return Promise.resolve() }, - publish: promisify((topic, data, callback) => { + publish: (topic, data) => { if (!node.isStarted() && !pubsub.started) { - return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } try { data = Buffer.from(data) } catch (err) { - return nextTick(callback, errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID')) + throw errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID') } - pubsub.publish(topic, data, callback) - }), + return pubsub.publish(topic, data) + }, - ls: promisify((callback) => { + getTopics: () => { if (!node.isStarted() && !pubsub.started) { - return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } - const subscriptions = Array.from(pubsub.subscriptions) - - nextTick(() => callback(null, subscriptions)) - }), + return pubsub.getTopics() + }, - peers: promisify((topic, callback) => { + getPeersSubscribed: (topic) => { if (!node.isStarted() && !pubsub.started) { - return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } - if (typeof topic === 'function') { - callback = topic - topic = null - } - - const peers = Array.from(pubsub.peers.values()) - .filter((peer) => topic ? peer.topics.has(topic) : true) - .map((peer) => peer.info.id.toB58String()) - - nextTick(() => callback(null, peers)) - }), + return pubsub.getPeersSubscribed(topic) + }, setMaxListeners (n) { return pubsub.setMaxListeners(n) }, - start: promisify((cb) => pubsub.start(cb)), + _pubsub: pubsub, + + start: () => pubsub.start(), - stop: promisify((cb) => pubsub.stop(cb)) + stop: () => pubsub.stop() } } diff --git a/src/upgrader.js b/src/upgrader.js index 1699451a6d..6890c6f770 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -186,7 +186,7 @@ class Upgrader { const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) log('%s: incoming stream opened on %s', direction, protocol) connection.addStream(stream, protocol) - this._onStream({ connection, stream, protocol }) + this._onStream({ connection, stream, protocol, remotePeer }) } catch (err) { log.error(err) } @@ -254,9 +254,9 @@ class Upgrader { * @param {Stream} options.stream * @param {string} options.protocol */ - _onStream ({ connection, stream, protocol }) { + _onStream ({ connection, stream, protocol, remotePeer }) { const handler = this.protocols.get(protocol) - handler({ connection, stream, protocol }) + handler({ connection, stream, protocol, remotePeer }) } /** diff --git a/test/pubsub/configuration.node.js b/test/pubsub/configuration.node.js new file mode 100644 index 0000000000..7ec68b309e --- /dev/null +++ b/test/pubsub/configuration.node.js @@ -0,0 +1,93 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const mergeOptions = require('merge-options') + +const multiaddr = require('multiaddr') +const PeerInfo = require('peer-info') + +const { create } = require('../../src') +const { baseOptions, subsystemOptions } = require('./utils') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('Pubsub subsystem is configurable', () => { + let libp2p + + afterEach(async () => { + libp2p && await libp2p.stop() + }) + + it('should not exist if no module is provided', async () => { + libp2p = await create(baseOptions) + expect(libp2p.pubsub).to.not.exist() + }) + + it('should exist if the module is provided', async () => { + libp2p = await create(subsystemOptions) + expect(libp2p.pubsub).to.exist() + }) + + it('should start and stop by default once libp2p starts', async () => { + const peerInfo = await PeerInfo.create() + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo + }) + + libp2p = await create(customOptions) + expect(libp2p.pubsub._pubsub.started).to.equal(false) + + await libp2p.start() + expect(libp2p.pubsub._pubsub.started).to.equal(true) + + await libp2p.stop() + expect(libp2p.pubsub._pubsub.started).to.equal(false) + }) + + it('should not start if disabled once libp2p starts', async () => { + const peerInfo = await PeerInfo.create() + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo, + config: { + pubsub: { + enabled: false + } + } + }) + + libp2p = await create(customOptions) + expect(libp2p.pubsub._pubsub.started).to.equal(false) + + await libp2p.start() + expect(libp2p.pubsub._pubsub.started).to.equal(false) + }) + + it('should allow a manual start', async () => { + const peerInfo = await PeerInfo.create() + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo, + config: { + pubsub: { + enabled: false + } + } + }) + + libp2p = await create(customOptions) + await libp2p.start() + expect(libp2p.pubsub._pubsub.started).to.equal(false) + + await libp2p.pubsub.start() + expect(libp2p.pubsub._pubsub.started).to.equal(true) + }) +}) diff --git a/test/pubsub/implementations.node.js b/test/pubsub/implementations.node.js new file mode 100644 index 0000000000..120ee84d2a --- /dev/null +++ b/test/pubsub/implementations.node.js @@ -0,0 +1,101 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const pWaitFor = require('p-wait-for') +const pDefer = require('p-defer') +const mergeOptions = require('merge-options') + +const Floodsub = require('libp2p-floodsub') +const Gossipsub = require('libp2p-gossipsub') +const { multicodec: floodsubMulticodec } = require('libp2p-floodsub') +const { multicodec: gossipsubMulticodec } = require('libp2p-gossipsub') + +const multiaddr = require('multiaddr') +const PeerInfo = require('peer-info') + +const { create } = require('../../src') +const { baseOptions } = require('./utils') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') +const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('Pubsub subsystem is able to use different implementations', () => { + let peerInfo, remotePeerInfo + let libp2p, remoteLibp2p + let remAddr + + beforeEach(async () => { + [peerInfo, remotePeerInfo] = await Promise.all([ + PeerInfo.create(), + PeerInfo.create() + ]) + + peerInfo.multiaddrs.add(listenAddr) + remotePeerInfo.multiaddrs.add(remoteListenAddr) + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + it('Floodsub nodes', () => { + return pubsubTest(floodsubMulticodec, Floodsub) + }) + + it('Gossipsub nodes', () => { + return pubsubTest(gossipsubMulticodec, Gossipsub) + }) + + const pubsubTest = async (multicodec, pubsub) => { + const defer = pDefer() + const topic = 'test-topic' + const data = 'hey!' + + libp2p = await create(mergeOptions(baseOptions, { + peerInfo, + modules: { + pubsub: pubsub + } + })) + + remoteLibp2p = await create(mergeOptions(baseOptions, { + peerInfo: remotePeerInfo, + modules: { + pubsub: pubsub + } + })) + + await Promise.all([ + libp2p.start(), + remoteLibp2p.start() + ]) + + const libp2pId = libp2p.peerInfo.id.toB58String() + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + + const connection = await libp2p.dialProtocol(remAddr, multicodec) + + await new Promise((resolve) => setTimeout(resolve, 1000)) + expect(connection).to.exist() + + libp2p.pubsub.subscribe(topic, (msg) => { + expect(msg.data.toString()).to.equal(data) + defer.resolve() + }) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic) + return subscribedPeers.includes(libp2pId) + }) + await new Promise((resolve) => setTimeout(resolve, 1000)) + remoteLibp2p.pubsub.publish(topic, data) + + await defer.promise + } +}) diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js new file mode 100644 index 0000000000..b52d6c1302 --- /dev/null +++ b/test/pubsub/operation.node.js @@ -0,0 +1,174 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') + +const pWaitFor = require('p-wait-for') +const pDefer = require('p-defer') +const mergeOptions = require('merge-options') + +const multiaddr = require('multiaddr') +const PeerInfo = require('peer-info') + +const { create } = require('../../src') +const { subsystemOptions, subsystemMulticodecs } = require('./utils') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') +const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('Pubsub subsystem operates correctly', () => { + let peerInfo, remotePeerInfo + let libp2p, remoteLibp2p + let remAddr + + beforeEach(async () => { + [peerInfo, remotePeerInfo] = await Promise.all([ + PeerInfo.create(), + PeerInfo.create() + ]) + + peerInfo.multiaddrs.add(listenAddr) + remotePeerInfo.multiaddrs.add(remoteListenAddr) + }) + + describe('pubsub started before connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerInfo + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerInfo: remotePeerInfo + })) + + await libp2p.start() + await remoteLibp2p.start() + + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + afterEach(() => { + sinon.restore() + }) + + it('should get notified of connected peers on dial', async () => { + sinon.spy(libp2p.registrar, 'onConnect') + sinon.spy(remoteLibp2p.registrar, 'onConnect') + + const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + + expect(connection).to.exist() + expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1) + expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1) + + expect(libp2p.registrar.onConnect.callCount).to.equal(1) + expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1) + }) + + it('should receive pubsub messages', async () => { + const defer = pDefer() + const topic = 'test-topic' + const data = 'hey!' + const libp2pId = libp2p.peerInfo.id.toB58String() + + await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + + let subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.not.include(topic) + + libp2p.pubsub.subscribe(topic, (msg) => { + expect(msg.data.toString()).to.equal(data) + defer.resolve() + }) + + subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.include(topic) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic) + return subscribedPeers.includes(libp2pId) + }) + remoteLibp2p.pubsub.publish(topic, data) + + await defer.promise + }) + }) + + // TODO: Needs identify push + describe.skip('pubsub started after connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerInfo + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerInfo: remotePeerInfo, + config: { + pubsub: { + enabled: false + } + } + })) + + await libp2p.start() + await remoteLibp2p.start() + + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + afterEach(() => { + sinon.restore() + }) + + it.skip('should get notified of connected peers after starting', async () => { + const connection = await libp2p.dial(remAddr) + + expect(connection).to.exist() + expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(0) + expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(0) + + remoteLibp2p.pubsub.start() + + // Wait for + // Validate + expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1) + expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1) + }) + + it.skip('should receive pubsub messages', async () => { + const defer = pDefer() + const topic = 'test-topic' + const data = 'hey!' + + await libp2p.dial(remAddr) + + remoteLibp2p.pubsub.start() + + // TODO: wait for + + libp2p.pubsub.subscribe(topic) + libp2p.pubsub.once(topic, (msg) => { + expect(msg.data.toString()).to.equal(data) + defer.resolve() + }) + + libp2p.pubsub.publish(topic, data) + + await defer.promise + }) + }) +}) diff --git a/test/pubsub/utils.js b/test/pubsub/utils.js new file mode 100644 index 0000000000..11495c5df8 --- /dev/null +++ b/test/pubsub/utils.js @@ -0,0 +1,29 @@ +'use strict' + +const Gossipsub = require('libp2p-gossipsub') +const { multicodec } = require('libp2p-gossipsub') +const Crypto = require('../../src/insecure/plaintext') +const Muxer = require('libp2p-mplex') +const Transport = require('libp2p-tcp') + +const mergeOptions = require('merge-options') + +const baseOptions = { + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } +} + +module.exports.baseOptions = baseOptions + +const subsystemOptions = mergeOptions(baseOptions, { + modules: { + pubsub: Gossipsub + } +}) + +module.exports.subsystemOptions = subsystemOptions + +module.exports.subsystemMulticodecs = [multicodec] From cdfd02306fd793c1997e481b82363fd315b40626 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 6 Nov 2019 18:03:57 +0100 Subject: [PATCH 5/9] chore: address review --- src/pubsub.js | 15 +++++++++++++++ test/pubsub/configuration.node.js | 9 ++++----- test/pubsub/implementations.node.js | 12 +++--------- test/pubsub/operation.node.js | 14 ++++++-------- 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/src/pubsub.js b/src/pubsub.js index 344c2f65af..2c067d993d 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -46,6 +46,12 @@ module.exports = (node, Pubsub, config) => { } }, + /** + * Publish messages to the given topics. + * @param {Array|string} topic + * @param {Buffer} data + * @returns {Promise} + */ publish: (topic, data) => { if (!node.isStarted() && !pubsub.started) { throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) @@ -60,6 +66,10 @@ module.exports = (node, Pubsub, config) => { return pubsub.publish(topic, data) }, + /** + * Get a list of topics the node is subscribed to. + * @returns {Array} topics + */ getTopics: () => { if (!node.isStarted() && !pubsub.started) { throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) @@ -68,6 +78,11 @@ module.exports = (node, Pubsub, config) => { return pubsub.getTopics() }, + /** + * Get a list of the peer-ids that are subscribed to one topic. + * @param {string} topic + * @returns {Array} + */ getPeersSubscribed: (topic) => { if (!node.isStarted() && !pubsub.started) { throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) diff --git a/test/pubsub/configuration.node.js b/test/pubsub/configuration.node.js index 7ec68b309e..829e303f3d 100644 --- a/test/pubsub/configuration.node.js +++ b/test/pubsub/configuration.node.js @@ -6,12 +6,11 @@ chai.use(require('dirty-chai')) const { expect } = chai const mergeOptions = require('merge-options') - const multiaddr = require('multiaddr') -const PeerInfo = require('peer-info') const { create } = require('../../src') const { baseOptions, subsystemOptions } = require('./utils') +const peerUtils = require('../utils/creators/peer') const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') @@ -33,7 +32,7 @@ describe('Pubsub subsystem is configurable', () => { }) it('should start and stop by default once libp2p starts', async () => { - const peerInfo = await PeerInfo.create() + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) peerInfo.multiaddrs.add(listenAddr) const customOptions = mergeOptions(subsystemOptions, { @@ -51,7 +50,7 @@ describe('Pubsub subsystem is configurable', () => { }) it('should not start if disabled once libp2p starts', async () => { - const peerInfo = await PeerInfo.create() + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) peerInfo.multiaddrs.add(listenAddr) const customOptions = mergeOptions(subsystemOptions, { @@ -71,7 +70,7 @@ describe('Pubsub subsystem is configurable', () => { }) it('should allow a manual start', async () => { - const peerInfo = await PeerInfo.create() + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) peerInfo.multiaddrs.add(listenAddr) const customOptions = mergeOptions(subsystemOptions, { diff --git a/test/pubsub/implementations.node.js b/test/pubsub/implementations.node.js index 120ee84d2a..cb2ee9f217 100644 --- a/test/pubsub/implementations.node.js +++ b/test/pubsub/implementations.node.js @@ -15,10 +15,10 @@ const { multicodec: floodsubMulticodec } = require('libp2p-floodsub') const { multicodec: gossipsubMulticodec } = require('libp2p-gossipsub') const multiaddr = require('multiaddr') -const PeerInfo = require('peer-info') const { create } = require('../../src') const { baseOptions } = require('./utils') +const peerUtils = require('../utils/creators/peer') const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') @@ -29,10 +29,7 @@ describe('Pubsub subsystem is able to use different implementations', () => { let remAddr beforeEach(async () => { - [peerInfo, remotePeerInfo] = await Promise.all([ - PeerInfo.create(), - PeerInfo.create() - ]) + [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) peerInfo.multiaddrs.add(listenAddr) remotePeerInfo.multiaddrs.add(remoteListenAddr) @@ -79,8 +76,6 @@ describe('Pubsub subsystem is able to use different implementations', () => { remAddr = remoteLibp2p.transportManager.getAddrs()[0] const connection = await libp2p.dialProtocol(remAddr, multicodec) - - await new Promise((resolve) => setTimeout(resolve, 1000)) expect(connection).to.exist() libp2p.pubsub.subscribe(topic, (msg) => { @@ -93,9 +88,8 @@ describe('Pubsub subsystem is able to use different implementations', () => { const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic) return subscribedPeers.includes(libp2pId) }) - await new Promise((resolve) => setTimeout(resolve, 1000)) - remoteLibp2p.pubsub.publish(topic, data) + remoteLibp2p.pubsub.publish(topic, data) await defer.promise } }) diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js index b52d6c1302..b7e5a77b2d 100644 --- a/test/pubsub/operation.node.js +++ b/test/pubsub/operation.node.js @@ -9,12 +9,11 @@ const sinon = require('sinon') const pWaitFor = require('p-wait-for') const pDefer = require('p-defer') const mergeOptions = require('merge-options') - const multiaddr = require('multiaddr') -const PeerInfo = require('peer-info') const { create } = require('../../src') const { subsystemOptions, subsystemMulticodecs } = require('./utils') +const peerUtils = require('../utils/creators/peer') const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') @@ -25,10 +24,7 @@ describe('Pubsub subsystem operates correctly', () => { let remAddr beforeEach(async () => { - [peerInfo, remotePeerInfo] = await Promise.all([ - PeerInfo.create(), - PeerInfo.create() - ]) + [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) peerInfo.multiaddrs.add(listenAddr) remotePeerInfo.multiaddrs.add(remoteListenAddr) @@ -44,8 +40,10 @@ describe('Pubsub subsystem operates correctly', () => { peerInfo: remotePeerInfo })) - await libp2p.start() - await remoteLibp2p.start() + await Promise.all([ + libp2p.start(), + remoteLibp2p.start() + ]) remAddr = remoteLibp2p.transportManager.getAddrs()[0] }) From 4cc97364856eec2482a0db3d7517c6f940ef87db Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 12 Nov 2019 15:34:55 +0100 Subject: [PATCH 6/9] chore: use topology interface --- package.json | 4 +- src/connection-manager/topology.js | 108 ----------------------------- src/registrar.js | 12 +--- test/registrar/registrar.spec.js | 77 ++++---------------- 4 files changed, 19 insertions(+), 182 deletions(-) delete mode 100644 src/connection-manager/topology.js diff --git a/package.json b/package.json index 7ea9febbb9..42bb04b768 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "it-protocol-buffers": "^0.2.0", "latency-monitor": "~0.2.1", "libp2p-crypto": "^0.17.1", - "libp2p-interfaces": "^0.1.3", + "libp2p-interfaces": "^0.1.4", "mafmt": "^7.0.0", "merge-options": "^1.0.1", "moving-average": "^1.0.0", @@ -91,7 +91,7 @@ "libp2p-bootstrap": "^0.9.7", "libp2p-delegated-content-routing": "^0.2.2", "libp2p-delegated-peer-routing": "^0.2.2", - "libp2p-floodsub": "libp2p/js-libp2p-floodsub#refactor/async", + "libp2p-floodsub": "^0.19.0", "libp2p-gossipsub": "ChainSafe/gossipsub-js#refactor/async", "libp2p-kad-dht": "^0.15.3", "libp2p-mdns": "^0.12.3", diff --git a/src/connection-manager/topology.js b/src/connection-manager/topology.js deleted file mode 100644 index 2c2a877919..0000000000 --- a/src/connection-manager/topology.js +++ /dev/null @@ -1,108 +0,0 @@ -'use strict' - -const assert = require('assert') - -class Topology { - /** - * @param {Object} props - * @param {number} props.min minimum needed connections (default: 0) - * @param {number} props.max maximum needed connections (default: Infinity) - * @param {Array} props.multicodecs protocol multicodecs - * @param {Object} props.handlers - * @param {function} props.handlers.onConnect protocol "onConnect" handler - * @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler - * @constructor - */ - constructor ({ - min = 0, - max = Infinity, - multicodecs, - handlers - }) { - assert(multicodecs, 'one or more multicodec should be provided') - assert(handlers, 'the handlers should be provided') - assert(handlers.onConnect && typeof handlers.onConnect === 'function', - 'the \'onConnect\' handler must be provided') - assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function', - 'the \'onDisconnect\' handler must be provided') - - this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs] - this.min = min - this.max = max - - // Handlers - this._onConnect = handlers.onConnect - this._onDisconnect = handlers.onDisconnect - - this.peers = new Map() - this._registrar = undefined - - this._onProtocolChange = this._onProtocolChange.bind(this) - } - - set registrar (registrar) { - this._registrar = registrar - this._registrar.peerStore.on('change:protocols', this._onProtocolChange) - - // Update topology peers - this._updatePeers(this._registrar.peerStore.peers.values()) - } - - /** - * Update topology. - * @param {Array} peerInfoIterable - * @returns {void} - */ - _updatePeers (peerInfoIterable) { - for (const peerInfo of peerInfoIterable) { - if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) { - // Add the peer regardless of whether or not there is currently a connection - this.peers.set(peerInfo.id.toB58String(), peerInfo) - // If there is a connection, call _onConnect - const connection = this._registrar.getConnection(peerInfo) - connection && this._onConnect(peerInfo, connection) - } else { - // Remove any peers we might be tracking that are no longer of value to us - this.peers.delete(peerInfo.id.toB58String()) - } - } - } - - /** - * Notify protocol of peer disconnected. - * @param {PeerInfo} peerInfo - * @param {Error} [error] - * @returns {void} - */ - disconnect (peerInfo, error) { - this._onDisconnect(peerInfo, error) - } - - /** - * Check if a new peer support the multicodecs for this topology. - * @param {Object} props - * @param {PeerInfo} props.peerInfo - * @param {Array} props.protocols - */ - _onProtocolChange ({ peerInfo, protocols }) { - const existingPeer = this.peers.get(peerInfo.id.toB58String()) - const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol)) - - // Not supporting the protocol anymore? - if (existingPeer && hasProtocol.length === 0) { - this._onDisconnect({ - peerInfo - }) - } - - // New to protocol support - for (const protocol of protocols) { - if (this.multicodecs.includes(protocol)) { - this._updatePeers([peerInfo]) - return - } - } - } -} - -module.exports = Topology diff --git a/src/registrar.js b/src/registrar.js index c6e4439c00..98d2f9787e 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -7,7 +7,6 @@ log.error = debug('libp2p:peer-store:error') const { Connection } = require('libp2p-interfaces/src/connection') const PeerInfo = require('peer-info') -const Toplogy = require('./connection-manager/topology') /** * Responsible for notifying registered protocols of events in the network. @@ -106,17 +105,12 @@ class Registrar { /** * Register handlers for a set of multicodecs given - * @param {Object} topologyProps properties for topology - * @param {Array|string} topologyProps.multicodecs - * @param {Object} topologyProps.handlers - * @param {function} topologyProps.handlers.onConnect - * @param {function} topologyProps.handlers.onDisconnect + * @param {Topology} topology protocol topology * @return {string} registrar identifier */ - register (topologyProps) { - // Create multicodec topology + register (topology) { + // Create topology const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() - const topology = new Toplogy(topologyProps) this.topologies.set(id, topology) diff --git a/test/registrar/registrar.spec.js b/test/registrar/registrar.spec.js index ec7d1b6189..e1540cfd3c 100644 --- a/test/registrar/registrar.spec.js +++ b/test/registrar/registrar.spec.js @@ -7,6 +7,7 @@ const { expect } = chai const pDefer = require('p-defer') const PeerInfo = require('peer-info') +const Topology = require('libp2p-interfaces/src/topology/multicodec-topology') const PeerStore = require('../../src/peer-store') const Registrar = require('../../src/registrar') const { createMockConnection } = require('./utils') @@ -32,53 +33,7 @@ describe('registrar', () => { throw new Error('should fail to register a protocol if no multicodec is provided') }) - it('should fail to register a protocol if no handlers are provided', () => { - const topologyProps = { - multicodecs: multicodec - } - - try { - registrar.register(topologyProps) - } catch (err) { - expect(err).to.exist() - return - } - throw new Error('should fail to register a protocol if no handlers are provided') - }) - - it('should fail to register a protocol if the onConnect handler is not provided', () => { - const topologyProps = { - multicodecs: multicodec, - handlers: { - onDisconnect: () => { } - } - } - - try { - registrar.register(topologyProps) - } catch (err) { - expect(err).to.exist() - return - } - throw new Error('should fail to register a protocol if the onConnect handler is not provided') - }) - - it('should fail to register a protocol if the onDisconnect handler is not provided', () => { - const topologyProps = { - multicodecs: multicodec, - handlers: { - onConnect: () => { } - } - } - - try { - registrar.register(topologyProps) - } catch (err) { - expect(err).to.exist() - return - } - throw new Error('should fail to register a protocol if the onDisconnect handler is not provided') - }) + // TODO: not valid topology }) describe('registration', () => { @@ -88,13 +43,13 @@ describe('registrar', () => { }) it('should be able to register a protocol', () => { - const topologyProps = { + const topologyProps = new Topology({ + multicodecs: multicodec, handlers: { onConnect: () => { }, onDisconnect: () => { } - }, - multicodecs: multicodec - } + } + }) const identifier = registrar.register(topologyProps) @@ -102,13 +57,13 @@ describe('registrar', () => { }) it('should be able to unregister a protocol', () => { - const topologyProps = { + const topologyProps = new Topology({ + multicodecs: multicodec, handlers: { onConnect: () => { }, onDisconnect: () => { } - }, - multicodecs: multicodec - } + } + }) const identifier = registrar.register(topologyProps) const success = registrar.unregister(identifier) @@ -138,7 +93,7 @@ describe('registrar', () => { registrar.onConnect(remotePeerInfo, conn) expect(registrar.connections.size).to.eql(1) - const topologyProps = { + const topologyProps = new Topology({ multicodecs: multicodec, handlers: { onConnect: (peerInfo, connection) => { @@ -153,7 +108,7 @@ describe('registrar', () => { onDisconnectDefer.resolve() } } - } + }) // Register protocol const identifier = registrar.register(topologyProps) @@ -161,11 +116,9 @@ describe('registrar', () => { // Topology created expect(topology).to.exist() - expect(topology.peers.size).to.eql(1) registrar.onDisconnect(remotePeerInfo) expect(registrar.connections.size).to.eql(0) - expect(topology.peers.size).to.eql(1) // topology should keep the peer // Wait for handlers to be called return Promise.all([ @@ -178,7 +131,7 @@ describe('registrar', () => { const onConnectDefer = pDefer() const onDisconnectDefer = pDefer() - const topologyProps = { + const topologyProps = new Topology({ multicodecs: multicodec, handlers: { onConnect: () => { @@ -188,7 +141,7 @@ describe('registrar', () => { onDisconnectDefer.resolve() } } - } + }) // Register protocol const identifier = registrar.register(topologyProps) @@ -196,7 +149,6 @@ describe('registrar', () => { // Topology created expect(topology).to.exist() - expect(topology.peers.size).to.eql(0) expect(registrar.connections.size).to.eql(0) // Setup connections before registrar @@ -212,7 +164,6 @@ describe('registrar', () => { peerStore.put(peerInfo) await onConnectDefer.promise - expect(topology.peers.size).to.eql(1) // Remove protocol to peer and update it peerInfo.protocols.delete(multicodec) From 340edf53e3fba30e5df98aaf5be212983e776ba1 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 15 Nov 2019 15:15:12 +0100 Subject: [PATCH 7/9] chore: address review --- package.json | 2 +- src/registrar.js | 7 +++++++ src/upgrader.js | 6 +++--- test/pubsub/operation.node.js | 32 ++++++++++++++++++++++---------- test/registrar/registrar.spec.js | 13 ++++++++++++- 5 files changed, 45 insertions(+), 15 deletions(-) diff --git a/package.json b/package.json index 42bb04b768..6a1d81a609 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "it-protocol-buffers": "^0.2.0", "latency-monitor": "~0.2.1", "libp2p-crypto": "^0.17.1", - "libp2p-interfaces": "^0.1.4", + "libp2p-interfaces": "^0.1.5", "mafmt": "^7.0.0", "merge-options": "^1.0.1", "moving-average": "^1.0.0", diff --git a/src/registrar.js b/src/registrar.js index 98d2f9787e..d777458ecf 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -5,6 +5,8 @@ const debug = require('debug') const log = debug('libp2p:peer-store') log.error = debug('libp2p:peer-store:error') +const Topology = require('libp2p-interfaces/src/topology') +const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology') const { Connection } = require('libp2p-interfaces/src/connection') const PeerInfo = require('peer-info') @@ -109,6 +111,11 @@ class Registrar { * @return {string} registrar identifier */ register (topology) { + assert( + Topology.isTopology(topology) || + MulticodecTopology.isMulticodecTopology(topology), + 'topology must be an instance of interfaces/topology') + // Create topology const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() diff --git a/src/upgrader.js b/src/upgrader.js index 6890c6f770..1699451a6d 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -186,7 +186,7 @@ class Upgrader { const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) log('%s: incoming stream opened on %s', direction, protocol) connection.addStream(stream, protocol) - this._onStream({ connection, stream, protocol, remotePeer }) + this._onStream({ connection, stream, protocol }) } catch (err) { log.error(err) } @@ -254,9 +254,9 @@ class Upgrader { * @param {Stream} options.stream * @param {string} options.protocol */ - _onStream ({ connection, stream, protocol, remotePeer }) { + _onStream ({ connection, stream, protocol }) { const handler = this.protocols.get(protocol) - handler({ connection, stream, protocol, remotePeer }) + handler({ connection, stream, protocol }) } /** diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js index b7e5a77b2d..bec41a3025 100644 --- a/test/pubsub/operation.node.js +++ b/test/pubsub/operation.node.js @@ -101,8 +101,7 @@ describe('Pubsub subsystem operates correctly', () => { }) }) - // TODO: Needs identify push - describe.skip('pubsub started after connect', () => { + describe('pubsub started after connect', () => { beforeEach(async () => { libp2p = await create(mergeOptions(subsystemOptions, { peerInfo @@ -132,7 +131,7 @@ describe('Pubsub subsystem operates correctly', () => { sinon.restore() }) - it.skip('should get notified of connected peers after starting', async () => { + it('should get notified of connected peers after starting', async () => { const connection = await libp2p.dial(remAddr) expect(connection).to.exist() @@ -141,14 +140,16 @@ describe('Pubsub subsystem operates correctly', () => { remoteLibp2p.pubsub.start() - // Wait for - // Validate + await pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1) + expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1) expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1) }) - it.skip('should receive pubsub messages', async () => { + it('should receive pubsub messages', async function () { + this.timeout(10e3) const defer = pDefer() + const libp2pId = libp2p.peerInfo.id.toB58String() const topic = 'test-topic' const data = 'hey!' @@ -156,15 +157,26 @@ describe('Pubsub subsystem operates correctly', () => { remoteLibp2p.pubsub.start() - // TODO: wait for + await pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1) + + let subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.not.include(topic) - libp2p.pubsub.subscribe(topic) - libp2p.pubsub.once(topic, (msg) => { + libp2p.pubsub.subscribe(topic, (msg) => { expect(msg.data.toString()).to.equal(data) defer.resolve() }) - libp2p.pubsub.publish(topic, data) + subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.include(topic) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic) + return subscribedPeers.includes(libp2pId) + }) + + remoteLibp2p.pubsub.publish(topic, data) await defer.promise }) diff --git a/test/registrar/registrar.spec.js b/test/registrar/registrar.spec.js index e1540cfd3c..9114e03590 100644 --- a/test/registrar/registrar.spec.js +++ b/test/registrar/registrar.spec.js @@ -33,7 +33,18 @@ describe('registrar', () => { throw new Error('should fail to register a protocol if no multicodec is provided') }) - // TODO: not valid topology + it('should fail to register a protocol if an invalid topology is provided', () => { + const fakeTopology = { + random: 1 + } + try { + registrar.register() + } catch (err) { + expect(err).to.exist(fakeTopology) + return + } + throw new Error('should fail to register a protocol if an invalid topology is provided') + }) }) describe('registration', () => { From 6240dc04724f55c02f1830ef578ed2b090f3a05d Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 15 Nov 2019 15:35:43 +0100 Subject: [PATCH 8/9] chore: address review --- package.json | 2 +- src/registrar.js | 4 +--- test/pubsub/operation.node.js | 5 ++++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/package.json b/package.json index 6a1d81a609..1ebd91a925 100644 --- a/package.json +++ b/package.json @@ -92,7 +92,7 @@ "libp2p-delegated-content-routing": "^0.2.2", "libp2p-delegated-peer-routing": "^0.2.2", "libp2p-floodsub": "^0.19.0", - "libp2p-gossipsub": "ChainSafe/gossipsub-js#refactor/async", + "libp2p-gossipsub": "ChainSafe/gossipsub-js#beta/async", "libp2p-kad-dht": "^0.15.3", "libp2p-mdns": "^0.12.3", "libp2p-mplex": "^0.9.1", diff --git a/src/registrar.js b/src/registrar.js index d777458ecf..c4a116806b 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -6,7 +6,6 @@ const log = debug('libp2p:peer-store') log.error = debug('libp2p:peer-store:error') const Topology = require('libp2p-interfaces/src/topology') -const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology') const { Connection } = require('libp2p-interfaces/src/connection') const PeerInfo = require('peer-info') @@ -112,8 +111,7 @@ class Registrar { */ register (topology) { assert( - Topology.isTopology(topology) || - MulticodecTopology.isMulticodecTopology(topology), + Topology.isTopology(topology), 'topology must be an instance of interfaces/topology') // Create topology diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js index bec41a3025..7cf6f45577 100644 --- a/test/pubsub/operation.node.js +++ b/test/pubsub/operation.node.js @@ -140,7 +140,10 @@ describe('Pubsub subsystem operates correctly', () => { remoteLibp2p.pubsub.start() - await pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1) + await pWaitFor(() => + libp2p.pubsub._pubsub.peers.size === 1 && + remoteLibp2p.pubsub._pubsub.peers.size === 1 + ) expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1) expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1) From 9b9ac54717e4073c2c1d5bd882a455f2612a9ab1 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 15 Nov 2019 16:26:09 +0100 Subject: [PATCH 9/9] chore: simplify tests --- test/pubsub/operation.node.js | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js index 7cf6f45577..5ffe3f34ab 100644 --- a/test/pubsub/operation.node.js +++ b/test/pubsub/operation.node.js @@ -58,17 +58,14 @@ describe('Pubsub subsystem operates correctly', () => { }) it('should get notified of connected peers on dial', async () => { - sinon.spy(libp2p.registrar, 'onConnect') - sinon.spy(remoteLibp2p.registrar, 'onConnect') - const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs) expect(connection).to.exist() - expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1) - expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1) - expect(libp2p.registrar.onConnect.callCount).to.equal(1) - expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1) + return Promise.all([ + pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1), + pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1) + ]) }) it('should receive pubsub messages', async () => { @@ -140,13 +137,10 @@ describe('Pubsub subsystem operates correctly', () => { remoteLibp2p.pubsub.start() - await pWaitFor(() => - libp2p.pubsub._pubsub.peers.size === 1 && - remoteLibp2p.pubsub._pubsub.peers.size === 1 - ) - - expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1) - expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1) + return Promise.all([ + pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1), + pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1) + ]) }) it('should receive pubsub messages', async function () { @@ -160,7 +154,10 @@ describe('Pubsub subsystem operates correctly', () => { remoteLibp2p.pubsub.start() - await pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1) + await Promise.all([ + pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1), + pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1) + ]) let subscribedTopics = libp2p.pubsub.getTopics() expect(subscribedTopics).to.not.include(topic)