diff --git a/custom-recipes/api-connect-files-downloader/recipe.json b/custom-recipes/api-connect-files-downloader/recipe.json new file mode 100644 index 0000000..565dbea --- /dev/null +++ b/custom-recipes/api-connect-files-downloader/recipe.json @@ -0,0 +1,252 @@ +{ + "meta": { + "label": "File Downloader", + "description": "Download a list of files", + "icon": "icon-rocket" + }, + "kind": "PYTHON", + "selectableFromDataset": "input_A_role", + "inputRoles": [ + { + "name": "input_A_role", + "label": "Dataset listing the files to download", + "description": "", + "arity": "UNARY", + "required": true, + "acceptsDataset": true + + } + ], + + "outputRoles": [ + { + "name": "output_folder", + "label": "Download folder", + "description": "", + "arity": "UNARY", + "required": true, + "acceptsDataset": false, + "acceptsManagedFolder": true + }, + { + "name": "result_dataset", + "label": "Result dataset", + "description": "", + "arity": "UNARY", + "required": false, + "acceptsDataset": true, + "acceptsManagedFolder": false + } + ], + "params": [ + { + "type": "SEPARATOR", + "label": "Authentication" + }, + { + "name": "auth_type", + "label": "Authentication type", + "description": "", + "type": "SELECT", + "defaultValue": null, + "selectChoices":[ + {"value": "secure_oauth", "label": "SSO"}, + {"value": "secure_basic", "label": "Secure username / password"}, + {"value": null, "label": "Other"} + ] + }, + { + "name": "credential", + "label": "Credential preset", + "type": "PRESET", + "parameterSetId": "credential", + "visibilityCondition": "model.auth_type == null" + }, + { + "name": "secure_oauth", + "label": "SSO preset", + "type": "PRESET", + "parameterSetId": "secure-oauth", + "visibilityCondition": "model.auth_type == 'secure_oauth'" + }, + { + "name": "secure_basic", + "label": "Credential preset", + "type": "PRESET", + "parameterSetId": "secure-basic", + "visibilityCondition": "model.auth_type == 'secure_basic'" + }, + { + "name": "should_use_user_secrets", + 
"label": "User 'Other credentials'", + "description": "If checked, secrets under Profile > My account > Other credentials, are available to use as {{variables}}", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "type": "SEPARATOR", + "label": "API call parameters" + }, + + { + "name": "endpoint_url", + "label": "URL template", + "description": "https://{{my_variable}}.example.com/user/{{username}}/details", + "type": "TEXTAREA" + }, + { + "name": "http_method", + "label": "HTTP method", + "description": "", + "type": "SELECT", + "defaultValue": "GET", + "selectChoices":[ + {"value": "GET", "label": "GET"}, + {"value": "POST", "label": "POST"}, + {"value": "PUT", "label": "PUT"}, + {"value": "PATCH", "label": "PATCH"}, + {"value": "DELETE", "label": "DELETE"} + ] + }, + { + "name": "endpoint_query_string", + "label": "Query Params", + "description": "Will add ?key1=val1&key2=val2 to the URL", + "type": "KEY_VALUE_LIST" + }, + { + "name": "endpoint_body", + "label": "Body", + "description": "", + "type": "KEY_VALUE_LIST", + "visibilityCondition": false + }, + { + "name": "endpoint_headers", + "label": "Headers", + "description": "", + "type": "KEY_VALUE_LIST", + "defaultValue": [ + { + "from": "Content-Type", + "to": "application/json" + }, + { + "from": "Accept", + "to": "application/json" + } + ] + }, + { + "name": "body_format", + "label": "Body", + "description": "", + "type": "SELECT", + "defaultValue": null, + "selectChoices":[ + {"value": null, "label": "None"}, + {"value": "FORM_DATA", "label": "Form-data"}, + {"value": "RAW", "label": "Raw"} + ] + }, + { + "name": "text_body", + "label": "Request's body", + "description": "", + "type": "TEXTAREA", + "visibilityCondition": "model.body_format=='RAW'" + }, + { + "name": "key_value_body", + "label": "Request's body", + "description": "", + "type": "KEY_VALUE_LIST", + "visibilityCondition": "(['FORM_DATA'].indexOf(model.body_format)>-1)" + }, + + { + "type": "SEPARATOR", + "label": "Template variables", + 
"description": "URL, headers and parameters can be templated using {{variables}}" + }, + { + "name": "parameter_columns", + "label": "Columns to use as variables", + "description": "Unless renamed, these columns can be used as {{ColumnName}} variables in templates", + "type": "COLUMNS", + "columnRole": "input_A_role" + }, + { + "name": "parameter_renamings", + "label": "Variables columns renaming", + "description": "Rename 'My variable column' to 'my_variable', to use as {{my_variable}}, and avoid name colisions (optional)", + "type": "KEY_VALUE_LIST" + }, + { + "name": "custom_key_values", + "label": "Custom keys / values", + "description": "Replace the variable {{key}} by its value in templates (optional)", + "type": "KEY_VALUE_LIST", + "visibilityCondition": false + }, + { + "type": "SEPARATOR", + "label": "Advanced" + }, + { + "name": "behaviour_when_error", + "label": "Error behaviour", + "description": "Decide how the recipe should react when an input line results in an error", + "type": "SELECT", + "defaultValue": "keep-error-column", + "selectChoices":[ + {"value": "ignore", "label": "Ignore the error"}, + {"value": "add-error-column", "label": "Add an error column"}, + {"value": "keep-error-column", "label": "Error column always in schema"}, + {"value": "raise", "label": "Fail the job"} + ] + }, + { + "name": "ignore_ssl_check", + "label": "Ignore SSL check", + "type": "BOOLEAN", + "visibilityCondition": "model.auth_type!='secure_oauth' && model.auth_type!='secure_basic'", + "defaultValue": false + }, + { + "name": "redirect_auth_header", + "label": "Redirect authorization header", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "display_metadata", + "label": "Display metadata", + "description": "Status code, request time...", + "type": "BOOLEAN", + "defaultValue": false + }, + { + "name": "timeout", + "label": "Timeout (s)", + "description": "-1 for no limit", + "type": "INT", + "defaultValue": 3600 + }, + { + "name": 
"requests_per_minute", + "label": "Rate limit (requests/m)", + "description": "-1 for no limit", + "type": "INT", + "defaultValue": -1 + }, + { + "name": "maximum_number_rows", + "label": "Maximum number of rows", + "description": "-1 for no limit", + "type": "INT", + "defaultValue": -1 + } + ], + "resourceKeys": [] +} diff --git a/custom-recipes/api-connect-files-downloader/recipe.py b/custom-recipes/api-connect-files-downloader/recipe.py new file mode 100644 index 0000000..2796d3b --- /dev/null +++ b/custom-recipes/api-connect-files-downloader/recipe.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +import dataiku +from dataiku.customrecipe import get_input_names_for_role, get_recipe_config, get_output_names_for_role +import pandas as pd +from safe_logger import SafeLogger +from dku_utils import get_dku_key_values, get_endpoint_parameters, get_secure_credentials, get_user_secrets +from rest_api_recipe_session import RestApiRecipeSession +from dku_constants import DKUConstants + + +logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS) + + +def get_partitioning_keys(id_list, dku_flow_variables): + partitioning_keys = {} + partitioning = id_list.get_config().get("partitioning") + if partitioning: + dimensions_types = partitioning.get("dimensions", []) + dimensions = [] + for dimension_type in dimensions_types: + dimensions.append(dimension_type.get("name")) + for dimension in dimensions: + dimension_src = "DKU_DST_{}".format(dimension) + if dimension_src in dku_flow_variables: + partitioning_keys[dimension] = dku_flow_variables.get(dimension_src) + return partitioning_keys + + +logger.info('API-Connect plugin recipe v{}'.format(DKUConstants.PLUGIN_VERSION)) + +input_A_names = get_input_names_for_role('input_A_role') +config = get_recipe_config() +dku_flow_variables = dataiku.get_flow_variables() + +logger.info("config={}".format(logger.filter_secrets(config))) + +credential_parameters = config.get("credential", {}) +behaviour_when_error = 
# Endpoint description and credentials come from the recipe configuration.
endpoint_parameters = get_endpoint_parameters(config)
secure_credentials = get_secure_credentials(config)
extraction_key = endpoint_parameters.get("extraction_key", "")
is_raw_output = endpoint_parameters.get("raw_output", True)

