From 0f608322c8ea1a21bd6ddaa0c0e9425731a7670b Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 20 Sep 2019 12:56:18 +0200 Subject: [PATCH] feat: add onStreamEnd, muxer.streams and timeline (#56) BREAKING CHANGE: This adds new validations to the stream muxer, which will cause existing tests to fail. --- README.md | 29 ++++++++++++++++++++++++++++ src/base-test.js | 50 ++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 73 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 59bd9e0..7d69425 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,14 @@ pipe(conn, muxer, conn) // conn is duplex connection to another peer ```js new Mplex(stream => { /* ... */ }) ``` +* `onStreamEnd` - A function called when a stream ends. + ```js + // Get notified when a stream has ended + const onStreamEnd = stream => { + // Manage any tracking changes, etc + } + const muxer = new Muxer({ onStreamEnd, ... }) + ``` * `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g. ```js const controller = new AbortController() @@ -126,6 +134,16 @@ const muxer = new Muxer() muxer.onStream = stream => { /* ... */ } ``` +#### `muxer.onStreamEnd` + +Use this property as an alternative to passing `onStreamEnd` as an option to the `Muxer` constructor. + +```js +const muxer = new Muxer() +// ...later +muxer.onStreamEnd = stream => { /* ... */ } +``` + #### `const stream = muxer.newStream([options])` Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it). @@ -140,6 +158,17 @@ const stream = muxer.newStream() pipe([1, 2, 3], stream, consume) ``` +#### `const streams = muxer.streams` + +The streams property returns an array of streams the muxer currently has open. Closed streams will not be returned. + +```js +muxer.streams.map(stream => { + // Log out the stream's id + console.log(stream.id) +}) +``` + ### Go #### Attach muxer to a Connection diff --git a/src/base-test.js b/src/base-test.js index 01a0e02..f74a64a 100644 --- a/src/base-test.js +++ b/src/base-test.js @@ -8,11 +8,28 @@ const pair = require('it-pair/duplex') const pipe = require('it-pipe') const { collect, map, consume } = require('streaming-iterables') +function close (stream) { + return pipe([], stream, consume) +} + async function closeAndWait (stream) { - await pipe([], stream, consume) + await close(stream) expect(true).to.be.true.mark() } +/** + * A tick is considered valid if it happened between now + * and `ms` milliseconds ago + * @param {number} date Time in ticks + * @param {number} ms max milliseconds that should have expired + * @returns {boolean} + */ +function isValidTick (date, ms = 5000) { + const now = Date.now() + if (date > now - ms && date <= now) return true + return false +} + module.exports = (common) => { describe('base', () => { let Muxer @@ -25,25 +42,44 @@ module.exports = (common) => { const p = pair() const dialer = new Muxer() - const listener = new Muxer(stream => { - expect(stream).to.exist.mark() - closeAndWait(stream) + const listener = new Muxer({ + onStream: stream => { + expect(stream).to.exist.mark() // 1st check + expect(isValidTick(stream.timeline.open)).to.equal(true) + // Make sure the stream is being tracked + expect(listener.streams).to.include(stream) + close(stream) + }, + onStreamEnd: stream => { + expect(stream).to.exist.mark() // 2nd check + expect(listener.streams).to.not.include(stream) + // Make sure the stream is removed from tracking + expect(isValidTick(stream.timeline.close)).to.equal(true) + } }) pipe(p[0], dialer, p[0]) pipe(p[1], listener, p[1]) - expect(3).checks(done) + expect(3).checks(() => { + // ensure we have no streams left + expect(dialer.streams).to.have.length(0) + expect(listener.streams).to.have.length(0) + done() + }) const conn = dialer.newStream() + expect(dialer.streams).to.include(conn) + expect(isValidTick(conn.timeline.open)).to.equal(true) - closeAndWait(conn) + closeAndWait(conn) // 3rd check }) it('Open a stream from the listener', (done) => { const p = pair() const dialer = new Muxer(stream => { expect(stream).to.exist.mark() + expect(isValidTick(stream.timeline.open)).to.equal(true) closeAndWait(stream) }) const listener = new Muxer() @@ -54,6 +90,8 @@ module.exports = (common) => { expect(3).check(done) const conn = listener.newStream() + expect(listener.streams).to.include(conn) + expect(isValidTick(conn.timeline.open)).to.equal(true) closeAndWait(conn) })