Skip to content

Commit 5ba7535

Browse files
committed
formatting
1 parent da39507 commit 5ba7535

File tree

4 files changed

+78
-87
lines changed

4 files changed

+78
-87
lines changed

mergin/cli.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,13 @@
3333
download_project_is_running,
3434
)
3535
from mergin.client_pull import pull_project_async, pull_project_is_running, pull_project_finalize, pull_project_cancel
36-
from mergin.client_push import push_project_is_running, push_project_finalize, push_project_cancel, push_project_async, total_upload_size
36+
from mergin.client_push import (
37+
push_project_is_running,
38+
push_project_finalize,
39+
push_project_cancel,
40+
push_project_async,
41+
total_upload_size,
42+
)
3743

3844
from pygeodiff import GeoDiff
3945

@@ -414,6 +420,7 @@ def sync(ctx):
414420
try:
415421
size = total_upload_size(directory)
416422
with click.progressbar(length=size, label="Syncing") as bar:
423+
417424
def on_progress(increment):
418425
bar.update(increment)
419426

@@ -436,6 +443,7 @@ def on_progress(increment):
436443
except Exception as e:
437444
_print_unhandled_exception()
438445

446+
439447
@cli.command()
440448
@click.pass_context
441449
def push(ctx):

mergin/client.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,16 @@
3535
download_diffs_finalize,
3636
pull_project_async,
3737
pull_project_wait,
38-
pull_project_finalize
38+
pull_project_finalize,
39+
)
40+
from .client_push import (
41+
push_project_wait,
42+
push_project_finalize,
43+
push_project_async,
44+
push_project_is_running,
45+
ChangesHandler,
46+
get_change_batch,
3947
)
40-
from .client_push import push_project_wait, push_project_finalize, push_project_async, push_project_is_running, \
41-
ChangesHandler, get_change_batch
4248
from .utils import DateTimeEncoder, get_versions_with_file_changes, int_version, is_version_acceptable
4349
from .version import __version__
4450

@@ -904,7 +910,6 @@ def push_project(self, directory):
904910
:param directory: Project's directory
905911
:type directory: String
906912
"""
907-
# while True:
908913
job = push_project_async(self, directory)
909914
if not job:
910915
return # there is nothing to push (or we only deleted some files)
@@ -978,7 +983,6 @@ def sync_project_with_callback(self, project_dir, progress_callback=None, sleep_
978983
last = now
979984
push_project_finalize(self.push_job)
980985

981-
982986
def clone_project(self, source_project_path, cloned_project_name, cloned_project_namespace=None):
983987
"""
984988
Clone project on server.

mergin/client_push.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ def split(self) -> List[Dict[str, List[dict]]]:
164164
# TODO: apply limits; changes = self._limit_by_file_count(changes)
165165
return changes_list
166166

167+
167168
def get_change_batch(mc, project_dir) -> Tuple[Optional[Dict[str, List[dict]]], bool]:
168169
"""
169170
Return the next changes dictionary and flag if there are more changes (to be uploaded in the next upload job)
@@ -264,7 +265,7 @@ def push_project_async(mc, directory, change_batch=None) -> Optional[UploadJob]:
264265
upload_files = data["changes"]["added"] + data["changes"]["updated"]
265266

266267
transaction_id = server_resp["transaction"] if upload_files else None
267-
exclusive = server_resp.get("exclusive", True)
268+
exclusive = server_resp.get("blocking", True)
268269
job = UploadJob(project_path, changes, transaction_id, mp, mc, tmp_dir, exclusive)
269270

270271
if not upload_files:
@@ -366,7 +367,9 @@ def push_project_finalize(job):
366367

367368
if with_upload_of_files:
368369
try:
369-
job.mp.log.info(f"Finishing {'exclusive' if job.exclusive else 'non-exclusive'} transaction {job.transaction_id}")
370+
job.mp.log.info(
371+
f"Finishing {'exclusive' if job.exclusive else 'non-exclusive'} transaction {job.transaction_id}"
372+
)
370373
resp = job.mc.post("/v1/project/push/finish/%s" % job.transaction_id)
371374
job.server_resp = json.load(resp)
372375
except ClientError as err:
@@ -445,9 +448,6 @@ def total_upload_size(directory) -> int:
445448
mp = MerginProject(directory)
446449
changes = mp.get_push_changes()
447450
files = changes.get("added", []) + changes.get("updated", [])
448-
size = sum(
449-
f.get("diff", {}).get("size", f.get("size", 0))
450-
for f in files
451-
)
451+
size = sum(f.get("diff", {}).get("size", f.get("size", 0)) for f in files)
452452
mp.log.info(f"Upload size of all files is {size}")
453453
return size

mergin/test/test_client.py

Lines changed: 54 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
TokenError,
2727
ServerType,
2828
)
29-
from ..client_push import push_next_change, push_project_cancel, ChangesHandler, _do_upload
29+
from ..client_push import push_project_cancel, ChangesHandler
3030
from ..client_pull import (
3131
download_project_async,
3232
download_project_wait,
@@ -2899,10 +2899,10 @@ def test_mc_without_login():
28992899
with pytest.raises(ClientError, match="Authentication information is missing or invalid."):
29002900
mc.workspaces_list()
29012901

2902+
29022903
def _sort_dict_of_files_by_path(d):
2903-
return {
2904-
k: sorted(v, key=lambda f: f["path"]) for k, v in d.items()
2905-
}
2904+
return {k: sorted(v, key=lambda f: f["path"]) for k, v in d.items()}
2905+
29062906

29072907
def test_changes_handler(mc):
29082908
"""
@@ -2937,7 +2937,8 @@ def test_changes_handler(mc):
29372937
# import sqlite3
29382938
# import os
29392939

