Serialise the harlequin beetle jobs #4

Merged · 4 commits · Dec 2, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -8,3 +8,4 @@ credentials.json
*.out
data/
models/
keys/
2 changes: 1 addition & 1 deletion print_deployments.py → 01_print_deployments.py
@@ -92,7 +92,7 @@ def print_deployments(include_inactive=False, subset_countries=None, print_image
for dep in sorted(all_deps):
dep_info = [x for x in country_depl if x['deployment_id'] == dep][0]
print(f"\033[1m - Deployment ID: {dep_info['deployment_id']}, Name: {dep_info['location_name']}, Deployment Key: '{dep_info['location_name']} - {dep_info['camera_id']}'\033[0m")
print(f" Location ID: {dep_info['location_id']}, Latitute: {dep_info['lat']}, Longitute: {dep_info['lon']}, Camera ID: {dep_info['camera_id']}, System ID: {dep_info['system_id']}, Status: {dep_info['status']}")
print(f" Location ID: {dep_info['location_id']}, Country code: {dep_info['country_code'].lower()}, Latitute: {dep_info['lat']}, Longitute: {dep_info['lon']}, Camera ID: {dep_info['camera_id']}, System ID: {dep_info['system_id']}, Status: {dep_info['status']}")

# get the number of images for this deployment
prefix = f"{dep_info['deployment_id']}/snapshot_images"
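The hunk ends just before the image count for each deployment is computed from its `snapshot_images` prefix. A minimal sketch of how that count could be obtained with boto3's `list_objects_v2` paginator; the helper name and its arguments are illustrative and not taken from this changeset:

# Sketch only: `s3_client` is assumed to be a boto3 S3 client created as
# elsewhere in this repository; the function name is illustrative.
def count_snapshot_images(s3_client, bucket_name, deployment_id):
    """Count objects under <deployment_id>/snapshot_images using pagination."""
    prefix = f"{deployment_id}/snapshot_images"
    paginator = s3_client.get_paginator("list_objects_v2")
    total = 0
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        total += len(page.get("Contents", []))
    return total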
82 changes: 82 additions & 0 deletions 02_generate_keys.py
@@ -0,0 +1,82 @@
import boto3
import argparse
import json

def list_s3_keys(bucket_name, deployment_id=""):
"""
List all keys in an S3 bucket under a specific prefix.

Parameters:
bucket_name (str): The name of the S3 bucket.
        deployment_id (str): The deployment ID prefix used to filter keys (default: "").

Returns:
list: A list of S3 object keys.
"""
with open("./credentials.json", encoding="utf-8") as config_file:
aws_credentials = json.load(config_file)


session = boto3.Session(
aws_access_key_id=aws_credentials["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=aws_credentials["AWS_SECRET_ACCESS_KEY"],
region_name=aws_credentials["AWS_REGION"],
)
s3_client = session.client("s3", endpoint_url=aws_credentials["AWS_URL_ENDPOINT"])

keys = []
continuation_token = None

while True:
list_kwargs = {
"Bucket": bucket_name,
"Prefix": deployment_id,
}
if continuation_token:
list_kwargs["ContinuationToken"] = continuation_token

response = s3_client.list_objects_v2(**list_kwargs)

# Add object keys to the list
for obj in response.get("Contents", []):
keys.append(obj["Key"])

# Check if there are more objects to list
if response.get("IsTruncated"): # If True, there are more results
continuation_token = response["NextContinuationToken"]
else:
break

return keys

def save_keys_to_file(keys, output_file):
"""
Save S3 keys to a file, one per line.

Parameters:
keys (list): List of S3 keys.
output_file (str): Path to the output file.
"""
with open(output_file, "w") as f:
for key in keys:
f.write(key + "\n")

def main():
parser = argparse.ArgumentParser(description="Generate a file containing S3 keys from a bucket.")
parser.add_argument("--bucket", type=str, required=True, help="Name of the S3 bucket.")
parser.add_argument("--deployment_id", type=str, default="", help="The deployment id to filter objects. If set to '' then all deployments are used. (default: '')")
parser.add_argument("--output_file", type=str, default="s3_keys.txt", help="Output file to save S3 keys.")
args = parser.parse_args()



# List keys from the specified S3 bucket and prefix
print(f"Listing keys from bucket '{args.bucket}' with deployment '{args.deployment_id}'...")
keys = list_s3_keys(args.bucket, args.deployment_id)

# Save keys to the output file
save_keys_to_file(keys, args.output_file)
print(f"Saved {len(keys)} keys to {args.output_file}")

if __name__ == "__main__":
main()
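The manual ContinuationToken loop above could equally be written with boto3's built-in paginator. A minimal sketch of that alternative, assuming an `s3_client` created the same way as in `list_s3_keys`; this is a design note rather than code from the script above:

def list_s3_keys_paginated(s3_client, bucket_name, deployment_id=""):
    """Yield all object keys under the given deployment prefix."""
    # get_paginator handles ContinuationToken internally
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=deployment_id):
        for obj in page.get("Contents", []):
            yield obj["Key"]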
61 changes: 61 additions & 0 deletions 03_pre_chop_files.py
@@ -0,0 +1,61 @@
import json
import argparse
from math import ceil
import os

def load_workload(input_file, file_extension):
"""
Load workload from a file. Assumes each line contains an S3 key.
"""
with open(input_file, 'r') as f:
        all_keys = [line.strip() for line in f]

    # allow pipe-separated extensions (e.g. "jpg|jpeg"); an empty string keeps all keys
    extensions = tuple(file_extension.split("|"))
    subset_keys = [x for x in all_keys if x.endswith(extensions)]

# remove corrupt keys
subset_keys = [x for x in subset_keys if not os.path.basename(x).startswith('$')]

# remove keys uploaded from the recycle bin (legacy code)
    subset_keys = [x for x in subset_keys if 'recycle' not in x]

return subset_keys

def split_workload(keys, chunk_size):
"""
Split a list of keys into chunks of a specified size.
"""
num_chunks = ceil(len(keys) / chunk_size)
chunks = {
str(i + 1): {"keys": keys[i * chunk_size: (i + 1) * chunk_size]}
for i in range(num_chunks)
}
return chunks

def save_chunks(chunks, output_file):
"""
Save chunks to a JSON file.
"""
with open(output_file, 'w') as f:
json.dump(chunks, f, indent=4)

def main():
parser = argparse.ArgumentParser(description="Pre-chop S3 workload into manageable chunks.")
parser.add_argument("--input_file", type=str, required=True, help="Path to file containing S3 keys, one per line.")
parser.add_argument("--file_extension", type=str, required=True, default="jpg|jpeg", help="File extensions to be chuncked. If empty, all extensions used.")
parser.add_argument("--chunk_size", type=int, default=100, help="Number of keys per chunk.")
parser.add_argument("--output_file", type=str, required=True, help="Path to save the output JSON file.")
args = parser.parse_args()

# Load the workload from the input file
keys = load_workload(args.input_file, args.file_extension)

# Split the workload into chunks
chunks = split_workload(keys, args.chunk_size)

# Save the chunks to a JSON file
save_chunks(chunks, args.output_file)

print(f"Successfully split {len(keys)} keys into {len(chunks)} chunks.")
print(f"Chunks saved to {args.output_file}")

if __name__ == "__main__":
main()
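The resulting JSON maps one-based chunk IDs ("1", "2", ...) to objects holding a "keys" list, which is the structure 04_process_chunks.py expects. A small sketch of reading one chunk back; the file name below is a placeholder:

import json

# Read the chunk file written by 03_pre_chop_files.py; the path is a placeholder.
with open("s3_keys_chunked.json", encoding="utf-8") as f:
    chunks = json.load(f)

print(f"{len(chunks)} chunks")
print(chunks["1"]["keys"][:5])  # first five keys of the first chunk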
217 changes: 217 additions & 0 deletions 04_process_chunks.py
@@ -0,0 +1,217 @@
import argparse
import boto3
import json
import os
from utils.inference_scripts import perform_inf
from boto3.s3.transfer import TransferConfig
import torch
from utils.custom_models import load_models


# Transfer configuration for optimised S3 download
transfer_config = TransferConfig(
max_concurrency=20, # Increase the number of concurrent transfers
multipart_threshold=8 * 1024 * 1024, # 8MB
max_io_queue=1000,
io_chunksize=262144, # 256KB
)

def initialise_session(credentials_file="credentials.json"):
"""
Load AWS and API credentials from a configuration file and initialise an AWS session.

Args:
credentials_file (str): Path to the credentials JSON file.

Returns:
boto3.Client: Initialised S3 client.
"""
with open(credentials_file, encoding="utf-8") as config_file:
aws_credentials = json.load(config_file)
session = boto3.Session(
aws_access_key_id=aws_credentials["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=aws_credentials["AWS_SECRET_ACCESS_KEY"],
region_name=aws_credentials["AWS_REGION"],
)
client = session.client("s3", endpoint_url=aws_credentials["AWS_URL_ENDPOINT"])
return client


def download_and_analyse(
keys,
output_dir,
bucket_name,
client,
remove_image=True,
perform_inference=True,
localisation_model=None,
binary_model=None,
order_model=None,
order_labels=None,
species_model=None,
species_labels=None,
device=None,
order_data_thresholds=None,
csv_file="results.csv",
):
"""
Download images from S3 and perform analysis.

Args:
keys (list): List of S3 keys to process.
output_dir (str): Directory to save downloaded files and results.
bucket_name (str): S3 bucket name.
client (boto3.Client): Initialised S3 client.
Other args: Parameters for inference and analysis.
"""
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)

for key in keys:
local_path = os.path.join(output_dir, os.path.basename(key))
print(f"Downloading {key} to {local_path}")
client.download_file(bucket_name, key, local_path, Config=transfer_config)

# Perform image analysis if enabled
print(f"Analysing {local_path}")
if perform_inference:
perform_inf(
local_path,
bucket_name=bucket_name,
loc_model=localisation_model,
binary_model=binary_model,
order_model=order_model,
order_labels=order_labels,
regional_model=species_model,
regional_category_map=species_labels,
device=device,
order_data_thresholds=order_data_thresholds,
csv_file=csv_file,
save_crops=True,
)
# Remove the image if cleanup is enabled
if remove_image:
os.remove(local_path)


def main(
chunk_id,
json_file,
output_dir,
bucket_name,
credentials_file="credentials.json",
remove_image=True,
perform_inference=True,
localisation_model=None,
binary_model=None,
order_model=None,
order_labels=None,
species_model=None,
species_labels=None,
device=None,
order_data_thresholds=None,
csv_file="results.csv",
):
"""
Main function to process a specific chunk of S3 keys.

Args:
        chunk_id (str): ID of the chunk to process (e.g., "1").
json_file (str): Path to the JSON file with key chunks.
output_dir (str): Directory to save results.
bucket_name (str): S3 bucket name.
Other args: Parameters for download and analysis.
"""
with open(json_file, "r") as f:
chunks = json.load(f)

if chunk_id not in chunks:
raise ValueError(f"Chunk ID {chunk_id} not found in JSON file.")

client = initialise_session(credentials_file)

keys = chunks[chunk_id]['keys']
download_and_analyse(
keys=keys,
output_dir=output_dir,
bucket_name=bucket_name,
client=client,
remove_image=remove_image,
perform_inference=perform_inference,
localisation_model=localisation_model,
binary_model=binary_model,
order_model=order_model,
order_labels=order_labels,
species_model=species_model,
species_labels=species_labels,
device=device,
order_data_thresholds=order_data_thresholds,
csv_file=csv_file,
)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process a specific chunk of S3 keys.")
parser.add_argument("--chunk_id", required=True, help="ID of the chunk to process (e.g., 0, 1, 2, 3).")
parser.add_argument("--json_file", required=True, help="Path to the JSON file with key chunks.")
parser.add_argument("--output_dir", required=True, help="Directory to save downloaded files and analysis results.", default="./data/")
parser.add_argument("--bucket_name", required=True, help="Name of the S3 bucket.")
parser.add_argument("--credentials_file", default="credentials.json", help="Path to AWS credentials file.")
parser.add_argument("--remove_image", action="store_true", help="Remove images after processing.")
parser.add_argument("--perform_inference", action="store_true", help="Enable inference.")
parser.add_argument("--localisation_model_path", type=str, help="Path to the localisation model weights.", default="./models/v1_localizmodel_2021-08-17-12-06.pt")
parser.add_argument("--binary_model_path", type=str, help="Path to the binary model weights.", default="./models/moth-nonmoth-effv2b3_20220506_061527_30.pth")
parser.add_argument("--order_model_path", type=str, help="Path to the order model weights.", default="./models/dhc_best_128.pth")
parser.add_argument("--order_labels", type=str, help="Path to the order labels file.")
parser.add_argument("--species_model_path", type=str, help="Path to the species model weights.", default="./models/turing-costarica_v03_resnet50_2024-06-04-16-17_state.pt")
parser.add_argument("--species_labels", type=str, help="Path to the species labels file.",
default="./models/03_costarica_data_category_map.json")
parser.add_argument("--device", type=str, default="cpu", help="Device to run inference on (e.g., cpu or cuda).")
parser.add_argument("--order_thresholds_path", type=str, help="Path to the order data thresholds file.", default="./models/thresholdsTestTrain.csv")
parser.add_argument("--csv_file", default="results.csv", help="Path to save analysis results.")


args = parser.parse_args()

if torch.cuda.is_available():
device = torch.device("cuda")
print(
"\033[95m\033[1mCuda available, using GPU "
+ "\N{White Heavy Check Mark}\033[0m\033[0m"
)
else:
device = torch.device("cpu")
print(
"\033[95m\033[1mCuda not available, using CPU "
+ "\N{Cross Mark}\033[0m\033[0m"
)

models = load_models(
device,
        args.localisation_model_path,
        args.binary_model_path,
        args.order_model_path,
        args.order_thresholds_path,
        args.species_model_path,
        args.species_labels,
)


main(
chunk_id=args.chunk_id,
json_file=args.json_file,
output_dir=args.output_dir,
bucket_name=args.bucket_name,
credentials_file=args.credentials_file,
remove_image=args.remove_image,
perform_inference=args.perform_inference,
localisation_model=models['localisation_model'],
binary_model=models['classification_model'],
order_model=models['order_model'],
order_labels=models['order_model_labels'],
order_data_thresholds=models['order_model_thresholds'],
species_model=models['species_model'],
species_labels=models['species_model_labels'],
device=device,
csv_file=args.csv_file,
)
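Because the chunks are keyed "1" through "N", the jobs can be serialised by looping over those IDs and invoking 04_process_chunks.py once per chunk. A minimal driver sketch: the bucket name, file paths, and flag values are placeholder assumptions; only the flag names come from the parser above.

import json
import subprocess
import sys

# Placeholder chunk file; the flags below match 04_process_chunks.py's parser.
CHUNK_FILE = "s3_keys_chunked.json"

with open(CHUNK_FILE, encoding="utf-8") as f:
    chunk_ids = sorted(json.load(f), key=int)  # "1", "2", ... in numeric order

for chunk_id in chunk_ids:
    subprocess.run(
        [
            sys.executable, "04_process_chunks.py",
            "--chunk_id", chunk_id,
            "--json_file", CHUNK_FILE,
            "--output_dir", "./data/",
            "--bucket_name", "my-bucket",
            "--perform_inference",
            "--remove_image",
        ],
        check=True,
    )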