Skip to content

Commit

Permalink
Simple prototype for hash code parallel copy, minimal gains past 2 wo…
Browse files Browse the repository at this point in the history
…rkers currently
  • Loading branch information
nathanhjli committed Mar 23, 2022
1 parent b1c4ddc commit 5b1788c
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 53 deletions.
272 changes: 219 additions & 53 deletions src/postgres/src/backend/commands/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "catalog/yb_catalog_version.h"
#include "catalog/yb_type.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/trigger.h"
Expand Down Expand Up @@ -301,6 +302,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
int yb_default_copy_from_rows_per_transaction = DEFAULT_BATCH_ROWS_PER_TRANSACTION;

/* non-export function prototypes */
static uint16 GetTupleHashCode(ListCell *cur, List *hashColumns, TupleDesc tupDesc,
Datum *values, bool *nulls);
static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
RawStmt *raw_query, Oid queryRelId, List *attnamelist,
List *options);
Expand Down Expand Up @@ -354,6 +357,74 @@ static void CopySendInt16(CopyState cstate, int16 val);
static bool CopyGetInt16(CopyState cstate, int16 *val);


/*
 * GetTupleHashCode
 *		Compute the 16-bit DocDB hash code for a tuple from the values of
 *		its HASH key columns.
 *
 * cur:         scratch ListCell used as the foreach iterator; its incoming
 *              value is ignored (foreach reassigns it).
 * hashColumns: list of 0-based attribute indexes forming the HASH key.
 * tupDesc:     tuple descriptor supplying each column's type OID.
 * values:      per-attribute datum values for the tuple.
 * nulls:       per-attribute null flags for the tuple.
 *
 * Raises ERROR if a column's type is UNKNOWNOID or is not supported as a
 * HASH key column by DocDB.
 */
static uint16
GetTupleHashCode(ListCell *cur, List *hashColumns, TupleDesc tupDesc, Datum *values, bool *nulls)
{
	char	   *arg_buf;
	char	   *arg_buf_pos;
	size_t		size = 0;
	size_t		total_bytes = 0;

	/* First pass: compute the total serialized size of the hash-key datums. */
	foreach(cur, hashColumns)
	{
		int			index = lfirst_int(cur);
		Oid			argtype = TupleDescAttr(tupDesc, index)->atttypid;

		if (unlikely(argtype == UNKNOWNOID))
		{
			ereport(ERROR,
					(errcode(ERRCODE_INDETERMINATE_DATATYPE),
					 errmsg("undefined datatype given to yb_hash_code")));
		}

		size_t		typesize;
		const YBCPgTypeEntity *typeentity =
			YbDataTypeFromOidMod(InvalidAttrNumber, argtype);
		YBCStatus	status = YBCGetDocDBKeySize(values[index], typeentity,
												nulls[index], &typesize);

		if (unlikely(!YBCStatusIsOK(status)))
		{
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("Unsupported datatype given to yb_hash_code"),
					 errdetail("Only types supported by HASH key columns are allowed"),
					 errhint("Use explicit casts to ensure input types are as desired")));
		}
		size += typesize;
	}

	/*
	 * Allocate from the current memory context rather than with alloca():
	 * the buffer size is data-dependent and unbounded (sum of all hash-key
	 * datum encodings), so stack allocation risks overflow for wide rows.
	 */
	arg_buf = palloc(size);
	arg_buf_pos = arg_buf;

	/* Second pass: serialize each hash-key datum into the buffer. */
	foreach(cur, hashColumns)
	{
		int			index = lfirst_int(cur);
		Oid			argtype = TupleDescAttr(tupDesc, index)->atttypid;
		const YBCPgTypeEntity *typeentity =
			YbDataTypeFromOidMod(InvalidAttrNumber, argtype);
		size_t		written;
		YBCStatus	status = YBCAppendDatumToKey(values[index], typeentity,
												 nulls[index], arg_buf_pos, &written);

		if (unlikely(!YBCStatusIsOK(status)))
		{
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("Unsupported datatype given to yb_hash_code"),
					 errdetail("Only types supported by HASH key columns are allowed"),
					 errhint("Use explicit casts to ensure input types are as desired")));
		}
		arg_buf_pos += written;
		total_bytes += written;
	}

	/* Both passes must agree on the serialized size. */
	Assert(total_bytes == size);

	uint16		hash_code = YBCCompoundHash(arg_buf, total_bytes);

	pfree(arg_buf);

	return hash_code;
}