# Empty entries of the column selector are discarded; at least one real
# column must remain.
selected_columns = config.get("parameter_columns", [])
parameter_columns = [column for column in selected_columns if column]
if not parameter_columns:
    raise ValueError("There is no parameter column selected.")

# Template variables: custom key/values, then user secrets, then the
# partition values (later updates override earlier keys).
parameter_renamings = get_dku_key_values(config.get("parameter_renamings", {}))
custom_key_values = get_dku_key_values(config.get("custom_key_values", {}))
custom_key_values.update(get_user_secrets(config))
display_metadata = config.get("display_metadata", False)
maximum_number_rows = config.get("maximum_number_rows", -1)
input_parameters_dataset = dataiku.Dataset(input_A_names[0])
custom_key_values.update(
    get_partitioning_keys(input_parameters_dataset, dku_flow_variables)
)
input_parameters_dataframe = input_parameters_dataset.get_dataframe(infer_with_pandas=False)

# Folder receiving the downloaded files.
output_folder_names = get_output_names_for_role('output_folder')
folder = dataiku.Folder(output_folder_names[0])

recipe_session = RestApiRecipeSession(
    custom_key_values,
    credential_parameters,
    secure_credentials,
    endpoint_parameters,
    extraction_key,
    parameter_columns,
    parameter_renamings,
    display_metadata,
    maximum_number_rows=maximum_number_rows,
    behaviour_when_error=behaviour_when_error
)
results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output, folder=folder)

