diff --git a/config/deny_gms_branch_zero.lst b/config/deny_gms_branch_zero.lst index 3b80477a4..93bbf79b9 100644 --- a/config/deny_gms_branch_zero.lst +++ b/config/deny_gms_branch_zero.lst @@ -37,7 +37,7 @@ gw_catchments_reaches_{}.gpkg gw_catchments_reaches_{}.tif #gw_catchments_reaches_filtered_addedAttributes_{}.gpkg #gw_catchments_reaches_filtered_addedAttributes_{}.tif -gw_catchments_reaches_filtered_addedAttributes_crosswalked_{}.gpkg +#gw_catchments_reaches_filtered_addedAttributes_crosswalked_{}.gpkg headwaters_{}.tif #hydroTable_{}.csv idFile_{}.txt diff --git a/config/deny_gms_branches_min.lst b/config/deny_gms_branches_min.lst index b11c17d06..4492a170f 100644 --- a/config/deny_gms_branches_min.lst +++ b/config/deny_gms_branches_min.lst @@ -37,7 +37,7 @@ gw_catchments_reaches_{}.gpkg gw_catchments_reaches_{}.tif #gw_catchments_reaches_filtered_addedAttributes_{}.gpkg #gw_catchments_reaches_filtered_addedAttributes_{}.tif -gw_catchments_reaches_filtered_addedAttributes_crosswalked_{}.gpkg +#gw_catchments_reaches_filtered_addedAttributes_crosswalked_{}.gpkg headwaters_{}.tif #hydroTable_{}.csv idFile_{}.txt diff --git a/config/deny_gms_unit_default.lst b/config/deny_gms_unit_default.lst index f4f91920a..53f32b920 100644 --- a/config/deny_gms_unit_default.lst +++ b/config/deny_gms_unit_default.lst @@ -13,6 +13,7 @@ nwm_headwaters.gpkg #nwm_subset_streams_levelPaths.gpkg #nwm_subset_streams_levelPaths_dissolved.gpkg #nwm_subset_streams_levelPaths_dissolved_headwaters.gpkg +#usgs_elev_table.csv #usgs_subset_gages.gpkg #wbd.gpkg #wbd8_clp.gpkg diff --git a/config/params_template.env b/config/params_template.env index 4d967cbbf..49db90523 100644 --- a/config/params_template.env +++ b/config/params_template.env @@ -52,6 +52,16 @@ export vrough_suffix="_vmann" # text to append to output log and src_full_crossw export vmann_input_file="data/inputs/rating_curve/variable_roughness/mannings_global_06_011.csv" # input file location with nwm feature_id and channel roughness and overbank roughness attributes export bankfull_attribute="chann_volume_ratio" # src_full_crosswalked_bankfull.csv attribute (column id) containing the channel vs overbank ratio values (generated in the identify_src_bankfull.py) +#### apply SRC adjustments using USGS rating curve database #### +export src_adjust_usgs="True" # Toggle to run src adjustment routine (True=on; False=off) +export nwm_recur_file="data/inputs/rating_curve/nwm_recur_flows/nwm21_17C_recurrence_flows_cfs.csv" # input file location with nwm feature_id and recurrence flow values + +#### apply SRC adjustments using observed FIM/flow point database #### +export src_adjust_spatial="True" # Toggle to run src adjustment routine (True=on; False=off) +export fim_obs_pnt_data="data/inputs/rating_curve/water_edge_database/usgs_nws_benchmark_points_cleaned.gpkg" +#### path to env file with sensitive paths for accessing postgres database #### +export CALB_DB_KEYS_FILE="/data/config/calb_db_keys.env" + #### computational parameters #### export ncores_gw=1 # mpi number of cores for gagewatershed export ncores_fd=1 # mpi number of cores for flow directions @@ -60,4 +70,4 @@ export memfree=0G # min free memory required to start a new job or keep youngest #### logging parameters #### export startDiv="\n##########################################################################\n" -export stopDiv="\n##########################################################################" +export stopDiv="\n##########################################################################" \ 
No newline at end of file diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index adab50e16..a811abeac 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,43 @@ All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format. +## v4.0.7.0 - 2022-08-17 - [PR #657](https://github.com/NOAA-OWP/inundation-mapping/pull/657) + +Introduces synthetic rating curve calibration workflow. The calibration computes new Manning's coefficients for the HAND SRCs using input data: USGS gage locations, USGS rating curve csv, and a benchmark FIM extent point database stored in PostgreSQL database. This addresses [#535]. + +## Additions + +- `src/src_adjust_spatial_obs.py`: new synthetic rating curve calibration routine that prepares all of the spatial (point data) benchmark data for ingest to the Manning's coefficient calculations performed in `src_roughness_optimization.py` +- `src/src_adjust_usgs_rating.py`: new synthetic rating curve calibration routine that prepares all of the USGS gage location and observed rating curve data for ingest to the Manning's coefficient calculations performed in `src_roughness_optimization.py` +- `src/src_roughness_optimization.py`: new SRC post-processing script that ingests observed data and HUC/branch FIM output data to compute optimized Manning's coefficient values and update the discharge values in the SRCs. Outputs a new hydroTable.csv. + +## Changes + +- `config/deny_gms_branch_zero.lst`: added `gw_catchments_reaches_filtered_addedAttributes_crosswalked_{}.gpkg` to list of files to keep (used in calibration workflow) +- `config/deny_gms_branches_min.lst`: added `gw_catchments_reaches_filtered_addedAttributes_crosswalked_{}.gpkg` to list of files to keep (used in calibration workflow) +- `config/deny_gms_unit_default.lst`: added `usgs_elev_table.csv` to list of files to keep (used in calibration workflow) +- `config/params_template.env`: added new variables for user to control calibration + - `src_adjust_usgs`: Toggle to run src adjustment routine (True=on; False=off) + - `nwm_recur_file`: input file location with nwm feature_id and recurrence flow values + - `src_adjust_spatial`: Toggle to run src adjustment routine (True=on; False=off) + - `fim_obs_pnt_data`: input file location with benchmark point data used to populate the postgresql database + - `CALB_DB_KEYS_FILE`: path to env file with sensitive paths for accessing postgres database +- `gms_run_branch.sh`: includes new steps in the workflow to connect to the calibration PostgreSQL database, run SRC calibration w/ USGS gage rating curves, run SRC calibration w/ benchmark point database +- `src/add_crosswalk.py`: added step to create placeholder variables to be replaced in post-processing (as needed). 
Created here to ensure consistent column variables in the final hydroTable.csv +- `src/gms/run_by_unit.sh`: added new steps to the workflow to create the `usgs_subset_gages.gpkg` file for branch zero and then perform the crosswalk and create `usgs_elev_table.csv` for branch zero +- `src/make_stages_and_catchlist.py`: reconcile flows and catchments HydroIDs +- `src/usgs_gage_aggregate.py`: changed streamorder data type from integer to string to better handle missing values in `usgs_gage_unit_setup.py` +- `src/usgs_gage_unit_setup.py`: added new inputs and function to populate `usgs_elev_table.csv` for branch zero using all available gages within the huc (not filtering to a specific branch) +- `src/utils/shared_functions.py`: added two new functions for the calibration workflow + - `check_file_age`: check the age of a file (used for flagging potentially outdated input) + - `concat_huc_csv`: concatenate huc csv files to a single dataframe/csv +- `src/utils/shared_variables.py`: defined new SRC calibration threshold variables + - `DOWNSTREAM_THRESHOLD`: distance in km to propagate new roughness values downstream + - `ROUGHNESS_MAX_THRESH`: max allowable adjusted roughness value (void values larger than this) + - `ROUGHNESS_MIN_THRESH`: min allowable adjusted roughness value (void values smaller than this) + +
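+For orientation, the core of this calibration (implemented in `src/src_roughness_optimization.py`) back-calculates Manning's n from each observed stage/flow pair by inverting Manning's equation. A minimal sketch with illustrative numbers (not the exact implementation) is shown below:
+
+```python
+# Illustrative only: invert Manning's equation Q = A * R^(2/3) * S^(1/2) / n for n,
+# using the SRC hydraulic properties at the stage closest to the observed HAND value.
+def back_calc_mannings_n(wet_area_m2, hydraulic_radius_m, slope, flow_cms):
+    return wet_area_m2 * hydraulic_radius_m ** (2.0 / 3) * slope ** 0.5 / flow_cms
+
+n = back_calc_mannings_n(wet_area_m2=120.0, hydraulic_radius_m=1.8, slope=0.001, flow_cms=250.0)
+# n ~= 0.022; values outside ROUGHNESS_MIN_THRESH/ROUGHNESS_MAX_THRESH are flagged and dropped.
+```
+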

+ ## v4.0.6.2 - 2022-08-16 - [PR #639](https://github.com/NOAA-OWP/inundation-mapping/pull/639) This file converts USFIMR remote sensed inundation shapefiles into a raster that can be used to compare to the FIM data. It has to be run separately for each shapefile. This addresses [#629]. diff --git a/gms_run_branch.sh b/gms_run_branch.sh index 057e3affc..ecdcb026c 100755 --- a/gms_run_branch.sh +++ b/gms_run_branch.sh @@ -113,6 +113,21 @@ fi source $envFile source $srcDir/bash_functions.env +## CONNECT TO CALIBRATION POSTGRESQL DATABASE (OPTIONAL) ## +if [ "$src_adjust_spatial" = "True" ]; then + if [ ! -f $CALB_DB_KEYS_FILE ]; then + echo "ERROR! - src_adjust_spatial parameter is set to "True" (see parameter file), but the provided calibration database access keys file does not exist: $CALB_DB_KEYS_FILE" + exit 1 + else + source $CALB_DB_KEYS_FILE + echo "Populate PostgrSQL database with benchmark FIM extent points and HUC attributes" + echo "Loading HUC Data" + time ogr2ogr -overwrite -nln hucs -a_srs ESRI:102039 -f PostgreSQL PG:"host=$CALIBRATION_DB_HOST dbname=calibration user=$CALIBRATION_DB_USER_NAME password=$CALIBRATION_DB_PASS" $input_WBD_gdb WBDHU8 + echo "Loading Point Data" + time ogr2ogr -overwrite -f PostgreSQL PG:"host=$CALIBRATION_DB_HOST dbname=calibration user=$CALIBRATION_DB_USER_NAME password=$CALIBRATION_DB_PASS" $fim_obs_pnt_data usgs_nws_benchmark_points -nln points + fi +fi + # default values if [ "$jobLimit" = "" ] ; then jobLimit=$default_max_jobs @@ -187,6 +202,19 @@ echo echo "Processing usgs gage aggregation" python3 $srcDir/usgs_gage_aggregate.py -fim $outputRunDataDir -gms $gms_inputs +## RUN SYNTHETIC RATING CURVE CALIBRATION W/ USGS GAGE RATING CURVES ## +if [ "$src_adjust_usgs" = "True" ]; then + echo -e $startDiv"Performing SRC adjustments using USGS rating curve database"$stopDiv + # Run SRC Optimization routine using USGS rating curve data (WSE and flow @ NWM recur flow thresholds) + time python3 $srcDir/src_adjust_usgs_rating.py -run_dir $outputRunDataDir -usgs_rc $inputDataDir/usgs_gages/usgs_rating_curves.csv -nwm_recur $nwm_recur_file -j $jobLimit +fi + +## RUN SYNTHETIC RATING CURVE CALIBRATION W/ BENCHMARK POINT DATABASE (POSTGRESQL) ## +if [ "$src_adjust_spatial" = "True" ]; then + echo -e $startDiv"Performing SRC adjustments using benchmark point database"$stopDiv + time python3 $srcDir/src_adjust_spatial_obs.py -fim_dir $outputRunDataDir -j $jobLimit +fi + # ------------------- ## GET NON ZERO EXIT CODES ## # Needed in case aggregation fails, we will need the logs diff --git a/src/add_crosswalk.py b/src/add_crosswalk.py index 37ee0a25d..fe64d0f50 100755 --- a/src/add_crosswalk.py +++ b/src/add_crosswalk.py @@ -240,8 +240,23 @@ def add_crosswalk(input_catchments_fileName,input_flows_fileName,input_srcbase_f # make hydroTable output_hydro_table = output_src.loc[:,['HydroID','feature_id','NextDownID','order_','Number of Cells','SurfaceArea (m2)','BedArea (m2)','TopWidth (m)','LENGTHKM','AREASQKM','WettedPerimeter (m)','HydraulicRadius (m)','WetArea (m2)','Volume (m3)','SLOPE','ManningN','Stage','Discharge (m3s-1)']] output_hydro_table.rename(columns={'Stage' : 'stage','Discharge (m3s-1)':'discharge_cms'},inplace=True) + ## Set placeholder variables to be replaced in post-processing (as needed). 
Create here to ensure consistent column vars output_hydro_table['barc_on'] = False # set barc_on attribute to Fasle (default) --> will be overwritten if BARC module runs + output_hydro_table['raw_discharge_cms'] = output_src['Discharge (m3s-1)'] + output_hydro_table['raw_Volume (m3)'] = output_src['Volume (m3)'] + output_hydro_table['raw_WetArea (m2)'] = output_src['WetArea (m2)'] + output_hydro_table['raw_HydraulicRadius (m)'] = output_src['HydraulicRadius (m)'] output_hydro_table['vmann_on'] = False # set vmann_on attribute to Fasle (default) --> will be overwritten if variable roughness module runs + output_hydro_table['raw_ManningN'] = output_src['ManningN'] + output_hydro_table['vmann_discharge_cms'] = pd.NA + output_hydro_table['vmann_ManningN'] = pd.NA + output_hydro_table['adjust_src_on'] = False + output_hydro_table['last_updated'] = pd.NA + output_hydro_table['submitter'] = pd.NA + output_hydro_table['adjust_ManningN'] = pd.NA + output_hydro_table['obs_source'] = pd.NA + output_hydro_table['default_discharge_cms'] = pd.NA + output_hydro_table['default_ManningN'] = pd.NA if output_hydro_table.HydroID.dtype != 'str': output_hydro_table.HydroID = output_hydro_table.HydroID.astype(str) diff --git a/src/gms/run_by_unit.sh b/src/gms/run_by_unit.sh index 67213acab..c513e8b3d 100755 --- a/src/gms/run_by_unit.sh +++ b/src/gms/run_by_unit.sh @@ -214,18 +214,27 @@ else echo -e $startDiv"Skipping branch zero processing because there are no stream orders being dropped $hucNumber"$stopDiv fi -## CLEANUP BRANCH ZERO OUTPUTS ## -echo -e $startDiv"Cleaning up outputs in branch zero $hucNumber"$stopDiv +## CREATE USGS GAGES FILE +echo -e $startDiv"Assigning USGS gages to branches for $hucNumber"$stopDiv date -u Tstart -$srcDir/gms/outputs_cleanup.py -d $outputCurrentBranchDataDir -l $srcDir/../config/deny_gms_branch_zero.lst -v -b +python3 -m memory_profiler $srcDir/usgs_gage_unit_setup.py -gages $inputDataDir/usgs_gages/usgs_gages.gpkg -nwm $outputHucDataDir/nwm_subset_streams_levelPaths.gpkg -o $outputHucDataDir/usgs_subset_gages.gpkg -huc $hucNumber -ahps $inputDataDir/ahps_sites/nws_lid.gpkg -bzero_id $branch_zero_id -bzero $dropLowStreamOrders Tcount -## CREATE USGS GAGES FILE -echo -e $startDiv"Assigning USGS gages to branches for $hucNumber"$stopDiv +## USGS CROSSWALK ## +if [ -f $outputHucDataDir/usgs_subset_gages_$branch_zero_id.gpkg ]; then + echo -e $startDiv"USGS Crosswalk $hucNumber $branch_zero_id"$stopDiv + date -u + Tstart + python3 $srcDir/usgs_gage_crosswalk.py -gages $outputHucDataDir/usgs_subset_gages_$branch_zero_id.gpkg -flows $outputCurrentBranchDataDir/demDerived_reaches_split_filtered_$branch_zero_id.gpkg -cat $outputCurrentBranchDataDir/gw_catchments_reaches_filtered_addedAttributes_crosswalked_$branch_zero_id.gpkg -dem $outputCurrentBranchDataDir/dem_meters_$branch_zero_id.tif -dem_adj $outputCurrentBranchDataDir/dem_thalwegCond_$branch_zero_id.tif -outtable $outputCurrentBranchDataDir/usgs_elev_table.csv -b $branch_zero_id + Tcount +fi + +## CLEANUP BRANCH ZERO OUTPUTS ## +echo -e $startDiv"Cleaning up outputs in branch zero $hucNumber"$stopDiv date -u Tstart -python3 -m memory_profiler $srcDir/usgs_gage_unit_setup.py -gages $inputDataDir/usgs_gages/usgs_gages.gpkg -nwm $outputHucDataDir/nwm_subset_streams_levelPaths.gpkg -o $outputHucDataDir/usgs_subset_gages.gpkg -huc $hucNumber -ahps $inputDataDir/ahps_sites/nws_lid.gpkg +$srcDir/gms/outputs_cleanup.py -d $outputCurrentBranchDataDir -l $srcDir/../config/deny_gms_branch_zero.lst -v -b Tcount ## REMOVE FILES FROM DENY 
LIST ## diff --git a/src/make_stages_and_catchlist.py b/src/make_stages_and_catchlist.py index b9210945d..677de1899 100755 --- a/src/make_stages_and_catchlist.py +++ b/src/make_stages_and_catchlist.py @@ -12,13 +12,9 @@ def make_stages_and_catchlist(flows_filename, catchments_filename, stages_filena flows = gpd.read_file(flows_filename) catchments = gpd.read_file(catchments_filename) - - hydroIDs = flows['HydroID'].tolist() - len_of_hydroIDs = len(hydroIDs) - slopes = flows['S0'].tolist() - lengthkm = flows['LengthKm'].tolist() - areasqkm = catchments['areasqkm'].tolist() - + # Reconcile flows and catchments hydroids + flows = flows.merge(catchments[['HydroID']], on='HydroID', how='inner') + catchments = catchments.merge(flows[['HydroID']], on='HydroID', how='inner') stages_max = stages_max + stages_interval stages = np.round(np.arange(stages_min,stages_max,stages_interval),4) diff --git a/src/src_adjust_spatial_obs.py b/src/src_adjust_spatial_obs.py new file mode 100644 index 000000000..abd77ace0 --- /dev/null +++ b/src/src_adjust_spatial_obs.py @@ -0,0 +1,382 @@ +import argparse +import geopandas as gpd +from geopandas.tools import sjoin +import os +import rasterio +import pandas as pd +import numpy as np +import sys +import json +import datetime as dt +from collections import deque +import multiprocessing +from multiprocessing import Pool +from src_roughness_optimization import update_rating_curve +import psycopg2 # python package for connecting to postgres +from dotenv import load_dotenv +import time + +#import variables from .env file +load_dotenv() +CALIBRATION_DB_HOST = os.getenv("CALIBRATION_DB_HOST") +CALIBRATION_DB_USER_NAME = os.getenv("CALIBRATION_DB_USER_NAME") +CALIBRATION_DB_PASS = os.getenv("CALIBRATION_DB_PASS") + +from utils.shared_variables import DOWNSTREAM_THRESHOLD, ROUGHNESS_MIN_THRESH, ROUGHNESS_MAX_THRESH +''' +The script imports a PostgreSQL database containing observed FIM extent points and associated flow data. This script attributes the point data with its hydroid and HAND values before passing a dataframe to the src_roughness_optimization.py workflow. + +Processing +- Define CRS to use for initial geoprocessing and read wbd_path and points_layer. +- Define paths to hydroTable.csv, HAND raster, catchments raster, and synthetic rating curve JSON. +- Clip the points water_edge_df to the huc cathments polygons (for faster processing?) +- Define coords variable to be used in point raster value attribution and use point geometry to determine catchment raster pixel values +- Check that there are valid obs in the water_edge_df (not empty) and convert pandas series to dataframe to pass to update_rating_curve +- Call update_rating_curve() to perform the rating curve calibration. 
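+
+As an illustration of the raster point-sampling step above, the HAND value at each observation point is pulled with rasterio's sample() (a minimal sketch with assumed variable names, not the exact code below):
+
+    import rasterio
+    coords = [(x, y) for x, y in zip(water_edge_df.X, water_edge_df.Y)]
+    with rasterio.open(hand_path) as hand_src:
+        water_edge_df['hand'] = [val[0] for val in hand_src.sample(coords)]
+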
+ +Inputs +- points_layer: .gpkg layer containing observed/truth FIM extent points and associated flow value +- fim_directory: fim directory containing individual HUC output dirs +- wbd_path: path the watershed boundary dataset layer (HUC polygon boundaries) +- job_number: number of multi-processing jobs to use +- debug_outputs_option: optional flag to output intermediate files for reviewing/debugging + +Outputs +- water_edge_median_df: dataframe containing "hydroid", "flow", "submitter", "coll_time", "flow_unit", "layer", and median "HAND" value +''' + +def process_points(args): + + ''' + The funciton ingests geodataframe and attributes the point data with its hydroid and HAND values before passing a dataframe to the src_roughness_optimization.py workflow + + Processing + - Extract x,y coordinates from geometry + - Projects the point data to matching CRS for HAND and hydroid rasters + - Samples the hydroid and HAND raster values for each point and stores the values in dataframe + - Calculates the median HAND value for all points by hydroid + ''' + + branch_dir = args[0] + huc = args[1] + branch_id = args[2] + hand_path = args[3] + catchments_path = args[4] + catchments_poly_path = args[5] + water_edge_df = args[6] + htable_path = args[7] + optional_outputs = args[8] + + ## Define coords variable to be used in point raster value attribution. + coords = [(x,y) for x, y in zip(water_edge_df.X, water_edge_df.Y)] + + ## Use point geometry to determine HAND raster pixel values. + hand_src = rasterio.open(hand_path) + hand_crs = hand_src.crs + water_edge_df.to_crs(hand_crs) # Reproject geodataframe to match hand_src. Should be the same, but this is a double check. + water_edge_df['hand'] = [h[0] for h in hand_src.sample(coords)] + hand_src.close() + del hand_src, hand_crs, + + ## Use point geometry to determine catchment raster pixel values. + catchments_src = rasterio.open(catchments_path) + catchments_crs = catchments_src.crs + water_edge_df.to_crs(catchments_crs) + water_edge_df['hydroid'] = [c[0] for c in catchments_src.sample(coords)] + catchments_src.close() + del catchments_src, catchments_crs + + water_edge_df = water_edge_df[(water_edge_df['hydroid'].notnull()) & (water_edge_df['hand'] > 0) & (water_edge_df['hydroid'] > 0)] + + ## Check that there are valid obs in the water_edge_df (not empty) + if water_edge_df.empty: + log_text = 'NOTE --> skipping HUC: ' + str(huc) + ' Branch: ' + str(branch_id) + ': no valid observation points found within the branch catchments' + else: + ## Intermediate output for debugging + if optional_outputs: + branch_debug_pts_out_gpkg = os.path.join(branch_dir, 'export_water_edge_df_' + branch_id + '.gpkg') + water_edge_df.to_file(branch_debug_pts_out_gpkg, driver='GPKG', index=False) + + #print('Processing points for HUC: ' + str(huc) + ' Branch: ' + str(branch_id)) + ## Get median HAND value for appropriate groups. + water_edge_median_ds = water_edge_df.groupby(["hydroid", "flow", "submitter", "coll_time", "flow_unit","layer"])['hand'].median() + + ## Write user_supplied_n_vals to CSV for next step. 
+ pt_n_values_csv = os.path.join(branch_dir, 'user_supplied_n_vals_' + branch_id + '.csv') + water_edge_median_ds.to_csv(pt_n_values_csv) + ## Convert pandas series to dataframe to pass to update_rating_curve + water_edge_median_df = water_edge_median_ds.reset_index() + water_edge_median_df['coll_time'] = water_edge_median_df.coll_time.astype(str) + del water_edge_median_ds + + ## Additional arguments for src_roughness_optimization + source_tag = 'point_obs' # tag to use in source attribute field + merge_prev_adj = True # merge in previous SRC adjustment calculations + + ## Call update_rating_curve() to perform the rating curve calibration. + log_text = update_rating_curve(branch_dir, water_edge_median_df, htable_path, huc, branch_id, catchments_poly_path, optional_outputs, source_tag, merge_prev_adj, DOWNSTREAM_THRESHOLD) + ## Still testing: use code below to print out any exceptions. + ''' + try: + log_text = update_rating_curve(branch_dir, water_edge_median_df, htable_path, huc, catchments_poly_path, optional_outputs, source_tag, merge_prev_adj, DOWNSTREAM_THRESHOLD) + except Exception as e: + print(str(huc) + ' --> ' + str(e)) + log_text = 'ERROR!!!: HUC ' + str(huc) + ' --> ' + str(e) + ''' + return(log_text) + + +def find_points_in_huc(huc_id, conn): + # Point data in the database is already attributed with HUC8 id + ''' + The function queries the PostgreSQL database and returns all points attributed with the input huc id. + + Processing + - Query the PostgreSQL database for points attributed with huc id. + - Reads the filtered database result into a pandas geodataframe + + Inputs + - conn: connection to PostgreSQL db + - huc_id: HUC id to query the db + + Outputs + - water_edge_df: geodataframe with point data + ''' + + huc_pt_query = """SELECT ST_X(P.geom), ST_Y(P.geom), P.submitter, P.flow, P.coll_time, P.flow_unit, P.layer, P.geom + FROM points P + JOIN hucs H ON ST_Contains(H.geom, P.geom) + WHERE H.huc8 = %s """ + + # Need to hard code the CRS to use EPSG:5070 instead of the default ESRI:102039 (gdal pyproj throws an error with crs 102039) + # Appears that EPSG:5070 is functionally equivalent to ESRI:102039: https://gis.stackexchange.com/questions/329123/crs-interpretation-in-qgis + water_edge_df = gpd.GeoDataFrame.from_postgis(huc_pt_query, con=conn, params=[huc_id], crs="EPSG:5070", parse_dates=['coll_time']) + water_edge_df = water_edge_df.drop(columns=['st_x','st_y']) + + return water_edge_df + +def find_hucs_with_points(conn,fim_out_huc_list): + ''' + The function queries the PostgreSQL database and returns a list of all the HUCs that contain calb point data. + + Processing + - Query the PostgreSQL database for all unique huc ids + + Inputs + - conn: connection to PostgreSQL db + + Outputs + - hucs_wpoints: list with all unique huc ids + ''' + + cursor = conn.cursor() + ''' + cursor.execute(""" + SELECT DISTINCT H.huc8 + FROM points P JOIN hucs H ON ST_Contains(H.geom, P.geom); + """) + ''' + cursor.execute("SELECT DISTINCT H.huc8 FROM points P JOIN hucs H ON ST_Contains(H.geom, P.geom) WHERE H.huc8 = ANY(%s);", (fim_out_huc_list,)) + hucs_fetch = cursor.fetchall() # list with tuple with the attributes defined above (need to convert to df?) 
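+    # Note: cursor.fetchall() returns a list of one-element tuples (one DISTINCT huc8 per row),
+    # e.g. [('12040103',), ...]; the loop below flattens it into a plain list of HUC8 strings.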
+ hucs_wpoints = [] + for huc in hucs_fetch: + hucs_wpoints.append(huc[0]) + cursor.close() + return hucs_wpoints + +def ingest_points_layer(fim_directory, job_number, debug_outputs_option, log_file): + ''' + The function obtains all points within a given huc, locates the corresponding FIM output files for each huc (confirms all necessary files exist), and then passes a proc list of huc organized data to process_points function. + + Processing + - Query the PostgreSQL database for all unique huc ids that have calb points + - Loop through all HUCs with calb data and locate necessary fim output files to pass to calb workflow + + Inputs + - fim_directory: parent directory of fim ouputs (contains HUC directories) + - job_number: number of multiprocessing jobs to use for processing hucs + - debug_outputs_option: optional flag to output intermediate files + + Outputs + - procs_list: passes multiprocessing list of input args for process_points function input + ''' + conn = connect() # Connect to the PostgreSQL db once + log_file.write('Connected to database via host: ' + str(CALIBRATION_DB_HOST) + '\n') + print("Finding all fim_output hucs that contain calibration points...") + fim_out_huc_list = [ item for item in os.listdir(fim_directory) if os.path.isdir(os.path.join(fim_directory, item)) ] + fim_out_huc_list.remove('logs') + ## Record run time and close log file + run_time_start = dt.datetime.now() + log_file.write('Finding all hucs that contain calibration points...' + '\n') + huc_list_db = find_hucs_with_points(conn,fim_out_huc_list) + run_time_end = dt.datetime.now() + task_run_time = run_time_end - run_time_start + log_file.write('HUC SEARCH TASK RUN TIME: ' + str(task_run_time) + '\n') + print(f"{len(huc_list_db)} hucs found in point database" + '\n') + log_file.write(f"{len(huc_list_db)} hucs found in point database" + '\n') + log_file.write('#########################################################\n') + + ## Ensure HUC id is either HUC8 + huc_list = [] + for huc in huc_list_db: + ## zfill to the appropriate scale to ensure leading zeros are present, if necessary. + if len(huc) == 7: + huc = huc.zfill(8) + if huc not in huc_list: + huc_list.append(huc) + log_file.write(str(huc) + '\n') + + procs_list = [] # Initialize proc list for mulitprocessing. + + #huc_list = ['12040103'] + ## Define paths to relevant HUC HAND data. + for huc in huc_list: + huc_branches_dir = os.path.join(fim_directory, huc,'branches') + water_edge_df = find_points_in_huc(huc, conn).reset_index() + print(f"{len(water_edge_df)} points found in " + str(huc)) + log_file.write(f"{len(water_edge_df)} points found in " + str(huc) + '\n') + + ## Create X and Y location columns by extracting from geometry. 
+ water_edge_df['X'] = water_edge_df['geom'].x + water_edge_df['Y'] = water_edge_df['geom'].y + + ## Check to make sure the HUC directory exists in the current fim_directory + if not os.path.exists(os.path.join(fim_directory, huc)): + log_file.write("FIM Directory for huc: " + str(huc) + " does not exist --> skipping SRC adjustments for this HUC (obs points found)\n") + + ## Intermediate output for debugging + if debug_outputs_option: + huc_debug_pts_out = os.path.join(fim_directory, huc, 'debug_water_edge_df_' + huc + '.csv') + water_edge_df.to_csv(huc_debug_pts_out) + huc_debug_pts_out_gpkg = os.path.join(fim_directory, huc, 'export_water_edge_df_' + huc + '.gpkg') + water_edge_df.to_file(huc_debug_pts_out_gpkg, driver='GPKG', index=False) + + for branch_id in os.listdir(huc_branches_dir): + branch_dir = os.path.join(huc_branches_dir,branch_id) + ## Define paths to HAND raster, catchments raster, and synthetic rating curve JSON. + hand_path = os.path.join(branch_dir, 'rem_zeroed_masked_' + branch_id + '.tif') + catchments_path = os.path.join(branch_dir, 'gw_catchments_reaches_filtered_addedAttributes_' + branch_id + '.tif') + htable_path = os.path.join(branch_dir, 'hydroTable_' + branch_id + '.csv') + catchments_poly_path = os.path.join(branch_dir, 'gw_catchments_reaches_filtered_addedAttributes_crosswalked_' + branch_id + '.gpkg') + + # Check to make sure the fim output files exist. Continue to next iteration if not and warn user. + if not os.path.exists(hand_path): + print("WARNING: HAND grid does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id)) + log_file.write("WARNING: HAND grid does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id) + '\n') + elif not os.path.exists(catchments_path): + print("WARNING: Catchments grid does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id)) + log_file.write("WARNING: Catchments grid does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id) + '\n') + elif not os.path.exists(htable_path): + print("WARNING: hydroTable does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id)) + log_file.write("WARNING: hydroTable does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id) + '\n') + else: + procs_list.append([branch_dir, huc, branch_id, hand_path, catchments_path, catchments_poly_path, water_edge_df, htable_path, debug_outputs_option]) + + with Pool(processes=job_number) as pool: + log_output = pool.map(process_points, procs_list) + log_file.writelines(["%s\n" % item for item in log_output]) + log_file.write('#########################################################\n') + disconnect(conn) # move this to happen at the end of the huc looping + +def connect(): + """ Connect to the PostgreSQL database server """ + + print('Connecting to the PostgreSQL database...') + conn = None + not_connected = True + while not_connected: + try: + + # connect to the PostgreSQL server + conn = psycopg2.connect( + host=CALIBRATION_DB_HOST, + database="calibration", + user=CALIBRATION_DB_USER_NAME, + password=CALIBRATION_DB_PASS) + + # create a cursor + cur = conn.cursor() + + # execute a statement + print('Host name: ' + CALIBRATION_DB_HOST) + print('PostgreSQL database version:') + cur.execute('SELECT version()') + + # display the PostgreSQL database server version + db_version = cur.fetchone() + print(db_version) + + # close the communication with the PostgreSQL + cur.close() + not_connected = False + print("Connected to database\n\n") + except (Exception, 
psycopg2.DatabaseError) as error: + print("Waiting for database to come online") + time.sleep(5) + + return conn + +def disconnect(conn): + """ Disconnect from the PostgreSQL database server """ + + if conn is not None: + conn.close() + print('Database connection closed.') + +def run_prep(fim_directory,debug_outputs_option,ds_thresh_override,DOWNSTREAM_THRESHOLD,job_number): + assert os.path.isdir(fim_directory), 'ERROR: could not find the input fim_dir location: ' + str(fim_directory) + + available_cores = multiprocessing.cpu_count() + if job_number > available_cores: + job_number = available_cores - 1 + print("Provided job number exceeds the number of available cores. " + str(job_number) + " max jobs will be used instead.") + + if ds_thresh_override != DOWNSTREAM_THRESHOLD: + print('ALERT!! - Using a downstream distance threshold value (' + str(float(ds_thresh_override)) + 'km) different than the default (' + str(DOWNSTREAM_THRESHOLD) + 'km) - interpret results accordingly') + DOWNSTREAM_THRESHOLD = float(ds_thresh_override) + + ## Create output dir for log file + output_dir = os.path.join(fim_directory,"logs","src_optimization") + if not os.path.isdir(output_dir): + os.makedirs(output_dir) + + ## Create a time var to log run time + begin_time = dt.datetime.now() + + ## Create log file for processing records + print('This may take a few minutes...') + sys.__stdout__ = sys.stdout + log_file = open(os.path.join(output_dir,'log_spatial_src_adjust.log'),"w") + log_file.write('#########################################################\n') + log_file.write('Parameter Values:\n' + 'DOWNSTREAM_THRESHOLD= ' + str(DOWNSTREAM_THRESHOLD) + '\n' + 'ROUGHNESS_MIN_THRESH= ' + str( ROUGHNESS_MIN_THRESH) + '\n' + 'ROUGHNESS_MAX_THRESH=' + str(ROUGHNESS_MAX_THRESH) + '\n') + log_file.write('#########################################################\n\n') + log_file.write('START TIME: ' + str(begin_time) + '\n') + + ingest_points_layer(fim_directory, job_number, debug_outputs_option, log_file) + + ## Record run time and close log file + end_time = dt.datetime.now() + log_file.write('END TIME: ' + str(end_time) + '\n') + tot_run_time = end_time - begin_time + log_file.write('TOTAL RUN TIME: ' + str(tot_run_time)) + sys.stdout = sys.__stdout__ + log_file.close() + +if __name__ == '__main__': + ## Parse arguments. + parser = argparse.ArgumentParser(description='Adjusts rating curve given a shapefile containing points of known water boundary.') + #parser.add_argument('-db','--points-layer',help='Path to points layer containing known water boundary locations',required=True) + parser.add_argument('-fim_dir','--fim-directory',help='Parent directory of FIM-required datasets.',required=True) + parser.add_argument('-debug','--extra-outputs',help='OPTIONAL flag: Use this to keep intermediate output files for debugging/testing',default=False,required=False, action='store_true') + parser.add_argument('-dthresh','--downstream-thresh',help='OPTIONAL Override: distance in km to propogate modified roughness values downstream', default=DOWNSTREAM_THRESHOLD,required=False) + parser.add_argument('-j','--job-number',help='OPTIONAL: Number of jobs to use',required=False,default=2) + + ## Assign variables from arguments. 
+ args = vars(parser.parse_args()) + #points_layer = args['points_layer'] + fim_directory = args['fim_directory'] + debug_outputs_option = args['extra_outputs'] + ds_thresh_override = args['downstream_thresh'] + job_number = int(args['job_number']) + + run_prep(fim_directory,debug_outputs_option,ds_thresh_override,DOWNSTREAM_THRESHOLD,job_number) \ No newline at end of file diff --git a/src/src_adjust_usgs_rating.py b/src/src_adjust_usgs_rating.py new file mode 100644 index 000000000..d0df1f5e3 --- /dev/null +++ b/src/src_adjust_usgs_rating.py @@ -0,0 +1,262 @@ +import argparse +import os +import pandas as pd +import sys +import json +import datetime as dt +from pathlib import Path +from collections import deque +import multiprocessing +from multiprocessing import Pool +from utils.shared_functions import check_file_age, concat_huc_csv +from src_roughness_optimization import update_rating_curve +''' +The script ingests a USGS rating curve csv and a NWM flow recurrence interval database. The gage location will be associated to the corresponding hydroID and attributed with the HAND elevation value + +Processing +- Read in USGS rating curve from csv and convert WSE navd88 values to meters +- Read in the aggregate USGS elev table csv from the HUC fim directory (output from usgs_gage_crosswalk.py) +- Filter null entries and convert usgs flow from cfs to cms +- Calculate HAND elevation value for each gage location (NAVD88 elevation - NHD DEM thalweg elevation) +- Read in the NWM recurr csv file and convert flow to cfs +- Calculate the closest SRC discharge value to the NWM flow value +- Create dataframe with crosswalked USGS flow and NWM recurr flow and assign metadata attributes +- Calculate flow difference (variance) to check for large discrepancies btw NWM flow and USGS closest flow +- Log any signifant differences (or negative HAND values) btw the NWM flow value and closest USGS rating flow +- Produce log file +- Call update_rating_curve() to perform the rating curve calibration. 
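+
+A small worked example of the unit handling described above (illustrative values only):
+
+    elevation_navd88_ft = 656.2                         # USGS rating WSE in feet (NAVD88)
+    elevation_navd88_m = elevation_navd88_ft / 3.28084  # ~200.0 m
+    hand_datum_m = 198.5                                # thalweg DEM elevation at the gage (m)
+    hand_m = elevation_navd88_m - hand_datum_m          # ~1.5 m HAND stage for this WSE
+    flow_cfs = 3531.47
+    flow_cms = flow_cfs / 35.3147                       # ~100 m^3/s
+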
+ +Inputs +- branch_dir: fim directory containing individual HUC output dirs +- usgs_rc_filepath: USGS rating curve database (produced by rating_curve_get_usgs_curves.py) +- nwm_recurr_filepath: NWM flow recurrence interval dataset +- debug_outputs_option: optional flag to output intermediate files for reviewing/debugging +- job_number: number of multi-processing jobs to use + +Outputs +- water_edge_median_ds: dataframe containing 'location_id','hydroid','feature_id','huc','hand','discharge_cms','nwm_recur_flow_cms','nwm_recur','layer' +''' + +def create_usgs_rating_database(usgs_rc_filepath, usgs_elev_df, nwm_recurr_filepath, log_dir): + start_time = dt.datetime.now() + print('Reading USGS rating curve from csv...') + log_text = 'Processing database for USGS flow/WSE at NWM flow recur intervals...\n' + col_usgs = ["location_id", "flow", "stage", "elevation_navd88"] + usgs_rc_df = pd.read_csv(usgs_rc_filepath, dtype={'location_id': object}, usecols=col_usgs)#, nrows=30000) + print('Duration (read usgs_rc_csv): {}'.format(dt.datetime.now() - start_time)) + + # convert WSE navd88 values to meters + usgs_rc_df['elevation_navd88_m'] = usgs_rc_df['elevation_navd88'] / 3.28084 + + # read in the aggregate USGS elev table csv + start_time = dt.datetime.now() + cross_df = usgs_elev_df[["location_id", "HydroID", "feature_id", "levpa_id", "HUC8", "dem_adj_elevation"]].copy() + cross_df.rename(columns={'dem_adj_elevation':'hand_datum', 'HydroID':'hydroid', 'HUC8':'huc'}, inplace=True) + + # filter null location_id rows from cross_df (removes ahps lide entries that aren't associated with USGS gage) + cross_df = cross_df[cross_df.location_id.notnull()] + + # convert usgs flow from cfs to cms + usgs_rc_df['discharge_cms'] = usgs_rc_df.flow / 35.3147 + usgs_rc_df = usgs_rc_df.drop(columns=["flow"]) + + # merge usgs ratings with crosswalk attributes + usgs_rc_df = usgs_rc_df.merge(cross_df, how='left', on='location_id') + usgs_rc_df = usgs_rc_df[usgs_rc_df['hydroid'].notna()] + + # calculate hand elevation + usgs_rc_df['hand'] = usgs_rc_df['elevation_navd88_m'] - usgs_rc_df['hand_datum'] + usgs_rc_df = usgs_rc_df[['location_id','feature_id','hydroid','levpa_id','huc','hand','discharge_cms']] + usgs_rc_df['feature_id'] = usgs_rc_df['feature_id'].astype(int) + + # read in the NWM recurr csv file + nwm_recur_df = pd.read_csv(nwm_recurr_filepath, dtype={'feature_id': int}) + nwm_recur_df = nwm_recur_df.drop(columns=["Unnamed: 0"]) + nwm_recur_df.rename(columns={'2_0_year_recurrence_flow_17C':'2_0_year','5_0_year_recurrence_flow_17C':'5_0_year','10_0_year_recurrence_flow_17C':'10_0_year','25_0_year_recurrence_flow_17C':'25_0_year','50_0_year_recurrence_flow_17C':'50_0_year','100_0_year_recurrence_flow_17C':'100_0_year'}, inplace=True) + + #convert cfs to cms (x 0.028317) + nwm_recur_df.loc[:, ['2_0_year','5_0_year','10_0_year','25_0_year','50_0_year','100_0_year']] *= 0.028317 + + # merge nwm recurr with usgs_rc + merge_df = usgs_rc_df.merge(nwm_recur_df, how='left', on='feature_id') + + # NWM recurr intervals + recurr_intervals = ("2","5","10","25","50","100") + final_df = pd.DataFrame() # create empty dataframe to append flow interval dataframes + for interval in recurr_intervals: + log_text += ('\n\nProcessing: ' + str(interval) + '-year NWM recurr intervals\n') + print('Processing: ' + str(interval) + '-year NWM recurr intervals') + ## Calculate the closest SRC discharge value to the NWM flow value + merge_df['Q_find'] = (merge_df['discharge_cms'] - merge_df[interval+"_0_year"]).abs() + + ## Check for any 
missing/null entries in the input SRC + if merge_df['Q_find'].isnull().values.any(): # there may be null values for lake or coastal flow lines (need to set a value to do groupby idxmin below) + log_text += 'HUC: ' + str(merge_df['huc']) + ' : feature_id' + str(merge_df['feature_id']) + ' --> Null values found in "Q_find" calc. These will be filled with 999999 () \n' + ## Fill missing/nan nwm 'Discharge (m3s-1)' values with 999999 to handle later + merge_df['Q_find'] = merge_df['Q_find'].fillna(999999) + if merge_df['hydroid'].isnull().values.any(): + log_text += 'HUC: ' + str(merge_df['huc']) + ' --> Null values found in "hydroid"... \n' + + # Create dataframe with crosswalked USGS flow and NWM recurr flow + calc_df = merge_df.loc[merge_df.groupby(['location_id','levpa_id'])['Q_find'].idxmin()].reset_index(drop=True) # find the index of the Q_1_5_find (closest matching flow) + # Calculate flow difference (variance) to check for large discrepancies btw NWM flow and USGS closest flow + calc_df['check_variance'] = ((calc_df['discharge_cms'] - calc_df[interval+"_0_year"])/calc_df['discharge_cms']).abs() + # Assign new metadata attributes + calc_df['nwm_recur'] = interval+"_0_year" + calc_df['layer'] = '_usgs-gage____' + interval+"-year" + calc_df.rename(columns={interval+"_0_year":'nwm_recur_flow_cms'}, inplace=True) + # Subset calc_df for final output + calc_df = calc_df[['location_id','hydroid','feature_id','levpa_id','huc','hand','discharge_cms','check_variance','nwm_recur_flow_cms','nwm_recur','layer']] + final_df = final_df.append(calc_df, ignore_index=True) + # Log any negative HAND elev values and remove from database + log_text += ('Warning: Negative HAND stage values -->\n') + log_text += (calc_df[calc_df['hand']<0].to_string() +'\n') + final_df = final_df[final_df['hand']>0] + # Log any signifant differences btw the NWM flow value and closest USGS rating flow (this ensures that we consistently sample the USGS rating curves at known intervals - NWM recur flow) + log_text += ('Warning: Large variance (>10%) between NWM flow and closest USGS flow -->\n') + log_text += (calc_df[calc_df['check_variance']>0.1].to_string() +'\n') + final_df = final_df[final_df['check_variance']<0.1] + final_df['submitter'] = 'usgs_rating_wrds_api_' + final_df['location_id'] + # Get datestamp from usgs rating curve file to use as coll_time attribute in hydroTable.csv + datestamp = check_file_age(usgs_rc_filepath) + final_df['coll_time'] = str(datestamp)[:15] + + # Rename attributes (for ingest to update_rating_curve) and output csv with the USGS RC database + final_df.rename(columns={'discharge_cms':'flow'}, inplace=True) + final_df.to_csv(os.path.join(log_dir,"usgs_rc_nwm_recurr.csv"),index=False) + + # Output log text to log file + log_text += ('#########\nTotal entries per USGS gage location -->\n') + loc_id_df = final_df.groupby(['location_id']).size().reset_index(name='count') + log_text += (loc_id_df.to_string() +'\n') + log_text += ('#########\nTotal entries per NWM recur value -->\n') + recur_count_df = final_df.groupby(['nwm_recur']).size().reset_index(name='count') + log_text += (recur_count_df.to_string() +'\n') + log_usgs_db = open(os.path.join(log_dir,'log_usgs_rc_database.log'),"w") + log_usgs_db.write(log_text) + log_usgs_db.close() + return(final_df) + +def branch_proc_list(usgs_df,run_dir,debug_outputs_option,log_file): + huc_list = usgs_df['huc'].tolist() + huc_list = list(set(huc_list)) + procs_list = [] # Initialize list for mulitprocessing. 
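+    # Each procs_list entry appended below mirrors the positional args of update_rating_curve():
+    # [branch_dir, water_edge_median_ds, htable_path, huc, branch_id, catchments_poly_path,
+    #  debug_outputs_option, source_tag, merge_prev_adj]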
+ + # loop through all unique level paths that have a USGS gage + #branch_huc_dict = pd.Series(usgs_df.levpa_id.values,index=usgs_df.huc).to_dict('list') + #branch_huc_dict = usgs_df.set_index('huc').T.to_dict('list') + huc_branch_dict = usgs_df.groupby('huc')['levpa_id'].apply(set).to_dict() + + for huc in huc_branch_dict: + branch_set = huc_branch_dict[huc] + for branch_id in branch_set: + # Define paths to branch HAND data. + # Define paths to HAND raster, catchments raster, and synthetic rating curve JSON. + # Assumes outputs are for HUC8 (not HUC6) + branch_dir = os.path.join(run_dir,huc,'branches',branch_id) + hand_path = os.path.join(branch_dir, 'rem_zeroed_masked_' + branch_id + '.tif') + catchments_path = os.path.join(branch_dir, 'gw_catchments_reaches_filtered_addedAttributes_' + branch_id + '.tif') + catchments_poly_path = os.path.join(branch_dir, 'gw_catchments_reaches_filtered_addedAttributes_crosswalked_' + branch_id + '.gpkg') + htable_path = os.path.join(branch_dir, 'hydroTable_' + branch_id + '.csv') + water_edge_median_ds = usgs_df[(usgs_df['huc']==huc) & (usgs_df['levpa_id']==branch_id)] + + # Check to make sure the fim output files exist. Continue to next iteration if not and warn user. + if not os.path.exists(hand_path): + print("WARNING: HAND grid does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id)) + log_file.write("WARNING: HAND grid does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id) + '\n') + elif not os.path.exists(catchments_path): + print("WARNING: Catchments grid does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id)) + log_file.write("WARNING: Catchments grid does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id) + '\n') + elif not os.path.exists(htable_path): + print("WARNING: hydroTable does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id)) + log_file.write("WARNING: hydroTable does not exist (skipping): " + str(huc) + ' - branch-id: ' + str(branch_id) + '\n') + else: + ## Additional arguments for src_roughness_optimization + source_tag = 'usgs_rating' # tag to use in source attribute field + merge_prev_adj = False # merge in previous SRC adjustment calculations + + print('Will perform SRC adjustments for huc: ' + str(huc) + ' - branch-id: ' + str(branch_id)) + procs_list.append([branch_dir, water_edge_median_ds, htable_path, huc, branch_id, catchments_poly_path, debug_outputs_option, source_tag, merge_prev_adj]) + + # multiprocess all available branches + print(f"Calculating new SRCs for {len(procs_list)} branches using {job_number} jobs...") + with Pool(processes=job_number) as pool: + log_output = pool.starmap(update_rating_curve, procs_list) + log_file.writelines(["%s\n" % item for item in log_output]) + # try statement for debugging + # try: + # with Pool(processes=job_number) as pool: + # log_output = pool.starmap(update_rating_curve, procs_list) + # log_file.writelines(["%s\n" % item for item in log_output]) + # except Exception as e: + # print(str(huc) + ' --> ' + ' branch id: ' + str(branch_id) + str(e)) + # log_file.write('ERROR!!!: HUC ' + str(huc) + ' --> ' + ' branch id: ' + str(branch_id) + str(e) + '\n') + +def run_prep(run_dir,usgs_rc_filepath,nwm_recurr_filepath,debug_outputs_option,job_number): + ## Check input args are valid + assert os.path.isdir(run_dir), 'ERROR: could not find the input fim_dir location: ' + str(run_dir) + + ## Create an aggregate dataframe with all usgs_elev_table.csv entries for hucs in fim_dir + 
print('Reading USGS gage HAND elevation from usgs_elev_table.csv files...') + #usgs_elev_file = os.path.join(branch_dir,'usgs_elev_table.csv') + #usgs_elev_df = pd.read_csv(usgs_elev_file, dtype={'HUC8': object, 'location_id': object, 'feature_id': int}) + csv_name = 'usgs_elev_table.csv' + usgs_elev_df = concat_huc_csv(run_dir,csv_name) + if usgs_elev_df is None: + print('WARNING: usgs_elev_df not created - check that usgs_elev_table.csv files exist in fim_dir!') + elif usgs_elev_df.empty: + print('WARNING: usgs_elev_df is empty - check that usgs_elev_table.csv files exist in fim_dir!') + + available_cores = multiprocessing.cpu_count() + if job_number > available_cores: + job_number = available_cores - 1 + print("Provided job number exceeds the number of available cores. " + str(job_number) + " max jobs will be used instead.") + + ## Create output dir for log and usgs rc database + log_dir = os.path.join(run_dir,"logs","src_optimization") + print("Log file output here: " + str(log_dir)) + if not os.path.isdir(log_dir): + os.makedirs(log_dir) + + print('This may take a few minutes...') + usgs_df = create_usgs_rating_database(usgs_rc_filepath, usgs_elev_df, nwm_recurr_filepath, log_dir) + + ## Create a time var to log run time + begin_time = dt.datetime.now() + # Create log file for processing records + log_file = open(os.path.join(log_dir,'log_usgs_rc_src_adjust.log'),"w") + log_file.write('START TIME: ' + str(begin_time) + '\n') + log_file.write('#########################################################\n\n') + + ## Create huc proc_list for multiprocessing and execute the update_rating_curve function + branch_proc_list(usgs_df,run_dir,debug_outputs_option,log_file) + + ## Record run time and close log file + log_file.write('#########################################################\n\n') + end_time = dt.datetime.now() + log_file.write('END TIME: ' + str(end_time) + '\n') + tot_run_time = end_time - begin_time + log_file.write('TOTAL RUN TIME: ' + str(tot_run_time)) + sys.stdout = sys.__stdout__ + log_file.close() + +if __name__ == '__main__': + ## Parse arguments. + parser = argparse.ArgumentParser(description='Adjusts rating curve with database of USGS rating curve (calculated WSE/flow).') + parser.add_argument('-run_dir','--run-dir',help='Parent directory of FIM run.',required=True) + parser.add_argument('-usgs_rc','--usgs-ratings',help='Path to USGS rating curve csv file',required=True) + parser.add_argument('-nwm_recur','--nwm_recur',help='Path to NWM recur file (multiple NWM flow intervals). NOTE: assumes flow units are cfs!!',required=True) + parser.add_argument('-debug','--extra-outputs',help='Optional flag: Use this to keep intermediate output files for debugging/testing',default=False,required=False, action='store_true') + parser.add_argument('-j','--job-number',help='Number of jobs to use',required=False,default=1) + + ## Assign variables from arguments. 
+ args = vars(parser.parse_args()) + run_dir = args['run_dir'] + usgs_rc_filepath = args['usgs_ratings'] + nwm_recurr_filepath = args['nwm_recur'] + debug_outputs_option = args['extra_outputs'] + job_number = int(args['job_number']) + + ## Prepare/check inputs, create log file, and spin up the proc list + run_prep(run_dir,usgs_rc_filepath,nwm_recurr_filepath,debug_outputs_option,job_number) + diff --git a/src/src_roughness_optimization.py b/src/src_roughness_optimization.py new file mode 100644 index 000000000..d0607de08 --- /dev/null +++ b/src/src_roughness_optimization.py @@ -0,0 +1,376 @@ +import argparse +import geopandas as gpd +from geopandas.tools import sjoin +import os +import rasterio +import pandas as pd +import numpy as np +import sys +import json +import datetime as dt +from collections import deque +import multiprocessing +from multiprocessing import Pool + +from utils.shared_variables import DOWNSTREAM_THRESHOLD, ROUGHNESS_MIN_THRESH, ROUGHNESS_MAX_THRESH + + +def update_rating_curve(fim_directory, water_edge_median_df, htable_path, huc, branch_id, catchments_poly_path, debug_outputs_option, source_tag, merge_prev_adj=False, down_dist_thresh=DOWNSTREAM_THRESHOLD): + ''' + This function ingests a dataframe containing observed data (HAND elevation and flow) and then calculates new SRC roughness values via Manning's equation. The new roughness values are averaged for each HydroID and then propagated downstream and a new discharge value is calculated where applicable. + + Processing Steps: + - Read in the hydroTable.csv and check whether it has previously been updated (rename default columns if needed) + - Loop through the user provided point data --> stage/flow dataframe row by row and copy the corresponding htable values for the matching stage->HAND lookup + - Calculate new HydroID roughness values for input obs data using Manning's equation + - Create dataframe to check for erroneous Manning's n values (values set in utils/shared_variables.py: >0.6 or <0.001 --> see input args) + - Create magnitude and ahps column by subsetting the "layer" attribute + - Create df grouped by hydroid with ahps_lid and huc number and then pivot the magnitude column to display n value for each magnitude at each hydroid + - Create df with the most recent collection time entry and submitter attribs + - Calculate median ManningN to handle cases with multiple hydroid entries and create a df with the median hydroid_ManningN value per feature_id + - Rename the original hydrotable variables to allow new calculations to use the primary var name + - Check for large variability in the calculated Manning's N values (for cases with multiple entries for a single hydroid) + - Create attributes to traverse the flow network between HydroIDs + - Calculate group_ManningN (mean calb n for consecutive hydroids) and apply values downstream to non-calb hydroids (constrained to first Xkm of hydroids - set downstream distance var as input arg) + - Create the adjust_ManningN column by combining the hydroid_ManningN with the featid_ManningN (use feature_id value if the hydroid is in a feature_id that contains valid hydroid_ManningN value(s)) + - Merge in previous SRC adjustments (where available) for hydroIDs that do not have a new adjusted roughness value + - Update the catchments polygon .gpkg with joined attribute - "src_calibrated" + - Merge the final ManningN dataframe to the original hydroTable + - Create the ManningN column by combining the hydroid_ManningN with the default_ManningN (use modified where available) + - 
Calculate new discharge_cms with new adjusted ManningN + - Export a new hydroTable.csv and overwrite the previous version and output new src json (overwrite previous) + + Inputs: + - fim_directory: fim directory containing individual HUC output dirs + - water_edge_median_df: dataframe containing observation data (attributes: "hydroid", "flow", "submitter", "coll_time", "flow_unit", "layer", "HAND") + - htable_path: path to the current HUC hydroTable.csv + - huc: string variable for the HUC id # (huc8 or huc6) + - branch_id: string variable for the branch id + - catchments_poly_path: path to the current HUC catchments polygon layer .gpkg + - debug_outputs_option: optional input argument to output additional intermediate data files (csv files with SRC calculations) + - source_tag: input text tag used to specify the type/source of the input obs data used for the SRC adjustments (e.g. usgs_rating or point_obs) + - merge_prev_adj: boolean argument to specify when to merge previous SRC adjustments vs. overwrite (default=False) + - down_dist_thresh: optional input argument to override the env variable that controls the downstream distance new roughness values are applied downstream of locations with valid obs data + + Ouputs: + - output_catchments: same input "catchments_poly_path" .gpkg with appened attributes for SRC adjustments fields + - df_htable: same input "htable_path" --> updated hydroTable.csv with new/modified attributes + - output_src_json: src.json file with new SRC discharge values + + ''' + #print("Processing huc --> " + str(huc)) + log_text = "\nProcessing huc --> " + str(huc) + ' branch id: ' + str(branch_id) + '\n' + log_text += "DOWNSTREAM_THRESHOLD: " + str(down_dist_thresh) + 'km\n' + log_text += "Merge Previous Adj Values: " + str(merge_prev_adj) + '\n' + df_nvalues = water_edge_median_df.copy() + df_nvalues = df_nvalues[ (df_nvalues.hydroid.notnull()) & (df_nvalues.hydroid > 0) ] # remove null entries that do not have a valid hydroid + + ## Read in the hydroTable.csv and check wether it has previously been updated (rename default columns if needed) + df_htable = pd.read_csv(htable_path, dtype={'HUC': object, 'last_updated':object, 'submitter':object, 'obs_source':object}) + df_prev_adj = pd.DataFrame() # initialize empty df for populating/checking later + if 'default_discharge_cms' not in df_htable.columns: # need this column to exist before continuing + df_htable['adjust_src_on'] = False + df_htable['last_updated'] = pd.NA + df_htable['submitter'] = pd.NA + df_htable['adjust_ManningN'] = pd.NA + df_htable['obs_source'] = pd.NA + df_htable['default_discharge_cms'] = pd.NA + df_htable['default_ManningN'] = pd.NA + if df_htable['default_discharge_cms'].isnull().values.any(): # check if there are not valid values in the column (True = no previous calibration outputs) + df_htable['default_discharge_cms'] = df_htable['discharge_cms'].values + df_htable['default_ManningN'] = df_htable['ManningN'].values + + if merge_prev_adj and not df_htable['adjust_ManningN'].isnull().all(): # check if the merge_prev_adj setting is True and there are valid 'adjust_ManningN' values from previous calibration outputs + # Create a subset of hydrotable with previous adjusted SRC attributes + df_prev_adj_htable = df_htable.copy()[['HydroID','submitter','last_updated','adjust_ManningN','obs_source']] + df_prev_adj_htable.rename(columns={'submitter':'submitter_prev','last_updated':'last_updated_prev','adjust_ManningN':'adjust_ManningN_prev','obs_source':'obs_source_prev'}, inplace=True) + 
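+        # Collapse to one row per HydroID so these previous calibration attributes can later be
+        # merged back onto the per-HydroID dataframe without duplicating rows.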
df_prev_adj_htable = df_prev_adj_htable.groupby(["HydroID"]).first() + # Only keep previous USGS rating curve adjustments (previous spatial obs adjustments are not retained) + df_prev_adj = df_prev_adj_htable[df_prev_adj_htable['obs_source_prev'].str.contains("usgs_rating", na=False)] + log_text += 'HUC: ' + str(huc) + ' Branch: ' + str(branch_id) + ': found previous hydroTable calibration attributes --> retaining previous calb attributes for blending...\n' + # Delete previous adj columns to prevent duplicate variable issues (if src_roughness_optimization.py was previously applied) + df_htable.drop(['ManningN','discharge_cms','submitter','last_updated','adjust_ManningN','adjust_src_on','obs_source'], axis=1, inplace=True) + df_htable.rename(columns={'default_discharge_cms':'discharge_cms','default_ManningN':'ManningN'}, inplace=True) + + ## loop through the user provided point data --> stage/flow dataframe row by row + for index, row in df_nvalues.iterrows(): + if row.hydroid not in df_htable['HydroID'].values: + print('ERROR: HydroID for calb point was not found in the hydrotable (check hydrotable) for HUC: ' + str(huc) + ' branch id: ' + str(branch_id) + ' hydroid: ' + str(row.hydroid)) + log_text += 'ERROR: HydroID for calb point was not found in the hydrotable (check hydrotable) for HUC: ' + str(huc) + ' branch id: ' + str(branch_id) + ' hydroid: ' + str(row.hydroid) + '\n' + else: + df_htable_hydroid = df_htable[df_htable.HydroID == row.hydroid] # filter htable for entries with matching hydroid + if df_htable_hydroid.empty: + print('ERROR: df_htable_hydroid is empty but expected data: ' + str(huc) + ' branch id: ' + str(branch_id) + ' hydroid: ' + str(row.hydroid)) + log_text += 'ERROR: df_htable_hydroid is empty but expected data: ' + str(huc) + ' branch id: ' + str(branch_id) + ' hydroid: ' + str(row.hydroid) + '\n' + + find_src_stage = df_htable_hydroid.loc[df_htable_hydroid['stage'].sub(row.hand).abs().idxmin()] # find closest matching stage to the user provided HAND value + ## copy the corresponding htable values for the matching stage->HAND lookup + df_nvalues.loc[index,'feature_id'] = find_src_stage.feature_id + df_nvalues.loc[index,'LakeID'] = find_src_stage.LakeID + df_nvalues.loc[index,'NextDownID'] = find_src_stage.NextDownID + df_nvalues.loc[index,'LENGTHKM'] = find_src_stage.LENGTHKM + df_nvalues.loc[index,'src_stage'] = find_src_stage.stage + df_nvalues.loc[index,'ManningN'] = find_src_stage.ManningN + df_nvalues.loc[index,'SLOPE'] = find_src_stage.SLOPE + df_nvalues.loc[index,'HydraulicRadius_m'] = find_src_stage['HydraulicRadius (m)'] + df_nvalues.loc[index,'WetArea_m2'] = find_src_stage['WetArea (m2)'] + df_nvalues.loc[index,'discharge_cms'] = find_src_stage.discharge_cms + + ## mask src values that crosswalk to the SRC zero point (src_stage ~ 0 or discharge <= 0) + print(str(huc) + ' branch id: ' + str(branch_id)) + df_nvalues[['HydraulicRadius_m','WetArea_m2']] = df_nvalues[['HydraulicRadius_m','WetArea_m2']].mask((df_nvalues['src_stage'] <= 0.1) | (df_nvalues['discharge_cms'] <= 0.0), np.nan) + + ## Calculate roughness using Manning's equation + df_nvalues.rename(columns={'ManningN':'default_ManningN','hydroid':'HydroID'}, inplace=True) # rename the previous ManningN column + df_nvalues['hydroid_ManningN'] = df_nvalues['WetArea_m2']* \ + pow(df_nvalues['HydraulicRadius_m'],2.0/3)* \ + pow(df_nvalues['SLOPE'],0.5)/df_nvalues['flow'] + + ## Create dataframe to check for erroneous Manning's n values (values set in tools_shared_variables.py --> >0.6 or <0.001) + 
df_nvalues['Mann_flag'] = np.where((df_nvalues['hydroid_ManningN'] >= ROUGHNESS_MAX_THRESH) | (df_nvalues['hydroid_ManningN'] <= ROUGHNESS_MIN_THRESH) | (df_nvalues['hydroid_ManningN'].isnull()),'Fail','Pass') + df_mann_flag = df_nvalues[(df_nvalues['Mann_flag'] == 'Fail')][['HydroID','hydroid_ManningN']] + if not df_mann_flag.empty: + log_text += '!!! Flagged Mannings Roughness values below !!!' +'\n' + log_text += df_mann_flag.to_string() + '\n' + + ## Create magnitude and ahps column by subsetting the "layer" attribute + df_nvalues['magnitude'] = df_nvalues['layer'].str.split("_").str[5] + df_nvalues['ahps_lid'] = df_nvalues['layer'].str.split("_").str[1] + df_nvalues['huc'] = str(huc) + df_nvalues.drop(['layer'], axis=1, inplace=True) + + ## Create df grouped by hydroid with ahps_lid and huc number + df_huc_lid = df_nvalues.groupby(["HydroID"]).first()[['ahps_lid','huc']] + df_huc_lid.columns = pd.MultiIndex.from_product([['info'], df_huc_lid.columns]) + + ## pivot the magnitude column to display n value for each magnitude at each hydroid + df_nvalues_mag = df_nvalues.pivot_table(index='HydroID', columns='magnitude', values=['hydroid_ManningN'], aggfunc='mean') # if there are multiple entries per hydroid and magnitude - aggregate using mean + + ## Optional: Export csv with the newly calculated Manning's N values + if debug_outputs_option: + output_calc_n_csv = os.path.join(fim_directory, 'calc_src_n_vals_' + branch_id + '.csv') + df_nvalues.to_csv(output_calc_n_csv,index=False) + + ## filter the modified Manning's n dataframe for values outside the allowable range + df_nvalues = df_nvalues[df_nvalues['Mann_flag'] == 'Pass'] + + ## Check that there are valid entries in the calculated roughness df after filtering + if not df_nvalues.empty: + ## Create df with the most recent collection time entry and submitter attribs + df_updated = df_nvalues[['HydroID','coll_time','submitter','ahps_lid']] # subset the dataframe + df_updated = df_updated.sort_values('coll_time').drop_duplicates(['HydroID'],keep='last') # sort by collection time and then drop duplicate HydroIDs (keep most recent coll_time per HydroID) + df_updated.rename(columns={'coll_time':'last_updated'}, inplace=True) + + ## calculate median ManningN to handle cases with multiple hydroid entries + df_mann_hydroid = df_nvalues.groupby(["HydroID"])[['hydroid_ManningN']].median() + + ## Create a df with the median hydroid_ManningN value per feature_id + #df_mann_featid = df_nvalues.groupby(["feature_id"])[['hydroid_ManningN']].mean() + #df_mann_featid.rename(columns={'hydroid_ManningN':'featid_ManningN'}, inplace=True) + + ## Rename the original hydrotable variables to allow new calculations to use the primary var name + df_htable.rename(columns={'ManningN':'default_ManningN','discharge_cms':'default_discharge_cms'}, inplace=True) + + ## Check for large variability in the calculated Manning's N values (for cases with multiple entries for a single hydroid) + df_nrange = df_nvalues.groupby('HydroID').agg({'hydroid_ManningN': ['median', 'min', 'max', 'std', 'count']}) + df_nrange['hydroid_ManningN','range'] = df_nrange['hydroid_ManningN','max'] - df_nrange['hydroid_ManningN','min'] + df_nrange = df_nrange.join(df_nvalues_mag, how='outer') # join the df_nvalues_mag containing hydroid_manningn values per flood magnitude category + df_nrange = df_nrange.merge(df_huc_lid, how='outer', on='HydroID') # join the df_huc_lid df to add attributes for lid and huc# + log_text += 'Statistics for Modified Roughness Calcs -->' +'\n' + 
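# --- Illustrative aside (not part of the diff): the hydroid_ManningN values screened and summarized above come
# from rearranging Manning's equation, Q = (1/n) * A * R^(2/3) * S^(1/2), to solve for n at the observed flow:
#     n = A * R^(2/3) * S^(1/2) / Q
# Quick sanity check with made-up numbers (A = wetted area in m2, R = hydraulic radius in m, S = slope, Q in cms):
A, R, S, Q = 35.0, 1.2, 0.002, 40.0
n_calc = A * pow(R, 2.0/3) * pow(S, 0.5) / Q   # ~0.044, which would pass the 0.001-0.6 screen applied above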
log_text += df_nrange.to_string() + '\n' + log_text += '----------------------------------------\n' + + ## Optional: Output csv with SRC calc stats + if debug_outputs_option: + output_stats_n_csv = os.path.join(fim_directory, 'stats_src_n_vals_' + branch_id + '.csv') + df_nrange.to_csv(output_stats_n_csv,index=True) + + ## subset the original hydrotable dataframe and subset to one row per HydroID + df_nmerge = df_htable[['HydroID','feature_id','NextDownID','LENGTHKM','LakeID','order_']].drop_duplicates(['HydroID'],keep='first') + + ## Need to check that there are non-lake hydroids in the branch hydrotable (prevents downstream error) + df_htable_check_lakes = df_nmerge.loc[df_nmerge['LakeID'] == -999] + if not df_htable_check_lakes.empty: + + ## Create attributes to traverse the flow network between HydroIDs + df_nmerge = branch_network(df_nmerge) + + ## Merge the newly calculated ManningN dataframes + df_nmerge = df_nmerge.merge(df_mann_hydroid, how='left', on='HydroID') + df_nmerge = df_nmerge.merge(df_updated, how='left', on='HydroID') + + ## Calculate group_ManningN (mean calb n for consecutive hydroids) and apply values downstream to non-calb hydroids (constrained to first Xkm of hydroids - set downstream distance var as input arg) + df_nmerge = group_manningn_calc(df_nmerge, down_dist_thresh) + + ## Create a df with the mean hydroid_ManningN value per feature_id + df_mann_featid = df_nmerge.groupby(["feature_id"])[['hydroid_ManningN']].mean() + df_mann_featid.rename(columns={'hydroid_ManningN':'featid_ManningN'}, inplace=True) + df_mann_featid_attrib = df_nmerge.groupby('feature_id').first() # create a separate df with attributes to apply to other hydroids that share a featureid + df_mann_featid_attrib = df_mann_featid_attrib[df_mann_featid_attrib['submitter'].notna()][['last_updated','submitter']] + df_nmerge = df_nmerge.merge(df_mann_featid, how='left', on='feature_id').set_index('feature_id') + df_nmerge = df_nmerge.combine_first(df_mann_featid_attrib).reset_index() + + if not df_nmerge['hydroid_ManningN'].isnull().all(): + ## Temp testing filter to only use the hydroid manning n values (drop the featureid and group ManningN variables) + #df_nmerge = df_nmerge.assign(featid_ManningN=np.nan) + #df_nmerge = df_nmerge.assign(group_ManningN=np.nan) + + ## Create the adjust_ManningN column by combining the hydroid_ManningN with the featid_ManningN (use feature_id value if the hydroid is in a feature_id that contains valid hydroid_ManningN value(s)) + conditions = [ (df_nmerge['hydroid_ManningN'].isnull()) & (df_nmerge['featid_ManningN'].notnull()), (df_nmerge['hydroid_ManningN'].isnull()) & (df_nmerge['featid_ManningN'].isnull()) & (df_nmerge['group_ManningN'].notnull()) ] + choices = [ df_nmerge['featid_ManningN'], df_nmerge['group_ManningN'] ] + df_nmerge['adjust_ManningN'] = np.select(conditions, choices, default=df_nmerge['hydroid_ManningN']) + df_nmerge['obs_source'] = np.where(df_nmerge['adjust_ManningN'].notnull(), source_tag, pd.NA) + df_nmerge.drop(['feature_id','NextDownID','LENGTHKM','LakeID','order_'], axis=1, inplace=True) # drop these columns to avoid duplicates when merging with the full hydroTable df + +
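# --- Illustrative aside (not part of the diff): the np.select call above encodes a per-HydroID fallback
# hierarchy -- use the HydroID's own calibrated n when present, otherwise the feature_id mean, otherwise the
# downstream group value. A toy example with made-up numbers:
import numpy as np
import pandas as pd
toy = pd.DataFrame({'hydroid_ManningN': [0.05, np.nan, np.nan, np.nan],
                    'featid_ManningN':  [0.05, 0.07,   np.nan, np.nan],
                    'group_ManningN':   [0.05, 0.07,   0.06,   np.nan]})
conds = [toy['hydroid_ManningN'].isnull() & toy['featid_ManningN'].notnull(),
         toy['hydroid_ManningN'].isnull() & toy['featid_ManningN'].isnull() & toy['group_ManningN'].notnull()]
toy['adjust_ManningN'] = np.select(conds, [toy['featid_ManningN'], toy['group_ManningN']],
                                   default=toy['hydroid_ManningN'])
# rows resolve to 0.05 (own value), 0.07 (feature_id mean), 0.06 (group value), NaN (no data available)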
## Merge in previous SRC adjustments (where available) for hydroIDs that do not have a new adjusted roughness value + if not df_prev_adj.empty: + df_nmerge = pd.merge(df_nmerge,df_prev_adj, on='HydroID', how='outer') + df_nmerge['submitter'] = np.where((df_nmerge['adjust_ManningN'].isnull() & df_nmerge['adjust_ManningN_prev'].notnull()),df_nmerge['submitter_prev'],df_nmerge['submitter']) + df_nmerge['last_updated'] = np.where((df_nmerge['adjust_ManningN'].isnull() & df_nmerge['adjust_ManningN_prev'].notnull()),df_nmerge['last_updated_prev'],df_nmerge['last_updated']) + df_nmerge['obs_source'] = np.where((df_nmerge['adjust_ManningN'].isnull() & df_nmerge['adjust_ManningN_prev'].notnull()),df_nmerge['obs_source_prev'],df_nmerge['obs_source']) + df_nmerge['adjust_ManningN'] = np.where((df_nmerge['adjust_ManningN'].isnull() & df_nmerge['adjust_ManningN_prev'].notnull()),df_nmerge['adjust_ManningN_prev'],df_nmerge['adjust_ManningN']) + df_nmerge.drop(['submitter_prev','last_updated_prev','adjust_ManningN_prev','obs_source_prev'], axis=1, inplace=True) + + ## Update the catchments polygon .gpkg with joined attribute - "src_calibrated" + if os.path.isfile(catchments_poly_path): + input_catchments = gpd.read_file(catchments_poly_path) + ## Create new "src_calibrated" column for viz query + if 'src_calibrated' in input_catchments.columns: # check if this attribute already exists and drop if needed + input_catchments.drop(['src_calibrated'], axis=1, inplace=True) + df_nmerge['src_calibrated'] = np.where(df_nmerge['adjust_ManningN'].notnull(), 'True', 'False') + output_catchments = input_catchments.merge(df_nmerge[['HydroID','src_calibrated']], how='left', on='HydroID') + output_catchments['src_calibrated'].fillna('False', inplace=True) + output_catchments.to_file(catchments_poly_path,driver="GPKG",index=False) # overwrite the previous layer + df_nmerge.drop(['src_calibrated'], axis=1, inplace=True) + ## Optional outputs: 1) merge_n_csv csv with all of the calculated n values and 2) a catchments .gpkg with new joined attributes + if debug_outputs_option: + output_merge_n_csv = os.path.join(fim_directory, 'merge_src_n_vals_' + branch_id + '.csv') + df_nmerge.to_csv(output_merge_n_csv,index=False) + ## output new catchments polygon layer with several new attributes appended + if os.path.isfile(catchments_poly_path): + input_catchments = gpd.read_file(catchments_poly_path) + output_catchments_fileName = os.path.join(os.path.split(catchments_poly_path)[0],"gw_catchments_src_adjust_" + str(branch_id) + ".gpkg") + output_catchments = input_catchments.merge(df_nmerge, how='left', on='HydroID') + output_catchments.to_file(output_catchments_fileName,driver="GPKG",index=False) + + ## Merge the final ManningN dataframe into the original hydroTable + df_nmerge.drop(['ahps_lid','start_catch','route_count','branch_id','hydroid_ManningN','featid_ManningN','group_ManningN',], axis=1, inplace=True) # drop these columns to avoid duplicates when merging with the full hydroTable df + df_htable = df_htable.merge(df_nmerge, how='left', on='HydroID') + df_htable['adjust_src_on'] = np.where(df_htable['adjust_ManningN'].notnull(), 'True', 'False') # create true/false column to clearly identify where new roughness values are applied + + ## Create the ManningN column by combining the hydroid_ManningN with the default_ManningN (use modified where available) + df_htable['ManningN'] = np.where(df_htable['adjust_ManningN'].isnull(),df_htable['default_ManningN'],df_htable['adjust_ManningN']) + + ## Calculate new discharge_cms with new adjusted ManningN + df_htable['discharge_cms'] = df_htable['WetArea (m2)']* \ + pow(df_htable['HydraulicRadius (m)'],2.0/3)* \ + pow(df_htable['SLOPE'],0.5)/df_htable['ManningN'] + + ## Replace discharge_cms with 0 or -999 if present in the original discharge (carried over from thalweg notch
workaround in SRC post-processing) + df_htable['discharge_cms'].mask(df_htable['default_discharge_cms']==0.0,0.0,inplace=True) + df_htable['discharge_cms'].mask(df_htable['default_discharge_cms']==-999,-999,inplace=True) + + ## Export a new hydroTable.csv and overwrite the previous version + out_htable = os.path.join(fim_directory, 'hydroTable_' + branch_id + '.csv') + df_htable.to_csv(out_htable,index=False) + + else: + print('ALERT!! HUC: ' + str(huc) + ' branch id: ' + str(branch_id) + ' --> no valid hydroid roughness calculations after removing lakeid catchments from consideration') + log_text += 'ALERT!! HUC: ' + str(huc) + ' branch id: ' + str(branch_id) + ' --> no valid hydroid roughness calculations after removing lakeid catchments from consideration\n' + else: + print('WARNING!! HUC: ' + str(huc) + ' branch id: ' + str(branch_id) + ' --> hydrotable is empty after removing lake catchments (will ignore branch)') + log_text += 'ALERT!! HUC: ' + str(huc) + ' branch id: ' + str(branch_id) + ' --> hydrotable is empty after removing lake catchments (will ignore branch)\n' + else: + print('ALERT!! HUC: ' + str(huc) + ' branch id: ' + str(branch_id) + ' --> no valid roughness calculations - please check point data and src calculations to evaluate') + log_text += 'ALERT!! HUC: ' + str(huc) + ' branch id: ' + str(branch_id) + ' --> no valid roughness calculations - please check point data and src calculations to evaluate\n' + + log_text += 'Completed: ' + str(huc) + ' --> branch: ' + str(branch_id) + '\n' + log_text += '#########################################################\n' + print("Completed huc: " + str(huc) + ' --> branch: ' + str(branch_id)) + return(log_text) + +def branch_network(df_input_htable): + df_input_htable = df_input_htable.astype({'NextDownID': 'int64'}) # ensure attribute has consistent format as int + df_input_htable = df_input_htable.loc[df_input_htable['LakeID'] == -999] # remove all hydroids associated with lake/water body (these often have disjoined artifacts in the network) + df_input_htable["start_catch"] = ~df_input_htable['HydroID'].isin(df_input_htable['NextDownID']) # define start catchments as hydroids that are not found in the "NextDownID" attribute for all other hydroids + + df_input_htable.set_index('HydroID',inplace=True,drop=False) # set index to the hydroid + branch_heads = deque(df_input_htable[df_input_htable['start_catch'] == True]['HydroID'].tolist()) # create deque of hydroids to define start points in the while loop + visited = set() # create set to keep track of all hydroids that have been accounted for + branch_count = 0 # start branch id + while branch_heads: + hid = branch_heads.popleft() # pull off left most hydroid from deque of start hydroids + Q = deque(df_input_htable[df_input_htable['HydroID'] == hid]['HydroID'].tolist()) # create a new deque that will be used to populate all relevant downstream hydroids + vert_count = 0; branch_count += 1 + while Q: + q = Q.popleft() + if q not in visited: + df_input_htable.loc[df_input_htable.HydroID==q,'route_count'] = vert_count # assign var with flow order ranking + df_input_htable.loc[df_input_htable.HydroID==q,'branch_id'] = branch_count # assign var with current branch id + vert_count += 1 + visited.add(q) + nextid = df_input_htable.loc[q,'NextDownID'] # find the id for the next downstream hydroid + order = df_input_htable.loc[q,'order_'] # find the streamorder for the current hydroid + + if nextid not in visited and nextid in df_input_htable.HydroID: + check_confluence = 
(df_input_htable.NextDownID == nextid).sum() > 1 # check if the NextDownID is referenced by more than one hydroid (>1 means this is a confluence) + nextorder = df_input_htable.loc[nextid,'order_'] # find the streamorder for the next downstream hydroid + if nextorder > order and check_confluence == True: # check if the nextdownid streamorder is greater than the current hydroid order and the nextdownid is a confluence (more than 1 upstream hydroid draining to it) + branch_heads.append(nextid) # found a terminal point in the network (append to branch_heads for second pass) + continue # if the above conditions are True then stop traversing downstream and move on to the next starting hydroid + Q.append(nextid) + df_input_htable.reset_index(drop=True, inplace=True) # reset index (previously using hydroid as index) + df_input_htable.sort_values(['branch_id','route_count'], inplace=True) # sort the dataframe by branch_id and then by route_count (need this ordered to ensure upstream to downstream ranking for each branch) + return(df_input_htable) + +def group_manningn_calc(df_nmerge, down_dist_thresh): + ## Calculate group_ManningN (mean calb n for consecutive hydroids) and apply values downstream to non-calb hydroids (constrained to first Xkm of hydroids - set downstream distance var as input arg) + #df_nmerge.sort_values(by=['NextDownID'], inplace=True) + dist_accum = 0; hyid_count = 0; hyid_accum_count = 0; + run_accum_mann = 0; group_ManningN = 0; branch_start = 1 # initialize counter and accumulation variables + lid_count = 0; prev_lid = 'x' + for index, row in df_nmerge.iterrows(): # loop through the df (parse by hydroid) + if int(df_nmerge.loc[index,'branch_id']) != branch_start: # check if start of new branch + dist_accum = 0; hyid_count = 0; hyid_accum_count = 0; # initialize counter vars + run_accum_mann = 0; group_ManningN = 0 # initialize counter vars + branch_start = int(df_nmerge.loc[index,'branch_id']) # reassign the branch_start var to evaluate on next iteration + # use the code below to withhold downstream hydroid_ManningN values (use this for downstream evaluation tests) + ''' + lid_count = 0 + if not pd.isna(df_nmerge.loc[index,'ahps_lid']): + if df_nmerge.loc[index,'ahps_lid'] == prev_lid: + lid_count += 1 + if lid_count > 3: # only keep the first 3 HydroID n values (everything else set to null for downstream application) + df_nmerge.loc[index,'hydroid_ManningN'] = np.nan + df_nmerge.loc[index,'featid_ManningN'] = np.nan + else: + lid_count = 1 + prev_lid = df_nmerge.loc[index,'ahps_lid'] + ''' + if np.isnan(df_nmerge.loc[index,'hydroid_ManningN']): # check if the hydroid_ManningN value is nan (indicates a non-calibrated hydroid) + df_nmerge.loc[index,'accum_dist'] = row['LENGTHKM'] + dist_accum # calculate accumulated river distance + dist_accum += row['LENGTHKM'] # add hydroid length to the dist_accum var + hyid_count = 0 # reset the hydroid counter to 0 + df_nmerge.loc[index,'hyid_accum_count'] = hyid_accum_count # output the hydroid accum counter + if dist_accum < down_dist_thresh: # check if the accum distance is less than Xkm downstream from valid hydroid_ManningN group value + if hyid_accum_count > 1: # only apply the group_ManningN if there are 2 or more valid hydroids that contributed to the upstream group_ManningN + df_nmerge.loc[index,'group_ManningN'] = group_ManningN # output the group_ManningN var + else: + run_avg_mann = 0 # reset the running average manningn variable (greater than 10km downstream) + else: # performs the following for hydroids that have a valid 
hydroid_ManningN value + dist_accum = 0; hyid_count += 1 # initialize vars + df_nmerge.loc[index,'accum_dist'] = 0 # output the accum_dist value (set to 0) + if hyid_count == 1: # checks if this is the first in a series of valid hydroid_ManningN values + run_accum_mann = 0; hyid_accum_count = 0 # initialize counter and running accumulated manningN value + group_ManningN = (row['hydroid_ManningN'] + run_accum_mann)/float(hyid_count) # calculate the group_ManningN (NOTE: this will continue to change as more hydroid values are accumulated in the "group" moving downstream) + df_nmerge.loc[index,'group_ManningN'] = group_ManningN # output the group_ManningN var + df_nmerge.loc[index,'hyid_count'] = hyid_count # output the hyid_count var + run_accum_mann += row['hydroid_ManningN'] # add current hydroid manningn value to the running accum mann var + hyid_accum_count += 1 # increment the hydroid accum counter + df_nmerge.loc[index,'hyid_accum_count'] = hyid_accum_count # output the hyid_accum_count var + + ## Delete unnecessary intermediate outputs + if 'hyid_count' in df_nmerge.columns: + df_nmerge.drop(['hyid_count','accum_dist','hyid_accum_count'], axis=1, inplace=True) # drop hydroid counter if it exists + #df_nmerge.drop(['accum_dist','hyid_accum_count'], axis=1, inplace=True) # drop accum vars from group calc + return(df_nmerge) \ No newline at end of file diff --git a/src/usgs_gage_aggregate.py b/src/usgs_gage_aggregate.py index f6967c407..24d6d26c4 100644 --- a/src/usgs_gage_aggregate.py +++ b/src/usgs_gage_aggregate.py @@ -21,7 +21,7 @@ def __init__(self, path, limit_branches=[]): 'levpa_id':str, 'dem_elevation':float, 'dem_adj_elevation':float, - 'order_':int, + 'order_':str, 'LakeID':object, 'HUC8':str, 'snap_distance':float} diff --git a/src/usgs_gage_unit_setup.py b/src/usgs_gage_unit_setup.py index 085800e77..93c4095f6 100755 --- a/src/usgs_gage_unit_setup.py +++ b/src/usgs_gage_unit_setup.py @@ -56,6 +56,12 @@ def sort_into_branch(self, nwm_subset_streams_levelPaths): self.gages.feature_id = self.gages.feature_id.astype(int) self.gages = self.gages.merge(nwm_reaches[['feature_id','levpa_id','order_']], on='feature_id', how='left') return self.gages + + def branch_zero(self, bzero_id): + + # note that some gages will not have a valid "order_" attribute (not attributed to a level path in the step before - likely a gage on dropped stream order) + self.gages.levpa_id = bzero_id + return self.gages def write(self, out_name): @@ -98,6 +104,8 @@ def filter_gage_branches(gms_inputs_filename): parser.add_argument('-nwm','--input-nwm-filename', help='NWM stream subset', required=True) parser.add_argument('-o','--output-filename', help='Table to append data', required=True) parser.add_argument('-huc','--huc8-id', help='HUC8 ID (to verify gage location huc)', type=str, required=True) + parser.add_argument('-bzero','--branch-zero-check', help='Check for determining if branch zero is created', type=int, required=True) + parser.add_argument('-bzero_id','--branch-zero-id', help='Branch zero ID value', type=str, required=True) parser.add_argument('-ff','--filter-gms-inputs', help='WARNING: only run this parameter if you know exactly what you are doing', required=False) args = vars(parser.parse_args()) @@ -107,6 +115,8 @@ def filter_gage_branches(gms_inputs_filename): input_nwm_filename = args['input_nwm_filename'] output_filename = args['output_filename'] huc8 = args['huc8_id'] + bzero_check = args['branch_zero_check'] + bzero_id = args['branch_zero_id'] + 
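# --- Illustrative aside (not part of the diff): with the two new arguments wired in above, an invocation of
# usgs_gage_unit_setup.py might look roughly like the line below (the paths, HUC, and branch-zero ID are
# placeholders, and the script's other required arguments are omitted here):
#   python3 /foss_fim/src/usgs_gage_unit_setup.py ... -huc 12345678 -bzero 1 -bzero_id 0
# -bzero is an integer flag (any non-zero value triggers the extra branch-zero gage table written further down),
# and -bzero_id is the text ID appended to that extra output's filename.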
filter_gms_inputs = args['filter_gms_inputs'] if not filter_gms_inputs: @@ -118,6 +128,12 @@ usgs_gage_subset.sort_into_branch(input_nwm_filename) usgs_gage_subset.write(output_filename) + # Create separate output for branch zero + if bzero_check != 0: + output_filename_zero = os.path.splitext(output_filename)[0] + '_' + bzero_id + os.path.splitext(output_filename)[-1] + usgs_gage_subset.branch_zero(bzero_id) + usgs_gage_subset.write(output_filename_zero) + + else: + ''' This is an easy way to filter gms_inputs so that only branches with gages will run during gms_run_branch.sh. diff --git a/src/utils/shared_functions.py b/src/utils/shared_functions.py index ea464667a..dbf97333b 100644 --- a/src/utils/shared_functions.py +++ b/src/utils/shared_functions.py @@ -2,11 +2,14 @@ import os, inspect from os.path import splitext +from datetime import datetime, timezone import fiona import rasterio import numpy as np +from pathlib import Path from rasterio.warp import calculate_default_transform, reproject, Resampling from pyproj.crs import CRS +import pandas as pd def getDriver(fileName): @@ -320,4 +323,48 @@ def vprint (message, is_verbose, show_caller = False): msg += f" [from : {caller_name}]" print (msg) - +######################################################################## +#Function to check the age of a file (use for flagging potentially outdated input) +######################################################################## +def check_file_age(file): + ''' + Checks if the file exists and determines the file age + Returns + ------- + modified_date : the file's last modification time (UTC); None if the file does not exist. + ''' + file = Path(file) + if file.is_file(): + modified_date = datetime.fromtimestamp(file.stat().st_mtime, tz=timezone.utc) + + return modified_date + + +######################################################################## +#Function to concatenate huc csv files to a single dataframe/csv +######################################################################## +def concat_huc_csv(fim_dir,csv_name): + ''' + Checks if the huc csv files exist and concatenates their contents + Returns + ------- + concat_df : concatenated pandas dataframe; None if no csv files were found. + ''' + + merged_csv = [] + huc_list = os.listdir(fim_dir) + for huc in huc_list: + if huc != 'logs': + csv_file = os.path.join(fim_dir,huc,str(csv_name)) + if Path(csv_file).is_file(): + # Aggregate all of the individual huc elev_tables into one aggregate for accessing all data in one csv + read_csv = pd.read_csv(csv_file, dtype={'HUC8': object, 'location_id': object, 'feature_id': int, 'levpa_id': object}) + # Add huc field to dataframe + read_csv['HUC8'] = huc + merged_csv.append(read_csv) + + # Create and return a concatenated pd dataframe + if merged_csv: + print("Creating aggregate csv") + concat_df = pd.concat(merged_csv) + return concat_df \ No newline at end of file diff --git a/src/utils/shared_variables.py b/src/utils/shared_variables.py index d8219b2e4..4e704aa46 100644 --- a/src/utils/shared_variables.py +++ b/src/utils/shared_variables.py @@ -47,6 +47,11 @@ OVERWRITE_NHD = 'OVERWRITE_NHD' OVERWRITE_ALL = 'OVERWRITE_ALL' +# Rating Curve Adjustment (local calibration) variables +DOWNSTREAM_THRESHOLD = 10 # distance in km to propagate new roughness values downstream +ROUGHNESS_MAX_THRESH = 0.6 # max allowable adjusted roughness value (void values larger than this) +ROUGHNESS_MIN_THRESH = 0.001 # min allowable adjusted roughness value (void values smaller than this) + ## Input Paths and Directories # Directories os.environ['src_dir'] = '/foss_fim/src'
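A minimal usage sketch of the two helpers added to shared_functions.py above (illustrative only: the import path, directories, file names, and csv name are assumptions, not values taken from this PR):

from utils.shared_functions import check_file_age, concat_huc_csv

# Flag a potentially stale input (check_file_age returns the UTC modification time, or None if the file is missing)
mtime = check_file_age('/data/inputs/rating_curve/some_input_file.csv')   # hypothetical path
if mtime is not None:
    print('input last modified: ' + str(mtime))

# Aggregate a per-HUC csv (e.g. usgs_elev_table.csv) from every HUC directory except 'logs' into one dataframe
agg_df = concat_huc_csv('/data/outputs/example_fim_run', 'usgs_elev_table.csv')   # hypothetical run directory
if agg_df is not None:
    agg_df.to_csv('/data/outputs/example_fim_run/usgs_elev_table_agg.csv', index=False)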