This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 68291b9

[OSC-1301] Split up execution table (#35)

* split up tables
* rename DataLoadExecution -> ExecutionModel
* rename correlation id to execution id
* ensure alembic can downgrade for unit tests
* update unit tests
* move alembic readme to main
* reorder postgres debugging
* use constant for foreignkey
* split batch execution station and execution status
* rename string constant for not started -> started
* split up entities
* improve logging and column for num models
* add citext extension to alembic
* log model execution as they partially complete
* fail execution if model fails
* lockdown gitignore for csv
* use sqlalchemy inspect for table names
* split up execution/executionmodel status
* change string constant from completed succesfully to succesful
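The split described in the commit message replaces the single `data_load_execution` table (keyed by a correlation id) with an execution record plus one row per model loaded in that execution. Below is a rough sketch of that shape, assuming SQLAlchemy declarative entities; the class, table, and column names are inferred from the diffs further down, not copied from the repository.

```python
# Hypothetical sketch of the post-split entities; names and columns beyond
# execution_id/status are illustrative, not taken from the repository.
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
SCHEMA = 'rdl'  # the schema used by the existing migrations


class Execution(Base):
    """One row per pipeline run (formerly identified only by correlation_id)."""
    __tablename__ = 'execution'
    __table_args__ = {'schema': SCHEMA}

    execution_id = Column(UUID(as_uuid=True), primary_key=True)
    status = Column(String(50), nullable=False)


class ExecutionModel(Base):
    """One row per model processed within an execution (formerly DataLoadExecution)."""
    __tablename__ = 'execution_model'
    __table_args__ = {'schema': SCHEMA}

    id = Column(Integer, primary_key=True)
    execution_id = Column(UUID(as_uuid=True),
                          ForeignKey(f'{SCHEMA}.execution.execution_id'),
                          nullable=False)
    model_name = Column(String(250), nullable=False)
    status = Column(String(50), nullable=False)
```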

18 files changed (+395, -221 lines)

.gitignore

Lines changed: 3 additions & 0 deletions

```diff
@@ -10,6 +10,9 @@ __pycache__/
 *.py[cod]
 *$py.class
 
+# generated csvs
+rdl_integration_tests.*.csv
+
 # C extensions
 *.so
 
```

README.md

Lines changed: 83 additions & 23 deletions

````diff
@@ -80,6 +80,76 @@ See `./tests/integration_tests/test_*.cmd` scripts for usage samples.
 
 ## Development
 
+### Alembic
+
+#### To upgrade to the latest schema
+
+```bash
+alembic -c rdl/alembic.ini -x $DESTINATION_DB_URL upgrade head
+```
+
+#### Updating the schema
+
+Ensure any new tables inherit from the same Base used in `alembic/env.py`
+
+```python
+from rdl.entities import Base
+```
+
+Whenever you make a schema change, run
+
+```bash
+pip install .
+alembic -c rdl/alembic.ini -x $DESTINATION_DB_URL revision -m "$REVISION_MESSAGE" --autogenerate
+```
+
+check that the new version in `alembic/versions` is correct
+
+#### Downgrading the schema
+
+Whenever you want to downgrade the schema
+
+```bash
+alembic -c rdl/alembic.ini -x $DESTINATION_DB_URL history # see the list of revision ids
+alembic -c rdl/alembic.ini -x $DESTINATION_DB_URL current # see the current revision id
+alembic -c rdl/alembic.ini -x $DESTINATION_DB_URL downgrade -1 # revert back one revision
+alembic -c rdl/alembic.ini -x $DESTINATION_DB_URL downgrade $revision_id # revert back to a revision id, found using the history command
+```
+
+#### Inaccurate autogenerated revisions
+
+Does your autogenerated revision not look right?
+
+Try editing the function `use_schema` in `alembic/env.py`, this determines what alembic looks for in the database.
+
+[Relevant Documentation](https://alembic.sqlalchemy.org/en/latest/api/runtime.html?highlight=include_schemas#alembic.runtime.environment.EnvironmentContext.configure.params.include_object)
+
+#### New models aren't showing up in upgrade section
+
+Ensure all model classes inherit from the same Base that `alembic/env.py` imports, and that the following class
+properties are set
+
+```python
+__tablename__ = 'your_mapped_table_name'
+__table_args__ = {'schema': Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}
+```
+
+Also try importing the models into `alembic/env.py`, eg
+
+```python
+from rdl.data_load_tracking import DataLoadExecution
+```
+
+#### Alembic won't pick up my change
+
+[Alembic only supports some changes](https://alembic.sqlalchemy.org/en/latest/autogenerate.html#what-does-autogenerate-detect-and-what-does-it-not-detect)
+
+Try adding raw sql in the `upgrade()` and `downgrade()` functions of your revision
+
+```python
+op.execute(RAW_SQL)
+```
+
 ### Linting
 
 Use autopep8 before pushing commits (include the "." for the folder)
@@ -109,29 +179,6 @@ Use the following vscode settings by either:
 
 ### Testing
 
-### Postgres debugging
-
-Ensure the database you are using is in utf8 mode. You cannot change encoding once the database is created.
-
-```sql
-
-CREATE DATABASE "my_database"
-WITH OWNER "postgres"
-ENCODING 'UTF8'
-TEMPLATE template0;
-
-```
-
-Also ensure that the database has the CITEXT extension by logging into the DB and adding it
-
-```sql
-
->>>psql my_database
-
-CREATE EXTENSION CITEXT;
-
-```
-
 #### Integration
 
 The test batch files assume there is a user by the name of `postgres` on the system.
@@ -159,6 +206,19 @@ _Execution:_
 
 Execution is as simply as `python3 run_tests.py`
 
+### Postgres debugging
+
+Ensure the database you are using is in utf8 mode. You cannot change encoding once the database is created.
+
+```sql
+
+CREATE DATABASE "my_database"
+WITH OWNER "postgres"
+ENCODING 'UTF8'
+TEMPLATE template0;
+
+```
+
 ### `Destination.Type` Values
 
 The destination.type value controls both the data reader type and the destination column type. These are implemented in ColumnTypeResolver.py.
````
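A note on the `-x $DESTINATION_DB_URL` flag used throughout the commands added above: Alembic only forwards `-x` values to `env.py`, which must read them itself. The repository's actual handling is not part of this diff, so the following is a minimal sketch of how `alembic/env.py` might consume the value, assuming it is passed as a bare database URL.

```python
# (inside alembic/env.py) -- sketch only, assuming the -x value is a bare URL,
# e.g. `alembic -c rdl/alembic.ini -x postgresql+psycopg2://... upgrade head`
from alembic import context

config = context.config

# get_x_argument() returns the list of values supplied via -x on the command line.
x_arguments = context.get_x_argument()
if x_arguments:
    # Override whatever sqlalchemy.url is configured in rdl/alembic.ini.
    config.set_main_option("sqlalchemy.url", x_arguments[0])
```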

appveyor.yml

Lines changed: 2 additions & 1 deletion

```diff
@@ -39,7 +39,6 @@ build_script:
 #Setup the target PostgreSQL database
 - psql -c "SELECT VERSION()"
 - createdb %DBNAME%
-- psql -d %DBNAME% -c "CREATE EXTENSION IF NOT EXISTS citext"
 - C:\projects\relational-data-loader\venv\Scripts\activate.bat
 #Install the dependencies for rdl.
 - pip install .
@@ -73,3 +72,5 @@ test_script:
 on_finish:
 #Enable this line to make the build pause after completion for RDP troubleshooting.
 #- ps: $blockRdp = $true; iex ((new-object net.webclient).DownloadString('https://github.com/appveyor/ci/master/scripts/enable-rdp.ps1'))
+
+- alembic -c rdl/alembic.ini -x postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests downgrade base
```

rdl/DataLoadManager.py

Lines changed: 8 additions & 22 deletions

```diff
@@ -21,13 +21,11 @@ def __init__(self, configuration_path, source_db, target_db, data_load_tracker_r
         self.source_db = source_db
         self.target_db = target_db
         self.data_load_tracker_repository = data_load_tracker_repository
-        self.correlation_id = uuid.uuid4()
         self.model_pattern = '**/{model_name}.json'
         self.all_model_pattern = self.model_pattern.format(model_name='*')
 
     def start_imports(self, force_full_refresh_models):
-        self.logger.info(f"Starting Execution ID: '{self.correlation_id}'")
-        execution_start_time = datetime.now()
+        self.execution_id = self.data_load_tracker_repository.create_execution()
 
         model_folder = Path(self.configuration_path)
         if not model_folder.is_dir():
@@ -60,19 +58,7 @@ def start_imports(self, force_full_refresh_models):
             model_number += 1 # avoid all_model_names.index(model_name) due to linear time-complexity in list length
             self.start_single_import(model_file, request_full_refresh, model_number, total_number_of_models)
 
-        self.logger.info("Execution completed.")
-        execution_end_time = datetime.now()
-        total_execution_seconds = int((execution_end_time - execution_start_time).total_seconds())
-        execution_hours = total_execution_seconds // 3600
-        execution_minutes = (total_execution_seconds // 60) % 60
-        execution_seconds = total_execution_seconds % 60
-        total_number_of_rows_processed = self.data_load_tracker_repository.get_execution_rows(self.correlation_id)
-        self.logger.info(
-            f"Completed Execution ID: {self.correlation_id}"
-            f"; Models Processed: {total_number_of_models:,}"
-            f"; Rows Processed: {total_number_of_rows_processed:,}"
-            f"; Execution Time: {execution_hours}h {execution_minutes}m {execution_seconds}s"
-            f"; Average rows processed per second: {(total_number_of_rows_processed//max(total_execution_seconds, 1)):,}.")
+        self.data_load_tracker_repository.complete_execution(self.execution_id, total_number_of_models)
 
     def start_single_import(self, model_file, requested_full_refresh, model_number, total_number_of_models):
         model_name = model_file.stem
@@ -128,9 +114,9 @@ def start_single_import(self, model_file, requested_full_refresh, model_number,
         if full_refresh:
             self.logger.info(f"Performing full refresh for reason '{full_refresh_reason}'")
 
-        data_load_tracker = DataLoadTracker(self.correlation_id, model_name, model_checksum, model_config,
+        data_load_tracker = DataLoadTracker(self.execution_id, model_name, model_checksum, model_config,
                                             full_refresh, full_refresh_reason, change_tracking_info)
-
+        self.data_load_tracker_repository.create_execution_model(data_load_tracker)
         destination_table_manager.create_schema(model_config['target_schema'])
 
         self.logger.debug(f"Recreating the staging table {model_config['target_schema']}."
@@ -158,7 +144,8 @@ def start_single_import(self, model_file, requested_full_refresh, model_number,
                 batch_data_loader.load_batch(batch_key_tracker)
             except SensitiveDataError as e:
                 data_load_tracker.data_load_failed(e.sensitive_error_args)
-                self.data_load_tracker_repository.save(data_load_tracker)
+                self.data_load_tracker_repository.save_execution_model(data_load_tracker)
+                self.data_load_tracker_repository.fail_execution(self.execution_id, model_number)
                 raise e
 
         if full_refresh:
@@ -177,10 +164,9 @@ def start_single_import(self, model_file, requested_full_refresh, model_number,
             destination_table_manager.drop_table(model_config['target_schema'],
                                                  model_config['stage_table'])
         data_load_tracker.data_load_successful()
-        self.data_load_tracker_repository.save(data_load_tracker)
         self.logger.info(f"{model_number:0{max_model_number_len}d} of {total_number_of_models}"
-                         f" COMPLETED {model_name},"
-                         f" {data_load_tracker.get_statistics()}")
+                         f" COMPLETED {model_name}")
+        self.data_load_tracker_repository.save_execution_model(data_load_tracker)
 
     @staticmethod
     def is_full_refresh(*,
```
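Taken together, the calls in this file imply a repository surface of roughly the following shape. The method names and argument lists come from the diff above; the class skeleton and docstrings are illustrative only, not the repository's implementation.

```python
# Sketch of the DataLoadTrackerRepository surface implied by DataLoadManager.py.
# Only the method names and call shapes come from the diff; bodies are placeholders.
class DataLoadTrackerRepository:
    def create_execution(self):
        """Insert a new execution row and return its identifier."""
        raise NotImplementedError

    def create_execution_model(self, data_load_tracker):
        """Insert a row for the model that is about to be loaded."""
        raise NotImplementedError

    def save_execution_model(self, data_load_tracker):
        """Persist the tracker's final state for a single model (success or failure)."""
        raise NotImplementedError

    def fail_execution(self, execution_id, model_number):
        """Mark the whole execution as failed after a model-level failure."""
        raise NotImplementedError

    def complete_execution(self, execution_id, total_number_of_models):
        """Mark the execution as successful and record how many models were processed."""
        raise NotImplementedError
```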

rdl/alembic/README.md

Lines changed: 0 additions & 73 deletions
This file was deleted.

rdl/alembic/env.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -6,7 +6,7 @@
 from sqlalchemy import pool
 
 from alembic import context
-from rdl.data_load_tracking.DataLoadExecution import Base
+from rdl.entities import Base
 from rdl.shared import Constants
 
 # this is the Alembic Config object, which provides
```
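This import swap relies on the entities package exposing a single shared declarative `Base` that both the ORM classes and Alembic use. The module itself is not part of this diff; a minimal sketch of what `rdl/entities/__init__.py` could look like:

```python
# rdl/entities/__init__.py -- hypothetical layout, not shown in this commit.
from sqlalchemy.ext.declarative import declarative_base

# Single metadata object: every entity class inherits from this Base,
# and alembic/env.py points its target_metadata at Base.metadata.
Base = declarative_base()

# Importing the entity modules here would register their tables on
# Base.metadata before Alembic autogenerate inspects it, e.g. (assumed names):
# from rdl.entities.execution_entity import Execution
# from rdl.entities.execution_model_entity import ExecutionModel
```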

rdl/alembic/versions/0d4a3ce9c0a9_add_failure_reason_column_to_data_load_.py

Lines changed: 2 additions & 1 deletion

```diff
@@ -18,7 +18,8 @@
 
 def upgrade():
     # ### commands auto generated by Alembic - please adjust! ###
-    op.add_column('data_load_execution', sa.Column('failure_reason', sa.String(length=1000), nullable=True), schema='rdl')
+    op.add_column('data_load_execution', sa.Column('failure_reason',
+                                                   sa.String(length=1000), nullable=True), schema='rdl')
     # ### end Alembic commands ###
 
 
```
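The change above only reflows `upgrade()` for line length, but since the commit also aims to keep every revision downgradable for the unit tests, this revision's `downgrade()` would simply mirror the column addition. The actual downgrade body is not shown in the diff; a sketch:

```python
from alembic import op


def downgrade():
    # Assumed mirror of upgrade(): drop the column added above.
    op.drop_column('data_load_execution', 'failure_reason', schema='rdl')
```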
rdl/alembic/versions/710e28aa5978_add_data_load_execution_table.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -19,6 +19,7 @@
 def upgrade():
     # ### commands auto generated by Alembic - please adjust! ###
     op.execute('CREATE SCHEMA IF NOT EXISTS rdl')
+    op.execute('CREATE EXTENSION IF NOT EXISTS CITEXT')
     op.create_table('data_load_execution',
                     sa.Column('id', sa.Integer(), nullable=False),
                     sa.Column('correlation_id', postgresql.UUID(as_uuid=True), nullable=True),
```
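With AppVeyor now finishing every build by running `downgrade base` (see the appveyor.yml change above), this revision's `downgrade()` has to undo everything `upgrade()` creates, including the new extension. The downgrade body is not shown in the diff; a sketch of what it might contain:

```python
from alembic import op


def downgrade():
    # Assumed reverse of upgrade(): drop the table first, then the extension and schema.
    op.drop_table('data_load_execution', schema='rdl')
    op.execute('DROP EXTENSION IF EXISTS CITEXT')
    op.execute('DROP SCHEMA IF EXISTS rdl')
```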