/*
* Send copy start/stop messages for frontend copies. These have changed
* in past protocol redesigns.
Expand Down Expand Up @@ -1223,41 +1294,93 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
bool *nulls = (bool *) palloc(cstate->rel->rd_att->natts * sizeof(bool));
bool has_more_tuples = true;

/*
* Get columns associated with hash.
*/
Oid dboid = YBCGetDatabaseOid(cstate->rel);
YBCPgTableDesc ybc_table_desc = NULL;
TupleDesc tupDesc = RelationGetDescr(cstate->rel);

HandleYBStatus(YBCPgGetTableDesc(dboid, YbGetStorageRelid(cstate->rel), &ybc_table_desc));

List *attnums = cstate->attnumlist;
ListCell *cur;
List *hashColumns = NULL;

foreach(cur, attnums)
{
int attnum = lfirst_int(cur);
YBCPgColumnInfo column_info = {0};
HandleYBTableDescStatus(YBCPgGetColumnInfo(ybc_table_desc, attnum, &column_info),
ybc_table_desc);
if (column_info.is_hash)
{
hashColumns = lappend_int(hashColumns, attnum-1);
}
}

/*
* Estimating the maximum number of tuples we can possibly store.
* Actual number of tuples will be smaller since we don't consider the header.
*/
int max_num_tuples = tuplespace_size / HEAPTUPLESIZE;
HeapTuple tuple;
HeapTuple tuples[max_num_tuples];
HeapTuple tuples[nworkers_ready][max_num_tuples];
HeapTuple tuple_address;
int num_tuples = 0;
int hash_range = MAX_HASH_CODE / nworkers_ready;