2940-
def inflate_gpkg(path, blob_size_bytes=1024*1024, rows=50):
2940+
2941+
def inflate_gpkg(path, blob_size_bytes=1024 * 1024, rows=50):
29412942
"""
29422943
Append a table named 'inflate' to the GeoPackage at `path`,
29432944
then insert `rows` rows each containing a BLOB of size `blob_size_bytes`.
@@ -2949,48 +2950,51 @@ def inflate_gpkg(path, blob_size_bytes=1024*1024, rows=50):
29492950
con = sqlite3.connect(path)
29502951
cur = con.cursor()
29512952
# 1) create the dummy table if it doesn't already exist
2952-
cur.execute("""
2953+
cur.execute(
2954+
"""
29532955
CREATE TABLE IF NOT EXISTS inflate (
29542956
id INTEGER PRIMARY KEY,
29552957
data BLOB NOT NULL
29562958
);
2957-
""")
2959+
"""
2960+
)
29582961
# 2) prepare one blob of the given size
2959-
dummy_blob = sqlite3.Binary(b'\x00' * blob_size_bytes)
2962+
dummy_blob = sqlite3.Binary(b"\x00" * blob_size_bytes)
29602963
# 3) insert a bunch of rows
29612964
for _ in range(rows):
29622965
cur.execute("INSERT INTO inflate (data) VALUES (?);", (dummy_blob,))
29632966
con.commit()
29642967
con.close()
29652968

29662969

2967-
# def _make_slow_upload(delay: float):
2968-
# """
2969-
# Helper to mock up a slow upload
2970-
# """
2971-
# def slow_upload(item, job):
2972-
# time.sleep(delay) # delay in seconds for each chunk upload
2973-
# return _do_upload(item, job)
2974-
# return slow_upload
2975-
#
2976-
#
2977-
# def _delayed_push(mc: MerginClient, directory: str, delay: float):
2978-
# """
2979-
# Patches chunks upload during project push
2980-
# """
2981-
# with patch("mergin.client_push._do_upload", side_effect=_make_slow_upload(delay)):
2982-
# return mc.push_project(directory)
2983-
2970+
def create_dummy_photos(dir_path, count=20, size_kb=5000):
2971+
"""Create `count` dummy JPG files in `dir_path` with ~`size_kb` each."""
2972+
os.makedirs(dir_path, exist_ok=True)
2973+
for i in range(count):
2974+
filename = os.path.join(dir_path, f"photo_{i:03}.jpg")
2975+
with open(filename, "wb") as f:
2976+
f.write(os.urandom(size_kb * 1024)) # Random bytes to simulate real file
29842977

29852978

29862979
files_to_push = [
2987-
# ("base.gpkg", "inserted_1_A.gpkg", False), # both pushes are exclusive, the latter one is refused
2988-
("inserted_1_A.gpkg", "test.txt", True), # the second push is non-exclusive - it is free to go
2989-
# ("test3.txt", "inserted_1_A_mod.gpkg", True), # the first push is non-exclusive - it does not block other pushes
2980+
(
2981+
"base.gpkg",
2982+
"inserted_1_A.gpkg",
2983+
False,
2984+
"another_process",
2985+
), # both pushes are exclusive, the latter one is refused
2986+
(
2987+
"inserted_1_A.gpkg",
2988+
"test.txt",
2989+
False,
2990+
"version_conflict",
2991+
), # small files pushed at the same time might result in version conflict due to race condition
2992+
("inserted_1_A.gpkg", "many_photos", True, None), # the upload of many photos does not block the other upload
29902993
]
29912994

