v4.1.1.0 Updated 1-foot interval CatFIM with restart capabilities (#809)
LauraKeys-NOAA authored Feb 17, 2023
1 parent 2f44e73 commit 32eab72
Showing 1 changed file with 111 additions and 61 deletions.
172 changes: 111 additions & 61 deletions tools/generate_categorical_fim.py
@@ -130,7 +130,7 @@ def create_csvs(output_mapping_dir, reformatted_catfim_method):
# Convert any geopackage in the root level of output_mapping_dir to CSV and rename.
gpkg_list = glob.glob(os.path.join(output_mapping_dir, '*.gpkg'))
for gpkg in gpkg_list:
print(f"creating CSV for {gpkg}")
print(f"Creating CSV for {gpkg}")
gdf = gpd.read_file(gpkg)
parent_directory = os.path.split(gpkg)[0]
if 'catfim_library' in gpkg:
@@ -174,32 +174,34 @@ def update_mapping_status(output_mapping_dir, output_flows_dir, nws_sites_layer,
# Import geopackage output from flows creation
flows_df = gpd.read_file(nws_sites_layer)

-    # Join failed sites to flows df
-    flows_df = flows_df.merge(mapping_df, how = 'left', on = 'nws_lid')
-
-    # Switch mapped column to no for failed sites and update status
-    flows_df.loc[flows_df['did_it_map'] == 'no', 'mapped'] = 'no'
-    flows_df.loc[flows_df['did_it_map'] == 'no', 'status'] = flows_df['status'] + flows_df['map_status']
-
-    # # Perform pass for HUCs where mapping was skipped due to missing data #TODO check with Brian
-    # if stage_based:
-    #     missing_mapping_hucs =
-    # else:
-    #     flows_hucs = [i.stem for i in Path(output_flows_dir).iterdir() if i.is_dir()]
-    #     mapping_hucs = [i.stem for i in Path(output_mapping_dir).iterdir() if i.is_dir()]
-    #     missing_mapping_hucs = list(set(flows_hucs) - set(mapping_hucs))
-    #
-    # # Update status for nws_lid in missing hucs and change mapped attribute to 'no'
-    # flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'status'] = flows_df['status'] + ' and all categories failed to map because missing HUC information'
-    # flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'mapped'] = 'no'
-
-    # Clean up GeoDataFrame and rename columns for consistency
-    flows_df = flows_df.drop(columns = ['did_it_map', 'map_status'])
-    flows_df = flows_df.rename(columns = {'nws_lid': 'ahps_lid'})
-
-    # Write out to file
-    flows_df.to_file(nws_sites_layer)
+    try:
+        # Join failed sites to flows df
+        flows_df = flows_df.merge(mapping_df, how = 'left', on = 'nws_lid')
+
+        # Switch mapped column to no for failed sites and update status
+        flows_df.loc[flows_df['did_it_map'] == 'no', 'mapped'] = 'no'
+        flows_df.loc[flows_df['did_it_map'] == 'no', 'status'] = flows_df['status'] + flows_df['map_status']
+
+        # # Perform pass for HUCs where mapping was skipped due to missing data #TODO check with Brian
+        # if stage_based:
+        #     missing_mapping_hucs =
+        # else:
+        #     flows_hucs = [i.stem for i in Path(output_flows_dir).iterdir() if i.is_dir()]
+        #     mapping_hucs = [i.stem for i in Path(output_mapping_dir).iterdir() if i.is_dir()]
+        #     missing_mapping_hucs = list(set(flows_hucs) - set(mapping_hucs))
+        #
+        # # Update status for nws_lid in missing hucs and change mapped attribute to 'no'
+        # flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'status'] = flows_df['status'] + ' and all categories failed to map because missing HUC information'
+        # flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'mapped'] = 'no'
+
+        # Clean up GeoDataFrame and rename columns for consistency
+        flows_df = flows_df.drop(columns = ['did_it_map', 'map_status'])
+        flows_df = flows_df.rename(columns = {'nws_lid': 'ahps_lid'})
+
+        # Write out to file
+        flows_df.to_file(nws_sites_layer)
+    except Exception:
+        # No failed-site messages to merge; leave the sites layer as-is.
+        print("No LIDs")

def produce_inundation_map_with_stage_and_feature_ids(rem_path, catchments_path, hydroid_list, hand_stage, lid_directory, category, huc, lid, branch):

@@ -229,6 +231,10 @@ def produce_inundation_map_with_stage_and_feature_ids(rem_path, catchments_path,
with rasterio.open(output_tif, 'w', **profile) as dst:
dst.write(masked_reclass_rem_array, 1)

+def mark_complete(site_directory):
+    marker_file = Path(site_directory) / 'complete.txt'
+    marker_file.touch()
+    return

def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, threshold_url, flood_categories, all_lists, past_major_interval_cap, number_of_jobs, number_of_interval_jobs, attributes_dir):
missing_huc_files = []
@@ -262,9 +268,14 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
lid_directory = os.path.join(huc_directory, lid)
if not os.path.exists(lid_directory):
os.mkdir(lid_directory)
+        else:
+            complete_marker = os.path.join(lid_directory, 'complete.txt')
+            if os.path.exists(complete_marker):
+                all_messages.append([f"{lid}: already completed in previous run."])
+                continue
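This else branch is the read side of the new restart capability: mark_complete() above touches complete.txt when a site finishes, and this check skips the site on the next run. A standalone sketch of the same marker-file pattern (the directory names and the mkdir call are illustrative, not part of the script):

from pathlib import Path

def is_complete(site_directory):
    # A restarted run can skip any site whose marker file survives.
    return (Path(site_directory) / 'complete.txt').exists()

def mark_complete(site_directory):
    Path(site_directory).mkdir(parents=True, exist_ok=True)
    (Path(site_directory) / 'complete.txt').touch()

# Usage: skip finished sites on restart, mark each one when it is done.
for site in ['site_a', 'site_b']:
    if is_complete(site):
        continue
    # ... produce inundation maps for this site ...
    mark_complete(site)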
# Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated flows.
stages, flows = get_thresholds(threshold_url = threshold_url, select_by = 'nws_lid', selector = lid, threshold = 'all')

if stages is None:
all_messages.append([f'{lid}:error getting thresholds from WRDS API'])
continue
@@ -273,20 +284,28 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
all_messages.append([f'{lid}:missing threshold stages'])
continue

-        # Drop columns that offend acceptance criteria
-        usgs_elev_df['acceptable_codes'] = (usgs_elev_df['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list)
-                                            & usgs_elev_df['usgs_data_coord_method_code'].isin(acceptable_coord_method_code_list)
-                                            & usgs_elev_df['usgs_data_alt_method_code'].isin(acceptable_alt_meth_code_list)
-                                            & usgs_elev_df['usgs_data_site_type'].isin(acceptable_site_type_list))
-
-        usgs_elev_df = usgs_elev_df.astype({'usgs_data_alt_accuracy_code': float})
-        usgs_elev_df['acceptable_alt_error'] = np.where(usgs_elev_df['usgs_data_alt_accuracy_code'] <= acceptable_alt_acc_thresh, True, False)
-
-        acceptable_usgs_elev_df = usgs_elev_df[(usgs_elev_df['acceptable_codes'] == True) & (usgs_elev_df['acceptable_alt_error'] == True)]

+        try:
+            # Drop rows that fail the acceptance criteria
+            usgs_elev_df['acceptable_codes'] = (usgs_elev_df['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list)
+                                                & usgs_elev_df['usgs_data_coord_method_code'].isin(acceptable_coord_method_code_list)
+                                                & usgs_elev_df['usgs_data_alt_method_code'].isin(acceptable_alt_meth_code_list)
+                                                & usgs_elev_df['usgs_data_site_type'].isin(acceptable_site_type_list))
+
+            usgs_elev_df = usgs_elev_df.astype({'usgs_data_alt_accuracy_code': float})
+            usgs_elev_df['acceptable_alt_error'] = np.where(usgs_elev_df['usgs_data_alt_accuracy_code'] <= acceptable_alt_acc_thresh, True, False)
+
+            acceptable_usgs_elev_df = usgs_elev_df[(usgs_elev_df['acceptable_codes'] == True) & (usgs_elev_df['acceptable_alt_error'] == True)]
+        except Exception:
+            # The USGS-related columns may not exist in this usgs_elev_table.csv;
+            # if the filter fails, fall back to using the unfiltered table.
+            acceptable_usgs_elev_df = usgs_elev_df
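Rather than relying on an exception to detect missing columns, the presence check can be made explicit. A sketch under that assumption (required_cols is a hypothetical subset of the criteria columns used above):

import pandas as pd

def filter_acceptable_sites(usgs_elev_df, acceptable_alt_acc_thresh):
    # Hypothetical subset of the acceptance-criteria columns used above.
    required_cols = ['usgs_data_coord_accuracy_code', 'usgs_data_alt_accuracy_code']
    if not set(required_cols).issubset(usgs_elev_df.columns):
        # Columns absent from this CSV: fall back to the unfiltered table.
        return usgs_elev_df
    df = usgs_elev_df.astype({'usgs_data_alt_accuracy_code': float})
    return df[df['usgs_data_alt_accuracy_code'] <= acceptable_alt_acc_thresh]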

# Get the dem_adj_elevation value from usgs_elev_table.csv. Prioritize the value that is not from branch 0.
try:
matching_rows = acceptable_usgs_elev_df.loc[acceptable_usgs_elev_df['nws_lid'] == lid.upper(), 'dem_adj_elevation']

if len(matching_rows) == 2:  # Two level paths exist; use the one that is not branch 0
    lid_usgs_elev = acceptable_usgs_elev_df.loc[(acceptable_usgs_elev_df['nws_lid'] == lid.upper()) & (acceptable_usgs_elev_df['levpa_id'] != 0), 'dem_adj_elevation'].values[0]
else:
@@ -301,6 +320,32 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
metadata = next((item for item in all_lists if item['identifiers']['nws_lid'] == lid.upper()), False)
lid_altitude = metadata['usgs_data']['altitude']

+        # Filter out sites that don't have "good" data
+        try:
+            if metadata['usgs_data']['coord_accuracy_code'] not in acceptable_coord_acc_code_list:
+                print(f"\t{lid}: {metadata['usgs_data']['coord_accuracy_code']} not in acceptable coord accuracy codes")
+                continue
+            if metadata['usgs_data']['coord_method_code'] not in acceptable_coord_method_code_list:
+                print(f"\t{lid}: not in acceptable coord method codes")
+                continue
+            if metadata['usgs_data']['alt_method_code'] not in acceptable_alt_meth_code_list:
+                print(f"\t{lid}: not in acceptable alt method codes")
+                continue
+            if metadata['usgs_data']['site_type'] not in acceptable_site_type_list:
+                print(f"\t{lid}: not in acceptable site type codes")
+                continue
+            if float(metadata['usgs_data']['alt_accuracy_code']) > acceptable_alt_acc_thresh:
+                print(f"\t{lid}: altitude accuracy above acceptable threshold")
+                continue
+        except Exception as e:
+            print(e)
+            continue

### --- Do Datum Offset --- ###
#determine source of interpolated threshold flows, this will be the rating curve that will be used.
rating_curve_source = flows.get('source')
@@ -461,7 +506,6 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
csv_df = csv_df.append(line_df)

except Exception as e:
print("Exception")
print(e)

# Round flow and stage columns to 2 decimal places.
@@ -476,6 +520,7 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr

# If it made it to this point (i.e. no continues), there were no major preventers of mapping
all_messages.append([f'{lid}:OK'])
+        mark_complete(output_dir)
# Write all_messages by HUC to be scraped later.
messages_dir = os.path.join(workspace, 'messages')
if not os.path.exists(messages_dir):
@@ -547,30 +592,35 @@ def generate_stage_based_categorical_fim(workspace, fim_version, fim_dir, nwm_us
for row in reader:
all_messages.append(row)

-    # Write messages to DataFrame, split into columns, aggregate messages.
-    messages_df = pd.DataFrame(all_messages, columns = ['message'])
-
-    messages_df = messages_df['message'].str.split(':', n = 1, expand = True).rename(columns = {0: 'nws_lid', 1: 'status'})
-    status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
-
-    # Join messages to populate status field to candidate sites. Assign
-    # status for null fields.
-    viz_out_gdf = viz_out_gdf.merge(status_df, how = 'left', on = 'nws_lid')
-
-    # viz_out_gdf['status'] = viz_out_gdf['status'].fillna('OK')
-
-    # Filter out columns and write out to file
-    nws_sites_layer = os.path.join(workspace, 'nws_lid_sites.gpkg')
-
-    # Add acceptance criteria to viz_out_gdf before writing
-    viz_out_gdf['acceptable_coord_acc_code_list'] = str(acceptable_coord_acc_code_list)
-    viz_out_gdf['acceptable_coord_method_code_list'] = str(acceptable_coord_method_code_list)
-    viz_out_gdf['acceptable_alt_acc_thresh'] = float(acceptable_alt_acc_thresh)
-    viz_out_gdf['acceptable_alt_meth_code_list'] = str(acceptable_alt_meth_code_list)
-    viz_out_gdf['acceptable_site_type_list'] = str(acceptable_site_type_list)
-
-    viz_out_gdf.to_file(nws_sites_layer, driver = 'GPKG')


+    # Only write to the sites geopackage if it doesn't already exist
+    # (this point shouldn't be reached on a restart that is picking up
+    # after an interrupted run)
+    if not os.path.exists(nws_sites_layer):
+
+        # Write messages to DataFrame, split into columns, aggregate messages.
+        messages_df = pd.DataFrame(all_messages, columns = ['message'])
+
+        messages_df = messages_df['message'].str.split(':', n = 1, expand = True).rename(columns = {0: 'nws_lid', 1: 'status'})
+        status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
+
+        # Join messages to populate the status field for candidate sites.
+        viz_out_gdf = viz_out_gdf.merge(status_df, how = 'left', on = 'nws_lid')
+
+        # viz_out_gdf['status'] = viz_out_gdf['status'].fillna('OK')
+
+        # Add acceptance criteria to viz_out_gdf before writing
+        viz_out_gdf['acceptable_coord_acc_code_list'] = str(acceptable_coord_acc_code_list)
+        viz_out_gdf['acceptable_coord_method_code_list'] = str(acceptable_coord_method_code_list)
+        viz_out_gdf['acceptable_alt_acc_thresh'] = float(acceptable_alt_acc_thresh)
+        viz_out_gdf['acceptable_alt_meth_code_list'] = str(acceptable_alt_meth_code_list)
+        viz_out_gdf['acceptable_site_type_list'] = str(acceptable_site_type_list)
+
+        viz_out_gdf.to_file(nws_sites_layer, driver = 'GPKG')

return nws_sites_layer
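For reference, the message aggregation in the block above turns per-site strings of the form 'lid:status' into one row per site. A tiny self-contained illustration of the same pandas pattern (the lids shown are made up):

import pandas as pd

all_messages = [['abcd1:OK'],
                ['efgh2:missing threshold stages'],
                ['efgh2:error getting thresholds from WRDS API']]
messages_df = pd.DataFrame(all_messages, columns=['message'])

# Split "lid:status" once on the first colon, then join statuses per site.
messages_df = messages_df['message'].str.split(':', n=1, expand=True).rename(columns={0: 'nws_lid', 1: 'status'})
status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
print(status_df)  # abcd1 -> "OK"; efgh2 -> both messages joined with ", "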


