Fixes according to review comments.
Ensure that we create an active snapshot in `job_execute` regardless
of whether a transaction is already open, and remove the corresponding
snapshot-handling code from the execution functions for the individual
policies.
mkindahl committed Dec 1, 2020
1 parent 30dfec0 commit 8243056
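
For context, the pattern this commit converges on looks roughly like the sketch below. This is an illustration distilled from the diff, not the actual TimescaleDB source: run_job() is a hypothetical stand-in for the procedure dispatch that job_execute performs, and BgwJob is only forward-declared here.

    #include "postgres.h"
    #include "access/xact.h"
    #include "utils/snapmgr.h"

    /* Sketch only: BgwJob is TimescaleDB's job descriptor and run_job()
     * is a hypothetical stand-in for the real procedure dispatch. */
    typedef struct BgwJob BgwJob;
    static void run_job(BgwJob *job);

    static void
    execute_with_snapshot(BgwJob *job)
    {
        bool transaction_started = false;
        bool pushed_snapshot = false;

        /* Start a transaction only if the caller has not already done so. */
        if (!IsTransactionOrTransactionBlock())
        {
            transaction_started = true;
            StartTransactionCommand();
        }

        /* Executing SQL functions requires an active snapshot, regardless
         * of who started the transaction. */
        if (!ActiveSnapshotSet())
        {
            pushed_snapshot = true;
            PushActiveSnapshot(GetTransactionSnapshot());
        }

        run_job(job);

        /* Pop only the snapshot we pushed; a job that does its own
         * transaction handling may have dropped it already. */
        if (pushed_snapshot && ActiveSnapshotSet())
            PopActiveSnapshot();

        /* Commit only the transaction we started. */
        if (transaction_started)
            CommitTransactionCommand();
    }
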
Showing 5 changed files with 33 additions and 37 deletions.
2 changes: 1 addition & 1 deletion scripts/clang_format_wrapper.sh
@@ -67,7 +67,7 @@ echo "formatting"
 
 cd ${TEMP_DIR}
 
-clang-format ${CLANG_FORMAT_FLAGS} ${FILE_NAMES}
+clang-format-8 ${CLANG_FORMAT_FLAGS} ${FILE_NAMES}
 
 cd ${CURR_DIR}
 
45 changes: 14 additions & 31 deletions tsl/src/bgw_policy/job.c
@@ -256,24 +256,14 @@ get_open_dimension_for_hypertable(Hypertable *ht)
 bool
 policy_retention_execute(int32 job_id, Jsonb *config)
 {
-    bool started = false;
     PolicyRetentionData policy_data;
 
-    if (!ActiveSnapshotSet())
-    {
-        started = true;
-        PushActiveSnapshot(GetTransactionSnapshot());
-    }
-
     policy_retention_read_and_validate_config(config, &policy_data);
 
     chunk_invoke_drop_chunks(policy_data.object_relid,
                              policy_data.boundary,
                              policy_data.boundary_type);
 
-    if (started)
-        PopActiveSnapshot();
-
     return true;
 }

@@ -324,8 +314,6 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
 {
     PolicyContinuousAggData policy_data;
 
-    if (!ActiveSnapshotSet())
-        PushActiveSnapshot(GetTransactionSnapshot());
     policy_refresh_cagg_read_and_validate_config(config, &policy_data);
     elog(LOG,
          "refresh continuous aggregate range %s , %s",
@@ -334,6 +322,7 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
          ts_internal_to_time_string(policy_data.refresh_window.end,
                                     policy_data.refresh_window.type));
     continuous_agg_refresh_internal(policy_data.cagg, &policy_data.refresh_window, false);
+
     return true;
 }

@@ -374,17 +363,10 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
 bool
 policy_compression_execute(int32 job_id, Jsonb *config)
 {
-    bool started = false;
     int32 chunkid;
     Dimension *dim;
     PolicyCompressionData policy_data;
 
-    if (!ActiveSnapshotSet())
-    {
-        started = true;
-        PushActiveSnapshot(GetTransactionSnapshot());
-    }
-
     policy_compression_read_and_validate_config(config, &policy_data);
     dim = hyperspace_get_open_dimension(policy_data.hypertable->space, 0);
     chunkid = get_chunk_to_compress(dim, config);
@@ -412,9 +394,6 @@ policy_compression_execute(int32 job_id, Jsonb *config)
 
     ts_cache_release(policy_data.hcache);
 
-    if (started)
-        PopActiveSnapshot();
-
     elog(DEBUG1, "job %d completed compressing chunk", job_id);
     return true;
 }
@@ -468,7 +447,8 @@ bool
 job_execute(BgwJob *job)
 {
     Const *arg1, *arg2;
-    bool started = false;
+    bool transaction_started = false;
+    bool pushed_snapshot = false;
     char prokind;
     Oid proc;
     Oid proc_args[] = { INT4OID, JSONBOID };
@@ -478,9 +458,14 @@ job_execute(BgwJob *job)
 
     if (!IsTransactionOrTransactionBlock())
     {
-        started = true;
+        transaction_started = true;
         StartTransactionCommand();
-        /* executing sql functions requires snapshot */
+    }
+
+    /* executing sql functions requires snapshot. */
+    if (!ActiveSnapshotSet())
+    {
+        pushed_snapshot = true;
         PushActiveSnapshot(GetTransactionSnapshot());
     }
 
@@ -525,13 +510,11 @@ job_execute(BgwJob *job)
             break;
     }
 
-    if (started)
-    {
-        /* if job does its own transaction handling it might not have set a snapshot */
-        if (ActiveSnapshotSet())
-            PopActiveSnapshot();
+    if (pushed_snapshot && ActiveSnapshotSet())
+        PopActiveSnapshot();
 
+    if (transaction_started)
         CommitTransactionCommand();
-    }
 
     return true;
 }
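
The net effect for the policy functions above is a contract: by the time policy_retention_execute, policy_refresh_cagg_execute, or policy_compression_execute runs, job_execute has already pushed a snapshot. A minimal sketch of that assumption (policy_example_execute is hypothetical and not part of this commit) would be an assertion at the top of such a function:

    /* Sketch only, not part of this commit: the invariant the policy
     * execution functions now rely on. */
    bool
    policy_example_execute(int32 job_id, Jsonb *config)
    {
        /* job_execute() guarantees an active snapshot before dispatching. */
        Assert(ActiveSnapshotSet());

        /* ... read and validate config, then do the actual work ... */
        return true;
    }
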
1 change: 0 additions & 1 deletion tsl/src/bgw_policy/job_api.c
@@ -273,7 +273,6 @@ ts_bgw_job_update_by_id(int32 job_id, BgwJob *job)
     ScanTupLock scantuplock = {
         .waitpolicy = LockWaitBlock,
         .lockmode = LockTupleExclusive,
-        .lockflags = 0 /* don't follow updates, used in PG12 */
     };
     ScannerCtx scanctx = { .table = catalog_get_table_id(catalog, BGW_JOB),
                            .index = catalog_get_index(catalog, BGW_JOB, BGW_JOB_PKEY_IDX),
11 changes: 9 additions & 2 deletions tsl/test/expected/continuous_aggs_errors.out
@@ -508,23 +508,30 @@ ERROR: cannot refresh continuous aggregate on chunk from different hypertable
 DETAIL: The the continuous aggregate is defined on hypertable "conditions", while chunk is from hypertable "measurements". The continuous aggregate can be refreshed only on a chunk from the same hypertable.
 \set ON_ERROR_STOP 1
 \set VERBOSITY terse
--- Add a continuous aggregate on the measurements table and a policy.
+-- Add a continuous aggregate on the measurements table and a policy
+-- to be able to test error cases for the add_job function.
 CREATE MATERIALIZED VIEW measurements_summary WITH (timescaledb.continuous) AS
 SELECT time_bucket('1 day', time), COUNT(time)
 FROM measurements
 GROUP BY 1 WITH NO DATA;
--- First test that add_job checks the config. These should all generate errors.
+-- First test that add_job checks the config. It is currently possible
+-- to add non-custom jobs using the add_job function so we need to
+-- test that the function actually checks the config parameters. These
+-- should all generate errors, for different reasons...
 \set ON_ERROR_STOP 0
+-- ... this one because it is missing a field.
 SELECT add_job(
 '_timescaledb_internal.policy_refresh_continuous_aggregate'::regproc,
 '1 hour'::interval,
 config => '{"end_offset": null, "start_offset": null}');
 ERROR: could not find "mat_hypertable_id" in config for job
+-- ... this one because it has a bad value for start_offset
 SELECT add_job(
 '_timescaledb_internal.policy_refresh_continuous_aggregate'::regproc,
 '1 hour'::interval,
 config => '{"end_offset": null, "start_offset": "1 fortnight", "mat_hypertable_id": 11}');
 ERROR: invalid input syntax for type interval: "1 fortnight"
+-- ... this one because it has a bad value for end_offset
 SELECT add_job(
 '_timescaledb_internal.policy_refresh_continuous_aggregate'::regproc,
 '1 hour'::interval,
11 changes: 9 additions & 2 deletions tsl/test/sql/continuous_aggs_errors.sql
@@ -480,22 +480,29 @@ SELECT _timescaledb_internal.refresh_continuous_aggregate(
 \set ON_ERROR_STOP 1
 \set VERBOSITY terse
 
--- Add a continuous aggregate on the measurements table and a policy.
+-- Add a continuous aggregate on the measurements table and a policy
+-- to be able to test error cases for the add_job function.
 CREATE MATERIALIZED VIEW measurements_summary WITH (timescaledb.continuous) AS
 SELECT time_bucket('1 day', time), COUNT(time)
 FROM measurements
 GROUP BY 1 WITH NO DATA;
 
--- First test that add_job checks the config. These should all generate errors.
+-- First test that add_job checks the config. It is currently possible
+-- to add non-custom jobs using the add_job function so we need to
+-- test that the function actually checks the config parameters. These
+-- should all generate errors, for different reasons...
 \set ON_ERROR_STOP 0
+-- ... this one because it is missing a field.
 SELECT add_job(
 '_timescaledb_internal.policy_refresh_continuous_aggregate'::regproc,
 '1 hour'::interval,
 config => '{"end_offset": null, "start_offset": null}');
+-- ... this one because it has a bad value for start_offset
 SELECT add_job(
 '_timescaledb_internal.policy_refresh_continuous_aggregate'::regproc,
 '1 hour'::interval,
 config => '{"end_offset": null, "start_offset": "1 fortnight", "mat_hypertable_id": 11}');
+-- ... this one because it has a bad value for end_offset
 SELECT add_job(
 '_timescaledb_internal.policy_refresh_continuous_aggregate'::regproc,
 '1 hour'::interval,
