Skip to content

Commit

Permalink
chg ! tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vitali-yanushchyk-valor committed Jun 6, 2024
1 parent d594a93 commit b4df7e7
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 174 deletions.
4 changes: 0 additions & 4 deletions src/hope_dedup_engine/apps/faces/exceptions.py

This file was deleted.

83 changes: 43 additions & 40 deletions src/hope_dedup_engine/apps/faces/utils/duplication_detector.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import os
import re
from typing import Dict, List, Set, Tuple

from django.conf import settings

Expand All @@ -11,7 +10,6 @@
from constance import config

from hope_dedup_engine.apps.core.storage import CV2DNNStorage, HDEAzureStorage, HOPEAzureStorage
from hope_dedup_engine.apps.faces.exceptions import NoFaceRegionsDetectedException


class DuplicationDetector:
Expand All @@ -27,7 +25,7 @@ def __init__(self, filename: str) -> None:
filename (str): The filename of the image to process.
"""
self.logger: logging.Logger = logging.getLogger(__name__)
self.storages: Dict[str, CV2DNNStorage | HDEAzureStorage | HOPEAzureStorage] = {
self.storages: dict[str, CV2DNNStorage | HDEAzureStorage | HOPEAzureStorage] = {
"images": HOPEAzureStorage(),
"cv2dnn": CV2DNNStorage(settings.CV2DNN_PATH),
"encoded": HDEAzureStorage(),
Expand All @@ -37,16 +35,18 @@ def __init__(self, filename: str) -> None:
if not self.storages.get("cv2dnn").exists(file):
raise FileNotFoundError(f"File {file} does not exist in storage.")

self.shape: Dict[str, int] = self._get_shape()
self.shape: dict[str, int] = self._get_shape()
self.net: cv2.dnn_Net = self._set_net(self.storages.get("cv2dnn"))

self.filename: str = filename
self.encodings_filename: str = f"{self.filename}.npy"
self.scale_factor: float = config.SCALE_FACTOR
self.mean_values: Tuple[float, float, float] = tuple(map(float, config.MEAN_VALUES.split(", ")))
self.scale_factor: float = config.BLOB_FROM_IMAGE_SCALE_FACTOR
self.mean_values: tuple[float, float, float] = tuple(map(float, config.BLOB_FROM_IMAGE_MEAN_VALUES.split(", ")))
# self.mean_values: config.BLOB_FROM_IMAGE_MEAN_VALUES
self.face_detection_confidence: float = config.FACE_DETECTION_CONFIDENCE
self.face_detection_model: str = config.FACE_DETECTION_MODEL
self.distance_threshold: float = config.DISTANCE_THRESHOLD
self.face_encodings_model: str = config.FACE_ENCODINGS_MODEL
self.face_encodings_num_jitters: int = config.FACE_ENCODINGS_NUM_JITTERS
self.distance_threshold: float = config.FACE_DISTANCE_THRESHOLD
self.nms_threshold: float = config.NMS_THRESHOLD

@property
Expand All @@ -62,21 +62,21 @@ def _set_net(self, storage: CV2DNNStorage) -> cv2.dnn_Net:
net.setPreferableTarget(int(config.DNN_TARGET))
return net

def _get_shape(self) -> Dict[str, int]:
def _get_shape(self) -> dict[str, int]:
pattern = r"input_shape\s*\{\s*" r"dim:\s*(\d+)\s*" r"dim:\s*(\d+)\s*" r"dim:\s*(\d+)\s*" r"dim:\s*(\d+)\s*\}"
with open(settings.PROTOTXT_FILE, "r") as file:
match = re.search(pattern, file.read())
if not match:
if match := re.search(pattern, file.read()):
return {
"batch_size": int(match.group(1)),
"channels": int(match.group(2)),
"height": int(match.group(3)),
"width": int(match.group(4)),
}
else:
raise ValueError("Could not find input_shape in prototxt file.")
return {
"batch_size": int(match.group(1)),
"channels": int(match.group(2)),
"height": int(match.group(3)),
"width": int(match.group(4)),
}

def _get_face_detections_dnn(self) -> List[Tuple[int, int, int, int]]:
face_regions: List[Tuple[int, int, int, int]] = []
def _get_face_detections_dnn(self) -> list[tuple[int, int, int, int]]:
face_regions: list[tuple[int, int, int, int]] = []
try:
with self.storages["images"].open(self.filename, "rb") as img_file:
img_array = np.frombuffer(img_file.read(), dtype=np.uint8)
Expand Down Expand Up @@ -111,12 +111,12 @@ def _get_face_detections_dnn(self) -> List[Tuple[int, int, int, int]]:
for i in indices:
face_regions.append(tuple(boxes[i]))
except Exception as e:
self.logger.exception(f"Error processing face detection for image {self.filename}", exc_info=e)
self.logger.exception(f"Error processing face detection for image {self.filename}", exc_info=True)
raise e
return face_regions

def _load_encodings_all(self) -> Dict[str, List[np.ndarray]]:
data: Dict[str, List[np.ndarray]] = {}
def _load_encodings_all(self) -> dict[str, list[np.ndarray]]:
data: dict[str, list[np.ndarray]] = {}
try:
_, files = self.storages["encoded"].listdir("")
for file in files:
Expand All @@ -136,31 +136,34 @@ def _encode_face(self) -> None:
face_regions = self._get_face_detections_dnn()
if not face_regions:
self.logger.error(f"No face regions detected in image {self.filename}")
raise NoFaceRegionsDetectedException(f"No face regions detected in image {self.filename}")
for region in face_regions:
if isinstance(region, (list, tuple)) and len(region) == 4:
top, right, bottom, left = region
# Compute the face encodings for the face regions in the image
face_encodings = face_recognition.face_encodings(
image, [(top, right, bottom, left)], model=self.face_detection_model
)
encodings.extend(face_encodings)
else:
self.logger.error(f"Invalid face region {region}")
with self.storages["encoded"].open(self.encodings_filename, "wb") as f:
np.save(f, encodings)
else:
for region in face_regions:
if isinstance(region, (list, tuple)) and len(region) == 4:
top, right, bottom, left = region
# Compute the face encodings for the face regions in the image
face_encodings = face_recognition.face_encodings(
image,
[(top, right, bottom, left)],
num_jitters=self.face_encodings_num_jitters,
model=self.face_encodings_model,
)
encodings.extend(face_encodings)
else:
self.logger.error(f"Invalid face region {region}")
with self.storages["encoded"].open(self.encodings_filename, "wb") as f:
np.save(f, encodings)
except Exception as e:
self.logger.exception(f"Error processing face encodings for image {self.filename}", exc_info=e)
self.logger.exception(f"Error processing face encodings for image {self.filename}", exc_info=True)
raise e

def find_duplicates(self) -> Tuple[str]:
def find_duplicates(self) -> tuple[str]:
"""
Find and return a list of duplicate images based on face encodings.
Returns:
Tuple[str]: A tuple of filenames of duplicate images.
tuple[str]: A tuple of filenames of duplicate images.
"""
duplicated_images: Set[str] = set()
duplicated_images: set[str] = set()
path1 = self.filename
try:
if not self.has_encodings:
Expand All @@ -186,5 +189,5 @@ def find_duplicates(self) -> Tuple[str]:
break
return tuple(duplicated_images)
except Exception as e:
self.logger.exception(f"Error finding duplicates for image {path1}", exc_info=e)
self.logger.exception(f"Error finding duplicates for image {path1}", exc_info=True)
raise e
47 changes: 29 additions & 18 deletions src/hope_dedup_engine/config/fragments/constance.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
"Specifies the target device on which OpenCV will perform the deep learning computations.",
"dnn_target",
),
"SCALE_FACTOR": (
"BLOB_FROM_IMAGE_SCALE_FACTOR": (
1.0,
"""Specifies the scaling factor applied to all pixel values when converting an image to a blob. Mostly
it equals 1.0 for no scaling or 1.0/255.0 and normalizing to the [0, 1] range.
Remember that mean values are also applied to scaling factor. Both scaling factor and mean values
Remember that scaling factor is also applied to mean values. Both scaling factor and mean values
must be the same for the training and inference to get the correct results.
""",
float,
),
"MEAN_VALUES": (
"BLOB_FROM_IMAGE_MEAN_VALUES": (
"104.0, 177.0, 123.0",
"""Specifies the mean BGR values used in image preprocessing to normalize pixel values by subtracting
the mean values of the training dataset. This helps in reducing model bias and improving accuracy.
Expand Down Expand Up @@ -54,7 +54,25 @@
""",
float,
),
"DISTANCE_THRESHOLD": (
"FACE_ENCODINGS_NUM_JITTERS": (
1,
"""
Specifies the number of times to re-sample the face when calculating the encoding. Higher values increase
accuracy but are computationally more expensive and slower. For example, setting 'num_jitters' to 100 makes
the process 100 times slower.
""",
int,
),
"FACE_ENCODINGS_MODEL": (
"small",
"""
Specifies the model type used for encoding face landmarks. It can be either 'small' which is faster and
detects only 5 key facial landmarks, or 'large' which is more precise and identifies 68 key facial landmarks
but requires more computational resources.
""",
"face_encodings_model",
),
"FACE_DISTANCE_THRESHOLD": (
0.5,
"""
Specifies the maximum allowable distance between two face embeddings for them to be considered a match. It helps
Expand All @@ -63,14 +81,6 @@
""",
float,
),
"FACE_DETECTION_MODEL": (
"hog",
"""
Specifies the model type used for face detection. It can be either faster 'hog'(Histogram of Oriented Gradients)
or more accurate 'cnn'(Convolutional Neural Network).",
""",
"face_detection_model",
),
}


Expand All @@ -83,12 +93,13 @@
"fields": (
"DNN_BACKEND",
"DNN_TARGET",
"SCALE_FACTOR",
"MEAN_VALUES",
"BLOB_FROM_IMAGE_SCALE_FACTOR",
"BLOB_FROM_IMAGE_MEAN_VALUES",
"FACE_DETECTION_CONFIDENCE",
"NMS_THRESHOLD",
"DISTANCE_THRESHOLD",
"FACE_DETECTION_MODEL",
"FACE_ENCODINGS_NUM_JITTERS",
"FACE_ENCODINGS_MODEL",
"FACE_DISTANCE_THRESHOLD",
),
"collapse": False,
},
Expand All @@ -111,10 +122,10 @@
"choices": ((cv2.dnn.DNN_TARGET_CPU, "DNN_TARGET_CPU"),),
},
],
"face_detection_model": [
"face_encodings_model": [
"django.forms.ChoiceField",
{
"choices": (("hog", "HOG"), ("cnn", "CNN")),
"choices": (("small", "SMALL"), ("large", "LARGE")),
},
],
"tuple_field": ["hope_dedup_engine.apps.faces.validators.MeanValuesTupleField", {}],
Expand Down
28 changes: 26 additions & 2 deletions tests/faces/faces_const.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
from typing import Dict, Final
from typing import Final

FILENAME: Final[str] = "test_file.jpg"
FILENAMES: Final[list[str]] = ["test_file.jpg", "test_file2.jpg"]
DEPLOY_PROTO_SHAPE: Final[Dict[str, int]] = {"batch_size": 1, "channels": 3, "height": 300, "width": 300}
DEPLOY_PROTO_CONTENT: Final[str] = "input_shape { dim: 1 dim: 3 dim: 300 dim: 300 }"
DEPLOY_PROTO_SHAPE: Final[dict[str, int]] = {"batch_size": 1, "channels": 3, "height": 300, "width": 300}
FACE_REGIONS_INVALID: Final[list[list[tuple[int, int, int, int]]]] = [[], [(0, 0, 10)]]
FACE_REGIONS_VALID: Final[list[tuple[int, int, int, int]]] = [
(10, 10, 20, 20),
(30, 30, 40, 40),
]
FACE_DETECTION_CONFIDENCE: Final[float] = 0.7
FACE_DETECTIONS: Final[list[tuple[float]]] = [
(0, 0, 0.95, 0.1, 0.1, 0.2, 0.2), # with confidence 0.95 -> valid detection
(0, 0, 0.75, 0.3, 0.3, 0.4, 0.4), # with confidence 0.75 -> valid detection
(0, 0, 0.15, 0.1, 0.1, 0.2, 0.2), # with confidence 0.15 -> invalid detection
]
IMAGE_SIZE: Final[tuple[int, int, int]] = (100, 100, 3) # Size of the image after decoding (h, w, number of channels)
RESIZED_IMAGE_SIZE: Final[tuple[int, int, int]] = (
300,
300,
3,
) # Size of the image after resizing for processing (h, w, number of channels)
BLOB_SHAPE: Final[tuple[int, int, int, int]] = (
1,
3,
300,
300,
) # Shape of the blob (4D tensor) for input to the neural network (batch_size, channels, h, w)
Loading

0 comments on commit b4df7e7

Please sign in to comment.