Skip to content

Commit

Permalink
Modify hypertable catalog update API calls
Browse files Browse the repository at this point in the history
Use the same logic as PR 6773 while updating hypertable catalog tuples.
 PR 6773 addresses chunk catalog updates. We first lock the tuple and
then modify the values and update the locked tuple. Replace
ts_hypertable_update with field specific APIs and use
hypertable_update_catalog_tuple calls consistently.
  • Loading branch information
gayyappan committed Apr 16, 2024
1 parent ecf6bea commit 07bf49a
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 90 deletions.
2 changes: 1 addition & 1 deletion src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -4655,7 +4655,7 @@ add_foreign_table_as_chunk(Oid relid, Hypertable *parent_ht)
parent_ht->fd.status =
ts_set_flags_32(parent_ht->fd.status,
HYPERTABLE_STATUS_OSM | HYPERTABLE_STATUS_OSM_CHUNK_NONCONTIGUOUS);
ts_hypertable_update(parent_ht);
ts_hypertable_update_status_osm(parent_ht);
}

void
Expand Down
6 changes: 1 addition & 5 deletions src/chunk_adaptive.c
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,6 @@ ts_chunk_adaptive_set(PG_FUNCTION_ARGS)
Cache *hcache;
HeapTuple tuple;
TupleDesc tupdesc;
CatalogSecurityContext sec_ctx;
Datum values[2];
bool nulls[2] = { false, false };

Expand Down Expand Up @@ -793,10 +792,7 @@ ts_chunk_adaptive_set(PG_FUNCTION_ARGS)

/* Update the hypertable entry */
ht->fd.chunk_target_size = info.target_size_bytes;
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_hypertable_update(ht);
ts_catalog_restore_user(&sec_ctx);

ts_hypertable_update_chunk_sizing(ht);
ts_cache_release(hcache);

tuple = heap_form_tuple(tupdesc, values, nulls);
Expand Down
245 changes: 162 additions & 83 deletions src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,59 +403,95 @@ hypertable_scan_limit_internal(ScanKeyData *scankey, int num_scankeys, int index
return ts_scanner_scan(&scanctx);
}

static ScanTupleResult
hypertable_tuple_update(TupleInfo *ti, void *data)
/* update the tuple at this tid. The assumption is that we already hold a
* tuple exclusive lock and no other transaction can modify this tuple
* The sequence of operations for any update is:
* lock the tuple using lock_hypertable_tuple.
* then update the required fields
* call hypertable_update_catalog_tuple to complete the update.
* This ensures correct tuple locking and tuple updates in the presence of
* concurrent transactions. Failure to follow this results in catalog corruption
*/
static void
hypertable_update_catalog_tuple(ItemPointer tid, FormData_hypertable *update)
{
Hypertable *ht = data;
HeapTuple new_tuple;
CatalogSecurityContext sec_ctx;
Catalog *catalog = ts_catalog_get();
Oid table = catalog_get_table_id(catalog, HYPERTABLE);
Relation hypertable_rel = relation_open(table, RowExclusiveLock);

if (OidIsValid(ht->chunk_sizing_func))
{
const Dimension *dim = ts_hyperspace_get_dimension(ht->space, DIMENSION_TYPE_OPEN, 0);
ChunkSizingInfo info = {
.table_relid = ht->main_table_relid,
.colname = dim == NULL ? NULL : NameStr(dim->fd.column_name),
.func = ht->chunk_sizing_func,
};

ts_chunk_adaptive_sizing_info_validate(&info);

namestrcpy(&ht->fd.chunk_sizing_func_schema, NameStr(info.func_schema));
namestrcpy(&ht->fd.chunk_sizing_func_name, NameStr(info.func_name));
}
else
elog(ERROR, "chunk sizing function cannot be NULL");

new_tuple = hypertable_formdata_make_tuple(&ht->fd, ts_scanner_get_tupledesc(ti));
new_tuple = hypertable_formdata_make_tuple(update, hypertable_rel->rd_att);

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
ts_catalog_update_tid(hypertable_rel, tid, new_tuple);
ts_catalog_restore_user(&sec_ctx);
heap_freetuple(new_tuple);
return SCAN_DONE;
relation_close(hypertable_rel, NoLock);
}

int
ts_hypertable_update(Hypertable *ht)
static bool
lock_hypertable_tuple(int32 htid, ItemPointer tid, FormData_hypertable *form)
{
ScanKeyData scankey[1];
bool success = false;
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
};
ScanIterator iterator = ts_scan_iterator_create(HYPERTABLE, RowShareLock, CurrentMemoryContext);
iterator.ctx.index = catalog_get_index(ts_catalog_get(), HYPERTABLE, HYPERTABLE_ID_INDEX);
iterator.ctx.tuplock = &scantuplock;
/* Keeping the lock since we presumably want to update the tuple */
iterator.ctx.flags = SCANNER_F_KEEPLOCK;

