From 32eab723b918de94c80fb2a30b7bb6a087510752 Mon Sep 17 00:00:00 2001
From: LauraKeys-NOAA <70279180+LauraKeys-NOAA@users.noreply.github.com>
Date: Fri, 17 Feb 2023 10:52:02 -0600
Subject: [PATCH] v4.1.1.0 Updated 1-foot interval CatFIM with restart capabilities (#809)

---
 tools/generate_categorical_fim.py | 172 +++++++++++++++++++-----------
 1 file changed, 111 insertions(+), 61 deletions(-)

diff --git a/tools/generate_categorical_fim.py b/tools/generate_categorical_fim.py
index dfb25438b..ee0c5ea03 100755
--- a/tools/generate_categorical_fim.py
+++ b/tools/generate_categorical_fim.py
@@ -130,7 +130,7 @@ def create_csvs(output_mapping_dir, reformatted_catfim_method):
     # Convert any geopackage in the root level of output_mapping_dir to CSV and rename.
     gpkg_list = glob.glob(os.path.join(output_mapping_dir, '*.gpkg'))
     for gpkg in gpkg_list:
-        print(f"creating CSV for {gpkg}")
+        print(f"Creating CSV for {gpkg}")
         gdf = gpd.read_file(gpkg)
         parent_directory = os.path.split(gpkg)[0]
         if 'catfim_library' in gpkg:
@@ -174,32 +174,34 @@ def update_mapping_status(output_mapping_dir, output_flows_dir, nws_sites_layer,
     # Import geopackage output from flows creation
     flows_df = gpd.read_file(nws_sites_layer)
 
-    # Join failed sites to flows df
-    flows_df = flows_df.merge(mapping_df, how = 'left', on = 'nws_lid')
-
-    # Switch mapped column to no for failed sites and update status
-    flows_df.loc[flows_df['did_it_map'] == 'no', 'mapped'] = 'no'
-    flows_df.loc[flows_df['did_it_map'] == 'no', 'status'] = flows_df['status'] + flows_df['map_status']
-
-#    # Perform pass for HUCs where mapping was skipped due to missing data #TODO check with Brian
-#    if stage_based:
-#        missing_mapping_hucs =
-#    else:
-#        flows_hucs = [i.stem for i in Path(output_flows_dir).iterdir() if i.is_dir()]
-#        mapping_hucs = [i.stem for i in Path(output_mapping_dir).iterdir() if i.is_dir()]
-#        missing_mapping_hucs = list(set(flows_hucs) - set(mapping_hucs))
-#
-#    # Update status for nws_lid in missing hucs and change mapped attribute to 'no'
-#    flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'status'] = flows_df['status'] + ' and all categories failed to map because missing HUC information'
-#    flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'mapped'] = 'no'
-
-    # Clean up GeoDataFrame and rename columns for consistency
-    flows_df = flows_df.drop(columns = ['did_it_map','map_status'])
-    flows_df = flows_df.rename(columns = {'nws_lid':'ahps_lid'})
-
-    # Write out to file
-    flows_df.to_file(nws_sites_layer)
+    try:
+        # Join failed sites to flows df
+        flows_df = flows_df.merge(mapping_df, how = 'left', on = 'nws_lid')
+
+        # Switch mapped column to no for failed sites and update status
+        flows_df.loc[flows_df['did_it_map'] == 'no', 'mapped'] = 'no'
+        flows_df.loc[flows_df['did_it_map'] == 'no', 'status'] = flows_df['status'] + flows_df['map_status']
+
+        # # Perform pass for HUCs where mapping was skipped due to missing data #TODO check with Brian
+        # if stage_based:
+        #     missing_mapping_hucs =
+        # else:
+        #     flows_hucs = [i.stem for i in Path(output_flows_dir).iterdir() if i.is_dir()]
+        #     mapping_hucs = [i.stem for i in Path(output_mapping_dir).iterdir() if i.is_dir()]
+        #     missing_mapping_hucs = list(set(flows_hucs) - set(mapping_hucs))
+        #
+        # # Update status for nws_lid in missing hucs and change mapped attribute to 'no'
+        # flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'status'] = flows_df['status'] + ' and all categories failed to map because missing HUC information'
+        # flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'mapped'] = 'no'
+
+        # Clean up GeoDataFrame and rename columns for consistency
+        flows_df = flows_df.drop(columns = ['did_it_map','map_status'])
+        flows_df = flows_df.rename(columns = {'nws_lid':'ahps_lid'})
+
+        # Write out to file
+        flows_df.to_file(nws_sites_layer)
+    except Exception as e:
+        print(f"No LIDs: {e}")
 
 
 def produce_inundation_map_with_stage_and_feature_ids(rem_path, catchments_path, hydroid_list, hand_stage, lid_directory, category, huc, lid, branch):
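For reference, the guarded block above reduces to a left join plus a boolean flag. A minimal standalone sketch of that pattern (the LIDs and the map_status text below are made up, not values from the patch):

    import pandas as pd

    flows_df = pd.DataFrame({'nws_lid': ['bltn7', 'cfkt2'],
                             'mapped': ['yes', 'yes'],
                             'status': ['OK', 'OK']})
    mapping_df = pd.DataFrame({'nws_lid': ['cfkt2'],
                               'did_it_map': ['no'],
                               'map_status': [' and all categories failed to map']})

    # Left join keeps every site; only failed sites get did_it_map/map_status values.
    flows_df = flows_df.merge(mapping_df, how='left', on='nws_lid')
    failed = flows_df['did_it_map'] == 'no'
    flows_df.loc[failed, 'mapped'] = 'no'
    flows_df.loc[failed, 'status'] = flows_df['status'] + flows_df['map_status']
    print(flows_df.drop(columns=['did_it_map', 'map_status']))

Wrapping this in try/except means a run with no failed sites (or no mapping table at all) no longer aborts the whole status update.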
@@ -229,6 +231,10 @@ def produce_inundation_map_with_stage_and_feature_ids(rem_path, catchments_path,
     with rasterio.open(output_tif, 'w', **profile) as dst:
         dst.write(masked_reclass_rem_array, 1)
 
+def mark_complete(site_directory):
+    marker_file = Path(site_directory) / 'complete.txt'
+    marker_file.touch()
+    return
 
 def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, threshold_url, flood_categories, all_lists, past_major_interval_cap, number_of_jobs, number_of_interval_jobs, attributes_dir):
     missing_huc_files = []
@@ -262,9 +268,14 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
             lid_directory = os.path.join(huc_directory, lid)
             if not os.path.exists(lid_directory):
                 os.mkdir(lid_directory)
+            else:
+                complete_marker = os.path.join(lid_directory, 'complete.txt')
+                if os.path.exists(complete_marker):
+                    all_messages.append([f"{lid}: already completed in previous run."])
+                    continue
             # Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated flows.
             stages, flows = get_thresholds(threshold_url = threshold_url, select_by = 'nws_lid', selector = lid, threshold = 'all')
-
+
             if stages == None:
                 all_messages.append([f'{lid}:error getting thresholds from WRDS API'])
                 continue
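The restart capability hinges on the complete.txt marker: mark_complete() touches it when a site finishes, and the else branch above skips any lid_directory that already has one. A minimal sketch of the same pattern (the directory names are illustrative only):

    import os
    from pathlib import Path

    def mark_complete(site_directory):
        # Touch the marker file that a later restart checks for.
        (Path(site_directory) / 'complete.txt').touch()

    for lid_directory in ['catfim_work/12040101/bltn7', 'catfim_work/12040101/cfkt2']:
        if os.path.exists(os.path.join(lid_directory, 'complete.txt')):
            continue  # finished in a previous run, skip on restart
        os.makedirs(lid_directory, exist_ok=True)
        # ... produce stage-based inundation maps for this site ...
        mark_complete(lid_directory)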
@@ -273,20 +284,28 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
                 all_messages.append([f'{lid}:missing threshold stages'])
                 continue
 
-            # Drop columns that offend acceptance criteria
-            usgs_elev_df['acceptable_codes'] = (usgs_elev_df['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list)
-                                                & usgs_elev_df['usgs_data_coord_method_code'].isin(acceptable_coord_method_code_list)
-                                                & usgs_elev_df['usgs_data_alt_method_code'].isin(acceptable_alt_meth_code_list)
-                                                & usgs_elev_df['usgs_data_site_type'].isin(acceptable_site_type_list))
-
-            usgs_elev_df = usgs_elev_df.astype({'usgs_data_alt_accuracy_code': float})
-            usgs_elev_df['acceptable_alt_error'] = np.where(usgs_elev_df['usgs_data_alt_accuracy_code'] <= acceptable_alt_acc_thresh, True, False)
-
-            acceptable_usgs_elev_df = usgs_elev_df[(usgs_elev_df['acceptable_codes'] == True) & (usgs_elev_df['acceptable_alt_error'] == True)]
-
+            try:
+                # Drop columns that offend acceptance criteria
+                usgs_elev_df['acceptable_codes'] = (usgs_elev_df['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list)
+                                                    & usgs_elev_df['usgs_data_coord_method_code'].isin(acceptable_coord_method_code_list)
+                                                    & usgs_elev_df['usgs_data_alt_method_code'].isin(acceptable_alt_meth_code_list)
+                                                    & usgs_elev_df['usgs_data_site_type'].isin(acceptable_site_type_list))
+
+                usgs_elev_df = usgs_elev_df.astype({'usgs_data_alt_accuracy_code': float})
+                usgs_elev_df['acceptable_alt_error'] = np.where(usgs_elev_df['usgs_data_alt_accuracy_code'] <= acceptable_alt_acc_thresh, True, False)
+
+                acceptable_usgs_elev_df = usgs_elev_df[(usgs_elev_df['acceptable_codes'] == True) & (usgs_elev_df['acceptable_alt_error'] == True)]
+            except Exception as e:
+                # Not every usgs_elev_table.csv contains the USGS acceptance-criteria
+                # columns, so fall back to using the unfiltered table.
+                acceptable_usgs_elev_df = usgs_elev_df
+
             # Get the dem_adj_elevation value from usgs_elev_table.csv. Prioritize the value that is not from branch 0.
             try:
                 matching_rows = acceptable_usgs_elev_df.loc[acceptable_usgs_elev_df['nws_lid'] == lid.upper(), 'dem_adj_elevation']
+
+                if len(matching_rows) == 2:  # There are two level paths; use the one that is not 0
+                    lid_usgs_elev = acceptable_usgs_elev_df.loc[(acceptable_usgs_elev_df['nws_lid'] == lid.upper()) & (acceptable_usgs_elev_df['levpa_id'] != 0), 'dem_adj_elevation'].values[0]
+                else:
@@ -301,6 +320,32 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
             metadata = next((item for item in all_lists if item['identifiers']['nws_lid'] == lid.upper()), False)
             lid_altitude = metadata['usgs_data']['altitude']
 
+            # Filter out sites that don't have "good" data
+            try:
+                if metadata['usgs_data']['coord_accuracy_code'] not in \
+                        acceptable_coord_acc_code_list:
+                    print(f"\t{lid}: coord accuracy code {metadata['usgs_data']['coord_accuracy_code']} not in acceptable coord acc codes")
+                    continue
+                if metadata['usgs_data']['coord_method_code'] not in \
+                        acceptable_coord_method_code_list:
+                    print(f"\t{lid}: not in acceptable coord method codes")
+                    continue
+                if metadata['usgs_data']['alt_method_code'] not in \
+                        acceptable_alt_meth_code_list:
+                    print(f"\t{lid}: not in acceptable alt method codes")
+                    continue
+                if metadata['usgs_data']['site_type'] not in \
+                        acceptable_site_type_list:
+                    print(f"\t{lid}: not in acceptable site type codes")
+                    continue
+                if float(metadata['usgs_data']['alt_accuracy_code']) > \
+                        acceptable_alt_acc_thresh:
+                    print(f"\t{lid}: alt accuracy code exceeds acceptable threshold")
+                    continue
+            except Exception as e:
+                print(e)
+                continue
+
             ### --- Do Datum Offset --- ###
             #determine source of interpolated threshold flows, this will be the rating curve that will be used.
             rating_curve_source = flows.get('source')
@@ -461,7 +506,6 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
                             csv_df = csv_df.append(line_df)
 
                 except Exception as e:
-                    print("Exception")
                     print(e)
 
         # Round flow and stage columns to 2 decimal places.
@@ -476,6 +520,7 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
 
         # If it made it to this point (i.e. no continues), there were no major preventers of mapping
         all_messages.append([f'{lid}:OK'])
+        mark_complete(output_dir)
     # Write all_messages by HUC to be scraped later.
     messages_dir = os.path.join(workspace, 'messages')
    if not os.path.exists(messages_dir):
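The acceptance-criteria block wrapped in try/except above boils down to two boolean masks over the USGS elevation table. A condensed sketch with placeholder criteria (the real lists and threshold come from the project's shared variables, which are not shown in this diff):

    import pandas as pd

    acceptable_coord_acc_code_list = ['H', '1', '5']  # placeholder values
    acceptable_alt_acc_thresh = 1.0                   # placeholder value

    usgs_elev_df = pd.DataFrame({
        'nws_lid': ['BLTN7', 'CFKT2'],
        'usgs_data_coord_accuracy_code': ['H', 'B'],
        'usgs_data_alt_accuracy_code': ['0.1', '20'],
        'dem_adj_elevation': [102.3, 57.9],
    })

    usgs_elev_df['acceptable_codes'] = usgs_elev_df['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list)
    usgs_elev_df = usgs_elev_df.astype({'usgs_data_alt_accuracy_code': float})
    usgs_elev_df['acceptable_alt_error'] = usgs_elev_df['usgs_data_alt_accuracy_code'] <= acceptable_alt_acc_thresh

    # Keep only rows that satisfy both masks; sites failing either criterion are dropped.
    acceptable_usgs_elev_df = usgs_elev_df[usgs_elev_df['acceptable_codes'] & usgs_elev_df['acceptable_alt_error']]
    print(acceptable_usgs_elev_df)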
@@ -547,30 +592,35 @@ def generate_stage_based_categorical_fim(workspace, fim_version, fim_dir, nwm_us
                 for row in reader:
                     all_messages.append(row)
 
-    # Write messages to DataFrame, split into columns, aggregate messages.
-    messages_df = pd.DataFrame(all_messages, columns = ['message'])
-
-    messages_df = messages_df['message'].str.split(':', n = 1, expand = True).rename(columns={0:'nws_lid', 1:'status'})
-    status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
-
-    # Join messages to populate status field of candidate sites. Assign
-    # status for null fields.
-    viz_out_gdf = viz_out_gdf.merge(status_df, how = 'left', on = 'nws_lid')
-
-#    viz_out_gdf['status'] = viz_out_gdf['status'].fillna('OK')
-
     # Filter out columns and write out to file
     nws_sites_layer = os.path.join(workspace, 'nws_lid_sites.gpkg')
-
-    # Add acceptance criteria to viz_out_gdf before writing
-    viz_out_gdf['acceptable_coord_acc_code_list'] = str(acceptable_coord_acc_code_list)
-    viz_out_gdf['acceptable_coord_method_code_list'] = str(acceptable_coord_method_code_list)
-    viz_out_gdf['acceptable_alt_acc_thresh'] = float(acceptable_alt_acc_thresh)
-    viz_out_gdf['acceptable_alt_meth_code_list'] = str(acceptable_alt_meth_code_list)
-    viz_out_gdf['acceptable_site_type_list'] = str(acceptable_site_type_list)
-
-    viz_out_gdf.to_file(nws_sites_layer, driver='GPKG')
-
+
+    # Only write the sites geopackage if it doesn't already exist; if a previous
+    # run was interrupted, a restart should not overwrite the file it produced.
+    if not os.path.exists(nws_sites_layer):
+
+        # Write messages to DataFrame, split into columns, aggregate messages.
+        messages_df = pd.DataFrame(all_messages, columns = ['message'])
+
+        messages_df = messages_df['message'].str.split(':', n = 1, expand = True).rename(columns={0:'nws_lid', 1:'status'})
+        status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
+
+        # Join messages to populate status field of candidate sites. Assign
+        # status for null fields.
+        viz_out_gdf = viz_out_gdf.merge(status_df, how = 'left', on = 'nws_lid')
+
+        # viz_out_gdf['status'] = viz_out_gdf['status'].fillna('OK')
+
+        # Add acceptance criteria to viz_out_gdf before writing
+        viz_out_gdf['acceptable_coord_acc_code_list'] = str(acceptable_coord_acc_code_list)
+        viz_out_gdf['acceptable_coord_method_code_list'] = str(acceptable_coord_method_code_list)
+        viz_out_gdf['acceptable_alt_acc_thresh'] = float(acceptable_alt_acc_thresh)
+        viz_out_gdf['acceptable_alt_meth_code_list'] = str(acceptable_alt_meth_code_list)
+        viz_out_gdf['acceptable_site_type_list'] = str(acceptable_site_type_list)
+
+        viz_out_gdf.to_file(nws_sites_layer, driver='GPKG')
+
     return nws_sites_layer
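The message aggregation in the final hunk follows the 'lid:status' convention used throughout the file: split each message once on ':', then join all statuses per LID. A runnable sketch with invented messages:

    import pandas as pd

    all_messages = ['bltn7:OK',
                    'cfkt2:missing threshold stages',
                    'cfkt2:error getting thresholds from WRDS API']
    messages_df = pd.DataFrame(all_messages, columns=['message'])
    messages_df = messages_df['message'].str.split(':', n=1, expand=True).rename(columns={0: 'nws_lid', 1: 'status'})
    status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
    print(status_df)  # one row per LID with comma-joined statuses

Because the geopackage write is now guarded by os.path.exists, a restarted run leaves the sites layer from the interrupted run in place rather than rebuilding it.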