Skip to content

add download recipe #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: bug/sc-249723-recipe-forces-type
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 252 additions & 0 deletions custom-recipes/api-connect-files-downloader/recipe.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
{
"meta": {
"label": "File Downloader",
"description": "Download a list of files",
"icon": "icon-rocket"
},
"kind": "PYTHON",
"selectableFromDataset": "input_A_role",
"inputRoles": [
{
"name": "input_A_role",
"label": "Dataset listing the files to download",
"description": "",
"arity": "UNARY",
"required": true,
"acceptsDataset": true

}
],

"outputRoles": [
{
"name": "output_folder",
"label": "Download folder",
"description": "",
"arity": "UNARY",
"required": true,
"acceptsDataset": false,
"acceptsManagedFolder": true
},
{
"name": "result_dataset",
"label": "Result dataset",
"description": "",
"arity": "UNARY",
"required": false,
"acceptsDataset": true,
"acceptsManagedFolder": false
}
],
"params": [
{
"type": "SEPARATOR",
"label": "Authentication"
},
{
"name": "auth_type",
"label": "Authentication type",
"description": "",
"type": "SELECT",
"defaultValue": null,
"selectChoices":[
{"value": "secure_oauth", "label": "SSO"},
{"value": "secure_basic", "label": "Secure username / password"},
{"value": null, "label": "Other"}
]
},
{
"name": "credential",
"label": "Credential preset",
"type": "PRESET",
"parameterSetId": "credential",
"visibilityCondition": "model.auth_type == null"
},
{
"name": "secure_oauth",
"label": "SSO preset",
"type": "PRESET",
"parameterSetId": "secure-oauth",
"visibilityCondition": "model.auth_type == 'secure_oauth'"
},
{
"name": "secure_basic",
"label": "Credential preset",
"type": "PRESET",
"parameterSetId": "secure-basic",
"visibilityCondition": "model.auth_type == 'secure_basic'"
},
{
"name": "should_use_user_secrets",
"label": "User 'Other credentials'",
"description": "If checked, secrets under Profile > My account > Other credentials, are available to use as {{variables}}",
"type": "BOOLEAN",
"defaultValue": false
},
{
"type": "SEPARATOR",
"label": "API call parameters"
},

{
"name": "endpoint_url",
"label": "URL template",
"description": "https://{{my_variable}}.example.com/user/{{username}}/details",
"type": "TEXTAREA"
},
{
"name": "http_method",
"label": "HTTP method",
"description": "",
"type": "SELECT",
"defaultValue": "GET",
"selectChoices":[
{"value": "GET", "label": "GET"},
{"value": "POST", "label": "POST"},
{"value": "PUT", "label": "PUT"},
{"value": "PATCH", "label": "PATCH"},
{"value": "DELETE", "label": "DELETE"}
]
},
{
"name": "endpoint_query_string",
"label": "Query Params",
"description": "Will add ?key1=val1&key2=val2 to the URL",
"type": "KEY_VALUE_LIST"
},
{
"name": "endpoint_body",
"label": "Body",
"description": "",
"type": "KEY_VALUE_LIST",
"visibilityCondition": false
},
{
"name": "endpoint_headers",
"label": "Headers",
"description": "",
"type": "KEY_VALUE_LIST",
"defaultValue": [
{
"from": "Content-Type",
"to": "application/json"
},
{
"from": "Accept",
"to": "application/json"
}
]
},
{
"name": "body_format",
"label": "Body",
"description": "",
"type": "SELECT",
"defaultValue": null,
"selectChoices":[
{"value": null, "label": "None"},
{"value": "FORM_DATA", "label": "Form-data"},
{"value": "RAW", "label": "Raw"}
]
},
{
"name": "text_body",
"label": "Request's body",
"description": "",
"type": "TEXTAREA",
"visibilityCondition": "model.body_format=='RAW'"
},
{
"name": "key_value_body",
"label": "Request's body",
"description": "",
"type": "KEY_VALUE_LIST",
"visibilityCondition": "(['FORM_DATA'].indexOf(model.body_format)>-1)"
},

{
"type": "SEPARATOR",
"label": "Template variables",
"description": "URL, headers and parameters can be templated using {{variables}}"
},
{
"name": "parameter_columns",
"label": "Columns to use as variables",
"description": "Unless renamed, these columns can be used as {{ColumnName}} variables in templates",
"type": "COLUMNS",
"columnRole": "input_A_role"
},
{
"name": "parameter_renamings",
"label": "Variables columns renaming",
"description": "Rename 'My variable column' to 'my_variable', to use as {{my_variable}}, and avoid name colisions (optional)",
"type": "KEY_VALUE_LIST"
},
{
"name": "custom_key_values",
"label": "Custom keys / values",
"description": "Replace the variable {{key}} by its value in templates (optional)",
"type": "KEY_VALUE_LIST",
"visibilityCondition": false
},
{
"type": "SEPARATOR",
"label": "Advanced"
},
{
"name": "behaviour_when_error",
"label": "Error behaviour",
"description": "Decide how the recipe should react when an input line results in an error",
"type": "SELECT",
"defaultValue": "keep-error-column",
"selectChoices":[
{"value": "ignore", "label": "Ignore the error"},
{"value": "add-error-column", "label": "Add an error column"},
{"value": "keep-error-column", "label": "Error column always in schema"},
{"value": "raise", "label": "Fail the job"}
]
},
{
"name": "ignore_ssl_check",
"label": "Ignore SSL check",
"type": "BOOLEAN",
"visibilityCondition": "model.auth_type!='secure_oauth' && model.auth_type!='secure_basic'",
"defaultValue": false
},
{
"name": "redirect_auth_header",
"label": "Redirect authorization header",
"type": "BOOLEAN",
"defaultValue": false
},
{
"name": "display_metadata",
"label": "Display metadata",
"description": "Status code, request time...",
"type": "BOOLEAN",
"defaultValue": false
},
{
"name": "timeout",
"label": "Timeout (s)",
"description": "-1 for no limit",
"type": "INT",
"defaultValue": 3600
},
{
"name": "requests_per_minute",
"label": "Rate limit (requests/m)",
"description": "-1 for no limit",
"type": "INT",
"defaultValue": -1
},
{
"name": "maximum_number_rows",
"label": "Maximum number of rows",
"description": "-1 for no limit",
"type": "INT",
"defaultValue": -1
}
],
"resourceKeys": []
}
80 changes: 80 additions & 0 deletions custom-recipes/api-connect-files-downloader/recipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
import dataiku
from dataiku.customrecipe import get_input_names_for_role, get_recipe_config, get_output_names_for_role
import pandas as pd
from safe_logger import SafeLogger
from dku_utils import get_dku_key_values, get_endpoint_parameters, get_secure_credentials, get_user_secrets
from rest_api_recipe_session import RestApiRecipeSession
from dku_constants import DKUConstants


logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS)


def get_partitioning_keys(id_list, dku_flow_variables):
partitioning_keys = {}
partitioning = id_list.get_config().get("partitioning")
if partitioning:
dimensions_types = partitioning.get("dimensions", [])
dimensions = []
for dimension_type in dimensions_types:
dimensions.append(dimension_type.get("name"))
for dimension in dimensions:
dimension_src = "DKU_DST_{}".format(dimension)
if dimension_src in dku_flow_variables:
partitioning_keys[dimension] = dku_flow_variables.get(dimension_src)
return partitioning_keys


logger.info('API-Connect plugin recipe v{}'.format(DKUConstants.PLUGIN_VERSION))

input_A_names = get_input_names_for_role('input_A_role')
config = get_recipe_config()
dku_flow_variables = dataiku.get_flow_variables()

logger.info("config={}".format(logger.filter_secrets(config)))

credential_parameters = config.get("credential", {})
behaviour_when_error = config.get("behaviour_when_error", "add-error-column")
endpoint_parameters = get_endpoint_parameters(config)
secure_credentials = get_secure_credentials(config)
extraction_key = endpoint_parameters.get("extraction_key", "")
is_raw_output = endpoint_parameters.get("raw_output", True)
parameter_columns = [column for column in config.get("parameter_columns", []) if column]
if len(parameter_columns) == 0:
raise ValueError("There is no parameter column selected.")
parameter_renamings = get_dku_key_values(config.get("parameter_renamings", {}))
custom_key_values = get_dku_key_values(config.get("custom_key_values", {}))
user_secrets = get_user_secrets(config)
custom_key_values.update(user_secrets)
display_metadata = config.get("display_metadata", False)
maximum_number_rows = config.get("maximum_number_rows", -1)
input_parameters_dataset = dataiku.Dataset(input_A_names[0])
partitioning_keys = get_partitioning_keys(input_parameters_dataset, dku_flow_variables)
custom_key_values.update(partitioning_keys)
input_parameters_dataframe = input_parameters_dataset.get_dataframe(infer_with_pandas=False)

