diff --git a/CHANGELOG.md b/CHANGELOG.md
index 07be1fdb7..164ce1a44 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,18 @@
 All notable changes to this project will be documented in this file.
 We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.
+
+## v3.0.11.0 - 2021-03-22 - [PR #319](https://github.com/NOAA-OWP/cahaba/pull/298)
+
+ Improvements to CatFIM service source data generation.
+
+ ### Changes
+ - Renamed `generate_categorical_fim.py` to `generate_categorical_fim_mapping.py`.
+ - Updated the status outputs of the `nws_lid_sites` layer and saved it in the same directory as the merged `catfim_library` layer.
+ - Additional stability fixes (such as improved compatibility with WRDS updates).
+ ### Additions
+ - Added `generate_categorical_fim.py` to wrap `generate_categorical_fim_flows.py` and `generate_categorical_fim_mapping.py`.
+ - Created a new `nws_lid_sites` shapefile located in the same directory as the `catfim_library` shapefile.
+
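For orientation, the new `generate_categorical_fim.py` wrapper takes only a FIM version label and a job count; input and output paths, search distances, and the depth-TIFF toggle are set inside the script. A minimal invocation sketch, assuming the command is run from the `tools` directory and that the version label below corresponds to an existing directory under `/data/previous_fim/` (both are assumptions):

```python
# Hypothetical driver call for the new wrapper; the flags come from its argparse
# setup (-f/--fim_version, -j/--number_of_jobs), the values are placeholders.
import subprocess

subprocess.call([
    'python3', 'generate_categorical_fim.py',
    '-f', 'fim_3_0_11_0',   # assumed directory name under /data/previous_fim/
    '-j', '4'               # number of multiprocessing jobs
])
```

The wrapper then runs `generate_categorical_fim_flows.py` and `generate_categorical_fim_mapping.py` in sequence and finishes by updating the status fields of the `nws_lid_sites` layer.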

## v3.0.10.1 - 2021-03-24 - [PR #320](https://github.com/NOAA-OWP/cahaba/pull/320) diff --git a/tools/generate_categorical_fim.py b/tools/generate_categorical_fim.py index bec059960..f51bf5aa8 100755 --- a/tools/generate_categorical_fim.py +++ b/tools/generate_categorical_fim.py @@ -1,317 +1,112 @@ #!/usr/bin/env python3 - -import sys -import os -from multiprocessing import Pool +import subprocess import argparse -import traceback -import rasterio +import time +from pathlib import Path import geopandas as gpd import pandas as pd -import shutil -from rasterio.features import shapes -from shapely.geometry.polygon import Polygon -from shapely.geometry.multipolygon import MultiPolygon -from inundation import inundate -sys.path.append('/foss_fim/src') -from utils.shared_variables import PREP_PROJECTION,VIZ_PROJECTION -from utils.shared_functions import getDriver - -INPUTS_DIR = r'/data/inputs' -magnitude_list = ['action', 'minor', 'moderate','major', 'record'] - -# Define necessary variables for inundation() -hucs, hucs_layerName = os.path.join(INPUTS_DIR, 'wbd', 'WBD_National.gpkg'), 'WBDHU8' -mask_type, catchment_poly = 'huc', '' - - -def generate_categorical_fim(fim_run_dir, source_flow_dir, output_cat_fim_dir, number_of_jobs, depthtif, log_file): - - no_data_list = [] - procs_list = [] - - source_flow_dir_list = os.listdir(source_flow_dir) - output_flow_dir_list = os.listdir(fim_run_dir) - - # Log missing hucs - missing_hucs = list(set(source_flow_dir_list) - set(output_flow_dir_list)) - missing_hucs = [huc for huc in missing_hucs if "." not in huc] - if len(missing_hucs) > 0: - f = open(log_file, 'a+') - f.write(f"Missing hucs from output directory: {', '.join(missing_hucs)}\n") - f.close() - - # Loop through matching huc directories in the source_flow directory - matching_hucs = list(set(output_flow_dir_list) & set(source_flow_dir_list)) - for huc in matching_hucs: - - if "." not in huc: - - # Get list of AHPS site directories - ahps_site_dir = os.path.join(source_flow_dir, huc) - ahps_site_dir_list = os.listdir(ahps_site_dir) - - # Map paths to HAND files needed for inundation() - fim_run_huc_dir = os.path.join(fim_run_dir, huc) - rem = os.path.join(fim_run_huc_dir, 'rem_zeroed_masked.tif') - catchments = os.path.join(fim_run_huc_dir, 'gw_catchments_reaches_filtered_addedAttributes.tif') - hydroTable = os.path.join(fim_run_huc_dir, 'hydroTable.csv') - - exit_flag = False # Default to False. - - # Check if necessary data exist; set exit_flag to True if they don't exist - for f in [rem, catchments, hydroTable]: - if not os.path.exists(f): - no_data_list.append(f) - exit_flag = True - - # Log missing data - if exit_flag == True: - f = open(log_file, 'a+') - f.write(f"Missing data for: {fim_run_huc_dir}\n") - f.close() - - # Map path to huc directory inside out output_cat_fim_dir - cat_fim_huc_dir = os.path.join(output_cat_fim_dir, huc) - if not os.path.exists(cat_fim_huc_dir): - os.mkdir(cat_fim_huc_dir) - - # Loop through AHPS sites - for ahps_site in ahps_site_dir_list: - # map parent directory for AHPS source data dir and list AHPS thresholds (act, min, mod, maj) - ahps_site_parent = os.path.join(ahps_site_dir, ahps_site) - thresholds_dir_list = os.listdir(ahps_site_parent) - - # Map parent directory for all inundation output filesoutput files. 
- cat_fim_huc_ahps_dir = os.path.join(cat_fim_huc_dir, ahps_site) - if not os.path.exists(cat_fim_huc_ahps_dir): - os.mkdir(cat_fim_huc_ahps_dir) - - # Loop through thresholds/magnitudes and define inundation output files paths - for magnitude in thresholds_dir_list: - - if "." not in magnitude: - - magnitude_flows_csv = os.path.join(ahps_site_parent, magnitude, 'ahps_' + ahps_site + '_huc_' + huc + '_flows_' + magnitude + '.csv') - - if os.path.exists(magnitude_flows_csv): - - output_extent_grid = os.path.join(cat_fim_huc_ahps_dir, ahps_site + '_' + magnitude + '_extent.tif') - - if depthtif: - output_depth_grid = os.path.join(cat_fim_huc_ahps_dir, ahps_site + '_' + magnitude + '_depth.tif') - else: - output_depth_grid = None - - # Append necessary variables to list for multiprocessing. - procs_list.append([rem, catchments, catchment_poly, magnitude_flows_csv, huc, hydroTable, output_extent_grid, output_depth_grid, ahps_site, magnitude, log_file]) - - # Initiate multiprocessing - print(f"Running inundation for {len(procs_list)} sites using {number_of_jobs} jobs") - pool = Pool(number_of_jobs) - pool.map(run_inundation, procs_list) - - -def run_inundation(args): - - rem = args[0] - catchments = args[1] - catchment_poly = args[2] - magnitude_flows_csv = args[3] - huc = args[4] - hydroTable = args[5] - output_extent_grid = args[6] - output_depth_grid = args[7] - ahps_site = args[8] - magnitude = args[9] - log_file = args[10] - - try: - inundate(rem,catchments,catchment_poly,hydroTable,magnitude_flows_csv,mask_type,hucs=hucs,hucs_layerName=hucs_layerName, - subset_hucs=huc,num_workers=1,aggregate=False,inundation_raster=output_extent_grid,inundation_polygon=None, - depths=output_depth_grid,out_raster_profile=None,out_vector_profile=None,quiet=True - ) - - except Exception: - # Log errors and their tracebacks - f = open(log_file, 'a+') - f.write(f"{output_extent_grid} - inundation error: {traceback.format_exc()}\n") - f.close() - - -def post_process_cat_fim_for_viz(number_of_jobs, output_cat_fim_dir, nws_lid_attributes_filename, log_file): - - # Create workspace - gpkg_dir = os.path.join(output_cat_fim_dir, 'gpkg') - if not os.path.exists(gpkg_dir): - os.mkdir(gpkg_dir) - - fim_version = os.path.basename(output_cat_fim_dir) - merged_layer = os.path.join(output_cat_fim_dir, 'catfim_library.gpkg') - - if not os.path.exists(merged_layer): # prevents appending to existing output - - huc_ahps_dir_list = os.listdir(output_cat_fim_dir) - skip_list=['errors','logs','gpkg',merged_layer] - - for magnitude in magnitude_list: - - procs_list = [] - - # Loop through all categories - for huc in huc_ahps_dir_list: - - if huc not in skip_list: - - huc_dir = os.path.join(output_cat_fim_dir, huc) - ahps_dir_list = os.listdir(huc_dir) - - # Loop through ahps sites - for ahps_lid in ahps_dir_list: - ahps_lid_dir = os.path.join(huc_dir, ahps_lid) - - extent_grid = os.path.join(ahps_lid_dir, ahps_lid + '_' + magnitude + '_extent_' + huc + '.tif') - - if os.path.exists(extent_grid): - procs_list.append([ahps_lid, extent_grid, gpkg_dir, fim_version, huc, magnitude, nws_lid_attributes_filename]) - - else: - try: - f = open(log_file, 'a+') - f.write(f"Missing layers: {extent_gpkg}\n") - f.close() - except: - pass - - # Multiprocess with instructions - pool = Pool(number_of_jobs) - pool.map(reformat_inundation_maps, procs_list) - - # Merge all layers - print(f"Merging {len(os.listdir(gpkg_dir))} layers...") - - for layer in os.listdir(gpkg_dir): - - diss_extent_filename = os.path.join(gpkg_dir, layer) - - # Open 
diss_extent - diss_extent = gpd.read_file(diss_extent_filename) - - # Write/append aggregate diss_extent - if os.path.isfile(merged_layer): - diss_extent.to_file(merged_layer,driver=getDriver(merged_layer),index=False, mode='a') - else: - diss_extent.to_file(merged_layer,driver=getDriver(merged_layer),index=False) - - del diss_extent - - shutil.rmtree(gpkg_dir) - - else: - print(f"{merged_layer} already exists.") - - -def reformat_inundation_maps(args): - - try: - lid = args[0] - grid_path = args[1] - gpkg_dir = args[2] - fim_version = args[3] - huc = args[4] - magnitude = args[5] - nws_lid_attributes_filename = args[6] - - # Convert raster to to shapes - with rasterio.open(grid_path) as src: - image = src.read(1) - mask = image > 0 - - # Aggregate shapes - results = ({'properties': {'extent': 1}, 'geometry': s} for i, (s, v) in enumerate(shapes(image, mask=mask,transform=src.transform))) - - # convert list of shapes to polygon - extent_poly = gpd.GeoDataFrame.from_features(list(results), crs=PREP_PROJECTION) - - # Dissolve polygons - extent_poly_diss = extent_poly.dissolve(by='extent') - - # Update attributes - extent_poly_diss = extent_poly_diss.reset_index(drop=True) - extent_poly_diss['ahps_lid'] = lid - extent_poly_diss['magnitude'] = magnitude - extent_poly_diss['version'] = fim_version - extent_poly_diss['huc'] = huc - - # Project to Web Mercator - extent_poly = extent_poly.to_crs(VIZ_PROJECTION) - - # Join attributes - nws_lid_attributes_table = pd.read_csv(nws_lid_attributes_filename, dtype={'huc':str}) - nws_lid_attributes_table = nws_lid_attributes_table.loc[(nws_lid_attributes_table.magnitude==magnitude) & (nws_lid_attributes_table.nws_lid==lid)] - - - extent_poly_diss = extent_poly_diss.merge(nws_lid_attributes_table, left_on=['ahps_lid','magnitude','huc'], right_on=['nws_lid','magnitude','huc']) - - extent_poly_diss = extent_poly_diss.drop(columns='nws_lid') - - # Save dissolved multipolygon - handle = os.path.split(grid_path)[1].replace('.tif', '') - - diss_extent_filename = os.path.join(gpkg_dir, handle + "_dissolved.gpkg") - - extent_poly_diss["geometry"] = [MultiPolygon([feature]) if type(feature) == Polygon else feature for feature in extent_poly_diss["geometry"]] - - if not extent_poly_diss.empty: - - extent_poly_diss.to_file(diss_extent_filename,driver=getDriver(diss_extent_filename),index=False) - - except Exception as e: - # Log and clean out the gdb so it's not merged in later - try: - f = open(log_file, 'a+') - f.write(str(diss_extent_filename) + " - dissolve error: " + str(e)) - f.close() - except: - pass - - +from datetime import date + +def update_mapping_status(output_mapping_dir, output_flows_dir): + ''' + Updates the status for nws_lids from the flows subdirectory. Status + is updated for sites where the inundation.py routine was not able to + produce inundation for the supplied flow files. It is assumed that if + an error occured in inundation.py that all flow files for a given site + experienced the error as they all would have the same nwm segments. + + Parameters + ---------- + output_mapping_dir : STR + Path to the output directory of all inundation maps. + output_flows_dir : STR + Path to the directory containing all flows. + + Returns + ------- + None. 
+ + ''' + #Find all LIDs with empty mapping output folders + subdirs = [str(i) for i in Path(output_mapping_dir).rglob('**/*') if i.is_dir()] + empty_nws_lids = [Path(directory).name for directory in subdirs if not list(Path(directory).iterdir())] + + #Write list of empty nws_lids to DataFrame, these are sites that failed in inundation.py + mapping_df = pd.DataFrame({'nws_lid':empty_nws_lids}) + mapping_df['did_it_map'] = 'no' + mapping_df['map_status'] = ' and all categories failed to map' + + #Import shapefile output from flows creation + shapefile = Path(output_flows_dir)/'nws_lid_flows_sites.shp' + flows_df = gpd.read_file(shapefile) + + #Join failed sites to flows df + flows_df = flows_df.merge(mapping_df, how = 'left', on = 'nws_lid') + + #Switch mapped column to no for failed sites and update status + flows_df.loc[flows_df['did_it_map'] == 'no', 'mapped'] = 'no' + flows_df.loc[flows_df['did_it_map']=='no','status'] = flows_df['status'] + flows_df['map_status'] + + #Perform pass for HUCs where mapping was skipped due to missing data. + flows_hucs = [i.stem for i in Path(output_flows_dir).iterdir() if i.is_dir()] + mapping_hucs = [i.stem for i in Path(output_mapping_dir).iterdir() if i.is_dir()] + missing_mapping_hucs = list(set(flows_hucs) - set(mapping_hucs)) + #Update status for nws_lid in missing hucs and change mapped attribute to 'no' + flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'status'] = flows_df['status'] + ' and all categories failed to map because missing HUC information' + flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'mapped'] = 'no' + + #Clean up GeoDataFrame and rename columns for consistency. + flows_df = flows_df.drop(columns = ['did_it_map','map_status']) + flows_df = flows_df.rename(columns = {'nws_lid':'ahps_lid'}) + + #Write out to file + nws_lid_path = Path(output_mapping_dir) / 'nws_lid_sites.shp' + flows_df.to_file(nws_lid_path) + if __name__ == '__main__': - - # Parse arguments - parser = argparse.ArgumentParser(description='Categorical inundation mapping for FOSS FIM.') - parser.add_argument('-r','--fim-run-dir',help='Name of directory containing outputs of fim_run.sh',required=True) - parser.add_argument('-s', '--source-flow-dir',help='Path to directory containing flow CSVs to use to generate categorical FIM.',required=True, default="") - parser.add_argument('-o', '--output-cat-fim-dir',help='Path to directory where categorical FIM outputs will be written.',required=True, default="") - parser.add_argument('-j','--number-of-jobs',help='Number of processes to use. Default is 1.',required=False, default="1",type=int) - parser.add_argument('-depthtif','--write-depth-tiff',help='Using this option will write depth TIFFs.',required=False, action='store_true') - + + #Parse arguments + parser = argparse.ArgumentParser(description = 'Run Categorical FIM') + parser.add_argument('-f','--fim_version',help='Name of directory containing outputs of fim_run.sh',required=True) + parser.add_argument('-j','--number_of_jobs',help='Number of processes to use. 
Default is 1.',required=False, default="1",type=int) args = vars(parser.parse_args()) - - fim_run_dir = args['fim_run_dir'] - source_flow_dir = args['source_flow_dir'] - output_cat_fim_dir = args['output_cat_fim_dir'] - number_of_jobs = int(args['number_of_jobs']) - depthtif = args['write_depth_tiff'] - - - # Create output directory - if not os.path.exists(output_cat_fim_dir): - os.mkdir(output_cat_fim_dir) - - # Create log directory - log_dir = os.path.join(output_cat_fim_dir, 'logs') - if not os.path.exists(log_dir): - os.mkdir(log_dir) - - # Create error log path - log_file = os.path.join(log_dir, 'errors.log') - - # Map path to points with attributes - nws_lid_attributes_filename = os.path.join(source_flow_dir, 'nws_lid_attributes.csv') - - print("Generating Categorical FIM") - generate_categorical_fim(fim_run_dir, source_flow_dir, output_cat_fim_dir, number_of_jobs, depthtif,log_file) - - print("Aggregating Categorical FIM") - post_process_cat_fim_for_viz(number_of_jobs, output_cat_fim_dir,nws_lid_attributes_filename,log_file) + + #Get arguments + fim_version = args['fim_version'] + number_of_jobs = args['number_of_jobs'] + + #################################################################### + #Define default arguments. Modify these if necessary. + today = date.today().strftime('%m%d%Y') + fim_run_dir = Path(f'/data/previous_fim/{fim_version}') + output_flows_dir = Path(f'/data/catfim/{fim_version}/{today}/flows') + output_mapping_dir = Path(f'/data/catfim/{fim_version}/{today}/mapping') + nwm_us_search = '10' + nwm_ds_search = '10' + write_depth_tiff = False + #################################################################### + + #################################################################### + #Run CatFIM scripts in sequence + #################################################################### + #Generate CatFIM flow files. + print('Creating flow files') + start = time.time() + subprocess.call(['python3','generate_categorical_fim_flows.py', '-w' , str(output_flows_dir), '-u', nwm_us_search, '-d', nwm_ds_search]) + end = time.time() + elapsed_time = (end-start)/60 + print(f'Finished creating flow files in {elapsed_time} minutes') + + #Generate CatFIM mapping. 
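Before the mapping stage below kicks off, note how `update_mapping_status()` (defined earlier in this file) identifies sites that failed to map: any LID folder left empty by the mapping script is flagged. A self-contained sketch of that check against a throwaway temporary directory (the layout is fabricated):

```python
# Standalone illustration of the empty-output-folder check in update_mapping_status().
import tempfile
from pathlib import Path

output_mapping_dir = Path(tempfile.mkdtemp())
(output_mapping_dir / '12090301' / 'abcd1').mkdir(parents=True)   # no output -> failed to map
(output_mapping_dir / '12090301' / 'efgh2').mkdir(parents=True)
(output_mapping_dir / '12090301' / 'efgh2' / 'efgh2_major_extent_12090301.tif').touch()

subdirs = [str(i) for i in output_mapping_dir.rglob('**/*') if i.is_dir()]
empty_nws_lids = [Path(d).name for d in subdirs if not list(Path(d).iterdir())]
print(empty_nws_lids)   # ['abcd1']
```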
+ print('Begin mapping') + start = time.time() + subprocess.call(['python3','generate_categorical_fim_mapping.py', '-r' , str(fim_run_dir), '-s', str(output_flows_dir), '-o', str(output_mapping_dir), '-j', str(number_of_jobs)]) + end = time.time() + elapsed_time = (end-start)/60 + print(f'Finished mapping in {elapsed_time} minutes') + + #Updating Mapping Status + print('Updating mapping status') + update_mapping_status(str(output_mapping_dir), str(output_flows_dir)) + + \ No newline at end of file diff --git a/tools/generate_categorical_fim_flows.py b/tools/generate_categorical_fim_flows.py index 562b3e8e5..ca57e089c 100755 --- a/tools/generate_categorical_fim_flows.py +++ b/tools/generate_categorical_fim_flows.py @@ -11,13 +11,15 @@ sys.path.append('/foss_fim/src') from utils.shared_variables import PREP_PROJECTION,VIZ_PROJECTION -load_dotenv() -#import variables from .env file -API_BASE_URL = os.getenv("API_BASE_URL") -EVALUATED_SITES_CSV = os.getenv("EVALUATED_SITES_CSV") -WBD_LAYER = os.getenv("WBD_LAYER") +def get_env_paths(): + load_dotenv() + #import variables from .env file + API_BASE_URL = os.getenv("API_BASE_URL") + EVALUATED_SITES_CSV = os.getenv("EVALUATED_SITES_CSV") + WBD_LAYER = os.getenv("WBD_LAYER") + return API_BASE_URL, EVALUATED_SITES_CSV, WBD_LAYER -def static_flow_lids(workspace, nwm_us_search, nwm_ds_search): +def generate_catfim_flows(workspace, nwm_us_search, nwm_ds_search): ''' This will create static flow files for all nws_lids and save to the workspace directory with the following format: @@ -45,19 +47,20 @@ def static_flow_lids(workspace, nwm_us_search, nwm_ds_search): ------- None. - ''' + ''' + all_start = time.time() #Define workspace and wbd_path as a pathlib Path. Convert search distances to integer. workspace = Path(workspace) nwm_us_search = int(nwm_us_search) nwm_ds_search = int(nwm_ds_search) metadata_url = f'{API_BASE_URL}/metadata' - threshold_url = f'{API_BASE_URL}/threshold' + threshold_url = f'{API_BASE_URL}/nws_threshold' ################################################################### + #Create workspace - workspace.mkdir(exist_ok = True) + workspace.mkdir(parents=True,exist_ok = True) - #Return dictionary of huc (key) and sublist of ahps(value) as well as geodataframe of sites. print('Retrieving metadata...') #Get metadata for 'CONUS' conus_list, conus_dataframe = get_metadata(metadata_url, select_by = 'nws_lid', selector = ['all'], must_include = 'nws_data.rfc_forecast_point', upstream_trace_distance = nwm_us_search, downstream_trace_distance = nwm_ds_search ) @@ -67,42 +70,41 @@ def static_flow_lids(workspace, nwm_us_search, nwm_ds_search): #Append the dataframes and lists all_lists = conus_list + islands_list - all_dataframe = conus_dataframe.append(islands_dataframe) print('Determining HUC using WBD layer...') - #Assign FIM HUC to GeoDataFrame and export to shapefile all candidate sites. - agg_start = time.time() + #Assign HUCs to all sites using a spatial join of the FIM 3 HUC layer. + #Get a dictionary of hucs (key) and sites (values) as well as a GeoDataFrame + #of all sites used later in script. 
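The HUC assignment described in the comment above happens inside `aggregate_wbd_hucs()`; conceptually it is a point-in-polygon spatial join against the WBD HUC8 layer. A toy geopandas sketch of the idea, with fabricated geometries and a made-up HUC code:

```python
# Toy spatial join assigning a HUC8 code to a point site; the real logic lives
# in aggregate_wbd_hucs() and uses the full WBD HUC8 layer.
import geopandas as gpd
from shapely.geometry import Point, box

sites = gpd.GeoDataFrame({'nws_lid': ['abcd1']},
                         geometry=[Point(-90.0, 35.0)], crs='EPSG:4269')
huc8 = gpd.GeoDataFrame({'HUC8': ['08010100']},
                        geometry=[box(-91.0, 34.0, -89.0, 36.0)], crs='EPSG:4269')

joined = gpd.sjoin(sites, huc8, how='inner')   # default predicate: intersects
print(joined[['nws_lid', 'HUC8']])
```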
huc_dictionary, out_gdf = aggregate_wbd_hucs(metadata_list = all_lists, wbd_huc8_path = WBD_LAYER) - viz_out_gdf = out_gdf.to_crs(VIZ_PROJECTION) - viz_out_gdf.to_file(workspace / f'candidate_sites.shp') - agg_end = time.time() - print(f'agg time is {(agg_end - agg_start)/60} minutes') + #Get all possible mainstem segments print('Getting list of mainstem segments') #Import list of evaluated sites list_of_sites = pd.read_csv(EVALUATED_SITES_CSV)['Total_List'].to_list() - #The entire routine to get mainstems is harcoded in this function. + #The entire routine to get mainstems is hardcoded in this function. ms_segs = mainstem_nwm_segs(metadata_url, list_of_sites) - #Loop through each huc unit + #Loop through each huc unit, first define message variable and flood categories. all_messages = [] + flood_categories = ['action', 'minor', 'moderate', 'major', 'record'] for huc in huc_dictionary: print(f'Iterating through {huc}') #Get list of nws_lids nws_lids = huc_dictionary[huc] #Loop through each lid in list to create flow file for lid in nws_lids: - #In some instances the lid is not assigned a name, skip over these. - if not isinstance(lid,str): - print(f'{lid} is {type(lid)}') - continue #Convert lid to lower case lid = lid.lower() #Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated flows. - stages, flows = get_thresholds(threshold_url = threshold_url, location_ids = lid, physical_element = 'all', threshold = 'all', bypass_source_flag = False) - #If stages/flows don't exist write message and exit out. - if not (stages and flows): - message = f'{lid} no thresholds' + stages, flows = get_thresholds(threshold_url = threshold_url, select_by = 'nws_lid', selector = lid, threshold = 'all') + #Check if stages are supplied, if not write message and exit. + if all(stages.get(category, None)==None for category in flood_categories): + message = f'{lid}:missing threshold stages' + all_messages.append(message) + continue + #Check if calculated flows are supplied, if not write message and exit. + if all(flows.get(category, None) == None for category in flood_categories): + message = f'{lid}:missing calculated flows' all_messages.append(message) continue @@ -116,11 +118,11 @@ def static_flow_lids(workspace, nwm_us_search, nwm_ds_search): #if no segments, write message and exit out if not segments: print(f'{lid} no segments') - message = f'{lid} no segments' + message = f'{lid}:missing nwm segments' all_messages.append(message) continue #For each flood category - for category in ['action', 'minor', 'moderate', 'major', 'record']: + for category in flood_categories: #Get the flow flow = flows[category] #If there is a valid flow value, write a flow file. @@ -135,102 +137,99 @@ def static_flow_lids(workspace, nwm_us_search, nwm_ds_search): #Write flow file to file flow_info.to_csv(output_file, index = False) else: - message = f'{lid}_{category}_no flow' + message = f'{lid}:{category} is missing calculated flow' all_messages.append(message) - #This section will produce a point file of the LID location + #Get various attributes of the site. 
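The two guard clauses above replace the old single `if not (stages and flows)` check, and each one records a `lid:status` style message that is split on the first colon later in the script. A runnable toy version with fabricated threshold data:

```python
# Fabricated stages/flows dictionaries to exercise the new guard clauses.
flood_categories = ['action', 'minor', 'moderate', 'major', 'record']
stages = {'action': 5.0, 'minor': 7.5, 'moderate': None, 'major': None, 'record': None}
flows = {category: None for category in flood_categories}

lid = 'abcd1'          # hypothetical nws_lid
all_messages = []
if all(stages.get(category) is None for category in flood_categories):
    all_messages.append(f'{lid}:missing threshold stages')
elif all(flows.get(category) is None for category in flood_categories):
    all_messages.append(f'{lid}:missing calculated flows')

print(all_messages)    # ['abcd1:missing calculated flows']
```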
- lat = float(metadata['usgs_data']['latitude']) - lon = float(metadata['usgs_data']['longitude']) + lat = float(metadata['usgs_preferred']['latitude']) + lon = float(metadata['usgs_preferred']['longitude']) wfo = metadata['nws_data']['wfo'] rfc = metadata['nws_data']['rfc'] state = metadata['nws_data']['state'] county = metadata['nws_data']['county'] name = metadata['nws_data']['name'] - q_act = flows['action'] - q_min = flows['minor'] - q_mod = flows['moderate'] - q_maj = flows['major'] - q_rec = flows['record'] flow_units = flows['units'] flow_source = flows['source'] - s_act = stages['action'] - s_min = stages['minor'] - s_mod = stages['moderate'] - s_maj = stages['major'] - s_rec = stages['record'] stage_units = stages['units'] stage_source = stages['source'] wrds_timestamp = stages['wrds_timestamp'] - #Create a DataFrame using the collected attributes - df = pd.DataFrame({'nws_lid': [lid], 'name':name, 'WFO': wfo, 'rfc':rfc, 'huc':[huc], 'state':state, 'county':county, 'q_act':q_act, 'q_min':q_min, 'q_mod':q_mod, 'q_maj':q_maj, 'q_rec':q_rec, 'q_uni':flow_units, 'q_src':flow_source, 'stage_act':s_act, 'stage_min':s_min, 'stage_mod':s_mod, 'stage_maj':s_maj, 'stage_rec':s_rec, 'stage_uni':stage_units, 's_src':stage_source, 'wrds_time':wrds_timestamp, 'lat':[lat], 'lon':[lon]}) - #Round stages and flows to nearest hundredth - df = df.round({'q_act':2,'q_min':2,'q_mod':2,'q_maj':2,'q_rec':2,'stage_act':2,'stage_min':2,'stage_mod':2,'stage_maj':2,'stage_rec':2}) - - #Create a geodataframe using usgs lat/lon property from WRDS then reproject to WGS84. - #Define EPSG codes for possible usgs latlon datum names (NAD83WGS84 assigned NAD83) - crs_lookup ={'NAD27':'EPSG:4267', 'NAD83':'EPSG:4269', 'NAD83WGS84': 'EPSG:4269'} - #Get horizontal datum (from dataframe) and assign appropriate EPSG code, assume NAD83 if not assigned. - h_datum = metadata['usgs_data']['latlon_datum_name'] - src_crs = crs_lookup.get(h_datum, 'EPSG:4269') - gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df['lon'], df['lat']), crs = src_crs) - #Reproject to VIZ_PROJECTION - viz_gdf = gdf.to_csv(VIZ_PROJECTION) + nrldb_timestamp = metadata['nrldb_timestamp'] + nwis_timestamp = metadata['nwis_timestamp'] - #Create a csv with same info as shapefile + #Create a csv with same information as shapefile but with each threshold as new record. csv_df = pd.DataFrame() - for threshold in ['action', 'minor', 'moderate', 'major', 'record']: - line_df = pd.DataFrame({'nws_lid': [lid], 'name':name, 'WFO': wfo, 'rfc':rfc, 'huc':[huc], 'state':state, 'county':county, 'magnitude': threshold, 'q':flows[threshold], 'q_uni':flows['units'], 'q_src':flow_source, 'stage':stages[threshold], 'stage_uni':stages['units'], 's_src':stage_source, 'wrds_time':wrds_timestamp, 'lat':[lat], 'lon':[lon]}) + for threshold in flood_categories: + line_df = pd.DataFrame({'nws_lid': [lid], 'name':name, 'WFO': wfo, 'rfc':rfc, 'huc':[huc], 'state':state, 'county':county, 'magnitude': threshold, 'q':flows[threshold], 'q_uni':flows['units'], 'q_src':flow_source, 'stage':stages[threshold], 'stage_uni':stages['units'], 's_src':stage_source, 'wrds_time':wrds_timestamp, 'nrldb_time':nrldb_timestamp,'nwis_time':nwis_timestamp, 'lat':[lat], 'lon':[lon]}) csv_df = csv_df.append(line_df) #Round flow and stage columns to 2 decimal places. - csv = csv_df.round({'q':2,'stage':2}) + csv_df = csv_df.round({'q':2,'stage':2}) #If a site folder exists (ie a flow file was written) save files containing site attributes. 
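The attribute CSV built above holds one record per flood category, and the rounding result is now correctly reassigned to `csv_df` (previously it went to an unused `csv` variable). A compact, self-contained version of that pattern with fabricated values and a trimmed column set; `pd.concat` stands in here for the `DataFrame.append` calls used in the script:

```python
# Fabricated per-threshold attribute records, mirroring the csv_df construction above.
import pandas as pd

flood_categories = ['action', 'minor', 'moderate', 'major', 'record']
flows = {'action': 103.456, 'minor': 208.912, 'moderate': 315.0, 'major': 420.77, 'record': 512.345}
stages = {'action': 5.125, 'minor': 7.499, 'moderate': 9.0, 'major': 12.25, 'record': 15.753}

rows = [pd.DataFrame({'nws_lid': ['abcd1'], 'magnitude': threshold,
                      'q': flows[threshold], 'stage': stages[threshold]})
        for threshold in flood_categories]
csv_df = pd.concat(rows, ignore_index=True)
csv_df = csv_df.round({'q': 2, 'stage': 2})   # rounding result must be reassigned
print(csv_df)
```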
- try: - #Save GeoDataFrame to shapefile format and export csv containing attributes - output_dir = workspace / huc / lid - viz_gdf.to_file(output_dir / f'{lid}_location.shp' ) + output_dir = workspace / huc / lid + if output_dir.exists(): + #Export DataFrame to csv containing attributes csv_df.to_csv(output_dir / f'{lid}_attributes.csv', index = False) - except: - print(f'{lid} missing all flows') - message = f'{lid} missing all flows' + else: + message = f'{lid}:missing all calculated flows' all_messages.append(message) - #Write out messages to file - messages_df = pd.DataFrame(all_messages, columns = ['message']) - messages_df.to_csv(workspace / f'all_messages.csv', index = False) - - #Recursively find all location shapefiles - locations_files = list(workspace.rglob('*_location.shp')) - spatial_layers = gpd.GeoDataFrame() - #Append all shapefile info to a geodataframe - for location in locations_files: - location_gdf = gpd.read_file(location) - spatial_layers = spatial_layers.append(location_gdf) - #Write appended spatial data to disk. - output_file = workspace /'all_mapped_ahps.shp' - spatial_layers.to_file(output_file) - - #Recursively find all *_info csv files and append + + print('wrapping up...') + #Recursively find all *_attributes csv files and append csv_files = list(workspace.rglob('*_attributes.csv')) all_csv_df = pd.DataFrame() for csv in csv_files: + #Huc has to be read in as string to preserve leading zeros. temp_df = pd.read_csv(csv, dtype={'huc':str}) all_csv_df = all_csv_df.append(temp_df, ignore_index = True) - #Write appended _info.csvs to file - all_info_csv = workspace / 'nws_lid_attributes.csv' - all_csv_df.to_csv(all_info_csv, index = False) + #Write to file + all_csv_df.to_csv(workspace / 'nws_lid_attributes.csv', index = False) + + #This section populates a shapefile of all potential sites and details + #whether it was mapped or not (mapped field) and if not, why (status field). + + #Preprocess the out_gdf GeoDataFrame. Reproject and reformat fields. + viz_out_gdf = out_gdf.to_crs(VIZ_PROJECTION) + viz_out_gdf.rename(columns = {'identifiers_nwm_feature_id': 'nwm_seg', 'identifiers_nws_lid':'nws_lid', 'identifiers_usgs_site_code':'usgs_gage'}, inplace = True) + viz_out_gdf['nws_lid'] = viz_out_gdf['nws_lid'].str.lower() + + #Using list of csv_files, populate DataFrame of all nws_lids that had + #a flow file produced and denote with "mapped" column. + nws_lids = [file.stem.split('_attributes')[0] for file in csv_files] + lids_df = pd.DataFrame(nws_lids, columns = ['nws_lid']) + lids_df['mapped'] = 'yes' + + #Identify what lids were mapped by merging with lids_df. Populate + #'mapped' column with 'No' if sites did not map. + viz_out_gdf = viz_out_gdf.merge(lids_df, how = 'left', on = 'nws_lid') + viz_out_gdf['mapped'] = viz_out_gdf['mapped'].fillna('no') + + #Write messages to DataFrame, split into columns, aggregate messages. + messages_df = pd.DataFrame(all_messages, columns = ['message']) + messages_df = messages_df['message'].str.split(':', n = 1, expand = True).rename(columns={0:'nws_lid', 1:'status'}) + status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index() + + #Join messages to populate status field to candidate sites. Assign + #status for null fields. 
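The status text that gets joined onto each candidate site below is assembled from the accumulated `lid:status` messages: splitting on the first colon and grouping by LID collapses multiple problems per site into a single field. A runnable sketch with fabricated messages:

```python
# Fabricated status messages, processed the same way as the all_messages list above.
import pandas as pd

all_messages = ['abcd1:missing threshold stages',
                'efgh2:action is missing calculated flow',
                'efgh2:record is missing calculated flow']

messages_df = pd.DataFrame(all_messages, columns=['message'])
messages_df = (messages_df['message']
               .str.split(':', n=1, expand=True)
               .rename(columns={0: 'nws_lid', 1: 'status'}))
status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
print(status_df)
```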
+ viz_out_gdf = viz_out_gdf.merge(status_df, how = 'left', on = 'nws_lid') + viz_out_gdf['status'] = viz_out_gdf['status'].fillna('all calculated flows available') + + #Filter out columns and write out to file + viz_out_gdf = viz_out_gdf.filter(['nws_lid','usgs_gage','nwm_seg','HUC8','mapped','status','geometry']) + viz_out_gdf.to_file(workspace /'nws_lid_flows_sites.shp') + + #time operation all_end = time.time() - print(f'total time is {(all_end - all_start)/60} minutes') + print(f'total time is {round((all_end - all_start)/60),1} minutes') - if __name__ == '__main__': #Parse arguments parser = argparse.ArgumentParser(description = 'Create forecast files for all nws_lid sites') parser.add_argument('-w', '--workspace', help = 'Workspace where all data will be stored.', required = True) parser.add_argument('-u', '--nwm_us_search', help = 'Walk upstream on NWM network this many miles', required = True) parser.add_argument('-d', '--nwm_ds_search', help = 'Walk downstream on NWM network this many miles', required = True) - #Extract to dictionary and assign to variables. args = vars(parser.parse_args()) - #Run create_flow_forecast_file - static_flow_lids(**args) + #Run get_env_paths and static_flow_lids + API_BASE_URL, EVALUATED_SITES_CSV, WBD_LAYER = get_env_paths() + generate_catfim_flows(**args) diff --git a/tools/generate_categorical_fim_mapping.py b/tools/generate_categorical_fim_mapping.py new file mode 100755 index 000000000..3d591b989 --- /dev/null +++ b/tools/generate_categorical_fim_mapping.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 + +import sys +import os +from multiprocessing import Pool +import argparse +import traceback +import rasterio +import geopandas as gpd +import pandas as pd +import shutil +from rasterio.features import shapes +from shapely.geometry.polygon import Polygon +from shapely.geometry.multipolygon import MultiPolygon +from inundation import inundate +sys.path.append('/foss_fim/src') +from utils.shared_variables import PREP_PROJECTION,VIZ_PROJECTION +from utils.shared_functions import getDriver + +INPUTS_DIR = r'/data/inputs' +magnitude_list = ['action', 'minor', 'moderate','major', 'record'] + +# Define necessary variables for inundation() +hucs, hucs_layerName = os.path.join(INPUTS_DIR, 'wbd', 'WBD_National.gpkg'), 'WBDHU8' +mask_type, catchment_poly = 'huc', '' + + +def generate_categorical_fim(fim_run_dir, source_flow_dir, output_cat_fim_dir, number_of_jobs, depthtif, log_file): + + no_data_list = [] + procs_list = [] + + source_flow_dir_list = os.listdir(source_flow_dir) + output_flow_dir_list = os.listdir(fim_run_dir) + + # Log missing hucs + missing_hucs = list(set(source_flow_dir_list) - set(output_flow_dir_list)) + missing_hucs = [huc for huc in missing_hucs if "." not in huc] + if len(missing_hucs) > 0: + f = open(log_file, 'a+') + f.write(f"Missing hucs from output directory: {', '.join(missing_hucs)}\n") + f.close() + + # Loop through matching huc directories in the source_flow directory + matching_hucs = list(set(output_flow_dir_list) & set(source_flow_dir_list)) + for huc in matching_hucs: + + if "." 
not in huc: + + # Get list of AHPS site directories + ahps_site_dir = os.path.join(source_flow_dir, huc) + ahps_site_dir_list = os.listdir(ahps_site_dir) + + # Map paths to HAND files needed for inundation() + fim_run_huc_dir = os.path.join(fim_run_dir, huc) + rem = os.path.join(fim_run_huc_dir, 'rem_zeroed_masked.tif') + catchments = os.path.join(fim_run_huc_dir, 'gw_catchments_reaches_filtered_addedAttributes.tif') + hydroTable = os.path.join(fim_run_huc_dir, 'hydroTable.csv') + + exit_flag = False # Default to False. + + # Check if necessary data exist; set exit_flag to True if they don't exist + for f in [rem, catchments, hydroTable]: + if not os.path.exists(f): + no_data_list.append(f) + exit_flag = True + + # Log missing data + if exit_flag == True: + f = open(log_file, 'a+') + f.write(f"Missing data for: {fim_run_huc_dir}\n") + f.close() + + # Map path to huc directory inside out output_cat_fim_dir + cat_fim_huc_dir = os.path.join(output_cat_fim_dir, huc) + if not os.path.exists(cat_fim_huc_dir): + os.mkdir(cat_fim_huc_dir) + + # Loop through AHPS sites + for ahps_site in ahps_site_dir_list: + # map parent directory for AHPS source data dir and list AHPS thresholds (act, min, mod, maj) + ahps_site_parent = os.path.join(ahps_site_dir, ahps_site) + thresholds_dir_list = os.listdir(ahps_site_parent) + + # Map parent directory for all inundation output filesoutput files. + cat_fim_huc_ahps_dir = os.path.join(cat_fim_huc_dir, ahps_site) + if not os.path.exists(cat_fim_huc_ahps_dir): + os.mkdir(cat_fim_huc_ahps_dir) + + # Loop through thresholds/magnitudes and define inundation output files paths + for magnitude in thresholds_dir_list: + + if "." not in magnitude: + + magnitude_flows_csv = os.path.join(ahps_site_parent, magnitude, 'ahps_' + ahps_site + '_huc_' + huc + '_flows_' + magnitude + '.csv') + + if os.path.exists(magnitude_flows_csv): + + output_extent_grid = os.path.join(cat_fim_huc_ahps_dir, ahps_site + '_' + magnitude + '_extent.tif') + + if depthtif: + output_depth_grid = os.path.join(cat_fim_huc_ahps_dir, ahps_site + '_' + magnitude + '_depth.tif') + else: + output_depth_grid = None + + # Append necessary variables to list for multiprocessing. + procs_list.append([rem, catchments, magnitude_flows_csv, huc, hydroTable, output_extent_grid, output_depth_grid, ahps_site, magnitude, log_file]) + + # Initiate multiprocessing + print(f"Running inundation for {len(procs_list)} sites using {number_of_jobs} jobs") + pool = Pool(number_of_jobs) + pool.map(run_inundation, procs_list) + + +def run_inundation(args): + + rem = args[0] + catchments = args[1] + magnitude_flows_csv = args[2] + huc = args[3] + hydroTable = args[4] + output_extent_grid = args[5] + output_depth_grid = args[6] + ahps_site = args[7] + magnitude = args[8] + log_file = args[9] + + try: + inundate(rem,catchments,catchment_poly,hydroTable,magnitude_flows_csv,mask_type,hucs=hucs,hucs_layerName=hucs_layerName, + subset_hucs=huc,num_workers=1,aggregate=False,inundation_raster=output_extent_grid,inundation_polygon=None, + depths=output_depth_grid,out_raster_profile=None,out_vector_profile=None,quiet=True + ) + + except: + # Log errors and their tracebacks + f = open(log_file, 'a+') + f.write(f"{output_extent_grid} - inundation error: {traceback.format_exc()}\n") + f.close() + + #Inundation.py appends the huc code to the supplied output_extent_grid. + #Modify output_extent_grid to match inundation.py saved filename. + #Search for this file, if it didn't create, send message to log file. 
+ base_file_path,extension = os.path.splitext(output_extent_grid) + saved_extent_grid_filename = "{}_{}{}".format(base_file_path,huc,extension) + if not os.path.exists(saved_extent_grid_filename): + with open(log_file, 'a+') as f: + f.write('FAILURE_huc_{}:{}:{} map failed to create\n'.format(huc,ahps_site,magnitude)) + +def post_process_cat_fim_for_viz(number_of_jobs, output_cat_fim_dir, nws_lid_attributes_filename, log_file): + + # Create workspace + gpkg_dir = os.path.join(output_cat_fim_dir, 'gpkg') + if not os.path.exists(gpkg_dir): + os.mkdir(gpkg_dir) + + + #Find the FIM version + norm_path = os.path.normpath(output_cat_fim_dir) + cat_fim_dir_parts = norm_path.split(os.sep) + [fim_version] = [part for part in cat_fim_dir_parts if part.startswith('fim_3')] + merged_layer = os.path.join(output_cat_fim_dir, 'catfim_library.shp') + + if not os.path.exists(merged_layer): # prevents appending to existing output + + huc_ahps_dir_list = os.listdir(output_cat_fim_dir) + skip_list=['errors','logs','gpkg',merged_layer] + + for magnitude in magnitude_list: + + procs_list = [] + + # Loop through all categories + for huc in huc_ahps_dir_list: + + if huc not in skip_list: + + huc_dir = os.path.join(output_cat_fim_dir, huc) + ahps_dir_list = os.listdir(huc_dir) + + # Loop through ahps sites + for ahps_lid in ahps_dir_list: + ahps_lid_dir = os.path.join(huc_dir, ahps_lid) + + extent_grid = os.path.join(ahps_lid_dir, ahps_lid + '_' + magnitude + '_extent_' + huc + '.tif') + + if os.path.exists(extent_grid): + procs_list.append([ahps_lid, extent_grid, gpkg_dir, fim_version, huc, magnitude, nws_lid_attributes_filename]) + + else: + try: + f = open(log_file, 'a+') + f.write(f"Missing layers: {extent_gpkg}\n") + f.close() + except: + pass + + # Multiprocess with instructions + pool = Pool(number_of_jobs) + pool.map(reformat_inundation_maps, procs_list) + + # Merge all layers + print(f"Merging {len(os.listdir(gpkg_dir))} layers...") + + for layer in os.listdir(gpkg_dir): + + diss_extent_filename = os.path.join(gpkg_dir, layer) + + # Open diss_extent + diss_extent = gpd.read_file(diss_extent_filename) + diss_extent['viz'] = 'yes' + + # Write/append aggregate diss_extent + if os.path.isfile(merged_layer): + diss_extent.to_file(merged_layer,driver=getDriver(merged_layer),index=False, mode='a') + else: + diss_extent.to_file(merged_layer,driver=getDriver(merged_layer),index=False) + + del diss_extent + + shutil.rmtree(gpkg_dir) + + else: + print(f"{merged_layer} already exists.") + + +def reformat_inundation_maps(args): + + try: + lid = args[0] + grid_path = args[1] + gpkg_dir = args[2] + fim_version = args[3] + huc = args[4] + magnitude = args[5] + nws_lid_attributes_filename = args[6] + + # Convert raster to to shapes + with rasterio.open(grid_path) as src: + image = src.read(1) + mask = image > 0 + + # Aggregate shapes + results = ({'properties': {'extent': 1}, 'geometry': s} for i, (s, v) in enumerate(shapes(image, mask=mask,transform=src.transform))) + + # convert list of shapes to polygon + extent_poly = gpd.GeoDataFrame.from_features(list(results), crs=PREP_PROJECTION) + + # Dissolve polygons + extent_poly_diss = extent_poly.dissolve(by='extent') + + # Update attributes + extent_poly_diss = extent_poly_diss.reset_index(drop=True) + extent_poly_diss['ahps_lid'] = lid + extent_poly_diss['magnitude'] = magnitude + extent_poly_diss['version'] = fim_version + extent_poly_diss['huc'] = huc + + # Project to Web Mercator + extent_poly_diss = extent_poly_diss.to_crs(VIZ_PROJECTION) + + # Join attributes + 
nws_lid_attributes_table = pd.read_csv(nws_lid_attributes_filename, dtype={'huc':str}) + nws_lid_attributes_table = nws_lid_attributes_table.loc[(nws_lid_attributes_table.magnitude==magnitude) & (nws_lid_attributes_table.nws_lid==lid)] + + + extent_poly_diss = extent_poly_diss.merge(nws_lid_attributes_table, left_on=['ahps_lid','magnitude','huc'], right_on=['nws_lid','magnitude','huc']) + + extent_poly_diss = extent_poly_diss.drop(columns='nws_lid') + + # Save dissolved multipolygon + handle = os.path.split(grid_path)[1].replace('.tif', '') + + diss_extent_filename = os.path.join(gpkg_dir, handle + "_dissolved.gpkg") + + extent_poly_diss["geometry"] = [MultiPolygon([feature]) if type(feature) == Polygon else feature for feature in extent_poly_diss["geometry"]] + + if not extent_poly_diss.empty: + + extent_poly_diss.to_file(diss_extent_filename,driver=getDriver(diss_extent_filename),index=False) + + except Exception as e: + # Log and clean out the gdb so it's not merged in later + try: + f = open(log_file, 'a+') + f.write(str(diss_extent_filename) + " - dissolve error: " + str(e)) + f.close() + except: + pass + + +if __name__ == '__main__': + + # Parse arguments + parser = argparse.ArgumentParser(description='Categorical inundation mapping for FOSS FIM.') + parser.add_argument('-r','--fim-run-dir',help='Name of directory containing outputs of fim_run.sh',required=True) + parser.add_argument('-s', '--source-flow-dir',help='Path to directory containing flow CSVs to use to generate categorical FIM.',required=True, default="") + parser.add_argument('-o', '--output-cat-fim-dir',help='Path to directory where categorical FIM outputs will be written.',required=True, default="") + parser.add_argument('-j','--number-of-jobs',help='Number of processes to use. Default is 1.',required=False, default="1",type=int) + parser.add_argument('-depthtif','--write-depth-tiff',help='Using this option will write depth TIFFs.',required=False, action='store_true') + + args = vars(parser.parse_args()) + + fim_run_dir = args['fim_run_dir'] + source_flow_dir = args['source_flow_dir'] + output_cat_fim_dir = args['output_cat_fim_dir'] + number_of_jobs = int(args['number_of_jobs']) + depthtif = args['write_depth_tiff'] + + + # Create output directory + if not os.path.exists(output_cat_fim_dir): + os.mkdir(output_cat_fim_dir) + + # Create log directory + log_dir = os.path.join(output_cat_fim_dir, 'logs') + if not os.path.exists(log_dir): + os.mkdir(log_dir) + + # Create error log path + log_file = os.path.join(log_dir, 'errors.log') + + # Map path to points with attributes + nws_lid_attributes_filename = os.path.join(source_flow_dir, 'nws_lid_attributes.csv') + + print("Generating Categorical FIM") + generate_categorical_fim(fim_run_dir, source_flow_dir, output_cat_fim_dir, number_of_jobs, depthtif,log_file) + + print("Aggregating Categorical FIM") + post_process_cat_fim_for_viz(number_of_jobs, output_cat_fim_dir,nws_lid_attributes_filename,log_file) diff --git a/tools/inundation.py b/tools/inundation.py index e7c600510..d093385b8 100755 --- a/tools/inundation.py +++ b/tools/inundation.py @@ -17,6 +17,7 @@ from warnings import warn from gdal import BuildVRT import geopandas as gpd +import sys def inundate( @@ -454,14 +455,14 @@ def __subset_hydroTable_to_forecast(hydroTable,forecast,subset_hucs=None): 'HydroID':str,'stage':float, 'discharge_cms':float,'LakeID' : int} ) - + huc_error = hydroTable.HUC.unique() hydroTable.set_index(['HUC','feature_id','HydroID'],inplace=True) hydroTable = hydroTable[hydroTable["LakeID"] == 
-999] # Subset hydroTable to include only non-lake catchments. if hydroTable.empty: - print ("All stream segments in HUC are within lake boundaries.") - return + print(f"All stream segments in HUC(s): {huc_error} are within lake boundaries.") + sys.exit(0) elif isinstance(hydroTable,pd.DataFrame): pass #consider checking for correct dtypes, indices, and columns @@ -504,7 +505,11 @@ def __subset_hydroTable_to_forecast(hydroTable,forecast,subset_hucs=None): hydroTable = hydroTable[np.in1d(hydroTable.index.get_level_values('HUC'), subset_hucs)] # join tables - hydroTable = hydroTable.join(forecast,on=['feature_id'],how='inner') + try: + hydroTable = hydroTable.join(forecast,on=['feature_id'],how='inner') + except AttributeError: + print (f"No matching feature IDs between forecast and hydrotable for HUC(s): {subset_hucs}") + sys.exit(0) # initialize dictionary catchmentStagesDict = typed.Dict.empty(types.int32,types.float64) diff --git a/tools/tools_shared_functions.py b/tools/tools_shared_functions.py index aed515f5c..13534f87b 100755 --- a/tools/tools_shared_functions.py +++ b/tools/tools_shared_functions.py @@ -728,15 +728,16 @@ def get_metadata(metadata_url, select_by, selector, must_include = None, upstrea metadata_list = metadata_json['locations'] #Add timestamp of WRDS retrieval timestamp = response.headers['Date'] + #Add timestamp of sources retrieval + nrldb_timestamp, nwis_timestamp = metadata_json['data_sources']['metadata_sources'] #get crosswalk info (always last dictionary in list) - *metadata_list, crosswalk_info = metadata_list - #Update each dictionary with timestamp and crosswalk info + crosswalk_info = metadata_json['data_sources'] + #Update each dictionary with timestamp and crosswalk info also save to DataFrame. for metadata in metadata_list: metadata.update({"wrds_timestamp": timestamp}) + metadata.update({"nrldb_timestamp":nrldb_timestamp}) + metadata.update({"nwis_timestamp":nwis_timestamp}) metadata.update(crosswalk_info) - #If count is 1 - if location_count == 1: - metadata_list = metadata_json['locations'][0] metadata_dataframe = pd.json_normalize(metadata_list) #Replace all periods with underscores in column names metadata_dataframe.columns = metadata_dataframe.columns.str.replace('.','_') @@ -754,8 +755,8 @@ def get_metadata(metadata_url, select_by, selector, must_include = None, upstrea def aggregate_wbd_hucs(metadata_list, wbd_huc8_path, retain_attributes = False): ''' Assigns the proper FIM HUC 08 code to each site in the input DataFrame. - Converts input DataFrame to a GeoDataFrame using the lat/lon attributes - with sites containing null lat/lon removed. Reprojects GeoDataFrame + Converts input DataFrame to a GeoDataFrame using lat/lon attributes + with sites containing null nws_lid/lat/lon removed. Reprojects GeoDataFrame to same CRS as the HUC 08 layer. Performs a spatial join to assign the HUC 08 layer to the GeoDataFrame. Sites that are not assigned a HUC code removed as well as sites in Alaska and Canada. 
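The next hunk reworks the datum handling in `aggregate_wbd_hucs()`: the lookup default `'EPSG:4269_ Assumed'` lets a single `split('_')` both select the EPSG code and flag sites whose datum had to be assumed. A small self-contained illustration (the second datum name is made up):

```python
# Illustration of the lookup-with-default pattern used for horizontal datums;
# 'OLD HAWAIIAN' is a made-up example of an unrecognized datum name.
crs_lookup = {'NAD27': 'EPSG:4267', 'NAD83': 'EPSG:4269', 'WGS84': 'EPSG:4326'}

for h_datum in ['NAD83', 'OLD HAWAIIAN']:
    dict_crs = crs_lookup.get(h_datum, 'EPSG:4269_ Assumed')
    src_crs, *message = dict_crs.split('_')
    assigned_crs = src_crs + ''.join(message)
    print(h_datum, '->', src_crs, '| assigned_crs field:', assigned_crs)
# NAD83 -> EPSG:4269 | assigned_crs field: EPSG:4269
# OLD HAWAIIAN -> EPSG:4269 | assigned_crs field: EPSG:4269 Assumed
```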
@@ -782,8 +783,8 @@ def aggregate_wbd_hucs(metadata_list, wbd_huc8_path, retain_attributes = False): #Import huc8 layer as geodataframe and retain necessary columns huc8 = gpd.read_file(wbd_huc8_path, layer = 'WBDHU8') huc8 = huc8[['HUC8','name','states', 'geometry']] - #Define EPSG codes for possible usgs latlon datum names (NAD83WGS84 assigned NAD83) - crs_lookup ={'NAD27':'EPSG:4267', 'NAD83':'EPSG:4269', 'NAD83WGS84': 'EPSG:4269'} + #Define EPSG codes for possible latlon datum names (default of NAD83 if unassigned) + crs_lookup ={'NAD27':'EPSG:4267', 'NAD83':'EPSG:4269', 'WGS84': 'EPSG:4326'} #Create empty geodataframe and define CRS for potential horizontal datums metadata_gdf = gpd.GeoDataFrame() #Iterate through each site @@ -793,14 +794,20 @@ def aggregate_wbd_hucs(metadata_list, wbd_huc8_path, retain_attributes = False): #Columns have periods due to nested dictionaries df.columns = df.columns.str.replace('.', '_') #Drop any metadata sites that don't have lat/lon populated - df.dropna(subset = ['identifiers_nws_lid','usgs_data_latitude','usgs_data_longitude'], inplace = True) + df.dropna(subset = ['identifiers_nws_lid','usgs_preferred_latitude', 'usgs_preferred_longitude'], inplace = True) #If dataframe still has data if not df.empty: - #Get horizontal datum (use usgs) and assign appropriate EPSG code - h_datum = df.usgs_data_latlon_datum_name.item() - src_crs = crs_lookup[h_datum] - #Convert dataframe to geodataframe using lat/lon (USGS) - site_gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.usgs_data_longitude, df.usgs_data_latitude), crs = src_crs) + #Get horizontal datum + h_datum = df['usgs_preferred_latlon_datum_name'].item() + #Look up EPSG code, if not returned Assume NAD83 as default. + dict_crs = crs_lookup.get(h_datum,'EPSG:4269_ Assumed') + #We want to know what sites were assumed, hence the split. + src_crs, *message = dict_crs.split('_') + #Convert dataframe to geodataframe using lat/lon (USGS). Add attribute of assigned crs (label ones that are assumed) + site_gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df['usgs_preferred_longitude'], df['usgs_preferred_latitude']), crs = src_crs) + #Field to indicate if a latlon datum was assumed + site_gdf['assigned_crs'] = src_crs + ''.join(message) + #Reproject to huc 8 crs site_gdf = site_gdf.to_crs(huc8.crs) #Append site geodataframe to metadata geodataframe @@ -808,7 +815,7 @@ def aggregate_wbd_hucs(metadata_list, wbd_huc8_path, retain_attributes = False): #Trim metadata to only have certain fields. if not retain_attributes: - metadata_gdf = metadata_gdf[['identifiers_nwm_feature_id', 'identifiers_nws_lid', 'geometry']] + metadata_gdf = metadata_gdf[['identifiers_nwm_feature_id', 'identifiers_nws_lid', 'identifiers_usgs_site_code', 'geometry']] #If a list of attributes is supplied then use that list. elif isinstance(retain_attributes,list): metadata_gdf = metadata_gdf[retain_attributes] @@ -937,7 +944,7 @@ def get_nwm_segs(metadata): ####################################################################### #Thresholds ####################################################################### -def get_thresholds(threshold_url, location_ids, physical_element = 'all', threshold = 'all', bypass_source_flag = False): +def get_thresholds(threshold_url, select_by, selector, threshold = 'all'): ''' Get nws_lid threshold stages and flows (i.e. bankfull, action, minor, moderate, major). Returns a dictionary for stages and one for flows. 
@@ -946,17 +953,12 @@ def get_thresholds(threshold_url, location_ids, physical_element = 'all', thresh ---------- threshold_url : STR WRDS threshold API. - location_ids : STR - nws_lid code (only a single code). - physical_element : STR, optional - Physical element option. The default is 'all'. + select_by : STR + Type of site (nws_lid, usgs_site_code etc). + selector : STR + Site for selection. Must be a single site. threshold : STR, optional Threshold option. The default is 'all'. - bypass_source_flag : BOOL, optional - Special case if calculated values are not available (e.g. no rating - curve is available) then this allows for just a stage to be returned. - Used in case a flow is already known from another source, such as - a model. The default is False. Returns ------- @@ -966,13 +968,14 @@ def get_thresholds(threshold_url, location_ids, physical_element = 'all', thresh Dictionary of flows at each threshold. ''' - - url = f'{threshold_url}/{physical_element}/{threshold}/{location_ids}' - response = requests.get(url) + params = {} + params['threshold'] = threshold + url = f'{threshold_url}/{select_by}/{selector}' + response = requests.get(url, params = params) if response.ok: thresholds_json = response.json() #Get metadata - thresholds_info = thresholds_json['stream_thresholds'] + thresholds_info = thresholds_json['value_set'] #Initialize stages/flows dictionaries stages = {} flows = {} @@ -985,24 +988,27 @@ def get_thresholds(threshold_url, location_ids, physical_element = 'all', thresh threshold_data = thresholds_info[rating_sources['USGS Rating Depot']] elif 'NRLDB' in rating_sources: threshold_data = thresholds_info[rating_sources['NRLDB']] - #If neither USGS or NRLDB is available + #If neither USGS or NRLDB is available use first dictionary to get stage values. else: - #A flag option for cases where only a stage is needed for USGS scenario where a rating curve source is not available yet stages are available for the site. If flag is enabled, then stages are retrieved from the first record in thresholds_info. Typically the flows will not be populated as no rating curve is available. Flag should only be enabled when flows are already supplied by source (e.g. USGS) and threshold stages are needed. - if bypass_source_flag: - threshold_data = thresholds_info[0] - else: - threshold_data = [] + threshold_data = thresholds_info[0] #Get stages and flows for each threshold if threshold_data: stages = threshold_data['stage_values'] flows = threshold_data['calc_flow_values'] #Add source information to stages and flows. Flows source inside a nested dictionary. Remove key once source assigned to flows. - stages['source'] = threshold_data['metadata']['threshold_source'] - flows['source'] = flows['rating_curve']['source'] + stages['source'] = threshold_data.get('metadata').get('threshold_source') + flows['source'] = flows.get('rating_curve', {}).get('source') flows.pop('rating_curve', None) #Add timestamp WRDS data was retrieved. 
stages['wrds_timestamp'] = response.headers['Date'] flows['wrds_timestamp'] = response.headers['Date'] + #Add Site information + stages['nws_lid'] = threshold_data.get('metadata').get('nws_lid') + flows['nws_lid'] = threshold_data.get('metadata').get('nws_lid') + stages['usgs_site_code'] = threshold_data.get('metadata').get('usgs_site_code') + flows['usgs_site_code'] = threshold_data.get('metadata').get('usgs_site_code') + stages['units'] = threshold_data.get('metadata').get('stage_units') + flows['units'] = threshold_data.get('metadata').get('calc_flow_units') return stages, flows ########################################################################
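To close, the reshaped threshold parsing in `get_thresholds()` can be exercised against a hand-built record shaped like the `value_set` entries referenced above; all identifiers and values below are fabricated:

```python
# Hand-built threshold_data record mirroring the keys the updated get_thresholds() reads.
threshold_data = {
    'metadata': {'threshold_source': 'NRLDB', 'nws_lid': 'ABCD1',
                 'usgs_site_code': '01234567', 'stage_units': 'FT',
                 'calc_flow_units': 'CFS'},
    'stage_values': {'action': 4.0, 'minor': 6.5},
    'calc_flow_values': {'action': 120.0, 'minor': 300.0,
                         'rating_curve': {'source': 'USGS Rating Depot'}},
}

stages = threshold_data['stage_values']
flows = threshold_data['calc_flow_values']
stages['source'] = threshold_data.get('metadata').get('threshold_source')
flows['source'] = flows.get('rating_curve', {}).get('source')
flows.pop('rating_curve', None)                 # safe even if the key is absent
stages['nws_lid'] = threshold_data.get('metadata').get('nws_lid')
flows['nws_lid'] = threshold_data.get('metadata').get('nws_lid')
stages['units'] = threshold_data.get('metadata').get('stage_units')
flows['units'] = threshold_data.get('metadata').get('calc_flow_units')
print(stages)
print(flows)
```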