Fix handling of compressed tables in COPY path
The dispatch_state field of ChunkDispatch was not set in the COPY path (see
the definition of ChunkDispatch); however, it was still accessed there to
count the number of decompressed tuples. This PR creates the needed state and
introduces the missing check on the number of decompressed tuples.
jnidzwetzki committed Mar 1, 2024
1 parent d120885 commit c288efa
Showing 7 changed files with 120 additions and 5 deletions.
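
In short: COPY shares the ChunkDispatch machinery with INSERT but never populated dispatch_state, so the tuple-decompression counters were read through a NULL pointer. The diff below fixes this by allocating a zero-initialized placeholder state. A minimal stand-alone C sketch of that idea, with the structs reduced to the counter fields that matter here and calloc standing in for PostgreSQL's palloc0 (the Sketch-prefixed names are illustrative, not the real definitions):

#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Reduced stand-ins for the real structs in chunk_dispatch.h. */
typedef struct SketchChunkDispatchState
{
	int64_t tuples_decompressed; /* bumped whenever DML triggers decompression */
	int64_t batches_decompressed;
} SketchChunkDispatchState;

typedef struct SketchChunkDispatch
{
	SketchChunkDispatchState *dispatch_state; /* was NULL in the COPY path */
} SketchChunkDispatch;

/* Before the fix, COPY left dispatch_state NULL and later code
 * dereferenced it; the fix allocates a zeroed dummy state up front
 * (palloc0 in the real code, calloc in this sketch), so the counters
 * start at 0 and can be read safely. */
static void
sketch_copy_path_setup(SketchChunkDispatch *dispatch)
{
	dispatch->dispatch_state = calloc(1, sizeof(SketchChunkDispatchState));
	assert(dispatch->dispatch_state != NULL);
}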
1 change: 1 addition & 0 deletions .unreleased/bugfix_6717
@@ -0,0 +1 @@
Fixes: #6717 Fix handling of compressed tables with primary or unique index in COPY path
23 changes: 23 additions & 0 deletions src/copy.c
@@ -175,6 +175,11 @@ copy_chunk_state_create(Hypertable *ht, Relation rel, CopyFromFunc from_func, Co
ccstate->rel = rel;
ccstate->estate = estate;
ccstate->dispatch = ts_chunk_dispatch_create(ht, estate, 0);

/* In the COPY path, no chunk dispatch node and no chunk dispatch state are available. Create an
* empty state to be able to count decompressed tuples. */
ccstate->dispatch->dispatch_state = palloc0(sizeof(ChunkDispatchState));

ccstate->cstate = cstate;
ccstate->scandesc = scandesc;
ccstate->next_copy_from = from_func;
@@ -328,6 +333,24 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
NULL /* on chunk changed function */,
NULL /* payload for on chunk changed function */);

if (ts_guc_max_tuples_decompressed_per_dml > 0)
{
if (miinfo->ccstate->dispatch->dispatch_state->tuples_decompressed >
ts_guc_max_tuples_decompressed_per_dml)
{
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("tuple decompression limit exceeded by operation"),
errdetail("current limit: %d, tuples decompressed: %lld",
ts_guc_max_tuples_decompressed_per_dml,
(long long int)
miinfo->ccstate->dispatch->dispatch_state->tuples_decompressed),
errhint("Consider increasing "
"timescaledb.max_tuples_decompressed_per_dml_transaction or "
"set to 0 (unlimited).")));
}
}

ResultRelInfo *resultRelInfo = cis->result_relation_info;

/*
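The check added to TSCopyMultiInsertBufferFlush above mirrors the limit enforcement already done in the INSERT path: if the GUC is non-zero and the running counter exceeds it, the operation fails with a configuration-limit error. A self-contained sketch of the same control flow, with the ereport() machinery replaced by fprintf/exit and a hypothetical variable standing in for the ts_guc_max_tuples_decompressed_per_dml GUC:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Stand-in for the ts_guc_max_tuples_decompressed_per_dml GUC;
 * 0 means unlimited, as the real error hint explains. */
static int max_tuples_decompressed_per_dml = 100000;

/* Sketch of the flush-time check: once the buffered tuples have been
 * routed to chunks, compare the running decompression counter against
 * the limit and fail the whole operation if it was exceeded. */
static void
sketch_check_decompression_limit(int64_t tuples_decompressed)
{
	if (max_tuples_decompressed_per_dml > 0 &&
		tuples_decompressed > max_tuples_decompressed_per_dml)
	{
		/* The real code raises ERRCODE_CONFIGURATION_LIMIT_EXCEEDED
		 * via ereport(ERROR, ...); exiting keeps the sketch portable. */
		fprintf(stderr,
				"tuple decompression limit exceeded: limit %d, decompressed %lld\n",
				max_tuples_decompressed_per_dml,
				(long long) tuples_decompressed);
		exit(EXIT_FAILURE);
	}
}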
3 changes: 2 additions & 1 deletion src/nodes/chunk_dispatch/chunk_dispatch.h
@@ -25,7 +25,8 @@
*/
typedef struct ChunkDispatch
{
/* Link to the executor state for INSERTs. This is not set for COPY path. */
/* Link to the executor state for INSERTs. This is a mostly empty dummy state in the COPY path.
*/
struct ChunkDispatchState *dispatch_state;
Hypertable *hypertable;
SubspaceStore *cache;
9 changes: 5 additions & 4 deletions src/nodes/chunk_dispatch/chunk_insert_state.c
@@ -67,7 +67,7 @@ chunk_dispatch_get_arbiter_indexes(const ChunkDispatch *dispatch)
static bool
chunk_dispatch_has_returning(const ChunkDispatch *dispatch)
{
if (!dispatch->dispatch_state)
if (!dispatch->dispatch_state || !dispatch->dispatch_state->mtstate)
return false;
return get_modifytable(dispatch)->returningLists != NIL;
}
@@ -87,16 +87,17 @@ chunk_dispatch_get_returning_clauses(const ChunkDispatch *dispatch)
OnConflictAction
chunk_dispatch_get_on_conflict_action(const ChunkDispatch *dispatch)
{
if (!dispatch->dispatch_state)
if (!dispatch->dispatch_state || !dispatch->dispatch_state->mtstate)
return ONCONFLICT_NONE;
return get_modifytable(dispatch)->onConflictAction;
}

static CmdType
chunk_dispatch_get_cmd_type(const ChunkDispatch *dispatch)
{
return dispatch->dispatch_state == NULL ? CMD_INSERT :
dispatch->dispatch_state->mtstate->operation;
return (dispatch->dispatch_state == NULL || dispatch->dispatch_state->mtstate == NULL) ?
CMD_INSERT :
dispatch->dispatch_state->mtstate->operation;
}

/*
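These accessor changes follow from the copy.c change above: a NULL dispatch_state no longer identifies the COPY path, because COPY now installs a dummy state whose mtstate is NULL. Each accessor therefore checks both pointers before touching the ModifyTable node. A condensed sketch of the pattern, using simplified hypothetical types rather than the real executor structs:

#include <stddef.h>

typedef enum
{
	SKETCH_CMD_INSERT,
	SKETCH_CMD_UPDATE
} SketchCmdType;

typedef struct SketchModifyTableState
{
	SketchCmdType operation;
} SketchModifyTableState;

typedef struct SketchDispatchState
{
	SketchModifyTableState *mtstate; /* NULL for the dummy COPY state */
} SketchDispatchState;

/* COPY is now recognized by a missing mtstate (or, defensively, a
 * missing state altogether); both cases fall back to plain INSERT
 * semantics, matching chunk_dispatch_get_cmd_type above. */
static SketchCmdType
sketch_get_cmd_type(const SketchDispatchState *state)
{
	if (state == NULL || state->mtstate == NULL)
		return SKETCH_CMD_INSERT;
	return state->mtstate->operation;
}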
2 changes: 2 additions & 0 deletions tsl/src/compression/compression.c
@@ -2355,6 +2355,8 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
&tmfd,
false);
Assert(result == TM_Ok);

Assert(cis->cds != NULL);
cis->cds->batches_decompressed += decompressor.batches_decompressed;
cis->cds->tuples_decompressed += decompressor.tuples_decompressed;
}
48 changes: 48 additions & 0 deletions tsl/test/expected/compression.out
@@ -2771,3 +2771,51 @@ SELECT * FROM ONLY :CHUNK2;
------+--------+-------
(0 rows)

------
--- Test COPY with a compressed table with a unique index
------
CREATE TABLE compressed_table (time timestamptz, a int, b int, c int);
CREATE UNIQUE INDEX compressed_table_index ON compressed_table(time, a, b, c);
SELECT create_hypertable('compressed_table', 'time');
NOTICE: adding not-null constraint to column "time"
create_hypertable
--------------------------------
(49,public,compressed_table,t)
(1 row)

ALTER TABLE compressed_table SET (timescaledb.compress, timescaledb.compress_segmentby='a', timescaledb.compress_orderby = 'time DESC');
WARNING: column "b" should be used for segmenting or ordering
WARNING: column "c" should be used for segmenting or ordering
COPY compressed_table (time,a,b,c) FROM stdin;
SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('compressed_table') i;
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_49_108_chunk
(1 row)

\set ON_ERROR_STOP 0
COPY compressed_table (time,a,b,c) FROM stdin;
ERROR: duplicate key value violates unique constraint "_hyper_49_108_chunk_compressed_table_index"
\set ON_ERROR_STOP 1
COPY compressed_table (time,a,b,c) FROM stdin;
SELECT * FROM compressed_table;
time | a | b | c
------------------------------------+----+---+---
Thu Feb 29 01:00:00 2024 PST | 5 | 1 | 1
Thu Feb 29 06:02:03.87313 2024 PST | 10 | 2 | 2
Thu Feb 29 06:02:03.87313 2024 PST | 20 | 3 | 3
(3 rows)

SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('compressed_table') i;
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_49_108_chunk
(1 row)

-- Check DML decompression limit
SET timescaledb.max_tuples_decompressed_per_dml_transaction = 1;
\set ON_ERROR_STOP 0
COPY compressed_table (time,a,b,c) FROM stdin;
ERROR: tuple decompression limit exceeded by operation
\set ON_ERROR_STOP 1
RESET timescaledb.max_tuples_decompressed_per_dml_transaction;
39 changes: 39 additions & 0 deletions tsl/test/sql/compression.sql
@@ -1120,3 +1120,42 @@ SELECT compress_chunk(:'CHUNK2');
-- should return no rows
SELECT * FROM ONLY :CHUNK2;

------
--- Test COPY with a compressed table with a unique index
------

CREATE TABLE compressed_table (time timestamptz, a int, b int, c int);
CREATE UNIQUE INDEX compressed_table_index ON compressed_table(time, a, b, c);

SELECT create_hypertable('compressed_table', 'time');
ALTER TABLE compressed_table SET (timescaledb.compress, timescaledb.compress_segmentby='a', timescaledb.compress_orderby = 'time DESC');

COPY compressed_table (time,a,b,c) FROM stdin;
2024-02-29 10:00:00.00000+01 5 1 1
2024-02-29 15:02:03.87313+01 10 2 2
\.

SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('compressed_table') i;

\set ON_ERROR_STOP 0
COPY compressed_table (time,a,b,c) FROM stdin;
2024-02-29 15:02:03.87313+01 10 2 2
\.
\set ON_ERROR_STOP 1

COPY compressed_table (time,a,b,c) FROM stdin;
2024-02-29 15:02:03.87313+01 20 3 3
\.

SELECT * FROM compressed_table;
SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('compressed_table') i;

-- Check DML decompression limit
SET timescaledb.max_tuples_decompressed_per_dml_transaction = 1;
\set ON_ERROR_STOP 0
COPY compressed_table (time,a,b,c) FROM stdin;
2024-02-29 15:02:03.87313+01 10 4 4
2024-02-29 15:02:03.87313+01 20 5 5
\.
\set ON_ERROR_STOP 1
RESET timescaledb.max_tuples_decompressed_per_dml_transaction;
