From 37bbec43a4b805cf3af8ff9a1c6443bb6dc40820 Mon Sep 17 00:00:00 2001 From: Kathleen Tuite Date: Mon, 15 Jul 2024 14:40:26 -0700 Subject: [PATCH] draft of force processing entity sub backlog --- .../20240715-01-backlog-add-event.js | 22 ++++++++ lib/model/query/entities.js | 46 +++++++++++----- test/integration/api/offline-entities.js | 55 +++++++++++++++++++ 3 files changed, 110 insertions(+), 13 deletions(-) create mode 100644 lib/model/migrations/20240715-01-backlog-add-event.js diff --git a/lib/model/migrations/20240715-01-backlog-add-event.js b/lib/model/migrations/20240715-01-backlog-add-event.js new file mode 100644 index 000000000..0ea440932 --- /dev/null +++ b/lib/model/migrations/20240715-01-backlog-add-event.js @@ -0,0 +1,22 @@ +// Copyright 2024 ODK Central Developers +// See the NOTICE file at the top-level directory of this distribution and at +// https://github.com/getodk/central-backend/blob/master/NOTICE. +// This file is part of ODK Central. It is subject to the license terms in +// the LICENSE file found in the top-level directory of this distribution and at +// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central, +// including this file, may be copied, modified, propagated, or distributed +// except according to the terms contained in the LICENSE file. + +const up = async (db) => { + await db.raw(`ALTER TABLE entity_submission_backlog + ADD COLUMN + "auditId" INT4 NOT NULL, + ADD CONSTRAINT fk_audit_id + FOREIGN KEY("auditId") + REFERENCES audits(id) + ON DELETE CASCADE`); +}; + +const down = (db) => db.raw('ALTER TABLE DROP COLUMN "auditId"'); + +module.exports = { up, down }; diff --git a/lib/model/query/entities.js b/lib/model/query/entities.js index fad3e6762..3ade80d91 100644 --- a/lib/model/query/entities.js +++ b/lib/model/query/entities.js @@ -8,7 +8,7 @@ // except according to the terms contained in the LICENSE file. const { sql } = require('slonik'); -const { Actor, Entity, Submission, Form } = require('../frames'); +const { Actor, Audit, Entity, Submission, Form } = require('../frames'); const { equals, extender, unjoiner, page, markDeleted, insertMany } = require('../../util/db'); const { map, mergeRight, pickAll } = require('ramda'); const { blankStringToNull, construct } = require('../../util/util'); @@ -196,9 +196,9 @@ SELECT actions FROM dataset_form_defs WHERE "datasetId" = ${datasetId} AND "formDefId" = ${formDefId}`); -const _holdSubmission = (run, submissionId, submissionDefId, branchId, branchBaseVersion) => run(sql` - INSERT INTO entity_submission_backlog ("submissionId", "submissionDefId", "branchId", "branchBaseVersion", "loggedAt") - VALUES (${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP()) +const _holdSubmission = (run, eventId, submissionId, submissionDefId, branchId, branchBaseVersion) => run(sql` + INSERT INTO entity_submission_backlog ("auditId", "submissionId", "submissionDefId", "branchId", "branchBaseVersion", "loggedAt") + VALUES (${eventId}, ${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP()) `); const _checkHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql` @@ -206,12 +206,25 @@ const _checkHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne WHERE "branchId"=${branchId} AND "branchBaseVersion" = ${branchBaseVersion} RETURNING *`); +const _getHeldSubmissionsAsEvents = () => ({ all }) => all(sql` + SELECT audits.* FROM entity_submission_backlog + JOIN audits on entity_submission_backlog."auditId" = audits.id + ORDER BY "branchId", "branchBaseVersion"`) + .then(map(construct(Audit))); + // Used by _updateVerison below to figure out the intended base version in Central // based on the branchId, trunkVersion, and baseVersion in the submission -const _computeBaseVersion = async (maybeOne, run, dataset, clientEntity, submissionDef) => { +const _computeBaseVersion = async (maybeOne, run, eventId, dataset, clientEntity, submissionDef, ignoreBaseVersionConstraint = false) => { if (!clientEntity.def.trunkVersion || clientEntity.def.baseVersion === clientEntity.def.trunkVersion) { // trunk and client baseVersion are the same, indicating the start of a batch return clientEntity.def.baseVersion; + } else if (ignoreBaseVersionConstraint) { + // we are forcefully applying an update of this submission + // and it is likely that the correct base version does not exist in the database. + // we will be using trunk version instead + // but also removing the held submission from the backlog + await _checkHeldSubmission(maybeOne, clientEntity.def.branchId, clientEntity.def.baseVersion); + return clientEntity.def.trunkVersion; } else { const condition = { datasetId: dataset.id, uuid: clientEntity.uuid, branchId: clientEntity.def.branchId, @@ -221,7 +234,7 @@ const _computeBaseVersion = async (maybeOne, run, dataset, clientEntity, submiss const previousInBranch = (await _getDef(maybeOne, new QueryOptions({ condition }))); if (!previousInBranch.isDefined()) { // not ready to process this submission. eventually hold it for later. - await _holdSubmission(run, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion); + await _holdSubmission(run, eventId, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion); return null; } else { return previousInBranch.get().version; @@ -253,7 +266,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss return entity; }; -const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities, maybeOne, run }) => { +const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, ignoreBaseVersionConstraint = false) => async ({ Audits, Entities, maybeOne, run }) => { if (!(event.action === 'submission.create' || event.action === 'submission.update.version' || event.action === 'submission.reprocess')) @@ -269,7 +282,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss if (clientEntity.def.branchId == null) { throw Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name }); } else { - await _holdSubmission(run, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion); + await _holdSubmission(run, event.id, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion); return null; } } else { @@ -288,7 +301,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss // Figure out the intended baseVersion // If this is an offline update with a branchId, the baseVersion value is local to that // offline context and we need to translate it to the correct base version within Central. - const baseVersion = await _computeBaseVersion(maybeOne, run, dataset, clientEntity, submissionDef); + const baseVersion = await _computeBaseVersion(maybeOne, run, event.id, dataset, clientEntity, submissionDef, ignoreBaseVersionConstraint); // If baseVersion is null, we held a submission and will stop processing now. if (baseVersion == null) @@ -354,9 +367,9 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss // Entrypoint to where submissions (a specific version) become entities const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, maybeOne, oneFirst }) => { const { submissionId, submissionDefId } = event.details; + const ignoreBaseVersionConstraint = parentEvent?.details?.force === true; const form = await Forms.getByActeeId(event.acteeId); - // If form is deleted/purged then submission won't be there either. if (!form.isDefined()) return null; @@ -409,7 +422,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset // Try update before create (if both are specified) if (entityData.system.update === '1' || entityData.system.update === 'true') try { - maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event); + maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, ignoreBaseVersionConstraint); } catch (err) { if ((err.problemCode === 404.8) && (entityData.system.create === '1' || entityData.system.create === 'true')) { maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent); @@ -426,7 +439,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset // branchBaseVersion could be undefined if handling an offline create const currentBranchBaseVersion = branchBaseVersion ?? 0; const nextSub = await _checkHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1); - if (nextSub.isDefined()) { + if (nextSub.isDefined() && !ignoreBaseVersionConstraint) { const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId } = nextSub.get(); await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId }, { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId }); @@ -461,6 +474,12 @@ const createEntitiesFromPendingSubmissions = (submissionEvents, parentEvent) => () => processSubmissionEvent(event, parentEvent)(container))); +const processHeldSubmissions = () => async (container) => { + const events = await container.Entities._getHeldSubmissionsAsEvents(); + return runSequentially(events.map(event => + () => processSubmissionEvent(event, { details: { force: true } })(container))); +}; + //////////////////////////////////////////////////////////////////////////////// // GETTING ENTITIES @@ -624,5 +643,6 @@ module.exports = { countByDatasetId, getById, getAll, getAllDefs, del, createEntitiesFromPendingSubmissions, - resolveConflict + resolveConflict, + processHeldSubmissions, _getHeldSubmissionsAsEvents }; diff --git a/test/integration/api/offline-entities.js b/test/integration/api/offline-entities.js index 337a1cb2c..195f93f8d 100644 --- a/test/integration/api/offline-entities.js +++ b/test/integration/api/offline-entities.js @@ -609,4 +609,59 @@ describe('Offline Entities', () => { }); })); }); + + describe('force-processing held submissions', () => { + it('should apply an entity update when the previous update is missing', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + + // Trunk version is 1, but base version is 2 + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${uuid()}"`) + .replace('baseVersion="1"', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + await container.Entities.processHeldSubmissions(); + + // check that nothing too bad happens from running this again + await container.Entities.processHeldSubmissions(); + })); + + it('should apply two updates when first upate is missing', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // Trunk version is 1, but base version is 2 + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('baseVersion="1"', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('one', 'one-update2') + .replace('baseVersion="1"', 'baseVersion="3"') + .replace('arrived', 'departed') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + await container.Entities.processHeldSubmissions(); + + await exhaust(container); + + // check that nothing too bad happens from running this again + await container.Entities.processHeldSubmissions(); + })); + }); });