@@ -134,6 +134,22 @@ def is_versioned_file(self, file):
134
134
f_extension = os .path .splitext (file )[1 ]
135
135
return f_extension in diff_extensions
136
136
137
def is_gpkg_open(self, path):
    """
    Check whether a geopackage file is open (i.e. its WAL file exists).

    SQLite in WAL mode keeps uncommitted changes in a sidecar
    ``<path>-wal`` file; its presence means the database is open and
    the main ``.gpkg`` file may not reflect the latest state yet.

    :param path: absolute path of file on disk
    :type path: str
    :returns: whether file is open
    :rtype: bool
    """
    # Only geopackage files are relevant; anything else is never "open".
    if os.path.splitext(path)[1] != '.gpkg':
        return False
    # An open geopackage has an associated write-ahead log file on disk.
    return os.path.exists(f'{path}-wal')
137
153
def ignore_file (self , file ):
138
154
"""
139
155
Helper function for blacklisting certain types of files.
@@ -165,6 +181,7 @@ def inspect_files(self):
165
181
for file in files :
166
182
if self .ignore_file (file ):
167
183
continue
184
+
168
185
abs_path = os .path .abspath (os .path .join (root , file ))
169
186
rel_path = os .path .relpath (abs_path , start = self .dir )
170
187
proj_path = '/' .join (rel_path .split (os .path .sep )) # we need posix path
@@ -222,7 +239,8 @@ def compare_file_sets(self, origin, current):
222
239
path = f ["path" ]
223
240
if path not in origin_map :
224
241
continue
225
- if f ["checksum" ] == origin_map [path ]["checksum" ]:
242
+ # with an open WAL file the checksum may be stale, so it is safer to mark the file as updated
243
+ if not self .is_gpkg_open (self .fpath (path )) and f ["checksum" ] == origin_map [path ]["checksum" ]:
226
244
continue
227
245
f ["origin_checksum" ] = origin_map [path ]["checksum" ]
228
246
updated .append (f )
@@ -298,7 +316,12 @@ def get_push_changes(self):
298
316
:rtype: dict
299
317
"""
300
318
changes = self .compare_file_sets (self .metadata ['files' ], self .inspect_files ())
319
+ # do checkpoint to push changes from wal file to gpkg
301
320
for file in changes ['added' ] + changes ['updated' ]:
321
+ size , checksum = do_sqlite_checkpoint (self .fpath (file ["path" ]))
322
+ if size and checksum :
323
+ file ["size" ] = size
324
+ file ["checksum" ] = checksum
302
325
file ['chunks' ] = [str (uuid .uuid4 ()) for i in range (math .ceil (file ["size" ] / UPLOAD_CHUNK_SIZE ))]
303
326
304
327
if not self .geodiff :
@@ -311,8 +334,9 @@ def get_push_changes(self):
311
334
if not self .is_versioned_file (path ):
312
335
continue
313
336
337
+ # we use geodiff to check if we can push only diff files
314
338
current_file = self .fpath (path )
315
- origin_file = self .fpath (path , self . meta_dir )
339
+ origin_file = self .fpath_meta (path )
316
340
diff_id = str (uuid .uuid4 ())
317
341
diff_name = path + '-diff-' + diff_id
318
342
diff_file = self .fpath_meta (diff_name )
@@ -332,7 +356,8 @@ def get_push_changes(self):
332
356
else :
333
357
not_updated .append (file )
334
358
except (pygeodiff .GeoDiffLibError , pygeodiff .GeoDiffLibConflictError ) as e :
335
- pass # we do force update
359
+ # changes from wal file already committed
360
+ pass
336
361
337
362
changes ['updated' ] = [f for f in changes ['updated' ] if f not in not_updated ]
338
363
return changes
@@ -477,15 +502,16 @@ def apply_push_changes(self, changes):
477
502
elif k == 'added' :
478
503
shutil .copy (self .fpath (path ), basefile )
479
504
elif k == 'updated' :
480
- # in case for geopackage cannot be created diff
505
+ # in case a diff cannot be created for the geopackage (e.g. forced update with committed changes from wal file)
481
506
if "diff" not in item :
482
- continue
483
- # better to apply diff to previous basefile to avoid issues with geodiff tmp files
484
- changeset = self .fpath_meta (item ['diff' ]['path' ])
485
- patch_error = self .apply_diffs (basefile , [changeset ])
486
- if patch_error :
487
- # in case of local sync issues it is safier to remove basefile, next time it will be downloaded from server
488
- os .remove (basefile )
507
+ shutil .copy (self .fpath (path ), basefile )
508
+ else :
509
+ # better to apply diff to previous basefile to avoid issues with geodiff tmp files
510
+ changeset = self .fpath_meta (item ['diff' ]['path' ])
511
+ patch_error = self .apply_diffs (basefile , [changeset ])
512
+ if patch_error :
513
+ # in case of local sync issues it is safer to remove basefile; next time it will be downloaded from server
514
+ os .remove (basefile )
489
515
else :
490
516
pass
491
517
@@ -944,10 +970,6 @@ def _push_changes(self, mp, data, parallel):
944
970
with concurrent .futures .ThreadPoolExecutor () as executor :
945
971
futures_map = {}
946
972
for file in upload_files :
947
- # do checkpoint to push changes from wal file to gpkg if there is no diff
948
- if "diff" not in file and mp .is_versioned_file (file ["path" ]):
949
- do_sqlite_checkpoint (mp .fpath (file ["path" ]))
950
- file ["checksum" ] = generate_checksum (mp .fpath (file ["path" ]))
951
973
file ['location' ] = mp .fpath_meta (file ['diff' ]['path' ]) if 'diff' in file else mp .fpath (file ['path' ])
952
974
future = executor .submit (self ._upload_file , info ["transaction" ], file , parallel )
953
975
futures_map [future ] = file
@@ -960,10 +982,6 @@ def _push_changes(self, mp, data, parallel):
960
982
raise ClientError ("Timeout error: failed to upload {}" .format (file ))
961
983
else :
962
984
for file in upload_files :
963
- # do checkpoint to push changes from wal file to gpkg if there is no diff
964
- if "diff" not in file and mp .is_versioned_file (file ["path" ]):
965
- do_sqlite_checkpoint (mp .fpath (file ["path" ]))
966
- file ["checksum" ] = generate_checksum (mp .fpath (file ["path" ]))
967
985
file ['location' ] = mp .fpath_meta (file ['diff' ]['path' ]) if 'diff' in file else mp .fpath (file ['path' ])
968
986
self ._upload_file (info ["transaction" ], file , parallel )
969
987
@@ -1085,6 +1103,7 @@ def _download_file(self, project_path, file, directory, parallel=True, diff_only
1085
1103
}
1086
1104
file_dir = os .path .dirname (os .path .normpath (os .path .join (directory , file ['path' ])))
1087
1105
basename = os .path .basename (file ['diff' ]['path' ]) if diff_only else os .path .basename (file ['path' ])
1106
+ expected_size = file ['diff' ]['size' ] if diff_only else file ['size' ]
1088
1107
1089
1108
if file ['size' ] == 0 :
1090
1109
os .makedirs (file_dir , exist_ok = True )
@@ -1125,7 +1144,7 @@ def download_file_part(part):
1125
1144
shutil .copyfileobj (chunk , final )
1126
1145
os .remove (file_part )
1127
1146
1128
- if os .path .getsize (os .path .join (file_dir , basename )) != file [ 'size' ] :
1147
+ if os .path .getsize (os .path .join (file_dir , basename )) != expected_size :
1129
1148
os .remove (os .path .join (file_dir , basename ))
1130
1149
raise ClientError (f'Download of file { basename } failed. Please try it again.' )
1131
1150
0 commit comments