Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cohttp EOF read #1035

Open
wants to merge 2 commits into
base: v5-backports
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions cohttp-lwt-unix/src/io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,32 @@ let catch f =
| ex -> Lwt.fail ex)

let pp_error = Fmt.exn

let wait_eof_or_closed conn ic sleep_fn =
let wait_for_cancel () = fst (Lwt.task ()) in
match (conn : Conduit_lwt_unix.flow) with
| Vchan _ -> wait_for_cancel ()
| TCP { fd; _ } | Domain_socket { fd; _ } ->
let peek_buffer = Bytes.create 1 in
let has_recv_eof fd =
(* MSG_PEEK does not consume data from the stream and does not
impact normal read operations *)
Lwt_unix.recv fd peek_buffer 0 1 Unix.[ MSG_PEEK ] >>= fun n ->
Lwt.return (n = 0)
in
let rec loop fd =
(* Calls [sleep_fn] to allow yielding control to the request handler *)
sleep_fn () >>= fun () ->
if Lwt_io.is_closed ic then
(* The connection was closed locally. Stop waiting for EOF.
The client has closed the connection and now possibly is doing
some clean up. We should not interrupt this. Let's wait
till the promise for the request handling is resolved and then this
promise will be cancelled. *)
wait_for_cancel ()
else
has_recv_eof fd >>= function
| true -> Lwt.return_unit
| false -> loop fd
in
loop fd
5 changes: 5 additions & 0 deletions cohttp-lwt-unix/test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
(name test_body)
(libraries cohttp_lwt_unix_test cohttp-lwt-unix))

(executable
(modules test_leak)
(name test_leak)
(libraries cohttp-lwt-unix))

(rule
(alias runtest)
(package cohttp-lwt-unix)
Expand Down
48 changes: 48 additions & 0 deletions cohttp-lwt-unix/test/test_leak.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
(*
This test is meant to be used in the following way:
- `make` the whole repo
- `dune exec ./_build/default/cohttp-lwt-unix/test/test_leak.exe`
- `curl -s 'localhost:8080/sleep'` in a different console
- observe the first console having a stream of "sleep messages"
- when stopping (CTRL+C) the `curl` request, the first console
should show a closing connection message; if it does, then the
test is successful, otherwise (and the server keeps sleeping),
the test failed.
*)

open Lwt
open Cohttp_lwt_unix

let port = 8080

let callback (_, con) req _body =
(* Record connection established *)
let con_string = Cohttp.Connection.to_string con in
Format.printf "Cohttp connection on %s@." con_string;
(* Match given endpoint *)
let uri = req |> Request.uri |> Uri.path in
match uri with
| "/sleep" ->
(* Continuous sleep *)
let rec get_busy () =
Lwt_unix.sleep 1.0 >>= fun () ->
Format.printf "I slept @.";
get_busy ()
in
get_busy ()
(* Unknown call *)
| _ -> Server.respond_string ~status:`Not_found ~body:"Not found" ()

let start_server () =
let server =
Server.create
~mode:(`TCP (`Port port))
(Server.make
~conn_closed:(fun _ -> Format.printf "Cohttp connection closed\n%!")
~sleep_fn:(fun () -> Lwt_unix.sleep 1.0)
~callback ())
in
Printf.printf "Server running on port %d\n%!" port;
server

let () = Lwt_main.run (start_server ())
30 changes: 30 additions & 0 deletions cohttp-lwt/src/s.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ module type IO = sig
which case it returns the error. *)

val pp_error : Format.formatter -> error -> unit

val wait_eof_or_closed : conn -> ic -> (unit -> unit t) -> unit t
(** [wait_eof_or_closed conn ic sleep_fn] waits for an EOF or a Closed status
on the input channel [ic]. This function is designed to be used in
[Lwt.pick] to run concurrently with the request handling from the input
channel. The function checks for EOF using [MSG_PEEK] on the input channel
without consuming data, thereby not disturbing the request handling. If
the connection is closed locally, Cohttp will stop waiting for EOF and
will wait the promise to be cancelled. This function ensures that the
monitoring does not spin too quickly and uses CPU efficiently when the
input channel has read activity but the client is not reading it.

[sleep_fn] is a parameter function used to yield control periodically,
keeping Cohttp platform-independent. *)
end

(** The [Net] module type defines how to connect to a remote node and close the
Expand Down Expand Up @@ -155,12 +169,27 @@ module type Server = sig

val make_response_action :
?conn_closed:(conn -> unit) ->
?sleep_fn:(unit -> unit Lwt.t) ->
callback:(conn -> Cohttp.Request.t -> Body.t -> response_action Lwt.t) ->
unit ->
t
(** [make_response_action] creates a set of callbacks used by Cohttp Server.

- [callback] is called when a new connection is accepted by the server
socket.
- [conn_closed] if provided, will be called when the connection is closed,
e.g. when an EOF is received.
- [sleep_fn] if provided, will be used for periodic checks for EOF from
the client. If this callback is not provided, Cohttp will not detect and
notify the client about EOF received from the peer while the client is
handling the new connection. This can lead to a resource leak if the
[callback] is designed to never resolve. If the connection is closed
locally, Cohttp will stop waiting for EOF and will wait the promise to
be cancelled. *)

val make_expert :
?conn_closed:(conn -> unit) ->
?sleep_fn:(unit -> unit Lwt.t) ->
callback:
(conn ->
Cohttp.Request.t ->
Expand All @@ -171,6 +200,7 @@ module type Server = sig

val make :
?conn_closed:(conn -> unit) ->
?sleep_fn:(unit -> unit Lwt.t) ->
callback:
(conn -> Cohttp.Request.t -> Body.t -> (Cohttp.Response.t * Body.t) Lwt.t) ->
unit ->
Expand Down
60 changes: 39 additions & 21 deletions cohttp-lwt/src/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@ module Make (IO : S.IO) = struct
type t = {
callback : conn -> Cohttp.Request.t -> Body.t -> response_action Lwt.t;
conn_closed : conn -> unit;
sleep_fn : (unit -> unit Lwt.t) option;
}

let make_response_action ?(conn_closed = ignore) ~callback () =
{ conn_closed; callback }
let make_response_action ?(conn_closed = ignore) ?sleep_fn ~callback () =
{ conn_closed; callback; sleep_fn }

let make ?conn_closed ~callback () =
let make ?conn_closed ?sleep_fn ~callback () =
let callback conn req body =
callback conn req body >|= fun rsp -> `Response rsp
in
make_response_action ?conn_closed ~callback ()
make_response_action ?conn_closed ?sleep_fn ~callback ()

let make_expert ?conn_closed ~callback () =
let make_expert ?conn_closed ?sleep_fn ~callback () =
let callback conn req body =
callback conn req body >|= fun rsp -> `Expert rsp
in
make_response_action ?conn_closed ~callback ()
make_response_action ?conn_closed ?sleep_fn ~callback ()

module Transfer_IO = Cohttp__Transfer_io.Make (IO)

Expand Down Expand Up @@ -111,50 +112,67 @@ module Make (IO : S.IO) = struct
`Response rsp))
(fun () -> Body.drain_body body)

