
Commit 7ac175f

varmar05 authored and wonder-sk committed
Added multithreaded (parallel) upload and download to speed up sync (closes #9)
1 parent bda6f42 commit 7ac175f

2 files changed: +159 −44 lines changed

cli.py

Lines changed: 10 additions & 7 deletions
@@ -93,14 +93,14 @@ def init(project, directory, public):
 @cli.command()
 @click.argument('project')
 @click.argument('directory', type=click.Path(), required=False)
-def download(project, directory):
+@click.option('--parallel/--no-parallel', default=True, help='Download by sending parallel requests')
+def download(project, directory, parallel):
     """Download last version of mergin project"""
-
     c = _init_client()
     directory = directory or os.path.basename(project)
     click.echo('Downloading into {}'.format(directory))
     try:
-        c.download_project(project, directory)
+        c.download_project(project, directory, parallel)
         click.echo('Done')
     except Exception as e:
         click.secho(str(e), fg='red')
@@ -109,6 +109,7 @@ def download(project, directory):
 def num_version(name):
     return int(name.lstrip("v"))
 
+
 @cli.command()
 def status():
     """Show all changes in project files - upstream and local"""
@@ -162,12 +163,13 @@ def status():
 
 
 @cli.command()
-def push():
+@click.option('--parallel/--no-parallel', default=True, help='Upload by sending parallel requests')
+def push(parallel):
     """Upload local changes into Mergin repository"""
 
     c = _init_client()
     try:
-        c.push_project(os.getcwd())
+        c.push_project(os.getcwd(), parallel)
         click.echo('Done')
     except InvalidProject:
         click.echo('Invalid project directory')
@@ -176,12 +178,13 @@ def push():
 
 
 @cli.command()
-def pull():
+@click.option('--parallel/--no-parallel', default=True, help='Download by sending parallel requests')
+def pull(parallel):
     """Fetch changes from Mergin repository"""
 
     c = _init_client()
     try:
-        c.pull_project(os.getcwd())
+        c.pull_project(os.getcwd(), parallel)
         click.echo('Done')
     except InvalidProject:
         click.secho('Invalid project directory', fg='red')
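
All three commands above gain the same paired on/off flag. A minimal sketch of how such a click flag behaves, using a hypothetical sync command and click's test runner (not part of this commit):

```python
import click
from click.testing import CliRunner


@click.command()
@click.option('--parallel/--no-parallel', default=True, help='Download by sending parallel requests')
def sync(parallel):
    """Hypothetical command: report which code path would run."""
    click.echo('parallel' if parallel else 'sequential')


if __name__ == '__main__':
    runner = CliRunner()
    print(runner.invoke(sync, []).output, end='')                 # default -> parallel
    print(runner.invoke(sync, ['--no-parallel']).output, end='')  # flag -> sequential
```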

mergin/client.py

Lines changed: 149 additions & 37 deletions
@@ -10,9 +10,14 @@
 import math
 import hashlib
 from datetime import datetime, timezone
+import concurrent.futures
 
 this_dir = os.path.dirname(os.path.realpath(__file__))
-CHUNK_SIZE = 10 * 1024 * 1024
+
+CHUNK_SIZE = 100 * 1024 * 1024
+# there is an upper limit for chunk size on server, ideally should be requested from there once implemented
+UPLOAD_CHUNK_SIZE = 10 * 1024 * 1024
+
 IGNORE_EXT = re.compile(r'({})$'.format(
     '|'.join(re.escape(x) for x in ['-shm', '-wal', '~', 'pyc', 'swap'])
 ))
@@ -360,7 +365,7 @@ def project_versions(self, project_path):
         resp = self.get("/v1/project/version/{}".format(project_path))
         return json.load(resp)
 
-    def download_project(self, project_path, directory):
+    def download_project(self, project_path, directory, parallel=True):
         """
         Download latest version of project into given directory.
 
@@ -369,6 +374,9 @@ def download_project(self, project_path, directory):
 
         :param directory: Target directory
         :type directory: String
+
+        :param parallel: Use multi-thread approach to download files in parallel requests, default True
+        :type parallel: Boolean
         """
         if os.path.exists(directory):
             raise Exception("Project directory already exists")
@@ -377,8 +385,23 @@ def download_project(self, project_path, directory):
         project_info = self.project_info(project_path)
         version = project_info['version'] if project_info['version'] else 'v0'
 
-        for file in project_info['files']:
-            self._download_file(project_path, version, file, directory)
+        # sending parallel requests is good for projects with a lot of small files
+        if parallel:
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                futures_map = {}
+                for file in project_info['files']:
+                    future = executor.submit(self._download_file, project_path, version, file, directory, parallel)
+                    futures_map[future] = file
+
+                for future in concurrent.futures.as_completed(futures_map):
+                    file = futures_map[future]
+                    try:
+                        future.result(60)
+                    except concurrent.futures.TimeoutError:
+                        raise ClientError("Timeout error: failed to download {}".format(file))
+        else:
+            for file in project_info['files']:
+                self._download_file(project_path, version, file, directory, parallel)
 
         data = {
             "name": project_path,
@@ -387,12 +410,14 @@ def download_project(self, project_path, directory):
         }
         save_project_file(directory, data)
 
-    def push_project(self, directory):
+    def push_project(self, directory, parallel=True):
         """
         Upload local changes to the repository.
 
         :param directory: Project's directory
         :type directory: String
+        :param parallel: Use multi-thread approach to upload files in parallel requests, defaults to True
+        :type parallel: Boolean
         """
         local_info = inspect_project(directory)
         project_path = local_info["name"]
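
The executor-and-futures_map pattern introduced in download_project above is reused for push and pull below: submit one task per file, remember which future belongs to which file, then drain results with as_completed. A standalone sketch of the pattern, with a placeholder fetch_one task standing in for the client's per-file helpers:

```python
import concurrent.futures


def fetch_one(name):
    """Placeholder per-file task; a real task would download or upload and raise on failure."""
    return len(name)


files = ['a.gpkg', 'b.qgz', 'c.txt']
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures_map = {executor.submit(fetch_one, f): f for f in files}
    for future in concurrent.futures.as_completed(futures_map):
        f = futures_map[future]
        try:
            # as_completed yields finished futures; the 60 s timeout mirrors
            # the future.result(60) guard used in the hunks above and below
            result = future.result(60)
        except Exception as err:
            print('failed: {}: {}'.format(f, err))
        else:
            print('done: {} -> {}'.format(f, result))
```
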
@@ -408,7 +433,7 @@ def push_project(self, directory):
 
         upload_files = changes["added"] + changes["updated"]
         for f in upload_files:
-            f["chunks"] = [str(uuid.uuid4()) for i in range(math.ceil(f["size"] / CHUNK_SIZE))]
+            f["chunks"] = [str(uuid.uuid4()) for i in range(math.ceil(f["size"] / UPLOAD_CHUNK_SIZE))]
 
         data = {
             "version": local_info.get("version"),
@@ -419,19 +444,23 @@ def push_project(self, directory):
 
         # upload files' chunks and close transaction
         if upload_files:
-            headers = {"Content-Type": "application/octet-stream"}
-            for f in upload_files:
-                with open(os.path.join(directory, f["path"]), 'rb') as file:
-                    for chunk in f["chunks"]:
-                        data = file.read(CHUNK_SIZE)
-                        checksum = hashlib.sha1()
-                        checksum.update(data)
-                        size = len(data)
-                        resp = self.post("/v1/project/push/chunk/%s/%s" % (info["transaction"], chunk), data, headers)
-                        data = json.load(resp)
-                        if not (data['size'] == size and data['checksum'] == checksum.hexdigest()):
-                            self.post("/v1/project/push/cancel/%s" % info["transaction"])
-                            raise ClientError("Mismatch between uploaded file and local one")
+            if parallel:
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    futures_map = {}
+                    for file in upload_files:
+                        future = executor.submit(self._upload_file, info["transaction"], directory, file, parallel)
+                        futures_map[future] = file
+
+                    for future in concurrent.futures.as_completed(futures_map):
+                        file = futures_map[future]
+                        try:
+                            future.result(60)
+                        except concurrent.futures.TimeoutError:
+                            raise ClientError("Timeout error: failed to upload {}".format(file))
+            else:
+                for file in upload_files:
+                    self._upload_file(info["transaction"], directory, file, parallel)
+
         try:
             resp = self.post("/v1/project/push/finish/%s" % info["transaction"])
             info = json.load(resp)
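
For a sense of the chunking set up in push_project above (the file size here is illustrative, not from the commit): with UPLOAD_CHUNK_SIZE at 10 MiB, a 25 MiB file is assigned ceil(25 / 10) = 3 chunk UUIDs:

```python
import math
import uuid

UPLOAD_CHUNK_SIZE = 10 * 1024 * 1024  # same value as defined at the top of client.py

file_size = 25 * 1024 * 1024  # hypothetical 25 MiB file
chunk_ids = [str(uuid.uuid4()) for _ in range(math.ceil(file_size / UPLOAD_CHUNK_SIZE))]
print(len(chunk_ids))  # -> 3
```
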
@@ -443,12 +472,14 @@ def push_project(self, directory):
         local_info["version"] = info["version"]
         save_project_file(directory, local_info)
 
-    def pull_project(self, directory):
+    def pull_project(self, directory, parallel=True):
         """
         Fetch and apply changes from repository.
 
         :param directory: Project's directory
         :type directory: String
+        :param parallel: Use multi-thread approach to fetch files in parallel requests, defaults to True
+        :type parallel: Boolean
         """
 
         local_info = inspect_project(directory)
@@ -485,12 +516,31 @@ def backup_if_conflict(path, checksum):
         fetch_files = pull_changes["added"] + pull_changes["updated"]
         if fetch_files:
             temp_dir = os.path.join(directory, '.mergin', 'fetch_{}-{}'.format(local_info["version"], server_info["version"]))
-            for file in fetch_files:
-                self._download_file(project_path, server_info['version'], file, temp_dir)
-                src = os.path.join(temp_dir, file["path"])
-                dest = local_path(file["path"])
-                backup_if_conflict(file["path"], file["checksum"])
-                move_file(src, dest)
+            # sending parallel requests is good for projects with a lot of small files
+            if parallel:
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    futures_map = {}
+                    for file in fetch_files:
+                        future = executor.submit(self._download_file, project_path, server_info['version'], file, temp_dir, parallel)
+                        futures_map[future] = file
+
+                    for future in concurrent.futures.as_completed(futures_map):
+                        file = futures_map[future]
+                        try:
+                            future.result(60)
+                        except concurrent.futures.TimeoutError:
+                            raise ClientError("Timeout error: failed to download {}".format(file))
+                        src = os.path.join(temp_dir, file["path"])
+                        dest = local_path(file["path"])
+                        backup_if_conflict(file["path"], file["checksum"])
+                        move_file(src, dest)
+            else:
+                for file in fetch_files:
+                    self._download_file(project_path, server_info['version'], file, temp_dir, parallel)
+                    src = os.path.join(temp_dir, file["path"])
+                    dest = local_path(file["path"])
+                    backup_if_conflict(file["path"], file["checksum"])
+                    move_file(src, dest)
             shutil.rmtree(temp_dir)
 
         for file in pull_changes["removed"]:
@@ -505,7 +555,7 @@ def backup_if_conflict(path, checksum):
         local_info["version"] = server_info["version"] if server_info["version"] else 'v0'
         save_project_file(directory, local_info)
 
-    def _download_file(self, project_path, project_version, file, directory):
+    def _download_file(self, project_path, project_version, file, directory, parallel=True):
         """
         Helper to download single project file from server in chunks.
 
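
The next hunk rebuilds _download_file around per-part HTTP range requests (download_file_part). For context, a rough self-contained sketch of a ranged download with urllib; the URL is a placeholder, and the real client goes through its own authenticated get() wrapper rather than urlopen:

```python
import urllib.request

PART_SIZE = 100 * 1024 * 1024  # mirrors CHUNK_SIZE in client.py
URL = 'https://example.com/big-file'  # placeholder, not a Mergin endpoint


def fetch_part(part):
    """Request one byte range; servers answer 206 Partial Content when ranges are honoured."""
    start = part * (PART_SIZE + 1)
    headers = {"Range": "bytes={}-{}".format(start, start + PART_SIZE)}
    req = urllib.request.Request(URL, headers=headers)
    with urllib.request.urlopen(req) as resp:
        if resp.status not in (200, 206):
            raise RuntimeError('part {} failed with HTTP {}'.format(part, resp.status))
        return resp.read()
```
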
@@ -517,29 +567,49 @@ def _download_file(self, project_path, project_version, file, directory):
         :type file: dict
         :param directory: Project's directory
         :type directory: String
+        :param parallel: Use multi-thread approach to download parts in parallel requests, default True
+        :type parallel: Boolean
         """
         query_params = {
             "file": file['path'],
             "version": project_version
         }
         file_dir = os.path.dirname(os.path.normpath(os.path.join(directory, file['path'])))
         basename = os.path.basename(file['path'])
-        length = 0
-        count = 0
-        while length < file['size']:
-            range_header = {"Range": "bytes={}-{}".format(length, length + CHUNK_SIZE)}
+
+        def download_file_part(part):
+            """Callback to get a part of file using request to server with Range header."""
+            start = part * (1 + CHUNK_SIZE)
+            range_header = {"Range": "bytes={}-{}".format(start, start + CHUNK_SIZE)}
             resp = self.get("/v1/project/raw/{}".format(project_path), data=query_params, headers=range_header)
             if resp.status in [200, 206]:
-                save_to_file(resp, os.path.join(file_dir, basename+".{}".format(count)))
-                length += (CHUNK_SIZE + 1)
-                count += 1
+                save_to_file(resp, os.path.join(file_dir, basename + ".{}".format(part)))
+            else:
+                raise ClientError('Failed to download part {} of file {}'.format(part, basename))
+
+        # download large files in chunks is beneficial mostly for retry on failure
+        chunks = math.ceil(file['size'] / CHUNK_SIZE)
+        if parallel:
+            # create separate n threads, default as cores * 5
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                futures_map = {executor.submit(download_file_part, i): i for i in range(chunks)}
+                for future in concurrent.futures.as_completed(futures_map):
+                    i = futures_map[future]
+                    try:
+                        future.result(60)
+                    except concurrent.futures.TimeoutError:
+                        raise ClientError('Timeout error: failed to download part {} of file {}'.format(i, basename))
+        else:
+            for i in range(chunks):
+                download_file_part(i)
 
         # merge chunks together
         with open(os.path.join(file_dir, basename), 'wb') as final:
-            for i in range(count):
-                with open(os.path.join(directory, file['path'] + ".{}".format(i)), 'rb') as chunk:
+            for i in range(chunks):
+                file_part = os.path.join(directory, file['path'] + ".{}".format(i))
+                with open(file_part, 'rb') as chunk:
                     shutil.copyfileobj(chunk, final)
-                os.remove(os.path.join(directory, file['path'] + ".{}".format(i)))
+                os.remove(file_part)
 
     def delete_project(self, project_path):
         """
@@ -553,3 +623,45 @@ def delete_project(self, project_path):
         url = urllib.parse.urljoin(self.url, urllib.parse.quote(path))
         request = urllib.request.Request(url, method="DELETE")
         self._do_request(request)
+
+    def _upload_file(self, transaction, project_dir, file_meta, parallel=True):
+        """
+        Upload file in open upload transaction.
+
+        :param transaction: transaction uuid
+        :type transaction: String
+        :param project_dir: local project directory
+        :type project_dir: String
+        :param file_meta: metadata for file to upload
+        :type file_meta: Dict
+        :param parallel: Use multi-thread approach to upload file chunks in parallel requests, defaults to True
+        :type parallel: Boolean
+        :raises ClientError: raise on data integrity check failure
+        """
+        headers = {"Content-Type": "application/octet-stream"}
+        file_path = os.path.join(project_dir, file_meta["path"])
+
+        def upload_chunk(chunk_id, data):
+            checksum = hashlib.sha1()
+            checksum.update(data)
+            size = len(data)
+            resp = self.post("/v1/project/push/chunk/{}/{}".format(transaction, chunk_id), data, headers)
+            data = json.load(resp)
+            if not (data['size'] == size and data['checksum'] == checksum.hexdigest()):
+                self.post("/v1/project/push/cancel/{}".format(transaction))
+                raise ClientError("Mismatch between uploaded file chunk {} and local one".format(chunk_id))
+
+        with open(file_path, 'rb') as file:
+            if parallel:
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    futures_map = {executor.submit(upload_chunk, chunk, file.read(UPLOAD_CHUNK_SIZE)): chunk for chunk in file_meta["chunks"]}
+                    for future in concurrent.futures.as_completed(futures_map):
+                        chunk = futures_map[future]
+                        try:
+                            future.result(60)
+                        except concurrent.futures.TimeoutError:
+                            raise ClientError('Timeout error: failed to upload chunk {}'.format(chunk))
+            else:
+                for chunk in file_meta["chunks"]:
+                    data = file.read(UPLOAD_CHUNK_SIZE)
+                    upload_chunk(chunk, data)
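
To make the integrity check in upload_chunk concrete: the client compares the size and sha1 digest of the chunk it sent against the values the server reports back, and cancels the transaction on any mismatch. A small local simulation of that comparison (the server reply is mocked here):

```python
import hashlib

chunk = b'bytes that would normally come from file.read(UPLOAD_CHUNK_SIZE)'

local_size = len(chunk)
local_checksum = hashlib.sha1(chunk).hexdigest()

# what the server would report after storing the chunk (simulated locally)
server_reply = {'size': local_size, 'checksum': hashlib.sha1(chunk).hexdigest()}

if not (server_reply['size'] == local_size and server_reply['checksum'] == local_checksum):
    raise RuntimeError('Mismatch between uploaded file chunk and local one')
print('chunk verified:', local_checksum)
```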
