Refresh correct partial during refresh on drop
When drop_chunks is called on a hypertable, either manually or through
a retention policy, it initiates a refresh of the hypertable's continuous
aggregates. The refresh covers all buckets of each dropped chunk, but
some buckets lie only partially inside a dropped chunk. In that case,
this commit makes the refresh update only the correct partials of those
buckets.
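
As an illustrative scenario (a sketch only; the table name, intervals, and
retention cutoff below are hypothetical, not taken from this commit): with
daily chunks and three-day buckets, a bucket can span several chunks, so
dropping a single chunk covers some buckets only partially.

CREATE TABLE conditions(time timestamptz NOT NULL, device int, temp float);
SELECT create_hypertable('conditions', 'time',
                         chunk_time_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW conditions_3d
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '3 days', time) AS bucket, device, avg(temp)
FROM conditions
GROUP BY bucket, device;

-- Dropping a single 1-day chunk overlaps only part of a 3-day bucket;
-- the refresh triggered by the drop must update just that partial.
SELECT drop_chunks('conditions', older_than => INTERVAL '7 days');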

The fix passes the chunk id to the refresh that runs when a chunk is
dropped. The chunk id is used to identify and update only the correct
partials. This is a workaround to ensure that an invalidation outside
the refreshed chunk does not update the other partial, which lies
outside that chunk.
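
With a chunk id supplied, the DELETE and INSERT statements built in
spi_delete_materializations and spi_insert_materializations gain the extra
chunk condition, roughly as sketched below (schema, table, and column names
and the id value are placeholders, not taken from the commit):

DELETE FROM cagg.materialization_table AS D
WHERE D.bucket >= '...' AND D.bucket < '...' AND chunk_id = 42;

INSERT INTO cagg.materialization_table
SELECT * FROM cagg.partial_view AS I
WHERE I.bucket >= '...' AND I.bucket < '...' AND chunk_id = 42;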

The commit also adds a test demonstrating the correct refresh behavior.

Fixes #2592
k-rus committed Nov 3, 2020
1 parent 9551de0 commit 3e7fa45
Showing 9 changed files with 258 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/chunk.c
@@ -3231,7 +3231,7 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
* refresh. However, such merging needs to account for
* multi-dimensional tables where some chunks have the same
* primary dimension ranges. */
ts_cm_functions->continuous_agg_refresh_all(ht, start, end);
ts_cm_functions->continuous_agg_refresh_all(ht, start, end, chunks[i].fd.id);

/* Invalidate the dropped region to indicate that it was
* modified. The invalidation will allow the refresh command on a
13 changes: 10 additions & 3 deletions src/cross_module_fn.c
@@ -207,7 +207,14 @@ continuous_agg_update_options_default(ContinuousAgg *cagg, WithClauseResult *wit
}

static void
continuous_agg_invalidate_or_refresh_all_default(const Hypertable *ht, int64 start, int64 end)
continuous_agg_refresh_all_default(const Hypertable *ht, int64 start, int64 end, int32 chunk_id)
{
error_no_default_fn_community();
pg_unreachable();
}

static void
continuous_agg_invalidate_all_default(const Hypertable *ht, int64 start, int64 end)
{
error_no_default_fn_community();
pg_unreachable();
@@ -322,8 +329,8 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.process_cagg_viewstmt = process_cagg_viewstmt_default,
.continuous_agg_invalidation_trigger = error_no_default_fn_pg_community,
.continuous_agg_refresh = error_no_default_fn_pg_community,
.continuous_agg_refresh_all = continuous_agg_invalidate_or_refresh_all_default,
.continuous_agg_invalidate = continuous_agg_invalidate_or_refresh_all_default,
.continuous_agg_refresh_all = continuous_agg_refresh_all_default,
.continuous_agg_invalidate = continuous_agg_invalidate_all_default,
.continuous_agg_update_options = continuous_agg_update_options_default,

/* compression */
3 changes: 2 additions & 1 deletion src/cross_module_fn.h
@@ -87,7 +87,8 @@ typedef struct CrossModuleFunctions
WithClauseResult *with_clause_options);
PGFunction continuous_agg_invalidation_trigger;
PGFunction continuous_agg_refresh;
void (*continuous_agg_refresh_all)(const Hypertable *ht, int64 start, int64 end);
void (*continuous_agg_refresh_all)(const Hypertable *ht, int64 start, int64 end,
int32 chunk_id);
void (*continuous_agg_invalidate)(const Hypertable *ht, int64 start, int64 end);
void (*continuous_agg_update_options)(ContinuousAgg *cagg,
WithClauseResult *with_clause_options);
50 changes: 34 additions & 16 deletions tsl/src/continuous_aggs/materialize.c
@@ -34,18 +34,20 @@ static Datum internal_to_time_value_or_infinite(int64 internal, Oid time_type,

static void spi_update_materializations(SchemaAndName partial_view,
SchemaAndName materialization_table, Name time_column_name,
TimeRange invalidation_range);
TimeRange invalidation_range, int32 chunk_id);
static void spi_delete_materializations(SchemaAndName materialization_table, Name time_column_name,
TimeRange invalidation_range);
TimeRange invalidation_range, const char *chunk_condition);
static void spi_insert_materializations(SchemaAndName partial_view,
SchemaAndName materialization_table, Name time_column_name,
TimeRange materialization_range);
TimeRange materialization_range,
const char *chunk_condition);

void
continuous_agg_update_materialization(SchemaAndName partial_view,
SchemaAndName materialization_table, Name time_column_name,
InternalTimeRange new_materialization_range,
InternalTimeRange invalidation_range, int64 bucket_width)
InternalTimeRange invalidation_range, int64 bucket_width,
const int32 chunk_id)
{
InternalTimeRange combined_materialization_range = new_materialization_range;
bool materialize_invalidations_separately = range_length(invalidation_range) > 0;
@@ -90,19 +92,22 @@ continuous_agg_update_materialization(SchemaAndName partial_view,
materialization_table,
time_column_name,
internal_time_range_to_time_range(
combined_materialization_range));
combined_materialization_range),
chunk_id);
}
else
{
spi_update_materializations(partial_view,
materialization_table,
time_column_name,
internal_time_range_to_time_range(invalidation_range));
internal_time_range_to_time_range(invalidation_range),
chunk_id);

spi_update_materializations(partial_view,
materialization_table,
time_column_name,
internal_time_range_to_time_range(new_materialization_range));
internal_time_range_to_time_range(new_materialization_range),
chunk_id);
}

res = SPI_finish();
@@ -200,18 +205,28 @@ internal_time_range_to_time_range(InternalTimeRange internal)

static void
spi_update_materializations(SchemaAndName partial_view, SchemaAndName materialization_table,
Name time_column_name, TimeRange invalidation_range)
Name time_column_name, TimeRange invalidation_range,
const int32 chunk_id)
{
spi_delete_materializations(materialization_table, time_column_name, invalidation_range);
StringInfo chunk_condition = makeStringInfo();

if (chunk_id != INVALID_CHUNK_ID)
appendStringInfo(chunk_condition, "AND chunk_id = %d", chunk_id);

spi_delete_materializations(materialization_table,
time_column_name,
invalidation_range,
chunk_condition->data);
spi_insert_materializations(partial_view,
materialization_table,
time_column_name,
invalidation_range);
invalidation_range,
chunk_condition->data);
}

static void
spi_delete_materializations(SchemaAndName materialization_table, Name time_column_name,
TimeRange invalidation_range)
TimeRange invalidation_range, const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
@@ -226,13 +241,14 @@ spi_delete_materializations(SchemaAndName materialization_table, Name time_colum

appendStringInfo(command,
"DELETE FROM %s.%s AS D WHERE "
"D.%s >= %s AND D.%s < %s;",
"D.%s >= %s AND D.%s < %s %s;",
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(invalidation_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(invalidation_end));
quote_literal_cstr(invalidation_end),
chunk_condition);

res = SPI_execute_with_args(command->data,
0 /*=nargs*/,
@@ -247,7 +263,8 @@ spi_delete_materializations(SchemaAndName materialization_table, Name time_colum

static void
spi_insert_materializations(SchemaAndName partial_view, SchemaAndName materialization_table,
Name time_column_name, TimeRange materialization_range)
Name time_column_name, TimeRange materialization_range,
const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
@@ -262,15 +279,16 @@ spi_insert_materializations(SchemaAndName partial_view, SchemaAndName materializ

appendStringInfo(command,
"INSERT INTO %s.%s SELECT * FROM %s.%s AS I "
"WHERE I.%s >= %s AND I.%s < %s;",
"WHERE I.%s >= %s AND I.%s < %s %s;",
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*partial_view.schema)),
quote_identifier(NameStr(*partial_view.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end));
quote_literal_cstr(materialization_end),
chunk_condition);

res = SPI_execute_with_args(command->data,
0 /*=nargs*/,
4 changes: 2 additions & 2 deletions tsl/src/continuous_aggs/materialize.h
@@ -40,7 +40,7 @@ void continuous_agg_update_materialization(SchemaAndName partial_view,
SchemaAndName materialization_table,
Name time_column_name,
InternalTimeRange new_materialization_range,
InternalTimeRange invalidation_range,
int64 bucket_width);
InternalTimeRange invalidation_range, int64 bucket_width,
int32 chunk_id);

#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_MATERIALIZE_H */
22 changes: 13 additions & 9 deletions tsl/src/continuous_aggs/refresh.c
@@ -234,7 +234,8 @@ continuous_agg_refresh_init(CaggRefreshState *refresh, const ContinuousAgg *cagg
*/
static void
continuous_agg_refresh_execute(const CaggRefreshState *refresh,
const InternalTimeRange *bucketed_refresh_window)
const InternalTimeRange *bucketed_refresh_window,
const int32 chunk_id)
{
SchemaAndName cagg_hypertable_name = {
.schema = &refresh->cagg_ht->fd.schema_name,
@@ -257,7 +258,8 @@ continuous_agg_refresh_execute(const CaggRefreshState *refresh,
&time_dim->fd.column_name,
*bucketed_refresh_window,
unused_invalidation_range,
refresh->cagg.data.bucket_width);
refresh->cagg.data.bucket_width,
chunk_id);
}

static void
@@ -288,7 +290,7 @@ log_refresh_window(int elevel, const ContinuousAgg *cagg, const InternalTimeRang
static void
continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const InvalidationStore *invalidations)
const InvalidationStore *invalidations, const int32 chunk_id)
{
CaggRefreshState refresh;
TupleTableSlot *slot;
@@ -322,7 +324,7 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
compute_circumscribed_bucketed_refresh_window(&invalidation, cagg->data.bucket_width);

log_refresh_window(DEBUG1, cagg, &bucketed_refresh_window, "invalidation refresh on");
continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window);
continuous_agg_refresh_execute(&refresh, &bucketed_refresh_window, chunk_id);
}

ExecDropSingleTupleTableSlot(slot);
@@ -397,7 +399,8 @@ emit_up_to_date_notice(const ContinuousAgg *cagg)

static bool
process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window, bool verbose)
const InternalTimeRange *refresh_window, bool verbose,
int32 chunk_id)
{
InvalidationStore *invalidations;
Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id);
@@ -423,7 +426,7 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
errhint("Use WITH NO DATA if you do not want to refresh the continuous "
"aggregate on creation.")));
}
continuous_agg_refresh_with_window(cagg, refresh_window, invalidations);
continuous_agg_refresh_with_window(cagg, refresh_window, invalidations, chunk_id);
invalidation_store_free(invalidations);
return true;
}
@@ -524,7 +527,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
StartTransactionCommand();
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id);

if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, verbose))
if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, verbose, INVALID_CHUNK_ID))
emit_up_to_date_notice(cagg);
}

@@ -539,7 +542,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
* region would work.
*/
void
continuous_agg_refresh_all(const Hypertable *ht, int64 start, int64 end)
continuous_agg_refresh_all(const Hypertable *const ht, const int64 start, const int64 end,
const int32 chunk_id)
{
Catalog *catalog = ts_catalog_get();
List *caggs = ts_continuous_aggs_find_by_raw_table_id(ht->fd.id);
@@ -572,6 +576,6 @@ continuous_agg_refresh_all(const Hypertable *ht, int64 start, int64 end)
{
const ContinuousAgg *cagg = lfirst(lc);

process_cagg_invalidations_and_refresh(cagg, &refresh_window, false);
process_cagg_invalidations_and_refresh(cagg, &refresh_window, false, chunk_id);
}
}
3 changes: 2 additions & 1 deletion tsl/src/continuous_aggs/refresh.h
@@ -15,6 +15,7 @@
extern Datum continuous_agg_refresh(PG_FUNCTION_ARGS);
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window, bool verbose);
extern void continuous_agg_refresh_all(const Hypertable *ht, int64 start, int64 end);
extern void continuous_agg_refresh_all(const Hypertable *ht, int64 start, int64 end,
int32 chunk_id);

#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_REFRESH_H */