Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force-process entity submissions held in backlog #1172

Merged
merged 9 commits into from
Aug 26, 2024
23 changes: 23 additions & 0 deletions lib/bin/process-backlog.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.
//
// This script re-processes submissions containing offline entity actions that
// were previously held in a backlog due to submissions coming in out of order.

const { run } = require('../task/task');
const { processBacklog } = require('../task/process-backlog');

const { program } = require('commander');
// Declare the single supported flag and parse the process arguments.
program
  .option('-f, --force', 'Force all submissions in the backlog to be processed immediately.')
  .parse();

// `force` is undefined when the flag is not given on the command line.
const { force } = program.opts();

// Hand the task runner a promise that resolves to a one-line summary message
// containing the count returned by the processBacklog task.
const summary = processBacklog(force)
  .then((count) => `Submissions processed: ${count}`);

run(summary);
89 changes: 69 additions & 20 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const config = require('config');
const { sql } = require('slonik');
const { Actor, Entity, Submission, Form } = require('../frames');
const { Actor, Audit, Entity, Submission, Form } = require('../frames');
const { equals, extender, unjoiner, page, markDeleted, insertMany } = require('../../util/db');
const { map, mergeRight, pickAll } = require('ramda');
const { blankStringToNull, construct } = require('../../util/util');
Expand All @@ -17,7 +18,7 @@ const { odataFilter, odataOrderBy } = require('../../data/odata-filter');
const { odataToColumnMap, parseSubmissionXml, getDiffProp, ConflictType } = require('../../data/entity');
const { isTrue } = require('../../util/http');
const Problem = require('../../util/problem');
const { runSequentially } = require('../../util/promise');
const { getOrReject, runSequentially } = require('../../util/promise');


/////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -171,20 +172,25 @@ createVersion.audit = (updatedEntity, dataset, partial, subDef) => (log) => {
entityDefId: updatedEntity.aux.currentVersion.id,
entity: { uuid: updatedEntity.uuid, dataset: dataset.name }
});
return Promise.resolve();
};
createVersion.audit.withResult = true;

////////////////////////////////////////////////////////////////////////////////
// WRAPPER FUNCTIONS FOR CREATING AND UPDATING ENTITIES

const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent) => async ({ Audits, Entities }) => {
const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing = false) => async ({ Audits, Entities }) => {
// If dataset requires approval on submission to create an entity and this event is not
// an approval event, then don't create an entity
if ((dataset.approvalRequired && event.details.reviewState !== 'approved') ||
(!dataset.approvalRequired && event.action === 'submission.update')) // don't process submission if approval is not required and submission metadata is updated
return null;

// TODO: auto-generate a label if forced and if the submission doesn't provide one
// Auto-generate a label if forced and if the submission doesn't provide one
if (forceOutOfOrderProcessing && entityData.system.label == null) {
// eslint-disable-next-line no-param-reassign
entityData.system.label = 'auto generated';
}
ktuite marked this conversation as resolved.
Show resolved Hide resolved
const partial = await Entity.fromParseEntityData(entityData, { create: true });

const sourceDetails = { submission: { instanceId: submissionDef.instanceId }, parentEventId: parentEvent ? parentEvent.id : undefined };
Expand All @@ -202,7 +208,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
return entity;
};

const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities }) => {
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing = false) => async ({ Audits, Entities }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
Expand All @@ -213,7 +219,21 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss

// Figure out the intended baseVersion
// If this is an offline update with a branchId, the baseVersion value is local to that offline context.
const baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef);
let baseEntityDef;

// Try computing base version.
// But if there is a 404.8 not found error, double-check if the entity never existed or was deleted.
try {
baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing);
} catch (err) {
if (err.problemCode === 404.8) {
// Look up deleted entity by passing deleted as option argData
const deletedEntity = await Entities.getById(dataset.id, clientEntity.uuid, new QueryOptions({ argData: { deleted: 'true' } }));
if (deletedEntity.isDefined())
throw Problem.user.entityDeleted({ entityUuid: clientEntity.uuid });
}
throw err;
}