ScanKeyInit(&scankey[0],
Anum_hypertable_pkey_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(ht->fd.id));
/* see table_tuple_lock for details about flags that are set in TupleExclusive mode */
scantuplock.lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
if (!IsolationUsesXactSnapshot())
{
/* in read committed mode, we follow all updates to this tuple */
scantuplock.lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
}

return hypertable_scan_limit_internal(scankey,
1,
HYPERTABLE_ID_INDEX,
hypertable_tuple_update,
ht,
1,
RowExclusiveLock,
CurrentMemoryContext,
NULL);
ts_scan_iterator_scan_key_init(&iterator,
Anum_hypertable_pkey_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(htid));

ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
if (ti->lockresult != TM_Ok)
{
if (IsolationUsesXactSnapshot())
{
/* For Repeatable Read and Serializable isolation level report error
* if we cannot lock the tuple
*/
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("unable to lock hypertable catalog tuple, lock result is %d for "
"hypertable "
"ID (%d)",
ti->lockresult,
htid)));
}
}
ts_hypertable_formdata_fill(form, ti);
ItemPointer result_tid = ts_scanner_get_tuple_tid(ti);
tid->ip_blkid = result_tid->ip_blkid;
tid->ip_posid = result_tid->ip_posid;
success = true;
break;
}
ts_scan_iterator_close(&iterator);
return success;
}

int
Expand Down Expand Up @@ -754,25 +790,44 @@ ts_hypertable_reset_associated_schema_name(const char *associated_schema)
int
ts_hypertable_set_name(Hypertable *ht, const char *newname)
{
namestrcpy(&ht->fd.table_name, newname);
FormData_hypertable form;
ItemPointerData tid;
/* lock the tuple entry in the catalog table */
bool found = lock_hypertable_tuple(ht->fd.id, &tid, &form);
Ensure(found, "hypertable id %d not found", ht->fd.id);

return ts_hypertable_update(ht);
namestrcpy(&form.table_name, newname);
hypertable_update_catalog_tuple(&tid, &form);
return true;
}

int
ts_hypertable_set_schema(Hypertable *ht, const char *newname)
{
namestrcpy(&ht->fd.schema_name, newname);
FormData_hypertable form;
ItemPointerData tid;
/* lock the tuple entry in the catalog table */
bool found = lock_hypertable_tuple(ht->fd.id, &tid, &form);
Ensure(found, "hypertable id %d not found", ht->fd.id);

return ts_hypertable_update(ht);
namestrcpy(&form.schema_name, newname);
hypertable_update_catalog_tuple(&tid, &form);
return true;
}

int
ts_hypertable_set_num_dimensions(Hypertable *ht, int16 num_dimensions)
{
FormData_hypertable form;
ItemPointerData tid;
/* lock the tuple entry in the catalog table */
bool found = lock_hypertable_tuple(ht->fd.id, &tid, &form);
Ensure(found, "hypertable id %d not found", ht->fd.id);

Assert(num_dimensions > 0);
ht->fd.num_dimensions = num_dimensions;
return ts_hypertable_update(ht);
form.num_dimensions = num_dimensions;
hypertable_update_catalog_tuple(&tid, &form);
return true;
}

#define DEFAULT_ASSOCIATED_TABLE_PREFIX_FORMAT "_hyper_%d"
Expand Down Expand Up @@ -2217,13 +2272,17 @@ ts_hypertable_set_integer_now_func(PG_FUNCTION_ARGS)
bool
ts_hypertable_set_compressed(Hypertable *ht, int32 compressed_hypertable_id)
{
FormData_hypertable form;
ItemPointerData tid;
/* lock the tuple entry in the catalog table */
bool found = lock_hypertable_tuple(ht->fd.id, &tid, &form);
Ensure(found, "hypertable id %d not found", ht->fd.id);

Assert(!TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(ht));
ht->fd.compression_state = HypertableCompressionEnabled;
/* distr. hypertables do not have a internal compression table
* on the access node
*/
ht->fd.compressed_hypertable_id = compressed_hypertable_id;
return ts_hypertable_update(ht) > 0;
form.compression_state = HypertableCompressionEnabled;
form.compressed_hypertable_id = compressed_hypertable_id;
hypertable_update_catalog_tuple(&tid, &form);
return true;
}

/* set compression_state as disabled and remove any
Expand All @@ -2232,10 +2291,17 @@ ts_hypertable_set_compressed(Hypertable *ht, int32 compressed_hypertable_id)
bool
ts_hypertable_unset_compressed(Hypertable *ht)
{
FormData_hypertable form;
ItemPointerData tid;
/* lock the tuple entry in the catalog table */
bool found = lock_hypertable_tuple(ht->fd.id, &tid, &form);
Ensure(found, "hypertable id %d not found", ht->fd.id);

