def get_datetime_formats() -> dict:
    """
    Return the strftime pattern used for each supported datetime key.

    Returns
    -------
    dict
        Maps "datetime", "time" and "date" (inserted in that order) to
        the strftime format string for that key. A new dictionary is
        built on every call, so callers may mutate the result freely.
    """
    formats = dict(
        datetime="%Y%m%dT%H%M%S",
        time="%H%M%S",
        date="%Y%m%d",
    )
    return formats
"sub": [], + "ses": [], + } + if datatype_passed: error_message = validation.check_datatypes_are_valid( datatype, allow_all=True @@ -69,13 +78,6 @@ def create_folder_trees( if error_message: utils.log_and_raise_error(error_message, NeuroBlueprintError) - all_paths: Dict = {} - else: - all_paths = { - "sub": [], - "ses": [], - } - for sub in sub_names: sub_path = cfg.build_project_path( "local", @@ -355,11 +357,12 @@ def process_glob_to_find_datatype_folders( return zip(ses_folder_keys, ses_folder_values) +# ----------------------------------------------------------------------------- # Wildcards # ----------------------------------------------------------------------------- -def search_for_wildcards( +def search_with_tags( cfg: Configs, base_folder: Path, local_or_central: str, @@ -367,62 +370,335 @@ def search_for_wildcards( sub: Optional[str] = None, ) -> List[str]: """ - Handle wildcard flag in upload or download. + Handle wildcard and datetime range searching in names during upload or download. - All names in name are searched for @*@ string, and replaced - with single * for glob syntax. If sub is passed, it is - assumes all_names is ses_names and the sub folder is searched - for ses_names matching the name including wildcard. Otherwise, - if sub is None it is assumed all_names are sub names and - the level above is searched. + There are two types of special patterns that can be used in names: + 1. Wildcards: Names containing @*@ will be replaced with "*" for glob pattern matching + 2. Datetime ranges: Names containing @DATETO@, @TIMETO@, or @DATETIMETO@ will be used + to filter folders within a specific datetime range - Outputs a new list of names including all original names - but where @*@-containing names have been replaced with - search results. 
+ For datetime ranges, the format must be: + - date: YYYYMMDD@DATETO@YYYYMMDD (e.g., "20240101@DATETO@20241231") + - time: HHMMSS@TIMETO@HHMMSS (e.g., "000000@TIMETO@235959") + - datetime: YYYYMMDDTHHMMSS@DATETIMETO@YYYYMMDDTHHMMSS Parameters ---------- + cfg + datashuttle project configuration + base_folder + folder to search for wildcards in + local_or_central + "local" or "central" project path to search in + all_names + list of names that may contain wildcards or datetime ranges. If sub is + passed, these are treated as session names. If sub is None, they are + treated as subject names + sub + optional subject to search for sessions in. If not provided, + will search for subjects rather than sessions - project : initialised datashuttle project - - base_folder : folder to search for wildcards in - - local_or_central : "local" or "central" project path to - search in - - all_names : list of subject or session names that - may or may not include the wildcard flag. If sub (below) - is passed, it is assumed these are session names. Otherwise, - it is assumed these are subject names. - - sub : optional subject to search for sessions in. If not provided, - will search for subjects rather than sessions. - + Returns + ------- + List[str] + A list of matched folder names after wildcard expansion and datetime filtering. + For datetime ranges, only folders with timestamps within the specified range + will be included. 
+ + Examples + -------- + Wildcards: + >>> search_with_tags(cfg, path, "local", ["sub-@*@"]) + ["sub-001", "sub-002", "sub-003"] + + Date range: + >>> search_with_tags(cfg, path, "local", ["sub-001_20240101@DATETO@20241231_id-*"]) + ["sub-001_date-20240315_id-1", "sub-001_date-20240401_id-2"] + + Time range: + >>> search_with_tags(cfg, path, "local", ["sub-002_000000@TIMETO@120000"]) + ["sub-002_time-083000", "sub-002_time-113000"] """ new_all_names: List[str] = [] for name in all_names: - if canonical_tags.tags("*") in name: - name = name.replace(canonical_tags.tags("*"), "*") + if not (canonical_tags.tags("*") in name or + canonical_tags.tags("DATETO") in name or + canonical_tags.tags("TIMETO") in name or + canonical_tags.tags("DATETIMETO") in name): + # If no special tags, just add the name as is + new_all_names.append(name) + continue - matching_names: List[str] + # Handle wildcard replacement first if present + search_str = name + if canonical_tags.tags("*") in name: + search_str = search_str.replace(canonical_tags.tags("*"), "*") + + # Handle datetime ranges + format_type = None + tag = None + if canonical_tags.tags("DATETO") in search_str: + tag = canonical_tags.tags("DATETO") + format_type = "date" + elif canonical_tags.tags("TIMETO") in search_str: + tag = canonical_tags.tags("TIMETO") + format_type = "time" + elif canonical_tags.tags("DATETIMETO") in search_str: + tag = canonical_tags.tags("DATETIMETO") + format_type = "datetime" + + if format_type is not None: + assert tag is not None + search_str = format_and_validate_datetime_search_str(search_str, format_type, tag) + + # Use the helper function to perform the glob search if sub: - matching_names = search_sub_or_ses_level( # type: ignore - cfg, base_folder, local_or_central, sub, search_str=name + matching_names = search_sub_or_ses_level( + cfg, base_folder, local_or_central, sub, search_str=search_str )[0] else: - matching_names = search_sub_or_ses_level( # type: ignore - cfg, base_folder, 
local_or_central, search_str=name + matching_names = search_sub_or_ses_level( + cfg, base_folder, local_or_central, search_str=search_str )[0] - new_all_names += matching_names + # Filter results by datetime range + start_timepoint, end_timepoint = strip_start_end_date_from_datetime_tag( + name, format_type, tag + ) + matching_names = filter_names_by_datetime_range( + matching_names, format_type, start_timepoint, end_timepoint + ) + new_all_names.extend(matching_names) else: - new_all_names += [name] + # No datetime range, just perform the glob search with wildcards + if sub: + matching_names = search_sub_or_ses_level( + cfg, base_folder, local_or_central, sub, search_str=search_str + )[0] + else: + matching_names = search_sub_or_ses_level( + cfg, base_folder, local_or_central, search_str=search_str + )[0] + new_all_names.extend(matching_names) + + return list(set(new_all_names)) # Remove duplicates + + +def filter_names_by_datetime_range( + names: List[str], + format_type: str, + start_timepoint: datetime, + end_timepoint: datetime, +) -> List[str]: + """ + Filter a list of names based on a datetime range. + Assumes all names contain the format_type pattern (e.g., date-*, time-*) + as they were searched using this pattern. 
+ + Parameters + ---------- + names + List of names to filter, all containing the datetime pattern + format_type + One of "datetime", "time", or "date" + start_timepoint + Start of the datetime range + end_timepoint + End of the datetime range + + Returns + ------- + List[str] + Filtered list of names that fall within the datetime range + + Raises + ------ + ValueError + If any datetime value does not match the expected ISO format + """ + filtered_names: List[str] = [] + for candidate in names: + candidate_basename = candidate if isinstance(candidate, str) else candidate.name + value = get_values_from_bids_formatted_name([candidate_basename], format_type)[0] + + try: + candidate_timepoint = datetime_object_from_string(value, format_type) + except ValueError: + utils.log_and_raise_error( + f"Invalid {format_type} format in name {candidate_basename}. " + f"Expected ISO format: {canonical_tags.get_datetime_formats()[format_type]}", + ValueError, + ) + + if start_timepoint <= candidate_timepoint <= end_timepoint: + filtered_names.append(candidate) + + return filtered_names + + +# ----------------------------------------------------------------------------- +# Datetime Tag Functions +# ----------------------------------------------------------------------------- + + +def get_expected_datetime_len(format_type: str) -> int: + """ + Get the expected length of characters for a datetime format. + + Parameters + ---------- + format_type + One of "datetime", "time", or "date" + + Returns + ------- + int + The number of characters expected for the format + """ + format_str = canonical_tags.get_datetime_formats()[format_type] + today = datetime.now() + return len(today.strftime(format_str)) + - new_all_names = list( - set(new_all_names) - ) # remove duplicate names in case of wildcard overlap +def find_datetime_in_name(name: str, format_type: str, tag: str) -> tuple[str, str] | None: + """ + Find and extract datetime values from a name using a regex pattern. 
+ + Parameters + ---------- + name + The name containing the datetime range + e.g. "sub-001_20240101@DATETO@20250101_id-*" + format_type + One of "datetime", "time", or "date" + tag + The tag used for the range (e.g. @DATETO@) + + Returns + ------- + tuple[str, str] | None + A tuple containing (start_datetime_str, end_datetime_str) if found, + None if no match is found + """ + expected_len = get_expected_datetime_len(format_type) + full_tag_regex = fr"(\d{{{expected_len}}}){re.escape(tag)}(\d{{{expected_len}}})" + match = re.search(full_tag_regex, name) + return match.groups() if match else None + + +def strip_start_end_date_from_datetime_tag( + search_str: str, format_type: str, tag: str +) -> tuple[datetime, datetime]: + """ + Extract and validate start and end datetime values from a search string. + + Parameters + ---------- + search_str + The search string containing the datetime range + e.g. "sub-001_20240101T000000@DATETIMETO@20250101T235959" + format_type + One of "datetime", "time", or "date" + tag + The tag used for the range (e.g. @DATETIMETO@) - return new_all_names + Returns + ------- + tuple[datetime, datetime] + A tuple containing (start_timepoint, end_timepoint) + + Raises + ------ + NeuroBlueprintError + If the datetime format is invalid, the range is malformed, + or end datetime is before start datetime + """ + expected_len = get_expected_datetime_len(format_type) + full_tag_regex = fr"(\d{{{expected_len}}}){re.escape(tag)}(\d{{{expected_len}}})" + match = re.search(full_tag_regex, search_str) + + if not match: + utils.log_and_raise_error( + f"Invalid {format_type} range format in search string: {search_str}. 
Ensure the format matches the expected pattern: {canonical_tags.get_datetime_formats()[format_type]}.", + NeuroBlueprintError, + ) + + start_str, end_str = match.groups() + + try: + start_timepoint = datetime_object_from_string(start_str, format_type) + end_timepoint = datetime_object_from_string(end_str, format_type) + except ValueError as e: + utils.log_and_raise_error( + f"Invalid {format_type} format in search string: {search_str}. Error: {str(e)}", + NeuroBlueprintError, + ) + + if end_timepoint <= start_timepoint: + utils.log_and_raise_error( + f"End {format_type} is before start {format_type}. Ensure the end datetime is after the start datetime.", + NeuroBlueprintError, + ) + + return start_timepoint, end_timepoint + + +def format_and_validate_datetime_search_str(search_str: str, format_type: str, tag: str) -> str: + """ + Validate and format a search string containing a datetime range. + + Parameters + ---------- + search_str + The search string containing the datetime range + e.g. "sub-001_20240101@DATETO@20250101_id-*" or "sub-002_000000@TIMETO@235959" + format_type + One of "datetime", "time", or "date" + tag + The tag used for the range (e.g. @DATETO@) + + Returns + ------- + str + The formatted search string with datetime range replaced + e.g. "sub-001_date-*_id-*" or "sub-002_time-*" + + Raises + ------ + NeuroBlueprintError + If the datetime format is invalid or the range is malformed + """ + # Validate the datetime range format + strip_start_end_date_from_datetime_tag(search_str, format_type, tag) + + # Replace datetime range with wildcard pattern + expected_len = get_expected_datetime_len(format_type) + full_tag_regex = fr"(\d{{{expected_len}}}){re.escape(tag)}(\d{{{expected_len}}})" + return re.sub(full_tag_regex, f"{format_type}-*", search_str) + + +def datetime_object_from_string(datetime_string: str, format_type: str) -> datetime: + """ + Convert a datetime string to a datetime object using the appropriate format. 
def name_has_special_character(name: str) -> bool:
    """
    Check whether a name contains anything other than the allowed characters.

    Allowed characters are ASCII letters, digits, underscore and dash.
    The whole string is tested, so a trailing newline also counts as a
    special character.

    Parameters
    ----------
    name : str
        The name to check

    Returns
    -------
    bool
        True if any character is outside the allowed set,
        False otherwise (including for the empty string)
    """
    # fullmatch rather than "^...$": "$" also matches just before a
    # final newline, which would let "name\n" through.
    return re.fullmatch(r"[A-Za-z0-9_-]*", name) is None


def datetime_value_str_is_iso_format(datetime_str: str, format_type: str) -> bool:
    """
    Check that a datetime string can be parsed with the expected format.

    NOTE: the check is whatever ``datetime.strptime`` accepts. Python's
    strptime treats leading zeros as optional for several numeric
    directives, so a value shorter than the canonical width may still
    be accepted.

    Parameters
    ----------
    datetime_str : str
        The datetime string to validate
    format_type : str
        One of "datetime", "time", or "date"

    Returns
    -------
    bool
        True if the string parses with the format, False otherwise
    """
    expected_format = canonical_tags.get_datetime_formats()[format_type]

    try:
        datetime.strptime(datetime_str, expected_format)
    except ValueError:
        return False

    return True
import glob
import os
import re
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
from typing import List

import pytest

from datashuttle.utils.custom_exceptions import NeuroBlueprintError
from datashuttle.utils.folders import search_with_tags


class DummyCanonicalTags:
    """Stand-in for the `canonical_tags` functions used by the search."""

    @staticmethod
    def tags(x: str) -> str:
        tags_dict = {
            "*": "@*@",
            "DATETO": "@DATETO@",
            "TIMETO": "@TIMETO@",
            "DATETIMETO": "@DATETIMETO@",
        }
        return tags_dict.get(x, x)

    @staticmethod
    def get_datetime_formats() -> dict:
        return {
            "datetime": "%Y%m%dT%H%M%S",
            "time": "%H%M%S",
            "date": "%Y%m%d",
        }

    @staticmethod
    def get_datetime_format(format_type: str) -> str:
        formats = DummyCanonicalTags.get_datetime_formats()
        if format_type not in formats:
            raise ValueError(f"Invalid format type: {format_type}")
        return formats[format_type]


class DummyConfigs:
    """Placeholder config; the patched search function never reads it."""


@pytest.fixture(autouse=True)
def patch_canonical_tags(monkeypatch):
    """Replace the tag and format lookups with the dummy versions."""
    from datashuttle.configs import canonical_tags

    monkeypatch.setattr(canonical_tags, "tags", DummyCanonicalTags.tags)
    # The real function is `get_datetime_formats` (plural); patching a
    # name that does not exist makes monkeypatch raise in setup.
    monkeypatch.setattr(
        canonical_tags,
        "get_datetime_formats",
        DummyCanonicalTags.get_datetime_formats,
    )


def dummy_search_sub_or_ses_level(
    cfg, base_folder: Path, local_or_central: str, *args, search_str: str = "*"
):
    """Glob `search_str` directly inside `base_folder`; returns full paths."""
    pattern = os.path.join(str(base_folder), search_str)
    matches: List[str] = sorted(glob.glob(pattern))
    return (matches, [])


@pytest.fixture(autouse=True)
def patch_search_sub_or_ses_level(monkeypatch):
    """Patch the search in the module where `search_with_tags` is defined."""
    from datashuttle.utils import folders

    monkeypatch.setattr(
        folders, "search_sub_or_ses_level", dummy_search_sub_or_ses_level
    )


def dummy_get_values_from_bids_formatted_name(
    names: List[str], key: str, return_as_int: bool = False
) -> List[str]:
    """Return the value following `<key>-` in each name's final path part."""
    value_regex = re.compile(rf"(?:^|_){re.escape(key)}-([^_]+)")
    results = []
    for name in names:
        # The dummy search returns full paths, so only look at the
        # folder name itself.
        found = value_regex.search(os.path.basename(name))
        if found:
            results.append(found.group(1))
    return results


@pytest.fixture(autouse=True)
def patch_get_values_from_bids(monkeypatch):
    """Patch the name extractor where `folders` looks it up."""
    from datashuttle.utils import folders

    # `folders` binds the function with a from-import, so patching
    # `utils` would not change what `folders` calls.
    monkeypatch.setattr(
        folders,
        "get_values_from_bids_formatted_name",
        dummy_get_values_from_bids_formatted_name,
    )


@pytest.fixture
def temp_project_dir() -> Path:  # type: ignore
    """Temporary directory holding `sub-01_date-YYYYMMDD` folders."""
    temp_dir = Path(tempfile.mkdtemp())
    folder_dates = [
        "20250305",
        "20250306",
        "20250307",
        "20250308",
        "20250309",
        "20250310",
    ]
    try:
        for date_str in folder_dates:
            (temp_dir / f"sub-01_date-{date_str}").mkdir()
        yield temp_dir
    finally:
        # Also runs if folder creation fails part-way through.
        shutil.rmtree(temp_dir)


def test_date_range_wildcard(temp_project_dir: Path):
    """
    A date range like "sub-01_20250306@DATETO@20250309" returns only
    the folders whose date lies between the two dates (inclusive).
    """
    pattern = "sub-01_20250306@DATETO@20250309"
    result = search_with_tags(
        DummyConfigs(), temp_project_dir, "local", [pattern]
    )

    found_dates = set()
    for folder in result:
        found = re.search(r"date-(\d{8})", os.path.basename(folder))
        if found:
            found_dates.add(found.group(1))

    assert found_dates == {"20250306", "20250307", "20250308", "20250309"}


def test_simple_wildcard(temp_project_dir: Path):
    """
    A plain wildcard pattern like "sub-01_@*@" returns every folder.
    """
    result = search_with_tags(
        DummyConfigs(), temp_project_dir, "local", ["sub-01_@*@"]
    )

    # Six folders were created (20250305 through 20250310).
    assert len(result) == 6


def test_invalid_date_range(temp_project_dir: Path):
    """
    Malformed date ranges raise a NeuroBlueprintError.
    """
    cfg = DummyConfigs()

    # End date before start date
    with pytest.raises(NeuroBlueprintError, match="before start"):
        search_with_tags(
            cfg,
            temp_project_dir,
            "local",
            ["sub-01_20250309@DATETO@20250306"],
        )

    # Start date is missing a digit
    with pytest.raises(NeuroBlueprintError, match="Invalid"):
        search_with_tags(
            cfg,
            temp_project_dir,
            "local",
            ["sub-01_2025030@DATETO@20250306"],
        )


def test_combined_wildcards(temp_project_dir: Path):
    """
    The wildcard tag and a date range can be combined in one pattern.
    """
    # Extra subjects so the wildcard has more than one thing to match.
    for sub in ["02", "03"]:
        for date in ["20250307", "20250308"]:
            (temp_project_dir / f"sub-{sub}_date-{date}").mkdir()

    pattern = "sub-@*@_20250307@DATETO@20250308"
    result = search_with_tags(
        DummyConfigs(), temp_project_dir, "local", [pattern]
    )

    # All subjects match, but only dates within the range.
    matched_folders = {os.path.basename(f) for f in result}
    expected_folders = {
        "sub-01_date-20250307",
        "sub-01_date-20250308",
        "sub-02_date-20250307",
        "sub-02_date-20250308",
        "sub-03_date-20250307",
        "sub-03_date-20250308",
    }
    assert matched_folders == expected_folders