Skip to content

Commit abbd7a0

Browse files
committed
Add put_copy_data, put_copy_end, and get_copy_data.
These were introduced in libpq 7.4, obsoleting the other functions dealing with COPY data.
1 parent 11a0632 commit abbd7a0

File tree

5 files changed

+238
-1
lines changed

5 files changed

+238
-1
lines changed

examples/copy.ml

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
open Postgresql
2+
open Printf
3+
4+
let _ =
5+
if Array.length Sys.argv <> 2 then (
6+
Printf.printf "\
7+
Usage: copy conninfo\n\
8+
Connect to PostgreSQL with [conninfo] (e.g. \"host=localhost\"),\n";
9+
exit 1)
10+
11+
let create_sql = "\
12+
CREATE TEMPORARY TABLE pgo_copy_test (\
13+
id SERIAL PRIMARY KEY, \
14+
name text UNIQUE NOT NULL, \
15+
x integer NOT NULL\
16+
)"
17+
18+
let populate ?error_msg (c : connection) =
19+
let _ = c#exec ~expect:[Copy_in] "COPY pgo_copy_test (name, x) FROM STDIN" in
20+
for i = 0 to 9999 do
21+
match c#put_copy_data (sprintf "c%d\t%d\n" i i) with
22+
| Put_copy_error | Put_copy_not_queued -> assert false
23+
| Put_copy_queued -> ()
24+
done;
25+
begin match c#put_copy_end ?error_msg () with
26+
| Put_copy_error | Put_copy_not_queued -> assert false
27+
| Put_copy_queued -> ()
28+
end;
29+
match c#get_result with
30+
| None -> assert false
31+
| Some result ->
32+
begin match error_msg, result#status with
33+
| None, Command_ok -> ()
34+
| Some _msg, Fatal_error -> ()
35+
| _ -> assert false
36+
end
37+
38+
let verify (c : connection) =
39+
let _ =
40+
c#exec ~expect:[Copy_out] "COPY pgo_copy_test (id, name, x) TO STDOUT" in
41+
for i = 0 to 9999 do
42+
match c#get_copy_data () with
43+
| Get_copy_data data ->
44+
assert (data = sprintf "%d\tc%d\t%d\n" (i + 1) i i)
45+
| Get_copy_wait | Get_copy_end | Get_copy_error -> assert false
46+
done;
47+
match c#get_copy_data () with
48+
| Get_copy_end -> ()
49+
| _ -> assert false
50+
51+
let main () =
52+
let c = new connection ~conninfo:Sys.argv.(1) () in
53+
let _ = c#exec ~expect:[Command_ok] create_sql in
54+
populate c;
55+
populate c ~error_msg:"test failure";
56+
verify c;
57+
c#finish
58+
59+
let () =
60+
try main () with
61+
| Error e -> prerr_endline (string_of_error e)
62+
| e -> prerr_endline (Printexc.to_string e)

examples/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
(executables
2-
(names async binary cursor dump populate prompt test_lo)
2+
(names async binary copy cursor dump populate prompt test_lo)
33
(libraries postgresql)
44
)

src/postgresql.ml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,17 @@ type result_status =
294294

295295
external result_status : result_status -> string = "PQresStatus_stub"
296296

297+
type put_copy_result =
298+
| Put_copy_queued
299+
| Put_copy_not_queued
300+
| Put_copy_error
301+
302+
type get_copy_result =
303+
| Get_copy_data of string
304+
| Get_copy_wait
305+
| Get_copy_end
306+
| Get_copy_error
307+
297308
type getline_result = EOF | LineRead | BufFull
298309

299310
type getline_async_result =
@@ -505,6 +516,19 @@ module Stub = struct
505516

506517
(* Functions Associated with the COPY Command *)
507518

519+
external put_copy_data :
520+
connection -> string ->
521+
(int [@untagged]) -> (int [@untagged]) -> (int [@untagged])
522+
= "PQputCopyData_bc" "PQputCopyData_stub"
523+
524+
external put_copy_end :
525+
connection -> string option -> (int [@untagged])
526+
= "PQputCopyEnd_bc" "PQputCopyEnd_stub"
527+
528+
external get_copy_data :
529+
connection -> (int [@untagged]) -> get_copy_result
530+
= "PQgetCopyData_bc" "PQgetCopyData_stub"
531+
508532
external getline :
509533
connection -> Bytes.t ->
510534
(int [@untagged]) -> (int [@untagged]) -> (int [@untagged])
@@ -976,6 +1000,29 @@ object (self)
9761000

9771001
(* Low level *)
9781002

1003+
method put_copy_data ?(pos = 0) ?len buf =
1004+
let buf_len = String.length buf in
1005+
let len = match len with Some len -> len | None -> buf_len - pos in
1006+
if len < 0 || pos < 0 || pos + len > buf_len then
1007+
invalid_arg "Postgresql.connection#put_copy_data";
1008+
wrap_conn (fun conn ->
1009+
match Stub.put_copy_data conn buf pos len with
1010+
| -1 -> Put_copy_error
1011+
| 0 -> Put_copy_not_queued
1012+
| 1 -> Put_copy_queued
1013+
| _ -> assert false)
1014+
1015+
method put_copy_end ?error_msg () =
1016+
wrap_conn (fun conn ->
1017+
match Stub.put_copy_end conn error_msg with
1018+
| -1 -> Put_copy_error
1019+
| 0 -> Put_copy_not_queued
1020+
| 1 -> Put_copy_queued
1021+
| _ -> assert false)
1022+
1023+
method get_copy_data ?(async = false) () =
1024+
wrap_conn (fun conn -> Stub.get_copy_data conn (if async then 1 else 0))
1025+
9791026
method getline ?(pos = 0) ?len buf =
9801027
let buf_len = Bytes.length buf in
9811028
let len = match len with Some len -> len | None -> buf_len - pos in

src/postgresql.mli

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,20 @@ type result_status =
124124
| Copy_both
125125
| Single_tuple (** One tuple of a result set ({!set_single_row_mode}) *)
126126

127+
(** Result of put_copy_data and put_copy_end *)
128+
type put_copy_result =
129+
| Put_copy_queued (** Data queued *)
130+
| Put_copy_not_queued (** Data not queued due to full bufffers (async only) *)
131+
| Put_copy_error (** Copying failed, see [#error_message] for details *)
132+
133+
(** Result of get_copy_data *)
134+
type get_copy_result =
135+
| Get_copy_data of string (** Data corresponding to one row is retured *)
136+
| Get_copy_wait (** The next row is still being received (async only); wait
137+
for read-only, call [consume_input], and try again *)
138+
| Get_copy_end (** All data has been successfully retrieved *)
139+
| Get_copy_error (** Copying failed, see [#error_message] for details *)
140+
127141
(** Result of getline *)
128142
type getline_result =
129143
| EOF (** End of input reached *)
@@ -817,6 +831,40 @@ object
817831

818832
(** Low level *)
819833

834+
method put_copy_data : ?pos : int -> ?len : int -> string -> put_copy_result
835+
(** [put_copy_data ?pos ?len buf] sends [buf] of length [len] starting at
836+
[pos] to the backend server, which must be in copy-in mode. In
837+
non-blocking mode, returns {!Put_copy_not_queued} if the data was not
838+
queued due to full buffers.
839+
840+
@param pos default = 0
841+
@param len default = String.length - pos
842+
843+
@raise Invalid_argument if the buffer parameters are invalid.
844+
*)
845+
846+
method put_copy_end : ?error_msg : string -> unit -> put_copy_result
847+
(** [put_copy_end ?error_msg ()] terminates the copy-in mode, leaving the
848+
connection in [Command_ok] or failed state. In non-blocking mode, returns
849+
{!Put_copy_not_queued} if the termination message was not queued due to
850+
full buffers. Also, to ensure delivery of data in non-blocking mode,
851+
repeatedly wait for write-ready an call {!#flush}.
852+
853+
@param error_msg if set, force the copy operation to fail with the given
854+
message.
855+
*)
856+
857+
method get_copy_data : ?async : bool -> unit -> get_copy_result
858+
(** [get_copy_data ?async ()] retrieves the next row of data if available.
859+
Only single complete rows are returned. In synchronous mode, the call
860+
will wait for completion of the next row. In asynchronous mode it will
861+
return immediately with [Get_copy_wait] if the row transfer is incomplete.
862+
In that case, wait for read-ready and call {!#consume_input} before
863+
retrying.
864+
865+
@param async default = false
866+
*)
867+
820868
method getline : ?pos : int -> ?len : int -> Bytes.t -> getline_result
821869
(** [getline ?pos ?len buf] reads a newline-terminated line of at most
822870
[len] characters into [buf] starting at position [pos].

src/postgresql_stubs.c

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,6 +1211,86 @@ CAMLprim value PQnotifies_stub(value v_conn)
12111211

12121212
/* Functions Associated with the COPY Command */
12131213

1214+
CAMLprim intnat PQputCopyData_stub(
1215+
value v_conn, value v_buf, intnat pos, intnat len)
1216+
{
1217+
CAMLparam2(v_conn, v_buf);
1218+
PGconn *conn = get_conn(v_conn);
1219+
intnat res;
1220+
char *buf = caml_stat_alloc(len);
1221+
memcpy(buf, String_val(v_buf) + pos, len);
1222+
caml_enter_blocking_section();
1223+
res = PQputCopyData(conn, buf, len);
1224+
caml_leave_blocking_section();
1225+
caml_stat_free(buf);
1226+
CAMLreturn(res);
1227+
}
1228+
1229+
CAMLprim value PQputCopyData_bc(
1230+
value v_conn, value v_buf, value v_pos, value v_len)
1231+
{
1232+
return
1233+
Val_int(PQputCopyData_stub(v_conn, v_buf, Int_val(v_pos), Int_val(v_len)));
1234+
}
1235+
1236+
CAMLprim intnat PQputCopyEnd_stub(value v_conn, value v_msg_opt)
1237+
{
1238+
CAMLparam2(v_conn, v_msg_opt);
1239+
intnat res;
1240+
PGconn *conn = get_conn(v_conn);
1241+
char *msg = NULL;
1242+
if (Is_block(v_msg_opt)) {
1243+
value v_msg = Field(v_msg_opt, 0);
1244+
size_t msg_len = caml_string_length(v_msg);
1245+
msg = caml_stat_alloc(msg_len + 1);
1246+
memcpy(msg, String_val(v_msg), msg_len);
1247+
msg[msg_len] = '\0';
1248+
}
1249+
caml_enter_blocking_section();
1250+
res = PQputCopyEnd(conn, msg);
1251+
caml_leave_blocking_section();
1252+
if (msg)
1253+
caml_stat_free(msg);
1254+
CAMLreturn(res);
1255+
}
1256+
1257+
CAMLprim value PQputCopyEnd_bc(value v_conn, value v_msg)
1258+
{
1259+
return Val_int(PQputCopyEnd_stub(v_conn, v_msg));
1260+
}
1261+
1262+
CAMLprim value PQgetCopyData_stub(value v_conn, intnat async)
1263+
{
1264+
CAMLparam1(v_conn);
1265+
CAMLlocal2(v_buf, v_result);
1266+
PGconn *conn = get_conn(v_conn);
1267+
char *buf;
1268+
intnat res;
1269+
caml_enter_blocking_section();
1270+
res = PQgetCopyData(conn, &buf, async);
1271+
caml_leave_blocking_section();
1272+
switch (res) {
1273+
case 0:
1274+
CAMLreturn(Val_int(0)); /* Get_copy_wait */
1275+
case -1:
1276+
CAMLreturn(Val_int(1)); /* Get_copy_end */
1277+
case -2:
1278+
CAMLreturn(Val_int(2)); /* Get_copy_error */
1279+
default:
1280+
v_buf = caml_alloc_string(res);
1281+
memcpy(String_val(v_buf), buf, res);
1282+
PQfreemem(buf);
1283+
v_result = caml_alloc(1, 0); /* Get_copy_data */
1284+
Store_field(v_result, 0, v_buf);
1285+
CAMLreturn(v_result);
1286+
}
1287+
}
1288+
1289+
CAMLprim value PQgetCopyData_bc(value v_conn, value v_async)
1290+
{
1291+
return PQgetCopyData_stub(v_conn, Int_val(v_async));
1292+
}
1293+
12141294
CAMLprim intnat PQgetline_stub(
12151295
value v_conn, value v_buf, intnat pos, intnat len)
12161296
{

0 commit comments

Comments
 (0)