// If baseEntityVersion is null, we held a submission and will stop processing now.
if (baseEntityDef == null)
Expand Down Expand Up @@ -294,7 +314,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss

// Used by _updateEntity to figure out the intended base version in Central
// based on the branchId, trunkVersion, and baseVersion in the submission
const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef) => async ({ Entities }) => {
const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing = false) => async ({ Entities }) => {
if (!clientEntity.def.branchId) {

// no offline branching to deal with, use baseVersion as is
Expand Down Expand Up @@ -327,10 +347,17 @@ const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef) => a
const baseEntityVersion = await Entities.getDef(dataset.id, clientEntity.uuid, new QueryOptions({ condition }));

if (!baseEntityVersion.isDefined()) {
// TODO: add case for force-processing
// If there is no base version and we are not forcing the processing, hold submission in the backlog.
await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
if (forceOutOfOrderProcessing) {
// If the base version doesn't exist but we are forcing the update anyway, use the latest version on the server as the base.
// If that can't be found, throw an error for _processSubmissionEvent to catch so it can try create instead of update.
const latestEntity = await Entities.getById(dataset.id, clientEntity.uuid)
.then(getOrReject(Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name })));
return latestEntity.aux.currentVersion;
} else {
// If there is no base version and we are not forcing the processing, hold submission in the backlog.
await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
}

