diff --git a/src/hope_dedup_engine/apps/faces/exceptions.py b/src/hope_dedup_engine/apps/faces/exceptions.py deleted file mode 100644 index 4d9d1331..00000000 --- a/src/hope_dedup_engine/apps/faces/exceptions.py +++ /dev/null @@ -1,4 +0,0 @@ -class NoFaceRegionsDetectedException(Exception): - def __init__(self, message="No face regions detected"): - self.message = message - super().__init__(self.message) diff --git a/src/hope_dedup_engine/apps/faces/utils/duplication_detector.py b/src/hope_dedup_engine/apps/faces/utils/duplication_detector.py index 4418f84f..d20c8d91 100644 --- a/src/hope_dedup_engine/apps/faces/utils/duplication_detector.py +++ b/src/hope_dedup_engine/apps/faces/utils/duplication_detector.py @@ -1,7 +1,6 @@ import logging import os import re -from typing import Dict, List, Set, Tuple from django.conf import settings @@ -11,7 +10,6 @@ from constance import config from hope_dedup_engine.apps.core.storage import CV2DNNStorage, HDEAzureStorage, HOPEAzureStorage -from hope_dedup_engine.apps.faces.exceptions import NoFaceRegionsDetectedException class DuplicationDetector: @@ -27,7 +25,7 @@ def __init__(self, filename: str) -> None: filename (str): The filename of the image to process. """ self.logger: logging.Logger = logging.getLogger(__name__) - self.storages: Dict[str, CV2DNNStorage | HDEAzureStorage | HOPEAzureStorage] = { + self.storages: dict[str, CV2DNNStorage | HDEAzureStorage | HOPEAzureStorage] = { "images": HOPEAzureStorage(), "cv2dnn": CV2DNNStorage(settings.CV2DNN_PATH), "encoded": HDEAzureStorage(), @@ -37,16 +35,18 @@ def __init__(self, filename: str) -> None: if not self.storages.get("cv2dnn").exists(file): raise FileNotFoundError(f"File {file} does not exist in storage.") - self.shape: Dict[str, int] = self._get_shape() + self.shape: dict[str, int] = self._get_shape() self.net: cv2.dnn_Net = self._set_net(self.storages.get("cv2dnn")) self.filename: str = filename self.encodings_filename: str = f"{self.filename}.npy" - self.scale_factor: float = config.SCALE_FACTOR - self.mean_values: Tuple[float, float, float] = tuple(map(float, config.MEAN_VALUES.split(", "))) + self.scale_factor: float = config.BLOB_FROM_IMAGE_SCALE_FACTOR + self.mean_values: tuple[float, float, float] = tuple(map(float, config.BLOB_FROM_IMAGE_MEAN_VALUES.split(", "))) + # self.mean_values: config.BLOB_FROM_IMAGE_MEAN_VALUES self.face_detection_confidence: float = config.FACE_DETECTION_CONFIDENCE - self.face_detection_model: str = config.FACE_DETECTION_MODEL - self.distance_threshold: float = config.DISTANCE_THRESHOLD + self.face_encodings_model: str = config.FACE_ENCODINGS_MODEL + self.face_encodings_num_jitters: int = config.FACE_ENCODINGS_NUM_JITTERS + self.distance_threshold: float = config.FACE_DISTANCE_THRESHOLD self.nms_threshold: float = config.NMS_THRESHOLD @property @@ -62,21 +62,21 @@ def _set_net(self, storage: CV2DNNStorage) -> cv2.dnn_Net: net.setPreferableTarget(int(config.DNN_TARGET)) return net - def _get_shape(self) -> Dict[str, int]: + def _get_shape(self) -> dict[str, int]: pattern = r"input_shape\s*\{\s*" r"dim:\s*(\d+)\s*" r"dim:\s*(\d+)\s*" r"dim:\s*(\d+)\s*" r"dim:\s*(\d+)\s*\}" with open(settings.PROTOTXT_FILE, "r") as file: - match = re.search(pattern, file.read()) - if not match: + if match := re.search(pattern, file.read()): + return { + "batch_size": int(match.group(1)), + "channels": int(match.group(2)), + "height": int(match.group(3)), + "width": int(match.group(4)), + } + else: raise ValueError("Could not find input_shape in prototxt file.") - return { - "batch_size": int(match.group(1)), - "channels": int(match.group(2)), - "height": int(match.group(3)), - "width": int(match.group(4)), - } - def _get_face_detections_dnn(self) -> List[Tuple[int, int, int, int]]: - face_regions: List[Tuple[int, int, int, int]] = [] + def _get_face_detections_dnn(self) -> list[tuple[int, int, int, int]]: + face_regions: list[tuple[int, int, int, int]] = [] try: with self.storages["images"].open(self.filename, "rb") as img_file: img_array = np.frombuffer(img_file.read(), dtype=np.uint8) @@ -111,12 +111,12 @@ def _get_face_detections_dnn(self) -> List[Tuple[int, int, int, int]]: for i in indices: face_regions.append(tuple(boxes[i])) except Exception as e: - self.logger.exception(f"Error processing face detection for image {self.filename}", exc_info=e) + self.logger.exception(f"Error processing face detection for image {self.filename}", exc_info=True) raise e return face_regions - def _load_encodings_all(self) -> Dict[str, List[np.ndarray]]: - data: Dict[str, List[np.ndarray]] = {} + def _load_encodings_all(self) -> dict[str, list[np.ndarray]]: + data: dict[str, list[np.ndarray]] = {} try: _, files = self.storages["encoded"].listdir("") for file in files: @@ -136,31 +136,34 @@ def _encode_face(self) -> None: face_regions = self._get_face_detections_dnn() if not face_regions: self.logger.error(f"No face regions detected in image {self.filename}") - raise NoFaceRegionsDetectedException(f"No face regions detected in image {self.filename}") - for region in face_regions: - if isinstance(region, (list, tuple)) and len(region) == 4: - top, right, bottom, left = region - # Compute the face encodings for the face regions in the image - face_encodings = face_recognition.face_encodings( - image, [(top, right, bottom, left)], model=self.face_detection_model - ) - encodings.extend(face_encodings) - else: - self.logger.error(f"Invalid face region {region}") - with self.storages["encoded"].open(self.encodings_filename, "wb") as f: - np.save(f, encodings) + else: + for region in face_regions: + if isinstance(region, (list, tuple)) and len(region) == 4: + top, right, bottom, left = region + # Compute the face encodings for the face regions in the image + face_encodings = face_recognition.face_encodings( + image, + [(top, right, bottom, left)], + num_jitters=self.face_encodings_num_jitters, + model=self.face_encodings_model, + ) + encodings.extend(face_encodings) + else: + self.logger.error(f"Invalid face region {region}") + with self.storages["encoded"].open(self.encodings_filename, "wb") as f: + np.save(f, encodings) except Exception as e: - self.logger.exception(f"Error processing face encodings for image {self.filename}", exc_info=e) + self.logger.exception(f"Error processing face encodings for image {self.filename}", exc_info=True) raise e - def find_duplicates(self) -> Tuple[str]: + def find_duplicates(self) -> tuple[str]: """ Find and return a list of duplicate images based on face encodings. Returns: - Tuple[str]: A tuple of filenames of duplicate images. + tuple[str]: A tuple of filenames of duplicate images. """ - duplicated_images: Set[str] = set() + duplicated_images: set[str] = set() path1 = self.filename try: if not self.has_encodings: @@ -186,5 +189,5 @@ def find_duplicates(self) -> Tuple[str]: break return tuple(duplicated_images) except Exception as e: - self.logger.exception(f"Error finding duplicates for image {path1}", exc_info=e) + self.logger.exception(f"Error finding duplicates for image {path1}", exc_info=True) raise e diff --git a/src/hope_dedup_engine/config/fragments/constance.py b/src/hope_dedup_engine/config/fragments/constance.py index b3852113..555dbc49 100644 --- a/src/hope_dedup_engine/config/fragments/constance.py +++ b/src/hope_dedup_engine/config/fragments/constance.py @@ -17,16 +17,16 @@ "Specifies the target device on which OpenCV will perform the deep learning computations.", "dnn_target", ), - "SCALE_FACTOR": ( + "BLOB_FROM_IMAGE_SCALE_FACTOR": ( 1.0, """Specifies the scaling factor applied to all pixel values when converting an image to a blob. Mostly it equals 1.0 for no scaling or 1.0/255.0 and normalizing to the [0, 1] range. - Remember that mean values are also applied to scaling factor. Both scaling factor and mean values + Remember that scaling factor is also applied to mean values. Both scaling factor and mean values must be the same for the training and inference to get the correct results. """, float, ), - "MEAN_VALUES": ( + "BLOB_FROM_IMAGE_MEAN_VALUES": ( "104.0, 177.0, 123.0", """Specifies the mean BGR values used in image preprocessing to normalize pixel values by subtracting the mean values of the training dataset. This helps in reducing model bias and improving accuracy. @@ -54,7 +54,25 @@ """, float, ), - "DISTANCE_THRESHOLD": ( + "FACE_ENCODINGS_NUM_JITTERS": ( + 1, + """ + Specifies the number of times to re-sample the face when calculating the encoding. Higher values increase + accuracy but are computationally more expensive and slower. For example, setting 'num_jitters' to 100 makes + the process 100 times slower. + """, + int, + ), + "FACE_ENCODINGS_MODEL": ( + "small", + """ + Specifies the model type used for encoding face landmarks. It can be either 'small' which is faster and + detects only 5 key facial landmarks, or 'large' which is more precise and identifies 68 key facial landmarks + but requires more computational resources. + """, + "face_encodings_model", + ), + "FACE_DISTANCE_THRESHOLD": ( 0.5, """ Specifies the maximum allowable distance between two face embeddings for them to be considered a match. It helps @@ -63,14 +81,6 @@ """, float, ), - "FACE_DETECTION_MODEL": ( - "hog", - """ - Specifies the model type used for face detection. It can be either faster 'hog'(Histogram of Oriented Gradients) - or more accurate 'cnn'(Convolutional Neural Network).", - """, - "face_detection_model", - ), } @@ -83,12 +93,13 @@ "fields": ( "DNN_BACKEND", "DNN_TARGET", - "SCALE_FACTOR", - "MEAN_VALUES", + "BLOB_FROM_IMAGE_SCALE_FACTOR", + "BLOB_FROM_IMAGE_MEAN_VALUES", "FACE_DETECTION_CONFIDENCE", "NMS_THRESHOLD", - "DISTANCE_THRESHOLD", - "FACE_DETECTION_MODEL", + "FACE_ENCODINGS_NUM_JITTERS", + "FACE_ENCODINGS_MODEL", + "FACE_DISTANCE_THRESHOLD", ), "collapse": False, }, @@ -111,10 +122,10 @@ "choices": ((cv2.dnn.DNN_TARGET_CPU, "DNN_TARGET_CPU"),), }, ], - "face_detection_model": [ + "face_encodings_model": [ "django.forms.ChoiceField", { - "choices": (("hog", "HOG"), ("cnn", "CNN")), + "choices": (("small", "SMALL"), ("large", "LARGE")), }, ], "tuple_field": ["hope_dedup_engine.apps.faces.validators.MeanValuesTupleField", {}], diff --git a/tests/faces/faces_const.py b/tests/faces/faces_const.py index 1593ebfb..7a506a4e 100644 --- a/tests/faces/faces_const.py +++ b/tests/faces/faces_const.py @@ -1,5 +1,29 @@ -from typing import Dict, Final +from typing import Final FILENAME: Final[str] = "test_file.jpg" FILENAMES: Final[list[str]] = ["test_file.jpg", "test_file2.jpg"] -DEPLOY_PROTO_SHAPE: Final[Dict[str, int]] = {"batch_size": 1, "channels": 3, "height": 300, "width": 300} +DEPLOY_PROTO_CONTENT: Final[str] = "input_shape { dim: 1 dim: 3 dim: 300 dim: 300 }" +DEPLOY_PROTO_SHAPE: Final[dict[str, int]] = {"batch_size": 1, "channels": 3, "height": 300, "width": 300} +FACE_REGIONS_INVALID: Final[list[list[tuple[int, int, int, int]]]] = [[], [(0, 0, 10)]] +FACE_REGIONS_VALID: Final[list[tuple[int, int, int, int]]] = [ + (10, 10, 20, 20), + (30, 30, 40, 40), +] +FACE_DETECTION_CONFIDENCE: Final[float] = 0.7 +FACE_DETECTIONS: Final[list[tuple[float]]] = [ + (0, 0, 0.95, 0.1, 0.1, 0.2, 0.2), # with confidence 0.95 -> valid detection + (0, 0, 0.75, 0.3, 0.3, 0.4, 0.4), # with confidence 0.75 -> valid detection + (0, 0, 0.15, 0.1, 0.1, 0.2, 0.2), # with confidence 0.15 -> invalid detection +] +IMAGE_SIZE: Final[tuple[int, int, int]] = (100, 100, 3) # Size of the image after decoding (h, w, number of channels) +RESIZED_IMAGE_SIZE: Final[tuple[int, int, int]] = ( + 300, + 300, + 3, +) # Size of the image after resizing for processing (h, w, number of channels) +BLOB_SHAPE: Final[tuple[int, int, int, int]] = ( + 1, + 3, + 300, + 300, +) # Shape of the blob (4D tensor) for input to the neural network (batch_size, channels, h, w) diff --git a/tests/faces/fixtures/duplication_detector.py b/tests/faces/fixtures/duplication_detector.py index 6a833e5d..af547724 100644 --- a/tests/faces/fixtures/duplication_detector.py +++ b/tests/faces/fixtures/duplication_detector.py @@ -9,7 +9,15 @@ from hope_dedup_engine.apps.core.storage import CV2DNNStorage, HDEAzureStorage, HOPEAzureStorage from hope_dedup_engine.apps.faces.utils.duplication_detector import DuplicationDetector -from ..faces_const import FILENAME +from ..faces_const import ( + BLOB_SHAPE, + DEPLOY_PROTO_CONTENT, + FACE_DETECTIONS, + FACE_REGIONS_VALID, + FILENAME, + IMAGE_SIZE, + RESIZED_IMAGE_SIZE, +) @pytest.fixture @@ -20,62 +28,36 @@ def dd(mock_hope_azure_storage, mock_cv2dnn_storage, mock_hde_azure_storage, moc patch("hope_dedup_engine.apps.faces.utils.duplication_detector.HDEAzureStorage", mock_hde_azure_storage), patch("builtins.open", mock_prototxt_file), ): - mock_cv2dnn_storage.exists.return_value = False - detector = DuplicationDetector(FILENAME) - mock_logger = MagicMock() - detector.logger = mock_logger - return detector + return DuplicationDetector(FILENAME) @pytest.fixture def mock_prototxt_file(): - content = "input_shape { dim: 1 dim: 3 dim: 300 dim: 300 }" - return mock_open(read_data=content) + return mock_open(read_data=DEPLOY_PROTO_CONTENT) @pytest.fixture def mock_cv2dnn_storage(): - storage = MagicMock(spec=CV2DNNStorage) - storage.exists.return_value = True - storage.path.side_effect = lambda filename: FILENAME - return storage + return MagicMock(spec=CV2DNNStorage) @pytest.fixture def mock_hde_azure_storage(): - storage = MagicMock(spec=HDEAzureStorage) - storage.exists.return_value = True - # storage.listdir.return_value = (None, FILENAMES) - storage.open.return_value.__enter__.return_value.read.return_value = b"binary image data" - return storage + return MagicMock(spec=HDEAzureStorage) @pytest.fixture def mock_hope_azure_storage(): - storage = MagicMock(spec=HOPEAzureStorage) - storage.exists.return_value = True - storage.open.return_value.__enter__.return_value.read.return_value = b"binary image data" - return storage + return MagicMock(spec=HOPEAzureStorage) @pytest.fixture def image_bytes_io(dd): - # Create an image and save it to a BytesIO object - image = Image.new("RGB", (100, 100), color="red") img_byte_arr = BytesIO() + image = Image.new("RGB", (100, 100), color="red") image.save(img_byte_arr, format="JPEG") img_byte_arr.seek(0) - - def fake_open(file, mode="rb", *args, **kwargs): - if "rb" in mode and file == dd.filename: - # Return a new BytesIO object with image data each time to avoid file closure - return BytesIO(img_byte_arr.getvalue()) - else: - # Return a MagicMock for other cases to simulate other file behaviors - return MagicMock() - - img_byte_arr.fake_open = fake_open - + img_byte_arr.fake_open = lambda *_: BytesIO(img_byte_arr.getvalue()) return img_byte_arr @@ -89,20 +71,10 @@ def mock_open_context_manager(image_bytes_io): @pytest.fixture def mock_net(): mock_net = MagicMock(spec=cv2.dnn_Net) # Mocking the neural network object - mock_detections = np.array( - [ - [ - [ - [0, 0, 0.95, 0.1, 0.1, 0.2, 0.2], # with confidence 0.95 - [0, 0, 0.15, 0.1, 0.1, 0.2, 0.2], # with confidence 0.15 - ] - ] - ], - dtype=np.float32, - ) # Mocking the detections array - expected_regions = [(10, 10, 20, 20)] # Mocking the expected regions + mock_detections = np.array([[FACE_DETECTIONS]], dtype=np.float32) # Mocking the detections array + mock_expected_regions = FACE_REGIONS_VALID mock_net.forward.return_value = mock_detections # Setting up the forward method of the mock network - mock_imdecode = MagicMock(return_value=np.ones((100, 100, 3), dtype=np.uint8)) - mock_resize = MagicMock(return_value=np.ones((300, 300, 3), dtype=np.uint8)) - mock_blob = np.zeros((1, 3, 300, 300)) - return mock_net, mock_imdecode, mock_resize, mock_blob, expected_regions + mock_imdecode = MagicMock(return_value=np.ones(IMAGE_SIZE, dtype=np.uint8)) + mock_resize = MagicMock(return_value=np.ones(RESIZED_IMAGE_SIZE, dtype=np.uint8)) + mock_blob = np.zeros(BLOB_SHAPE) + return mock_net, mock_imdecode, mock_resize, mock_blob, mock_expected_regions diff --git a/tests/faces/test_duplication_detector.py b/tests/faces/test_duplication_detector.py index c03a3ef5..206f6c50 100644 --- a/tests/faces/test_duplication_detector.py +++ b/tests/faces/test_duplication_detector.py @@ -7,21 +7,21 @@ import numpy as np import pytest from constance import config -from faces_const import DEPLOY_PROTO_SHAPE, FILENAME, FILENAMES +from faces_const import DEPLOY_PROTO_SHAPE, FACE_REGIONS_INVALID, FILENAME, FILENAMES from hope_dedup_engine.apps.faces.utils.duplication_detector import DuplicationDetector def test_duplication_detector_initialization(dd): assert isinstance(dd.net, cv2.dnn_Net) - assert isinstance(dd.logger, MagicMock) assert dd.face_detection_confidence == config.FACE_DETECTION_CONFIDENCE - assert dd.distance_threshold == config.DISTANCE_THRESHOLD + assert dd.distance_threshold == config.FACE_DISTANCE_THRESHOLD assert dd.filename == FILENAME assert dd.encodings_filename == f"{FILENAME}.npy" - assert dd.scale_factor == config.SCALE_FACTOR - assert dd.mean_values == tuple(map(float, config.MEAN_VALUES.split(", "))) - assert dd.face_detection_model == config.FACE_DETECTION_MODEL + assert dd.scale_factor == config.BLOB_FROM_IMAGE_SCALE_FACTOR + assert dd.mean_values == tuple(map(float, config.BLOB_FROM_IMAGE_MEAN_VALUES.split(", "))) + assert dd.face_encodings_model == config.FACE_ENCODINGS_MODEL + assert dd.face_encodings_num_jitters == config.FACE_ENCODINGS_NUM_JITTERS assert dd.nms_threshold == config.NMS_THRESHOLD assert dd.shape == DEPLOY_PROTO_SHAPE @@ -54,22 +54,24 @@ def test_set_net(dd, mock_cv2dnn_storage, mock_net): storage.path.assert_any_call(settings.CAFFEMODEL_FILE) -def test_missing_files_in_storage(dd, mock_cv2dnn_storage): +@pytest.mark.parametrize("missing_file", [settings.PROTOTXT_FILE, settings.CAFFEMODEL_FILE]) +def test_initialization_missing_files_in_cv2dnn_storage(mock_cv2dnn_storage, missing_file): with patch( - "hope_dedup_engine.apps.faces.utils.duplication_detector.CV2DNNStorage", new=lambda _: mock_cv2dnn_storage + "hope_dedup_engine.apps.faces.utils.duplication_detector.CV2DNNStorage", return_value=mock_cv2dnn_storage ): - mock_cv2dnn_storage.exists.return_value = False + mock_cv2dnn_storage.exists.side_effect = lambda filename: filename != missing_file with pytest.raises(FileNotFoundError): DuplicationDetector(FILENAME) + mock_cv2dnn_storage.exists.assert_any_call(missing_file) def test_has_encodings_false(dd): - dd.storages["encoded"].exists = MagicMock(return_value=False) + dd.storages["encoded"].exists.return_value = False assert not dd.has_encodings def test_has_encodings_true(dd): - dd.storages["encoded"].exists = MagicMock(return_value=True) + dd.storages["encoded"].exists.return_value = True assert dd.has_encodings @@ -78,13 +80,12 @@ def test_get_face_detections_dnn_no_detections(dd, mock_open_context_manager): patch.object(dd.storages["images"], "open", return_value=mock_open_context_manager), patch.object(dd, "_get_face_detections_dnn", return_value=[]), ): - face_regions = dd._get_face_detections_dnn() - assert len(face_regions) == 0 # Assuming no faces are detected + assert len(face_regions) == 0 def test_get_face_detections_dnn_with_detections(dd, mock_net, mock_open_context_manager): - net, imdecode, resize, blob, expected_regions = mock_net + net, imdecode, resize, _, expected_regions = mock_net with ( patch.object(dd.storages["images"], "open", return_value=mock_open_context_manager), patch("cv2.imdecode", imdecode), @@ -94,19 +95,24 @@ def test_get_face_detections_dnn_with_detections(dd, mock_net, mock_open_context face_regions = dd._get_face_detections_dnn() assert face_regions == expected_regions - assert len(face_regions) == 1 # Assuming one face is detected - assert isinstance(face_regions[0], tuple) # Each detected face region should be a tuple - assert len(face_regions[0]) == 4 # Each tuple should have four elements (coordinates of the bounding box) + for region in face_regions: + assert isinstance(region, tuple) + assert len(region) == 4 def test_get_face_detections_dnn_exception_handling(dd): - with patch("builtins.open", side_effect=Exception("Test exception")): - try: + with ( + patch.object(dd.storages["images"], "open", side_effect=Exception("Test exception")) as mock_storage_open, + patch.object(dd.logger, "exception") as mock_logger_exception, + ): + with pytest.raises(Exception, match="Test exception"): dd._get_face_detections_dnn() - except Exception: - ... - dd.logger.exception.assert_called_once() - dd.logger.reset_mock() + + mock_storage_open.assert_called_once_with(dd.filename, "rb") + mock_logger_exception.assert_called_once() + + assert dd.filename in mock_logger_exception.call_args[0][0] + assert mock_logger_exception.call_args[1]["exc_info"] is True def test_load_encodings_all_no_files(dd): @@ -135,13 +141,34 @@ def test_load_encodings_all_with_files(dd): assert np.array_equal(result[key], value) -def test_load_encodings_all_exception_handling(dd): - with patch("builtins.open", side_effect=Exception("Test exception")): - try: +def test_load_encodings_all_exception_handling_listdir(dd): + with ( + patch.object(dd.storages["encoded"], "listdir", side_effect=Exception("Test exception")) as mock_listdir, + patch.object(dd.logger, "exception") as mock_logger_exception, + ): + with pytest.raises(Exception, match="Test exception"): dd._load_encodings_all() - except Exception: - ... - dd.logger.reset_mock() + + mock_listdir.assert_called_once_with("") + + mock_logger_exception.assert_called_once() + assert mock_logger_exception.call_args[1]["exc_info"] is True + + +def test_load_encodings_all_exception_handling_open(dd): + with ( + patch.object(dd.storages["encoded"], "listdir", return_value=(None, [f"{FILENAME}.npy"])) as mock_listdir, + patch.object(dd.storages["encoded"], "open", side_effect=Exception("Test exception")) as mock_open, + patch.object(dd.logger, "exception") as mock_logger_exception, + ): + with pytest.raises(Exception, match="Test exception"): + dd._load_encodings_all() + + mock_listdir.assert_called_once_with("") + mock_open.assert_called_once_with(f"{FILENAME}.npy", "rb") + + mock_logger_exception.assert_called_once() + assert mock_logger_exception.call_args[1]["exc_info"] is True def test_encode_face_successful(dd, image_bytes_io, mock_net): @@ -151,38 +178,41 @@ def test_encode_face_successful(dd, image_bytes_io, mock_net): patch.object(dd, "net", mock_net), ): dd._encode_face() + mocked_image_open.assert_called_with(dd.filename, "rb") + assert mocked_image_open.side_effect == image_bytes_io.fake_open assert mocked_image_open.called -def test_encode_face_invalid_region(dd, image_bytes_io): - # Mock _get_face_detections_dnn to return an invalid region +@pytest.mark.parametrize("face_regions", FACE_REGIONS_INVALID) +def test_encode_face_error(dd, image_bytes_io, face_regions): with ( - patch("builtins.open", new_callable=lambda: image_bytes_io.fake_open), - patch.object(dd.storages["images"], "open", side_effect=image_bytes_io.fake_open), - patch.object(dd, "_get_face_detections_dnn", return_value=[(0, 0, 10)]), + patch.object(dd.storages["images"], "open", side_effect=image_bytes_io.fake_open) as mock_storage_open, + patch.object(dd, "_get_face_detections_dnn", return_value=face_regions) as mock_get_face_detections_dnn, patch.object(dd.logger, "error") as mock_error_logger, ): - - # Invoke the _encode_face method, expecting an error log due to an invalid region dd._encode_face() - # Check that the error was logged with the correct message - mock_error_logger.assert_called_once_with(f"Invalid face region {(0, 0, 10)}") - dd.logger.reset_mock() + mock_storage_open.assert_called_with(dd.filename, "rb") + mock_get_face_detections_dnn.assert_called_once() + + mock_error_logger.assert_called_once() def test_encode_face_exception_handling(dd): - with patch("builtins.open", side_effect=Exception("Test exception")): - try: + with ( + patch.object(dd.storages["images"], "open", side_effect=Exception("Test exception")) as mock_storage_open, + patch.object(dd.logger, "exception") as mock_logger_exception, + ): + with pytest.raises(Exception, match="Test exception"): dd._encode_face() - except Exception: - ... - dd.logger.exception.assert_called_once() - dd.logger.reset_mock() + + mock_storage_open.assert_called_with(dd.filename, "rb") + mock_logger_exception.assert_called_once() + assert mock_logger_exception.call_args[1]["exc_info"] is True -def test_find_duplicates_successful(dd, mock_hde_azure_storage): +def test_find_duplicates_successful_when_encoded(dd, mock_hde_azure_storage): # Generate mock return values dynamically based on FILENAMES mock_encodings = {filename: [np.array([0.1, 0.2, 0.3 + i * 0.001])] for i, filename in enumerate(FILENAMES)} @@ -207,25 +237,22 @@ def test_find_duplicates_successful(dd, mock_hde_azure_storage): def test_find_duplicates_calls_encode_face_when_no_encodings(dd): with ( - patch( - "hope_dedup_engine.apps.faces.utils.duplication_detector.DuplicationDetector.has_encodings", - new_callable=MagicMock(return_value=False), - ), patch.object(dd, "_encode_face") as mock_encode_face, - patch.object(dd, "_load_encodings_all", return_value={"test_file.jpg": [MagicMock()]}), + patch.object(dd, "_load_encodings_all", return_value={FILENAME: [MagicMock()]}), ): + dd.storages["encoded"].exists.return_value = False dd.find_duplicates() mock_encode_face.assert_called_once() def test_find_duplicates_exception_handling(dd): - with patch.object(dd, "_load_encodings_all", side_effect=Exception("Test exception")): - try: + with ( + patch.object(dd, "_load_encodings_all", side_effect=Exception("Test exception")), + patch.object(dd.logger, "exception") as mock_logger_exception, + ): + with pytest.raises(Exception, match="Test exception"): dd.find_duplicates() - except Exception: - ... - dd.logger.exception.assert_called_once() - args, kwargs = dd.logger.exception.call_args - assert args[0] == f"Error finding duplicates for image {dd.filename}" - assert isinstance(kwargs["exc_info"], Exception) - dd.logger.reset_mock() + + mock_logger_exception.assert_called_once() + assert mock_logger_exception.call_args[1]["exc_info"] is True + assert dd.filename in mock_logger_exception.call_args[0][0]