output_names_stats = get_output_names_for_role('output_folder')[0]
folder = dataiku.Folder(output_names_stats)

recipe_session = RestApiRecipeSession(
custom_key_values,
credential_parameters,
secure_credentials,
endpoint_parameters,
extraction_key,
parameter_columns,
parameter_renamings,
display_metadata,
maximum_number_rows=maximum_number_rows,
behaviour_when_error=behaviour_when_error
)
results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output, folder=folder)

result_dataset_names = get_output_names_for_role('result_dataset')
if len(result_dataset_names) > 0:
result_dataset_name = result_dataset_names[0]
odf = pd.DataFrame(results)
if odf.size > 0:
api_output = dataiku.Dataset(result_dataset_name)
api_output.write_with_schema(odf)
16 changes: 13 additions & 3 deletions python-lib/rest_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def get(self, url, can_raise_exeption=True, **kwargs):
json_response = self.request("GET", url, can_raise_exeption=can_raise_exeption, **kwargs)
return json_response

def request(self, method, url, can_raise_exeption=True, **kwargs):
def request(self, method, url, can_raise_exeption=True, raw_output=False, **kwargs):
logger.info(u"Accessing endpoint {} with params={}".format(url, kwargs.get("params")))
self.assert_secure_domain(url)
self.enforce_throttling()
Expand Down Expand Up @@ -161,6 +161,9 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
self.pagination.update_next_page({}, response.links)
return self.empty_json_response()

if raw_output:
return response

json_response = self.get_json_from_response(response, can_raise_exeption=can_raise_exeption)

return json_response
Expand All @@ -176,7 +179,7 @@ def request_with_redirect_retry(self, method, url, **kwargs):
response = self.session.request(method, response.url, **redirection_kwargs)
return response

def paginated_api_call(self, can_raise_exeption=True):
def paginated_api_call(self, can_raise_exeption=True, raw_output=False):
if self.pagination.params_must_be_blanked:
self.requests_kwargs["params"] = {}
else:
Expand All @@ -186,7 +189,14 @@ def paginated_api_call(self, can_raise_exeption=True):
self.requests_kwargs.update({"params": params})
self.call_number = self.call_number + 1
logger.info("API call number #{}".format(self.call_number))
return self.request(self.http_method, self.pagination.get_next_page_url(), can_raise_exeption, **self.requests_kwargs)
return self.request(self.http_method, self.pagination.get_next_page_url(), can_raise_exeption, raw_output=raw_output, **self.requests_kwargs)

def api_call(self, can_raise_exeption=True, raw_output=False):
params = self.requests_kwargs.get("params")
self.requests_kwargs.update({"params": params})
self.call_number = self.call_number + 1
logger.info("API call number #{}".format(self.call_number))
return self.request(self.http_method, self.endpoint_url, can_raise_exeption, raw_output=raw_output, **self.requests_kwargs)

def empty_json_response(self):
return {self.extraction_key: {}} if self.extraction_key else {}
Expand Down
Loading