// Return the base entity version
Expand All @@ -351,7 +378,7 @@ const _getFormDefActions = (oneFirst, datasetId, formDefId) => oneFirst(sql`
// so any errors can be rolled back and logged as an entity processing error.
const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, oneFirst }) => {
const { submissionId, submissionDefId } = event.details;
// TODO: check parentEvent details to determine if this is a forced reprocessing or not
const forceOutOfOrderProcessing = parentEvent?.details?.force === true;
ktuite marked this conversation as resolved.
Show resolved Hide resolved

const form = await Forms.getByActeeId(event.acteeId);
// If form is deleted/purged then submission won't be there either.
Expand All @@ -369,7 +396,6 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
// don't try to process it now, it will be dequeued and reprocessed elsewhere.
if (existingHeldSubmission.isDefined())
return null;
// TODO: check how force-reprocessing interacts with this logic above

const submission = await Submissions.getSubAndDefById(submissionDefId);

Expand Down Expand Up @@ -409,22 +435,21 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
throw Problem.user.entityActionNotPermitted({ action, permitted: permittedActions });
}

// TODO: work out how force-reprocessing interacts with this logic (updateEntity and createEntity should know about it)
let maybeEntity = null;
// Try update before create (if both are specified)
if (entityData.system.update === '1' || entityData.system.update === 'true')
try {
maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event);
maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing);
} catch (err) {
const attemptCreate = (entityData.system.create === '1' || entityData.system.create === 'true');
const attemptCreate = (entityData.system.create === '1' || entityData.system.create === 'true') || forceOutOfOrderProcessing;
if ((err.problemCode === 404.8) && attemptCreate) {
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing);
} else {
throw (err);
}
}
else if (entityData.system.create === '1' || entityData.system.create === 'true')
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing);

// Check for held submissions that follow this one in the same branch
if (maybeEntity != null) {
Expand All @@ -434,8 +459,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
const currentBranchBaseVersion = branchBaseVersion ?? 0;
const nextSub = await Entities._getNextHeldSubmissionInBranch(entityUuid, branchId, currentBranchBaseVersion + 1);

// TODO: don't handle the next submission if the current one was processed forcefully
if (nextSub.isDefined()) {
if (nextSub.isDefined() && !forceOutOfOrderProcessing) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId, auditId } = nextSub.get();
await Entities._deleteHeldSubmissionByEventId(auditId);
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
Expand Down Expand Up @@ -496,6 +520,30 @@ const _deleteHeldSubmissionByEventId = (eventId) => ({ run }) => run(sql`
WHERE "auditId"=${eventId}`);


////////////////////////////////////////////////////////////////////////////////
// FORCE PROCESSING SUBMISSIONS FROM BACKLOG

// Age threshold, in days, used by _getHeldSubmissionsAsEvents when it is not
// forced: only backlog rows logged before (current_date - DAY_RANGE) are
// selected. Taken from config when the key is present, otherwise 7.
const DAY_RANGE = (() => {
  const configKey = 'default.taskSchedule.forceProcess';
  if (config.has(configKey)) return config.get(configKey);
  return 7; // Default is 7 days
})();

// Fetches the audit rows behind the held submissions in
// entity_submission_backlog and wraps each one as an Audit frame.
//
// force: when truthy, every backlog row is returned; otherwise only rows
//        whose "loggedAt" is earlier than current_date minus DAY_RANGE days.
//
// Rows are ordered by "branchId" and then "branchBaseVersion".
const _getHeldSubmissionsAsEvents = (force) => ({ all }) => {
  // With force there is no WHERE clause at all, so an empty fragment is spliced in.
  let ageFilter;
  if (force)
    ageFilter = sql``;
  else
    ageFilter = sql`WHERE entity_submission_backlog."loggedAt" < current_date - cast(${DAY_RANGE} as int)`;

  const query = sql`
SELECT audits.* FROM entity_submission_backlog
JOIN audits on entity_submission_backlog."auditId" = audits.id
${ageFilter}
ORDER BY "branchId", "branchBaseVersion"`;

  return all(query).then(map(construct(Audit)));
};

// Processes the submissions currently held in the backlog, one at a time.
//
// force: forwarded to _getHeldSubmissionsAsEvents to decide which held
//        submissions are selected (all of them, or only the older ones).
//
// Every selected submission is first removed from the backlog and then run
// through processSubmissionEvent with a synthetic parent event whose details
// carry force: true. Resolves to the number of events that were selected.
const processBacklog = (force = false) => async (container) => {
  const heldEvents = await container.Entities._getHeldSubmissionsAsEvents(force);

  // Build one deferred job per event; nothing runs until runSequentially calls them.
  const jobs = heldEvents.map((heldEvent) => async () => {
    await container.Entities._deleteHeldSubmissionByEventId(heldEvent.id);
    return processSubmissionEvent(heldEvent, { details: { force: true } })(container);
  });

  await runSequentially(jobs);
  return heldEvents.length;
};
ktuite marked this conversation as resolved.
Show resolved Hide resolved

////////////////////////////////////////////////////////////////////////////////
// PROCESSING PENDING SUBMISSIONS FROM TOGGLING DATASET APPROVALREQUIRED FLAG

Expand Down Expand Up @@ -680,6 +728,7 @@ module.exports = {
_computeBaseVersion,
_holdSubmission, _checkHeldSubmission,
_getNextHeldSubmissionInBranch, _deleteHeldSubmissionByEventId,
_getHeldSubmissionsAsEvents, processBacklog,
processSubmissionEvent, streamForExport,
getDefBySubmissionId,
createVersion,
Expand Down
16 changes: 16 additions & 0 deletions lib/task/process-backlog.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.
//
// This task re-processes submissions containing offline entity actions that
// were held in the entity submission backlog.
ktuite marked this conversation as resolved.
Show resolved Hide resolved

const { task } = require('./task');
// Task form of Entities.processBacklog: task.withContainer is handed a
// function that picks processBacklog off the container's Entities module.
// NOTE(review): presumably withContainer supplies the container and forwards
// the caller's arguments (e.g. the force flag) — confirm in ./task.
const processBacklog = task.withContainer((container) => container.Entities.processBacklog);
ktuite marked this conversation as resolved.
Show resolved Hide resolved
module.exports = { processBacklog };

3 changes: 3 additions & 0 deletions lib/util/problem.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ const problems = {
// entity base version specified in submission does not exist
entityVersionNotFound: problem(404.9, ({ baseVersion, entityUuid, datasetName }) => `Base version (${baseVersion}) does not exist for entity UUID (${entityUuid}) in dataset (${datasetName}).`),

// entity has been deleted
entityDeleted: problem(404.11, ({ entityUuid }) => `The entity with UUID (${entityUuid}) has been deleted.`),

// { allowed: [ acceptable formats ], got }
unacceptableFormat: problem(406.1, ({ allowed }) => `Requested format not acceptable; this resource allows: ${allowed.join()}.`),

Expand Down
Loading