Skip to content

Commit

Permalink
Added WebSocket support for getUpstream() (#275)
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <hello@matteocollina.com>

Signed-off-by: Matteo Collina <hello@matteocollina.com>
  • Loading branch information
mcollina committed Nov 6, 2022
1 parent 72703f2 commit 205dae0
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 2 deletions.
14 changes: 12 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ class WebSocketProxy {

findUpstream (request) {
const source = new URL(request.url, 'ws://127.0.0.1')

for (const { prefix, rewritePrefix, upstream, wsClientOptions } of this.prefixList) {
if (source.pathname.startsWith(prefix)) {
const target = new URL(source.pathname.replace(prefix, rewritePrefix), upstream)
Expand Down Expand Up @@ -162,7 +161,18 @@ function setupWebSocketProxy (fastify, options, rewritePrefix) {
})
}

wsProxy.addUpstream(fastify.prefix, rewritePrefix, options.upstream, options.wsClientOptions)
if (options.upstream !== '') {
wsProxy.addUpstream(fastify.prefix, rewritePrefix, options.upstream, options.wsClientOptions)
} else if (typeof options.replyOptions.getUpstream === 'function') {
wsProxy.findUpstream = function (request) {
const source = new URL(request.url, 'ws://127.0.0.1')
const upstream = options.replyOptions.getUpstream(request, '')
const target = new URL(source.pathname, upstream)
target.protocol = upstream.indexOf('http:') === 0 ? 'ws:' : 'wss'
target.search = source.search
return { target, wsClientOptions: options.wsClientOptions }
}
}
}

function generateRewritePrefix (prefix = '', opts) {
Expand Down
3 changes: 3 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ async function run () {
const server = Fastify()
server.register(proxy, {
upstream: '',
getWebSocketUpstream () {
t.fail('should never be called')
},
replyOptions: {
getUpstream: function (original, base) {
return `http://localhost:${origin.server.address().port}`
Expand Down
60 changes: 60 additions & 0 deletions test/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,63 @@ test('captures errors on start', async (t) => {
t.teardown(app.close.bind(app))
t.teardown(app2.close.bind(app2))
})

test('getWebSocketStream', async (t) => {
t.plan(7)

const origin = createServer()
const wss = new WebSocket.Server({ server: origin })
t.teardown(wss.close.bind(wss))
t.teardown(origin.close.bind(origin))

const serverMessages = []
wss.on('connection', (ws, request) => {
t.equal(ws.protocol, subprotocolValue)
t.equal(request.headers.cookie, cookieValue)
ws.on('message', (message, binary) => {
serverMessages.push([message.toString(), binary])
// echo
ws.send(message, { binary })
})
})

await promisify(origin.listen.bind(origin))({ port: 0 })

const server = Fastify()
server.register(proxy, {
upstream: '',
replyOptions: {
getUpstream: function (original, base) {
return `http://localhost:${origin.address().port}`
}
},
websocket: true
})

await server.listen({ port: 0 })
t.teardown(server.close.bind(server))

const options = { headers: { cookie: cookieValue } }
const ws = new WebSocket(`ws://localhost:${server.server.address().port}`, [subprotocolValue], options)
await once(ws, 'open')

ws.send('hello', { binary: false })
const [reply0, binary0] = await once(ws, 'message')
t.equal(reply0.toString(), 'hello')
t.equal(binary0, false)

ws.send(Buffer.from('fastify'), { binary: true })
const [reply1, binary1] = await once(ws, 'message')
t.equal(reply1.toString(), 'fastify')
t.equal(binary1, true)

t.strictSame(serverMessages, [
['hello', false],
['fastify', true]
])

await Promise.all([
once(ws, 'close'),
server.close()
])
})

0 comments on commit 205dae0

Please sign in to comment.