Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

Commit

Permalink
feat: add onStreamEnd, muxer.streams and timeline (#56)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: This adds new validations to the stream muxer, which will cause existing tests to fail.
  • Loading branch information
jacobheun committed Sep 20, 2019
1 parent d908f8a commit 0f60832
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 6 deletions.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ pipe(conn, muxer, conn) // conn is duplex connection to another peer
```js
new Mplex(stream => { /* ... */ })
```
* `onStreamEnd` - A function called when a stream ends.
```js
// Get notified when a stream has ended
const onStreamEnd = stream => {
// Manage any tracking changes, etc
}
const muxer = new Muxer({ onStreamEnd, ... })
```
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g.
```js
const controller = new AbortController()
Expand All @@ -126,6 +134,16 @@ const muxer = new Muxer()
muxer.onStream = stream => { /* ... */ }
```
#### `muxer.onStreamEnd`
Use this property as an alternative to passing `onStreamEnd` as an option to the `Muxer` constructor.
```js
const muxer = new Muxer()
// ...later
muxer.onStreamEnd = stream => { /* ... */ }
```
#### `const stream = muxer.newStream([options])`
Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it).
Expand All @@ -140,6 +158,17 @@ const stream = muxer.newStream()
pipe([1, 2, 3], stream, consume)
```
#### `const streams = muxer.streams`
The streams property returns an array of streams the muxer currently has open. Closed streams will not be returned.
```js
muxer.streams.map(stream => {
// Log out the stream's id
console.log(stream.id)
})
```
### Go
#### Attach muxer to a Connection
Expand Down
50 changes: 44 additions & 6 deletions src/base-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,28 @@ const pair = require('it-pair/duplex')
const pipe = require('it-pipe')
const { collect, map, consume } = require('streaming-iterables')

function close (stream) {
return pipe([], stream, consume)
}

async function closeAndWait (stream) {
await pipe([], stream, consume)
await close(stream)
expect(true).to.be.true.mark()
}

/**
* A tick is considered valid if it happened between now
* and `ms` milliseconds ago
* @param {number} date Time in ticks
* @param {number} ms max milliseconds that should have expired
* @returns {boolean}
*/
function isValidTick (date, ms = 5000) {
const now = Date.now()
if (date > now - ms && date <= now) return true
return false
}

module.exports = (common) => {
describe('base', () => {
let Muxer
Expand All @@ -25,25 +42,44 @@ module.exports = (common) => {
const p = pair()
const dialer = new Muxer()

const listener = new Muxer(stream => {
expect(stream).to.exist.mark()
closeAndWait(stream)
const listener = new Muxer({
onStream: stream => {
expect(stream).to.exist.mark() // 1st check
expect(isValidTick(stream.timeline.open)).to.equal(true)
// Make sure the stream is being tracked
expect(listener.streams).to.include(stream)
close(stream)
},
onStreamEnd: stream => {
expect(stream).to.exist.mark() // 2nd check
expect(listener.streams).to.not.include(stream)
// Make sure the stream is removed from tracking
expect(isValidTick(stream.timeline.close)).to.equal(true)
}
})

pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])

expect(3).checks(done)
expect(3).checks(() => {
// ensure we have no streams left
expect(dialer.streams).to.have.length(0)
expect(listener.streams).to.have.length(0)
done()
})

const conn = dialer.newStream()
expect(dialer.streams).to.include(conn)
expect(isValidTick(conn.timeline.open)).to.equal(true)

closeAndWait(conn)
closeAndWait(conn) // 3rd check
})

it('Open a stream from the listener', (done) => {
const p = pair()
const dialer = new Muxer(stream => {
expect(stream).to.exist.mark()
expect(isValidTick(stream.timeline.open)).to.equal(true)
closeAndWait(stream)
})
const listener = new Muxer()
Expand All @@ -54,6 +90,8 @@ module.exports = (common) => {
expect(3).check(done)

const conn = listener.newStream()
expect(listener.streams).to.include(conn)
expect(isValidTick(conn.timeline.open)).to.equal(true)

closeAndWait(conn)
})
Expand Down

0 comments on commit 0f60832

Please sign in to comment.