2992-
@pytest.mark.parametrize("file1,file2,success", files_to_push)
2993-
def test_exclusive_upload(mc, mc2, file1, file2, success):
2995+
2996+
@pytest.mark.parametrize("file1,file2,success,fail_reason", files_to_push)
2997+
def test_exclusive_upload(mc, mc2, file1, file2, success, fail_reason):
29942998
"""
29952999
Test two clients pushing at the same time
29963000
"""
@@ -3006,58 +3010,33 @@ def test_exclusive_upload(mc, mc2, file1, file2, success):
30063010
mc.add_project_collaborator(project_info["id"], API_USER2, ProjectRole.WRITER)
30073011
mc2.download_project(project_full_name, project_dir2)
30083012

3009-
def push1():
3010-
mc.push_project(project_dir1)
3013+
def sync1():
3014+
mc.sync_project(project_dir1)
30113015

3012-
def push2():
3013-
mc2.push_project(project_dir2)
3016+
def sync2():
3017+
mc2.sync_project(project_dir2)
30143018

3015-
# with open(os.path.join(project_dir1, file1), "wb") as f:
3016-
# f.write(os.urandom(50 * 1024 * 1024)) # 50 MB
30173019
shutil.copy(os.path.join(TEST_DATA_DIR, file1), project_dir1)
3018-
shutil.copy(os.path.join(TEST_DATA_DIR, file2), project_dir2)
3019-
big_gpkg = os.path.join(project_dir1, file1)
3020-
# this will add ~50 MB of zero‐bytes to the file
3021-
inflate_gpkg(big_gpkg, blob_size_bytes=1_000_000, rows=50)
3022-
3023-
# first_upload_delay = 2
3024-
# resp1 = _delayed_push(mc, project_dir1, first_upload_delay)
3025-
# resp2 = _delayed_push(mc, project_dir2, 0)
3026-
# if not success:
3027-
# resp1.
3028-
3029-
# run both pushes concurrently
3030-
# with patch("mergin.client_push._do_upload", side_effect=_slow_upload):
3020+
if file2 == "many_photos":
3021+
create_dummy_photos(project_dir2)
3022+
else:
3023+
shutil.copy(os.path.join(TEST_DATA_DIR, file2), project_dir2)
3024+
30313025
with concurrent.futures.ThreadPoolExecutor() as executor:
3032-
future1 = executor.submit(push1)
3033-
future2 = executor.submit(push2)
3034-
# first_upload_delay = 2
3035-
# future1 = executor.submit(_delayed_push, mc, project_dir1, first_upload_delay)
3036-
# future2 = executor.submit(_delayed_push, mc2, project_dir2, 0)
3037-
# time.sleep(first_upload_delay + 0.2)
3038-
# assert not future1.exception()
3026+
future1 = executor.submit(sync1)
3027+
future2 = executor.submit(sync2)
30393028
exc2 = future2.exception()
30403029
exc1 = future1.exception()
3041-
# assert not exc2
30423030

30433031
if not success:
30443032
error = exc1 if exc1 else exc2  # one of the uploads is lucky enough to pass; the other one was slow
3045-
assert (exc1 is None or exc2 is None)
3033+
assert exc1 is None or exc2 is None
30463034
assert isinstance(error, ClientError)
3047-
assert error.detail == "Another process is running. Please try later."
3048-
3049-
# assert type(exc1) is ClientError
3050-
# assert exc1.http_error == 400
3051-
# assert exc1.detail == "Another process is running. Please try later."
3052-
else:
3053-
# assert not exc1
3054-
assert not (exc1 or exc2)
3055-
# assert (exc1 is None and isinstance(exc2, ClientError) or (exc2 is None and isinstance(exc1, ClientError)))
3056-
# if not success:
3057-
# assert type(exc2) is ClientError
3058-
# assert exc2.http_error == 400
3059-
# assert exc2.detail == "Another process is running. Please try later."
3060-
# else:
3061-
# assert not exc2
3062-
3063-
3035+
if fail_reason == "another_process":
3036+
assert error.http_error == 400
3037+
assert error.detail == "Another process is running. Please try later."
3038+
elif fail_reason == "version_conflict":
3039+
assert error.http_error == 409
3040+
assert error.detail == "There is already version with this name v1"
3041+
else:
3042+
assert not (exc1 or exc2)

0 commit comments

Comments
 (0)