From 9ab4587377f8ac626b4f2cb69e328c2f0742af86 Mon Sep 17 00:00:00 2001
From: Rob Hanna
Date: Tue, 7 Feb 2023 17:55:43 +0000
Subject: [PATCH] change logging and error trapping

---
 fim_pipeline.sh | 8 +-
 fim_post_processing.sh | 13 +--
 fim_process_unit_wb.sh | 18 ++-
 src/generate_branch_list.py | 44 ++++----
 src/generate_branch_list_csv.py | 55 +++++++++
 src/process_branch.sh | 16 ++-
 src/run_unit_wb.sh | 18 +--
 .../generate_branch_list_csv_params.json | 20 ++++
 .../generate_branch_list_csv_unittests.py | 104 ++++++++++++++++++
 unit_tests/generate_branch_list_params.json | 6 +-
 unit_tests/generate_branch_list_unittests.py | 6 +-
 11 files changed, 250 insertions(+), 58 deletions(-)
 create mode 100755 src/generate_branch_list_csv.py
 create mode 100644 unit_tests/generate_branch_list_csv_params.json
 create mode 100644 unit_tests/generate_branch_list_csv_unittests.py

diff --git a/fim_pipeline.sh b/fim_pipeline.sh
index 171efe791..fecc24863 100755
--- a/fim_pipeline.sh
+++ b/fim_pipeline.sh
@@ -38,7 +38,7 @@ source $srcDir/bash_functions.env

 . $projectDir/fim_pre_processing.sh "$@"

-logFile=$outputRunDataDir/logs/pipeline_summary_unit.log
+logFile=$outputRunDataDir/logs/unit/pipeline_summary_unit.log
 process_wb_file=$projectDir/fim_process_unit_wb.sh

 pipeline_start_time=`date +%s`
@@ -47,15 +47,15 @@ pipeline_start_time=`date +%s`
 # Why an if and else? watch the number of colons
 if [ -f "$hucList" ]; then
     if [ "$jobHucLimit" = "1" ]; then
-        parallel --verbose --lb -j $jobHucLimit --colsep ',' --joblog $logFile -- $process_wb_file $runName :::: $hucList
+        parallel --verbose --lb -j $jobHucLimit --colsep ',' --joblog $logFile -- $process_wb_file $runName :::: $hucList
     else
         parallel --eta -j $jobHucLimit --colsep ',' --joblog $logFile -- $process_wb_file $runName :::: $hucList
     fi
 else
     if [ "$jobHucLimit" = "1" ]; then
-        parallel --verbose --lb -j $jobHucLimit --colsep ',' --joblog $logFile -- $process_wb_file $runName ::: $hucList
+        parallel --verbose --lb -j $jobHucLimit --colsep ',' --joblog $logFile -- $process_wb_file $runName ::: $hucList
    else
-        parallel --eta -j $jobHucLimit --colsep ',' --joblog $logFile -- $process_wb_file ::: $hucList
+        parallel --eta -j $jobHucLimit --colsep ',' --joblog $logFile -- $process_wb_file $runName ::: $hucList
    fi
 fi
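The four parallel invocations above differ only in verbosity and in whether the huc list comes from a file (::::) or an inline list (:::). A minimal standalone sketch of the pattern, using a hypothetical two-huc list and echo in place of $process_wb_file, showing how --joblog records one row per huc:

    # hypothetical huc list; fim_pipeline.sh receives this via $hucList
    printf "02020001\n05030104\n" > /tmp/huc_list.lst
    # one job per line; the joblog gains a row per job with runtime and Exitval
    parallel --eta -j 2 --colsep ',' --joblog /tmp/pipeline_summary_unit.log -- echo "would process huc" :::: /tmp/huc_list.lst
    cat /tmp/pipeline_summary_unit.log   # the Exitval column holds each huc's exit status
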
diff --git a/fim_post_processing.sh b/fim_post_processing.sh
index 7528340b8..b0a4a3739 100755
--- a/fim_post_processing.sh
+++ b/fim_post_processing.sh
@@ -89,12 +89,8 @@ post_proc_start_time=`date +%s`
 echo -e $startDiv"Start branch aggregation"
 python3 $srcDir/aggregate_branch_lists.py -d $outputRunDataDir -f "branch_ids.csv" -o $gms_inputs

-## GET NON ZERO EXIT CODES FOR UNITS ##
-# Needed in case aggregation fails, we will need the logs
-echo -e $startDiv"Start of unit non zero exit codes check"
-find $outputRunDataDir/logs/ -name "*_unit.log" -type f | xargs grep -E "Exit status: ([1-9][0-9]{0,2})" >"$outputRunDataDir/unit_errors/non_zero_exit_codes.log" &
-
+## No longer applicable

 ## GET NON ZERO EXIT CODES FOR BRANCHES ##
 echo -e $startDiv"Start non-zero exit code checking"
 find $outputRunDataDir/logs/branch -name $hucNumber"_branch_*.log" -type f | xargs grep -E "Exit status: ([1-9][0-9]{0,2})" > "$outputRunDataDir/branch_errors/non_zero_exit_codes.log" &

 ## REMOVE FAILED BRANCHES ##
 # Needed in case aggregation fails, we will need the logs
-echo -e $startDiv"Removing branches that failed with Exit status: 61"
-Tstart
-python3 $srcDir/gms/remove_error_branches.py -f "$outputRunDataDir/branch_errors/non_zero_exit_codes.log" -g $gms_inputs
+#echo
+#echo -e $startDiv"Removing branches that failed with Exit status: 61"
+#Tstart
+#python3 $srcDir/gms/remove_error_branches.py -f "$outputRunDataDir/branch_errors/non_zero_exit_codes.log" -g $gms_inputs

 ## RUN AGGREGATE BRANCH ELEV TABLES ##
 echo "Processing usgs gage aggregation"
diff --git a/fim_process_unit_wb.sh b/fim_process_unit_wb.sh
index 991a894b5..0f3bb8b69 100755
--- a/fim_process_unit_wb.sh
+++ b/fim_process_unit_wb.sh
@@ -87,6 +87,7 @@ hucLogFileName=$outputRunDataDir/logs/unit/"$hucNumber"_unit.log
 /usr/bin/time -v $srcDir/run_unit_wb.sh 2>&1 | tee $hucLogFileName

 #exit ${PIPESTATUS[0]} (and yes.. there can be more than one)
+# and yes.. we cannot use $? as we are messing with exit codes
 return_codes=( "${PIPESTATUS[@]}" )

 #echo "huc return codes are:"
@@ -94,9 +95,11 @@ return_codes=( "${PIPESTATUS[@]}" )

 # we do this way instead of working directly with stderr and stdout
 # as they were messing with output logs which we always want.
+err_exists=0
 for code in "${return_codes[@]}"
 do
-    # Make an extra copy of the branch log in a new folder if an error
+    # Make an extra copy of the unit log into a new folder.
+
     # Note: It was tricky to load in the fim_enum into bash, so we will just
     # go with the code for now
     if [ $code -eq 0 ]; then
@@ -105,19 +108,22 @@ do
     elif [ $code -eq 60 ]; then
         echo
         echo "***** Unit has no valid branches *****"
+        err_exists=1
     elif [ $code -eq 61 ]; then
         echo
-        echo "***** Unit has not a valid unit *****"
+        echo "***** Unit has no remaining valid flowlines *****"
+        err_exists=1
     else
         echo
         echo "***** An error has occured *****"
-        # copy the error log over to the unit_errors folder to better isolate it
-        cp $hucLogFileName $outputRunDataDir/unit_errors
+        err_exists=1
     fi
 done

-# TODO: Check its output logs for this huc and its branches here
-
+if [ "$err_exists" = "1" ]; then
+    # copy the error log over to the unit_errors folder to better isolate it
+    cp $hucLogFileName $outputRunDataDir/unit_errors
+fi
 echo "=========================================================================="
 # we always return a success at this point (so we don't stop the loops / iterator)
 exit 0
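The err_exists trapping above hinges on PIPESTATUS being copied before anything else runs, since the very next command resets it. A self-contained sketch of the pattern, with a subshell standing in for run_unit_wb.sh:

    ( exit 61 ) 2>&1 | tee /tmp/unit.log    # left side exits 61, tee exits 0
    return_codes=( "${PIPESTATUS[@]}" )     # copy immediately: holds (61 0)
    err_exists=0
    for code in "${return_codes[@]}"; do
        [ "$code" -ne 0 ] && err_exists=1   # any non-zero code flags the unit
    done
    echo "err_exists=$err_exists"           # prints err_exists=1
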
diff --git a/src/generate_branch_list.py b/src/generate_branch_list.py
index 50fc2c3b2..6e1240de4 100755
--- a/src/generate_branch_list.py
+++ b/src/generate_branch_list.py
@@ -8,12 +8,25 @@ sys.path.append('/foss_fim/src/gms/')
 from stream_branches import StreamNetwork

-def Generate_branch_list(stream_network_dissolved, branch_id_attribute,
-                         output_branch_list, output_branch_csv, huc_id):
-
-    # we need two copies, one that is a single column list for the branch iterator (parallel)
-    # and one for later tools that need the huc number as well. (aggregate hucs)
+def generate_branch_list(stream_network_dissolved, branch_id_attribute,
+                         output_branch_list_file):
+    '''
+    Processing:
+        This creates a branch_ids.lst file which is required at the very start of processing
+        hucs. This becomes the list that run_unit_wb.sh needs to iterate over branches.
+
+        Note: The .csv twin to this is appended to each time a branch completes,
+        resulting in a list that only contains successfully processed branches.
+    Params:
+        - stream_network_dissolved (str): the gpkg that contains the list of dissolved branch ids
+        - branch_id_attribute (str): the id of the field in the gpkg that has the branch ids.
+          (i.e. levpa_id (from params_template.env))
+        - output_branch_list_file (str): file name and path of the list to be created.
+    Output:
+        - creates a file (likely a .lst file) with branch ids (not including branch zero)
+    '''
+
+    if os.path.exists(stream_network_dissolved):
         # load stream network
         stream_network_dissolved = StreamNetwork.from_file(
             stream_network_dissolved,
@@ -22,20 +35,7 @@ def Generate_branch_list(stream_network_dissolved, branch_id_attribute,
     stream_network_dissolved = stream_network_dissolved.loc[:,branch_id_attribute]

     # write out the list version (just branch numbers)
-    stream_network_dissolved.to_csv(output_branch_list, sep= " ", index=False, header=False)
-
-    # we only add branch zero to the csv, not the list
-    branch_zero_row = pd.Series("0")
-    bz_stream_network_dissolved = stream_network_dissolved.append(branch_zero_row)
-
-    # Create the dataframe version
-    df_stream_network_dissolved = bz_stream_network_dissolved.to_frame()
-
-    # add the extra column (first column)
-    df_stream_network_dissolved.insert(0, 'huc_id', huc_id, True)
-
-    #stream_network_dissolved.to_csv(output_branch_list,sep= " ",index=False,header=False)
-    df_stream_network_dissolved.to_csv(output_branch_csv, index=False, header=False)
+    stream_network_dissolved.to_csv(output_branch_list_file, sep=" ", index=False, header=False)

 if __name__ == '__main__':

     parser = argparse.ArgumentParser(description='Create branch list')
     parser.add_argument('-d','--stream-network-dissolved', help='Dissolved stream network', required=True)
     parser.add_argument('-b','--branch-id-attribute', help='Branch ID attribute to use in dissolved stream network', required=True)
-    parser.add_argument('-oc','--output-branch-csv', help='Output branch list', required=True)
-    parser.add_argument('-ol','--output-branch-list', help='Output branch list', required=True)
-    parser.add_argument('-u','--huc-id', help='HUC number being aggregated', required=True)
+    parser.add_argument('-o','--output-branch-list-file', help='Output branch list', required=True)
     args = vars(parser.parse_args())

-    Generate_branch_list(**args)
+    generate_branch_list(**args)
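With the csv duties removed, the command line (matching the call site in the run_unit_wb.sh hunk further below) reduces to three arguments. A usage sketch with placeholder run and huc names:

    python3 /foss_fim/src/generate_branch_list.py \
        -d /outputs/<run_name>/<huc>/nwm_subset_streams_levelPaths_dissolved.gpkg \
        -b levpa_id \
        -o /outputs/<run_name>/<huc>/branch_ids.lst
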
diff --git a/src/generate_branch_list_csv.py b/src/generate_branch_list_csv.py
new file mode 100755
index 000000000..e355e9f25
--- /dev/null
+++ b/src/generate_branch_list_csv.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+
+import os
+import argparse
+import pandas as pd
+import pathlib
+
+def generate_branch_list_csv(huc_id, branch_id, output_branch_csv):
+
+    '''
+    Processing:
+        This creates a branch_ids.csv file which is required for various post processing tasks.
+        If the csv already exists, then the new huc and branch id will be appended.
+        If it does not yet exist, a new csv will be created.
+
+    Params:
+        - huc_id
+        - branch_id
+        - output_branch_csv (str): csv file name and path of the list to be created
+          (likely branch_ids.csv).
+
+    Output:
+        - creates a csv file (assuming the file name coming in is a csv)
+    '''
+    # validations
+    file_extension = pathlib.Path(output_branch_csv).suffix
+
+    if (file_extension != ".csv"):
+        raise ValueError("The output branch csv file does not have a .csv extension")
+
+    if (len(huc_id) != 8) or (not huc_id.isnumeric()):
+        raise ValueError("The huc_id does not appear to be an eight digit number")
+
+    if (not branch_id.isnumeric()):
+        raise ValueError("The branch_id does not appear to be a valid number")
+
+    df_csv = None
+    new_data = [[huc_id, branch_id]]
+    col_names = ["huc_id","branch_id"]
+    df_csv = pd.DataFrame(new_data, columns=col_names)
+
+    if (not os.path.exists(output_branch_csv)):
+        df_csv.to_csv(output_branch_csv, index=False, header=False)
+    else:
+        df_csv.to_csv(output_branch_csv, mode='a', index=False, header=False)
+
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(description='Create branch list')
+    parser.add_argument('-b','--branch-id', help='Branch ID', required=True)
+    parser.add_argument('-o','--output-branch-csv', help='Output branch csv list', required=True)
+    parser.add_argument('-u','--huc-id', help='HUC number being aggregated', required=True)
+    args = vars(parser.parse_args())
+
+    generate_branch_list_csv(**args)
diff --git a/src/process_branch.sh b/src/process_branch.sh
index 220ab29ce..627cfa182 100755
--- a/src/process_branch.sh
+++ b/src/process_branch.sh
@@ -15,15 +15,17 @@ branchId=$3
 # outputDataDir, srcDir and others come from the Dockerfile
 export outputRunDataDir=$outputDataDir/$runName

-branchLogFileName=$outputRunDataDir/logs/branch/"$hucNumber"_branch_$branchId.log
+branchLogFileName=$outputRunDataDir/logs/branch/"$hucNumber"_branch_"$branchId".log
+branch_list_csv_file=$outputRunDataDir/$hucNumber/branch_ids.csv

-/usr/bin/time -v $srcDir/gms/run_by_branch.sh $hucNumber $branchId |& tee $branchLogFileName
+/usr/bin/time -v $srcDir/gms/run_by_branch.sh $hucNumber $branchId 2>&1 | tee $branchLogFileName
 #exit ${PIPESTATUS[0]}
 return_codes=( "${PIPESTATUS[@]}" )

 # we do this way instead of working directly with stderr and stdout
 # as they were messing with output logs which we always want.
+err_exists=0
 for code in "${return_codes[@]}"
 do
     # Make an extra copy of the branch log in a new folder
@@ -34,12 +36,22 @@ do
     if [ $code -eq 0 ]; then
         : # do nothing
     elif [ $code -eq 61 ]; then
         echo
+        err_exists=1
         echo "***** Branch has no valid flowlines *****"
     elif [ $code -ne 0 ]; then
         echo
+        err_exists=1
         echo "***** An error has occured *****"
         cp $branchLogFileName $outputRunDataDir/branch_errors
     fi
 done
+# Note: For branches, we do not copy over the log file for codes of 60 and 61.
+
+if [ "$err_exists" = "0" ]; then
+    # Only add the huc and branch number to the csv if the branch was successful at processing.
+    # We also don't want to include 60's and 61's
+    $srcDir/generate_branch_list_csv.py -o $branch_list_csv_file -u $hucNumber -b $branchId
+fi
+
 exit 0 # we always return a success at this point (so we don't stop the loops / iterator)
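Because process_branch.sh appends one row per successful branch, the csv is built incrementally and stays header-free. A sketch of two consecutive calls (huc and branch ids borrowed from the unit test params further below, output path hypothetical):

    python3 /foss_fim/src/generate_branch_list_csv.py -u 05030104 -b 0 -o /tmp/branch_ids.csv
    python3 /foss_fim/src/generate_branch_list_csv.py -u 05030104 -b 1946000003 -o /tmp/branch_ids.csv
    # /tmp/branch_ids.csv now contains:
    #   05030104,0
    #   05030104,1946000003
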
diff --git a/src/run_unit_wb.sh b/src/run_unit_wb.sh
index d7ed78c59..e1516d9df 100755
--- a/src/run_unit_wb.sh
+++ b/src/run_unit_wb.sh
@@ -3,7 +3,7 @@
 # Do not call this file directly. Call fim_process_unit_wb.sh which calls
 # this file.

-## SOURCE ENV FILE AND FUNCTIONS ##
+## SOURCE FILE AND FUNCTIONS ##
 # load the various enviro files
 args_file=$outputRunDataDir/runtime_args.env

 source $outputRunDataDir/params.env
 source $srcDir/bash_functions.env
 source $srcDir/bash_variables.env

-fim_inputs_csv=$outputHucDataDir/branch_ids.csv
-fim_inputs_list=$outputHucDataDir/branch_ids.lst
+branch_list_csv_file=$outputHucDataDir/branch_ids.csv
+branch_list_lst_file=$outputHucDataDir/branch_ids.lst

 branchSummaryLogFile=$outputRunDataDir/logs/branch/"$hucNumber"_summary_branch.log
@@ -101,10 +101,10 @@ $srcDir/gms/buffer_stream_branches.py -a $input_DEM_domain -s $outputHucDataDir/
 Tcount

 ## CREATE BRANCHID LIST FILE
-echo -e $startDiv"Create file of branch ids for $hucNumber"
+echo -e $startDiv"Create list file of branch ids for $hucNumber"
 date -u
 Tstart
-$srcDir/generate_branch_list.py -oc $fim_inputs_csv -ol $fim_inputs_list -d $outputHucDataDir/nwm_subset_streams_levelPaths_dissolved.gpkg -b $branch_id_attribute -u $hucNumber
+$srcDir/generate_branch_list.py -d $outputHucDataDir/nwm_subset_streams_levelPaths_dissolved.gpkg -b $branch_id_attribute -o $branch_list_lst_file
 Tcount

 ## CREATE BRANCH ZERO ##
@@ -220,7 +220,7 @@ fi

 ## CLEANUP BRANCH ZERO OUTPUTS ##
 echo -e $startDiv"Cleaning up outputs in branch zero $hucNumber"
-$srcDir/gms/outputs_cleanup.py -d $outputCurrentBranchDataDir -l $deny_branch_zero_list -b 0
+$srcDir/gms/outputs_cleanup.py -d $outputCurrentBranchDataDir -l $deny_branch_zero_list -b $branch_zero_id

 ## REMOVE FILES FROM DENY LIST ##
@@ -232,13 +232,17 @@ if [ -f $deny_unit_list ]; then
     Tcount
 fi

+# -------------------
+## Start the local csv branch list
+$srcDir/generate_branch_list_csv.py -o $branch_list_csv_file -u $hucNumber -b $branch_zero_id
+
 # -------------------
 ## Processing Branches ##
 echo
 echo "---- Start of branch processing for $hucNumber"
 branch_processing_start_time=`date +%s`

-parallel --eta --timeout $branch_timeout -j $jobBranchLimit --joblog $branchSummaryLogFile --colsep ',' -- $srcDir/process_branch.sh $runName $hucNumber :::: $fim_inputs_list
+parallel --eta --timeout $branch_timeout -j $jobBranchLimit --joblog $branchSummaryLogFile --colsep ',' -- $srcDir/process_branch.sh $runName $hucNumber :::: $branch_list_lst_file

 # -------------------
 ## REMOVE FILES FROM DENY LIST FOR BRANCH ZERO (but using normal branch deny) ##
diff --git a/unit_tests/generate_branch_list_csv_params.json b/unit_tests/generate_branch_list_csv_params.json
new file mode 100644
index 000000000..d627a1356
--- /dev/null
+++ b/unit_tests/generate_branch_list_csv_params.json
@@ -0,0 +1,20 @@
+{
+    "valid_data_add_branch_zero":
+    {
+        "huc_id": "05030104",
+        "branch_id": "0",
+        "output_branch_csv": "/outputs/rob_wb_3/05030104/branch_ids.csv"
+    },
+    "valid_data_add_branch":
+    {
+        "huc_id": "05030104",
+        "branch_id": "1946000003",
+        "output_branch_csv": "/outputs/rob_wb_3/05030104/branch_ids.csv"
+    },
+    "invalid_bad_file_extension":
+    {
+        "huc_id": "05030104",
+        "branch_id": "1946000003",
+        "output_branch_csv": "/outputs/rob_wb_3/05030104/branch_ids2"
+    }
+}
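The params above point the tests at /outputs/rob_wb_3/05030104, so they assume a completed run for that huc exists in the container. A sketch of exercising the new tests against it:

    cd /foss_fim/unit_tests
    python3 generate_branch_list_csv_unittests.py
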
diff --git a/unit_tests/generate_branch_list_csv_unittests.py b/unit_tests/generate_branch_list_csv_unittests.py
new file mode 100644
index 000000000..191d2fc01
--- /dev/null
+++ b/unit_tests/generate_branch_list_csv_unittests.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+
+import inspect
+import os
+import sys
+
+import json
+import warnings
+import unittest
+
+sys.path.append('/foss_fim/unit_tests/')
+from unit_tests_utils import FIM_unit_test_helpers as ut_helpers
+
+# importing python folders in other directories
+sys.path.append('/foss_fim/src/')   # *** update your folder path here if required ***
+import generate_branch_list_csv as src
+
+class test_generate_branch_list_csv(unittest.TestCase):
+
+    '''
+    Allows the params to be loaded once and used for all test methods
+    '''
+    @classmethod
+    def setUpClass(self):
+
+        warnings.simplefilter('ignore')
+        params_file_path = ut_helpers.get_params_filename(__file__)
+        with open(params_file_path) as params_file:
+            self.params = json.load(params_file)
+
+        # for these tests to work, we have to check if the .csv exists and remove it
+        # prior to execution of the tests.
+
+        params = self.params["valid_data_add_branch_zero"].copy()
+        if (os.path.exists(params["output_branch_csv"])):
+            os.remove(params["output_branch_csv"])
+
+
+    def test_generate_branch_list_csv_valid_data_add_branch_zero_success(self):
+
+        # yes.. we know that we cannot control the order
+
+        #global params_file
+        params = self.params["valid_data_add_branch_zero"].copy()
+
+        src.generate_branch_list_csv(huc_id = params["huc_id"],
+                                     branch_id = params["branch_id"],
+                                     output_branch_csv = params["output_branch_csv"])
+
+
+        print(f"Test Success: {inspect.currentframe().f_code.co_name}")
+        print("*************************************************************")
+
+
+    def test_generate_branch_list_csv_valid_data_add_branch_success(self):
+
+        # yes.. we know that we cannot control the order
+
+        #global params_file
+        params = self.params["valid_data_add_branch"].copy()
+
+        src.generate_branch_list_csv(huc_id = params["huc_id"],
+                                     branch_id = params["branch_id"],
+                                     output_branch_csv = params["output_branch_csv"])
+
+
+        print(f"Test Success: {inspect.currentframe().f_code.co_name}")
+        print("*************************************************************")
+
+    def test_generate_branch_list_csv_invalid_bad_file_extension(self):
+
+        #global params_file
+        params = self.params["invalid_bad_file_extension"].copy()
+
+        # we expect this to fail. If it does fail with an exception, then this test
+        # is successful.
+        try:
+            src.generate_branch_list_csv(huc_id = params["huc_id"],
+                                         branch_id = params["branch_id"],
+                                         output_branch_csv = params["output_branch_csv"])
+
+            raise AssertionError("Fail = expected a thrown exception but did not get one. Unit Test has 'failed'")
Unit Test has 'failed'") + + except Exception: + print() + print(f"Test Success (failed as expected): {inspect.currentframe().f_code.co_name}") + + finally: + print("*************************************************************") + + +if __name__ == '__main__': + + script_file_name = os.path.basename(__file__) + + print("*****************************") + print(f"Start of {script_file_name} tests") + print() + + unittest.main() + + print() + print(f"End of {script_file_name} tests") + diff --git a/unit_tests/generate_branch_list_params.json b/unit_tests/generate_branch_list_params.json index d76a30fd8..02a89bcea 100644 --- a/unit_tests/generate_branch_list_params.json +++ b/unit_tests/generate_branch_list_params.json @@ -1,10 +1,8 @@ { "valid_data": { - "stream_network_dissolved": "/outputs/rob_wb_1/05030104/nwm_subset_streams_levelPaths_dissolved.gpkg", + "stream_network_dissolved": "/outputs/rob_wb_3/05030104/nwm_subset_streams_levelPaths_dissolved.gpkg", "branch_id_attribute": "levpa_id", - "output_branch_list": "/outputs/rob_wb_1/05030104/branch_ids.lst", - "output_branch_csv": "/outputs/rob_wb_1/05030104/branch_ids.csv", - "huc_id": 15030104 + "output_branch_list_file": "/outputs/rob_wb_3/05030104/branch_ids.lst" } } diff --git a/unit_tests/generate_branch_list_unittests.py b/unit_tests/generate_branch_list_unittests.py index cb71711de..fc7ac7776 100644 --- a/unit_tests/generate_branch_list_unittests.py +++ b/unit_tests/generate_branch_list_unittests.py @@ -35,11 +35,9 @@ def test_Generate_branch_list_success(self): params = self.params["valid_data"].copy() #update "valid_data" value if you need to (aka.. more than one node) - src.Generate_branch_list(stream_network_dissolved = params["stream_network_dissolved"], + src.generate_branch_list(stream_network_dissolved = params["stream_network_dissolved"], branch_id_attribute = params["branch_id_attribute"], - output_branch_list = params["output_branch_list"], - output_branch_csv = params["output_branch_csv"], - huc_id = params["huc_id"]) + output_branch_list_file = params["output_branch_list_file"]) print(f"Test Success: {inspect.currentframe().f_code.co_name}")
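Taken together, the flow after this patch is: run_unit_wb.sh seeds each huc's branch_ids.csv with branch zero, process_branch.sh appends only branches that processed cleanly, and post processing aggregates the per-huc csvs. A sketch of that final aggregation step from the fim_post_processing.sh hunk above (run name and gms_inputs output path are hypothetical):

    python3 /foss_fim/src/aggregate_branch_lists.py \
        -d /outputs/<run_name> \
        -f "branch_ids.csv" \
        -o /outputs/<run_name>/gms_inputs.csv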