Skip to content

Commit

Permalink
Support uploads over longpoll (#5513)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismccord authored Jul 10, 2023
1 parent a76ea52 commit 5d31b29
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 149 deletions.
10 changes: 10 additions & 0 deletions assets/js/phoenix/longpoll.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import {

import Ajax from "./ajax"

let arrayBufferToBase64 = (buffer) => {
let binary = ""
let bytes = new Uint8Array(buffer)
let len = bytes.byteLength
for(let i = 0; i < len; i++){ binary += String.fromCharCode(bytes[i]) }
return btoa(binary)
}

export default class LongPoll {

constructor(endPoint){
Expand Down Expand Up @@ -107,7 +115,9 @@ export default class LongPoll {
// we collect all pushes within the current event loop by
// setTimeout 0, which optimizes back-to-back procedural
// pushes against an empty buffer

send(body){
if(typeof(body) !== "string"){ body = arrayBufferToBase64(body) }
if(this.currentBatch){
this.currentBatch.push(body)
} else if(this.awaitingBatchAck){
Expand Down
54 changes: 44 additions & 10 deletions lib/phoenix/transports/long_poll.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ defmodule Phoenix.Transports.LongPoll do
@moduledoc false
@behaviour Plug

# 10MB
@max_base64_size 10_000_000

import Plug.Conn
alias Phoenix.Socket.{V1, V2, Transport}

Expand Down Expand Up @@ -49,6 +52,7 @@ defmodule Phoenix.Transports.LongPoll do
case resume_session(conn.params, endpoint, opts) do
{:ok, server_ref} ->
listen(conn, server_ref, endpoint, opts)

:error ->
new_session(conn, endpoint, handler, opts)
end
Expand All @@ -59,6 +63,7 @@ defmodule Phoenix.Transports.LongPoll do
case resume_session(conn.params, endpoint, opts) do
{:ok, server_ref} ->
publish(conn, server_ref, endpoint, opts)

:error ->
conn |> put_status(:gone) |> status_json()
end
Expand All @@ -75,8 +80,16 @@ defmodule Phoenix.Transports.LongPoll do
# we need to match on both v1 and v2 protocol, as well as wrap for backwards compat
batch =
case get_req_header(conn, "content-type") do
["application/x-ndjson"] -> String.split(body, ["\n", "\r\n"])
_ -> [body]
["application/x-ndjson"] ->
body
|> String.split(["\n", "\r\n"])
|> Enum.map(fn
"[" <> _ = txt -> {txt, :text}
base64 -> {safe_decode64!(base64), :binary}
end)

_ ->
[{body, :text}]
end

{conn, status} =
Expand All @@ -94,6 +107,14 @@ defmodule Phoenix.Transports.LongPoll do
end
end

defp safe_decode64!(base64) do
if byte_size(base64) <= @max_base64_size do
Base.decode64!(base64)
else
raise Plug.BadRequestError
end
end

defp transport_dispatch(endpoint, server_ref, body, opts) do
ref = make_ref()
broadcast_from!(endpoint, server_ref, {:dispatch, client_ref(server_ref), body, ref})
Expand All @@ -110,9 +131,9 @@ defmodule Phoenix.Transports.LongPoll do

defp new_session(conn, endpoint, handler, opts) do
priv_topic =
"phx:lp:"
<> Base.encode64(:crypto.strong_rand_bytes(16))
<> (System.system_time(:millisecond) |> Integer.to_string)
"phx:lp:" <>
Base.encode64(:crypto.strong_rand_bytes(16)) <>
(System.system_time(:millisecond) |> Integer.to_string())

keys = Keyword.get(opts, :connect_info, [])
connect_info = Transport.connect_info(conn, endpoint, keys)
Expand All @@ -124,7 +145,7 @@ defmodule Phoenix.Transports.LongPoll do
conn |> put_status(:forbidden) |> status_json()

{:ok, server_pid} ->
data = {:v1, endpoint.config(:endpoint_id), server_pid, priv_topic}
data = {:v1, endpoint.config(:endpoint_id), server_pid, priv_topic}
token = sign_token(endpoint, data, opts)
conn |> put_status(:gone) |> status_token_messages_json(token, [])
end
Expand All @@ -141,10 +162,11 @@ defmodule Phoenix.Transports.LongPoll do

{:now_available, ^ref} ->
broadcast_from!(endpoint, server_ref, {:flush, client_ref(server_ref), ref})

receive do
{:messages, messages, ^ref} -> {:ok, messages}
after
opts[:window_ms] -> {:no_content, []}
opts[:window_ms] -> {:no_content, []}
end
after
opts[:window_ms] ->
Expand All @@ -170,7 +192,7 @@ defmodule Phoenix.Transports.LongPoll do
receive do
{:subscribe, ^ref} -> {:ok, server_ref}
after
opts[:pubsub_timeout_ms] -> :error
opts[:pubsub_timeout_ms] -> :error
end

_ ->
Expand All @@ -195,20 +217,32 @@ defmodule Phoenix.Transports.LongPoll do

defp subscribe(endpoint, topic) when is_binary(topic),
do: Phoenix.PubSub.subscribe(endpoint.config(:pubsub_server), topic, link: true)

defp subscribe(_endpoint, pid) when is_pid(pid),
do: :ok

defp broadcast_from!(endpoint, topic, msg) when is_binary(topic),
do: Phoenix.PubSub.broadcast_from!(endpoint.config(:pubsub_server), self(), topic, msg)

defp broadcast_from!(_endpoint, pid, msg) when is_pid(pid),
do: send(pid, msg)

defp sign_token(endpoint, data, opts) do
Phoenix.Token.sign(endpoint, Atom.to_string(endpoint.config(:pubsub_server)), data, opts[:crypto])
Phoenix.Token.sign(
endpoint,
Atom.to_string(endpoint.config(:pubsub_server)),
data,
opts[:crypto]
)
end

defp verify_token(endpoint, signed, opts) do
Phoenix.Token.verify(endpoint, Atom.to_string(endpoint.config(:pubsub_server)), signed, opts[:crypto])
Phoenix.Token.verify(
endpoint,
Atom.to_string(endpoint.config(:pubsub_server)),
signed,
opts[:crypto]
)
end

defp status_json(conn) do
Expand Down
4 changes: 2 additions & 2 deletions lib/phoenix/transports/long_poll_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ defmodule Phoenix.Transports.LongPoll.Server do
end
end

def handle_info({:dispatch, client_ref, body, ref}, state) do
def handle_info({:dispatch, client_ref, {body, opcode}, ref}, state) do
%{handler: {handler, handler_state}} = state

case handler.handle_in({body, opcode: :text}, handler_state) do
case handler.handle_in({body, opcode: opcode}, handler_state) do
{:reply, status, {_, reply}, handler_state} ->
state = %{state | handler: {handler, handler_state}}
status = if status == :ok, do: :ok, else: :error
Expand Down
Loading

0 comments on commit 5d31b29

Please sign in to comment.