Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check configuration in alter_job and add_job #2689

Merged
merged 1 commit into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 0 additions & 112 deletions src/bgw/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -921,115 +921,3 @@ ts_bgw_job_insert_relation(Name application_name, Name job_type, Interval *sched
table_close(rel, RowExclusiveLock);
return values[AttrNumberGetAttrOffset(Anum_bgw_job_id)];
}

/* This function only updates the fields modifiable with alter_job. */
static ScanTupleResult
bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
{
BgwJob *updated_job = (BgwJob *) data;
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
HeapTuple new_tuple;

Datum values[Natts_bgw_job] = { 0 };
bool isnull[Natts_bgw_job] = { 0 };
bool repl[Natts_bgw_job] = { 0 };

Datum old_schedule_interval =
slot_getattr(ti->slot, Anum_bgw_job_schedule_interval, &isnull[0]);
Assert(!isnull[0]);

ts_bgw_job_permission_check(updated_job);

/* when we update the schedule interval, modify the next start time as well*/
if (!DatumGetBool(DirectFunctionCall2(interval_eq,
old_schedule_interval,
IntervalPGetDatum(&updated_job->fd.schedule_interval))))
{
BgwJobStat *stat = ts_bgw_job_stat_find(updated_job->fd.id);

if (stat != NULL)
{
TimestampTz next_start = DatumGetTimestampTz(
DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(stat->fd.last_finish),
IntervalPGetDatum(&updated_job->fd.schedule_interval)));
/* allow DT_NOBEGIN for next_start here through allow_unset=true in the case that
* last_finish is DT_NOBEGIN,
* This means the value is counted as unset which is what we want */
ts_bgw_job_stat_update_next_start(updated_job->fd.id, next_start, true);
}
values[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] =
IntervalPGetDatum(&updated_job->fd.schedule_interval);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_schedule_interval)] = true;
}

values[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] =
IntervalPGetDatum(&updated_job->fd.max_runtime);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_runtime)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] =
Int32GetDatum(updated_job->fd.max_retries);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_max_retries)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] =
IntervalPGetDatum(&updated_job->fd.retry_period);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_retry_period)] = true;

values[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] =
BoolGetDatum(updated_job->fd.scheduled);
repl[AttrNumberGetAttrOffset(Anum_bgw_job_scheduled)] = true;

repl[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;
if (updated_job->fd.config)
values[AttrNumberGetAttrOffset(Anum_bgw_job_config)] =
JsonbPGetDatum(updated_job->fd.config);
else
isnull[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;

new_tuple = heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, isnull, repl);

ts_catalog_update(ti->scanrel, new_tuple);

heap_freetuple(new_tuple);
if (should_free)
heap_freetuple(tuple);

return SCAN_DONE;
}

/*
* Overwrite job with specified job_id with the given fields
*
* This function only updates the fields modifiable with alter_job.
*/
void
ts_bgw_job_update_by_id(int32 job_id, BgwJob *job)
{
ScanKeyData scankey[1];
Catalog *catalog = ts_catalog_get();
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
.lockflags = 0 /* don't follow updates, used in PG12 */
};
ScannerCtx scanctx = { .table = catalog_get_table_id(catalog, BGW_JOB),
.index = catalog_get_index(catalog, BGW_JOB, BGW_JOB_PKEY_IDX),
.nkeys = 1,
.scankey = scankey,
.data = job,
.limit = 1,
.tuple_found = bgw_job_tuple_update_by_id,
.lockmode = RowExclusiveLock,
.scandirection = ForwardScanDirection,
.result_mctx = CurrentMemoryContext,
.tuplock = &scantuplock };

ScanKeyInit(&scankey[0],
Anum_bgw_job_pkey_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(job_id));

ts_scanner_scan(&scanctx);
}
2 changes: 0 additions & 2 deletions src/bgw/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ extern TSDLLEXPORT int32 ts_bgw_job_insert_relation(Name application_name, Name
Interval *retry_period, Name proc_schema,
Name proc_name, Name owner, bool scheduled,
int32 hypertable_id, Jsonb *config);
extern TSDLLEXPORT void ts_bgw_job_update_by_id(int32 job_id, BgwJob *updated_job);

extern TSDLLEXPORT void ts_bgw_job_permission_check(BgwJob *job);

extern TSDLLEXPORT void ts_bgw_job_validate_job_owner(Oid owner);
Expand Down
Loading