This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 2ac1331

Merge pull request #6 from PageUpPeopleOrg/OSC-974_CompoundPKs
Added correct handling of compound primary keys
2 parents 5da36e0 + cd62873

File tree

10 files changed: +165, -39 lines changed

appveyor.yml

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ build_script:
   #Setup the source MSSQL database
   - sqlcmd -b -E -S "(local)\SQL2016" -i .\integration_tests\mssql_source\source_database_setup\create_database.sql
   - sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RelationalDataLoaderIntegrationTestSource -i .\integration_tests\mssql_source\source_database_setup\create_large_table.sql
+  - sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RelationalDataLoaderIntegrationTestSource -i .\integration_tests\mssql_source\source_database_setup\create_compound_pk.sql

   #Setup the target PostgreSQL database
   - psql -c "SELECT VERSION()"

integration_tests/csv_source/config/ColumnTest.json

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
   "source_table": {
     "name": "ColumnTest",
     "schema": "dbo",
-    "primary_key": "id"
+    "primary_keys": ["id"]
   },
   "target_schema": "rdl_integration_tests",
   "stage_table": "stage_source_data",

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+{
+    "source_table": {
+        "name": "CompoundPk",
+        "schema": "dbo",
+        "primary_keys": ["Id1","Id2"]
+    },
+    "target_schema": "rdl_integration_tests",
+    "stage_table": "stage_compound_pk",
+    "load_table": "load_compound_pk",
+
+    "batch": {
+        "size": 100000
+    },
+    "columns": [
+        {
+            "source_name": "Id1",
+            "destination": {
+                "name": "id_1",
+                "type": "int",
+                "nullable": false,
+                "primary_key": true
+            }
+        },
+        {
+            "source_name": "Id2",
+            "destination": {
+                "name": "id_2",
+                "type": "int",
+                "nullable": false,
+                "primary_key": true
+            }
+        }
+
+    ]
+}

integration_tests/mssql_source/config/LargeTableTest.json

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
   "source_table": {
     "name": "LargeTable",
     "schema": "dbo",
-    "primary_key": "Id"
+    "primary_keys": ["Id"]
  },
   "target_schema": "rdl_integration_tests",
   "stage_table": "stage_large_data",

integration_tests/mssql_source/source_database_setup/create_compound_pk.sql

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+IF object_id('CompoundPk') IS NULL
+    CREATE TABLE CompoundPk (
+        Id1 INT,
+        Id2 INT,
+        CONSTRAINT PK_CompoundPK PRIMARY KEY (Id1, Id2))
+ELSE
+    TRUNCATE TABLE CompoundPk
+
+INSERT CompoundPk
+(
+    Id1,
+    Id2
+)
+SELECT 1,1
+UNION ALL
+SELECT 1,2
+UNION ALL
+SELECT 2,2
+UNION ALL
+SELECT 2,1
+
+
+

modules/BatchDataLoader.py

Lines changed: 9 additions & 8 deletions
@@ -20,28 +20,29 @@ def __init__(self, data_source, source_table_configuration, target_schema, targe
         self.change_tracking_info = change_tracking_info

     # Imports rows, returns True if >0 rows were found
-    def load_batch(self, previous_batch_key):
+    def load_batch(self, batch_key_tracker):
         batch_tracker = self.data_load_tracker.start_batch()

-        self.logger.debug("ImportBatch Starting from previous_batch_key: {0}. Full Refresh: {1} this_sync_version: {2}".format(previous_batch_key, self.full_refresh, self.change_tracking_info.this_sync_version))
+        self.logger.debug("ImportBatch Starting from previous_batch_key: {0}. Full Refresh: {1} this_sync_version: {2}".format(batch_key_tracker.bookmarks, self.full_refresh, self.change_tracking_info.this_sync_version))

         data_frame = self.data_source.get_next_data_frame(self.source_table_configuration, self.columns,
-                                                          self.batch_configuration, batch_tracker, previous_batch_key,
+                                                          self.batch_configuration, batch_tracker, batch_key_tracker,
                                                           self.full_refresh, self.change_tracking_info)

         if data_frame is None or len(data_frame) == 0:
-            self.logger.debug("There are no rows to import, returning -1")
+            self.logger.debug("There are no more rows to import.")
             batch_tracker.load_skipped_due_to_zero_rows()
-            return -1
+            batch_key_tracker.has_more_data = False
+            return

         data_frame = self.attach_column_transformers(data_frame)
         self.write_data_frame_to_table(data_frame)
         batch_tracker.load_completed_successfully()

-        last_key_returned = data_frame.iloc[-1][self.source_table_configuration['primary_key']]
+        for primary_key in batch_key_tracker.primary_keys:
+            batch_key_tracker.set_bookmark(primary_key, data_frame.iloc[-1][primary_key])

-        self.logger.info("Batch key {0} Completed. {1}".format(last_key_returned, batch_tracker.get_statistics()))
-        return last_key_returned
+        self.logger.info("Batch keys {0} Completed. {1}".format(batch_key_tracker.bookmarks, batch_tracker.get_statistics()))

     def write_data_frame_to_table(self, data_frame):
         qualified_target_table = "{0}.{1}".format(self.target_schema, self.target_table)
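
The tail of load_batch now replaces the single returned key with a per-key bookmark update taken from the last row of the batch. A minimal sketch of that pattern, using a hypothetical three-row batch from the CompoundPk fixture:

    import pandas as pd

    # Hypothetical batch result for dbo.CompoundPk, already ordered by Id1, Id2.
    data_frame = pd.DataFrame({"Id1": [1, 1, 2], "Id2": [1, 2, 2]})

    # As in load_batch: the last row's value for each primary key becomes the
    # bookmark the next batch starts from (int() only for tidy display).
    bookmarks = {key: int(data_frame.iloc[-1][key]) for key in ["Id1", "Id2"]}
    print(bookmarks)  # {'Id1': 2, 'Id2': 2}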

modules/BatchKeyTracker.py

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+
+class BatchKeyTracker(object):
+
+    def __init__(self, primary_keys):
+        self.primary_keys = primary_keys
+        self.has_more_data = True
+        self.bookmarks = {}
+
+        for primary_key in primary_keys:
+            self.bookmarks[primary_key] = 0
+
+    def set_bookmark(self, key, value):
+        self.bookmarks[key] = value
+
+
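
A short sketch of how the new tracker behaves, assuming the repository root is on the Python path; the bookmark values are illustrative:

    from modules.BatchKeyTracker import BatchKeyTracker

    tracker = BatchKeyTracker(["Id1", "Id2"])
    print(tracker.bookmarks)      # {'Id1': 0, 'Id2': 0} - every key starts at zero
    print(tracker.has_more_data)  # True - a loader flips this to False on an empty batch

    tracker.set_bookmark("Id1", 2)
    tracker.set_bookmark("Id2", 2)
    print(tracker.bookmarks)      # {'Id1': 2, 'Id2': 2}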

modules/DataLoadManager.py

Lines changed: 5 additions & 4 deletions
@@ -7,7 +7,7 @@
 from modules.BatchDataLoader import BatchDataLoader
 from modules.DestinationTableManager import DestinationTableManager
 from modules.data_load_tracking.DataLoadTracker import DataLoadTracker
-
+from modules.BatchKeyTracker import BatchKeyTracker

 class DataLoadManager(object):
     def __init__(self, configuration_path, data_source, data_load_tracker_repository, logger=None):
@@ -97,9 +97,10 @@ def start_single_import(self, target_engine, model_name, requested_full_refresh)
                                              full_refresh,
                                              change_tracking_info)

-        previous_unique_column_value = 0
-        while previous_unique_column_value > -1:
-            previous_unique_column_value = batch_data_loader.load_batch(previous_unique_column_value)
+
+        batch_key_tracker = BatchKeyTracker(pipeline_configuration['source_table']['primary_keys']);
+        while batch_key_tracker.has_more_data:
+            batch_data_loader.load_batch(batch_key_tracker)

         if full_refresh:
             # Rename the stage table to the load table.
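
The driver loop swaps the old -1 sentinel for an explicit has_more_data flag on the tracker. A sketch of the new termination protocol, using a hypothetical stand-in for BatchDataLoader:

    from modules.BatchKeyTracker import BatchKeyTracker

    class FakeBatchDataLoader:
        # Hypothetical stand-in: serves two batches, then reports an empty one.
        def __init__(self):
            self.remaining_batches = 2

        def load_batch(self, batch_key_tracker):
            if self.remaining_batches == 0:
                # What the real load_batch does when no rows come back.
                batch_key_tracker.has_more_data = False
                return
            self.remaining_batches -= 1

    batch_key_tracker = BatchKeyTracker(["Id1", "Id2"])
    loader = FakeBatchDataLoader()
    while batch_key_tracker.has_more_data:
        loader.load_batch(batch_key_tracker)  # two loads, then the empty call ends the loop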

modules/data_sources/CsvDataSource.py

Lines changed: 4 additions & 2 deletions
@@ -43,9 +43,11 @@ def assert_column_exists(self, column_name, data_frame, csv_file):


     # For now, the CSV data sources will get all rows in the CSV regardless of batch size - i.e., they don't currently support paging.
-    def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info):
+    def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, batch_key_tracker, full_refresh, change_tracking_info):

-        if previous_batch_key > 0:
+        # There is no incremental loading in CSV - therefore, we check whether data has already been loaded in this run.
+        # If it has, we have loaded all the data.
+        if batch_key_tracker.bookmarks[batch_key_tracker.primary_keys[0]] > 0:
             return None

         csv_file = os.path.abspath(self.source_path / "{0}.csv".format(table_configuration['name']))
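
Because CSV sources read everything in one pass, the new guard only asks whether the first key's bookmark has moved off zero. A sketch of that check, with an assumed single-key tracker:

    from modules.BatchKeyTracker import BatchKeyTracker

    tracker = BatchKeyTracker(["id"])
    first_key = tracker.primary_keys[0]

    print(tracker.bookmarks[first_key] > 0)  # False: first call reads the whole CSV

    tracker.set_bookmark(first_key, 42)      # hypothetical: a batch was loaded
    print(tracker.bookmarks[first_key] > 0)  # True: the next call returns None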

modules/data_sources/MsSqlDataSource.py

Lines changed: 71 additions & 23 deletions
@@ -8,6 +8,7 @@
 from modules.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
 from sqlalchemy.sql import text

+
 class MsSqlDataSource(object):

     def __init__(self, connection_string, logger=None):
@@ -31,32 +32,40 @@ def prefix_column(column_name, full_refresh, primary_key_column_name):
         else:
             return "t.{0}".format(column_name)

-    def build_select_statement(self, table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info):
-        column_array = list(map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_configuration['primary_key']), columns))
+    def build_select_statement(self, table_configuration, columns, batch_configuration, batch_key_tracker, full_refresh,
+                               change_tracking_info):
+        column_array = list(
+            map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_configuration['primary_keys']),
+                columns))
         column_names = ", ".join(column_array)
+
         if full_refresh:
-            return "SELECT TOP ({0}) {1} FROM {2}.{3} t WHERE t.{4} > {5} ORDER BY t.{4}".format(batch_configuration['size'],
-                                                                                                 column_names,
-                                                                                                 table_configuration['schema'],
-                                                                                                 table_configuration['name'],
-                                                                                                 table_configuration['primary_key'],
-                                                                                                 previous_batch_key)
+            order_by = ", t.".join(table_configuration['primary_keys'])
+
+            return "SELECT TOP ({0}) {1} FROM {2}.{3} t WHERE {4} ORDER BY {5}".format(batch_configuration['size'],
+                                                                                       column_names,
+                                                                                       table_configuration['schema'],
+                                                                                       table_configuration['name'],
+                                                                                       self.build_where_clause(batch_key_tracker, "t"),
+                                                                                       order_by)
         else:
+            order_by = ", chg.".join(table_configuration['primary_keys'])
+
             sql_builder = io.StringIO()
             sql_builder.write("SELECT TOP ({0}) {1}, ".format(batch_configuration['size'], column_names))
-            sql_builder.write("chg.SYS_CHANGE_VERSION as data_pipeline_change_version, CASE chg.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 END as data_pipeline_is_deleted \n")
+            sql_builder.write(
+                "chg.SYS_CHANGE_VERSION as data_pipeline_change_version, CASE chg.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 END as data_pipeline_is_deleted \n")
             sql_builder.write("FROM CHANGETABLE(CHANGES {0}.{1}, {2}) chg ".format(table_configuration['schema'],
                                                                                    table_configuration['name'],
                                                                                    change_tracking_info.this_sync_version))
-            sql_builder.write(" LEFT JOIN {0}.{1} t on chg.{2} = t.{2} ".format(table_configuration['schema'],
-                                                                                table_configuration['name'],
-                                                                                table_configuration['primary_key'],))
+            sql_builder.write(" LEFT JOIN {0}.{1} t on {2} ".format(table_configuration['schema'],
+                                                                    table_configuration['name'],
+                                                                    self.build_change_table_on_clause(batch_key_tracker)))

-            sql_builder.write("WHERE chg.{0} > {1} ORDER BY chg.{0}".format(table_configuration['primary_key'],
-                                                                            previous_batch_key))
+            sql_builder.write("WHERE {0}".format(self.build_where_clause(batch_key_tracker, "t")))
+            sql_builder.write("ORDER BY {0}".format(order_by))

             return sql_builder.getvalue()
@@ -65,7 +74,8 @@ def assert_data_source_is_valid(self, table_configuration, configured_columns):
         columns_in_database = self.get_table_columns(table_configuration)

         for column in configured_columns:
-            self.assert_column_exists(column['source_name'], columns_in_database, "{0}.{1}".format(table_configuration['schema'], table_configuration['name']))
+            self.assert_column_exists(column['source_name'], columns_in_database,
+                                      "{0}.{1}".format(table_configuration['schema'], table_configuration['name']))

     def assert_column_exists(self, column_name, columns_in_database, table_name):
         if column_name in columns_in_database:
@@ -82,9 +92,10 @@ def get_table_columns(self, table_configuration):
                           autoload_with=self.database_engine)
         return list(map(lambda column: column.name, table.columns))

-
-    def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info):
-        sql = self.build_select_statement(table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info,)
+    def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, batch_key_tracker,
+                            full_refresh, change_tracking_info):
+        sql = self.build_select_statement(table_configuration, columns, batch_configuration, batch_key_tracker,
+                                          full_refresh, change_tracking_info, )
         self.logger.debug("Starting read of SQL Statement: {0}".format(sql))
         data_frame = pandas.read_sql_query(sql, self.database_engine)
@@ -111,9 +122,11 @@ def init_change_tracking(self, table_configuration, last_sync_version):
         sql_builder.write("DECLARE @last_sync_version bigint = {0}; \n".format(last_sync_version))
         sql_builder.write("DECLARE @this_sync_version bigint = 0; \n")
         sql_builder.write("DECLARE @next_sync_version bigint = CHANGE_TRACKING_CURRENT_VERSION(); \n")
-        sql_builder.write("IF @last_sync_version >= CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('{0}.{1}'))\n".format(table_configuration['schema'],table_configuration['name']))
+        sql_builder.write("IF @last_sync_version >= CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('{0}.{1}'))\n".format(
+            table_configuration['schema'], table_configuration['name']))
         sql_builder.write(" SET @this_sync_version = @last_sync_version; \n")
-        sql_builder.write(" SELECT @next_sync_version as next_sync_version, @this_sync_version as this_sync_version; \n")
+        sql_builder.write(
+            " SELECT @next_sync_version as next_sync_version, @this_sync_version as this_sync_version; \n")

         self.logger.debug("Getting ChangeTrackingInformation for {0}.{1}. {2}".format(table_configuration['schema'],
                                                                                       table_configuration['name'],
@@ -125,3 +138,38 @@ def init_change_tracking(self, table_configuration, last_sync_version):

         force_full_load = bool(row["this_sync_version"] == 0 or row["next_sync_version"] == 0)
         return ChangeTrackingInfo(row["this_sync_version"], row["next_sync_version"], force_full_load)
+
+    @staticmethod
+    def build_where_clause(batch_key_tracker, table_alias):
+        has_value = False
+
+        try:
+            sql_builder = io.StringIO()
+            for primary_key in batch_key_tracker.bookmarks:
+                if has_value:
+                    sql_builder.write(" AND ")
+
+                sql_builder.write(
+                    " {0}.{1} > {2}".format(table_alias, primary_key, batch_key_tracker.bookmarks[primary_key]))
+                has_value = True
+
+            return sql_builder.getvalue()
+        finally:
+            sql_builder.close()
+
+    @staticmethod
+    def build_change_table_on_clause(batch_key_tracker):
+        has_value = False
+
+        try:
+            sql_builder = io.StringIO()
+            for primary_key in batch_key_tracker.bookmarks:
+                if has_value:
+                    sql_builder.write(" AND ")
+
+                sql_builder.write(" chg.{0} = t.{0}".format(primary_key))
+                has_value = True
+
+            return sql_builder.getvalue()
+        finally:
+            sql_builder.close()
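
To see what the two new clause builders emit, here is a standalone mirror of their logic (copied from the methods above so it runs without the rest of the module), fed the zero bookmarks a fresh BatchKeyTracker would hold for the CompoundPk fixture:

    import io

    def build_where_clause(bookmarks, table_alias):
        # Same logic as MsSqlDataSource.build_where_clause: one
        # "alias.key > bookmark" comparison per primary key, joined with AND.
        sql_builder = io.StringIO()
        has_value = False
        for primary_key in bookmarks:
            if has_value:
                sql_builder.write(" AND ")
            sql_builder.write(" {0}.{1} > {2}".format(table_alias, primary_key, bookmarks[primary_key]))
            has_value = True
        return sql_builder.getvalue()

    def build_change_table_on_clause(bookmarks):
        # Same logic as MsSqlDataSource.build_change_table_on_clause: joins the
        # change table to the source table on every primary key column.
        return " AND ".join(" chg.{0} = t.{0}".format(key) for key in bookmarks)

    bookmarks = {"Id1": 0, "Id2": 0}  # a fresh BatchKeyTracker starts every key at 0
    print(build_where_clause(bookmarks, "t"))
    # " t.Id1 > 0 AND  t.Id2 > 0"
    print(build_change_table_on_clause(bookmarks))
    # " chg.Id1 = t.Id1 AND  chg.Id2 = t.Id2"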