# The result dataset is written only when one is connected to the role and
# the results produce a non-empty dataframe.
result_dataset_names = get_output_names_for_role('result_dataset')
if result_dataset_names:
    results_dataframe = pd.DataFrame(results)
    if results_dataframe.size > 0:
        dataiku.Dataset(result_dataset_names[0]).write_with_schema(results_dataframe)
def api_call(self, can_raise_exeption=True, raw_output=False):
    """Send one request to ``self.endpoint_url`` with the stored request kwargs.

    Increments and logs the call counter, then delegates to ``self.request``
    using ``self.http_method`` and ``self.requests_kwargs``.

    Args:
        can_raise_exeption: Forwarded to ``self.request``.
        raw_output: Forwarded to ``self.request``.

    Returns:
        Whatever ``self.request`` returns.
    """
    request_arguments = self.requests_kwargs
    # Keeps the "params" key always present (None when it was never set).
    request_arguments["params"] = request_arguments.get("params")
    self.call_number += 1
    logger.info("API call number #{}".format(self.call_number))
    return self.request(
        self.http_method,
        self.endpoint_url,
        can_raise_exeption,
        raw_output=raw_output,
        **request_arguments
    )
def download_to_folder(self, folder):
    """Download the client's endpoint URL and store the payload in a folder.

    The file is stored under the last non-empty path segment of
    ``self.client.endpoint_url``. A trailing slash is ignored, so
    ``.../files/report.csv/`` is saved as ``report.csv`` instead of under an
    empty name. Everything after the last slash (any query string included)
    stays part of the name.

    Args:
        folder: Destination object exposing ``upload_data(name, data)``.

    Returns:
        dict: The dict handed back by ``api_call`` when it did not return a
        response object, otherwise ``{"Status code": <status or None>}``.
        Nothing is uploaded when the status code is 400 or above.
    """
    file_name = self.client.endpoint_url.rstrip("/").split("/")[-1]
    response = self.client.api_call(can_raise_exeption=False, raw_output=True)
    if isinstance(response, dict):
        # Not a raw response: hand the dict back untouched.
        return response
    # Duck-typed on purpose: only status_code and content are needed here.
    status_code = getattr(response, "status_code", None)
    if status_code is not None and status_code < 400:
        folder.upload_data(file_name, response.content)
    return {"Status code": status_code}