Skip to content

Commit

Permalink
draft of force processing entity sub backlog
Browse files Browse the repository at this point in the history
  • Loading branch information
ktuite committed Jul 15, 2024
1 parent 0cd5694 commit 37bbec4
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 13 deletions.
22 changes: 22 additions & 0 deletions lib/model/migrations/20240715-01-backlog-add-event.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const up = async (db) => {
await db.raw(`ALTER TABLE entity_submission_backlog
ADD COLUMN
"auditId" INT4 NOT NULL,
ADD CONSTRAINT fk_audit_id
FOREIGN KEY("auditId")
REFERENCES audits(id)
ON DELETE CASCADE`);
};

const down = (db) => db.raw('ALTER TABLE DROP COLUMN "auditId"');

module.exports = { up, down };
46 changes: 33 additions & 13 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// except according to the terms contained in the LICENSE file.

const { sql } = require('slonik');
const { Actor, Entity, Submission, Form } = require('../frames');
const { Actor, Audit, Entity, Submission, Form } = require('../frames');
const { equals, extender, unjoiner, page, markDeleted, insertMany } = require('../../util/db');
const { map, mergeRight, pickAll } = require('ramda');
const { blankStringToNull, construct } = require('../../util/util');
Expand Down Expand Up @@ -196,22 +196,35 @@ SELECT actions
FROM dataset_form_defs
WHERE "datasetId" = ${datasetId} AND "formDefId" = ${formDefId}`);

const _holdSubmission = (run, submissionId, submissionDefId, branchId, branchBaseVersion) => run(sql`
INSERT INTO entity_submission_backlog ("submissionId", "submissionDefId", "branchId", "branchBaseVersion", "loggedAt")
VALUES (${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
const _holdSubmission = (run, eventId, submissionId, submissionDefId, branchId, branchBaseVersion) => run(sql`
INSERT INTO entity_submission_backlog ("auditId", "submissionId", "submissionDefId", "branchId", "branchBaseVersion", "loggedAt")
VALUES (${eventId}, ${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`);

const _checkHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql`
DELETE FROM entity_submission_backlog
WHERE "branchId"=${branchId} AND "branchBaseVersion" = ${branchBaseVersion}
RETURNING *`);

const _getHeldSubmissionsAsEvents = () => ({ all }) => all(sql`
SELECT audits.* FROM entity_submission_backlog
JOIN audits on entity_submission_backlog."auditId" = audits.id
ORDER BY "branchId", "branchBaseVersion"`)
.then(map(construct(Audit)));

// Used by _updateVerison below to figure out the intended base version in Central
// based on the branchId, trunkVersion, and baseVersion in the submission
const _computeBaseVersion = async (maybeOne, run, dataset, clientEntity, submissionDef) => {
const _computeBaseVersion = async (maybeOne, run, eventId, dataset, clientEntity, submissionDef, ignoreBaseVersionConstraint = false) => {
if (!clientEntity.def.trunkVersion || clientEntity.def.baseVersion === clientEntity.def.trunkVersion) {
// trunk and client baseVersion are the same, indicating the start of a batch
return clientEntity.def.baseVersion;
} else if (ignoreBaseVersionConstraint) {
// we are forcefully applying an update of this submission
// and it is likely that the correct base version does not exist in the database.
// we will be using trunk version instead
// but also removing the held submission from the backlog
await _checkHeldSubmission(maybeOne, clientEntity.def.branchId, clientEntity.def.baseVersion);
return clientEntity.def.trunkVersion;
} else {
const condition = { datasetId: dataset.id, uuid: clientEntity.uuid,
branchId: clientEntity.def.branchId,
Expand All @@ -221,7 +234,7 @@ const _computeBaseVersion = async (maybeOne, run, dataset, clientEntity, submiss
const previousInBranch = (await _getDef(maybeOne, new QueryOptions({ condition })));
if (!previousInBranch.isDefined()) {
// not ready to process this submission. eventually hold it for later.
await _holdSubmission(run, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
await _holdSubmission(run, eventId, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
} else {
return previousInBranch.get().version;
Expand Down Expand Up @@ -253,7 +266,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
return entity;
};

const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities, maybeOne, run }) => {
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, ignoreBaseVersionConstraint = false) => async ({ Audits, Entities, maybeOne, run }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
Expand All @@ -269,7 +282,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
if (clientEntity.def.branchId == null) {
throw Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name });
} else {
await _holdSubmission(run, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
await _holdSubmission(run, event.id, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
} else {
Expand All @@ -288,7 +301,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
// Figure out the intended baseVersion
// If this is an offline update with a branchId, the baseVersion value is local to that
// offline context and we need to translate it to the correct base version within Central.
const baseVersion = await _computeBaseVersion(maybeOne, run, dataset, clientEntity, submissionDef);
const baseVersion = await _computeBaseVersion(maybeOne, run, event.id, dataset, clientEntity, submissionDef, ignoreBaseVersionConstraint);

// If baseVersion is null, we held a submission and will stop processing now.
if (baseVersion == null)
Expand Down Expand Up @@ -354,9 +367,9 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
// Entrypoint to where submissions (a specific version) become entities
const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, maybeOne, oneFirst }) => {
const { submissionId, submissionDefId } = event.details;
const ignoreBaseVersionConstraint = parentEvent?.details?.force === true;

const form = await Forms.getByActeeId(event.acteeId);

// If form is deleted/purged then submission won't be there either.
if (!form.isDefined())
return null;
Expand Down Expand Up @@ -409,7 +422,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
// Try update before create (if both are specified)
if (entityData.system.update === '1' || entityData.system.update === 'true')
try {
maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event);
maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, ignoreBaseVersionConstraint);
} catch (err) {
if ((err.problemCode === 404.8) && (entityData.system.create === '1' || entityData.system.create === 'true')) {
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
Expand All @@ -426,7 +439,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
// branchBaseVersion could be undefined if handling an offline create
const currentBranchBaseVersion = branchBaseVersion ?? 0;
const nextSub = await _checkHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1);
if (nextSub.isDefined()) {
if (nextSub.isDefined() && !ignoreBaseVersionConstraint) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId } = nextSub.get();
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
{ submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId });
Expand Down Expand Up @@ -461,6 +474,12 @@ const createEntitiesFromPendingSubmissions = (submissionEvents, parentEvent) =>
() => processSubmissionEvent(event, parentEvent)(container)));


const processHeldSubmissions = () => async (container) => {
const events = await container.Entities._getHeldSubmissionsAsEvents();
return runSequentially(events.map(event =>
() => processSubmissionEvent(event, { details: { force: true } })(container)));
};


////////////////////////////////////////////////////////////////////////////////
// GETTING ENTITIES
Expand Down Expand Up @@ -624,5 +643,6 @@ module.exports = {
countByDatasetId, getById,
getAll, getAllDefs, del,
createEntitiesFromPendingSubmissions,
resolveConflict
resolveConflict,
processHeldSubmissions, _getHeldSubmissionsAsEvents
};
55 changes: 55 additions & 0 deletions test/integration/api/offline-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -609,4 +609,59 @@ describe('Offline Entities', () => {
});
}));
});

describe('force-processing held submissions', () => {
it('should apply an entity update when the previous update is missing', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');

// Trunk version is 1, but base version is 2
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${uuid()}"`)
.replace('baseVersion="1"', 'baseVersion="2"')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

await container.Entities.processHeldSubmissions();

// check that nothing too bad happens from running this again
await container.Entities.processHeldSubmissions();
}));

it('should apply two updates when first upate is missing', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');
const branchId = uuid();

// Trunk version is 1, but base version is 2
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${branchId}"`)
.replace('baseVersion="1"', 'baseVersion="2"')
)
.set('Content-Type', 'application/xml')
.expect(200);

await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${branchId}"`)
.replace('one', 'one-update2')
.replace('baseVersion="1"', 'baseVersion="3"')
.replace('<status>arrived</status>', '<status>departed</status>')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

await container.Entities.processHeldSubmissions();

await exhaust(container);

// check that nothing too bad happens from running this again
await container.Entities.processHeldSubmissions();
}));
});
});

0 comments on commit 37bbec4

Please sign in to comment.