[1pt] Updated 1-foot interval CatFIM with restart capabilities #809

Merged: 6 commits, Feb 17, 2023
172 changes: 111 additions & 61 deletions tools/generate_categorical_fim.py
@@ -130,7 +130,7 @@ def create_csvs(output_mapping_dir, reformatted_catfim_method):
# Convert any geopackage in the root level of output_mapping_dir to CSV and rename.
gpkg_list = glob.glob(os.path.join(output_mapping_dir, '*.gpkg'))
for gpkg in gpkg_list:
print(f"creating CSV for {gpkg}")
print(f"Creating CSV for {gpkg}")
gdf = gpd.read_file(gpkg)
parent_directory = os.path.split(gpkg)[0]
if 'catfim_library' in gpkg:
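For reference, the loop above round-trips each geopackage through GeoPandas and writes its attribute table out as a CSV. A minimal standalone sketch of that pattern, with a hypothetical output directory and a simplified naming rule (the real code applies special renames for catfim_library files):

import glob
import os
import geopandas as gpd

output_mapping_dir = '/data/catfim/mapping'  # hypothetical path
for gpkg in glob.glob(os.path.join(output_mapping_dir, '*.gpkg')):
    print(f"Creating CSV for {gpkg}")
    gdf = gpd.read_file(gpkg)
    # Drop the geometry column so only the attribute table lands in the CSV.
    gdf.drop(columns='geometry').to_csv(gpkg.replace('.gpkg', '.csv'), index=False)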
@@ -174,32 +174,34 @@ def update_mapping_status(output_mapping_dir, output_flows_dir, nws_sites_layer,
# Import geopackage output from flows creation
flows_df = gpd.read_file(nws_sites_layer)

# Join failed sites to flows df
flows_df = flows_df.merge(mapping_df, how = 'left', on = 'nws_lid')

# Switch mapped column to no for failed sites and update status
flows_df.loc[flows_df['did_it_map'] == 'no', 'mapped'] = 'no'
flows_df.loc[flows_df['did_it_map']=='no','status'] = flows_df['status'] + flows_df['map_status']

# # Perform pass for HUCs where mapping was skipped due to missing data #TODO check with Brian
# if stage_based:
# missing_mapping_hucs =
# else:
# flows_hucs = [i.stem for i in Path(output_flows_dir).iterdir() if i.is_dir()]
# mapping_hucs = [i.stem for i in Path(output_mapping_dir).iterdir() if i.is_dir()]
# missing_mapping_hucs = list(set(flows_hucs) - set(mapping_hucs))
#
# # Update status for nws_lid in missing hucs and change mapped attribute to 'no'
# flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'status'] = flows_df['status'] + ' and all categories failed to map because missing HUC information'
# flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'mapped'] = 'no'

# Clean up GeoDataFrame and rename columns for consistency
flows_df = flows_df.drop(columns = ['did_it_map','map_status'])
flows_df = flows_df.rename(columns = {'nws_lid':'ahps_lid'})

# Write out to file
flows_df.to_file(nws_sites_layer)
try:
# Join failed sites to flows df
flows_df = flows_df.merge(mapping_df, how = 'left', on = 'nws_lid')

# Switch mapped column to no for failed sites and update status
flows_df.loc[flows_df['did_it_map'] == 'no', 'mapped'] = 'no'
flows_df.loc[flows_df['did_it_map']=='no','status'] = flows_df['status'] + flows_df['map_status']

# # Perform pass for HUCs where mapping was skipped due to missing data #TODO check with Brian
# if stage_based:
# missing_mapping_hucs =
# else:
# flows_hucs = [i.stem for i in Path(output_flows_dir).iterdir() if i.is_dir()]
# mapping_hucs = [i.stem for i in Path(output_mapping_dir).iterdir() if i.is_dir()]
# missing_mapping_hucs = list(set(flows_hucs) - set(mapping_hucs))
#
# # Update status for nws_lid in missing hucs and change mapped attribute to 'no'
# flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'status'] = flows_df['status'] + ' and all categories failed to map because missing HUC information'
# flows_df.loc[flows_df.eval('HUC8 in @missing_mapping_hucs & mapped == "yes"'), 'mapped'] = 'no'

# Clean up GeoDataFrame and rename columns for consistency
flows_df = flows_df.drop(columns = ['did_it_map','map_status'])
flows_df = flows_df.rename(columns = {'nws_lid':'ahps_lid'})

# Write out to file
flows_df.to_file(nws_sites_layer)
except Exception as e:
    # Nothing to update when no LIDs were mapped; report and move on.
    print(f"No LIDs: {e}")
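To make the status bookkeeping concrete, here is a small self-contained illustration of the merge-and-flag pattern used above, with toy LIDs in place of real CatFIM sites:

import pandas as pd

flows_df = pd.DataFrame({'nws_lid': ['abcd1', 'efgh2'],
                         'mapped': ['yes', 'yes'],
                         'status': ['OK', 'OK']})
mapping_df = pd.DataFrame({'nws_lid': ['efgh2'],
                           'did_it_map': ['no'],
                           'map_status': [' and all categories failed to map']})

flows_df = flows_df.merge(mapping_df, how='left', on='nws_lid')
flows_df.loc[flows_df['did_it_map'] == 'no', 'mapped'] = 'no'
flows_df.loc[flows_df['did_it_map'] == 'no', 'status'] = flows_df['status'] + flows_df['map_status']
# efgh2 now has mapped='no' and status='OK and all categories failed to map';
# abcd1 is untouched because its did_it_map is NaN after the left merge.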

def produce_inundation_map_with_stage_and_feature_ids(rem_path, catchments_path, hydroid_list, hand_stage, lid_directory, category, huc, lid, branch):

@@ -229,6 +231,10 @@ def produce_inundation_map_with_stage_and_feature_ids(rem_path, catchments_path,
with rasterio.open(output_tif, 'w', **profile) as dst:
dst.write(masked_reclass_rem_array, 1)

def mark_complete(site_directory):
marker_file = Path(site_directory) / 'complete.txt'
marker_file.touch()
return
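mark_complete and the complete.txt check in iterate_through_huc_stage_based below are the two halves of the restart mechanism: a site directory that finished in an earlier run is skipped on the next one. A minimal sketch of the round trip, with a hypothetical site directory:

import os

site_directory = '/data/catfim/mapping/12090301/abcd1'  # hypothetical
os.makedirs(site_directory, exist_ok=True)

# First run: after the site's maps are produced, drop the marker file.
mark_complete(site_directory)

# Restarted run: the marker short-circuits re-processing of the site.
if os.path.exists(os.path.join(site_directory, 'complete.txt')):
    print('already completed in previous run, skipping')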

def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, threshold_url, flood_categories, all_lists, past_major_interval_cap, number_of_jobs, number_of_interval_jobs, attributes_dir):
missing_huc_files = []
@@ -262,9 +268,14 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
lid_directory = os.path.join(huc_directory, lid)
if not os.path.exists(lid_directory):
os.mkdir(lid_directory)
else:
complete_marker = os.path.join(lid_directory, 'complete.txt')
if os.path.exists(complete_marker):
all_messages.append([f"{lid}: already completed in previous run."])
continue
# Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated flows.
stages, flows = get_thresholds(threshold_url = threshold_url, select_by = 'nws_lid', selector = lid, threshold = 'all')

if stages is None:
all_messages.append([f'{lid}:error getting thresholds from WRDS API'])
continue
@@ -273,20 +284,28 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
all_messages.append([f'{lid}:missing threshold stages'])
continue

# Drop columns that offend acceptance criteria
usgs_elev_df['acceptable_codes'] = (usgs_elev_df['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list)
& usgs_elev_df['usgs_data_coord_method_code'].isin(acceptable_coord_method_code_list)
& usgs_elev_df['usgs_data_alt_method_code'].isin(acceptable_alt_meth_code_list)
& usgs_elev_df['usgs_data_site_type'].isin(acceptable_site_type_list))

usgs_elev_df = usgs_elev_df.astype({'usgs_data_alt_accuracy_code': float})
usgs_elev_df['acceptable_alt_error'] = np.where(usgs_elev_df['usgs_data_alt_accuracy_code'] <= acceptable_alt_acc_thresh, True, False)

acceptable_usgs_elev_df = usgs_elev_df[(usgs_elev_df['acceptable_codes'] == True) & (usgs_elev_df['acceptable_alt_error'] == True)]

try:
# Filter out rows that fail the acceptance criteria
usgs_elev_df['acceptable_codes'] = (usgs_elev_df['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list)
& usgs_elev_df['usgs_data_coord_method_code'].isin(acceptable_coord_method_code_list)
& usgs_elev_df['usgs_data_alt_method_code'].isin(acceptable_alt_meth_code_list)
& usgs_elev_df['usgs_data_site_type'].isin(acceptable_site_type_list))

usgs_elev_df = usgs_elev_df.astype({'usgs_data_alt_accuracy_code': float})
usgs_elev_df['acceptable_alt_error'] = np.where(usgs_elev_df['usgs_data_alt_accuracy_code'] <= acceptable_alt_acc_thresh, True, False)

acceptable_usgs_elev_df = usgs_elev_df[(usgs_elev_df['acceptable_codes'] == True) & (usgs_elev_df['acceptable_alt_error'] == True)]
except Exception:
    # The USGS acceptance-criteria columns may not be present in this
    # particular usgs_elev_table.csv; in that case, fall back to the
    # unfiltered table.
    acceptable_usgs_elev_df = usgs_elev_df
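The acceptance screen above is plain boolean masking over the gauge-metadata columns; a toy, self-contained illustration with shortened, assumed code lists:

import pandas as pd

acceptable_coord_acc_code_list = ['H', '1', '5']  # assumed subset, for illustration
acceptable_alt_acc_thresh = 1.0                   # assumed threshold

usgs_elev_df = pd.DataFrame({
    'nws_lid': ['ABCD1', 'EFGH2'],
    'usgs_data_coord_accuracy_code': ['H', 'X'],
    'usgs_data_alt_accuracy_code': ['0.1', '10.0']})

mask = (usgs_elev_df['usgs_data_coord_accuracy_code'].isin(acceptable_coord_acc_code_list)
        & (usgs_elev_df['usgs_data_alt_accuracy_code'].astype(float) <= acceptable_alt_acc_thresh))
print(usgs_elev_df[mask])  # only ABCD1 survives the screen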

# Get the dem_adj_elevation value from usgs_elev_table.csv. Prioritize the value that is not from branch 0.
try:
matching_rows = acceptable_usgs_elev_df.loc[acceptable_usgs_elev_df['nws_lid'] == lid.upper(), 'dem_adj_elevation']

if len(matching_rows) == 2:  # Two level paths; use the one that is not branch 0
lid_usgs_elev = acceptable_usgs_elev_df.loc[(acceptable_usgs_elev_df['nws_lid'] == lid.upper()) & (acceptable_usgs_elev_df['levpa_id'] != 0), 'dem_adj_elevation'].values[0]
else:
@@ -301,6 +320,32 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
metadata = next((item for item in all_lists if item['identifiers']['nws_lid'] == lid.upper()), False)
lid_altitude = metadata['usgs_data']['altitude']

# Filter out sites that don't have "good" data
try:
if metadata['usgs_data']['coord_accuracy_code'] not in acceptable_coord_acc_code_list:
    print(f"\t{lid}: {metadata['usgs_data']['coord_accuracy_code']} not in acceptable coord accuracy codes")
    continue
if metadata['usgs_data']['coord_method_code'] not in acceptable_coord_method_code_list:
    print(f"\t{lid}: not in acceptable coord method codes")
    continue
if metadata['usgs_data']['alt_method_code'] not in acceptable_alt_meth_code_list:
    print(f"\t{lid}: not in acceptable alt method codes")
    continue
if metadata['usgs_data']['site_type'] not in acceptable_site_type_list:
    print(f"\t{lid}: not in acceptable site type codes")
    continue
if not float(metadata['usgs_data']['alt_accuracy_code']) <= acceptable_alt_acc_thresh:
    print(f"\t{lid}: altitude accuracy exceeds acceptable threshold")
    continue
except Exception as e:
print(e)
continue

### --- Do Datum Offset --- ###
#determine source of interpolated threshold flows, this will be the rating curve that will be used.
rating_curve_source = flows.get('source')
@@ -461,7 +506,6 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr
csv_df = csv_df.append(line_df)

except Exception as e:
print("Exception")
print(e)

# Round flow and stage columns to 2 decimal places.
@@ -476,6 +520,7 @@ def iterate_through_huc_stage_based(workspace, huc, fim_dir, huc_dictionary, thr

# If it made it to this point (i.e. no continues), there were no major preventers of mapping
all_messages.append([f'{lid}:OK'])
mark_complete(output_dir)
# Write all_messages by HUC to be scraped later.
messages_dir = os.path.join(workspace, 'messages')
if not os.path.exists(messages_dir):
@@ -547,30 +592,35 @@ def generate_stage_based_categorical_fim(workspace, fim_version, fim_dir, nwm_us
for row in reader:
all_messages.append(row)

# Write messages to DataFrame, split into columns, aggregate messages.
messages_df = pd.DataFrame(all_messages, columns = ['message'])

messages_df = messages_df['message'].str.split(':', n = 1, expand = True).rename(columns={0:'nws_lid', 1:'status'})
status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()

# Join messages to populate status field to candidate sites. Assign
# status for null fields.
viz_out_gdf = viz_out_gdf.merge(status_df, how = 'left', on = 'nws_lid')

# viz_out_gdf['status'] = viz_out_gdf['status'].fillna('OK')

# Filter out columns and write out to file
nws_sites_layer = os.path.join(workspace, 'nws_lid_sites.gpkg')

# Add acceptance criteria to viz_out_gdf before writing
viz_out_gdf['acceptable_coord_acc_code_list'] = str(acceptable_coord_acc_code_list)
viz_out_gdf['acceptable_coord_method_code_list'] = str(acceptable_coord_method_code_list)
viz_out_gdf['acceptable_alt_acc_thresh'] = float(acceptable_alt_acc_thresh)
viz_out_gdf['acceptable_alt_meth_code_list'] = str(acceptable_alt_meth_code_list)
viz_out_gdf['acceptable_site_type_list'] = str(acceptable_site_type_list)

viz_out_gdf.to_file(nws_sites_layer, driver='GPKG')


# Only write to sites geopackage if it didn't exist yet
# (and this line shouldn't have been reached if we had an interrupted
# run previously and are picking back up with a restart)
if not os.path.exists(nws_sites_layer):

# Write messages to DataFrame, split into columns, aggregate messages.
messages_df = pd.DataFrame(all_messages, columns = ['message'])

messages_df = messages_df['message'].str.split(':', n = 1, expand = True).rename(columns={0:'nws_lid', 1:'status'})
status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()

# Join messages to populate status field to candidate sites. Assign
# status for null fields.
viz_out_gdf = viz_out_gdf.merge(status_df, how = 'left', on = 'nws_lid')

# viz_out_gdf['status'] = viz_out_gdf['status'].fillna('OK')

# Add acceptance criteria to viz_out_gdf before writing
viz_out_gdf['acceptable_coord_acc_code_list'] = str(acceptable_coord_acc_code_list)
viz_out_gdf['acceptable_coord_method_code_list'] = str(acceptable_coord_method_code_list)
viz_out_gdf['acceptable_alt_acc_thresh'] = float(acceptable_alt_acc_thresh)
viz_out_gdf['acceptable_alt_meth_code_list'] = str(acceptable_alt_meth_code_list)
viz_out_gdf['acceptable_site_type_list'] = str(acceptable_site_type_list)

viz_out_gdf.to_file(nws_sites_layer, driver='GPKG')

return nws_sites_layer
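The aggregation above relies on every message following the 'lid:status' convention used throughout iterate_through_huc_stage_based; a toy, self-contained version with assumed example LIDs:

import pandas as pd

all_messages = [['abcd1:OK'],
                ['efgh2:error getting thresholds from WRDS API'],
                ['efgh2:missing threshold stages']]

messages_df = pd.DataFrame(all_messages, columns=['message'])
messages_df = messages_df['message'].str.split(':', n=1, expand=True).rename(columns={0: 'nws_lid', 1: 'status'})
status_df = messages_df.groupby(['nws_lid'])['status'].apply(', '.join).reset_index()
print(status_df)
# abcd1 -> 'OK'
# efgh2 -> 'error getting thresholds from WRDS API, missing threshold stages'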

