Commit 88cea53

Wait until a change upload is done before uploading the next change
1 parent 61f9dbd commit 88cea53

4 files changed (+108 -101 lines)
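This commit switches the push workflow from starting all pending upload jobs at once (push_project_async returning a list of jobs) to uploading one change at a time: push_next_change() starts a single change, and the caller waits for it to finish before asking for the next one. A minimal sketch of the resulting blocking loop, mirroring what the mergin/client.py diff below does; the helper name push_all_changes is made up for illustration, and mc is assumed to be an already authenticated MerginClient with directory pointing at a local Mergin project:

from mergin.client_push import push_next_change, push_project_wait, push_project_finalize

def push_all_changes(mc, directory):
    # push one change per iteration, waiting for each upload to finish
    # before the next change is sent
    while True:
        job = push_next_change(mc, directory)  # starts upload of a single change
        if job is None:
            break  # nothing left to push
        push_project_wait(job)       # block until this change is fully uploaded
        push_project_finalize(job)   # bump the project version on server and locally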

mergin/cli.py

Lines changed: 20 additions & 15 deletions
@@ -33,7 +33,7 @@
     download_project_is_running,
 )
 from mergin.client_pull import pull_project_async, pull_project_is_running, pull_project_finalize, pull_project_cancel
-from mergin.client_push import push_project_async, push_project_is_running, push_project_finalize, push_project_cancel
+from mergin.client_push import push_next_change, push_project_is_running, push_project_finalize, push_project_cancel


 from pygeodiff import GeoDiff
@@ -412,27 +412,32 @@ def push(ctx):
         return
     directory = os.getcwd()
     try:
-        jobs = push_project_async(mc, directory)
-        for job in jobs:
-            if job is not None:  # if job is none, we don't upload any files, and the transaction is finished already
-                with click.progressbar(length=job.total_size) as bar:
-                    last_transferred_size = 0
-                    while push_project_is_running(job):
-                        time.sleep(1 / 10)  # 100ms
-                        new_transferred_size = job.transferred_size
-                        bar.update(new_transferred_size - last_transferred_size)  # the update() needs increment only
-                        last_transferred_size = new_transferred_size
-                push_project_finalize(job)
-        click.echo("Done")
+        # keep going until there are no more changes
+        while True:
+            job = push_next_change(mc, directory)
+            if job is None:
+                click.echo("All changes uploaded.")
+                break
+
+            # show progress for this single change upload
+            with click.progressbar(length=job.total_size, label="Uploading change") as bar:
+                last_transferred_size = 0
+                while push_project_is_running(job):
+                    time.sleep(1 / 10)  # 100ms
+                    new_transferred_size = job.transferred_size
+                    bar.update(new_transferred_size - last_transferred_size)  # the update() needs increment only
+                    last_transferred_size = new_transferred_size
+            # finalize this change upload (bump versions on server & locally)
+            push_project_finalize(job)
+            click.echo("Change pushed, checking for more…")
     except InvalidProject as e:
         click.secho("Invalid project directory ({})".format(str(e)), fg="red")
     except ClientError as e:
         click.secho("Error: " + str(e), fg="red")
         return
     except KeyboardInterrupt:
         click.secho("Cancelling...")
-        for job in jobs:
-            push_project_cancel(job)
+        push_project_cancel(job)
     except Exception as e:
         _print_unhandled_exception()

mergin/client.py

Lines changed: 5 additions & 5 deletions
@@ -33,7 +33,7 @@
     download_diffs_finalize,
 )
 from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize
-from .client_push import push_project_async, push_project_wait, push_project_finalize
+from .client_push import push_next_change, push_project_wait, push_project_finalize
 from .utils import DateTimeEncoder, get_versions_with_file_changes, int_version, is_version_acceptable
 from .version import __version__

@@ -897,10 +897,10 @@ def push_project(self, directory):
         :param directory: Project's directory
         :type directory: String
         """
-        jobs = push_project_async(self, directory)
-        if not jobs:
-            return  # there is nothing to push (or we only deleted some files)
-        for job in jobs:
+        while True:
+            job = push_next_change(self, directory)
+            if not job:
+                return  # there is nothing to push (or we only deleted some files)
             push_project_wait(job)
             push_project_finalize(job)

mergin/client_pull.py

Lines changed: 1 addition & 1 deletion
@@ -129,7 +129,7 @@ def download_project_async(mc, project_path, directory, project_version=None):
     """

     if "/" not in project_path:
-        raise ClientError("Project name needs to be fully qualified, e.g. <username>/<projectname>")
+        raise ClientError("Project name needs to be fully qualified, e.g. <workspacename>/<projectname>")
     if os.path.exists(directory):
         raise ClientError("Project directory already exists")
     os.makedirs(directory)

mergin/client_push.py

Lines changed: 82 additions & 80 deletions
@@ -16,6 +16,7 @@
 import concurrent.futures
 import os
 from typing import Dict, List, Optional
+import click

 from .common import UPLOAD_CHUNK_SIZE, ClientError
 from .merginproject import MerginProject
@@ -154,13 +155,13 @@ def split(self) -> List[Dict[str, List[dict]]]:
         Applies all configured internal filters and returns a list of change ready to be uploaded.
         """
         changes = self._filter_changes(self._raw_changes)
-        changes = self._split_by_type(changes)
+        changes_list = self._split_by_type(changes)
         # TODO: apply limits; changes = self._limit_by_file_count(changes)
-        return changes
+        return changes_list


-def push_project_async(mc, directory) -> Optional[List[UploadJob]]:
-    """Starts push of a project and returns pending upload jobs"""
+def push_next_change(mc, directory) -> Optional[UploadJob]:
+    """Starts push of a change of a project and returns pending upload job"""

     mp = MerginProject(directory)
     if mp.has_unfinished_pull():
@@ -197,85 +198,86 @@ def push_project_async(mc, directory) -> Optional[List[UploadJob]]:
             + f"\n\nLocal version: {local_version}\nServer version: {server_version}"
         )

-    changes = mp.get_push_changes()
-    changes_handler = ChangesHandler(mc, project_info, changes)
-    changes_list = changes_handler.split()
+    all_changes = mp.get_push_changes()
+    changes_list = ChangesHandler(mc, project_info, all_changes).split()
+    if not changes_list:
+        return None

-    tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")
-    jobs = []
-
-    for changes in changes_list:
-        mp.log.debug("push changes:\n" + pprint.pformat(changes))
-
-        # If there are any versioned files (aka .gpkg) that are not updated through a diff,
-        # we need to make a temporary copy somewhere to be sure that we are uploading full content.
-        # That's because if there are pending transactions, checkpointing or switching from WAL mode
-        # won't work, and we would end up with some changes left in -wal file which do not get
-        # uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything.
-        for f in changes["updated"]:
-            if mp.is_versioned_file(f["path"]) and "diff" not in f:
-                mp.copy_versioned_file_for_upload(f, tmp_dir.name)
-
-        for f in changes["added"]:
-            if mp.is_versioned_file(f["path"]):
-                mp.copy_versioned_file_for_upload(f, tmp_dir.name)
-
-        if not any(len(v) for v in changes.values()):
-            mp.log.info(f"--- push {project_path} - nothing to do")
-            return
+    # take only the first change
+    change = changes_list[0]
+    mp.log.debug("push change:\n" + pprint.pformat(change))

-        # drop internal info from being sent to server
-        for item in changes["updated"]:
-            item.pop("origin_checksum", None)
-        data = {"version": local_version, "changes": changes}
+    tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")

-        try:
-            resp = mc.post(
-                f"/v1/project/push/{project_path}",
-                data,
-                {"Content-Type": "application/json"},
-            )
-        except ClientError as err:
-            mp.log.error("Error starting transaction: " + str(err))
-            mp.log.info("--- push aborted")
-            raise
-        server_resp = json.load(resp)
-
-        upload_files = data["changes"]["added"] + data["changes"]["updated"]
-
-        transaction_id = server_resp["transaction"] if upload_files else None
-        job = UploadJob(project_path, changes, transaction_id, mp, mc, tmp_dir)
-
-        if not upload_files:
-            mp.log.info("not uploading any files")
-            job.server_resp = server_resp
-            push_project_finalize(job)
-            return None  # all done - no pending job
-
-        mp.log.info(f"got transaction ID {transaction_id}")
-
-        upload_queue_items = []
-        total_size = 0
-        # prepare file chunks for upload
-        for file in upload_files:
-            if "diff" in file:
-                # versioned file - uploading diff
-                file_location = mp.fpath_meta(file["diff"]["path"])
-                file_size = file["diff"]["size"]
-            elif "upload_file" in file:
-                # versioned file - uploading full (a temporary copy)
-                file_location = file["upload_file"]
-                file_size = file["size"]
-            else:
-                # non-versioned file
-                file_location = mp.fpath(file["path"])
-                file_size = file["size"]
+    # If there are any versioned files (aka .gpkg) that are not updated through a diff,
+    # we need to make a temporary copy somewhere to be sure that we are uploading full content.
+    # That's because if there are pending transactions, checkpointing or switching from WAL mode
+    # won't work, and we would end up with some changes left in -wal file which do not get
+    # uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything.
+    for f in change["updated"]:
+        if mp.is_versioned_file(f["path"]) and "diff" not in f:
+            mp.copy_versioned_file_for_upload(f, tmp_dir.name)
+
+    for f in change["added"]:
+        if mp.is_versioned_file(f["path"]):
+            mp.copy_versioned_file_for_upload(f, tmp_dir.name)
+
+    if not any(len(v) for v in change.values()):
+        mp.log.info(f"--- push {project_path} - nothing to do")
+        return

-            for chunk_index, chunk_id in enumerate(file["chunks"]):
-                size = min(UPLOAD_CHUNK_SIZE, file_size - chunk_index * UPLOAD_CHUNK_SIZE)
-                upload_queue_items.append(UploadQueueItem(file_location, size, transaction_id, chunk_id, chunk_index))
+    # drop internal info from being sent to server
+    for item in change["updated"]:
+        item.pop("origin_checksum", None)
+    data = {"version": local_version, "changes": change}

-            total_size += file_size
+    try:
+        resp = mc.post(
+            f"/v1/project/push/{project_path}",
+            data,
+            {"Content-Type": "application/json"},
+        )
+    except ClientError as err:
+        mp.log.error("Error starting transaction: " + str(err))
+        mp.log.info("--- push aborted")
+        raise
+    server_resp = json.load(resp)
+
+    upload_files = data["changes"]["added"] + data["changes"]["updated"]
+
+    transaction_id = server_resp["transaction"] if upload_files else None
+    job = UploadJob(project_path, change, transaction_id, mp, mc, tmp_dir)
+
+    if not upload_files:
+        mp.log.info("not uploading any files")
+        job.server_resp = server_resp
+        push_project_finalize(job)
+        return None  # all done - no pending job
+
+    mp.log.info(f"got transaction ID {transaction_id}")
+
+    upload_queue_items = []
+    total_size = 0
+    # prepare file chunks for upload
+    for file in upload_files:
+        if "diff" in file:
+            # versioned file - uploading diff
+            file_location = mp.fpath_meta(file["diff"]["path"])
+            file_size = file["diff"]["size"]
+        elif "upload_file" in file:
+            # versioned file - uploading full (a temporary copy)
+            file_location = file["upload_file"]
+            file_size = file["size"]
+        else:
+            # non-versioned file
+            file_location = mp.fpath(file["path"])
+            file_size = file["size"]
+
+        for chunk_index, chunk_id in enumerate(file["chunks"]):
+            size = min(UPLOAD_CHUNK_SIZE, file_size - chunk_index * UPLOAD_CHUNK_SIZE)
+            upload_queue_items.append(UploadQueueItem(file_location, size, transaction_id, chunk_id, chunk_index))
+
+        total_size += file_size

     job.total_size = total_size
     job.upload_queue_items = upload_queue_items
@@ -287,8 +289,8 @@ def push_project_async(mc, directory) -> Optional[List[UploadJob]]:
     for item in upload_queue_items:
         future = job.executor.submit(_do_upload, item, job)
         job.futures.append(future)
-    jobs.append(job)
-    return jobs
+
+    return job


 def push_project_wait(job):
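
The chunking arithmetic in the mergin/client_push.py hunk above splits every uploaded file into fixed-size pieces via min(UPLOAD_CHUNK_SIZE, file_size - chunk_index * UPLOAD_CHUNK_SIZE). A standalone sketch of just that arithmetic: UPLOAD_CHUNK_SIZE really comes from mergin/common.py, the 10 MB value below is only an assumed example, and chunk_sizes() is a made-up helper, not part of the client.

UPLOAD_CHUNK_SIZE = 10 * 1024 * 1024  # assumed example value (10 MB), not the library's actual constant

def chunk_sizes(file_size):
    """Sizes of the chunks a file of file_size bytes would be split into."""
    sizes = []
    chunk_index = 0
    while chunk_index * UPLOAD_CHUNK_SIZE < file_size:
        sizes.append(min(UPLOAD_CHUNK_SIZE, file_size - chunk_index * UPLOAD_CHUNK_SIZE))
        chunk_index += 1
    return sizes

print(chunk_sizes(25 * 1024 * 1024))  # a 25 MB file -> [10485760, 10485760, 5242880]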