/*
* Currently, we will just iterate through workers.
* When hash code partitioning is added, we will set cur_worker_num based on the
* calculated hash code for the tuple.
*/
Size tuples_size = 0;
bool has_written[cstate->num_workers];
for (int i = 0; i < cstate->num_workers; i++) {
Size tuples_size[nworkers_ready];
bool has_written[nworkers_ready];
int tuples_indexes[nworkers_ready];
for (int i = 0; i < nworkers_ready; i++)
{
has_written[i] = false;
tuples_indexes[i] = 0;
tuples_size[i] = 0;
}
int cur_worker_num = first_ready_worker;

/* Need to consider if some workers fail to launch. */
shm_mq_handle *ready_inputs[nworkers_ready];
shm_mq_handle *ready_outputs[nworkers_ready];
HeapTuple ready_tuple_spaces[nworkers_ready];
int j = 0;
for (int i = 0; i < cstate->num_workers; i++) {
if (worker_ready[i]) {
ready_inputs[j] = inputs[i];
ready_outputs[j] = outputs[i];
ready_tuple_spaces[j] = tuple_spaces[i];
j++;
}
}
int cur_worker_num = 0;

while (has_more_tuples)
{
CHECK_FOR_INTERRUPTS();

has_more_tuples = NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid);
if (has_more_tuples)
{
/* Calculate the hash code for the tuple. */
uint16 hash_code = GetTupleHashCode(cur, hashColumns, tupDesc, values, nulls);

/* Determine which worker we should be writing to, given the hash code. */
cur_worker_num = (hash_code / hash_range) < nworkers_ready
? hash_code / hash_range
: nworkers_ready - 1;

tuple = heap_form_tuple(cstate->rel->rd_att, values, nulls);
if (loaded_oid != InvalidOid)
HeapTupleSetOid(tuple, loaded_oid);
tuple->t_tableOid = RelationGetRelid(cstate->rel);
}

if (!has_more_tuples || tuples_size + tuple->t_len + HEAPTUPLESIZE > tuplespace_size)
if (tuples_size[cur_worker_num] + tuple->t_len + HEAPTUPLESIZE > tuplespace_size)
{
/*
* If we've written to this worker before, we need to check the response before
Expand All @@ -1268,7 +1391,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
shm_mq_result res;
Size nbytes;
void *data;
res = shm_mq_receive(inputs[cur_worker_num], &nbytes, &data, false);
res = shm_mq_receive(ready_inputs[cur_worker_num], &nbytes, &data, false);
if (res == SHM_MQ_SUCCESS)
{
total_processed += *(int *)data;
Expand All @@ -1281,20 +1404,21 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
}

/* If no more space, send the current batch of tuples to the worker. */
tuple_address = tuple_spaces[cur_worker_num];
int num_tuples = tuples_indexes[cur_worker_num];
tuple_address = ready_tuple_spaces[cur_worker_num];
for (int tuple_num = 0; tuple_num < num_tuples; tuple_num++)
{
heap_copytuple_into_shm(tuples[tuple_num], tuple_address);
tuple_address = (HeapTuple) ((char *) tuple_address + HEAPTUPLESIZE + tuples[tuple_num]->t_len);
heap_freetuple(tuples[tuple_num]);
heap_copytuple_into_shm(tuples[cur_worker_num][tuple_num], tuple_address);
tuple_address = (HeapTuple) ((char *) tuple_address + HEAPTUPLESIZE + tuples[cur_worker_num][tuple_num]->t_len);
heap_freetuple(tuples[cur_worker_num][tuple_num]);
}

/* Signal to background worker that tuples are written. */
/* Main process will send number of tuples sent through the queue. */
/* Background worker will return the number of tuples processed through the quuee. */
/* Each pcxt will have one input and one output queue */
shm_mq_result res;
res = shm_mq_send(outputs[cur_worker_num], sizeof(&num_tuples), &num_tuples, false);
res = shm_mq_send(ready_outputs[cur_worker_num], sizeof(&num_tuples), &num_tuples, false);
if (res != SHM_MQ_SUCCESS)
{
ereport(LOG, (errmsg("Send was not successful to worker \"%d\".", cur_worker_num)));
Expand All @@ -1304,56 +1428,98 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
has_written[cur_worker_num] = true;

/* Reset for next batch. */
num_tuples = 0;
tuples_size = 0;

/* Worker numbers will go from 0 to cstate->num_workers - 1. */
/* Handle case where some workers may have failed to launch. */
int next_worker_num = cur_worker_num;
do {
next_worker_num = next_worker_num + 1 < cstate->num_workers
? next_worker_num + 1
: 0;
}
while (!worker_ready[next_worker_num]);
cur_worker_num = next_worker_num;

ereport(LOG, (errmsg("Total processed is \"%lu\"", total_processed)));
tuples_indexes[cur_worker_num] = 0;
tuples_size[cur_worker_num] = 0;
}

if (!has_more_tuples)
if (!has_more_tuples) {
/* No more tuples, we need to send all remaining stored tuples. */
for (int worker_num = 0; worker_num < nworkers_ready; worker_num++) {
if (tuples_indexes[worker_num] > 0) {
/*
* If we've written to this worker before, we need to check the response before
* sending the next batch.
*/
if (has_written[worker_num])
{
shm_mq_result res;
Size nbytes;
void *data;
res = shm_mq_receive(ready_inputs[worker_num], &nbytes, &data, false);
if (res == SHM_MQ_SUCCESS)
{
total_processed += *(int *)data;
ereport(LOG, (errmsg("Recieve successful, worker \"%d\" processed \"%d\" tuples", worker_num, *(int *)data)));
}
else
{
ereport(ERROR, (errmsg("Main worker could not receive response from background worker \"%d\".", worker_num)));
}
}

/* If no more space, send the current batch of tuples to the worker. */
int num_tuples = tuples_indexes[worker_num];
tuple_address = ready_tuple_spaces[worker_num];
for (int tuple_num = 0; tuple_num < num_tuples; tuple_num++)
{
heap_copytuple_into_shm(tuples[worker_num][tuple_num], tuple_address);
tuple_address = (HeapTuple) ((char *) tuple_address + HEAPTUPLESIZE + tuples[worker_num][tuple_num]->t_len);
heap_freetuple(tuples[worker_num][tuple_num]);
}

/* Signal to background worker that tuples are written. */
/* Main process will send number of tuples sent through the queue. */
/* Background worker will return the number of tuples processed through the quuee. */
/* Each pcxt will have one input and one output queue */
shm_mq_result res;
res = shm_mq_send(ready_outputs[worker_num], sizeof(&num_tuples), &num_tuples, false);
if (res != SHM_MQ_SUCCESS)
{
ereport(LOG, (errmsg("Send was not successful to worker \"%d\".", worker_num)));
}
ereport(LOG, (errmsg("Send successful, main sent \"%d\" tuple to worker \"%d\".", num_tuples, worker_num)));
}
}
break;
}

tuples[num_tuples++] = tuple;
tuples_size += tuple->t_len + HEAPTUPLESIZE;
tuples[cur_worker_num][tuples_indexes[cur_worker_num]++] = tuple;
tuples_size[cur_worker_num] += tuple->t_len + HEAPTUPLESIZE;
}

/* One final receive and send to every worker that there are no more tuples so they stop waiting. */
for (int worker_num = 0; worker_num < cstate->num_workers; worker_num++) {
if (worker_ready[worker_num]) {
shm_mq_result res;
Size nbytes;
void *data;
if (has_written[worker_num]) {
res = shm_mq_receive(inputs[worker_num], &nbytes, &data, false);
if (res == SHM_MQ_SUCCESS)
{
total_processed += *(int *)data;
}
else
{
ereport(LOG, (errmsg("Main worker receiving could not receive response from background worker.")));
}
ereport(LOG, (errmsg("Recieve successful, worker \"%d\" processed \"%d\" tuples", worker_num, *(int *)data)));
for (int worker_num = 0; worker_num < nworkers_ready; worker_num++) {
shm_mq_result res;
Size nbytes;
void *data;
if (has_written[worker_num])
{
res = shm_mq_receive(inputs[worker_num], &nbytes, &data, false);
if (res == SHM_MQ_SUCCESS)
{
total_processed += *(int *)data;
}

num_tuples = 0;
res = shm_mq_send(outputs[worker_num], sizeof(&num_tuples), &num_tuples, false);
if (res != SHM_MQ_SUCCESS)
else
{
ereport(LOG, (errmsg("Send was not successful")));
ereport(LOG, (errmsg("Main worker receiving could not receive response from background worker.")));
}
ereport(LOG, (errmsg("Send successful, main sent \"%d\" tuples", num_tuples)));
ereport(LOG, (errmsg("Recieve successful, worker \"%d\" processed \"%d\" tuples", worker_num, *(int *)data)));
}

int num_tuples = 0;
res = shm_mq_send(outputs[worker_num], sizeof(&num_tuples), &num_tuples, false);
if (res != SHM_MQ_SUCCESS)
{
ereport(LOG, (errmsg("Send was not successful")));
}
ereport(LOG, (errmsg("Send successful, main sent \"%d\" tuples", num_tuples)));
}

/* Cleanup. */
for (int worker_num = 0; worker_num < cstate->num_workers; worker_num++)
{
if (worker_ready[worker_num])
{
WaitForParallelWorkersToFinish(pcxts[worker_num]);

shm_mq_detach(inputs[worker_num]);
Expand Down
2 changes: 2 additions & 0 deletions src/postgres/src/include/commands/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#define DEFAULT_BATCH_ROWS_PER_TRANSACTION 1000
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define MIN_HASH_CODE 0
#define MAX_HASH_CODE 65536

/* CopyStateData is private in commands/copy.c */
typedef struct CopyStateData *CopyState;
Expand Down

0 comments on commit 5b1788c

Please sign in to comment.