From 8473a4ae487d664dcfc4668d4511bad41c53e8d0 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Wed, 11 Jun 2025 08:42:33 +0200 Subject: [PATCH 1/6] Allow concurrent non-blocking uploads --- server/mergin/sync/models.py | 34 +++- server/mergin/sync/public_api.yaml | 4 + server/mergin/sync/public_api_controller.py | 88 +++++----- server/mergin/tests/test_db_hooks.py | 2 +- .../mergin/tests/test_project_controller.py | 165 +++++++++++++++--- .../9b3eac2f21c2_allow_concurrent_uploads.py | 85 +++++++++ 6 files changed, 305 insertions(+), 73 deletions(-) create mode 100644 server/migrations/community/9b3eac2f21c2_allow_concurrent_uploads.py diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 3854e4d2..5992726a 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -1003,13 +1003,12 @@ class Upload(db.Model): project_id = db.Column( UUID(as_uuid=True), db.ForeignKey("project.id", ondelete="CASCADE"), index=True ) - # project version where upload is initiated from - version = db.Column(db.Integer, index=True) changes = db.Column(db.JSON) user_id = db.Column( db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True ) created = db.Column(db.DateTime, default=datetime.utcnow) + blocking = db.Column(db.Boolean, default=True) user = db.relationship("User") project = db.relationship( @@ -1019,16 +1018,23 @@ class Upload(db.Model): "uploads", single_parent=True, lazy="dynamic", cascade="all,delete" ), ) - __table_args__ = (db.UniqueConstraint("project_id", "version"),) - def __init__( - self, project: Project, version: int, changes: UploadChanges, user_id: int - ): + __table_args__ = ( + db.Index( + "ix_upload_blocking_partial", + project_id, + blocking, + unique=True, + postgresql_where=(blocking), + ), + ) + + def __init__(self, project: Project, changes: UploadChanges, user_id: int): self.id = str(uuid.uuid4()) self.project_id = project.id - self.version = version self.changes = ChangesSchema().dump(changes) self.user_id = user_id + self.blocking = self.is_blocking(changes) @property def upload_dir(self): @@ -1053,6 +1059,20 @@ def clear(self): db.session.delete(self) db.session.commit() + @staticmethod + def is_blocking(changes: UploadChanges) -> bool: + """Check if changes would be blocking.""" + # let's mark upload as non-blocking only if there are new non-spatial data added (e.g. 
photos) + return bool( + len(changes.updated) + or len(changes.removed) + or any(is_qgis(f.path) or is_versioned_file(f.path) for f in changes.added) + ) + + def file_already_in_upload(self, path) -> bool: + """Check if file is not already as new added file""" + return any(f["path"] == path for f in self.changes["added"]) + class RequestStatus(Enum): ACCEPTED = "accepted" diff --git a/server/mergin/sync/public_api.yaml b/server/mergin/sync/public_api.yaml index 5227b562..b7c0b7e8 100644 --- a/server/mergin/sync/public_api.yaml +++ b/server/mergin/sync/public_api.yaml @@ -596,6 +596,10 @@ paths: description: upload transaction uuid type: string example: 970181b5-7143-491b-91a6-36533021c9a2 + blocking: + description: if upload blocks other uploads + type: boolean + example: false "400": $ref: "#/components/responses/BadStatusResp" "401": diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 9fd229a1..d72b7b7f 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -752,31 +752,36 @@ def project_push(namespace, project_name): if not ws: abort(404) - # fixme use get_latest - pv = ProjectVersion.query.filter_by( - project_id=project.id, name=project.latest_version - ).first() - if pv and pv.name != version: - abort(400, "Version mismatch") + pv = project.get_latest_version() + if pv and pv.name < version: + abort(400, "Version mismatch, client cannot be ahead of server") if not pv and version != 0: abort(400, "First push should be with v0") if all(len(changes[key]) == 0 for key in changes.keys()): abort(400, "No changes") - # reject upload early if there is another one already running - pending_upload = Upload.query.filter_by( - project_id=project.id, version=version - ).first() - if pending_upload and pending_upload.is_active(): - abort(400, "Another process is running. Please try later.") + upload_changes = ChangesSchema(context={"version": pv.name + 1}).load(changes) - upload_changes = ChangesSchema(context={"version": version + 1}).load(changes) + # reject upload early if there is another blocking upload already running + if Upload.is_blocking(upload_changes): + pending_upload = Upload.query.filter_by( + project_id=project.id, blocking=True + ).first() + if pending_upload and pending_upload.is_active(): + abort(400, "Another process is running. Please try later.") for item in upload_changes.added: - # check if same file is not already uploaded + # check if same file is not already uploaded or in pending upload if not all(ele.path != item.path for ele in project.files): abort(400, f"File {item.path} has been already uploaded") + + for upload in project.uploads.all(): + if not upload.is_active(): + continue + if upload.file_already_in_upload(item.path): + abort(400, f"File {item.path} is already in other upload") + if not is_valid_path(item.path): abort( 400, @@ -842,18 +847,18 @@ def project_push(namespace, project_name): ) ) - upload = Upload(project, version, upload_changes, current_user.id) + upload = Upload(project, upload_changes, current_user.id) db.session.add(upload) try: - # Creating upload transaction with different project's version is possible. + # Creating blocking upload transaction can fail, e.g. 
in case of racing condition db.session.commit() logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" + f"Upload transaction {upload.id} created for project: {project.id}, version: {version}, blocking: {upload.blocking}" ) except IntegrityError: db.session.rollback() - # check and clean dangling uploads or abort - for current_upload in project.uploads.all(): + # check and clean dangling blocking uploads or abort + for current_upload in project.uploads.filter(Upload.blocking == True).all(): if current_upload.is_active(): abort(400, "Another process is running. Please try later.") db.session.delete(current_upload) @@ -871,7 +876,7 @@ def project_push(namespace, project_name): try: db.session.commit() logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" + f"Upload transaction {upload.id} created for project: {project.id}, version: {version}, blocking: {upload.blocking}" ) move_to_tmp(upload.upload_dir) except IntegrityError as err: @@ -919,7 +924,7 @@ def project_push(namespace, project_name): finally: upload.clear() - return {"transaction": upload.id} + return {"transaction": upload.id, "blocking": upload.blocking} @auth_required @@ -938,9 +943,9 @@ def chunk_upload(transaction_id, chunk_id): """ upload, upload_dir = get_upload(transaction_id) request.view_args["project"] = upload.project - upload_changes = ChangesSchema(context={"version": upload.version + 1}).load( - upload.changes - ) + upload_changes = ChangesSchema( + context={"version": upload.project.latest_version + 1} + ).load(upload.changes) for f in upload_changes.added + upload_changes.updated: if chunk_id in f.chunks: dest = os.path.join(upload_dir, "chunks", chunk_id) @@ -984,13 +989,15 @@ def push_finish(transaction_id): upload, upload_dir = get_upload(transaction_id) request.view_args["project"] = upload.project - changes = ChangesSchema(context={"version": upload.version + 1}).load( - upload.changes - ) project = upload.project if project.locked_until: abort(make_response(jsonify(ProjectLocked().to_dict()), 422)) + project_path = get_project_path(project) + next_version = project.latest_version + 1 + v_next_version = ProjectVersion.to_v_name(next_version) + changes = ChangesSchema(context={"version": next_version}).load(upload.changes) + corrupted_files = [] for f in changes.added + changes.updated: @@ -1045,24 +1052,17 @@ def push_finish(transaction_id): move_to_tmp(upload_dir) abort(422, {"corrupted_files": corrupted_files}) - next_version = upload.version + 1 - v_next_version = ProjectVersion.to_v_name(next_version) files_dir = os.path.join(upload_dir, "files", v_next_version) target_dir = os.path.join(project.storage.project_dir, v_next_version) - if os.path.exists(target_dir): - pv = ProjectVersion.query.filter_by( - project_id=project.id, name=project.latest_version - ).first() - if pv and pv.name == upload.version + 1: - abort( - 409, - f"There is already version with this name {v_next_version}", - ) - logging.info( - "Upload transaction: Target directory already exists. 
Overwriting %s" - % target_dir + + # double check someone else has not created the same version meanwhile + if ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() or os.path.exists(target_dir): + abort( + 409, + f"There is already version with this name {v_next_version}", ) - move_to_tmp(target_dir) try: # let's move uploaded files where they are expected to be @@ -1135,10 +1135,14 @@ def push_finish(transaction_id): f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " f"transaction id: {transaction_id}.: {str(err)}" ) + if os.path.exists(target_dir): + move_to_tmp(target_dir) abort(422, "Failed to create new version: {}".format(str(err))) # catch exception during pg transaction so we can rollback and prevent PendingRollbackError during upload clean up except gevent.timeout.Timeout: db.session.rollback() + if os.path.exists(target_dir): + move_to_tmp(target_dir) raise finally: # remove artifacts diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index e7f9e270..f896f99b 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -117,7 +117,7 @@ def test_remove_project(client, diff_project): mergin_user = User.query.filter_by(username=DEFAULT_USER[0]).first() project_dir = Path(diff_project.storage.project_dir) changes = UploadChanges(added=[], removed=[], updated=[]) - upload = Upload(diff_project, 10, changes, mergin_user.id) + upload = Upload(diff_project, changes, mergin_user.id) db.session.add(upload) project_id = diff_project.id user = add_user("user", "user") diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index b1f60a8f..977a2326 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -23,6 +23,8 @@ import tempfile from sqlalchemy import desc +from sqlalchemy.exc import IntegrityError + from ..app import db from ..sync.models import ( Project, @@ -35,7 +37,7 @@ PushChangeType, ProjectFilePath, ) -from ..sync.files import ChangesSchema +from ..sync.files import ChangesSchema, UploadChanges from ..sync.schemas import ProjectListSchema from ..sync.utils import generate_checksum, is_versioned_file from ..auth.models import User, UserProfile @@ -497,7 +499,6 @@ def test_delete_project(client): # mimic update of project with chunk upload changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - os.mkdir(os.path.join(upload.project.storage.project_dir, "v2")) os.makedirs(os.path.join(upload_dir, "chunks")) for f in upload.changes["added"] + upload.changes["updated"]: with open(os.path.join(test_project_dir, f["path"]), "rb") as in_file: @@ -1020,6 +1021,12 @@ def _get_changes_with_diff_0_size(project_dir): return changes +def _get_non_blocking_changes(project_dir): + changes = _get_changes(project_dir) + changes["updated"] = changes["removed"] = [] + return changes + + test_push_data = [ ( {"version": "v1", "changes": _get_changes_without_added(test_project_dir)}, @@ -1038,7 +1045,7 @@ def _get_changes_with_diff_0_size(project_dir): 400, ), # contains already uploaded file ( - {"version": "v0", "changes": _get_changes_without_added(test_project_dir)}, + {"version": "v2", "changes": _get_changes_without_added(test_project_dir)}, 400, ), # version mismatch ({"version": "v1", "changes": {}}, 400), # wrong changes format @@ -1086,15 +1093,83 @@ def test_push_project_start(client, data, 
expected): assert failure.error_type == "push_start" +def test_concurrent_uploads(client): + """Test concurrent uploads into same project""" + url = f"/v1/project/push/{test_workspace_name}/{test_project}" + data = {"version": "v0", "changes": _get_changes_with_diff(test_project_dir)} + resp = client.post( + url, + data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), + headers=json_headers, + ) + assert resp.status_code == 200 + upload = Upload.query.get(resp.json["transaction"]) + assert upload.blocking is True + + uploaded_file = file_info( + test_project_dir, "test_dir/test4.txt", chunk_size=CHUNK_SIZE + ) + # modify name to bypass name check + uploaded_file["path"] = "test123.txt" + data["changes"] = { + "added": [uploaded_file], + "updated": [], + "removed": [], + } + + resp = client.post( + url, + data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), + headers=json_headers, + ) + assert resp.status_code == 200 + upload = Upload.query.get(resp.json["transaction"]) + assert upload.blocking is False + + # we cannot have multiple pending uploads with the same new added files + resp = client.post( + url, + data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), + headers=json_headers, + ) + assert resp.status_code == 400 + assert ( + resp.json["detail"] + == f"File {uploaded_file['path']} is already in other upload" + ) + + # second blocking upload is forbidden + data["changes"] = _get_changes_without_added(test_project_dir) + resp = client.post( + url, + data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), + headers=json_headers, + ) + assert resp.status_code == 400 + assert resp.json["detail"] == "Another process is running. Please try later." + + def test_push_to_new_project(client): # create blank project p = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - project = Project( - "blank", p.storage_params, p.creator, p.workspace, files=[], public=True - ) + project = Project("blank", p.storage_params, p.creator, p.workspace, public=True) db.session.add(project) + pv = ProjectVersion( + project, + 0, + p.creator.id, + ChangesSchema(context={"version": 0}).load( + { + "added": [], + "updated": [], + "removed": [], + } + ), + ip="127.0.0.1", + ) + db.session.add(pv) db.session.commit() current_app.config["BLACKLIST"] = ["test4"] @@ -1121,7 +1196,15 @@ def test_push_to_new_project(client): headers=json_headers, ) assert resp.status_code == 400 + assert resp.json["detail"] == "Version mismatch, client cannot be ahead of server" + failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() + assert failure.last_version == "v0" + assert failure.error_type == "push_start" + assert failure.error_details == "Version mismatch, client cannot be ahead of server" + # test legacy situation when project does not have any project version associated yet + db.session.delete(pv) + db.session.commit() data = {"version": "v100", "changes": _get_changes(test_project_dir)} resp = client.post( url, @@ -1130,7 +1213,11 @@ def test_push_to_new_project(client): ) assert resp.status_code == 400 assert resp.json["detail"] == "First push should be with v0" - failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() + failure = ( + SyncFailuresHistory.query.filter_by(project_id=project.id) + .order_by(SyncFailuresHistory.timestamp.desc()) + .first() + ) assert failure.last_version == "v0" assert failure.error_type == "push_start" assert failure.error_details == "First push should be with v0" @@ -1278,7 +1365,7 @@ def 
create_transaction(username, changes, version=1): name=test_project, workspace_id=test_workspace_id ).first() upload_changes = ChangesSchema(context={"version": version}).load(changes) - upload = Upload(project, version, upload_changes, user.id) + upload = Upload(project, upload_changes, user.id) db.session.add(upload) db.session.commit() upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) @@ -1364,7 +1451,6 @@ def test_push_finish(client): assert failure.error_type == "push_finish" assert "corrupted_files" in failure.error_details - os.mkdir(os.path.join(upload.project.storage.project_dir, "v2")) # mimic chunks were uploaded os.makedirs(os.path.join(upload_dir, "chunks")) for f in upload.changes["added"] + upload.changes["updated"]: @@ -1373,6 +1459,32 @@ def test_push_finish(client): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) + # test finish upload, pretend another upload is already being processed + os.makedirs( + os.path.join( + upload.project.storage.project_dir, f"v{upload.project.latest_version + 1}" + ), + exist_ok=True, + ) + resp = client.post( + url, + headers=json_headers, + ) + assert resp.status_code == 409 + # bump to fake version to make upload finish pass + upload.project.latest_version += 1 + db.session.add(upload.project) + pv = ProjectVersion( + upload.project, + upload.project.latest_version, + upload.project.creator.id, + UploadChanges(added=[], updated=[], removed=[]), + "127.0.0.1", + ) + pv.project = upload.project + db.session.add(pv) + db.session.commit() + resp2 = client.post(url, headers={**json_headers, "User-Agent": "Werkzeug"}) assert resp2.status_code == 200 assert not os.path.exists(upload_dir) @@ -1405,7 +1517,7 @@ def test_push_finish(client): assert resp4.status_code == 403 # other failures with error code 403, 404 does to count to failures history - assert SyncFailuresHistory.query.count() == 1 + assert SyncFailuresHistory.query.count() == 2 def test_push_close(client): @@ -1774,6 +1886,9 @@ def test_optimize_storage(app, client, diff_project): diff_project.latest_version = 8 ProjectVersion.query.filter_by(project_id=diff_project.id, name=9).delete() ProjectVersion.query.filter_by(project_id=diff_project.id, name=10).delete() + for _dir in ("v9", "v10"): + if os.path.exists(os.path.join(diff_project.storage.project_dir, _dir)): + shutil.rmtree(os.path.join(diff_project.storage.project_dir, _dir)) db.session.commit() diff_project.cache_latest_files() assert diff_project.latest_version == 8 @@ -2293,19 +2408,23 @@ def test_project_version_integrity(client): changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - # manually create an identical project version in db - pv = add_project_version(upload.project, changes) - # try to finish the transaction - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) - assert resp.status_code == 422 - assert "Failed to create new version" in resp.json["detail"] - failure = SyncFailuresHistory.query.filter_by(project_id=upload.project.id).first() - assert failure.error_type == "push_finish" - assert "Failed to create new version" in failure.error_details - upload.project.latest_version = pv.name - 1 - db.session.delete(pv) - db.session.delete(failure) - db.session.commit() + + # try to finish the transaction which would fail on version created integrity error, e.g. 
race conditions + with patch.object( + ProjectVersion, + "__init__", + side_effect=IntegrityError("Project version already exists", None, None), + ): + resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + assert resp.status_code == 422 + assert "Failed to create new version" in resp.json["detail"] + failure = SyncFailuresHistory.query.filter_by( + project_id=upload.project.id + ).first() + assert failure.error_type == "push_finish" + assert "Failed to create new version" in failure.error_details + db.session.delete(failure) + db.session.commit() # changes without an upload with patch("mergin.sync.public_api_controller.get_user_agent") as mock: diff --git a/server/migrations/community/9b3eac2f21c2_allow_concurrent_uploads.py b/server/migrations/community/9b3eac2f21c2_allow_concurrent_uploads.py new file mode 100644 index 00000000..c4e99eab --- /dev/null +++ b/server/migrations/community/9b3eac2f21c2_allow_concurrent_uploads.py @@ -0,0 +1,85 @@ +"""Allow concurrent uploads + +Create partial index to limit at most one blocking upload per project. + +Revision ID: 9b3eac2f21c2 +Revises: 6cb54659c1de +Create Date: 2025-06-10 14:00:30.094460 + +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "9b3eac2f21c2" +down_revision = "6cb54659c1de" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("upload", sa.Column("blocking", sa.Boolean(), nullable=True)) + op.drop_index("ix_upload_version", table_name="upload") + op.drop_constraint("uq_upload_project_id", "upload", type_="unique") + op.create_index( + "ix_upload_blocking_partial", + "upload", + ["project_id", "blocking"], + unique=True, + postgresql_where=sa.text("blocking"), + ) + op.drop_column("upload", "version") + + # data migration - set all current uploads to blocking + conn = op.get_bind() + query = """ + UPDATE upload + SET blocking = true; + """ + conn.execute(sa.text(query)) + + +def downgrade(): + op.add_column( + "upload", sa.Column("version", sa.INTEGER(), autoincrement=False, nullable=True) + ) + op.drop_index( + "ix_upload_blocking_partial", + table_name="upload", + postgresql_where=sa.text("blocking"), + ) + + # data migration - remove concurrent uploads, set upload version to project latest version + conn = op.get_bind() + remove_query = """ + WITH multiple_uploads AS ( + SELECT + u.id, + ROW_NUMBER() OVER( + PARTITION BY u.project_id + ORDER BY u.created asc + ) AS row_number + FROM upload u + INNER JOIN project p ON p.id = u.project_id + ) + DELETE FROM upload u + USING multiple_uploads mu + WHERE u.id = mu.id AND mu.row_number > 1; + """ + conn.execute(sa.text(remove_query)) + + update_query = """ + UPDATE upload u + SET version = p.latest_version + FROM project p + WHERE p.id = u.project_id; + """ + conn.execute(sa.text(update_query)) + + op.create_unique_constraint( + "uq_upload_project_id", "upload", ["project_id", "version"] + ) + op.create_index("ix_upload_version", "upload", ["version"], unique=False) + op.drop_column("upload", "blocking") From 4bd0d2b459b4543908e3d9815fb8a3ae2bc03fd1 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Thu, 12 Jun 2025 14:05:12 +0200 Subject: [PATCH 2/6] Make sure stale uploads are removed properly with celery job --- server/mergin/sync/db_events.py | 10 +++++ server/mergin/sync/tasks.py | 11 +++++- server/mergin/tests/test_celery.py | 22 +++++++++-- .../mergin/tests/test_project_controller.py | 38 +++++++++++++------ 4 files changed, 65 insertions(+), 16 deletions(-) diff --git 
a/server/mergin/sync/db_events.py b/server/mergin/sync/db_events.py index 18d1ce60..7d7e55e0 100644 --- a/server/mergin/sync/db_events.py +++ b/server/mergin/sync/db_events.py @@ -7,6 +7,9 @@ from sqlalchemy import event from ..app import db +from .models import ProjectVersion +from .public_api_controller import push_finished +from .tasks import remove_stale_project_uploads def check(session): @@ -14,9 +17,16 @@ def check(session): abort(503, "Service unavailable due to maintenance, please try later") +def cleanup_on_push_finished(project_version: ProjectVersion) -> None: + """On finished push trigger celery job cleanup""" + remove_stale_project_uploads.delay(project_version.project_id) + + def register_events(): event.listen(db.session, "before_commit", check) + push_finished.connect(cleanup_on_push_finished) def remove_events(): event.remove(db.session, "before_commit", check) + push_finished.connect(cleanup_on_push_finished) diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index f56fb273..ee42c416 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -10,7 +10,7 @@ from zipfile import ZIP_DEFLATED, ZipFile from flask import current_app -from .models import Project, ProjectVersion, FileHistory +from .models import Project, ProjectVersion, FileHistory, Upload from .storages.disk import move_to_tmp from .config import Configuration from ..celery import celery @@ -155,3 +155,12 @@ def remove_projects_archives(): os.remove(path) except OSError as e: logging.error(f"Unable to remove {path}: {str(e)}") + + +@celery.task +def remove_stale_project_uploads(project_id: str): + """Remove project stale uploads""" + db.session.info = {"msg": "remove_stale_project_uploads"} + for upload in Upload.query.filter_by(project_id=project_id).all(): + if not upload.is_active(): + upload.clear() diff --git a/server/mergin/tests/test_celery.py b/server/mergin/tests/test_celery.py index a5d07f47..619897a8 100644 --- a/server/mergin/tests/test_celery.py +++ b/server/mergin/tests/test_celery.py @@ -10,19 +10,20 @@ from ..app import db from ..config import Configuration -from ..sync.models import Project, AccessRequest, ProjectRole, ProjectVersion +from ..sync.models import Project, AccessRequest, ProjectRole, ProjectVersion, Upload from ..celery import send_email_async from ..sync.tasks import ( + remove_stale_project_uploads, remove_temp_files, remove_projects_backups, create_project_version_zip, remove_projects_archives, ) from ..sync.storages.disk import move_to_tmp -from . import test_project, test_workspace_name, test_workspace_id +from . import test_project, test_workspace_name, test_workspace_id, json_headers +from .test_project_controller import create_transaction from .utils import add_user, create_workspace, create_project, login, modify_file_times from ..auth.models import User -from . 
import json_headers def test_send_email(app): @@ -157,3 +158,18 @@ def test_create_project_version_zip(diff_project): modify_file_times(latest_version.zip_path, new_time) remove_projects_archives() assert not os.path.exists(latest_version.zip_path) + + +@patch.object(Upload, "is_active") +def test_after_push_upload_cleanup(mock_is_active, client): + """Test stale uploads are removed""" + upload, _ = create_transaction("mergin", {}) + transaction_id = upload.id + + mock_is_active.return_value = True + remove_stale_project_uploads(upload.project_id) + assert Upload.query.get(transaction_id) + + mock_is_active.return_value = False + remove_stale_project_uploads(upload.project_id) + assert not Upload.query.get(transaction_id) diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 977a2326..1be460c8 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -5,6 +5,7 @@ import datetime import os from dataclasses import asdict +from unittest import mock from unittest.mock import patch from urllib.parse import quote import pysqlite3 @@ -16,16 +17,16 @@ import hashlib import shutil import re - from flask_login import current_user from pygeodiff import GeoDiff from flask import url_for, current_app import tempfile - from sqlalchemy import desc from sqlalchemy.exc import IntegrityError from ..app import db +from ..config import Configuration +from ..sync.config import Configuration as SyncConfiguration from ..sync.models import ( Project, Upload, @@ -39,7 +40,7 @@ ) from ..sync.files import ChangesSchema, UploadChanges from ..sync.schemas import ProjectListSchema -from ..sync.utils import generate_checksum, is_versioned_file +from ..sync.utils import generate_checksum, is_versioned_file, get_project_path from ..auth.models import User, UserProfile from . import ( @@ -61,9 +62,6 @@ login_as_admin, upload_file_to_project, ) -from ..config import Configuration -from ..sync.config import Configuration as SyncConfiguration -from ..sync.utils import get_project_path CHUNK_SIZE = 1024 @@ -1021,12 +1019,6 @@ def _get_changes_with_diff_0_size(project_dir): return changes -def _get_non_blocking_changes(project_dir): - changes = _get_changes(project_dir) - changes["updated"] = changes["removed"] = [] - return changes - - test_push_data = [ ( {"version": "v1", "changes": _get_changes_without_added(test_project_dir)}, @@ -1105,6 +1097,7 @@ def test_concurrent_uploads(client): assert resp.status_code == 200 upload = Upload.query.get(resp.json["transaction"]) assert upload.blocking is True + assert upload.is_active() uploaded_file = file_info( test_project_dir, "test_dir/test4.txt", chunk_size=CHUNK_SIZE @@ -1148,6 +1141,21 @@ def test_concurrent_uploads(client): assert resp.status_code == 400 assert resp.json["detail"] == "Another process is running. Please try later." 
+ upload = Upload.query.filter_by(blocking=True).first() + trasaction_id = str(upload.id) + # pretent blocking upload is stale + with patch("mergin.sync.models.Upload.is_active") as mock_is_active: + mock_is_active.return_value = False + resp = client.post( + url, + data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), + headers=json_headers, + ) + assert resp.status_code == 200 + assert resp.json["transaction"] + # previous stale upload was removed + assert not Upload.query.get(trasaction_id) + def test_push_to_new_project(client): # create blank project @@ -2574,6 +2582,12 @@ def test_signals(client): upload_file_to_project(project, "test.txt", client) push_finished_mock.assert_called_once() + with patch( + "mergin.sync.tasks.remove_stale_project_uploads.delay" + ) as upload_cleanup_mock: + upload_file_to_project(project, "test.qgs", client) + upload_cleanup_mock.assert_called_once() + def test_filepath_manipulation(client): """Test filepath validation during file upload""" From 7c98906bc6e8acc5a003b06e7c3e58b781af392a Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Fri, 13 Jun 2025 09:37:54 +0200 Subject: [PATCH 3/6] Fix: make sure updated/removed files from project must exist --- server/mergin/sync/public_api_controller.py | 9 +++- .../mergin/tests/test_project_controller.py | 48 +++++++++++++++++-- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index d72b7b7f..88c47bf6 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -771,9 +771,10 @@ def project_push(namespace, project_name): if pending_upload and pending_upload.is_active(): abort(400, "Another process is running. Please try later.") + current_files = set(file.path for file in project.files) for item in upload_changes.added: # check if same file is not already uploaded or in pending upload - if not all(ele.path != item.path for ele in project.files): + if item.path in current_files: abort(400, f"File {item.path} has been already uploaded") for upload in project.uploads.all(): @@ -794,6 +795,12 @@ def project_push(namespace, project_name): f"Please remove the file or try compressing it into a ZIP file before uploading.", ) + # check consistency of changes + if not set( + file.path for file in upload_changes.updated + upload_changes.removed + ).issubset(current_files): + abort(400, "Update or remove changes contain files that are not in project") + # changes' files must be unique changes_files = [ f.path diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 1be460c8..1b38257a 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -1019,6 +1019,17 @@ def _get_changes_with_diff_0_size(project_dir): return changes +def _get_random_file_metadata(): + """Return fake metadata for non-existing random file""" + return { + "path": f"{uuid.uuid4().hex}.txt", + "checksum": hashlib.sha1().hexdigest(), + "size": 0, + "mtime": datetime.datetime.now().timestamp(), + "chunks": [], + } + + test_push_data = [ ( {"version": "v1", "changes": _get_changes_without_added(test_project_dir)}, @@ -1049,9 +1060,35 @@ def _get_changes_with_diff_0_size(project_dir): { "version": "v1", "changes": { - "added": [{"path": "test.txt"}], + "added": [], + "removed": [_get_random_file_metadata()], + "updated": [], + }, + }, + 400, + ), # delete not-existing file + ( + { + 
"version": "v1", + "changes": { + "added": [], "removed": [], - "updated": [{"path": "test.txt"}], + "updated": [_get_random_file_metadata()], + }, + }, + 400, + ), # update not-existing file + ( + { + "version": "v1", + "changes": { + "added": [ + file_info(test_project_dir, "test.txt", chunk_size=CHUNK_SIZE) + ], + "removed": [], + "updated": [ + file_info(test_project_dir, "test.txt", chunk_size=CHUNK_SIZE) + ], }, }, 400, @@ -1182,7 +1219,9 @@ def test_push_to_new_project(client): current_app.config["BLACKLIST"] = ["test4"] url = "/v1/project/push/{}/{}".format(test_workspace_name, "blank") - data = {"version": "v0", "changes": _get_changes(test_project_dir)} + changes = _get_changes(test_project_dir) + changes["updated"] = changes["removed"] = [] + data = {"version": "v0", "changes": changes} resp = client.post( url, data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), @@ -2416,6 +2455,7 @@ def test_project_version_integrity(client): changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) + a = upload.project.files # try to finish the transaction which would fail on version created integrity error, e.g. race conditions with patch.object( @@ -2447,7 +2487,7 @@ def test_project_version_integrity(client): # to insert an identical project version when no upload (only one endpoint used), # we need to pretend side effect of a function called just before project version insertion def _get_user_agent(): - add_project_version(project, changes) + add_project_version(project, {}) # bypass endpoint checks upload.project.latest_version = ProjectVersion.from_v_name(data["version"]) return "Input" From 05c3eb868f050a1ee98e7ec01c5e7a0408eb9915 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Tue, 17 Jun 2025 11:00:59 +0200 Subject: [PATCH 4/6] Refactor upload changes handling move relevant validations into upload schema use upload schema only in push init while project file object is used for actual file manipulations remove version context and redundant dataclasses for upload schema --- server/mergin/sync/commands.py | 4 +- server/mergin/sync/files.py | 196 +++++++++--- server/mergin/sync/models.py | 110 +++---- server/mergin/sync/public_api_controller.py | 299 +++++++++--------- server/mergin/sync/schemas.py | 2 +- server/mergin/sync/storages/disk.py | 8 +- server/mergin/tests/fixtures.py | 7 +- server/mergin/tests/test_db_hooks.py | 9 +- .../mergin/tests/test_project_controller.py | 19 +- server/mergin/tests/utils.py | 38 +-- 10 files changed, 377 insertions(+), 315 deletions(-) diff --git a/server/mergin/sync/commands.py b/server/mergin/sync/commands.py index 97e85981..4ec898cf 100644 --- a/server/mergin/sync/commands.py +++ b/server/mergin/sync/commands.py @@ -9,7 +9,6 @@ from datetime import datetime from flask import Flask, current_app -from .files import UploadChanges from ..app import db from .models import Project, ProjectVersion from .utils import split_project_path @@ -52,8 +51,7 @@ def create(name, namespace, username): # pylint: disable=W0612 p = Project(**project_params) p.updated = datetime.utcnow() db.session.add(p) - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1") + pv = ProjectVersion(p, 0, user.id, [], "127.0.0.1") pv.project = p db.session.commit() os.makedirs(p.storage.project_dir, exist_ok=True) diff --git a/server/mergin/sync/files.py b/server/mergin/sync/files.py index 12b30afe..6073b0fb 100644 --- 
a/server/mergin/sync/files.py +++ b/server/mergin/sync/files.py @@ -3,14 +3,29 @@ # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial import datetime import os +import uuid from dataclasses import dataclass -from typing import Optional, List -from marshmallow import fields, EXCLUDE, pre_load, post_load, post_dump +from enum import Enum +from flask import current_app +from marshmallow import ValidationError, fields, EXCLUDE, post_dump, validates_schema from pathvalidate import sanitize_filename +from typing import Optional, List +from .utils import is_file_name_blacklisted, is_qgis, is_versioned_file from ..app import DateTimeWithZ, ma +class PushChangeType(Enum): + CREATE = "create" + UPDATE = "update" + DELETE = "delete" + UPDATE_DIFF = "update_diff" + + @classmethod + def values(cls): + return [member.value for member in cls.__members__.values()] + + def mergin_secure_filename(filename: str) -> str: """Generate secure filename for given file""" filename = os.path.normpath(filename) @@ -24,94 +39,181 @@ def mergin_secure_filename(filename: str) -> str: @dataclass class File: - """Base class for every file object""" + """Base class for every file object, either intended to upload or already existing in project""" path: str checksum: str size: int - location: str def is_valid_gpkg(self): """Check if diff file is valid""" return self.size != 0 +@dataclass +class ProjectDiffFile(File): + """Metadata for geodiff diff file (aka. changeset) associated with geopackage""" + + # location where file is actually stored + location: str + + @dataclass class ProjectFile(File): - """Project file metadata including metadata for diff file""" + """Project file metadata including metadata for diff file and location where it is stored""" # metadata for gpkg diff file - diff: Optional[File] + diff: Optional[ProjectDiffFile] # deprecated attribute kept for public API compatibility mtime: Optional[datetime.datetime] + # location where file is actually stored + location: str @dataclass -class UploadFile(File): - """File to be uploaded coming from client push process""" - - # determined by client - chunks: Optional[List[str]] - diff: Optional[File] - +class ProjectFileChange(ProjectFile): + """Metadata of changed file in project version. + + This item is saved into database into file_history. + """ + + change: PushChangeType + + +def files_changes_from_upload(changes: dict, version: int) -> List["ProjectFileChange"]: + """Create a list of version file changes from upload changes dictionary used by public API. + + It flattens changes dict and adds change type to each item. Also generates location for each file. 
+ """ + secure_filenames = [] + version_changes = [] + version = "v" + str(version) + for key in ("added", "updated", "removed"): + for item in changes.get(key, []): + location = os.path.join(version, mergin_secure_filename(item["path"])) + diff = None + + # make sure we have unique location for each file + if location in secure_filenames: + filename, file_extension = os.path.splitext(location) + location = filename + f".{str(uuid.uuid4())}" + file_extension + + secure_filenames.append(location) + + if key == "removed": + change = PushChangeType.DELETE + location = None + elif key == "added": + change = PushChangeType.CREATE + else: + change = PushChangeType.UPDATE + if item.get("diff"): + change = PushChangeType.UPDATE_DIFF + diff_location = os.path.join( + version, mergin_secure_filename(item["diff"]["path"]) + ) + if diff_location in secure_filenames: + filename, file_extension = os.path.splitext(diff_location) + diff_location = ( + filename + f".{str(uuid.uuid4())}" + file_extension + ) + + secure_filenames.append(diff_location) + diff = ProjectDiffFile( + path=item["diff"]["path"], + checksum=item["diff"]["checksum"], + size=item["diff"]["size"], + location=diff_location, + ) + + file_change = ProjectFileChange( + path=item["path"], + checksum=item["checksum"], + size=item["size"], + mtime=None, + change=change, + location=location, + diff=diff, + ) + version_changes.append(file_change) -@dataclass -class UploadChanges: - added: List[UploadFile] - updated: List[UploadFile] - removed: List[UploadFile] + return version_changes class FileSchema(ma.Schema): path = fields.String() size = fields.Integer() checksum = fields.String() - location = fields.String(load_default="", load_only=True) class Meta: unknown = EXCLUDE - @post_load - def create_obj(self, data, **kwargs): - return File(**data) - class UploadFileSchema(FileSchema): chunks = fields.List(fields.String(), load_default=[]) diff = fields.Nested(FileSchema(), many=False, load_default=None) - @pre_load - def pre_load(self, data, **kwargs): - # add future location based on context version - version = f"v{self.context.get('version')}" - if not data.get("location"): - data["location"] = os.path.join( - version, mergin_secure_filename(data["path"]) - ) - if data.get("diff") and not data.get("diff").get("location"): - data["diff"]["location"] = os.path.join( - version, mergin_secure_filename(data["diff"]["path"]) - ) - return data - - @post_load - def create_obj(self, data, **kwargs): - return UploadFile(**data) - class ChangesSchema(ma.Schema): """Schema for upload changes""" - added = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) - updated = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) - removed = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) + added = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) + updated = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) + removed = fields.List( + fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] + ) + is_blocking = fields.Method("_is_blocking") class Meta: unknown = EXCLUDE - @post_load - def create_obj(self, data, **kwargs): - return UploadChanges(**data) + def _is_blocking(self, obj) -> bool: + """Check if changes would be blocking.""" + # let's mark upload as non-blocking only if there are new non-spatial data added (e.g. 
photos) + return bool( + len(obj.get("updated", [])) + or len(obj.get("removed", [])) + or any( + is_qgis(f["path"]) or is_versioned_file(f["path"]) + for f in obj.get("added", []) + ) + ) + + @post_dump + def remove_blacklisted_files(self, data, **kwargs): + """Files which are blacklisted are not allowed to be uploaded and are simple ignored.""" + for key in ("added", "updated", "removed"): + data[key] = [ + f + for f in data[key] + if not is_file_name_blacklisted( + f["path"], current_app.config["BLACKLIST"] + ) + ] + return data + + @validates_schema + def validate(self, data, **kwargs): + """Basic consistency validations for upload metadata""" + changes_files = [ + f["path"] for f in data["added"] + data["updated"] + data["removed"] + ] + + if len(changes_files) == 0: + raise ValidationError("No changes") + + # changes' files must be unique + if len(set(changes_files)) != len(changes_files): + raise ValidationError("Not unique changes") + + # check if all .gpkg file are valid + for file in data["added"] + data["updated"]: + if is_versioned_file(file["path"]) and file["size"] == 0: + raise ValidationError("File is not valid") class ProjectFileSchema(FileSchema): diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 5992726a..be57e4d4 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -23,9 +23,11 @@ from .files import ( File, - UploadChanges, + ProjectDiffFile, + ProjectFileChange, ChangesSchema, ProjectFile, + PushChangeType, ) from .interfaces import WorkspaceRole from .storages.disk import move_to_tmp @@ -38,17 +40,6 @@ project_access_granted = signal("project_access_granted") -class PushChangeType(Enum): - CREATE = "create" - UPDATE = "update" - DELETE = "delete" - UPDATE_DIFF = "update_diff" - - @classmethod - def values(cls): - return [member.value for member in cls.__members__.values()] - - class Project(db.Model): id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name = db.Column(db.String, index=True) @@ -181,7 +172,7 @@ def files(self) -> List[ProjectFile]: checksum=row.checksum, location=row.location, mtime=row.mtime, - diff=File(**row.diff) if row.diff else None, + diff=ProjectDiffFile(**row.diff) if row.diff else None, ) for row in db.session.execute(query, params).fetchall() ] @@ -504,9 +495,9 @@ def path(self) -> str: return self.file.path @property - def diff_file(self) -> Optional[File]: + def diff_file(self) -> Optional[ProjectDiffFile]: if self.diff: - return File(**self.diff) + return ProjectDiffFile(**self.diff) @property def mtime(self) -> datetime: @@ -705,7 +696,7 @@ def __init__( project: Project, name: int, author_id: int, - changes: UploadChanges, + changes: List[ProjectFileChange], ip: str, user_agent: str = None, device_id: str = None, @@ -725,9 +716,7 @@ def __init__( ).all() } - changed_files_paths = [ - f.path for f in changes.updated + changes.removed + changes.added - ] + changed_files_paths = set(change.path for change in changes) existing_files_map = { f.path: f for f in ProjectFilePath.query.filter_by(project_id=self.project_id) @@ -735,46 +724,32 @@ def __init__( .all() } - for key in ( - ("added", PushChangeType.CREATE), - ("updated", PushChangeType.UPDATE), - ("removed", PushChangeType.DELETE), - ): - change_attr = key[0] - change_type = key[1] - - for upload_file in getattr(changes, change_attr): - is_diff_change = ( - change_type is PushChangeType.UPDATE - and upload_file.diff is not None - ) - - file = existing_files_map.get( - upload_file.path, 
ProjectFilePath(self.project_id, upload_file.path) - ) - fh = FileHistory( - file=file, - size=upload_file.size, - checksum=upload_file.checksum, - location=upload_file.location, - diff=( - asdict(upload_file.diff) - if (is_diff_change and upload_file.diff) - else null() - ), - change=( - PushChangeType.UPDATE_DIFF if is_diff_change else change_type - ), - ) - fh.version = self - fh.project_version_name = self.name - db.session.add(fh) - db.session.flush() + for item in changes: + # get existing DB file reference or create a new one (for added files) + db_file = existing_files_map.get( + item.path, ProjectFilePath(self.project_id, item.path) + ) + fh = FileHistory( + file=db_file, + size=item.size, + checksum=item.checksum, + location=item.location, + diff=( + asdict(item.diff) + if (item.change is PushChangeType.UPDATE_DIFF and item.diff) + else null() + ), + change=item.change, + ) + fh.version = self + fh.project_version_name = self.name + db.session.add(fh) + db.session.flush() - if change_type is PushChangeType.DELETE: - latest_files_map.pop(fh.path, None) - else: - latest_files_map[fh.path] = fh.id + if item.change is PushChangeType.DELETE: + latest_files_map.pop(fh.path, None) + else: + latest_files_map[fh.path] = fh.id # update cached values in project and push to transaction buffer so that self.files is up-to-date self.project.latest_project_files.file_history_ids = latest_files_map.values() @@ -909,7 +884,7 @@ def files(self) -> List[ProjectFile]: checksum=row.checksum, location=row.location, mtime=row.mtime, - diff=File(**row.diff) if row.diff else None, + diff=ProjectDiffFile(**row.diff) if row.diff else None, ) for row in result ] @@ -1029,12 +1004,13 @@ class Upload(db.Model): ), ) - def __init__(self, project: Project, changes: UploadChanges, user_id: int): + def __init__(self, project: Project, changes: dict, user_id: int): + upload_changes = ChangesSchema().dump(changes) self.id = str(uuid.uuid4()) self.project_id = project.id - self.changes = ChangesSchema().dump(changes) self.user_id = user_id - self.blocking = self.is_blocking(changes) + self.blocking = upload_changes.pop("is_blocking") + self.changes = upload_changes @property def upload_dir(self): @@ -1059,16 +1035,6 @@ def clear(self): db.session.delete(self) db.session.commit() - @staticmethod - def is_blocking(changes: UploadChanges) -> bool: - """Check if changes would be blocking.""" - # let's mark upload as non-blocking only if there are new non-spatial data added (e.g. 
photos) - return bool( - len(changes.updated) - or len(changes.removed) - or any(is_qgis(f.path) or is_versioned_file(f.path) for f in changes.added) - ) - def file_already_in_upload(self, path) -> bool: """Check if file is not already as new added file""" return any(f["path"] == path for f in self.changes["added"]) diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 88c47bf6..e9fb4a76 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -10,8 +10,8 @@ from dataclasses import asdict from typing import Dict from urllib.parse import quote -import uuid from datetime import datetime +from marshmallow import ValidationError import gevent import psycopg2 @@ -50,11 +50,13 @@ ProjectRole, ) from .files import ( - UploadChanges, ChangesSchema, - UploadFileSchema, + ProjectDiffFile, + ProjectFile, + ProjectFileChange, ProjectFileSchema, - FileSchema, + files_changes_from_upload, + mergin_secure_filename, ) from .schemas import ( ProjectSchema, @@ -239,15 +241,24 @@ def add_project(namespace): # noqa: E501 .first_or_404() ) version_name = 1 - files = UploadFileSchema(context={"version": 1}, many=True).load( - FileSchema(exclude=("location",), many=True).dump(template.files) - ) - changes = UploadChanges(added=files, updated=[], removed=[]) + file_changes = [] + for file in template.files: + file_changes.append( + ProjectFileChange( + file.path, + file.checksum, + file.size, + diff=None, + mtime=None, + location=os.path.join("v1", mergin_secure_filename(file.path)), + change=PushChangeType.CREATE, + ) + ) else: template = None version_name = 0 - changes = UploadChanges(added=[], updated=[], removed=[]) + file_changes = [] try: p.storage.initialize(template_project=template) @@ -258,7 +269,7 @@ def add_project(namespace): # noqa: E501 p, version_name, current_user.id, - changes, + file_changes, get_ip(request), get_user_agent(request), get_device_id(request), @@ -758,13 +769,15 @@ def project_push(namespace, project_name): if not pv and version != 0: abort(400, "First push should be with v0") - if all(len(changes[key]) == 0 for key in changes.keys()): - abort(400, "No changes") - - upload_changes = ChangesSchema(context={"version": pv.name + 1}).load(changes) + try: + ChangesSchema().validate(changes) + upload_changes = ChangesSchema().dump(changes) + except ValidationError as err: + msg = err.messages[0] if type(err.messages) == list else "Invalid input data" + abort(400, msg) # reject upload early if there is another blocking upload already running - if Upload.is_blocking(upload_changes): + if upload_changes["is_blocking"]: pending_upload = Upload.query.filter_by( project_id=project.id, blocking=True ).first() @@ -772,77 +785,45 @@ def project_push(namespace, project_name): abort(400, "Another process is running. 
Please try later.") current_files = set(file.path for file in project.files) - for item in upload_changes.added: + for item in upload_changes["added"]: # check if same file is not already uploaded or in pending upload - if item.path in current_files: - abort(400, f"File {item.path} has been already uploaded") + item_path = item["path"] + if item_path in current_files: + abort(400, f"File {item_path} has been already uploaded") for upload in project.uploads.all(): if not upload.is_active(): continue - if upload.file_already_in_upload(item.path): - abort(400, f"File {item.path} is already in other upload") + if upload.file_already_in_upload(item_path): + abort(400, f"File {item_path} is already in other upload") - if not is_valid_path(item.path): + if not is_valid_path(item_path): abort( 400, - f"Unsupported file name detected: {item.path}. Please remove the invalid characters.", + f"Unsupported file name detected: {item_path}. Please remove the invalid characters.", ) - if not is_supported_extension(item.path): + if not is_supported_extension(item_path): abort( 400, - f"Unsupported file type detected: {item.path}. " + f"Unsupported file type detected: {item_path}. " f"Please remove the file or try compressing it into a ZIP file before uploading.", ) # check consistency of changes if not set( - file.path for file in upload_changes.updated + upload_changes.removed + file["path"] for file in upload_changes["updated"] + upload_changes["removed"] ).issubset(current_files): abort(400, "Update or remove changes contain files that are not in project") - # changes' files must be unique - changes_files = [ - f.path - for f in upload_changes.added + upload_changes.updated + upload_changes.removed - ] - if len(set(changes_files)) != len(changes_files): - abort(400, "Not unique changes") - - sanitized_files = [] - blacklisted_files = [] - for f in upload_changes.added + upload_changes.updated + upload_changes.removed: - # check if .gpkg file is valid - if is_versioned_file(f.path): - if not f.is_valid_gpkg(): - abort(400, f"File {f.path} is not valid") - if is_file_name_blacklisted(f.path, current_app.config["BLACKLIST"]): - blacklisted_files.append(f.path) - # all file need to be unique after sanitized - if f.location in sanitized_files: - filename, file_extension = os.path.splitext(f.location) - f.location = filename + f".{str(uuid.uuid4())}" + file_extension - sanitized_files.append(f.location) - if f.diff: - if f.diff.location in sanitized_files: - filename, file_extension = os.path.splitext(f.diff.location) - f.diff.location = filename + f".{str(uuid.uuid4())}" + file_extension - sanitized_files.append(f.diff.location) - - # remove blacklisted files from changes - for key in upload_changes.__dict__.keys(): - new_value = [ - f for f in getattr(upload_changes, key) if f.path not in blacklisted_files - ] - setattr(upload_changes, key, new_value) - # Check user data limit - updates = [f.path for f in upload_changes.updated] + updates = [f["path"] for f in upload_changes["updated"]] updated_files = list(filter(lambda i: i.path in updates, project.files)) additional_disk_usage = ( - sum(file.size for file in upload_changes.added + upload_changes.updated) + sum( + file["size"] for file in upload_changes["added"] + upload_changes["updated"] + ) - sum(file.size for file in updated_files) - - sum(file.size for file in upload_changes.removed) + - sum(file["size"] for file in upload_changes["removed"]) ) current_usage = ws.disk_usage() @@ -899,12 +880,13 @@ def project_push(namespace, project_name): 
next_version = version + 1 user_agent = get_user_agent(request) device_id = get_device_id(request) + file_changes = files_changes_from_upload(upload.changes, version=next_version) try: pv = ProjectVersion( project, next_version, current_user.id, - upload_changes, + file_changes, get_ip(request), user_agent, device_id, @@ -950,29 +932,27 @@ def chunk_upload(transaction_id, chunk_id): """ upload, upload_dir = get_upload(transaction_id) request.view_args["project"] = upload.project - upload_changes = ChangesSchema( - context={"version": upload.project.latest_version + 1} - ).load(upload.changes) - for f in upload_changes.added + upload_changes.updated: - if chunk_id in f.chunks: - dest = os.path.join(upload_dir, "chunks", chunk_id) - lockfile = os.path.join(upload_dir, "lockfile") - with Toucher(lockfile, 30): - try: - # we could have used request.data here, but it could eventually cause OOM issue - save_to_file( - request.stream, dest, current_app.config["MAX_CHUNK_SIZE"] - ) - except IOError: - move_to_tmp(dest, transaction_id) - abort(400, "Too big chunk") - if os.path.exists(dest): - checksum = generate_checksum(dest) - size = os.path.getsize(dest) - return jsonify({"checksum": checksum, "size": size}), 200 - else: - abort(400, "Upload was probably canceled") - abort(404) + chunks = [] + for file in upload.changes["added"] + upload.changes["updated"]: + chunks += file.get("chunks", []) + + if chunk_id not in chunks: + abort(404) + + dest = os.path.join(upload_dir, "chunks", chunk_id) + with Toucher(upload.lockfile, 30): + try: + # we could have used request.data here, but it could eventually cause OOM issue + save_to_file(request.stream, dest, current_app.config["MAX_CHUNK_SIZE"]) + except IOError: + move_to_tmp(dest, transaction_id) + abort(400, "Too big chunk") + if os.path.exists(dest): + checksum = generate_checksum(dest) + size = os.path.getsize(dest) + return jsonify({"checksum": checksum, "size": size}), 200 + else: + abort(400, "Upload was probably canceled") @auth_required @@ -1003,24 +983,34 @@ def push_finish(transaction_id): project_path = get_project_path(project) next_version = project.latest_version + 1 v_next_version = ProjectVersion.to_v_name(next_version) - changes = ChangesSchema(context={"version": next_version}).load(upload.changes) + try: + upload_changes = ChangesSchema().load(upload.changes) + except ValidationError as err: + msg = err.messages[0] if type(err.messages) == list else "Invalid input data" + abort(422, msg) + + file_changes = files_changes_from_upload(upload_changes, next_version) + chunks_map = { + f["path"]: f["chunks"] + for f in upload_changes["added"] + upload_changes["updated"] + } corrupted_files = [] - for f in changes.added + changes.updated: - if f.diff is not None: - dest_file = os.path.join(upload_dir, "files", f.diff.location) - expected_size = f.diff.size - else: - dest_file = os.path.join(upload_dir, "files", f.location) - expected_size = f.size + # Concatenate chunks into single file + for f in file_changes: + if f.change == PushChangeType.DELETE: + continue - # Concatenate chunks into single file + f_path = ( + f.diff.location if f.change == PushChangeType.UPDATE_DIFF else f.location + ) + temporary_location = os.path.join(upload_dir, "files", f_path) # TODO we need to move this elsewhere since it can fail for large files (and slow FS) - os.makedirs(os.path.dirname(dest_file), exist_ok=True) - with open(dest_file, "wb") as dest: + os.makedirs(os.path.dirname(temporary_location), exist_ok=True) + with open(temporary_location, "wb") as 
dest: try: - for chunk_id in f.chunks: + for chunk_id in chunks_map.get(f.path, []): sleep(0) # to unblock greenlet chunk_file = os.path.join(upload_dir, "chunks", chunk_id) with open(chunk_file, "rb") as src: @@ -1035,24 +1025,27 @@ def push_finish(transaction_id): ) corrupted_files.append(f.path) continue - if not is_supported_type(dest_file): - logging.info(f"Rejecting blacklisted file: {dest_file}") + if not is_supported_type(temporary_location): + logging.info(f"Rejecting blacklisted file: {temporary_location}") abort( 400, f"Unsupported file type detected: {f.path}. " f"Please remove the file or try compressing it into a ZIP file before uploading.", ) - if expected_size != os.path.getsize(dest_file): + # check if .gpkg file is valid + if is_versioned_file(temporary_location) and not f.is_valid_gpkg(): + corrupted_files.append(f.path) + continue + + expected_size = ( + f.diff.size if f.change == PushChangeType.UPDATE_DIFF else f.size + ) + if expected_size != os.path.getsize(temporary_location): logging.error( - "Data integrity check has failed on file %s in project %s" - % (f.path, project_path), + f"Data integrity check has failed on file {f.path} in project {project_path}", exc_info=True, ) - # check if .gpkg file is valid - if is_versioned_file(dest_file): - if not f.is_valid_gpkg(): - corrupted_files.append(f.path) corrupted_files.append(f.path) if corrupted_files: @@ -1076,40 +1069,48 @@ def push_finish(transaction_id): os.renames(files_dir, target_dir) # apply gpkg updates sync_errors = {} - to_remove = [i.path for i in changes.removed] + to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE] current_files = [f for f in project.files if f.path not in to_remove] - for updated_file in changes.updated: - # yield to gevent hub since geodiff action can take some time to prevent worker timeout - sleep(0) - current_file = next( - (i for i in current_files if i.path == updated_file.path), None - ) - if not current_file: - sync_errors[updated_file.path] = "file not found on server " - continue - - if updated_file.diff: - result = project.storage.apply_diff( - current_file, updated_file, next_version + for file in file_changes: + # for updates try to apply diff to create a full updated gpkg file or from full .gpkg try to create corresponding diff + if file.change in ( + PushChangeType.UPDATE, + PushChangeType.UPDATE_DIFF, + ) and is_versioned_file(file.path): + current_file = next( + (i for i in current_files if i.path == file.path), None ) - if result.ok(): - checksum, size = result.value - updated_file.checksum = checksum - updated_file.size = size - else: - sync_errors[updated_file.path] = ( - f"project: {project.workspace.name}/{project.name}, {result.value}" - ) + if not current_file: + sync_errors[file.path] = "file not found on server " + continue - elif is_versioned_file(updated_file.path): - result = project.storage.construct_diff( - current_file, updated_file, next_version - ) - if result.ok(): - updated_file.diff = result.value + # yield to gevent hub since geodiff action can take some time to prevent worker timeout + sleep(0) + + if file.diff: + result = project.storage.apply_diff( + current_file, file, next_version + ) + if result.ok(): + checksum, size = result.value + file.checksum = checksum + file.size = size + else: + sync_errors[file.path] = ( + f"project: {project.workspace.name}/{project.name}, {result.value}" + ) else: - # if diff cannot be constructed it would be force update - logging.warning(f"Geodiff: create changeset error 
{result.value}") + result = project.storage.construct_diff( + current_file, file, next_version + ) + if result.ok(): + file.diff = result.value + file.change = PushChangeType.UPDATE_DIFF + else: + # if diff cannot be constructed it would be force update + logging.warning( + f"Geodiff: create changeset error {result.value}" + ) if sync_errors: msg = "" @@ -1123,7 +1124,7 @@ def push_finish(transaction_id): project, next_version, current_user.id, - changes, + file_changes, get_ip(request), user_agent, device_id, @@ -1257,15 +1258,25 @@ def clone_project(namespace, project_name): # noqa: E501 user_agent = get_user_agent(request) device_id = get_device_id(request) # transform source files to new uploaded files - files = UploadFileSchema(context={"version": 1}, many=True).load( - FileSchema(exclude=("location",), many=True).dump(cloned_project.files) - ) - changes = UploadChanges(added=files, updated=[], removed=[]) + file_changes = [] + for file in cloned_project.files: + file_changes.append( + ProjectFileChange( + file.path, + file.checksum, + file.size, + diff=None, + mtime=None, + location=os.path.join("v1", mergin_secure_filename(file.path)), + change=PushChangeType.CREATE, + ) + ) + project_version = ProjectVersion( p, version, current_user.id, - changes, + file_changes, get_ip(request), user_agent, device_id, diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 75b6f09e..617ad531 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -75,7 +75,7 @@ def project_user_permissions(project): class FileHistorySchema(ma.SQLAlchemyAutoSchema): mtime = DateTimeWithZ() - diff = fields.Nested(FileSchema(), attribute="diff_file", exclude=("location",)) + diff = fields.Nested(FileSchema(), attribute="diff_file") expiration = DateTimeWithZ(attribute="expiration", dump_only=True) class Meta: diff --git a/server/mergin/sync/storages/disk.py b/server/mergin/sync/storages/disk.py index 4debb255..459d704d 100644 --- a/server/mergin/sync/storages/disk.py +++ b/server/mergin/sync/storages/disk.py @@ -21,7 +21,7 @@ generate_checksum, is_versioned_file, ) -from ..files import mergin_secure_filename, ProjectFile, UploadFile, File +from ..files import ProjectDiffFile, mergin_secure_filename, ProjectFile def save_to_file(stream, path, max_size=None): @@ -245,7 +245,7 @@ def _generator(): return _generator() def apply_diff( - self, current_file: ProjectFile, upload_file: UploadFile, version: int + self, current_file: ProjectFile, upload_file: ProjectFile, version: int ) -> Result: """Apply geodiff diff file on current gpkg basefile. Creates GeodiffActionHistory record of the action. Returns checksum and size of generated file. If action fails it returns geodiff error message. @@ -313,7 +313,7 @@ def apply_diff( return Err(self.gediff_log.getvalue()) def construct_diff( - self, current_file: ProjectFile, upload_file: UploadFile, version: int + self, current_file: ProjectFile, upload_file: ProjectFile, version: int ) -> Result: """Construct geodiff diff file from uploaded gpkg and current basefile. Returns diff metadata as a result. If action fails it returns geodiff error message. 
@@ -345,7 +345,7 @@ def construct_diff( basefile_tmp, uploaded_file_tmp, changeset_tmp ) # create diff metadata as it would be created by other clients - diff_file = File( + diff_file = ProjectDiffFile( path=diff_name, checksum=generate_checksum(changeset_tmp), size=os.path.getsize(changeset_tmp), diff --git a/server/mergin/tests/fixtures.py b/server/mergin/tests/fixtures.py index 7cff688e..9f39909d 100644 --- a/server/mergin/tests/fixtures.py +++ b/server/mergin/tests/fixtures.py @@ -19,7 +19,7 @@ from ..stats.models import MerginInfo from . import test_project, test_workspace_id, test_project_dir, TMP_DIR from .utils import login_as_admin, initialize, cleanup, file_info -from ..sync.files import ChangesSchema +from ..sync.files import files_changes_from_upload thisdir = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.join(thisdir, os.pardir)) @@ -213,12 +213,13 @@ def diff_project(app): else: # no files uploaded, hence no action needed pass - upload_changes = ChangesSchema(context={"version": i + 2}).load(change) + + file_changes = files_changes_from_upload(change, version=i + 2) pv = ProjectVersion( project, i + 2, project.creator.id, - upload_changes, + file_changes, "127.0.0.1", ) assert pv.project_size == sum(file.size for file in pv.files) diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index f896f99b..48c14c0e 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -18,7 +18,6 @@ ProjectRole, ProjectUser, ) -from ..sync.files import UploadChanges from ..auth.models import User from ..app import db from . import DEFAULT_USER @@ -40,8 +39,7 @@ def test_close_user_account(client, diff_project): # user has access to mergin user diff_project diff_project.set_role(user.id, ProjectRole.WRITER) # user contributed to another user project so he is listed in projects history - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(diff_project, 11, user.id, changes, "127.0.0.1") + pv = ProjectVersion(diff_project, 11, user.id, [], "127.0.0.1") diff_project.latest_version = pv.name pv.project = diff_project db.session.add(pv) @@ -116,8 +114,9 @@ def test_remove_project(client, diff_project): # set up mergin_user = User.query.filter_by(username=DEFAULT_USER[0]).first() project_dir = Path(diff_project.storage.project_dir) - changes = UploadChanges(added=[], removed=[], updated=[]) - upload = Upload(diff_project, changes, mergin_user.id) + upload = Upload( + diff_project, {"added": [], "removed": [], "updated": []}, mergin_user.id + ) db.session.add(upload) project_id = diff_project.id user = add_user("user", "user") diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 1b38257a..989133e6 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -38,7 +38,7 @@ PushChangeType, ProjectFilePath, ) -from ..sync.files import ChangesSchema, UploadChanges +from ..sync.files import ChangesSchema, files_changes_from_upload from ..sync.schemas import ProjectListSchema from ..sync.utils import generate_checksum, is_versioned_file, get_project_path from ..auth.models import User, UserProfile @@ -1205,13 +1205,7 @@ def test_push_to_new_project(client): project, 0, p.creator.id, - ChangesSchema(context={"version": 0}).load( - { - "added": [], - "updated": [], - "removed": [], - } - ), + [], ip="127.0.0.1", ) db.session.add(pv) @@ -1411,8 +1405,7 @@ def 
create_transaction(username, changes, version=1): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - upload_changes = ChangesSchema(context={"version": version}).load(changes) - upload = Upload(project, upload_changes, user.id) + upload = Upload(project, changes, user.id) db.session.add(upload) db.session.commit() upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) @@ -1525,7 +1518,7 @@ def test_push_finish(client): upload.project, upload.project.latest_version, upload.project.creator.id, - UploadChanges(added=[], updated=[], removed=[]), + [], "127.0.0.1", ) pv.project = upload.project @@ -2436,12 +2429,12 @@ def add_project_version(project, changes, version=None): else User.query.filter_by(username=DEFAULT_USER[0]).first() ) next_version = version or project.next_version() - upload_changes = ChangesSchema(context={"version": next_version}).load(changes) + file_changes = files_changes_from_upload(changes, version=next_version) pv = ProjectVersion( project, next_version, author.id, - upload_changes, + file_changes, ip="127.0.0.1", ) db.session.add(pv) diff --git a/server/mergin/tests/utils.py b/server/mergin/tests/utils.py index 94fc033f..0c4448a6 100644 --- a/server/mergin/tests/utils.py +++ b/server/mergin/tests/utils.py @@ -20,7 +20,7 @@ from ..auth.models import User, UserProfile from ..sync.utils import generate_location, generate_checksum from ..sync.models import Project, ProjectVersion, FileHistory, ProjectRole -from ..sync.files import UploadChanges, ChangesSchema +from ..sync.files import ProjectFileChange, PushChangeType, files_changes_from_upload from ..sync.workspace import GlobalWorkspace from ..app import db from . import json_headers, DEFAULT_USER, test_project, test_project_dir, TMP_DIR @@ -82,8 +82,7 @@ def create_project(name, workspace, user, **kwargs): p.updated = datetime.utcnow() db.session.add(p) db.session.flush() - changes = UploadChanges(added=[], updated=[], removed=[]) - pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1") + pv = ProjectVersion(p, 0, user.id, [], "127.0.0.1") db.session.add(pv) db.session.commit() @@ -156,15 +155,17 @@ def initialize(): for f in files: abs_path = os.path.join(root, f) project_files.append( - { - "path": abs_path.replace(test_project_dir, "").lstrip("/"), - "location": os.path.join( + ProjectFileChange( + path=abs_path.replace(test_project_dir, "").lstrip("/"), + checksum=generate_checksum(abs_path), + size=os.path.getsize(abs_path), + mtime=str(datetime.fromtimestamp(os.path.getmtime(abs_path))), + change=PushChangeType.CREATE, + location=os.path.join( "v1", abs_path.replace(test_project_dir, "").lstrip("/") ), - "size": os.path.getsize(abs_path), - "checksum": generate_checksum(abs_path), - "mtime": str(datetime.fromtimestamp(os.path.getmtime(abs_path))), - } + diff=None, + ) ) p.latest_version = 1 p.public = True @@ -173,14 +174,7 @@ def initialize(): db.session.add(p) db.session.commit() - upload_changes = ChangesSchema(context={"version": 1}).load( - { - "added": project_files, - "updated": [], - "removed": [], - } - ) - pv = ProjectVersion(p, 1, user.id, upload_changes, "127.0.0.1") + pv = ProjectVersion(p, 1, user.id, project_files, "127.0.0.1") db.session.add(pv) db.session.commit() @@ -285,7 +279,7 @@ def create_blank_version(project): project, project.next_version(), project.creator.id, - UploadChanges(added=[], updated=[], removed=[]), + [], "127.0.0.1", ) db.session.add(pv) @@ -355,14 +349,12 @@ def push_change(project, action, path, 
src_dir): else: return - upload_changes = ChangesSchema(context={"version": project.next_version()}).load( - changes - ) + file_changes = files_changes_from_upload(changes, version=project.next_version()) pv = ProjectVersion( project, project.next_version(), project.creator.id, - upload_changes, + file_changes, "127.0.0.1", ) db.session.add(pv) From 573b31a5ceed7bc66be1db04129eb3a750d3d7e0 Mon Sep 17 00:00:00 2001 From: "marcel.kocisek" Date: Mon, 7 Jul 2025 17:29:45 +0200 Subject: [PATCH 5/6] Added: - header title to sidebar - minor padding tweaks - add responsive and non-responsive mode to DataViewWrapper -> we sometimes need a table inside another component --- .../src/common/components/AppSidebarRight.vue | 12 ++--- .../components/data-view/DataViewWrapper.vue | 44 +++++++++---------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/web-app/packages/lib/src/common/components/AppSidebarRight.vue b/web-app/packages/lib/src/common/components/AppSidebarRight.vue index 10ace6b7..89a839ba 100644 --- a/web-app/packages/lib/src/common/components/AppSidebarRight.vue +++ b/web-app/packages/lib/src/common/components/AppSidebarRight.vue @@ -26,11 +26,13 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial
-

- -

+ +

+ +

+
-
+
diff --git a/web-app/packages/lib/src/common/components/data-view/DataViewWrapper.vue b/web-app/packages/lib/src/common/components/data-view/DataViewWrapper.vue index 8b936d85..a3ced259 100644 --- a/web-app/packages/lib/src/common/components/data-view/DataViewWrapper.vue +++ b/web-app/packages/lib/src/common/components/data-view/DataViewWrapper.vue @@ -13,15 +13,18 @@
{{ col.text }}
-
+
+ +
@@ -50,12 +53,17 @@ v-for="column in computedColumns.filter((item) => !item.fixed)" :key="column.value" :class="[ - 'flex flex-column justify-content-center col-12 gap-1', - `lg:col-${column.cols ?? defaultCols}`, + 'flex flex-column justify-content-center gap-1', + responsive + ? `col-12 lg:col-${column.cols ?? defaultCols}` + : `col-${column.cols ?? defaultCols}`, 'py-2 lg:py-0' ]" > -

+

{{ column.text }}

@@ -86,8 +94,10 @@ v-for="col in columns.filter((item) => !item.fixed)" :key="col.value" :class="[ - 'flex flex-column justify-content-center col-12', - `lg:col-${col.cols ?? 2}`, + 'flex flex-column justify-content-center', + responsive + ? `col-12 lg:col-${col.cols ?? defaultCols}` + : `col-${col.cols ?? defaultCols}`, 'py-2 pr-2' ]" > @@ -116,20 +126,6 @@ defineOptions({ inheritAttrs: false }) -/** - * Defines the props for the `DataViewWrapper` component. - * - * @interface Props - * @property {string} dataKey - The key to use for identifying each item in the data. - * @property {boolean} [loading] - Indicates whether the data is currently being loaded. - * @property {number} [loadingRows] - The number of loading rows to display when the data is being loaded. - * @property {string} [emptyMessage] - The message to display when there is no data available. - * @property {DataViewWrapperColumnItem[]} columns - An array of column definitions for the data view. - * @property {(item: any) => StyleValue} [rowStyle] - A function that returns the style for each row in the data view. - * @property {boolean} [rowCursorPointer] - Indicates whether the cursor should be a pointer when hovering over a row. - * @property {DataViewWrapperOptions} [options] - The options for the data view. - * @property {object[]} [value] - The data to be displayed in the data view. - */ /** * Defines the props for the `DataViewWrapper` component. * @@ -156,6 +152,7 @@ interface Props { options?: DataViewWrapperOptions value?: object[] defaultCols?: number + responsive?: boolean } const props = withDefaults(defineProps(), { @@ -163,7 +160,8 @@ const props = withDefaults(defineProps(), { loadingRows: 3, emptyMessage: 'No data available', rowCursorPointer: true, - defaultCols: 2 + defaultCols: 2, + responsive: true }) type EmitItem = Record From 4e3d1768e73e21582b3667f96674758d2f819a78 Mon Sep 17 00:00:00 2001 From: Marcel Kocisek Date: Wed, 16 Jul 2025 13:10:06 +0200 Subject: [PATCH 6/6] Revert "Allow concurrent non-blocking uploads" --- server/mergin/sync/commands.py | 4 +- server/mergin/sync/db_events.py | 10 - server/mergin/sync/files.py | 196 +++------ server/mergin/sync/models.py | 120 +++--- server/mergin/sync/public_api.yaml | 4 - server/mergin/sync/public_api_controller.py | 376 +++++++++--------- server/mergin/sync/schemas.py | 2 +- server/mergin/sync/storages/disk.py | 8 +- server/mergin/sync/tasks.py | 11 +- server/mergin/tests/fixtures.py | 7 +- server/mergin/tests/test_celery.py | 22 +- server/mergin/tests/test_db_hooks.py | 9 +- .../mergin/tests/test_project_controller.py | 238 ++--------- server/mergin/tests/utils.py | 38 +- .../9b3eac2f21c2_allow_concurrent_uploads.py | 85 ---- 15 files changed, 370 insertions(+), 760 deletions(-) delete mode 100644 server/migrations/community/9b3eac2f21c2_allow_concurrent_uploads.py diff --git a/server/mergin/sync/commands.py b/server/mergin/sync/commands.py index 4ec898cf..97e85981 100644 --- a/server/mergin/sync/commands.py +++ b/server/mergin/sync/commands.py @@ -9,6 +9,7 @@ from datetime import datetime from flask import Flask, current_app +from .files import UploadChanges from ..app import db from .models import Project, ProjectVersion from .utils import split_project_path @@ -51,7 +52,8 @@ def create(name, namespace, username): # pylint: disable=W0612 p = Project(**project_params) p.updated = datetime.utcnow() db.session.add(p) - pv = ProjectVersion(p, 0, user.id, [], "127.0.0.1") + changes = UploadChanges(added=[], updated=[], removed=[]) + pv = ProjectVersion(p, 
0, user.id, changes, "127.0.0.1") pv.project = p db.session.commit() os.makedirs(p.storage.project_dir, exist_ok=True) diff --git a/server/mergin/sync/db_events.py b/server/mergin/sync/db_events.py index 7d7e55e0..18d1ce60 100644 --- a/server/mergin/sync/db_events.py +++ b/server/mergin/sync/db_events.py @@ -7,9 +7,6 @@ from sqlalchemy import event from ..app import db -from .models import ProjectVersion -from .public_api_controller import push_finished -from .tasks import remove_stale_project_uploads def check(session): @@ -17,16 +14,9 @@ def check(session): abort(503, "Service unavailable due to maintenance, please try later") -def cleanup_on_push_finished(project_version: ProjectVersion) -> None: - """On finished push trigger celery job cleanup""" - remove_stale_project_uploads.delay(project_version.project_id) - - def register_events(): event.listen(db.session, "before_commit", check) - push_finished.connect(cleanup_on_push_finished) def remove_events(): event.remove(db.session, "before_commit", check) - push_finished.connect(cleanup_on_push_finished) diff --git a/server/mergin/sync/files.py b/server/mergin/sync/files.py index 6073b0fb..12b30afe 100644 --- a/server/mergin/sync/files.py +++ b/server/mergin/sync/files.py @@ -3,29 +3,14 @@ # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial import datetime import os -import uuid from dataclasses import dataclass -from enum import Enum -from flask import current_app -from marshmallow import ValidationError, fields, EXCLUDE, post_dump, validates_schema -from pathvalidate import sanitize_filename from typing import Optional, List +from marshmallow import fields, EXCLUDE, pre_load, post_load, post_dump +from pathvalidate import sanitize_filename -from .utils import is_file_name_blacklisted, is_qgis, is_versioned_file from ..app import DateTimeWithZ, ma -class PushChangeType(Enum): - CREATE = "create" - UPDATE = "update" - DELETE = "delete" - UPDATE_DIFF = "update_diff" - - @classmethod - def values(cls): - return [member.value for member in cls.__members__.values()] - - def mergin_secure_filename(filename: str) -> str: """Generate secure filename for given file""" filename = os.path.normpath(filename) @@ -39,181 +24,94 @@ def mergin_secure_filename(filename: str) -> str: @dataclass class File: - """Base class for every file object, either intended to upload or already existing in project""" + """Base class for every file object""" path: str checksum: str size: int + location: str def is_valid_gpkg(self): """Check if diff file is valid""" return self.size != 0 -@dataclass -class ProjectDiffFile(File): - """Metadata for geodiff diff file (aka. changeset) associated with geopackage""" - - # location where file is actually stored - location: str - - @dataclass class ProjectFile(File): - """Project file metadata including metadata for diff file and location where it is stored""" + """Project file metadata including metadata for diff file""" # metadata for gpkg diff file - diff: Optional[ProjectDiffFile] + diff: Optional[File] # deprecated attribute kept for public API compatibility mtime: Optional[datetime.datetime] - # location where file is actually stored - location: str @dataclass -class ProjectFileChange(ProjectFile): - """Metadata of changed file in project version. - - This item is saved into database into file_history. 
- """ - - change: PushChangeType - - -def files_changes_from_upload(changes: dict, version: int) -> List["ProjectFileChange"]: - """Create a list of version file changes from upload changes dictionary used by public API. - - It flattens changes dict and adds change type to each item. Also generates location for each file. - """ - secure_filenames = [] - version_changes = [] - version = "v" + str(version) - for key in ("added", "updated", "removed"): - for item in changes.get(key, []): - location = os.path.join(version, mergin_secure_filename(item["path"])) - diff = None - - # make sure we have unique location for each file - if location in secure_filenames: - filename, file_extension = os.path.splitext(location) - location = filename + f".{str(uuid.uuid4())}" + file_extension - - secure_filenames.append(location) - - if key == "removed": - change = PushChangeType.DELETE - location = None - elif key == "added": - change = PushChangeType.CREATE - else: - change = PushChangeType.UPDATE - if item.get("diff"): - change = PushChangeType.UPDATE_DIFF - diff_location = os.path.join( - version, mergin_secure_filename(item["diff"]["path"]) - ) - if diff_location in secure_filenames: - filename, file_extension = os.path.splitext(diff_location) - diff_location = ( - filename + f".{str(uuid.uuid4())}" + file_extension - ) - - secure_filenames.append(diff_location) - diff = ProjectDiffFile( - path=item["diff"]["path"], - checksum=item["diff"]["checksum"], - size=item["diff"]["size"], - location=diff_location, - ) - - file_change = ProjectFileChange( - path=item["path"], - checksum=item["checksum"], - size=item["size"], - mtime=None, - change=change, - location=location, - diff=diff, - ) - version_changes.append(file_change) +class UploadFile(File): + """File to be uploaded coming from client push process""" - return version_changes + # determined by client + chunks: Optional[List[str]] + diff: Optional[File] + + +@dataclass +class UploadChanges: + added: List[UploadFile] + updated: List[UploadFile] + removed: List[UploadFile] class FileSchema(ma.Schema): path = fields.String() size = fields.Integer() checksum = fields.String() + location = fields.String(load_default="", load_only=True) class Meta: unknown = EXCLUDE + @post_load + def create_obj(self, data, **kwargs): + return File(**data) + class UploadFileSchema(FileSchema): chunks = fields.List(fields.String(), load_default=[]) diff = fields.Nested(FileSchema(), many=False, load_default=None) + @pre_load + def pre_load(self, data, **kwargs): + # add future location based on context version + version = f"v{self.context.get('version')}" + if not data.get("location"): + data["location"] = os.path.join( + version, mergin_secure_filename(data["path"]) + ) + if data.get("diff") and not data.get("diff").get("location"): + data["diff"]["location"] = os.path.join( + version, mergin_secure_filename(data["diff"]["path"]) + ) + return data + + @post_load + def create_obj(self, data, **kwargs): + return UploadFile(**data) + class ChangesSchema(ma.Schema): """Schema for upload changes""" - added = fields.List( - fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] - ) - updated = fields.List( - fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] - ) - removed = fields.List( - fields.Nested(UploadFileSchema()), load_default=[], dump_default=[] - ) - is_blocking = fields.Method("_is_blocking") + added = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) + updated = fields.List(fields.Nested(UploadFileSchema()), 
load_default=[]) + removed = fields.List(fields.Nested(UploadFileSchema()), load_default=[]) class Meta: unknown = EXCLUDE - def _is_blocking(self, obj) -> bool: - """Check if changes would be blocking.""" - # let's mark upload as non-blocking only if there are new non-spatial data added (e.g. photos) - return bool( - len(obj.get("updated", [])) - or len(obj.get("removed", [])) - or any( - is_qgis(f["path"]) or is_versioned_file(f["path"]) - for f in obj.get("added", []) - ) - ) - - @post_dump - def remove_blacklisted_files(self, data, **kwargs): - """Files which are blacklisted are not allowed to be uploaded and are simple ignored.""" - for key in ("added", "updated", "removed"): - data[key] = [ - f - for f in data[key] - if not is_file_name_blacklisted( - f["path"], current_app.config["BLACKLIST"] - ) - ] - return data - - @validates_schema - def validate(self, data, **kwargs): - """Basic consistency validations for upload metadata""" - changes_files = [ - f["path"] for f in data["added"] + data["updated"] + data["removed"] - ] - - if len(changes_files) == 0: - raise ValidationError("No changes") - - # changes' files must be unique - if len(set(changes_files)) != len(changes_files): - raise ValidationError("Not unique changes") - - # check if all .gpkg file are valid - for file in data["added"] + data["updated"]: - if is_versioned_file(file["path"]) and file["size"] == 0: - raise ValidationError("File is not valid") + @post_load + def create_obj(self, data, **kwargs): + return UploadChanges(**data) class ProjectFileSchema(FileSchema): diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index be57e4d4..3854e4d2 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -23,11 +23,9 @@ from .files import ( File, - ProjectDiffFile, - ProjectFileChange, + UploadChanges, ChangesSchema, ProjectFile, - PushChangeType, ) from .interfaces import WorkspaceRole from .storages.disk import move_to_tmp @@ -40,6 +38,17 @@ project_access_granted = signal("project_access_granted") +class PushChangeType(Enum): + CREATE = "create" + UPDATE = "update" + DELETE = "delete" + UPDATE_DIFF = "update_diff" + + @classmethod + def values(cls): + return [member.value for member in cls.__members__.values()] + + class Project(db.Model): id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name = db.Column(db.String, index=True) @@ -172,7 +181,7 @@ def files(self) -> List[ProjectFile]: checksum=row.checksum, location=row.location, mtime=row.mtime, - diff=ProjectDiffFile(**row.diff) if row.diff else None, + diff=File(**row.diff) if row.diff else None, ) for row in db.session.execute(query, params).fetchall() ] @@ -495,9 +504,9 @@ def path(self) -> str: return self.file.path @property - def diff_file(self) -> Optional[ProjectDiffFile]: + def diff_file(self) -> Optional[File]: if self.diff: - return ProjectDiffFile(**self.diff) + return File(**self.diff) @property def mtime(self) -> datetime: @@ -696,7 +705,7 @@ def __init__( project: Project, name: int, author_id: int, - changes: List[ProjectFileChange], + changes: UploadChanges, ip: str, user_agent: str = None, device_id: str = None, @@ -716,7 +725,9 @@ def __init__( ).all() } - changed_files_paths = set(change.path for change in changes) + changed_files_paths = [ + f.path for f in changes.updated + changes.removed + changes.added + ] existing_files_map = { f.path: f for f in ProjectFilePath.query.filter_by(project_id=self.project_id) @@ -724,32 +735,46 @@ def __init__( .all() } - for item in changes: - # get 
existing DB file reference or create a new one (for added files) - db_file = existing_files_map.get( - item.path, ProjectFilePath(self.project_id, item.path) - ) - fh = FileHistory( - file=db_file, - size=item.size, - checksum=item.checksum, - location=item.location, - diff=( - asdict(item.diff) - if (item.change is PushChangeType.UPDATE_DIFF and item.diff) - else null() - ), - change=item.change, - ) - fh.version = self - fh.project_version_name = self.name - db.session.add(fh) - db.session.flush() + for key in ( + ("added", PushChangeType.CREATE), + ("updated", PushChangeType.UPDATE), + ("removed", PushChangeType.DELETE), + ): + change_attr = key[0] + change_type = key[1] + + for upload_file in getattr(changes, change_attr): + is_diff_change = ( + change_type is PushChangeType.UPDATE + and upload_file.diff is not None + ) - if item.change is PushChangeType.DELETE: - latest_files_map.pop(fh.path, None) - else: - latest_files_map[fh.path] = fh.id + file = existing_files_map.get( + upload_file.path, ProjectFilePath(self.project_id, upload_file.path) + ) + fh = FileHistory( + file=file, + size=upload_file.size, + checksum=upload_file.checksum, + location=upload_file.location, + diff=( + asdict(upload_file.diff) + if (is_diff_change and upload_file.diff) + else null() + ), + change=( + PushChangeType.UPDATE_DIFF if is_diff_change else change_type + ), + ) + fh.version = self + fh.project_version_name = self.name + db.session.add(fh) + db.session.flush() + + if change_type is PushChangeType.DELETE: + latest_files_map.pop(fh.path, None) + else: + latest_files_map[fh.path] = fh.id # update cached values in project and push to transaction buffer so that self.files is up-to-date self.project.latest_project_files.file_history_ids = latest_files_map.values() @@ -884,7 +909,7 @@ def files(self) -> List[ProjectFile]: checksum=row.checksum, location=row.location, mtime=row.mtime, - diff=ProjectDiffFile(**row.diff) if row.diff else None, + diff=File(**row.diff) if row.diff else None, ) for row in result ] @@ -978,12 +1003,13 @@ class Upload(db.Model): project_id = db.Column( UUID(as_uuid=True), db.ForeignKey("project.id", ondelete="CASCADE"), index=True ) + # project version where upload is initiated from + version = db.Column(db.Integer, index=True) changes = db.Column(db.JSON) user_id = db.Column( db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True ) created = db.Column(db.DateTime, default=datetime.utcnow) - blocking = db.Column(db.Boolean, default=True) user = db.relationship("User") project = db.relationship( @@ -993,24 +1019,16 @@ class Upload(db.Model): "uploads", single_parent=True, lazy="dynamic", cascade="all,delete" ), ) + __table_args__ = (db.UniqueConstraint("project_id", "version"),) - __table_args__ = ( - db.Index( - "ix_upload_blocking_partial", - project_id, - blocking, - unique=True, - postgresql_where=(blocking), - ), - ) - - def __init__(self, project: Project, changes: dict, user_id: int): - upload_changes = ChangesSchema().dump(changes) + def __init__( + self, project: Project, version: int, changes: UploadChanges, user_id: int + ): self.id = str(uuid.uuid4()) self.project_id = project.id + self.version = version + self.changes = ChangesSchema().dump(changes) self.user_id = user_id - self.blocking = upload_changes.pop("is_blocking") - self.changes = upload_changes @property def upload_dir(self): @@ -1035,10 +1053,6 @@ def clear(self): db.session.delete(self) db.session.commit() - def file_already_in_upload(self, path) -> bool: - """Check if file is not already 
as new added file""" - return any(f["path"] == path for f in self.changes["added"]) - class RequestStatus(Enum): ACCEPTED = "accepted" diff --git a/server/mergin/sync/public_api.yaml b/server/mergin/sync/public_api.yaml index b7c0b7e8..5227b562 100644 --- a/server/mergin/sync/public_api.yaml +++ b/server/mergin/sync/public_api.yaml @@ -596,10 +596,6 @@ paths: description: upload transaction uuid type: string example: 970181b5-7143-491b-91a6-36533021c9a2 - blocking: - description: if upload blocks other uploads - type: boolean - example: false "400": $ref: "#/components/responses/BadStatusResp" "401": diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index e9fb4a76..9fd229a1 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -10,8 +10,8 @@ from dataclasses import asdict from typing import Dict from urllib.parse import quote +import uuid from datetime import datetime -from marshmallow import ValidationError import gevent import psycopg2 @@ -50,13 +50,11 @@ ProjectRole, ) from .files import ( + UploadChanges, ChangesSchema, - ProjectDiffFile, - ProjectFile, - ProjectFileChange, + UploadFileSchema, ProjectFileSchema, - files_changes_from_upload, - mergin_secure_filename, + FileSchema, ) from .schemas import ( ProjectSchema, @@ -241,24 +239,15 @@ def add_project(namespace): # noqa: E501 .first_or_404() ) version_name = 1 - file_changes = [] - for file in template.files: - file_changes.append( - ProjectFileChange( - file.path, - file.checksum, - file.size, - diff=None, - mtime=None, - location=os.path.join("v1", mergin_secure_filename(file.path)), - change=PushChangeType.CREATE, - ) - ) + files = UploadFileSchema(context={"version": 1}, many=True).load( + FileSchema(exclude=("location",), many=True).dump(template.files) + ) + changes = UploadChanges(added=files, updated=[], removed=[]) else: template = None version_name = 0 - file_changes = [] + changes = UploadChanges(added=[], updated=[], removed=[]) try: p.storage.initialize(template_project=template) @@ -269,7 +258,7 @@ def add_project(namespace): # noqa: E501 p, version_name, current_user.id, - file_changes, + changes, get_ip(request), get_user_agent(request), get_device_id(request), @@ -763,67 +752,85 @@ def project_push(namespace, project_name): if not ws: abort(404) - pv = project.get_latest_version() - if pv and pv.name < version: - abort(400, "Version mismatch, client cannot be ahead of server") + # fixme use get_latest + pv = ProjectVersion.query.filter_by( + project_id=project.id, name=project.latest_version + ).first() + if pv and pv.name != version: + abort(400, "Version mismatch") if not pv and version != 0: abort(400, "First push should be with v0") - try: - ChangesSchema().validate(changes) - upload_changes = ChangesSchema().dump(changes) - except ValidationError as err: - msg = err.messages[0] if type(err.messages) == list else "Invalid input data" - abort(400, msg) - - # reject upload early if there is another blocking upload already running - if upload_changes["is_blocking"]: - pending_upload = Upload.query.filter_by( - project_id=project.id, blocking=True - ).first() - if pending_upload and pending_upload.is_active(): - abort(400, "Another process is running. 
Please try later.") - - current_files = set(file.path for file in project.files) - for item in upload_changes["added"]: - # check if same file is not already uploaded or in pending upload - item_path = item["path"] - if item_path in current_files: - abort(400, f"File {item_path} has been already uploaded") - - for upload in project.uploads.all(): - if not upload.is_active(): - continue - if upload.file_already_in_upload(item_path): - abort(400, f"File {item_path} is already in other upload") + if all(len(changes[key]) == 0 for key in changes.keys()): + abort(400, "No changes") + + # reject upload early if there is another one already running + pending_upload = Upload.query.filter_by( + project_id=project.id, version=version + ).first() + if pending_upload and pending_upload.is_active(): + abort(400, "Another process is running. Please try later.") + + upload_changes = ChangesSchema(context={"version": version + 1}).load(changes) - if not is_valid_path(item_path): + for item in upload_changes.added: + # check if same file is not already uploaded + if not all(ele.path != item.path for ele in project.files): + abort(400, f"File {item.path} has been already uploaded") + if not is_valid_path(item.path): abort( 400, - f"Unsupported file name detected: {item_path}. Please remove the invalid characters.", + f"Unsupported file name detected: {item.path}. Please remove the invalid characters.", ) - if not is_supported_extension(item_path): + if not is_supported_extension(item.path): abort( 400, - f"Unsupported file type detected: {item_path}. " + f"Unsupported file type detected: {item.path}. " f"Please remove the file or try compressing it into a ZIP file before uploading.", ) - # check consistency of changes - if not set( - file["path"] for file in upload_changes["updated"] + upload_changes["removed"] - ).issubset(current_files): - abort(400, "Update or remove changes contain files that are not in project") + # changes' files must be unique + changes_files = [ + f.path + for f in upload_changes.added + upload_changes.updated + upload_changes.removed + ] + if len(set(changes_files)) != len(changes_files): + abort(400, "Not unique changes") + + sanitized_files = [] + blacklisted_files = [] + for f in upload_changes.added + upload_changes.updated + upload_changes.removed: + # check if .gpkg file is valid + if is_versioned_file(f.path): + if not f.is_valid_gpkg(): + abort(400, f"File {f.path} is not valid") + if is_file_name_blacklisted(f.path, current_app.config["BLACKLIST"]): + blacklisted_files.append(f.path) + # all file need to be unique after sanitized + if f.location in sanitized_files: + filename, file_extension = os.path.splitext(f.location) + f.location = filename + f".{str(uuid.uuid4())}" + file_extension + sanitized_files.append(f.location) + if f.diff: + if f.diff.location in sanitized_files: + filename, file_extension = os.path.splitext(f.diff.location) + f.diff.location = filename + f".{str(uuid.uuid4())}" + file_extension + sanitized_files.append(f.diff.location) + + # remove blacklisted files from changes + for key in upload_changes.__dict__.keys(): + new_value = [ + f for f in getattr(upload_changes, key) if f.path not in blacklisted_files + ] + setattr(upload_changes, key, new_value) # Check user data limit - updates = [f["path"] for f in upload_changes["updated"]] + updates = [f.path for f in upload_changes.updated] updated_files = list(filter(lambda i: i.path in updates, project.files)) additional_disk_usage = ( - sum( - file["size"] for file in upload_changes["added"] + 
upload_changes["updated"] - ) + sum(file.size for file in upload_changes.added + upload_changes.updated) - sum(file.size for file in updated_files) - - sum(file["size"] for file in upload_changes["removed"]) + - sum(file.size for file in upload_changes.removed) ) current_usage = ws.disk_usage() @@ -835,18 +842,18 @@ def project_push(namespace, project_name): ) ) - upload = Upload(project, upload_changes, current_user.id) + upload = Upload(project, version, upload_changes, current_user.id) db.session.add(upload) try: - # Creating blocking upload transaction can fail, e.g. in case of racing condition + # Creating upload transaction with different project's version is possible. db.session.commit() logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}, blocking: {upload.blocking}" + f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" ) except IntegrityError: db.session.rollback() - # check and clean dangling blocking uploads or abort - for current_upload in project.uploads.filter(Upload.blocking == True).all(): + # check and clean dangling uploads or abort + for current_upload in project.uploads.all(): if current_upload.is_active(): abort(400, "Another process is running. Please try later.") db.session.delete(current_upload) @@ -864,7 +871,7 @@ def project_push(namespace, project_name): try: db.session.commit() logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}, blocking: {upload.blocking}" + f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" ) move_to_tmp(upload.upload_dir) except IntegrityError as err: @@ -880,13 +887,12 @@ def project_push(namespace, project_name): next_version = version + 1 user_agent = get_user_agent(request) device_id = get_device_id(request) - file_changes = files_changes_from_upload(upload.changes, version=next_version) try: pv = ProjectVersion( project, next_version, current_user.id, - file_changes, + upload_changes, get_ip(request), user_agent, device_id, @@ -913,7 +919,7 @@ def project_push(namespace, project_name): finally: upload.clear() - return {"transaction": upload.id, "blocking": upload.blocking} + return {"transaction": upload.id} @auth_required @@ -932,27 +938,29 @@ def chunk_upload(transaction_id, chunk_id): """ upload, upload_dir = get_upload(transaction_id) request.view_args["project"] = upload.project - chunks = [] - for file in upload.changes["added"] + upload.changes["updated"]: - chunks += file.get("chunks", []) - - if chunk_id not in chunks: - abort(404) - - dest = os.path.join(upload_dir, "chunks", chunk_id) - with Toucher(upload.lockfile, 30): - try: - # we could have used request.data here, but it could eventually cause OOM issue - save_to_file(request.stream, dest, current_app.config["MAX_CHUNK_SIZE"]) - except IOError: - move_to_tmp(dest, transaction_id) - abort(400, "Too big chunk") - if os.path.exists(dest): - checksum = generate_checksum(dest) - size = os.path.getsize(dest) - return jsonify({"checksum": checksum, "size": size}), 200 - else: - abort(400, "Upload was probably canceled") + upload_changes = ChangesSchema(context={"version": upload.version + 1}).load( + upload.changes + ) + for f in upload_changes.added + upload_changes.updated: + if chunk_id in f.chunks: + dest = os.path.join(upload_dir, "chunks", chunk_id) + lockfile = os.path.join(upload_dir, "lockfile") + with Toucher(lockfile, 30): + try: + # we could have used request.data here, but it could eventually 
cause OOM issue + save_to_file( + request.stream, dest, current_app.config["MAX_CHUNK_SIZE"] + ) + except IOError: + move_to_tmp(dest, transaction_id) + abort(400, "Too big chunk") + if os.path.exists(dest): + checksum = generate_checksum(dest) + size = os.path.getsize(dest) + return jsonify({"checksum": checksum, "size": size}), 200 + else: + abort(400, "Upload was probably canceled") + abort(404) @auth_required @@ -976,41 +984,29 @@ def push_finish(transaction_id): upload, upload_dir = get_upload(transaction_id) request.view_args["project"] = upload.project + changes = ChangesSchema(context={"version": upload.version + 1}).load( + upload.changes + ) project = upload.project if project.locked_until: abort(make_response(jsonify(ProjectLocked().to_dict()), 422)) - project_path = get_project_path(project) - next_version = project.latest_version + 1 - v_next_version = ProjectVersion.to_v_name(next_version) - try: - upload_changes = ChangesSchema().load(upload.changes) - except ValidationError as err: - msg = err.messages[0] if type(err.messages) == list else "Invalid input data" - abort(422, msg) - - file_changes = files_changes_from_upload(upload_changes, next_version) - chunks_map = { - f["path"]: f["chunks"] - for f in upload_changes["added"] + upload_changes["updated"] - } - corrupted_files = [] - # Concatenate chunks into single file - for f in file_changes: - if f.change == PushChangeType.DELETE: - continue + for f in changes.added + changes.updated: + if f.diff is not None: + dest_file = os.path.join(upload_dir, "files", f.diff.location) + expected_size = f.diff.size + else: + dest_file = os.path.join(upload_dir, "files", f.location) + expected_size = f.size - f_path = ( - f.diff.location if f.change == PushChangeType.UPDATE_DIFF else f.location - ) - temporary_location = os.path.join(upload_dir, "files", f_path) + # Concatenate chunks into single file # TODO we need to move this elsewhere since it can fail for large files (and slow FS) - os.makedirs(os.path.dirname(temporary_location), exist_ok=True) - with open(temporary_location, "wb") as dest: + os.makedirs(os.path.dirname(dest_file), exist_ok=True) + with open(dest_file, "wb") as dest: try: - for chunk_id in chunks_map.get(f.path, []): + for chunk_id in f.chunks: sleep(0) # to unblock greenlet chunk_file = os.path.join(upload_dir, "chunks", chunk_id) with open(chunk_file, "rb") as src: @@ -1025,92 +1021,88 @@ def push_finish(transaction_id): ) corrupted_files.append(f.path) continue - if not is_supported_type(temporary_location): - logging.info(f"Rejecting blacklisted file: {temporary_location}") + if not is_supported_type(dest_file): + logging.info(f"Rejecting blacklisted file: {dest_file}") abort( 400, f"Unsupported file type detected: {f.path}. 
" f"Please remove the file or try compressing it into a ZIP file before uploading.", ) - # check if .gpkg file is valid - if is_versioned_file(temporary_location) and not f.is_valid_gpkg(): - corrupted_files.append(f.path) - continue - - expected_size = ( - f.diff.size if f.change == PushChangeType.UPDATE_DIFF else f.size - ) - if expected_size != os.path.getsize(temporary_location): + if expected_size != os.path.getsize(dest_file): logging.error( - f"Data integrity check has failed on file {f.path} in project {project_path}", + "Data integrity check has failed on file %s in project %s" + % (f.path, project_path), exc_info=True, ) + # check if .gpkg file is valid + if is_versioned_file(dest_file): + if not f.is_valid_gpkg(): + corrupted_files.append(f.path) corrupted_files.append(f.path) if corrupted_files: move_to_tmp(upload_dir) abort(422, {"corrupted_files": corrupted_files}) + next_version = upload.version + 1 + v_next_version = ProjectVersion.to_v_name(next_version) files_dir = os.path.join(upload_dir, "files", v_next_version) target_dir = os.path.join(project.storage.project_dir, v_next_version) - - # double check someone else has not created the same version meanwhile - if ProjectVersion.query.filter_by( - project_id=project.id, name=next_version - ).count() or os.path.exists(target_dir): - abort( - 409, - f"There is already version with this name {v_next_version}", + if os.path.exists(target_dir): + pv = ProjectVersion.query.filter_by( + project_id=project.id, name=project.latest_version + ).first() + if pv and pv.name == upload.version + 1: + abort( + 409, + f"There is already version with this name {v_next_version}", + ) + logging.info( + "Upload transaction: Target directory already exists. Overwriting %s" + % target_dir ) + move_to_tmp(target_dir) try: # let's move uploaded files where they are expected to be os.renames(files_dir, target_dir) # apply gpkg updates sync_errors = {} - to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE] + to_remove = [i.path for i in changes.removed] current_files = [f for f in project.files if f.path not in to_remove] - for file in file_changes: - # for updates try to apply diff to create a full updated gpkg file or from full .gpkg try to create corresponding diff - if file.change in ( - PushChangeType.UPDATE, - PushChangeType.UPDATE_DIFF, - ) and is_versioned_file(file.path): - current_file = next( - (i for i in current_files if i.path == file.path), None - ) - if not current_file: - sync_errors[file.path] = "file not found on server " - continue - - # yield to gevent hub since geodiff action can take some time to prevent worker timeout - sleep(0) + for updated_file in changes.updated: + # yield to gevent hub since geodiff action can take some time to prevent worker timeout + sleep(0) + current_file = next( + (i for i in current_files if i.path == updated_file.path), None + ) + if not current_file: + sync_errors[updated_file.path] = "file not found on server " + continue - if file.diff: - result = project.storage.apply_diff( - current_file, file, next_version - ) - if result.ok(): - checksum, size = result.value - file.checksum = checksum - file.size = size - else: - sync_errors[file.path] = ( - f"project: {project.workspace.name}/{project.name}, {result.value}" - ) + if updated_file.diff: + result = project.storage.apply_diff( + current_file, updated_file, next_version + ) + if result.ok(): + checksum, size = result.value + updated_file.checksum = checksum + updated_file.size = size else: - result = 
project.storage.construct_diff( - current_file, file, next_version + sync_errors[updated_file.path] = ( + f"project: {project.workspace.name}/{project.name}, {result.value}" ) - if result.ok(): - file.diff = result.value - file.change = PushChangeType.UPDATE_DIFF - else: - # if diff cannot be constructed it would be force update - logging.warning( - f"Geodiff: create changeset error {result.value}" - ) + + elif is_versioned_file(updated_file.path): + result = project.storage.construct_diff( + current_file, updated_file, next_version + ) + if result.ok(): + updated_file.diff = result.value + else: + # if diff cannot be constructed it would be force update + logging.warning(f"Geodiff: create changeset error {result.value}") if sync_errors: msg = "" @@ -1124,7 +1116,7 @@ def push_finish(transaction_id): project, next_version, current_user.id, - file_changes, + changes, get_ip(request), user_agent, device_id, @@ -1143,14 +1135,10 @@ def push_finish(transaction_id): f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " f"transaction id: {transaction_id}.: {str(err)}" ) - if os.path.exists(target_dir): - move_to_tmp(target_dir) abort(422, "Failed to create new version: {}".format(str(err))) # catch exception during pg transaction so we can rollback and prevent PendingRollbackError during upload clean up except gevent.timeout.Timeout: db.session.rollback() - if os.path.exists(target_dir): - move_to_tmp(target_dir) raise finally: # remove artifacts @@ -1258,25 +1246,15 @@ def clone_project(namespace, project_name): # noqa: E501 user_agent = get_user_agent(request) device_id = get_device_id(request) # transform source files to new uploaded files - file_changes = [] - for file in cloned_project.files: - file_changes.append( - ProjectFileChange( - file.path, - file.checksum, - file.size, - diff=None, - mtime=None, - location=os.path.join("v1", mergin_secure_filename(file.path)), - change=PushChangeType.CREATE, - ) - ) - + files = UploadFileSchema(context={"version": 1}, many=True).load( + FileSchema(exclude=("location",), many=True).dump(cloned_project.files) + ) + changes = UploadChanges(added=files, updated=[], removed=[]) project_version = ProjectVersion( p, version, current_user.id, - file_changes, + changes, get_ip(request), user_agent, device_id, diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 617ad531..75b6f09e 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -75,7 +75,7 @@ def project_user_permissions(project): class FileHistorySchema(ma.SQLAlchemyAutoSchema): mtime = DateTimeWithZ() - diff = fields.Nested(FileSchema(), attribute="diff_file") + diff = fields.Nested(FileSchema(), attribute="diff_file", exclude=("location",)) expiration = DateTimeWithZ(attribute="expiration", dump_only=True) class Meta: diff --git a/server/mergin/sync/storages/disk.py b/server/mergin/sync/storages/disk.py index 459d704d..4debb255 100644 --- a/server/mergin/sync/storages/disk.py +++ b/server/mergin/sync/storages/disk.py @@ -21,7 +21,7 @@ generate_checksum, is_versioned_file, ) -from ..files import ProjectDiffFile, mergin_secure_filename, ProjectFile +from ..files import mergin_secure_filename, ProjectFile, UploadFile, File def save_to_file(stream, path, max_size=None): @@ -245,7 +245,7 @@ def _generator(): return _generator() def apply_diff( - self, current_file: ProjectFile, upload_file: ProjectFile, version: int + self, current_file: ProjectFile, upload_file: UploadFile, version: int ) -> Result: """Apply 
geodiff diff file on current gpkg basefile. Creates GeodiffActionHistory record of the action. Returns checksum and size of generated file. If action fails it returns geodiff error message. @@ -313,7 +313,7 @@ def apply_diff( return Err(self.gediff_log.getvalue()) def construct_diff( - self, current_file: ProjectFile, upload_file: ProjectFile, version: int + self, current_file: ProjectFile, upload_file: UploadFile, version: int ) -> Result: """Construct geodiff diff file from uploaded gpkg and current basefile. Returns diff metadata as a result. If action fails it returns geodiff error message. @@ -345,7 +345,7 @@ def construct_diff( basefile_tmp, uploaded_file_tmp, changeset_tmp ) # create diff metadata as it would be created by other clients - diff_file = ProjectDiffFile( + diff_file = File( path=diff_name, checksum=generate_checksum(changeset_tmp), size=os.path.getsize(changeset_tmp), diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index ee42c416..f56fb273 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -10,7 +10,7 @@ from zipfile import ZIP_DEFLATED, ZipFile from flask import current_app -from .models import Project, ProjectVersion, FileHistory, Upload +from .models import Project, ProjectVersion, FileHistory from .storages.disk import move_to_tmp from .config import Configuration from ..celery import celery @@ -155,12 +155,3 @@ def remove_projects_archives(): os.remove(path) except OSError as e: logging.error(f"Unable to remove {path}: {str(e)}") - - -@celery.task -def remove_stale_project_uploads(project_id: str): - """Remove project stale uploads""" - db.session.info = {"msg": "remove_stale_project_uploads"} - for upload in Upload.query.filter_by(project_id=project_id).all(): - if not upload.is_active(): - upload.clear() diff --git a/server/mergin/tests/fixtures.py b/server/mergin/tests/fixtures.py index 9f39909d..7cff688e 100644 --- a/server/mergin/tests/fixtures.py +++ b/server/mergin/tests/fixtures.py @@ -19,7 +19,7 @@ from ..stats.models import MerginInfo from . import test_project, test_workspace_id, test_project_dir, TMP_DIR from .utils import login_as_admin, initialize, cleanup, file_info -from ..sync.files import files_changes_from_upload +from ..sync.files import ChangesSchema thisdir = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.join(thisdir, os.pardir)) @@ -213,13 +213,12 @@ def diff_project(app): else: # no files uploaded, hence no action needed pass - - file_changes = files_changes_from_upload(change, version=i + 2) + upload_changes = ChangesSchema(context={"version": i + 2}).load(change) pv = ProjectVersion( project, i + 2, project.creator.id, - file_changes, + upload_changes, "127.0.0.1", ) assert pv.project_size == sum(file.size for file in pv.files) diff --git a/server/mergin/tests/test_celery.py b/server/mergin/tests/test_celery.py index 619897a8..a5d07f47 100644 --- a/server/mergin/tests/test_celery.py +++ b/server/mergin/tests/test_celery.py @@ -10,20 +10,19 @@ from ..app import db from ..config import Configuration -from ..sync.models import Project, AccessRequest, ProjectRole, ProjectVersion, Upload +from ..sync.models import Project, AccessRequest, ProjectRole, ProjectVersion from ..celery import send_email_async from ..sync.tasks import ( - remove_stale_project_uploads, remove_temp_files, remove_projects_backups, create_project_version_zip, remove_projects_archives, ) from ..sync.storages.disk import move_to_tmp -from . 
import test_project, test_workspace_name, test_workspace_id, json_headers -from .test_project_controller import create_transaction +from . import test_project, test_workspace_name, test_workspace_id from .utils import add_user, create_workspace, create_project, login, modify_file_times from ..auth.models import User +from . import json_headers def test_send_email(app): @@ -158,18 +157,3 @@ def test_create_project_version_zip(diff_project): modify_file_times(latest_version.zip_path, new_time) remove_projects_archives() assert not os.path.exists(latest_version.zip_path) - - -@patch.object(Upload, "is_active") -def test_after_push_upload_cleanup(mock_is_active, client): - """Test stale uploads are removed""" - upload, _ = create_transaction("mergin", {}) - transaction_id = upload.id - - mock_is_active.return_value = True - remove_stale_project_uploads(upload.project_id) - assert Upload.query.get(transaction_id) - - mock_is_active.return_value = False - remove_stale_project_uploads(upload.project_id) - assert not Upload.query.get(transaction_id) diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index 48c14c0e..e7f9e270 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -18,6 +18,7 @@ ProjectRole, ProjectUser, ) +from ..sync.files import UploadChanges from ..auth.models import User from ..app import db from . import DEFAULT_USER @@ -39,7 +40,8 @@ def test_close_user_account(client, diff_project): # user has access to mergin user diff_project diff_project.set_role(user.id, ProjectRole.WRITER) # user contributed to another user project so he is listed in projects history - pv = ProjectVersion(diff_project, 11, user.id, [], "127.0.0.1") + changes = UploadChanges(added=[], updated=[], removed=[]) + pv = ProjectVersion(diff_project, 11, user.id, changes, "127.0.0.1") diff_project.latest_version = pv.name pv.project = diff_project db.session.add(pv) @@ -114,9 +116,8 @@ def test_remove_project(client, diff_project): # set up mergin_user = User.query.filter_by(username=DEFAULT_USER[0]).first() project_dir = Path(diff_project.storage.project_dir) - upload = Upload( - diff_project, {"added": [], "removed": [], "updated": []}, mergin_user.id - ) + changes = UploadChanges(added=[], removed=[], updated=[]) + upload = Upload(diff_project, 10, changes, mergin_user.id) db.session.add(upload) project_id = diff_project.id user = add_user("user", "user") diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 989133e6..b1f60a8f 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -5,7 +5,6 @@ import datetime import os from dataclasses import asdict -from unittest import mock from unittest.mock import patch from urllib.parse import quote import pysqlite3 @@ -17,16 +16,14 @@ import hashlib import shutil import re + from flask_login import current_user from pygeodiff import GeoDiff from flask import url_for, current_app import tempfile -from sqlalchemy import desc -from sqlalchemy.exc import IntegrityError +from sqlalchemy import desc from ..app import db -from ..config import Configuration -from ..sync.config import Configuration as SyncConfiguration from ..sync.models import ( Project, Upload, @@ -38,9 +35,9 @@ PushChangeType, ProjectFilePath, ) -from ..sync.files import ChangesSchema, files_changes_from_upload +from ..sync.files import ChangesSchema from ..sync.schemas import ProjectListSchema -from 
..sync.utils import generate_checksum, is_versioned_file, get_project_path +from ..sync.utils import generate_checksum, is_versioned_file from ..auth.models import User, UserProfile from . import ( @@ -62,6 +59,9 @@ login_as_admin, upload_file_to_project, ) +from ..config import Configuration +from ..sync.config import Configuration as SyncConfiguration +from ..sync.utils import get_project_path CHUNK_SIZE = 1024 @@ -497,6 +497,7 @@ def test_delete_project(client): # mimic update of project with chunk upload changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) + os.mkdir(os.path.join(upload.project.storage.project_dir, "v2")) os.makedirs(os.path.join(upload_dir, "chunks")) for f in upload.changes["added"] + upload.changes["updated"]: with open(os.path.join(test_project_dir, f["path"]), "rb") as in_file: @@ -1019,17 +1020,6 @@ def _get_changes_with_diff_0_size(project_dir): return changes -def _get_random_file_metadata(): - """Return fake metadata for non-existing random file""" - return { - "path": f"{uuid.uuid4().hex}.txt", - "checksum": hashlib.sha1().hexdigest(), - "size": 0, - "mtime": datetime.datetime.now().timestamp(), - "chunks": [], - } - - test_push_data = [ ( {"version": "v1", "changes": _get_changes_without_added(test_project_dir)}, @@ -1048,7 +1038,7 @@ def _get_random_file_metadata(): 400, ), # contains already uploaded file ( - {"version": "v2", "changes": _get_changes_without_added(test_project_dir)}, + {"version": "v0", "changes": _get_changes_without_added(test_project_dir)}, 400, ), # version mismatch ({"version": "v1", "changes": {}}, 400), # wrong changes format @@ -1060,35 +1050,9 @@ def _get_random_file_metadata(): { "version": "v1", "changes": { - "added": [], - "removed": [_get_random_file_metadata()], - "updated": [], - }, - }, - 400, - ), # delete not-existing file - ( - { - "version": "v1", - "changes": { - "added": [], - "removed": [], - "updated": [_get_random_file_metadata()], - }, - }, - 400, - ), # update not-existing file - ( - { - "version": "v1", - "changes": { - "added": [ - file_info(test_project_dir, "test.txt", chunk_size=CHUNK_SIZE) - ], + "added": [{"path": "test.txt"}], "removed": [], - "updated": [ - file_info(test_project_dir, "test.txt", chunk_size=CHUNK_SIZE) - ], + "updated": [{"path": "test.txt"}], }, }, 400, @@ -1122,100 +1086,20 @@ def test_push_project_start(client, data, expected): assert failure.error_type == "push_start" -def test_concurrent_uploads(client): - """Test concurrent uploads into same project""" - url = f"/v1/project/push/{test_workspace_name}/{test_project}" - data = {"version": "v0", "changes": _get_changes_with_diff(test_project_dir)} - resp = client.post( - url, - data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), - headers=json_headers, - ) - assert resp.status_code == 200 - upload = Upload.query.get(resp.json["transaction"]) - assert upload.blocking is True - assert upload.is_active() - - uploaded_file = file_info( - test_project_dir, "test_dir/test4.txt", chunk_size=CHUNK_SIZE - ) - # modify name to bypass name check - uploaded_file["path"] = "test123.txt" - data["changes"] = { - "added": [uploaded_file], - "updated": [], - "removed": [], - } - - resp = client.post( - url, - data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), - headers=json_headers, - ) - assert resp.status_code == 200 - upload = Upload.query.get(resp.json["transaction"]) - assert upload.blocking is False - - # we cannot have multiple pending uploads with the same new added files - resp 
= client.post( - url, - data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), - headers=json_headers, - ) - assert resp.status_code == 400 - assert ( - resp.json["detail"] - == f"File {uploaded_file['path']} is already in other upload" - ) - - # second blocking upload is forbidden - data["changes"] = _get_changes_without_added(test_project_dir) - resp = client.post( - url, - data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), - headers=json_headers, - ) - assert resp.status_code == 400 - assert resp.json["detail"] == "Another process is running. Please try later." - - upload = Upload.query.filter_by(blocking=True).first() - trasaction_id = str(upload.id) - # pretent blocking upload is stale - with patch("mergin.sync.models.Upload.is_active") as mock_is_active: - mock_is_active.return_value = False - resp = client.post( - url, - data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), - headers=json_headers, - ) - assert resp.status_code == 200 - assert resp.json["transaction"] - # previous stale upload was removed - assert not Upload.query.get(trasaction_id) - - def test_push_to_new_project(client): # create blank project p = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - project = Project("blank", p.storage_params, p.creator, p.workspace, public=True) - db.session.add(project) - pv = ProjectVersion( - project, - 0, - p.creator.id, - [], - ip="127.0.0.1", + project = Project( + "blank", p.storage_params, p.creator, p.workspace, files=[], public=True ) - db.session.add(pv) + db.session.add(project) db.session.commit() current_app.config["BLACKLIST"] = ["test4"] url = "/v1/project/push/{}/{}".format(test_workspace_name, "blank") - changes = _get_changes(test_project_dir) - changes["updated"] = changes["removed"] = [] - data = {"version": "v0", "changes": changes} + data = {"version": "v0", "changes": _get_changes(test_project_dir)} resp = client.post( url, data=json.dumps(data, cls=DateTimeEncoder).encode("utf-8"), @@ -1237,15 +1121,7 @@ def test_push_to_new_project(client): headers=json_headers, ) assert resp.status_code == 400 - assert resp.json["detail"] == "Version mismatch, client cannot be ahead of server" - failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() - assert failure.last_version == "v0" - assert failure.error_type == "push_start" - assert failure.error_details == "Version mismatch, client cannot be ahead of server" - # test legacy situation when project does not have any project version associated yet - db.session.delete(pv) - db.session.commit() data = {"version": "v100", "changes": _get_changes(test_project_dir)} resp = client.post( url, @@ -1254,11 +1130,7 @@ def test_push_to_new_project(client): ) assert resp.status_code == 400 assert resp.json["detail"] == "First push should be with v0" - failure = ( - SyncFailuresHistory.query.filter_by(project_id=project.id) - .order_by(SyncFailuresHistory.timestamp.desc()) - .first() - ) + failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() assert failure.last_version == "v0" assert failure.error_type == "push_start" assert failure.error_details == "First push should be with v0" @@ -1405,7 +1277,8 @@ def create_transaction(username, changes, version=1): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - upload = Upload(project, changes, user.id) + upload_changes = ChangesSchema(context={"version": version}).load(changes) + upload = Upload(project, version, upload_changes, user.id) 
db.session.add(upload) db.session.commit() upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) @@ -1491,6 +1364,7 @@ def test_push_finish(client): assert failure.error_type == "push_finish" assert "corrupted_files" in failure.error_details + os.mkdir(os.path.join(upload.project.storage.project_dir, "v2")) # mimic chunks were uploaded os.makedirs(os.path.join(upload_dir, "chunks")) for f in upload.changes["added"] + upload.changes["updated"]: @@ -1499,32 +1373,6 @@ def test_push_finish(client): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - # test finish upload, pretend another upload is already being processed - os.makedirs( - os.path.join( - upload.project.storage.project_dir, f"v{upload.project.latest_version + 1}" - ), - exist_ok=True, - ) - resp = client.post( - url, - headers=json_headers, - ) - assert resp.status_code == 409 - # bump to fake version to make upload finish pass - upload.project.latest_version += 1 - db.session.add(upload.project) - pv = ProjectVersion( - upload.project, - upload.project.latest_version, - upload.project.creator.id, - [], - "127.0.0.1", - ) - pv.project = upload.project - db.session.add(pv) - db.session.commit() - resp2 = client.post(url, headers={**json_headers, "User-Agent": "Werkzeug"}) assert resp2.status_code == 200 assert not os.path.exists(upload_dir) @@ -1557,7 +1405,7 @@ def test_push_finish(client): assert resp4.status_code == 403 # other failures with error code 403, 404 does to count to failures history - assert SyncFailuresHistory.query.count() == 2 + assert SyncFailuresHistory.query.count() == 1 def test_push_close(client): @@ -1926,9 +1774,6 @@ def test_optimize_storage(app, client, diff_project): diff_project.latest_version = 8 ProjectVersion.query.filter_by(project_id=diff_project.id, name=9).delete() ProjectVersion.query.filter_by(project_id=diff_project.id, name=10).delete() - for _dir in ("v9", "v10"): - if os.path.exists(os.path.join(diff_project.storage.project_dir, _dir)): - shutil.rmtree(os.path.join(diff_project.storage.project_dir, _dir)) db.session.commit() diff_project.cache_latest_files() assert diff_project.latest_version == 8 @@ -2429,12 +2274,12 @@ def add_project_version(project, changes, version=None): else User.query.filter_by(username=DEFAULT_USER[0]).first() ) next_version = version or project.next_version() - file_changes = files_changes_from_upload(changes, version=next_version) + upload_changes = ChangesSchema(context={"version": next_version}).load(changes) pv = ProjectVersion( project, next_version, author.id, - file_changes, + upload_changes, ip="127.0.0.1", ) db.session.add(pv) @@ -2448,24 +2293,19 @@ def test_project_version_integrity(client): changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - a = upload.project.files - - # try to finish the transaction which would fail on version created integrity error, e.g. 
race conditions - with patch.object( - ProjectVersion, - "__init__", - side_effect=IntegrityError("Project version already exists", None, None), - ): - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) - assert resp.status_code == 422 - assert "Failed to create new version" in resp.json["detail"] - failure = SyncFailuresHistory.query.filter_by( - project_id=upload.project.id - ).first() - assert failure.error_type == "push_finish" - assert "Failed to create new version" in failure.error_details - db.session.delete(failure) - db.session.commit() + # manually create an identical project version in db + pv = add_project_version(upload.project, changes) + # try to finish the transaction + resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + assert resp.status_code == 422 + assert "Failed to create new version" in resp.json["detail"] + failure = SyncFailuresHistory.query.filter_by(project_id=upload.project.id).first() + assert failure.error_type == "push_finish" + assert "Failed to create new version" in failure.error_details + upload.project.latest_version = pv.name - 1 + db.session.delete(pv) + db.session.delete(failure) + db.session.commit() # changes without an upload with patch("mergin.sync.public_api_controller.get_user_agent") as mock: @@ -2480,7 +2320,7 @@ def test_project_version_integrity(client): # to insert an identical project version when no upload (only one endpoint used), # we need to pretend side effect of a function called just before project version insertion def _get_user_agent(): - add_project_version(project, {}) + add_project_version(project, changes) # bypass endpoint checks upload.project.latest_version = ProjectVersion.from_v_name(data["version"]) return "Input" @@ -2615,12 +2455,6 @@ def test_signals(client): upload_file_to_project(project, "test.txt", client) push_finished_mock.assert_called_once() - with patch( - "mergin.sync.tasks.remove_stale_project_uploads.delay" - ) as upload_cleanup_mock: - upload_file_to_project(project, "test.qgs", client) - upload_cleanup_mock.assert_called_once() - def test_filepath_manipulation(client): """Test filepath validation during file upload""" diff --git a/server/mergin/tests/utils.py b/server/mergin/tests/utils.py index 0c4448a6..94fc033f 100644 --- a/server/mergin/tests/utils.py +++ b/server/mergin/tests/utils.py @@ -20,7 +20,7 @@ from ..auth.models import User, UserProfile from ..sync.utils import generate_location, generate_checksum from ..sync.models import Project, ProjectVersion, FileHistory, ProjectRole -from ..sync.files import ProjectFileChange, PushChangeType, files_changes_from_upload +from ..sync.files import UploadChanges, ChangesSchema from ..sync.workspace import GlobalWorkspace from ..app import db from . 
import json_headers, DEFAULT_USER, test_project, test_project_dir, TMP_DIR @@ -82,7 +82,8 @@ def create_project(name, workspace, user, **kwargs): p.updated = datetime.utcnow() db.session.add(p) db.session.flush() - pv = ProjectVersion(p, 0, user.id, [], "127.0.0.1") + changes = UploadChanges(added=[], updated=[], removed=[]) + pv = ProjectVersion(p, 0, user.id, changes, "127.0.0.1") db.session.add(pv) db.session.commit() @@ -155,17 +156,15 @@ def initialize(): for f in files: abs_path = os.path.join(root, f) project_files.append( - ProjectFileChange( - path=abs_path.replace(test_project_dir, "").lstrip("/"), - checksum=generate_checksum(abs_path), - size=os.path.getsize(abs_path), - mtime=str(datetime.fromtimestamp(os.path.getmtime(abs_path))), - change=PushChangeType.CREATE, - location=os.path.join( + { + "path": abs_path.replace(test_project_dir, "").lstrip("/"), + "location": os.path.join( "v1", abs_path.replace(test_project_dir, "").lstrip("/") ), - diff=None, - ) + "size": os.path.getsize(abs_path), + "checksum": generate_checksum(abs_path), + "mtime": str(datetime.fromtimestamp(os.path.getmtime(abs_path))), + } ) p.latest_version = 1 p.public = True @@ -174,7 +173,14 @@ def initialize(): db.session.add(p) db.session.commit() - pv = ProjectVersion(p, 1, user.id, project_files, "127.0.0.1") + upload_changes = ChangesSchema(context={"version": 1}).load( + { + "added": project_files, + "updated": [], + "removed": [], + } + ) + pv = ProjectVersion(p, 1, user.id, upload_changes, "127.0.0.1") db.session.add(pv) db.session.commit() @@ -279,7 +285,7 @@ def create_blank_version(project): project, project.next_version(), project.creator.id, - [], + UploadChanges(added=[], updated=[], removed=[]), "127.0.0.1", ) db.session.add(pv) @@ -349,12 +355,14 @@ def push_change(project, action, path, src_dir): else: return - file_changes = files_changes_from_upload(changes, version=project.next_version()) + upload_changes = ChangesSchema(context={"version": project.next_version()}).load( + changes + ) pv = ProjectVersion( project, project.next_version(), project.creator.id, - file_changes, + upload_changes, "127.0.0.1", ) db.session.add(pv) diff --git a/server/migrations/community/9b3eac2f21c2_allow_concurrent_uploads.py b/server/migrations/community/9b3eac2f21c2_allow_concurrent_uploads.py deleted file mode 100644 index c4e99eab..00000000 --- a/server/migrations/community/9b3eac2f21c2_allow_concurrent_uploads.py +++ /dev/null @@ -1,85 +0,0 @@ -"""Allow concurrent uploads - -Create partial index to limit at most one blocking upload per project. - -Revision ID: 9b3eac2f21c2 -Revises: 6cb54659c1de -Create Date: 2025-06-10 14:00:30.094460 - -""" - -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. 
-revision = "9b3eac2f21c2"
-down_revision = "6cb54659c1de"
-branch_labels = None
-depends_on = None
-
-
-def upgrade():
-    op.add_column("upload", sa.Column("blocking", sa.Boolean(), nullable=True))
-    op.drop_index("ix_upload_version", table_name="upload")
-    op.drop_constraint("uq_upload_project_id", "upload", type_="unique")
-    op.create_index(
-        "ix_upload_blocking_partial",
-        "upload",
-        ["project_id", "blocking"],
-        unique=True,
-        postgresql_where=sa.text("blocking"),
-    )
-    op.drop_column("upload", "version")
-
-    # data migration - set all current uploads to blocking
-    conn = op.get_bind()
-    query = """
-        UPDATE upload
-        SET blocking = true;
-    """
-    conn.execute(sa.text(query))
-
-
-def downgrade():
-    op.add_column(
-        "upload", sa.Column("version", sa.INTEGER(), autoincrement=False, nullable=True)
-    )
-    op.drop_index(
-        "ix_upload_blocking_partial",
-        table_name="upload",
-        postgresql_where=sa.text("blocking"),
-    )
-
-    # data migration - remove concurrent uploads, set upload version to project latest version
-    conn = op.get_bind()
-    remove_query = """
-        WITH multiple_uploads AS (
-            SELECT
-                u.id,
-                ROW_NUMBER() OVER(
-                    PARTITION BY u.project_id
-                    ORDER BY u.created asc
-                ) AS row_number
-            FROM upload u
-            INNER JOIN project p ON p.id = u.project_id
-        )
-        DELETE FROM upload u
-        USING multiple_uploads mu
-        WHERE u.id = mu.id AND mu.row_number > 1;
-    """
-    conn.execute(sa.text(remove_query))
-
-    update_query = """
-        UPDATE upload u
-        SET version = p.latest_version
-        FROM project p
-        WHERE p.id = u.project_id;
-    """
-    conn.execute(sa.text(update_query))
-
-    op.create_unique_constraint(
-        "uq_upload_project_id", "upload", ["project_id", "version"]
-    )
-    op.create_index("ix_upload_version", "upload", ["version"], unique=False)
-    op.drop_column("upload", "blocking")