Assert(!TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(ht));
ht->fd.compression_state = HypertableCompressionOff;
ht->fd.compressed_hypertable_id = INVALID_HYPERTABLE_ID;
return ts_hypertable_update(ht) > 0;
form.compression_state = HypertableCompressionOff;
form.compressed_hypertable_id = INVALID_HYPERTABLE_ID;
hypertable_update_catalog_tuple(&tid, &form);
return true;
}

bool
Expand Down Expand Up @@ -2400,38 +2466,51 @@ ts_hypertable_has_compression_table(const Hypertable *ht)
* 1. RowShareLock to SELECT for UPDATE
* 2. UPDATE status using RowExclusiveLock
*/
static bool
hypertable_update_status_osm(Hypertable *ht)
bool
ts_hypertable_update_status_osm(Hypertable *ht)
{
bool success = false;
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
};
ScanIterator iterator = ts_scan_iterator_create(HYPERTABLE, RowShareLock, CurrentMemoryContext);
iterator.ctx.index = catalog_get_index(ts_catalog_get(), HYPERTABLE, HYPERTABLE_ID_INDEX);
iterator.ctx.tuplock = &scantuplock;
ts_scan_iterator_scan_key_init(&iterator,
Anum_hypertable_pkey_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(ht->fd.id));
FormData_hypertable form;
ItemPointerData tid;
/* lock the tuple entry in the catalog table */
bool found = lock_hypertable_tuple(ht->fd.id, &tid, &form);
Ensure(found, "hypertable id %d not found", ht->fd.id);

ts_scanner_foreach(&iterator)
if (form.status != ht->fd.status)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
int status;
bool status_isnull;
form.status = ht->fd.status;
hypertable_update_catalog_tuple(&tid, &form);
}
return true;
}

status = DatumGetInt32(slot_getattr(ti->slot, Anum_hypertable_status, &status_isnull));
Assert(!status_isnull);
if (status != ht->fd.status)
{
success = ts_hypertable_update(ht); // get RowExclusiveLock and update here
}
bool
ts_hypertable_update_chunk_sizing(Hypertable *ht)
{
FormData_hypertable form;
ItemPointerData tid;
/* lock the tuple entry in the catalog table */
bool found = lock_hypertable_tuple(ht->fd.id, &tid, &form);
Ensure(found, "hypertable id %d not found", ht->fd.id);

if (OidIsValid(ht->chunk_sizing_func))
{
const Dimension *dim = ts_hyperspace_get_dimension(ht->space, DIMENSION_TYPE_OPEN, 0);
ChunkSizingInfo info = {
.table_relid = ht->main_table_relid,
.colname = dim == NULL ? NULL : NameStr(dim->fd.column_name),
.func = ht->chunk_sizing_func,
};

ts_chunk_adaptive_sizing_info_validate(&info);

namestrcpy(&form.chunk_sizing_func_schema, NameStr(info.func_schema));
namestrcpy(&form.chunk_sizing_func_name, NameStr(info.func_name));
}
ts_scan_iterator_close(&iterator);
return success;
else
elog(ERROR, "chunk sizing function cannot be NULL");
form.chunk_target_size = ht->fd.chunk_target_size;
hypertable_update_catalog_tuple(&tid, &form);
return true;
}

static DimensionSlice *
Expand Down Expand Up @@ -2587,7 +2666,7 @@ ts_hypertable_osm_range_update(PG_FUNCTION_ARGS)
else
ht->fd.status = ts_clear_flags_32(ht->fd.status, HYPERTABLE_STATUS_OSM_CHUNK_NONCONTIGUOUS);

hypertable_update_status_osm(ht);
ts_hypertable_update_status_osm(ht);
ts_cache_release(hcache);

slice->fd.range_start = range_start_internal;
Expand Down
3 changes: 2 additions & 1 deletion src/hypertable.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ extern Hypertable *ts_resolve_hypertable_from_table_or_cagg(Cache *hcache, Oid r
extern int ts_hypertable_scan_with_memory_context(const char *schema, const char *table,
tuple_found_func tuple_found, void *data,
LOCKMODE lockmode, MemoryContext mctx);
extern TSDLLEXPORT int ts_hypertable_update(Hypertable *ht);
extern bool ts_hypertable_update_status_osm(Hypertable *ht);
extern bool ts_hypertable_update_chunk_sizing(Hypertable *ht);
extern int ts_hypertable_set_name(Hypertable *ht, const char *newname);
extern int ts_hypertable_set_schema(Hypertable *ht, const char *newname);
extern int ts_hypertable_set_num_dimensions(Hypertable *ht, int16 num_dimensions);
Expand Down

0 comments on commit 07bf49a

Please sign in to comment.