
Commit

Handling cases where some/all background workers fail to start
nathanhjli committed Mar 23, 2022
1 parent 7375cf6 commit b1c4ddc
Showing 1 changed file with 75 additions and 32 deletions.
107 changes: 75 additions & 32 deletions src/postgres/src/backend/commands/copy.c
@@ -1022,6 +1022,12 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,

Size tuplespace_size = (Size) cstate->shared_memory_size;

int nworkers_ready = 0;
int first_ready_worker = -1;
bool worker_ready[cstate->num_workers];

EnterParallelMode();

for (int worker_num = 0; worker_num < cstate->num_workers; worker_num++) {
ParallelContext *pcxt;
Form_pg_class relform_space;
@@ -1042,8 +1048,6 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
shm_mq_handle *output;
int queue_size = 1000;

EnterParallelMode();

/* Create a parallel context. */
pcxt = CreateParallelContext("postgres", "ParallelCopyMain", 1, false);

@@ -1164,7 +1168,19 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,

ereport(LOG, (errmsg("Launching background workers")));
LaunchParallelWorkers(pcxt);
ereport(LOG, (errmsg("Waiting for workers to attach.")));

WaitForParallelWorkersToAttach(pcxt);

/* Even if it's not a proper worker, store it so we can destroy it later. */
pcxts[worker_num] = pcxt;

/* Track background workers that have successfully launched/attached. */
if (pcxt->nknown_attached_workers == 0) {
worker_ready[worker_num] = false;
continue;
}

ereport(LOG, (errmsg("Waiting for workers to attach to our queues.")));
/* Wait for workers to attach. */
for (;;)
{
@@ -1173,12 +1189,31 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
{
break;
}
/* Need to add handle for if workers failed to start. */
}

pcxts[worker_num] = pcxt;
worker_ready[worker_num] = true;
nworkers_ready++;

if (first_ready_worker < 0) {
first_ready_worker = worker_num;
}
}
ereport(LOG, (errmsg("Workers have attached, beginning tuple writing")));

/* If no workers launched, default back to regular copy. */
if (nworkers_ready == 0) {
elog(LOG,
"No background workers were launched successfully, "
"continuing with default copy.");
*processed = CopyFrom(cstate);
for (int i = 0; i < cstate->num_workers; i++) {
DestroyParallelContext(pcxts[i]);
}
ExitParallelMode();
EndCopyFrom(cstate);
return;
}

ereport(LOG, (errmsg("\"%d\" workers are ready, beginning tuple writing.", nworkers_ready)));

/* Write HeapTuples to shared memory space. */
EState *estate = CreateExecutorState();
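The hunks above call WaitForParallelWorkersToAttach() after launching each worker and treat a parallel context whose nknown_attached_workers is still zero as a failed slot; if no slot attached at all, the statement falls back to the ordinary serial CopyFrom() path. A minimal standalone sketch of that decision, in plain C with a stand-in struct (illustrative only, not the backend code):

#include <stdbool.h>
#include <stdio.h>

/* Stand-in for the one ParallelContext field this logic inspects. */
struct fake_pcxt
{
    int nknown_attached_workers;
};

int
main(void)
{
    /* Workers 0 and 2 failed to launch; worker 1 attached. */
    struct fake_pcxt pcxts[3] = {{0}, {1}, {0}};
    bool worker_ready[3];
    int  nworkers_ready = 0;
    int  first_ready_worker = -1;

    for (int i = 0; i < 3; i++)
    {
        worker_ready[i] = (pcxts[i].nknown_attached_workers > 0);
        if (!worker_ready[i])
            continue;
        nworkers_ready++;
        if (first_ready_worker < 0)
            first_ready_worker = i;
    }

    if (nworkers_ready == 0)
        puts("no workers attached: fall back to the serial copy path");
    else
        printf("%d worker(s) ready, start round-robin at slot %d\n",
               nworkers_ready, first_ready_worker);
    return 0;
}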
@@ -1208,7 +1243,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
for (int i = 0; i < cstate->num_workers; i++) {
has_written[i] = false;
}
int cur_worker_num = 0;
int cur_worker_num = first_ready_worker;
while (has_more_tuples)
{
CHECK_FOR_INTERRUPTS();
@@ -1273,10 +1308,15 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
tuples_size = 0;

/* Worker numbers will go from 0 to cstate->num_workers - 1. */
/* Possibly need to consider case where workers fail to launch. */
cur_worker_num = cur_worker_num + 1 < cstate->num_workers
? cur_worker_num + 1
: 0;
/* Handle case where some workers may have failed to launch. */
int next_worker_num = cur_worker_num;
do {
next_worker_num = next_worker_num + 1 < cstate->num_workers
? next_worker_num + 1
: 0;
}
while (!worker_ready[next_worker_num]);
cur_worker_num = next_worker_num;

ereport(LOG, (errmsg("Total processed is \"%lu\"", total_processed)));
}
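The do/while added above advances cur_worker_num to the next slot whose worker actually launched, wrapping at cstate->num_workers. A standalone sketch of that advance (the helper name is illustrative, not part of the patch); it assumes at least one ready slot, which the nworkers_ready == 0 fallback earlier guarantees, otherwise the loop would never terminate:

#include <stdbool.h>
#include <stdio.h>

/* Round-robin advance that skips slots whose worker failed to launch. */
static int
next_ready_worker(int cur, int num_workers, const bool *worker_ready)
{
    int next = cur;

    do
    {
        next = (next + 1 < num_workers) ? next + 1 : 0;
    } while (!worker_ready[next]);

    return next;
}

int
main(void)
{
    bool ready[4] = {false, true, false, true};  /* slots 0 and 2 failed */
    int  cur = 1;                                /* first ready worker */

    for (int i = 0; i < 4; i++)
    {
        cur = next_ready_worker(cur, 4, ready);
        printf("%d ", cur);                      /* prints: 3 1 3 1 */
    }
    putchar('\n');
    return 0;
}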
@@ -1290,33 +1330,36 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,

/* One final receive and send to every worker that there are no more tuples so they stop waiting. */
 for (int worker_num = 0; worker_num < cstate->num_workers; worker_num++) {
-    shm_mq_result res;
-    Size nbytes;
-    void *data;
-    if (has_written[worker_num]) {
-        res = shm_mq_receive(inputs[worker_num], &nbytes, &data, false);
-        if (res == SHM_MQ_SUCCESS)
-        {
-            total_processed += *(int *)data;
-        }
-        else
-        {
-            ereport(LOG, (errmsg("Main worker receiving could not receive response from background worker.")));
-        }
-        ereport(LOG, (errmsg("Receive successful, worker \"%d\" processed \"%d\" tuples", worker_num, *(int *)data)));
-    }
-
-    num_tuples = 0;
-    res = shm_mq_send(outputs[worker_num], sizeof(&num_tuples), &num_tuples, false);
-    if (res != SHM_MQ_SUCCESS)
-    {
-        ereport(LOG, (errmsg("Send was not successful")));
-    }
-    ereport(LOG, (errmsg("Send successful, main sent \"%d\" tuples", num_tuples)));
-    WaitForParallelWorkersToFinish(pcxts[worker_num]);
-
-    shm_mq_detach(inputs[worker_num]);
-    shm_mq_detach(outputs[worker_num]);
+    if (worker_ready[worker_num]) {
+        shm_mq_result res;
+        Size nbytes;
+        void *data;
+        if (has_written[worker_num]) {
+            res = shm_mq_receive(inputs[worker_num], &nbytes, &data, false);
+            if (res == SHM_MQ_SUCCESS)
+            {
+                total_processed += *(int *)data;
+            }
+            else
+            {
+                ereport(LOG, (errmsg("Main worker receiving could not receive response from background worker.")));
+            }
+            ereport(LOG, (errmsg("Receive successful, worker \"%d\" processed \"%d\" tuples", worker_num, *(int *)data)));
+        }
+
+        num_tuples = 0;
+        res = shm_mq_send(outputs[worker_num], sizeof(&num_tuples), &num_tuples, false);
+        if (res != SHM_MQ_SUCCESS)
+        {
+            ereport(LOG, (errmsg("Send was not successful")));
+        }
+        ereport(LOG, (errmsg("Send successful, main sent \"%d\" tuples", num_tuples)));
+        WaitForParallelWorkersToFinish(pcxts[worker_num]);
+
+        shm_mq_detach(inputs[worker_num]);
+        shm_mq_detach(outputs[worker_num]);
+    }
+    DestroyParallelContext(pcxts[worker_num]);
 }
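The reworked shutdown loop exchanges queue messages only with ready slots (drain the worker's processed-tuple count if we ever wrote to it, send a zero-tuple message so it stops waiting, wait for it to exit, detach both queues), but destroys every parallel context, including those whose workers never attached. A standalone sketch of that ordering, with printf stand-ins for the real shm_mq and parallel-context calls:

#include <stdbool.h>
#include <stdio.h>

/* Stand-ins for the queue and parallel-context calls used by the patch. */
static void drain_count(int i)     { printf("  receive tuple count from worker %d\n", i); }
static void send_stop(int i)       { printf("  send 0 tuples to worker %d\n", i); }
static void wait_and_detach(int i) { printf("  wait for worker %d, detach its queues\n", i); }
static void destroy_context(int i) { printf("  destroy parallel context %d\n", i); }

int
main(void)
{
    bool worker_ready[3] = {true, false, true};
    bool has_written[3]  = {true, false, false};

    for (int i = 0; i < 3; i++)
    {
        printf("worker slot %d:\n", i);
        if (worker_ready[i])
        {
            if (has_written[i])
                drain_count(i);      /* collect its processed-tuple count */
            send_stop(i);            /* tell it no more tuples are coming */
            wait_and_detach(i);
        }
        destroy_context(i);          /* freed even if it never attached */
    }
    return 0;
}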

