Commit

Merge pull request #892 from dyvenia/dev

Release 0.4.26 PR

Rafalz13 committed Apr 11, 2024
2 parents 8d12bc1 + d4a800d commit 1cb1a4b
Showing 14 changed files with 274 additions and 144 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

### Removed


## [0.4.26] - 2024-04-11
### Added
- Added an option for the `SAP RFC` connector to get credentials from Azure Key Vault or from a dictionary passed directly inside the flow.

### Fixed
- Fixed the `if_exists` parameter definition in the `CreateTableFromBlob` task.
- Changed `requirements.txt` to bump the `dbt-sqlserver` version in order to fix a bug with the `MAXRECURSION` error in `dbt run`.

### Changed
- Changed `dbt-sqlserver` version to `git+https://github.com/djagoda881/dbt-sqlserver.git@v1.3.latest_option_clause`.

### Removed
- Removed `dbt-core==1.3.2` from `requirements.txt`.
- Removed copying files to the conformed/ and operations/ directories when running the `ADLSToAzureSQL` flow.

## [0.4.25] - 2024-01-30
### Added
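As a quick reference for the first changelog entry, here is a minimal sketch of the two credential options, pieced together from the tests changed in this PR; the connection values are placeholders and a working SAP RFC setup (`pyrfc`) is assumed:

```python
from viadot.sources import SAPRFC
from viadot.tasks import SAPRFCToDF

# Option 1: pass the credentials dictionary directly into the connector.
# Keys follow the ones exercised in tests/integration/test_sap_rfc.py.
sap = SAPRFC(
    credentials={
        "sysnr": "00",                # placeholder system number
        "user": "my_user",            # placeholder user
        "passwd": "my_password",      # placeholder password
        "ashost": "sap.example.com",  # placeholder application server host
    }
)

# Option 2: resolve credentials from Azure Key Vault or the local viadot config
# at run time, via a credentials key and an environment.
task = SAPRFCToDF(
    query="SELECT MATNR, MATKL, MTART, LAEDA FROM MARA WHERE LAEDA LIKE '20220110%'",
    func="BBP_RFC_READ_TABLE",
)
df = task.run(sap_credentials_key="SAP", env="QA")
```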
3 changes: 1 addition & 2 deletions requirements.txt
@@ -39,8 +39,7 @@ aiolimiter==1.0.0
protobuf>=3.19.0, <3.20
avro-python3==1.10.2
pygit2>=1.10.1, <1.11.0
dbt-core==1.3.2
dbt-sqlserver==1.3.1
dbt-sqlserver @ git+https://github.com/djagoda881/dbt-sqlserver.git@v1.3.latest_option_clause
lumaCLI==0.0.19
Office365-REST-Python-Client==2.4.4
TM1py==1.11.3
45 changes: 0 additions & 45 deletions tests/integration/flows/test_adls_to_azure_sql.py
@@ -9,51 +9,6 @@
from viadot.flows.adls_to_azure_sql import check_dtypes_sort, df_to_csv_task


def test_get_promoted_adls_path_csv_file():
adls_path_file = "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_file)
promoted_path = flow.get_promoted_path(env="conformed")
assert (
promoted_path
== "conformed/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv"
)


def test_get_promoted_adls_path_parquet_file():
adls_path_file = "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.parquet"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_file)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_file_starts_with_slash():
adls_path_dir_starts_with_slash = "/raw/supermetrics/adls_ga_load_times_fr_test/"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_starts_with_slash)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_dir_slash():
adls_path_dir_slash = "raw/supermetrics/adls_ga_load_times_fr_test/"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_slash)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_dir():
adls_path_dir = "raw/supermetrics/adls_ga_load_times_fr_test"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_dir_starts_with_slash():
adls_path_dir_starts_with_slash = "/raw/supermetrics/adls_ga_load_times_fr_test/"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_starts_with_slash)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_df_to_csv_task():
d = {"col1": ["rat", "\tdog"], "col2": ["cat", 4]}
df = pd.DataFrame(data=d)
47 changes: 43 additions & 4 deletions tests/integration/tasks/test_sap_rfc_to_df.py
@@ -1,13 +1,52 @@
import pytest
import logging

from viadot.exceptions import CredentialError
from viadot.config import local_config
from viadot.tasks import SAPRFCToDF


def test_sap_rfc_to_df_bbp():
sap_test_creds = local_config.get("SAP").get("QA")
task = SAPRFCToDF(
credentials=sap_test_creds,
query="SELECT MATNR, MATKL, MTART, LAEDA FROM MARA WHERE LAEDA LIKE '2022%'",
query="SELECT MATNR, MATKL, MTART, LAEDA FROM MARA WHERE LAEDA LIKE '20220110%'",
func="BBP_RFC_READ_TABLE",
)
df = task.run()
df = task.run(sap_credentials_key="SAP", env="QA")
assert len(df.columns) == 4 and not df.empty


def test_sap_rfc_to_df_wrong_sap_credential_key_bbp(caplog):
task = SAPRFCToDF(
query="SELECT MATNR, MATKL, MTART, LAEDA FROM MARA WHERE LAEDA LIKE '20220110%'",
func="BBP_RFC_READ_TABLE",
)
with pytest.raises(
CredentialError,
match="sap_credentials_key: SAP_test is not stored neither in KeyVault or local config!",
):
task.run(
sap_credentials_key="SAP_test",
)
assert (
f"Getting credentials from Azure Key Vault was not possible. Either there is no key: SAP_test or env: DEV or there is not Key Vault in your environment."
in caplog.text
)


def test_sap_rfc_to_df_wrong_env_bbp(caplog):
task = SAPRFCToDF(
query="SELECT MATNR, MATKL, MTART, LAEDA FROM MARA WHERE LAEDA LIKE '20220110%'",
func="BBP_RFC_READ_TABLE",
)
with pytest.raises(
CredentialError,
match="Missing PROD_test credentials!",
):
task.run(
sap_credentials_key="SAP",
env="PROD_test",
)
assert (
f"Getting credentials from Azure Key Vault was not possible. Either there is no key: SAP or env: PROD_test or there is not Key Vault in your environment."
in caplog.text
)
83 changes: 80 additions & 3 deletions tests/integration/test_sap_rfc.py
@@ -1,6 +1,9 @@
from collections import OrderedDict
import pytest
import logging

from collections import OrderedDict
from viadot.sources import SAPRFC, SAPRFCV2
from viadot.exceptions import CredentialError

sap = SAPRFC()
sap2 = SAPRFCV2()
@@ -192,14 +195,88 @@ def test___build_pandas_filter_query_v2():
def test_default_credentials_warning_SAPRFC(caplog):
_ = SAPRFC()
assert (
"Your credentials will use DEV environment. If you would like to use different one - please specified it."
f"Your credentials will use DEV environment from local config. If you would like to use different one - please specified it in env parameter"
in caplog.text
)


def test_default_credentials_warning_SAPRFCV2(caplog):
_ = SAPRFCV2()
assert (
"Your credentials will use DEV environment. If you would like to use different one - please specified it."
f"Your credentials will use DEV environment from local config. If you would like to use different one - please specified it in env parameter"
in caplog.text
)


def test_credentials_dictionary_wrong_key_warning_SAPRFC(caplog):
_ = SAPRFC(
credentials={
"sysnr_test": "test",
"user": "test",
"passwd": "test",
"ashost": "test",
}
)
assert (
f"Required key 'sysnr' not found in your 'credentials' dictionary!"
in caplog.text
)
assert (
f"Your credentials will use DEV environment from local config. If you would like to use different one - please specified it in env parameter"
in caplog.text
)


def test_credentials_dictionary_wrong_key_warning_SAPRFCV2(caplog):
_ = SAPRFCV2(
credentials={
"sysnr_test": "test",
"user": "test",
"passwd": "test",
"ashost": "test",
}
)
assert (
f"Required key 'sysnr' not found in your 'credentials' dictionary!"
in caplog.text
)
assert (
f"Your credentials will use DEV environment from local config. If you would like to use different one - please specified it in env parameter"
in caplog.text
)


def test_sap_credentials_key_wrong_value_error_SAPRFC(caplog):
with pytest.raises(
CredentialError,
match="sap_credentials_key: SAP_test is not stored neither in KeyVault or local config!",
):
with caplog.at_level(logging.ERROR):
_ = SAPRFC(sap_credentials_key="SAP_test")


def test_sap_credentials_key_wrong_value_error_SAPRFCV2(caplog):
with pytest.raises(
CredentialError,
match="sap_credentials_key: SAP_test is not stored neither in KeyVault or local config!",
):
with caplog.at_level(logging.ERROR):
_ = SAPRFCV2(sap_credentials_key="SAP_test")


def test_env_wrong_value_error_SAPRFC(caplog):
with pytest.raises(
CredentialError,
match="Missing PROD_test credentials!",
):
with caplog.at_level(logging.ERROR):
_ = SAPRFC(env="PROD_test")


def test_env_wrong_value_error_SAPRFCV2(caplog):
with pytest.raises(
CredentialError,
match="Missing PROD_test credentials!",
):
with caplog.at_level(logging.ERROR):
_ = SAPRFCV2(env="PROD_test")
2 changes: 1 addition & 1 deletion tests/test_viadot.py
@@ -2,4 +2,4 @@


def test_version():
assert __version__ == "0.4.25"
assert __version__ == "0.4.26"
2 changes: 1 addition & 1 deletion viadot/__init__.py
@@ -1 +1 @@
__version__ = "0.4.25"
__version__ = "0.4.26"
40 changes: 2 additions & 38 deletions viadot/flows/adls_to_azure_sql.py
@@ -221,8 +221,6 @@ def __init__(
self.overwrite_adls = overwrite_adls
self.if_empty = if_empty
self.adls_sp_credentials_secret = adls_sp_credentials_secret
self.adls_path_conformed = self.get_promoted_path(env="conformed")
self.adls_path_operations = self.get_promoted_path(env="operations")

# AzureSQLCreateTable
self.table = table
@@ -257,20 +255,6 @@ def _map_if_exists(if_exists: str) -> str:
def slugify(name):
return name.replace(" ", "_").lower()

def get_promoted_path(self, env: str) -> str:
adls_path_clean = self.adls_path.strip("/")
extension = adls_path_clean.split(".")[-1].strip()
if extension == "parquet":
file_name = adls_path_clean.split("/")[-2] + ".csv"
common_path = "/".join(adls_path_clean.split("/")[1:-2])
else:
file_name = adls_path_clean.split("/")[-1]
common_path = "/".join(adls_path_clean.split("/")[1:-1])

promoted_path = os.path.join(env, common_path, file_name)

return promoted_path

def gen_flow(self) -> Flow:
lake_to_df_task = AzureDataLakeToDF(timeout=self.timeout)
df = lake_to_df_task.bind(
@@ -327,22 +311,6 @@ def gen_flow(self) -> Flow:
flow=self,
)

promote_to_conformed_task = AzureDataLakeCopy(timeout=self.timeout)
promote_to_conformed_task.bind(
from_path=self.adls_path,
to_path=self.adls_path_conformed,
sp_credentials_secret=self.adls_sp_credentials_secret,
vault_name=self.vault_name,
flow=self,
)
promote_to_operations_task = AzureDataLakeCopy(timeout=self.timeout)
promote_to_operations_task.bind(
from_path=self.adls_path_conformed,
to_path=self.adls_path_operations,
sp_credentials_secret=self.adls_sp_credentials_secret,
vault_name=self.vault_name,
flow=self,
)
create_table_task = AzureSQLCreateTable(timeout=self.timeout)
create_table_task.bind(
schema=self.schema,
@@ -368,13 +336,9 @@
# data validation function (optional)
if self.validate_df_dict:
validate_df.bind(df=df, tests=self.validate_df_dict, flow=self)
validate_df.set_upstream(lake_to_df_task, flow=self)
df_reorder.set_upstream(validate_df, flow=self)

df_reorder.set_upstream(lake_to_df_task, flow=self)
df_to_csv.set_upstream(dtypes, flow=self)
df_to_csv.set_upstream(df_reorder, flow=self)
promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
create_table_task.set_upstream(df_to_csv, flow=self)
promote_to_operations_task.set_upstream(
promote_to_conformed_task, flow=self
)
bulk_insert_task.set_upstream(create_table_task, flow=self)
12 changes: 10 additions & 2 deletions viadot/flows/sap_rfc_to_adls.py
@@ -10,13 +10,15 @@ class SAPRFCToADLS(Flow):
def __init__(
self,
name: str,
query: str = None,
query: str,
rfc_sep: str = None,
rfc_replacement: str = "-",
func: str = "RFC_READ_TABLE",
rfc_total_col_width_character_limit: int = 400,
rfc_unique_id: List[str] = None,
sap_credentials: dict = None,
sap_credentials_key: str = "SAP",
env: str = "DEV",
output_file_extension: str = ".parquet",
local_file_path: str = None,
file_sep: str = "\t",
@@ -66,7 +68,9 @@ def __init__(
rfc_unique_id=["VBELN", "LPRIO"],
...
)
sap_credentials (dict, optional): The credentials to use to authenticate with SAP. By default, they're taken from the local viadot config.
sap_credentials (dict, optional): The credentials to use to authenticate with SAP. Defaults to None.
sap_credentials_key (str, optional): The key for sap credentials located in the local config or Azure Key Vault. Defaults to "SAP".
env (str, optional): The key within sap_credentials_key that points to the target SAP environment. Defaults to "DEV".
output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy to handle with parquet. Defaults to ".parquet".
local_file_path (str, optional): Local destination path. Defaults to None.
file_sep(str, optional): The separator to use in the CSV. Defaults to "\t".
Expand All @@ -91,6 +95,8 @@ def __init__(
self.rfc_total_col_width_character_limit = rfc_total_col_width_character_limit
self.rfc_unique_id = rfc_unique_id
self.sap_credentials = sap_credentials
self.sap_credentials_key = sap_credentials_key
self.env = env
self.output_file_extension = output_file_extension
self.local_file_path = local_file_path
self.file_sep = file_sep
@@ -121,6 +127,8 @@ def gen_flow(self) -> Flow:
rfc_unique_id=self.rfc_unique_id,
alternative_version=self.alternative_version,
credentials=self.sap_credentials,
sap_credentials_key=self.sap_credentials_key,
env=self.env,
flow=self,
)
if self.validate_df_dict:
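For orientation, a hedged usage sketch of the extended `SAPRFCToADLS` signature documented above, assuming the flow is exported from `viadot.flows` as usual; the ADLS destination parameters are not visible in this diff, so only the arguments shown here appear, with placeholder values:

```python
from viadot.flows import SAPRFCToADLS

# Sketch only: the query, function, and file path are placeholders, and a real
# run would also need the ADLS destination arguments elided from this diff.
flow = SAPRFCToADLS(
    name="sap_rfc_to_adls_example",
    query="SELECT MATNR, MATKL FROM MARA WHERE LAEDA LIKE '20220110%'",
    func="BBP_RFC_READ_TABLE",
    sap_credentials_key="SAP",      # key in the local config or Azure Key Vault
    env="DEV",                      # SAP environment nested under that key
    local_file_path="mara.parquet",
)
flow.run()
```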