Remove stats update from incremental recompression
Compression size stats are recorded during initial compression.
At that time we know both the uncompressed stats before the
operation and the compressed stats after it. During incremental
(segmentwise) recompression, however, we do not decompress the
whole chunk, so we cannot recompute the uncompressed-side
statistics. Until now we only updated the compressed stats and
the tuple counts, which makes the reported compression ratio
incorrect because only part of the stats changes. Removing all
stats updates during incremental recompression at least keeps the
stats from the initial compression consistent, which is better
than partially updated stats.
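
For context, the ratio in question is derived from the per-chunk rows in _timescaledb_catalog.compression_chunk_size. Below is a minimal sketch of how those rows can be inspected; it uses only the columns referenced in this patch, and the catalog may carry further columns not shown here.

-- Sketch: the per-chunk stats that segmentwise recompression now leaves untouched.
-- Only columns referenced in this patch are selected; others are omitted.
SELECT chunk_id,
       numrows_pre_compression,
       numrows_post_compression,
       compressed_heap_size + compressed_toast_size + compressed_index_size AS compressed_bytes
FROM _timescaledb_catalog.compression_chunk_size
ORDER BY chunk_id;

After this change, these values keep reflecting the state recorded at initial compression time rather than a mix of initial and recompressed figures.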
antekresic committed Mar 14, 2024
1 parent ae9b83b commit 8a40e55
Showing 5 changed files with 28 additions and 153 deletions.
112 changes: 4 additions & 108 deletions tsl/src/compression/api.c
@@ -188,83 +188,6 @@ compression_chunk_size_catalog_update_merged(int32 chunk_id, const RelationSize
return updated;
}

/*
* This function updates catalog chunk compression statistics
* for an existing compressed chunk after it has been recompressed
* segmentwise in-place (as opposed to creating a new compressed chunk).
* Note that because of this it is not possible to accurately report
* the fields
* uncompressed_chunk_size, uncompressed_index_size, uncompressed_toast_size
* anymore, so these are not updated.
*/
static int
compression_chunk_size_catalog_update_recompressed(int32 uncompressed_chunk_id,
int32 compressed_chunk_id,
const RelationSize *recompressed_size,
int64 rowcnt_pre_compression,
int64 rowcnt_post_compression)
{
ScanIterator iterator =
ts_scan_iterator_create(COMPRESSION_CHUNK_SIZE, RowExclusiveLock, CurrentMemoryContext);
bool updated = false;

iterator.ctx.index =
catalog_get_index(ts_catalog_get(), COMPRESSION_CHUNK_SIZE, COMPRESSION_CHUNK_SIZE_PKEY);
ts_scan_iterator_scan_key_init(&iterator,
Anum_compression_chunk_size_pkey_chunk_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(uncompressed_chunk_id));
ts_scanner_foreach(&iterator)
{
Datum values[Natts_compression_chunk_size];
bool replIsnull[Natts_compression_chunk_size] = { false };
bool repl[Natts_compression_chunk_size] = { false };
bool should_free;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
HeapTuple new_tuple;
heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, replIsnull);

/* Only update the information pertaining to the compressed chunk */
/* these fields are about the compressed chunk so they can be updated */
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_heap_size)] =
Int64GetDatum(recompressed_size->heap_size);
repl[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_heap_size)] = true;

values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_toast_size)] =
Int64GetDatum(recompressed_size->toast_size);
repl[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_toast_size)] = true;

values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_index_size)] =
Int64GetDatum(recompressed_size->index_size);
repl[AttrNumberGetAttrOffset(Anum_compression_chunk_size_compressed_index_size)] = true;

values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_numrows_pre_compression)] =
Int64GetDatum(rowcnt_pre_compression);
repl[AttrNumberGetAttrOffset(Anum_compression_chunk_size_numrows_pre_compression)] = true;

values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_numrows_post_compression)] =
Int64GetDatum(rowcnt_post_compression);
repl[AttrNumberGetAttrOffset(Anum_compression_chunk_size_numrows_post_compression)] = true;

new_tuple =
heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, replIsnull, repl);
ts_catalog_update(ti->scanrel, new_tuple);
heap_freetuple(new_tuple);

if (should_free)
heap_freetuple(tuple);

updated = true;
break;
}

ts_scan_iterator_end(&iterator);
ts_scan_iterator_close(&iterator);
return updated;
}

static void
get_hypertable_or_cagg_name(Hypertable *ht, Name objname)
{
@@ -1179,14 +1102,8 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ExclusiveLock);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ExclusiveLock);

/****** compression statistics ******/
RelationSize after_size;
int64 skipped_uncompressed_rows = 0;
int64 skipped_compressed_rows = 0;

Tuplesortstate *segment_tuplesortstate;

/*************** tuplesort state *************************/
Tuplesortstate *segment_tuplesortstate;
TupleDesc compressed_rel_tupdesc = RelationGetDescr(compressed_chunk_rel);
TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel);

@@ -1284,9 +1201,6 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
TupleTableSlot *slot = table_slot_create(compressed_chunk_rel, NULL);
index_rescan(index_scan, NULL, 0, NULL, 0);

Datum val;
bool is_null;

while (index_getnext_slot(index_scan, ForwardScanDirection, slot))
{
slot_getallattrs(slot);
@@ -1346,12 +1260,6 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)

if (skip_current_segment)
{
val = slot_getattr(slot,
AttrOffsetGetAttrNumber(row_compressor.count_metadata_column_offset),
&is_null);
Assert(!is_null);
skipped_uncompressed_rows += DatumGetInt32(val);
skipped_compressed_rows++;
continue;
}

@@ -1419,29 +1327,17 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
CommandCounterIncrement();
}

after_size = ts_relation_size_impl(compressed_chunk->table_id);
/* the compression size statistics we are able to update and accurately report are:
* rowcount pre/post compression,
* compressed chunk sizes */
row_compressor.rowcnt_pre_compression += skipped_uncompressed_rows;
row_compressor.num_compressed_rows += skipped_compressed_rows;
compression_chunk_size_catalog_update_recompressed(uncompressed_chunk->fd.id,
compressed_chunk->fd.id,
&after_size,
row_compressor.rowcnt_pre_compression,
row_compressor.num_compressed_rows);

row_compressor_close(&row_compressor);
ExecDropSingleTupleTableSlot(slot);
index_endscan(index_scan);
UnregisterSnapshot(snapshot);
index_close(index_rel, ExclusiveLock);
index_close(index_rel, NoLock);
row_decompressor_close(&decompressor);

/* changed chunk status, so invalidate any plans involving this chunk */
CacheInvalidateRelcacheByRelid(uncompressed_chunk_id);
table_close(uncompressed_chunk_rel, ExclusiveLock);
table_close(compressed_chunk_rel, ExclusiveLock);
table_close(uncompressed_chunk_rel, NoLock);
table_close(compressed_chunk_rel, NoLock);

PG_RETURN_OID(uncompressed_chunk_id);
}
22 changes: 10 additions & 12 deletions tsl/test/expected/recompress_chunk_segmentwise.out
@@ -83,7 +83,7 @@ SELECT ctid, * FROM :compressed_chunk_name_1;
select numrows_pre_compression, numrows_post_compression from _timescaledb_catalog.compression_chunk_size;
numrows_pre_compression | numrows_post_compression
-------------------------+--------------------------
3 | 1
2 | 1
(1 row)

---------------- test1: one affected segment, one unaffected --------------
@@ -107,10 +107,7 @@ select compress_chunk(:'chunk_to_compress_2');
_timescaledb_internal._hyper_3_3_chunk
(1 row)

-- should have 2 compressed rows
-- select numrows_pre_compression, numrows_post_compression from _timescaledb_catalog.compression_chunk_size ccs
-- join compressed_chunk_info_view v on ccs.chunk_id = v.chunk_id where v.compressed_chunk_schema || '.' || v.compressed_chunk_name
-- = :'chunk_to_compress_2';
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'chunk_to_compress_2';
numrows_pre_compression | numrows_post_compression | chunk_name | chunk_id
-------------------------+--------------------------+----------------------------------------+----------
@@ -160,11 +157,11 @@ select * from :chunk_to_compress_2 ORDER BY a, c, time DESC;
Sun Jan 01 11:56:20.048355 2023 PST | 3 | | 3
(4 rows)

-- should still have 2 compressed rows
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'chunk_to_compress_2';
numrows_pre_compression | numrows_post_compression | chunk_name | chunk_id
-------------------------+--------------------------+----------------------------------------+----------
4 | 2 | _timescaledb_internal._hyper_3_3_chunk | 3
3 | 2 | _timescaledb_internal._hyper_3_3_chunk | 3
(1 row)

----------------- more than one batch per segment ----------------------
@@ -205,6 +202,7 @@ select ctid, * from :compressed_chunk_name_2;
(9 rows)

-- after compression
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'chunk_to_compress_2';
numrows_pre_compression | numrows_post_compression | chunk_name | chunk_id
-------------------------+--------------------------+----------------------------------------+----------
@@ -231,11 +229,11 @@ select ctid, * from :compressed_chunk_name_2;
(0,12) | 882 | 30 | 0 | 0 | Sat Dec 31 16:00:00 2022 PST | Sat Dec 31 23:20:00 2022 PST | BAAAApQnSNVgAP//////4XuAAAADcgAAAAQAAAAAAADf7gAFKFrcytAAAAUoWuBeVv8AADbgAAAAAAMZdQAAPQkA |
(9 rows)

-- after recompression
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'chunk_to_compress_2';
numrows_pre_compression | numrows_post_compression | chunk_name | chunk_id
-------------------------+--------------------------+----------------------------------------+----------
8644 | 9 | _timescaledb_internal._hyper_5_5_chunk | 5
8643 | 9 | _timescaledb_internal._hyper_5_5_chunk | 5
(1 row)

-- failing test from compression_ddl
@@ -251,6 +249,7 @@ ALTER TABLE test_defaults SET (timescaledb.compress,timescaledb.compress_segment
INSERT INTO test_defaults SELECT '2000-01-01', 1;
INSERT INTO test_defaults SELECT '2001-01-01', 1;
SELECT compress_chunk(show_chunks) AS "compressed_chunk" FROM show_chunks('test_defaults') ORDER BY show_chunks::text LIMIT 1 \gset
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'compressed_chunk';
numrows_pre_compression | numrows_post_compression | chunk_name | chunk_id
-------------------------+--------------------------+----------------------------------------+----------
@@ -296,12 +295,11 @@ SELECT * FROM test_defaults ORDER BY 1,2;
Mon Jan 01 00:00:00 2001 PST | 1 | | 42
(3 rows)

-- here we will have an additional compressed row after recompression because the new
-- data corresponds to a new segment
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'compressed_chunk';
numrows_pre_compression | numrows_post_compression | chunk_name | chunk_id
-------------------------+--------------------------+----------------------------------------+----------
2 | 2 | _timescaledb_internal._hyper_7_7_chunk | 7
1 | 1 | _timescaledb_internal._hyper_7_7_chunk | 7
(1 row)

-- test prepared statements
22 changes: 7 additions & 15 deletions tsl/test/isolation/expected/compression_recompress.out
@@ -1,18 +1,19 @@
Parsed test spec with 3 sessions

starting permutation: s2_block_on_compressed_chunk_size s1_begin s1_recompress_chunk s2_select_from_compressed_chunk s2_wait_for_select_to_finish s2_unblock s1_rollback
step s2_block_on_compressed_chunk_size:
BEGIN;
LOCK TABLE _timescaledb_catalog.compression_chunk_size;

starting permutation: s1_begin s1_recompress_chunk s2_select_from_compressed_chunk s2_wait_for_select_to_finish s1_rollback
step s1_begin:
BEGIN;

step s1_recompress_chunk:
SELECT count(_timescaledb_functions.recompress_chunk_segmentwise(i)) AS recompress
FROM show_chunks('sensor_data') i
LIMIT 1;
<waiting ...>

recompress
----------
1
(1 row)

step s2_select_from_compressed_chunk:
SELECT sum(temperature) > 1 FROM sensor_data WHERE sensor_id = 2;

@@ -23,15 +24,6 @@ t

step s2_wait_for_select_to_finish:

step s2_unblock:
ROLLBACK;

step s1_recompress_chunk: <... completed>
recompress
----------
1
(1 row)

step s1_rollback:
ROLLBACK;

11 changes: 1 addition & 10 deletions tsl/test/isolation/specs/compression_recompress.spec
@@ -75,15 +75,6 @@ step "s1_decompress" {
}

session "s2"
## locking up the catalog table will block the recompression from releasing the index lock
## we should not be deadlocking since the index lock has been reduced to ExclusiveLock
step "s2_block_on_compressed_chunk_size" {
BEGIN;
LOCK TABLE _timescaledb_catalog.compression_chunk_size;
}
step "s2_unblock" {
ROLLBACK;
}
step "s2_select_from_compressed_chunk" {
SELECT sum(temperature) > 1 FROM sensor_data WHERE sensor_id = 2;
}
@@ -106,6 +97,6 @@ step "s3_release_chunk_insert" {
}


permutation "s2_block_on_compressed_chunk_size" "s1_begin" "s1_recompress_chunk" "s2_select_from_compressed_chunk" "s2_wait_for_select_to_finish" "s2_unblock" "s1_rollback"
permutation "s1_begin" "s1_recompress_chunk" "s2_select_from_compressed_chunk" "s2_wait_for_select_to_finish" "s1_rollback"

permutation "s1_compress" "s3_block_chunk_insert" "s2_insert" "s1_decompress" "s1_compress" "s3_release_chunk_insert"
14 changes: 6 additions & 8 deletions tsl/test/sql/recompress_chunk_segmentwise.sql
@@ -80,10 +80,7 @@ select show_chunks as chunk_to_compress_2 from show_chunks('mytab_twoseg') limit

select compress_chunk(:'chunk_to_compress_2');

-- should have 2 compressed rows
-- select numrows_pre_compression, numrows_post_compression from _timescaledb_catalog.compression_chunk_size ccs
-- join compressed_chunk_info_view v on ccs.chunk_id = v.chunk_id where v.compressed_chunk_schema || '.' || v.compressed_chunk_name
-- = :'chunk_to_compress_2';
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'chunk_to_compress_2';

insert into mytab_twoseg values ('2023-01-01 19:56:20.048355+02'::timestamptz, 2, NULL, 2);
@@ -103,7 +100,7 @@ select ctid, * from :compressed_chunk_name_2;
-- verify that initial data is returned as expected
select * from :chunk_to_compress_2 ORDER BY a, c, time DESC;

-- should still have 2 compressed rows
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'chunk_to_compress_2';

----------------- more than one batch per segment ----------------------
Expand All @@ -129,12 +126,13 @@ select show_chunks('mytab2') as chunk_to_compress_2 \gset

select ctid, * from :compressed_chunk_name_2;
-- after compression
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'chunk_to_compress_2';

select _timescaledb_functions.recompress_chunk_segmentwise(:'chunk_to_compress_2');

select ctid, * from :compressed_chunk_name_2;
-- after recompression
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'chunk_to_compress_2';

-- failing test from compression_ddl
@@ -149,6 +147,7 @@ INSERT INTO test_defaults SELECT '2001-01-01', 1;

SELECT compress_chunk(show_chunks) AS "compressed_chunk" FROM show_chunks('test_defaults') ORDER BY show_chunks::text LIMIT 1 \gset

-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'compressed_chunk';

SELECT * FROM test_defaults ORDER BY 1;
@@ -162,8 +161,7 @@ SELECT * FROM test_defaults ORDER BY 1,2;

SELECT compress_chunk(:'compressed_chunk');
SELECT * FROM test_defaults ORDER BY 1,2;
-- here we will have an additional compressed row after recompression because the new
-- data corresponds to a new segment
-- stats are no longer updated during segmentwise recompression
select * from compression_rowcnt_view where chunk_name = :'compressed_chunk';

-- test prepared statements
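The updated SQL tests above exercise the new behavior end to end. The following condensed walkthrough is a hypothetical sketch of the same flow, not taken from the test suite: the table name, data, and timestamps are invented for illustration and assume a build that includes this change.

-- Hypothetical sketch: stats recorded at initial compression survive a
-- segmentwise recompression unchanged.
CREATE TABLE readings (time timestamptz NOT NULL, device int, value float);
SELECT create_hypertable('readings', 'time');
ALTER TABLE readings SET (timescaledb.compress, timescaledb.compress_segmentby = 'device');
INSERT INTO readings VALUES ('2024-01-01 00:00', 1, 1.0), ('2024-01-01 00:01', 2, 2.0);
SELECT compress_chunk(c) FROM show_chunks('readings') c;
-- New rows land in the already-compressed chunk, marking it partially compressed.
INSERT INTO readings VALUES ('2024-01-01 00:02', 1, 3.0);
SELECT _timescaledb_functions.recompress_chunk_segmentwise(c) FROM show_chunks('readings') c;
-- The row counts still reflect the initial compression and do not include
-- the newly inserted row.
SELECT numrows_pre_compression, numrows_post_compression
  FROM _timescaledb_catalog.compression_chunk_size;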
