Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 3c632aa

Browse files
author
Sean Budd
authored
[OSC-1366] Skip on no changes (#40)
* skip if there's no changes * check if data changed to avoid reloading * ensure sync versions are valid before skipping
1 parent 4d134b8 commit 3c632aa

File tree

5 files changed

+47
-6
lines changed

5 files changed

+47
-6
lines changed

rdl/DataLoadManager.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,13 @@ def start_single_import(self, model_file, requested_full_refresh, model_number,
119119
self.data_load_tracker_repository.create_execution_model(data_load_tracker)
120120
destination_table_manager.create_schema(model_config['target_schema'])
121121

122+
skip_incremental_reason, skip_incremental = DataLoadManager.check_skip_incremental(change_tracking_info)
123+
if not full_refresh and skip_incremental:
124+
self.logger.info(f"Skipping incremental refresh for reason '{skip_incremental_reason}'")
125+
data_load_tracker.data_load_successful()
126+
self.data_load_tracker_repository.save_execution_model(data_load_tracker)
127+
return
128+
122129
self.logger.debug(f"Recreating the staging table {model_config['target_schema']}."
123130
f"{model_config['stage_table']}")
124131
destination_table_manager.create_table(model_config['target_schema'],
@@ -168,6 +175,15 @@ def start_single_import(self, model_file, requested_full_refresh, model_number,
168175
f" COMPLETED {model_name}")
169176
self.data_load_tracker_repository.save_execution_model(data_load_tracker)
170177

178+
@staticmethod
179+
def check_skip_incremental(change_tracking_info):
180+
if change_tracking_info.last_sync_version and change_tracking_info.sync_version and (
181+
change_tracking_info.last_sync_version == change_tracking_info.sync_version):
182+
return Constants.IncrementalSkipReason.SYNC_VERSIONS_ARE_EQUAL, True
183+
if not change_tracking_info.data_changed_since_last_sync:
184+
return Constants.IncrementalSkipReason.NO_DATA_CHANGED, True
185+
return Constants.IncrementalSkipReason.NOT_APPLICABLE, False
186+
171187
@staticmethod
172188
def is_full_refresh(*,
173189
user_requested,
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
class ChangeTrackingInfo:
2-
def __init__(self, last_sync_version, sync_version, force_full_load):
2+
def __init__(self, last_sync_version, sync_version, force_full_load, data_changed_since_last_sync):
33
self.last_sync_version = last_sync_version
44
self.sync_version = sync_version
55
self.force_full_load = force_full_load
6+
self.data_changed_since_last_sync = data_changed_since_last_sync

rdl/data_sources/CsvDataSource.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,4 @@ def get_next_data_frame(
7676
return data_frame
7777

7878
def init_change_tracking(self, table_config, last_sync_version):
79-
return ChangeTrackingInfo(0, 0, False)
79+
return ChangeTrackingInfo(0, 0, False, True)

rdl/data_sources/MsSqlDataSource.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ def init_change_tracking(self, table_config, last_known_sync_version):
173173
# c) whether a full refresh is needed - this is a derivative of the validity of the last known sync version
174174
# because if the last known sync version is no longer valid, then our target data is in-an-invalid-state /
175175
# out-of-sync and a full refresh must be forced to sync the data.
176+
# d) whether any data has changed for the table synce the last sync - this is a derivative of the results of
177+
# the CHANGETABLE() call, which we perform later and join to the table to load in the changed rows.
176178
#
177179
# the following help us determining the above:
178180
# a) sync_version: the current version of change tracking at source database.
@@ -183,6 +185,8 @@ def init_change_tracking(self, table_config, last_known_sync_version):
183185
# c) min_valid_version: the minimum version that is valid for use in obtaining change tracking information from
184186
# the specified table.
185187
# it's value IS sourced from CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(..)).
188+
# d) data_changed_since_last_sync: whether or not data has changed in the table since last_known_sync_version
189+
# it's value is sourced from CHANGETABLE(table, last_known_sync_version).
186190

187191
get_change_tracking_info_sql = f"" \
188192
f"DECLARE @sync_version BIGINT = CHANGE_TRACKING_CURRENT_VERSION(); \n" \
@@ -193,6 +197,7 @@ def init_change_tracking(self, table_config, last_known_sync_version):
193197
f" CASE WHEN @last_known_sync_version >= @min_valid_version THEN 1 ELSE 0 END; \n" \
194198
f"DECLARE @last_sync_version BIGINT; \n" \
195199
f"DECLARE @force_full_load BIT; \n" \
200+
f"DECLARE @data_changed_since_last_sync BIT; \n" \
196201
f" \n" \
197202
f"IF @last_known_sync_version_is_valid = 1 \n" \
198203
f"BEGIN \n" \
@@ -205,17 +210,32 @@ def init_change_tracking(self, table_config, last_known_sync_version):
205210
f" SET @last_sync_version = 0; \n" \
206211
f"END \n" \
207212
f" \n" \
208-
f"SELECT @sync_version AS sync_version \n" \
209-
f", @last_sync_version AS last_sync_version \n" \
210-
f", @force_full_load AS force_full_load; \n"
213+
f"IF EXISTS ( " \
214+
f" SELECT 1" \
215+
f" FROM CHANGETABLE(CHANGES " \
216+
f" {table_config['schema']}.{table_config['name']}, {last_known_sync_version} ) " \
217+
f" as c )" \
218+
f"BEGIN \n" \
219+
f" SET @data_changed_since_last_sync = 1; \n" \
220+
f"END \n" \
221+
f"ELSE \n" \
222+
f"BEGIN \n" \
223+
f" SET @data_changed_since_last_sync = 0; \n" \
224+
f"END \n" \
225+
f" \n" \
226+
f"SELECT @sync_version AS sync_version \n" \
227+
f", @last_sync_version AS last_sync_version \n" \
228+
f", @force_full_load AS force_full_load \n" \
229+
f", @data_changed_since_last_sync AS data_changed_since_last_sync; \n"
211230

212231
self.logger.debug(f"Getting ChangeTracking info for {table_config['schema']}.{table_config['name']}.\n"
213232
f"{get_change_tracking_info_sql}")
214233

215234
result = self.database_engine.execute(text(get_change_tracking_info_sql))
216235
row = result.fetchone()
217236

218-
return ChangeTrackingInfo(row["last_sync_version"], row["sync_version"], row["force_full_load"])
237+
return ChangeTrackingInfo( row["last_sync_version"], row["sync_version"],
238+
row["force_full_load"], row["data_changed_since_last_sync"])
219239

220240
@staticmethod
221241
def build_where_clause(batch_key_tracker, table_alias):

rdl/shared/Constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ class FullRefreshReason:
1010
MODEL_CHANGED = 'Model Changed'
1111
INVALID_CHANGE_TRACKING = 'Change Tracking Invalid'
1212

13+
class IncrementalSkipReason:
14+
NOT_APPLICABLE = 'N/A'
15+
SYNC_VERSIONS_ARE_EQUAL = 'last_sync_version is the same as sync_version'
16+
NO_DATA_CHANGED = 'No data has changed since last sync'
1317

1418
class ExecutionStatus:
1519
STARTED = 'Started'

0 commit comments

Comments
 (0)