let handle_response ~keep_alive oc res body conn_closed handle_client =
type conn_action = Call_conn_closed | Call_conn_closed_and_drain of Body.t

let handle_response ~keep_alive oc res body handle_client =
IO.catch (fun () ->
let flush = Response.flush res in
Response.write ~flush
(fun writer -> Body.write_body (Response.write_body writer) body)
res oc)
>>= function
| Ok () ->
if keep_alive then handle_client oc
else
let () = conn_closed () in
Lwt.return_unit
if keep_alive then handle_client oc else Lwt.return Call_conn_closed
| Error e ->
Log.info (fun m -> m "IO error while writing body: %a" IO.pp_error e);
conn_closed ();
Body.drain_body body
Lwt.return (Call_conn_closed_and_drain body)

let rec handle_client ic oc conn spec =
Request.read ic >>= function
| `Eof ->
spec.conn_closed conn;
Lwt.return_unit
Log.debug (fun m ->
m "Got EOF while handling client: %s"
(Cohttp.Connection.to_string (snd conn)));
Lwt.return Call_conn_closed
| `Invalid data ->
Log.err (fun m -> m "invalid input %s while handling client" data);
spec.conn_closed conn;
Lwt.return_unit
Lwt.return Call_conn_closed
| `Ok req -> (
let body = read_body ic req in
handle_request spec.callback conn req body >>= function
| `Response (res, body) ->
let keep_alive = Request.is_keep_alive req in
handle_response ~keep_alive oc res body
(fun () -> spec.conn_closed conn)
(fun oc -> handle_client ic oc conn spec)
handle_response ~keep_alive oc res body (fun oc ->
handle_client ic oc conn spec)
| `Expert (res, io_handler) ->
Response.write_header res oc >>= fun () ->
io_handler ic oc >>= fun () -> handle_client ic oc conn spec)

let callback spec io_id ic oc =
let conn_id = Cohttp.Connection.create () in
let conn_closed () = spec.conn_closed (io_id, conn_id) in
let handle () = handle_client ic oc (io_id, conn_id) spec in
let is_conn_closed () =
(* Without a sleep function we cannot safely loop waiting for EOF *)
match spec.sleep_fn with
| None -> fst (Lwt.task ()) (* wait to be cancelled *)
| Some sleep_fn ->
IO.wait_eof_or_closed io_id ic sleep_fn >>= fun () ->
Log.debug (fun m ->
m "Client closed the connection, got EOF for %s"
(Cohttp.Connection.to_string conn_id));
Lwt.return Call_conn_closed
in
Lwt.catch
(fun () ->
IO.catch (fun () -> handle_client ic oc (io_id, conn_id) spec)
IO.catch (fun () ->
Lwt.pick [ handle (); is_conn_closed () ] >>= function
| Call_conn_closed ->
conn_closed ();
Lwt.return_unit
| Call_conn_closed_and_drain body ->
conn_closed ();
Body.drain_body body)
>>= function
| Ok () -> Lwt.return_unit
| Error e ->
Expand Down
2 changes: 2 additions & 0 deletions cohttp-mirage/src/io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,6 @@ module Make (Channel : Mirage_channel.S) = struct
| Read_exn e -> Lwt.return_error (Read_error e)
| Write_exn e -> Lwt.return_error (Write_error e)
| ex -> Lwt.fail ex)

let wait_eof_or_closed _conn _ic _sleep_fn = assert false
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we ask for the advice of the maintainers of the library, as we advocate that this function will never be called from this library, but we want to receive a second opinion.

end
Loading