diff --git a/PyFunceble/checker/availability/base.py b/PyFunceble/checker/availability/base.py index 311845f1..1e7f77e1 100644 --- a/PyFunceble/checker/availability/base.py +++ b/PyFunceble/checker/availability/base.py @@ -50,6 +50,8 @@ limitations under the License. """ +# pylint: disable=too-many-lines + import multiprocessing from datetime import datetime from typing import Dict, List, Optional @@ -60,6 +62,7 @@ import PyFunceble.facility import PyFunceble.factory import PyFunceble.storage +from PyFunceble.checker.availability.extra_rules import ExtraRulesHandler from PyFunceble.checker.availability.params import AvailabilityCheckerParams from PyFunceble.checker.availability.status import AvailabilityCheckerStatus from PyFunceble.checker.base import CheckerBase @@ -123,6 +126,7 @@ class AvailabilityCheckerBase(CheckerBase): domain_syntax_checker: Optional[DomainSyntaxChecker] = None ip_syntax_checker: Optional[IPSyntaxChecker] = None url_syntax_checker: Optional[URLSyntaxChecker] = None + extra_rules_handler: Optional[ExtraRulesHandler] = None _use_extra_rules: bool = False _use_whois_lookup: bool = False @@ -148,8 +152,9 @@ def __init__( do_syntax_check_first: Optional[bool] = None, db_session: Optional[Session] = None, use_whois_db: Optional[bool] = None, + use_collection: Optional[bool] = None, ) -> None: - self.dns_query_tool = DNSQueryTool().guess_all_settings() + self.dns_query_tool = DNSQueryTool() self.whois_query_tool = WhoisQueryTool() self.addressinfo_query_tool = AddressInfo() self.hostbyaddr_query_tool = HostByAddrInfo() @@ -157,6 +162,7 @@ def __init__( self.domain_syntax_checker = DomainSyntaxChecker() self.ip_syntax_checker = IPSyntaxChecker() self.url_syntax_checker = URLSyntaxChecker() + self.extra_rules_handler = ExtraRulesHandler() self.db_session = db_session self.params = AvailabilityCheckerParams() @@ -202,7 +208,10 @@ def __init__( self.guess_and_set_use_whois_db() super().__init__( - subject, do_syntax_check_first=do_syntax_check_first, db_session=db_session + subject, + do_syntax_check_first=do_syntax_check_first, + db_session=db_session, + use_collection=use_collection, ) @property @@ -940,6 +949,48 @@ def try_to_query_status_from_reputation(self) -> "AvailabilityCheckerBase": raise NotImplementedError() + def try_to_query_status_from_collection(self) -> "AvailabilityCheckerBase": + """ + Tries to get and set the status from the Collection API. + """ + + PyFunceble.facility.Logger.info( + "Started to try to query the status of %r from: Collection Lookup", + self.status.idna_subject, + ) + + data = self.collection_query_tool.pull(self.idna_subject) + + if data and "status" in data: + if ( + self.collection_query_tool.preferred_status_origin == "frequent" + and data["status"]["availability"]["frequent"] + ): + self.status.status = data["status"]["availability"]["frequent"] + self.status.status_source = "COLLECTION" + elif ( + self.collection_query_tool.preferred_status_origin == "latest" + and data["status"]["availability"]["latest"] + ): + self.status.status = data["status"]["availability"]["latest"]["status"] + self.status.status_source = "COLLECTION" + elif ( + self.collection_query_tool.preferred_status_origin == "recommended" + and data["status"]["availability"]["recommended"] + ): + self.status.status = data["status"]["availability"]["recommended"] + self.status.status_source = "COLLECTION" + + PyFunceble.facility.Logger.info( + "Could define the status of %r from: Collection Lookup", + self.status.idna_subject, + ) + + PyFunceble.facility.Logger.info( + "Finished to try to query the status of %r from: Collection Lookup", + self.status.idna_subject, + ) + @CheckerBase.ensure_subject_is_given @CheckerBase.update_status_date_after_query def query_status(self) -> "AvailabilityCheckerBase": diff --git a/PyFunceble/checker/availability/domain.py b/PyFunceble/checker/availability/domain.py index 459913ff..1987399d 100644 --- a/PyFunceble/checker/availability/domain.py +++ b/PyFunceble/checker/availability/domain.py @@ -55,7 +55,6 @@ import PyFunceble.factory import PyFunceble.storage from PyFunceble.checker.availability.base import AvailabilityCheckerBase -from PyFunceble.checker.availability.extra_rules import ExtraRulesHandler from PyFunceble.checker.reputation.domain import DomainReputationChecker @@ -131,6 +130,9 @@ def query_status( status_post_syntax_checker = None + if not self.status.status and self.use_collection: + self.try_to_query_status_from_collection() + if not self.status.status and self.do_syntax_check_first: self.try_to_query_status_from_syntax_lookup() @@ -176,7 +178,7 @@ def query_status( ) if self.use_extra_rules: - ExtraRulesHandler(self.status).start() + self.extra_rules_handler.set_status(self.status).start() return self diff --git a/PyFunceble/checker/availability/extra_rules.py b/PyFunceble/checker/availability/extra_rules.py index 0b39dd2a..8c39d6df 100644 --- a/PyFunceble/checker/availability/extra_rules.py +++ b/PyFunceble/checker/availability/extra_rules.py @@ -79,7 +79,7 @@ class ExtraRulesHandler: http_codes_dataset: Optional[Box] = None - def __init__(self, status: Optional[AvailabilityCheckerStatus]) -> None: + def __init__(self, status: Optional[AvailabilityCheckerStatus] = None) -> None: self.regex_active2inactive = { r"\.000webhostapp\.com": [ (self.__switch_to_down_if, 410), @@ -88,6 +88,7 @@ def __init__(self, status: Optional[AvailabilityCheckerStatus]) -> None: r"\.angelfire\.com$": [(self.__switch_to_down_if, 404)], r"\.blogspot\.": [self.__handle_blogspot], r"\.canalblog\.com$": [(self.__switch_to_down_if, 404)], + r"\.fc2\.com$": [self.__handle_fc2_dot_com], r"\.github\.io$": [(self.__switch_to_down_if, 404)], r"\.godaddysites\.com$": [(self.__switch_to_down_if, 404)], r"\.hpg.com.br$": [(self.__switch_to_down_if, 404)], @@ -294,6 +295,26 @@ def __handle_wordpress_dot_com(self) -> "ExtraRulesHandler": return self + def __handle_fc2_dot_com(self) -> "ExtraRulesHandler": + """ + Handles the :code:`fc2.com` case. + + .. warning:: + This method assume that we know that we are handling a fc2 domain. + """ + + if self.status.subject.startswith("http:"): + url = self.status.subject + else: + url = f"http://{self.status.subject}:80" + + req = PyFunceble.factory.Requester.get(url, allow_redirects=False) + + if "Location" in req.headers and "error.fc2.com" in req.headers["Location"]: + self.__switch_to_down() + + return self + def __handle_active2inactive(self) -> "ExtraRulesHandler": """ Handles the status deescalation. diff --git a/PyFunceble/checker/availability/ip.py b/PyFunceble/checker/availability/ip.py index 1779f484..b8181669 100644 --- a/PyFunceble/checker/availability/ip.py +++ b/PyFunceble/checker/availability/ip.py @@ -55,7 +55,6 @@ import PyFunceble.factory import PyFunceble.storage from PyFunceble.checker.availability.base import AvailabilityCheckerBase -from PyFunceble.checker.availability.extra_rules import ExtraRulesHandler from PyFunceble.checker.reputation.ip import IPReputationChecker @@ -171,7 +170,7 @@ def query_status( ) if self.use_extra_rules: - ExtraRulesHandler(self.status).start() + self.extra_rules_handler.set_status(self.status).start() return self diff --git a/PyFunceble/checker/base.py b/PyFunceble/checker/base.py index c5230a9e..499854bb 100644 --- a/PyFunceble/checker/base.py +++ b/PyFunceble/checker/base.py @@ -57,8 +57,11 @@ import domain2idna from sqlalchemy.orm import Session +import PyFunceble.facility +import PyFunceble.storage from PyFunceble.checker.params_base import CheckerParamsBase from PyFunceble.checker.status_base import CheckerStatusBase +from PyFunceble.query.collection import CollectionQueryTool class CheckerBase: @@ -75,13 +78,16 @@ class CheckerBase: """ STD_DO_SYNTAX_CHECK_FIRST: bool = False + STD_USE_COLLECTION: bool = False _do_syntax_check_first: bool = False + _use_collection: bool = False _subject: Optional[str] = None _idna_subject: Optional[str] = None db_session: Optional[Session] = None + collection_query_tool: Optional[CollectionQueryTool] = None status: Optional[CheckerStatusBase] = None params: Optional[CheckerParamsBase] = None @@ -92,7 +98,10 @@ def __init__( *, do_syntax_check_first: Optional[bool] = None, db_session: Optional[Session] = None, + use_collection: Optional[bool] = None, ) -> None: + self.collection_query_tool = CollectionQueryTool() + if self.params is None: self.params = CheckerParamsBase() @@ -107,6 +116,11 @@ def __init__( else: self.do_syntax_check_first = self.STD_DO_SYNTAX_CHECK_FIRST + if use_collection is not None: + self.use_collection = use_collection + else: + self.guess_and_set_use_collection() + self.db_session = db_session def propagate_subject(func): # pylint: disable=no-self-argument @@ -301,6 +315,56 @@ def set_do_syntax_check_first(self, value: bool) -> "CheckerBase": return self + @property + def use_collection(self) -> bool: + """ + Provides the current value of the :code:`_use_collection` attribute. + """ + + return self._use_collection + + @use_collection.setter + def use_collection(self, value: bool) -> None: + """ + Sets the value which authorizes the usage of the Collection. + + :param value: + The value to set. + + :param TypeError: + When the given :code:`value` is not a :py:class:`bool`. + """ + + if not isinstance(value, bool): + raise TypeError(f" should be {bool}, {type(value)} given.") + + self._use_collection = self.params.use_collection = value + + def set_use_collection(self, value: bool) -> "CheckerBase": + """ + Sets the value which authorizes the usage of the Collection. + + :param value: + The value to set. + """ + + self.use_collection = value + + return self + + def guess_and_set_use_collection(self) -> "CheckerBase": + """ + Try to guess and set the value of the :code:`use_collection` attribute. + """ + + if PyFunceble.facility.ConfigLoader.is_already_loaded(): + if isinstance(PyFunceble.storage.CONFIGURATION.lookup.collection, bool): + self.use_collection = PyFunceble.storage.CONFIGURATION.lookup.collection + else: + self.use_collection = self.STD_USE_COLLECTION + else: + self.use_collection = self.STD_USE_COLLECTION + def subject_propagator(self) -> "CheckerBase": """ Propagate the currently set subject. diff --git a/PyFunceble/checker/params_base.py b/PyFunceble/checker/params_base.py index e8d39114..d31b25e0 100644 --- a/PyFunceble/checker/params_base.py +++ b/PyFunceble/checker/params_base.py @@ -64,6 +64,7 @@ class CheckerParamsBase: """ do_syntax_check_first: Optional[bool] = None + use_collection: Optional[bool] = None def to_dict(self) -> dict: """ diff --git a/PyFunceble/checker/reputation/base.py b/PyFunceble/checker/reputation/base.py index a3f94c71..5168526a 100644 --- a/PyFunceble/checker/reputation/base.py +++ b/PyFunceble/checker/reputation/base.py @@ -93,8 +93,9 @@ def __init__( subject: Optional[str] = None, do_syntax_check_first: Optional[bool] = None, db_session: Optional[Session] = None, + use_collection: Optional[bool] = None, ) -> None: - self.dns_query_tool = DNSQueryTool().guess_all_settings() + self.dns_query_tool = DNSQueryTool() self.ipv4_reputation_query_tool = IPV4ReputationDataset() self.domain_syntax_checker = DomainSyntaxChecker() self.ip_syntax_checker = IPSyntaxChecker() @@ -107,7 +108,10 @@ def __init__( self.status.dns_lookup_record = self.dns_query_tool.lookup_record super().__init__( - subject, do_syntax_check_first=do_syntax_check_first, db_session=db_session + subject, + do_syntax_check_first=do_syntax_check_first, + db_session=db_session, + use_collection=use_collection, ) @staticmethod @@ -244,6 +248,48 @@ def try_to_query_status_from_syntax_lookup(self) -> "ReputationCheckerBase": return self + def try_to_query_status_from_collection(self) -> "ReputationCheckerBase": + """ + Tries to get and set the status from the Collection API. + """ + + PyFunceble.facility.Logger.info( + "Started to try to query the status of %r from: Collection Lookup", + self.status.idna_subject, + ) + + data = self.collection_query_tool[self.idna_subject] + + if data and "status" in data: + if ( + self.collection_query_tool.preferred_status_origin == "frequent" + and data["status"]["reputation"]["frequent"] + ): + self.status.status = data["status"]["reputation"]["frequent"] + self.status.status_source = "COLLECTION" + elif ( + self.collection_query_tool.preferred_status_origin == "latest" + and data["status"]["reputation"]["latest"] + ): + self.status.status = data["status"]["reputation"]["latest"]["status"] + self.status.status_source = "COLLECTION" + elif ( + self.collection_query_tool.preferred_status_origin == "recommended" + and data["status"]["reputation"]["recommended"] + ): + self.status.status = data["status"]["reputation"]["recommended"] + self.status.status_source = "COLLECTION" + + PyFunceble.facility.Logger.info( + "Could define the status of %r from: Collection Lookup", + self.status.idna_subject, + ) + + PyFunceble.facility.Logger.info( + "Finished to try to query the status of %r from: Collection Lookup", + self.status.idna_subject, + ) + @CheckerBase.ensure_subject_is_given @CheckerBase.update_status_date_after_query def query_status(self) -> "ReputationCheckerBase": @@ -253,6 +299,11 @@ def query_status(self) -> "ReputationCheckerBase": status_post_syntax_checker = None + if ( + not self.status.status and self.use_collection + ): # pragma: no cover ## Underlying tested + self.try_to_query_status_from_collection() + if not self.status.status and self.do_syntax_check_first: self.try_to_query_status_from_syntax_lookup() diff --git a/PyFunceble/cli/entry_points/pyfunceble/cli.py b/PyFunceble/cli/entry_points/pyfunceble/cli.py index 16cdd58c..ebc3e104 100644 --- a/PyFunceble/cli/entry_points/pyfunceble/cli.py +++ b/PyFunceble/cli/entry_points/pyfunceble/cli.py @@ -349,6 +349,25 @@ def get_test_control_group_data() -> List[Tuple[List[str], dict]]: % get_configured_value("cli_testing.local_network"), }, ), + ( + ["--collection-preferred-origin"], + { + "dest": "collection.preferred_status_origin", + "type": str, + "choices": ["frequent", "latest", "recommended"], + "help": "Sets the preferred status origin. %s" + % get_configured_value("collection.preferred_status_origin"), + }, + ), + ( + ["--collection-lookup"], + { + "dest": "lookup.collection", + "action": "store_true", + "help": "Activates or disables the usage of the Collection lookup\n" + "whether possible. %s" % get_configured_value("lookup.collection"), + }, + ), ( ["--dns-lookup"], { @@ -822,6 +841,17 @@ def get_output_control_group_data() -> List[Tuple[List[str], dict]]: "help": argparse.SUPPRESS, }, ), + ( + [ + "--push-collection", + ], + { + "dest": "collection.push", + "action": "store_true", + "help": "Activates or disables the push of test result into the\n" + "collection API. %s" % get_configured_value("collection.push"), + }, + ), ( [ "-s", diff --git a/PyFunceble/cli/filesystem/counter.py b/PyFunceble/cli/filesystem/counter.py index 011882d9..00f9dccc 100644 --- a/PyFunceble/cli/filesystem/counter.py +++ b/PyFunceble/cli/filesystem/counter.py @@ -228,6 +228,10 @@ def count(self, status: CheckerStatusBase) -> "FilesystemCounter": f" should be {CheckerStatusBase}, {type(status)} given." ) + if "counter" not in self.dataset: + self.dataset["counter"] = {} + self.dataset["percentage"] = {} + self.dataset["counter"][status.status] += 1 self.dataset["counter"]["total"] += 1 diff --git a/PyFunceble/cli/processes/workers/base.py b/PyFunceble/cli/processes/workers/base.py index eee39c21..33987bb6 100644 --- a/PyFunceble/cli/processes/workers/base.py +++ b/PyFunceble/cli/processes/workers/base.py @@ -277,6 +277,9 @@ def break_now() -> bool: PyFunceble.cli.facility.CredentialLoader.start() PyFunceble.cli.factory.DBSession.init_db_sessions() + # Be sure that all settings are loaded proprely!! + PyFunceble.factory.Requester.guess_all_settings() + wait_for_stop = ( bool(PyFunceble.storage.CONFIGURATION.cli_testing.mining) is True ) diff --git a/PyFunceble/cli/processes/workers/miner.py b/PyFunceble/cli/processes/workers/miner.py index 564bef2e..ae6ffcd3 100644 --- a/PyFunceble/cli/processes/workers/miner.py +++ b/PyFunceble/cli/processes/workers/miner.py @@ -82,9 +82,6 @@ class MinerWorker(WorkerBase): url2netloc: Optional[Url2Netloc] = None def __post_init__(self) -> None: - # Be sure that all settings are loaded proprely!! - PyFunceble.factory.Requester.guess_all_settings() - self.url2netloc = Url2Netloc() return super().__post_init__() diff --git a/PyFunceble/cli/processes/workers/producer.py b/PyFunceble/cli/processes/workers/producer.py index b9ef3c20..f758d990 100644 --- a/PyFunceble/cli/processes/workers/producer.py +++ b/PyFunceble/cli/processes/workers/producer.py @@ -74,6 +74,7 @@ from PyFunceble.dataset.autocontinue.csv import CSVContinueDataset from PyFunceble.dataset.inactive.base import InactiveDatasetBase from PyFunceble.dataset.whois.base import WhoisDatasetBase +from PyFunceble.query.collection import CollectionQueryTool class ProducerWorker(WorkerBase): @@ -92,6 +93,7 @@ class ProducerWorker(WorkerBase): continue_dataset: Optional[ContinueDatasetBase] = None status_file_generator: Optional[StatusFileGenerator] = None counter: Optional[FilesystemCounter] = None + collection_query_tool: Optional[CollectionQueryTool] = None header_already_printed: Optional[bool] = None @@ -110,6 +112,7 @@ def __post_init__(self) -> None: ) self.status_file_generator = StatusFileGenerator().guess_all_settings() self.counter = FilesystemCounter() + self.collection_query_tool = CollectionQueryTool() self.header_already_printed = False @@ -374,13 +377,19 @@ def target(self, consumed: Any) -> Optional[Tuple[Any, ...]]: self.run_ignored_file_printer(test_dataset, test_result) return None - if not isinstance(consumed[1], CheckerStatusBase): + if not isinstance(test_result, CheckerStatusBase): PyFunceble.facility.Logger.info( "Skipping latest dataset because consumed status is not " "a status object.." ) return None + if ( + PyFunceble.storage.CONFIGURATION.collection.push + and test_result.status_source != "COLLECTION" + ): + self.collection_query_tool.push(test_result) + self.run_whois_backup(test_result) self.run_inactive_backup(test_dataset, test_result) self.run_continue_backup(test_dataset, test_result) diff --git a/PyFunceble/cli/processes/workers/tester.py b/PyFunceble/cli/processes/workers/tester.py index 8f5f9279..ec98ba34 100644 --- a/PyFunceble/cli/processes/workers/tester.py +++ b/PyFunceble/cli/processes/workers/tester.py @@ -85,6 +85,7 @@ class TesterWorker(WorkerBase): testing_object: Optional[CheckerBase] = None known_testing_objects: dict = {} + initiated_testing_objects: dict = {} def __post_init__(self) -> None: self.continue_dataset = ( @@ -110,6 +111,18 @@ def __post_init__(self) -> None: }, } + self.initiated_testing_objects = { + "SYNTAX": {"domain": None, "url": None}, + "AVAILABILITY": { + "domain": None, + "url": None, + }, + "REPUTATION": { + "domain": None, + "url": None, + }, + } + return super().__post_init__() @staticmethod @@ -168,31 +181,25 @@ def _init_testing_object( When the given subject type is unknown. """ - if checker_type in self.known_testing_objects: - if subject_type in self.known_testing_objects[checker_type]: - # Yes, we initialize before returning! - - if not isinstance( - self.known_testing_objects[checker_type][subject_type], - type(self.testing_object), - ): - self.testing_object = self.known_testing_objects[checker_type][ - subject_type - ](db_session=self.db_session) - - # We want to always check the syntax first (ONLY UNDER THE CLI) - self.testing_object.set_do_syntax_check_first( - not bool( - PyFunceble.storage.CONFIGURATION.cli_testing.local_network - ) - ) + if checker_type not in self.known_testing_objects: + raise ValueError(f" ({checker_type!r}) is unknown.") + + if subject_type not in self.known_testing_objects[checker_type]: + raise ValueError(f" ({subject_type!r}) is unknown.") - return self.testing_object + if not self.initiated_testing_objects[checker_type][subject_type]: + self.initiated_testing_objects[checker_type][ + subject_type + ] = self.known_testing_objects[checker_type][subject_type]( + db_session=self.db_session + ).set_do_syntax_check_first( + # We want to always check the syntax first (ONLY UNDER THE CLI) + not bool(PyFunceble.storage.CONFIGURATION.cli_testing.local_network) + ) - return None + self.testing_object = self.initiated_testing_objects[checker_type][subject_type] - raise ValueError(f" ({subject_type!r}) is unknown.") - raise ValueError(f" ({checker_type!r}) is unknown.") + return self.testing_object def target(self, consumed: dict) -> Optional[Tuple[Any, ...]]: """ diff --git a/PyFunceble/cli/storage.py b/PyFunceble/cli/storage.py index 215d2c8e..3a5f1956 100644 --- a/PyFunceble/cli/storage.py +++ b/PyFunceble/cli/storage.py @@ -51,7 +51,8 @@ """ import os -from typing import Optional +import sys +from typing import List, Optional import colorama from box import Box @@ -60,17 +61,17 @@ from PyFunceble.helpers.merge import Merge from PyFunceble.utils.platform import PlatformUtility -STD_EPILOG: str = ( - f"Crafted with {colorama.Fore.RED}♥{colorama.Fore.RESET} by " - f"{colorama.Style.BRIGHT}{colorama.Fore.CYAN}Nissar Chababy (@funilrys)" - f"{colorama.Style.RESET_ALL} " - f"with the help of\n{colorama.Style.BRIGHT}{colorama.Fore.GREEN}" - f"https://git.io/JkUPS{colorama.Style.RESET_ALL} " - f"&& {colorama.Style.BRIGHT}{colorama.Fore.GREEN}" - f"https://git.io/JkUPF{colorama.Style.RESET_ALL}" -) +if PlatformUtility.is_unix() and sys.stdin.encoding == "utf-8": + STD_EPILOG: str = ( + f"Crafted with {colorama.Fore.RED}♥{colorama.Fore.RESET} by " + f"{colorama.Style.BRIGHT}{colorama.Fore.CYAN}Nissar Chababy (@funilrys)" + f"{colorama.Style.RESET_ALL} " + f"with the help of\n{colorama.Style.BRIGHT}{colorama.Fore.GREEN}" + f"https://git.io/JkUPS{colorama.Style.RESET_ALL} " + f"&& {colorama.Style.BRIGHT}{colorama.Fore.GREEN}" + f"https://git.io/JkUPF{colorama.Style.RESET_ALL}" + ) -if PlatformUtility.is_unix(): ASCII_PYFUNCEBLE = """ ██████╗ ██╗ ██╗███████╗██╗ ██╗███╗ ██╗ ██████╗███████╗██████╗ ██╗ ███████╗ ██╔══██╗╚██╗ ██╔╝██╔════╝██║ ██║████╗ ██║██╔════╝██╔════╝██╔══██╗██║ ██╔════╝ @@ -79,7 +80,18 @@ ██║ ██║ ██║ ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝ """ + DONE: str = f"{colorama.Fore.GREEN}✔" + ERROR: str = f"{colorama.Fore.RED}✘" else: + STD_EPILOG: str = ( + f"Crafted with {colorama.Fore.RED}HEART{colorama.Fore.RESET} by " + f"{colorama.Style.BRIGHT}{colorama.Fore.CYAN}Nissar Chababy (@funilrys)" + f"{colorama.Style.RESET_ALL} " + f"with the help of\n{colorama.Style.BRIGHT}{colorama.Fore.GREEN}" + f"https://git.io/JkUPS{colorama.Style.RESET_ALL} " + f"&& {colorama.Style.BRIGHT}{colorama.Fore.GREEN}" + f"https://git.io/JkUPF{colorama.Style.RESET_ALL}" + ) ASCII_PYFUNCEBLE = """ ######## ## ## ######## ## ## ## ## ###### ######## ######## ## ######## ## ## ## ## ## ## ## ### ## ## ## ## ## ## ## ## @@ -89,9 +101,8 @@ ## ## ## ## ## ## ### ## ## ## ## ## ## ## ## ## ## ####### ## ## ###### ######## ######## ######## ######## """ - -DONE: str = f"{colorama.Fore.GREEN}✔" -ERROR: str = f"{colorama.Fore.RED}✘" + DONE: str = f"{colorama.Fore.GREEN}DONE" + ERROR: str = f"{colorama.Fore.RED}ERROR" VERSION_DUMP_LINK: str = ( "https://raw.githubusercontent.com/funilrys/PyFunceble/master/version.yaml" @@ -202,3 +213,6 @@ PyFunceble.cli.storage_facility.get_output_directory(), OUTPUTS.parent_directory, ) + +# This one will store some extra messages to print to the user. +EXTRA_MESSAGES: Optional[List[str]] = [] diff --git a/PyFunceble/cli/system/integrator.py b/PyFunceble/cli/system/integrator.py index 5d9e8722..37446669 100644 --- a/PyFunceble/cli/system/integrator.py +++ b/PyFunceble/cli/system/integrator.py @@ -53,6 +53,8 @@ import os +import colorama + import PyFunceble.cli.facility import PyFunceble.cli.factory import PyFunceble.cli.storage @@ -151,6 +153,30 @@ def inject_into_config(self) -> "SystemIntegrator": return self + @SystemBase.ensure_args_is_given + def check_config(self) -> "SystemIntegrator": # pylint: disable=no-self-use + """ + Checks or do some sanity check of the configuration. + + This method will basically check that the common mistakes while mixing + configuration and CLI arguments are not found. + + .. warning:: + The messages are not directly printed, but rather stored in the + PyFunceble.cli.storage.EXTRA_MESSAGES list. + """ + + if ( + not PyFunceble.storage.CONFIGURATION.cli_testing.file_generation.hosts + and not PyFunceble.storage.CONFIGURATION.cli_testing.file_generation.plain + ): + PyFunceble.cli.storage.EXTRA_MESSAGES.append( + f"{colorama.Style.BRIGHT}{colorama.Fore.MAGENTA}Your setup won't " + "generate any output! " + "Reason: file_generation.hosts and file_generation.plain are " + "both disabled." + ) + @SystemBase.ensure_args_is_given def start(self) -> "SystemIntegrator": """ @@ -170,6 +196,7 @@ def start(self) -> "SystemIntegrator": PyFunceble.facility.Logger.debug("Given arguments:\n%r", self.args) self.inject_into_config() + self.check_config() PyFunceble.cli.facility.CredentialLoader.start() PyFunceble.cli.factory.DBSession.init_db_sessions() diff --git a/PyFunceble/cli/utils/version.py b/PyFunceble/cli/utils/version.py index cb416864..7451d540 100644 --- a/PyFunceble/cli/utils/version.py +++ b/PyFunceble/cli/utils/version.py @@ -60,6 +60,7 @@ import PyFunceble.cli.storage import PyFunceble.facility import PyFunceble.storage +from PyFunceble.cli.utils.stdout import print_single_line from PyFunceble.converter.internal_url import InternalUrlConverter from PyFunceble.helpers.dict import DictHelper from PyFunceble.helpers.download import DownloadHelper @@ -340,7 +341,7 @@ def handle_messages(upstream_version: Box) -> None: def print_central_messages(check_force_update: bool = False) -> None: """ - Compares the version with the upstream one and handle the messages. + Collect all possible messages from upstream and downstream and print them. """ upstream_version = get_upstream_version() @@ -355,3 +356,6 @@ def print_central_messages(check_force_update: bool = False) -> None: ) handle_messages(upstream_version) + + for extra_message in PyFunceble.cli.storage.EXTRA_MESSAGES: + print_single_line(extra_message, force=True) diff --git a/PyFunceble/config/compare.py b/PyFunceble/config/compare.py index 92b722b1..78b70926 100644 --- a/PyFunceble/config/compare.py +++ b/PyFunceble/config/compare.py @@ -261,6 +261,8 @@ def is_local_identical(self) -> bool: or "dns" not in self.local_config or "follow_server_order" not in self.local_config["dns"] or "trust_server" not in self.local_config["dns"] + or "collection" not in self.local_config + or "collection" not in self.local_config["lookup"] ): return False diff --git a/PyFunceble/config/loader.py b/PyFunceble/config/loader.py index 0fff9ad7..68a1ad3e 100644 --- a/PyFunceble/config/loader.py +++ b/PyFunceble/config/loader.py @@ -50,7 +50,6 @@ limitations under the License. """ -import copy import functools import os from typing import Any, Optional @@ -391,17 +390,17 @@ def start(self) -> "ConfigLoader": config = self.conditional_switch(config) PyFunceble.storage.CONFIGURATION = Box( - copy.deepcopy(config), + config, ) PyFunceble.storage.FLATTEN_CONFIGURATION = DictHelper( PyFunceble.storage.CONFIGURATION ).flatten() PyFunceble.storage.HTTP_CODES = Box( - copy.deepcopy(config["http_codes"]), - ) - PyFunceble.storage.LINKS = Box( - copy.deepcopy(config["links"]), + config["http_codes"], ) + if "collection" in config: + PyFunceble.storage.COLLECTION = Box(config["collection"]) + PyFunceble.storage.LINKS = Box(config["links"]) return self @@ -416,6 +415,7 @@ def destroy(self) -> "ConfigLoader": ) PyFunceble.storage.FLATTEN_CONFIGURATION = {} PyFunceble.storage.HTTP_CODES = Box({}) + PyFunceble.storage.COLLECTION = Box({}) PyFunceble.storage.LINKS = Box({}) except (AttributeError, TypeError): # pragma: no cover ## Safety. pass diff --git a/PyFunceble/data/infrastructure/.PyFunceble_production.yaml b/PyFunceble/data/infrastructure/.PyFunceble_production.yaml index 622bdc6d..1581f968 100644 --- a/PyFunceble/data/infrastructure/.PyFunceble_production.yaml +++ b/PyFunceble/data/infrastructure/.PyFunceble_production.yaml @@ -215,6 +215,9 @@ lookup: # Activates the usage of the reputation data reputation. reputation: False + # Activate the usage of the collection. + collection: False + # Sets the timeout to apply to each of our lookup tools who support a timeout # option. timeout: 5 @@ -345,3 +348,25 @@ links: # rewritten. api_date_format: "https://pyfunceble.funilrys.com/api/date-format" api_no_referrer: "https://pyfunceble.funilrys.com/api/no-referrer" + +collection: + # Provides everything related to the collection. + + # Provides the base of the URL to access to communicate with the collection. + url_base: https://collection.dead-hosts.funilrys.com + + # Activates the push of dataset into the collection. + # WARNING: This is useless, if you don't have an API Token set as the + # COLLECTION_API_TOKEN environment variable. + push: False + + # Sets the preferred pull origin. + # + # When choosing, `frequent`, the system will pull the most frequent status + # that is provided and calculated by the collection. + # + # When choosing, `latest`, the system will pull the latest status that is + # provided and available into the collection. + # + # Available: frequent | latest | recommended + preferred_status_origin: recommended diff --git a/PyFunceble/query/collection.py b/PyFunceble/query/collection.py new file mode 100644 index 00000000..c38ba350 --- /dev/null +++ b/PyFunceble/query/collection.py @@ -0,0 +1,566 @@ +""" +The tool to check the availability or syntax of domain, IP or URL. + +:: + + + ██████╗ ██╗ ██╗███████╗██╗ ██╗███╗ ██╗ ██████╗███████╗██████╗ ██╗ ███████╗ + ██╔══██╗╚██╗ ██╔╝██╔════╝██║ ██║████╗ ██║██╔════╝██╔════╝██╔══██╗██║ ██╔════╝ + ██████╔╝ ╚████╔╝ █████╗ ██║ ██║██╔██╗ ██║██║ █████╗ ██████╔╝██║ █████╗ + ██╔═══╝ ╚██╔╝ ██╔══╝ ██║ ██║██║╚██╗██║██║ ██╔══╝ ██╔══██╗██║ ██╔══╝ + ██║ ██║ ██║ ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗ + ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝ + +Provides ans interface which let us interact with the Collection API. + +Author: + Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom + +Special thanks: + https://pyfunceble.github.io/#/special-thanks + +Contributors: + https://pyfunceble.github.io/#/contributors + +Project link: + https://github.com/funilrys/PyFunceble + +Project documentation: + https://pyfunceble.readthedocs.io/en/latest/ + +Project homepage: + https://pyfunceble.github.io/ + +License: +:: + + + Copyright 2017, 2018, 2019, 2020, 2021 Nissar Chababy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +import json +import logging +from datetime import datetime +from typing import List, Optional, Union + +import requests +import requests.exceptions + +import PyFunceble.facility +import PyFunceble.storage +from PyFunceble.checker.availability.status import AvailabilityCheckerStatus +from PyFunceble.checker.reputation.status import ReputationCheckerStatus +from PyFunceble.checker.syntax.status import SyntaxCheckerStatus +from PyFunceble.helpers.environment_variable import EnvironmentVariableHelper + + +class CollectionQueryTool: + """ + Provides the interface to the collection dataset. + + :param token: + The token to use to communicate with the API. + + .. warning:: + If :code:`None` is given, the class constructor will try to load the + PYFUNCEBLE_COLLECTION_API_TOKEN environment variable. + + :param url_base: + The base of the URL to communicate with. + + :param preferred_status_origin: + The preferred data origin. + It can be :code:`frequent`, :code:`latest` or :code:`recommended`. + """ + + SUPPORTED_CHECKERS: List[str] = ["syntax", "reputation", "availability"] + SUPPORTED_STATUS_ORIGIN: List[str] = ["frequent", "latest", "recommended"] + + STD_URL_BASE: str = "http://localhost:8001" + STD_PREFERRED_STATUS_ORIGIN: str = "frequent" + + _token: Optional[str] = None + """ + The token to use while communicating with the collection API. + """ + + _url_base: Optional[str] = None + """ + The base of the URL to communicate with. + """ + + _preferred_status_origin: Optional[str] = None + """ + The preferred data origin + """ + + session: Optional[requests.Session] = None + + def __init__( + self, + *, + token: Optional[str] = None, + url_base: Optional[str] = None, + preferred_status_origin: Optional[str] = None, + ) -> None: + + if token is not None: + self.token = token + else: + self.token = EnvironmentVariableHelper( + "PYFUNCEBLE_COLLECTION_API_TOKEN" + ).get_value(default="") + + if url_base is not None: + self.url_base = url_base + else: + self.guess_and_set_url_base() + + if preferred_status_origin is not None: + self.preferred_status_origin = preferred_status_origin + else: + self.guess_and_set_preferred_status_origin() + + self.session = requests.Session() + self.session.headers.update( + { + "Authorization": f"Bearer {self.token}" if self.token else None, + "Content-Type": "application/json", + } + ) + + def __contains__(self, value: str) -> bool: + """ + Checks if the given value is in the collection. + + :param value: + The value to check. + """ + + return self.pull(value) is not None + + def __getitem__(self, value: str) -> Optional[dict]: + """ + Gets the information about the given value. + + :param value: + The value to get the information about. + """ + + return self.pull(value) + + @property + def token(self) -> Optional[str]: + """ + Provides the currently set token. + """ + + return self._token + + @token.setter + def token(self, value: str) -> None: + """ + Sets the value of the :code:`_token` attribute. + + :param value: + The value to set. + + :raise TypeError: + When the given :code:`value` is not a :py:class:`str` + """ + + if not isinstance(value, str): + raise TypeError(f" should be {str}, {type(value)} given.") + + self._token = value + + def set_token(self, value: str) -> "CollectionQueryTool": + """ + Sets the value of the :code:`_token` attribute. + + :param value: + The value to set. + """ + + self.token = value + + return self + + @property + def url_base(self) -> Optional[str]: + """ + Provides the value of the :code:`_url_base` attribute. + """ + + return self._url_base + + @url_base.setter + def url_base(self, value: str) -> None: + """ + Sets the base of the URL to work with. + + :param value: + The value to set. + + :raise TypeError: + When the given :code:`value` is not a :py:class:`str`. + + :raise ValueError: + When the given :code:`value` does not have a scheme. + """ + + if not isinstance(value, str): + raise TypeError(f" should be {str}, {type(value)} given.") + + if not value.startswith(("http", "https")): + raise ValueError( + f" is missing the scheme (http/https), {value} given." + ) + + self._url_base = value.rstrip("/") + + def set_url_base(self, value: str) -> "CollectionQueryTool": + """ + Sets the base of the URL to work with. + + :parma value: + The value to set. + """ + + self.url_base = value + + return self + + def guess_and_set_url_base(self) -> "CollectionQueryTool": + """ + Try to guess the URL base to work with. + """ + + if PyFunceble.facility.ConfigLoader.is_already_loaded(): + if isinstance(PyFunceble.storage.CONFIGURATION.collection.url_base, str): + self.url_base = PyFunceble.storage.CONFIGURATION.collection.url_base + else: + self.url_base = self.STD_URL_BASE + else: + self.url_base = self.STD_URL_BASE + + return self + + @property + def preferred_status_origin(self) -> Optional[str]: + """ + Provides the value of the :code:`_preferred_status_origin` attribute. + """ + + return self._preferred_status_origin + + @preferred_status_origin.setter + def preferred_status_origin(self, value: str) -> None: + """ + Sets the preferred status origin. + + :param value: + The value to set. + + :raise TypeError: + When the given :code:`value` is not a :py:class:`str`. + + :raise ValueError: + When the given :code:`value` is not supported. + """ + + if not isinstance(value, str): + raise TypeError(f" should be {str}, {type(value)} given.") + + if value not in self.SUPPORTED_STATUS_ORIGIN: + raise ValueError(f" ({value}) is not supported.") + + self._preferred_status_origin = value + + def set_preferred_status_origin(self, value: str) -> "CollectionQueryTool": + """ + Sets the preferred status origin. + + :parma value: + The value to set. + """ + + self.preferred_status_origin = value + + return self + + def guess_and_set_preferred_status_origin(self) -> "CollectionQueryTool": + """ + Try to guess the preferred status origin. + """ + + if PyFunceble.facility.ConfigLoader.is_already_loaded(): + if isinstance( + PyFunceble.storage.CONFIGURATION.collection.preferred_status_origin, str + ): + self.preferred_status_origin = ( + PyFunceble.storage.CONFIGURATION.collection.preferred_status_origin + ) + else: + self.preferred_status_origin = self.STD_PREFERRED_STATUS_ORIGIN + else: + self.preferred_status_origin = self.STD_PREFERRED_STATUS_ORIGIN + + return self + + def pull(self, subject: str) -> Optional[dict]: + """ + Pulls all data related to the subject or :py:class:`None` + + :param subject: + The subject to search for. + + :raise TypeError: + When the given :code:`subject` is not a :py:class:`str`. + + :return: + The response of the search. + """ + + logging.info("Starting to search subject: %r", subject) + + if not isinstance(subject, str): + raise TypeError(f" should be {str}, {type(subject)} given.") + + url = f"{self.url_base}/v1/subject/search" + + try: + response = self.session.post( + url, + json={"subject": subject}, + ) + + response_json = response.json() + + if response.status_code == 200: + logging.debug( + "Successfully search subject: %r. Response: %r", + subject, + response_json, + ) + + logging.info("Finished to search subject: %r", subject) + + return response_json + except (requests.RequestException, json.decoder.JSONDecodeError): + response_json = {} + + logging.debug( + "Failed to search subject: %r. Response: %r", subject, response_json + ) + logging.info("Finished to search subject: %r", subject) + + return None + + def push( + self, + checker_status: Union[ + AvailabilityCheckerStatus, SyntaxCheckerStatus, ReputationCheckerStatus + ], + ) -> Optional[dict]: + """ + Push the given status to the collection. + + :param checker_status: + The status to push. + + :raise TypeError: + - When the given :code:`checker_status` is not a + :py:class:`AvailabilityCheckerStatus`, + :py:class:`SyntaxCheckerStatus` or + :py:class:`ReputationCheckerStatus`. + + - When the given :code:`checker_status.subject` is not a + :py:class:`str`. + + :raise ValueError: + When the given :code:`checker_status.subject` is empty. + """ + + if not isinstance( + checker_status, + (AvailabilityCheckerStatus, SyntaxCheckerStatus, ReputationCheckerStatus), + ): + raise TypeError( + f" should be {AvailabilityCheckerStatus}, " + f"{SyntaxCheckerStatus} or {ReputationCheckerStatus}, " + f"{type(checker_status)} given." + ) + + if not isinstance(checker_status.subject, str): + raise TypeError( + f" should be {str}, " + f"{type(checker_status.subject)} given." + ) + + if not isinstance(checker_status.checker_type, str): + raise TypeError( + f" should be {str}, " + f"{type(checker_status.subject)} given." + ) + + if checker_status.subject == "": + raise ValueError(" cannot be empty.") + + status_to_subject = { + "status": checker_status.status, + "status_source": checker_status.status_source, + "tested_at": checker_status.tested_at.isoformat(), + "subject": checker_status.idna_subject, + } + + if ( + hasattr(checker_status, "expiration_date") + and checker_status.expiration_date + ): + self.__push_whois( + { + "subject": checker_status.idna_subject, + "expiration_date": datetime.strptime( + checker_status.expiration_date, "%d-%b-%Y" + ).isoformat(), + } + ) + + data = self.__push_status( + checker_status.checker_type.lower(), status_to_subject + ) + + return data + + def guess_all_settings( + self, + ) -> "CollectionQueryTool": # pragma: no cover ## Underlying tested + """ + Try to guess all settings. + """ + + to_ignore = ["guess_all_settings"] + + for method in dir(self): + if method in to_ignore or not method.startswith("guess_"): + continue + + getattr(self, method)() + + return self + + def __push_status(self, checker_type: str, data: dict) -> Optional[dict]: + """ + Submits the given status to the collection. + + :param checker_type: + The type of the checker. + :param data: + The data to submit. + + :raise TypeError: + - When :code:`checker_type` is not a :py:class:`str`. + + - When :code:`data` is not a :py:class:`dict`. + + :raise ValueError: + When the given :code:`checker_type` is not a subject checker type. + """ + + if not self.token: + return None + + if checker_type not in self.SUPPORTED_CHECKERS: + raise ValueError(f" ({checker_type}) is not supported.") + + logging.info("Starting to submit status: %r", data) + + url = f"{self.url_base}/v1/status/{checker_type}" + + try: + response = self.session.post( + url, + json=data, + ) + + response_json = response.json() + + if response.status_code == 200: + logging.debug("Successfully submitted data: %r to %s", data, url) + + logging.info("Finished to submit status: %r", data) + return response_json + except (requests.RequestException, json.decoder.JSONDecodeError): + response_json = {} + + logging.debug( + "Failed to submit data: %r to %s. Response: %r", data, url, response_json + ) + + logging.info("Finished to submit status: %r", data) + + return None + + def __push_whois(self, data: dict) -> Optional[dict]: + """ + Submits the given WHOIS data into the given subject. + + :param checker_type: + The type of the checker. + :param data: + The data to submit. + + :raise TypeError: + - When :code:`checker_type` is not a :py:class:`str`. + + - When :code:`data` is not a :py:class:`dict`. + + :raise ValueError: + When the given :code:`checker_type` is not a subject checker type. + """ + + if not self.token: + return None + + if not isinstance(data, dict): # pragma: no cover ## Should never happen + raise TypeError(f" should be {dict}, {type(data)} given.") + + logging.info("Starting to submit WHOIS: %r", data) + + url = f"{self.url_base}/v1/status/whois" + + try: + response = self.session.post( + url, + json=data, + ) + + response_json = response.json() + + if response.status_code == 200: + logging.debug("Successfully submitted WHOIS data: %r to %s", data, url) + + logging.info("Finished to submit WHOIS: %r", data) + return response_json + except (requests.RequestException, json.decoder.JSONDecodeError): + response_json = {} + + logging.debug( + "Failed to WHOIS data: %r to %s. Response: %r", data, url, response_json + ) + + logging.info("Finished to submit WHOIS: %r", data) + return None diff --git a/PyFunceble/query/http_status_code.py b/PyFunceble/query/http_status_code.py index 4cd74554..b4e38611 100644 --- a/PyFunceble/query/http_status_code.py +++ b/PyFunceble/query/http_status_code.py @@ -357,7 +357,11 @@ def get_status_code(self) -> int: req.url ).get_converted() - if not self.allow_redirects and first_origin != final_origin: + if ( + not self.allow_redirects + and first_origin != final_origin + and req.history + ): return req.history[0].status_code return req.status_code diff --git a/PyFunceble/query/requests/adapter/base.py b/PyFunceble/query/requests/adapter/base.py index 0a291887..ec129c3c 100644 --- a/PyFunceble/query/requests/adapter/base.py +++ b/PyFunceble/query/requests/adapter/base.py @@ -80,7 +80,11 @@ def __init__(self, *args, **kwargs): total=kwargs["max_retries"], respect_retry_after_header=False ) - self.dns_query_tool = DNSQueryTool().guess_all_settings() + if "dns_query_tool" in kwargs: + self.dns_query_tool = kwargs["dns_query_tool"] + del kwargs["dns_query_tool"] + else: + self.dns_query_tool = DNSQueryTool() super().__init__(*args, **kwargs) @@ -176,6 +180,8 @@ def resolve(self, hostname: str) -> Optional[str]: Resolves with the prefered method. """ - if self.resolving_use_cache: - return self.resolve_with_cache(hostname) - return self.resolve_without_cache(hostname) + if hostname: + if self.resolving_use_cache: + return self.resolve_with_cache(hostname) + return self.resolve_without_cache(hostname) + return None diff --git a/PyFunceble/query/requests/requester.py b/PyFunceble/query/requests/requester.py index 85f54b0c..9846fb98 100644 --- a/PyFunceble/query/requests/requester.py +++ b/PyFunceble/query/requests/requester.py @@ -61,6 +61,7 @@ import PyFunceble.facility import PyFunceble.storage from PyFunceble.dataset.user_agent import UserAgentDataset +from PyFunceble.query.dns.query_tool import DNSQueryTool from PyFunceble.query.requests.adapter.http import RequestHTTPAdapter from PyFunceble.query.requests.adapter.https import RequestHTTPSAdapter @@ -91,6 +92,7 @@ class Requester: _max_redirects: int = 60 session: Optional[requests.Session] = None + dns_query_tool: Optional[DNSQueryTool] = None def __init__( self, @@ -99,6 +101,7 @@ def __init__( verify_certificate: Optional[bool] = None, timeout: Optional[float] = None, max_redirects: Optional[int] = None, + dns_query_tool: Optional[DNSQueryTool] = None, ) -> None: if max_retries is not None: self.max_retries = max_retries @@ -116,6 +119,11 @@ def __init__( if max_redirects is not None: self.max_redirects = max_redirects + if dns_query_tool is not None: + self.dns_query_tool = dns_query_tool + else: + self.dns_query_tool = DNSQueryTool() + self.session = self.get_session() warnings.simplefilter("ignore", urllib3.exceptions.InsecureRequestWarning) @@ -408,11 +416,19 @@ def get_session(self) -> requests.Session: session.mount( "https://", - RequestHTTPSAdapter(max_retries=self.max_retries, timeout=self.timeout), + RequestHTTPSAdapter( + max_retries=self.max_retries, + timeout=self.timeout, + dns_query_tool=self.dns_query_tool, + ), ) session.mount( "http://", - RequestHTTPAdapter(max_retries=self.max_retries, timeout=self.timeout), + RequestHTTPAdapter( + max_retries=self.max_retries, + timeout=self.timeout, + dns_query_tool=self.dns_query_tool, + ), ) custom_headers = {"User-Agent": UserAgentDataset().get_latest()} diff --git a/PyFunceble/storage.py b/PyFunceble/storage.py index d80b69d9..30963eed 100644 --- a/PyFunceble/storage.py +++ b/PyFunceble/storage.py @@ -61,7 +61,7 @@ from PyFunceble.storage_facility import get_config_directory PROJECT_NAME: str = "PyFunceble" -PROJECT_VERSION: str = "4.0.1. (Blue Duckling: Chestnut)" +PROJECT_VERSION: str = "4.0.2. (Blue Duckling: Grandiflora)" DISTRIBUTED_CONFIGURATION_FILENAME: str = ".PyFunceble_production.yaml" DISTRIBUTED_DIR_STRUCTURE_FILENAME: str = "dir_structure_production.json" @@ -124,6 +124,7 @@ frozen_box=True, ) HTTP_CODES: Optional[Box] = Box({}) +COLLECTION: Optional[Box] = Box({}) LINKS: Optional[Box] = Box({}) diff --git a/PyFunceble/utils/profile.py b/PyFunceble/utils/profile.py index 258595cf..d43cf3a7 100644 --- a/PyFunceble/utils/profile.py +++ b/PyFunceble/utils/profile.py @@ -54,7 +54,9 @@ import contextlib import cProfile import io +import linecache import pstats +import tracemalloc @contextlib.contextmanager @@ -89,3 +91,43 @@ def profile_it(*, sort_stats: str = "cumulative", show_callers: bool = False): profiler_starts.print_callees() print(our_stream.getvalue()) + + +@contextlib.contextmanager +def profile_memory(stats_mode: str = "lineno", top_limit: int = 10): + """ + Provides a context manager which will activates memory profiling of our + source code. + """ + + tracemalloc.start() + + yield + + snapshot = tracemalloc.take_snapshot() + snapshot = snapshot.filter_traces( + ( + tracemalloc.Filter(False, ""), + tracemalloc.Filter(False, ""), + tracemalloc.Filter(False, ""), + ) + ) + top_stats = snapshot.statistics(stats_mode) + + print("Top %s lines" % top_limit) + for index, stat in enumerate(top_stats[:top_limit], 1): + frame = stat.traceback[0] + print( + "#%s: %s:%s: %.1f KiB" + % (index, frame.filename, frame.lineno, stat.size / 1024) + ) + line = linecache.getline(frame.filename, frame.lineno).strip() + if line: + print(" %s" % line) + + other = top_stats[top_limit:] + if other: + size = sum(stat.size for stat in other) + print("%s other: %.1f KiB" % (len(other), size / 1024)) + total = sum(stat.size for stat in top_stats) + print("Total allocated size: %.1f KiB" % (total / 1024)) diff --git a/README.rst b/README.rst index 09151773..28c1c56b 100644 --- a/README.rst +++ b/README.rst @@ -151,6 +151,7 @@ contribution(s) and or issue report which made or make `PyFunceble`_ a better to - Avinash Reddy - `@AvinashReddy3108`_ - Daniel - `@dnmTX`_ - hawkeye116477 - `@hawkeye116477`_ +- Human Being - `@T145`_ - Imre Kristoffer Eilertsen - `@DandelionSprout`_ - jawz101 - `@jawz101`_ - keczuppp - `@keczuppp`_ @@ -290,6 +291,7 @@ License .. _@SMed79: https://github.com/SMed79 .. _@speedmann: https://github.com/speedmann .. _@spirillen: https://www.mypdns.org/p/Spirillen/ +.. _@T145: https://github.com/T145 .. _@tartley: https://github.com/tartley .. _@theskumar: https://github.com/theskumar .. _@Wally3K: https://github.com/WaLLy3K diff --git a/docs/code/PyFunceble.query.rst b/docs/code/PyFunceble.query.rst index 16680895..2fb35679 100644 --- a/docs/code/PyFunceble.query.rst +++ b/docs/code/PyFunceble.query.rst @@ -16,6 +16,14 @@ Subpackages Submodules ---------- +PyFunceble.query.collection module +---------------------------------- + +.. automodule:: PyFunceble.query.collection + :members: + :undoc-members: + :show-inheritance: + PyFunceble.query.http\_status\_code module ------------------------------------------ diff --git a/docs/components/special-rules.rst b/docs/components/special-rules.rst index 0e7a0054..f81254ba 100644 --- a/docs/components/special-rules.rst +++ b/docs/components/special-rules.rst @@ -65,6 +65,14 @@ supplied as :code:`INACTIVE`. ------ +:code:`*.fc2.com` +""""""""""""""""" + +Any subjects matching the given pattern and the :code:`error.fc2.com` subdomain +is into the `Location` headers are supplied as :code:`INACTIVE`. + +------ + :code:`*.github.io` """"""""""""""""""" diff --git a/docs/configuration/collection.rst b/docs/configuration/collection.rst new file mode 100644 index 00000000..6af0c630 --- /dev/null +++ b/docs/configuration/collection.rst @@ -0,0 +1,44 @@ +:code:`collection` +^^^^^^^^^^^^^^^^^^ + + **Type:** :code:`dict` + + **Description:** Configures everything related to the interaction with the + collection API. + +:code:`collection[url_base]` +"""""""""""""""""""""""""""" + + **Type:** :code:`str` + + **Default value:** :code:`http://localhost:8080` + + **Description:** Sets the base URL of the collection API. + +:code:`collection[push]` +"""""""""""""""""""""""" + + **Type:** :code:`bool` + + **Default value:** :code:`False` + + **Description:** Activates or disables the push of the test datasets to the + collection API. + + + .. warning:: + + This argument is useless if the :code:`PYFUNCEBLE_COLLECTION_API` + environment variable is not defined. + +:code:`collection[preferred_status_origin]` +""""""""""""""""""""""""""""""""""""""""""" + + **Type:** :code:`str` + + **Default value:** :code:`frequent` + + **Available values:** :code:`frequent`, :code:`latest` , :code:`recommended` + + **Description:** Sets the preferred status origin when fetching data from + the collection diff --git a/docs/configuration/indexes.rst b/docs/configuration/indexes.rst index 15b017d2..6cc3a2e1 100644 --- a/docs/configuration/indexes.rst +++ b/docs/configuration/indexes.rst @@ -13,4 +13,5 @@ This page will try to detail each configuration available into :code:`.PyFuncebl .. include:: user-agent.rst .. include:: http-codes.rst .. include:: links.rst +.. include:: collection.rst diff --git a/docs/configuration/lookup.rst b/docs/configuration/lookup.rst index 959dd525..6d62707a 100644 --- a/docs/configuration/lookup.rst +++ b/docs/configuration/lookup.rst @@ -76,3 +76,12 @@ **Description:** Sets the default timeout to apply to each lookup utilities everytime it is possible to define a timeout. +:code:`lookup[collection]` +"""""""""""""""""""""""""" + + **Type:** :code:`boolean` + + **Default value:** :code:`True` + + **Description:** Activates or disables the usage of the collection dataset + whether possible. diff --git a/docs/contributors.rst b/docs/contributors.rst index 68728392..375d7214 100644 --- a/docs/contributors.rst +++ b/docs/contributors.rst @@ -18,6 +18,7 @@ contribution(s) and or issue report which made or make `PyFunceble`_ a better to - Avinash Reddy - `@AvinashReddy3108`_ - Daniel - `@dnmTX`_ - hawkeye116477 - `@hawkeye116477`_ +- Human Being - `@T145`_ - Imre Kristoffer Eilertsen - `@DandelionSprout`_ - jawz101 - `@jawz101`_ - keczuppp - `@keczuppp`_ @@ -56,6 +57,7 @@ contribution(s) and or issue report which made or make `PyFunceble`_ a better to .. _@sjhgvr: https://github.com/sjhgvr .. _@speedmann: https://github.com/speedmann .. _@spirillen: https://mypdns.org/spirillen +.. _@T145: https://github.com/T145 .. _@Wally3K: https://github.com/WaLLy3K .. _@xxcriticxx: https://github.com/xxcriticxx .. _@ybreza: https://github.com/ybreza diff --git a/docs/facts/they-use-d-it.rst b/docs/facts/they-use-d-it.rst index c2aabd14..7c08a64e 100644 --- a/docs/facts/they-use-d-it.rst +++ b/docs/facts/they-use-d-it.rst @@ -24,12 +24,14 @@ PyFunceble! * `KADhosts`_ * `MobileAdTrackers`_ * `My Privacy DNS`_ +* `Phishing`_ * `Phishing-URL-Testing-Database-of-Link-Statuses`_ * `Phishing.Database`_ * `polish-adblock-filters`_ * `polish-pihole-filters`_ -* `porn-records`_ -* `pornhosts`_ +* `Porn Records`_ (**NSFW**: A Anti pornographic project) +* `porn-hosts`_ (Formerly `https://github.com/Clefspeare13/pornhosts`) (**NSFW**: A Anti pornographic project) +* `pornhosts`_ (**NSFW**: A Anti pornographic project) * `Steven Black ad-hoc list`_ * `Stop.Google.Analytics.Ghost.Spam.HOWTO`_ * `The-Big-List-of-Hacked-Malware-Web-Sites`_ @@ -58,13 +60,15 @@ PyFunceble! .. _Google AdService and ID.Google tracking hosts : https://github.com/kowith337/PersonalFilterListCollection/blob/master/hosts/hosts_google_adservice_id.txt .. _KADhosts : https://github.com/azet12/KADhosts .. _MobileAdTrackers : https://github.com/jawz101/MobileAdTrackers -.. _My Privacy DNS : https://www.mypdns.org +.. _My Privacy DNS : https://mypdns.org +.. _Phishing : https://github.com/mitchellkrogza/Phishing .. _Phishing-URL-Testing-Database-of-Link-Statuses : https://github.com/mitchellkrogza/Phishing-URL-Testing-Database-of-Link-Statuses .. _Phishing.Database : https://github.com/mitchellkrogza/Phishing.Database .. _polish-adblock-filters : https://github.com/MajkiIT/polish-ads-filter/blob/master/polish-adblock-filters/adblock.txt .. _polish-pihole-filters : https://github.com/MajkiIT/polish-ads-filter/blob/master/polish-pihole-filters/hostfile.txt -.. _porn-records : https://mypdns.org/my-privacy-dns/porn-records -.. _pornhosts : https://github.com/Import-External-Sources/pornhosts +.. _Porn Records : https://mypdns.org/my-privacy-dns/porn-records +.. _porn-hosts : https://mypdns.org/clefspeare13/pornhosts +.. _pornhosts : https://mypdns.org/import-external-sources/pornhosts .. _Steven Black ad-hoc list : https://github.com/StevenBlack/hosts/blob/master/data/StevenBlack/hosts .. _Stop.Google.Analytics.Ghost.Spam.HOWTO : https://github.com/mitchellkrogza/Stop.Google.Analytics.Ghost.Spam.HOWTO .. _The-Big-List-of-Hacked-Malware-Web-Sites : https://github.com/mitchellkrogza/The-Big-List-of-Hacked-Malware-Web-Sites diff --git a/docs/responses/api-explained.rst b/docs/responses/api-explained.rst index 1313cbcd..813dd933 100644 --- a/docs/responses/api-explained.rst +++ b/docs/responses/api-explained.rst @@ -52,7 +52,8 @@ With the availability checker, the following is provided. "use_netinfo_lookup": true, "use_reputation_lookup": false, "use_whois_db": true, - "use_whois_lookup": false + "use_whois_lookup": false, + "use_collection": false } :code:`do_syntax_check_first` @@ -110,6 +111,12 @@ the status of the given subject. This parameter lets the checker know that is it allowed to perform a WHOIS lookup to determine the status of the given subject. +:code:`use_collection` +~~~~~~~~~~~~~~~~~~~~~~ + +This parameter lets the checker know that it is allowed to perform a lookup into +the collection API before starting an extensive local test. + Reputation Checker """""""""""""""""" @@ -118,7 +125,8 @@ With the availability checker, the following is provided. .. code-block:: json { - "do_syntax_check_first": false + "do_syntax_check_first": false, + "use_collection": false } :code:`do_syntax_check_first` @@ -128,6 +136,12 @@ This parameter lets the checker know that it has to do a syntax check before starting an extensive test. Meaning that the status strongly depends on the caught syntax. +:code:`use_collection` +~~~~~~~~~~~~~~~~~~~~~~ + +This parameter lets the checker know that it is allowed to perform a lookup into +the collection API before starting an extensive local test. + :code:`status` ^^^^^^^^^^^^^^ @@ -194,6 +208,7 @@ It should be one of the following: - :code:`NETINFO` - :code:`HTTP CODE` - :code:`SPECIAL` (extra rules) +- :code:`COLLECTION` :code:`status_source_after_extra_rules` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -224,6 +239,7 @@ It should be one of the following: - :code:`DNSLOOKUP` - :code:`NETINFO` - :code:`HTTP CODE` +- :code:`COLLECTION` If no rules were matched, :code:`null` is provided. diff --git a/docs/responses/api.rst b/docs/responses/api.rst index 9a56a3da..1c817a7d 100644 --- a/docs/responses/api.rst +++ b/docs/responses/api.rst @@ -63,7 +63,8 @@ Availability Checker "use_netinfo_lookup": true, "use_reputation_lookup": false, "use_whois_db": true, - "use_whois_lookup": false + "use_whois_lookup": false, + "use_collection": false }, "second_level_domain_syntax": true, "status": "ACTIVE", @@ -119,7 +120,8 @@ Reputation Checker "ipv6_range_syntax": false, "ipv6_syntax": false, "params": { - "do_syntax_check_first": false + "do_syntax_check_first": false, + "use_collection": false }, "second_level_domain_syntax": true, "status": "SANE", diff --git a/docs/usage/terminal.rst b/docs/usage/terminal.rst index dbdc9937..0daa7caa 100644 --- a/docs/usage/terminal.rst +++ b/docs/usage/terminal.rst @@ -366,6 +366,32 @@ the limitation which does not apply to private networks. **Default value:** :code:`local_network: False` +------ + +:code:`--collection-preferred-origin` +""""""""""""""""""""""""""""""""""""" + +.. versionadded: 4.0.0 + +Sets the preferred status origin. + +**Default value:** :code:`collection.preferred_status_origin: frequent` + +**Available values:** :code:`frequent`, :code:`latest`, :code:`recommended` + + +------ + +:code:`--collection-lookup` +""""""""""""""""""""""""""" + +.. versionadded: 4.0.0 + +Activates or disables the usage of the collection lookup whether possible. + +**Default value:** :code:`lookup.collection: False` + +Want to take advantage of the collection API ? This argument is for you. ------ @@ -1041,6 +1067,23 @@ Activates or disables the display of output to the terminal. **Default value:** :code:`quiet: False` +------ + +:code:`--push-collection` +"""""""""""""""""""""""" + +.. versionadded: 4.0.0 + +Activates or disables the push of the test results into the collection API. + +**Default value:** :code:`collection.push: False` + +Want to take submit data into the collection API ? This argument is for you. + +.. warning:: + + This argument is useless if the :code:`PYFUNCEBLE_COLLECTION_API` environment + variable is not defined. ------ @@ -1321,51 +1364,53 @@ are set. If used in a script like bash or a terminal directly you have to use the :code:`export` as PyFunceble is running as sub-processes -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| **Environment Variable** | **How to use them?** | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_AUTO_CONFIGURATION` | Tell us if we have to install/update the configuration file automatically. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_DB_CHARSET` | Tell us the MariaDB charset to use. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_DB_HOST` | Tell us the host or the Unix socket (absolute file path) of the MariaDB database. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_DB_NAME` | Tell us the name of the MariaDB database to use. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_DB_PASSWORD` | Tell us the MariaDB user password to use. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_DB_PORT` | Tell us the MariaDB connection port to use. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_DB_USERNAME` | Tell us the MariaDB user-name to use. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_DEBUG` | Tell us to log everything into the :code:`output/logs/*.log` files. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_DEBUG_LVL` | Sets the logging level to use. :ref:`logging-level` | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_LOGGING_LVL` | Same as :code:`PYFUNCEBLE_DEBUG_LVL`. :ref:`logging-level` | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_DEBUG_ON_SCREEN` | Tell us to log everything to :code:`stdout` bool (true | false) | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_CONFIG_DIR` | Tell us the location of the directory to use as the configuration directory. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`PYFUNCEBLE_OUTPUT_LOCATION` | Tell us where we should generate the :code:`output/` directory. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`APPDATA` | Used under Windows to construct/get the configuration directory if :code:`PYFUNCEBLE_CONFIG_DIR` is not found. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`GH_TOKEN` | Tell us the GitHub token to set into the repository configuration when using PyFunceble under Travis CI. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`GL_TOKEN` | Tell us the GitLab token to set into the repository configuration when using PyFunceble under GitLab CI/CD. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`GIT_EMAIL` | Tell us the :code:`git.email` configuration to set when using PyFunceble under any supported CI environment. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`GIT_NAME` | Tell us the :code:`git.name` configuration to set when using PyFunceble under any supported CI environment. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`TRAVIS_BUILD_DIR` | Used to confirm that we are running under a Travis CI container. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`GITLAB_CI` | Used to confirm that we are running under a GitLab CI/CD environment. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ -| :code:`GITLAB_USER_ID` | Used to confirm that we are running under a GitLab CI/CD environment. | -+---------------------------------------+----------------------------------------------------------------------------------------------------------------------+ ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| **Environment Variable** | **How to use them?** | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_AUTO_CONFIGURATION` | Tell us if we have to install/update the configuration file automatically. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_COLLECTION_API_TOKEN` | Sets the API token to use when pushing data into the collection API. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_CONFIG_DIR` | Tell us the location of the directory to use as the configuration directory. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_DB_CHARSET` | Tell us the MariaDB charset to use. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_DB_HOST` | Tell us the host or the Unix socket (absolute file path) of the MariaDB database. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_DB_NAME` | Tell us the name of the MariaDB database to use. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_DB_PASSWORD` | Tell us the MariaDB user password to use. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_DB_PORT` | Tell us the MariaDB connection port to use. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_DB_USERNAME` | Tell us the MariaDB user-name to use. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_DEBUG` | Tell us to log everything into the :code:`output/logs/*.log` files. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_DEBUG_LVL` | Sets the logging level to use. :ref:`logging-level` | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_DEBUG_ON_SCREEN` | Tell us to log everything to :code:`stdout` bool (true | false) | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_LOGGING_LVL` | Same as :code:`PYFUNCEBLE_DEBUG_LVL`. :ref:`logging-level` | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`PYFUNCEBLE_OUTPUT_LOCATION` | Tell us where we should generate the :code:`output/` directory. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`APPDATA` | Used under Windows to construct/get the configuration directory if :code:`PYFUNCEBLE_CONFIG_DIR` is not found. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`GH_TOKEN` | Tell us the GitHub token to set into the repository configuration when using PyFunceble under Travis CI. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`GL_TOKEN` | Tell us the GitLab token to set into the repository configuration when using PyFunceble under GitLab CI/CD. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`GIT_EMAIL` | Tell us the :code:`git.email` configuration to set when using PyFunceble under any supported CI environment. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`GIT_NAME` | Tell us the :code:`git.name` configuration to set when using PyFunceble under any supported CI environment. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`TRAVIS_BUILD_DIR` | Used to confirm that we are running under a Travis CI container. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`GITLAB_CI` | Used to confirm that we are running under a GitLab CI/CD environment. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ +| :code:`GITLAB_USER_ID` | Used to confirm that we are running under a GitLab CI/CD environment. | ++-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+ Global overview @@ -1447,6 +1492,12 @@ Global overview --local Activates or disables the consideration of the test(s) in or for a local or private network context. Configured value: False + --collection-preferred-origin {frequent,latest,recommended} + Sets the preferred status origin. + Configured value: 'frequent' + --collection-lookup Activates or disables the usage of the Collection lookup + whether possible. + Configured value: False --dns-lookup Activates or disables the usage of the DNS lookup whether possible. Configured value: True @@ -1575,6 +1626,9 @@ Global overview -q, --quiet Activates or disables the display of output to the terminal. Configured value: False + --push-collection Activates or disables the push of the test results into the + collection API. + Configured value: False -s, --simple Activates or disables the simple output mode. Configured value: False diff --git a/tests/checker/availability/test_base.py b/tests/checker/availability/test_base.py index 8d9838af..710a4b61 100644 --- a/tests/checker/availability/test_base.py +++ b/tests/checker/availability/test_base.py @@ -1244,6 +1244,123 @@ def test_try_to_query_status_from_syntax_lookup(self) -> None: self.assertEqual(expected_source, actual_source) + @staticmethod + def fake_pull_response(subject: str) -> dict: + """ + Provides a fake pull response to work with. + + :param subject: + The subject to work with. + """ + + return { + "subject": subject, + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "status": { + "syntax": { + "latest": { + "status": "INVALID", + "status_source": "SYNTAX", + "tested_at": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + "frequent": "VALID", + "recommended": "VALID", + }, + "availability": { + "latest": { + "status": "INACTIVE", + "status_source": "DNSLOOKUP", + "tested_at": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + "frequent": "ACTIVE", + "recommended": "ACTIVE", + }, + "reputation": { + "latest": { + "status": "MALICIOUS", + "status_source": "REPUTATION", + "tested_at": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + "frequent": "SANE", + "recommended": "MALICIOUS", + }, + "whois": { + "expiration_date": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "subject_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + }, + } + + @staticmethod + def fake_response_no_data(_: str) -> None: + """ + Provides an empty response. + """ + + return None + + def test_try_to_query_status_from_collection(self) -> None: + """ + Tests the method that tries to define the status from the collection lookup. + """ + + # Let's check the case that the subject is known. + self.checker.subject = "example.com" + self.checker.collection_query_tool.preferred_status_origin = "frequent" + self.checker.collection_query_tool.pull = self.fake_pull_response + + self.checker.try_to_query_status_from_collection() + + expected_status = "ACTIVE" + actual_status = self.checker.status.status + self.assertEqual(expected_status, actual_status) + + expected_status_source = "COLLECTION" + actual_status_source = self.checker.status.status_source + self.assertEqual(expected_status_source, actual_status_source) + + self.checker.collection_query_tool.preferred_status_origin = "latest" + + self.checker.try_to_query_status_from_collection() + + expected_status = "INACTIVE" + actual_status = self.checker.status.status + self.assertEqual(expected_status, actual_status) + + expected_status_source = "COLLECTION" + actual_status_source = self.checker.status.status_source + self.assertEqual(expected_status_source, actual_status_source) + + self.checker.collection_query_tool.preferred_status_origin = "recommended" + + self.checker.try_to_query_status_from_collection() + + expected_status = "ACTIVE" + actual_status = self.checker.status.status + self.assertEqual(expected_status, actual_status) + + expected_status_source = "COLLECTION" + actual_status_source = self.checker.status.status_source + self.assertEqual(expected_status_source, actual_status_source) + + # Let's check the case that the subject is unknown. + self.checker.subject = "102117110105108114121115" + self.checker.collection_query_tool.pull = self.fake_response_no_data + + self.checker.try_to_query_status_from_collection() + + expected_status = None + actual_status = self.checker.status.status + self.assertEqual(expected_status, actual_status) + + expected_status_source = None + actual_status_source = self.checker.status.status_source + self.assertEqual(expected_status_source, actual_status_source) + def test_get_status(self) -> None: """ Tests the method that let us get the whole status object. diff --git a/tests/checker/reputation/test_base.py b/tests/checker/reputation/test_base.py index 07f6ac38..378c36a3 100644 --- a/tests/checker/reputation/test_base.py +++ b/tests/checker/reputation/test_base.py @@ -451,6 +451,123 @@ def fake_try_to_query_status_from_dns_lookup(): self.assertEqual(expected_status, actual_status) self.assertEqual(expected_source, actual_source) + @staticmethod + def fake_pull_response(subject: str) -> dict: + """ + Provides a fake pull response to work with. + + :param subject: + The subject to work with. + """ + + return { + "subject": subject, + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "status": { + "syntax": { + "latest": { + "status": "VALID", + "status_source": "SYNTAX", + "tested_at": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + "frequent": "INVALID", + "recommended": "VALID", + }, + "availability": { + "latest": { + "status": "ACTIVE", + "status_source": "WHOIS", + "tested_at": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + "frequent": "INACTIVE", + "recommended": "ACTIVE", + }, + "reputation": { + "latest": { + "status": "SANE", + "status_source": "REPUTATION", + "tested_at": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + "frequent": "MALICIOUS", + "recommended": "MALICIOUS", + }, + "whois": { + "expiration_date": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "subject_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + }, + } + + @staticmethod + def fake_response_no_data(_: str) -> None: + """ + Provides an empty response. + """ + + return None + + def test_try_to_query_status_from_collection(self) -> None: + """ + Tests the method that tries to define the status from the collection lookup. + """ + + # Let's check the case that the subject is known. + self.checker.subject = "example.com" + self.checker.collection_query_tool.preferred_status_origin = "frequent" + self.checker.collection_query_tool.pull = self.fake_pull_response + + self.checker.try_to_query_status_from_collection() + + expected_status = "MALICIOUS" + actual_status = self.checker.status.status + self.assertEqual(expected_status, actual_status) + + expected_status_source = "COLLECTION" + actual_status_source = self.checker.status.status_source + self.assertEqual(expected_status_source, actual_status_source) + + self.checker.collection_query_tool.preferred_status_origin = "latest" + + self.checker.try_to_query_status_from_collection() + + expected_status = "SANE" + actual_status = self.checker.status.status + self.assertEqual(expected_status, actual_status) + + expected_status_source = "COLLECTION" + actual_status_source = self.checker.status.status_source + self.assertEqual(expected_status_source, actual_status_source) + + self.checker.collection_query_tool.preferred_status_origin = "recommended" + + self.checker.try_to_query_status_from_collection() + + expected_status = "MALICIOUS" + actual_status = self.checker.status.status + self.assertEqual(expected_status, actual_status) + + expected_status_source = "COLLECTION" + actual_status_source = self.checker.status.status_source + self.assertEqual(expected_status_source, actual_status_source) + + # Let's check the case that the subject is unknown. + self.checker.subject = "102117110105108114121115" + self.checker.collection_query_tool.pull = self.fake_response_no_data + + self.checker.try_to_query_status_from_collection() + + expected_status = None + actual_status = self.checker.status.status + self.assertEqual(expected_status, actual_status) + + expected_status_source = None + actual_status_source = self.checker.status.status_source + self.assertEqual(expected_status_source, actual_status_source) + def test_get_status(self) -> None: """ Tests the method which let us query the status to interact with. diff --git a/tests/checker/test_base.py b/tests/checker/test_base.py index 555c168d..65fa03c9 100644 --- a/tests/checker/test_base.py +++ b/tests/checker/test_base.py @@ -53,6 +53,7 @@ import unittest from PyFunceble.checker.base import CheckerBase, CheckerStatusBase +from PyFunceble.config.loader import ConfigLoader class TestCheckerBase(unittest.TestCase): @@ -296,6 +297,106 @@ def test_set_do_syntax_check_first_not_bool(self) -> None: TypeError, lambda: self.checker.set_do_syntax_check_first(given) ) + def test_set_use_collection_return(self) -> None: + """ + Tests the response of the method which let us define that we want to + interact with the collection. + """ + + given = True + + actual = self.checker.set_use_collection(given) + + self.assertIsInstance(actual, CheckerBase) + + def test_set_use_collection_method(self) -> None: + """ + Tests the method which let us define that we want ti interact with the + collection. + """ + + given = False + expected = False + + self.checker.set_use_collection(given) + + actual = self.checker.use_collection + + self.assertEqual(expected, actual) + + def test_set_use_collection_init(self) -> None: + """ + Tests the definition of the :code:`use_collection` attribute + through the class constructor. + """ + + given = True + expected = True + + checker = CheckerBase(use_collection=given) + + actual = checker.use_collection + + self.assertEqual(expected, actual) + + def test_set_use_collection_not_bool(self) -> None: + """ + Tests the case that we want to overwrite the + :code:`use_collection` attribute with a non-boolean value. + """ + + given = ["Hello", "World!"] + + self.assertRaises(TypeError, lambda: self.checker.set_use_collection(given)) + + def test_guess_and_set_use_collection(self) -> None: + """ + Tests the method that let us guess and set the collection from the + configuration. + """ + + config_loader = ConfigLoader() + config_loader.set_custom_config({"lookup": {"collection": True}}).start() + + self.checker.guess_and_set_use_collection() + actual = self.checker.use_collection + expected = True + + self.assertEqual(expected, actual) + + def test_guess_and_set_use_collection_not_boolean(self) -> None: + """ + Tests the method that let us guess and set the collection from the + configuration. + + In this case, we test the case that the given value is not a boolean. + """ + + config_loader = ConfigLoader() + config_loader.set_custom_config({"lookup": {"collection": None}}).start() + + self.checker.guess_and_set_use_collection() + actual = self.checker.use_collection + expected = False + + self.assertEqual(expected, actual) + + del config_loader + + def test_guess_and_set_use_collection_no_configuration(self) -> None: + """ + Tests the method that let us guess and set the collection from the + configuration. + + In this case, we test the case that no configuration is loaded. + """ + + self.checker.guess_and_set_use_collection() + actual = self.checker.use_collection + expected = False + + self.assertEqual(expected, actual) + def test_get_status(self) -> None: """ Tests the method which let us get the status. diff --git a/tests/checker/test_params_base.py b/tests/checker/test_params_base.py index 5fb9a68c..61f4ff23 100644 --- a/tests/checker/test_params_base.py +++ b/tests/checker/test_params_base.py @@ -81,8 +81,9 @@ def test_to_dict(self) -> None: """ self.params.do_syntax_check_first = True + self.params.use_collection = False - expected = {"do_syntax_check_first": True} + expected = {"do_syntax_check_first": True, "use_collection": False} actual = self.params.to_dict() @@ -95,9 +96,11 @@ def test_to_json(self) -> None: """ self.params.do_syntax_check_first = True + self.params.use_collection = False expected = """{ - "do_syntax_check_first": true + "do_syntax_check_first": true, + "use_collection": false }""" actual = self.params.to_json() diff --git a/tests/helpers/test_command.py b/tests/helpers/test_command.py index 6ee99595..56557fdc 100644 --- a/tests/helpers/test_command.py +++ b/tests/helpers/test_command.py @@ -294,7 +294,7 @@ def test_run(self) -> None: expected = ["Hello, World!"] actual = list(CommandHelper("echo 'Hello, World!'").run()) - self.assertEqual(expected, actual) + self.assertEqual(expected, actual[:1]) def test_run_to_stdout(self) -> None: """ diff --git a/tests/pyf_test_dataset.py b/tests/pyf_test_dataset.py index 262e04b6..7ffb10bd 100644 --- a/tests/pyf_test_dataset.py +++ b/tests/pyf_test_dataset.py @@ -375,6 +375,11 @@ "testing_mode": {"availability": True, "reputation": False, "syntax": False}, "whois_db": True, }, + "collection": { + "url_base": "http://localhost:8080", + "push": False, + "preferred_data_origin": "frequent", + }, "debug": {"active": False, "level": "info"}, "dns": { "follow_server_order": True, @@ -453,6 +458,7 @@ "special": True, "timeout": 5, "whois": True, + "collection": False, }, "share_logs": False, "user_agent": {"browser": "chrome", "custom": None, "platform": "linux"}, diff --git a/tests/query/test_collection.py b/tests/query/test_collection.py new file mode 100644 index 00000000..fc8ea58a --- /dev/null +++ b/tests/query/test_collection.py @@ -0,0 +1,1050 @@ +""" +The tool to check the availability or syntax of domain, IP or URL. + +:: + + + ██████╗ ██╗ ██╗███████╗██╗ ██╗███╗ ██╗ ██████╗███████╗██████╗ ██╗ ███████╗ + ██╔══██╗╚██╗ ██╔╝██╔════╝██║ ██║████╗ ██║██╔════╝██╔════╝██╔══██╗██║ ██╔════╝ + ██████╔╝ ╚████╔╝ █████╗ ██║ ██║██╔██╗ ██║██║ █████╗ ██████╔╝██║ █████╗ + ██╔═══╝ ╚██╔╝ ██╔══╝ ██║ ██║██║╚██╗██║██║ ██╔══╝ ██╔══██╗██║ ██╔══╝ + ██║ ██║ ██║ ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗ + ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝ + +Tests of the Collection query tool. + +Author: + Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom + +Special thanks: + https://pyfunceble.github.io/special-thanks.html + +Contributors: + https://pyfunceble.github.io/contributors.html + +Project link: + https://github.com/funilrys/PyFunceble + +Project documentation: + https://pyfunceble.readthedocs.io/en/latest/ + +Project homepage: + https://pyfunceble.github.io/ + +License: +:: + + + Copyright 2017, 2018, 2019, 2020, 2021 Nissar Chababy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +# pylint: disable=too-many-lines + +import json +import os +import secrets +import unittest +import unittest.mock +from datetime import datetime + +import requests +import requests.models + +from PyFunceble.checker.availability.status import AvailabilityCheckerStatus +from PyFunceble.config.loader import ConfigLoader +from PyFunceble.query.collection import CollectionQueryTool + + +class TestCollectionQueryTool(unittest.TestCase): + """ + Tests the Collection query tool. + """ + + def setUp(self) -> None: + """ + Sets everything needed by the tests. + """ + + self.query_tool = CollectionQueryTool() + + self.response_dataset = { + "subject": "example.net", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "status": { + "syntax": { + "latest": { + "status": "VALID", + "status_source": "SYNTAX", + "tested_at": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + "frequent": "VALID", + }, + "availability": { + "latest": { + "status": "ACTIVE", + "status_source": "WHOIS", + "tested_at": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + "frequent": "ACTIVE", + }, + "reputation": { + "latest": { + "status": "SANE", + "status_source": "REPUTATION", + "tested_at": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + "frequent": "SANE", + }, + "whois": { + "expiration_date": "2021-09-28T19:32:07.167Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "subject_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + }, + }, + } + + self.status_dataset = { + "status": "ACTIVE", + "status_source": "WHOIS", + "tested_at": "2021-09-28T20:55:41.730Z", + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "subject_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + } + + self.availability_status_dataset = { + "checker_type": "AVAILABILITY", + "dns_lookup": {"NS": ["a.iana-servers.net.", "b.iana-servers.net."]}, + "dns_lookup_record": { + "dns_name": "example.com.", + "follow_nameserver_order": True, + "nameserver": "9.9.9.9", + "port": 53, + "preferred_protocol": "UDP", + "query_record_type": "NS", + "query_timeout": 5.0, + "response": ["a.iana-servers.net.", "b.iana-servers.net."], + "subject": "example.com", + "used_protocol": "UDP", + }, + "domain_syntax": True, + "expiration_date": None, + "http_status_code": None, + "idna_subject": "example.com", + "ip_syntax": False, + "ipv4_range_syntax": False, + "ipv4_syntax": False, + "ipv6_range_syntax": False, + "ipv6_syntax": False, + "netinfo": None, + "params": { + "do_syntax_check_first": False, + "use_dns_lookup": True, + "use_extra_rules": True, + "use_http_code_lookup": True, + "use_netinfo_lookup": True, + "use_reputation_lookup": False, + "use_whois_db": True, + "use_whois_lookup": False, + }, + "second_level_domain_syntax": True, + "status": "ACTIVE", + "status_after_extra_rules": None, + "status_before_extra_rules": None, + "status_source": "DNSLOOKUP", + "status_source_after_extra_rules": None, + "status_source_before_extra_rules": None, + "subdomain_syntax": False, + "subject": "example.com", + "tested_at": datetime.fromisoformat("2021-03-09T17:42:15.771647"), + "url_syntax": False, + "whois_lookup_record": { + "expiration_date": None, + "port": 43, + "query_timeout": 5.0, + "record": None, + "server": None, + "subject": "example.com", + }, + "whois_record": None, + } + return super().setUp() + + def tearDown(self) -> None: + """ + Destroys everything needed by the tests. + """ + + del self.query_tool + del self.response_dataset + del self.availability_status_dataset + + def test_set_token_return(self) -> None: + """ + Tests the response from the method which let us set the token to work + with. + """ + + given = secrets.token_urlsafe(6) + + actual = self.query_tool.set_token(given) + + self.assertIsInstance(actual, CollectionQueryTool) + + def test_set_token_method(self) -> None: + """ + Tests the method which let us set the token to work with. + """ + + given = secrets.token_urlsafe(6) + expected = given + + self.query_tool.set_token(given) + actual = self.query_tool.token + + self.assertEqual(expected, actual) + + def test_set_token_attribute(self) -> None: + """ + Tests the overwritting of the token attribute. + """ + + given = secrets.token_urlsafe(6) + expected = given + + self.query_tool.token = given + actual = self.query_tool.token + + self.assertEqual(expected, actual) + + def test_set_token_through_init(self) -> None: + """ + Tests the overwritting of the token to work through the class + constructor. + """ + + given = secrets.token_urlsafe(6) + expected = given + + query_tool = CollectionQueryTool(token=given) + actual = query_tool.token + + self.assertEqual(expected, actual) + + def test_set_token_through_init_environment_variable_not_given(self) -> None: + """ + Tests the overwritting of the token to work through the class + constructor. + + In this test we test the case that nothing is given or declared. + """ + + if "PYFUNCEBLE_COLLECTION_API_TOKEN" in os.environ: + del os.environ["PYFUNCEBLE_COLLECTION_API_TOKEN"] + + expected = "" + + query_tool = CollectionQueryTool(token=None) + actual = query_tool.token + + self.assertEqual(expected, actual) + + def test_set_token_through_init_environment_variable_given(self) -> None: + """ + Tests the overwritting of the token to work through the class + constructor. + + In this test we test the case that the environment variable is given. + """ + + given = secrets.token_urlsafe(6) + expected = given + + os.environ["PYFUNCEBLE_COLLECTION_API_TOKEN"] = given + + query_tool = CollectionQueryTool(token=None) + actual = query_tool.token + + self.assertEqual(expected, actual) + + def test_set_token_not_str(self) -> None: + """ + Tests the method which let us set the token to work with for the case + that the given token is not a :py:class:`str`. + """ + + given = ["Hello", "World!"] + + self.assertRaises(TypeError, lambda: self.query_tool.set_token(given)) + + def test_set_url_base_return(self) -> None: + """ + Tests the response from the method which let us set the URL to work + from. + """ + + given = "https://example.org" + + actual = self.query_tool.set_url_base(given) + + self.assertIsInstance(actual, CollectionQueryTool) + + def test_set_url_base_method(self) -> None: + """ + Tests the method which let us set the URL to work from. + """ + + given = "https://example.org" + expected = given + + self.query_tool.set_url_base(given) + actual = self.query_tool.url_base + + self.assertEqual(expected, actual) + + def test_set_url_base_attribute(self) -> None: + """ + Tests the overwritting of the url_base attribute. + """ + + given = "https://example.org" + expected = given + + self.query_tool.url_base = given + actual = self.query_tool.url_base + + self.assertEqual(expected, actual) + + def test_set_url_base_through_init(self) -> None: + """ + Tests the overwritting of the URL to work from through the class + constructor. + """ + + given = "https://example.net" + expected = given + + query_tool = CollectionQueryTool(url_base=given) + actual = query_tool.url_base + + self.assertEqual(expected, actual) + + def test_set_url_base_through_init_none_given(self) -> None: + """ + Tests the overwritting of the URL to work from through the class + constructor. + + In this test, we test the case that the URL base is not given. + """ + + given = None + expected = "http://localhost:8001" + + query_tool = CollectionQueryTool(url_base=given) + actual = query_tool.url_base + + self.assertEqual(expected, actual) + + def test_set_url_base_not_str(self) -> None: + """ + Tests the method which let us set the URL to work from for the case + that the given URL is not a :py:class:`str`. + """ + + given = ["Hello", "World!"] + + self.assertRaises(TypeError, lambda: self.query_tool.set_url_base(given)) + + def test_set_url_base_not_url(self) -> None: + """ + Tests the method which let us set the URL to work from for the case + that the given URL is not a supported URL. + """ + + given = "example.org" + + self.assertRaises(ValueError, lambda: self.query_tool.set_url_base(given)) + + def test_set_url_base_ends_with_slash(self) -> None: + """ + Tests the method which let us set the URL to work from for the case + that the given URL is not a supported URL. + """ + + given = "http://example.org/" + expected = "http://example.org" + + self.query_tool.url_base = given + actual = self.query_tool.url_base + + self.assertEqual(expected, actual) + + def test_guess_and_set_url_base(self) -> None: + """ + Tests the method which let us guess and set the URL base. + """ + + config_loader = ConfigLoader() + config_loader.set_custom_config( + {"collection": {"url_base": "https://example.org:8443"}} + ).start() + + self.query_tool.guess_and_set_url_base() + + expected = "https://example.org:8443" + actual = self.query_tool.url_base + + self.assertEqual(expected, actual) + + del config_loader + + def test_guess_and_set_url_base_not_str(self) -> None: + """ + Tests the method which let us guess and set the URL base. + """ + + config_loader = ConfigLoader() + config_loader.set_custom_config({"collection": {"url_base": False}}).start() + + self.query_tool.guess_and_set_url_base() + + expected = "http://localhost:8001" + actual = self.query_tool.url_base + + self.assertEqual(expected, actual) + + del config_loader + + def test_set_preferred_status_origin_return(self) -> None: + """ + Tests the response from the method which let us set the preferred status + origin. + """ + + given = "latest" + + actual = self.query_tool.set_preferred_status_origin(given) + + self.assertIsInstance(actual, CollectionQueryTool) + + def test_set_preferred_status_origin_method(self) -> None: + """ + Tests the method which let us set the preferred status origin. + """ + + given = "frequent" + expected = given + + self.query_tool.set_preferred_status_origin(given) + actual = self.query_tool.preferred_status_origin + + self.assertEqual(expected, actual) + + def test_set_preferred_status_origin_attribute(self) -> None: + """ + Tests the overwritting of the the preferred status origin. + """ + + given = "latest" + expected = given + + self.query_tool.preferred_status_origin = given + actual = self.query_tool.preferred_status_origin + + self.assertEqual(expected, actual) + + def test_setpreferred_status_origin_through_init(self) -> None: + """ + Tests the overwritting of the preferred status origin through the class + constructor. + """ + + given = "frequent" + expected = given + + query_tool = CollectionQueryTool(preferred_status_origin=given) + actual = query_tool.preferred_status_origin + + self.assertEqual(expected, actual) + + def test_set_preferred_status_origin_through_init_none_given(self) -> None: + """ + Tests the overwritting of the preferred status origin through the class + constructor. + + In this test, we test the case that nothing is given. + """ + + given = None + expected = "frequent" + + query_tool = CollectionQueryTool(preferred_status_origin=given) + actual = query_tool.preferred_status_origin + + self.assertEqual(expected, actual) + + def test_set_preferred_status_origin_not_str(self) -> None: + """ + Tests the method which let us set the preferred status origin for the case + that the given value is not a :py:class:`str`. + """ + + given = ["Hello", "World!"] + + self.assertRaises( + TypeError, lambda: self.query_tool.set_preferred_status_origin(given) + ) + + def test_set_preferred_status_origin_not_supported(self) -> None: + """ + Tests the method which let us set the URL to work from for the case + that the given URL is not a supported URL. + """ + + given = "hello" + + self.assertRaises( + ValueError, lambda: self.query_tool.set_preferred_status_origin(given) + ) + + def test_guess_and_set_preferred_status_origin(self) -> None: + """ + Tests the method which let us guess and set the preferred status origin. + """ + + config_loader = ConfigLoader() + config_loader.set_custom_config( + {"collection": {"preferred_status_origin": "latest"}} + ).start() + + self.query_tool.guess_and_set_preferred_status_origin() + + expected = "latest" + actual = self.query_tool.preferred_status_origin + + self.assertEqual(expected, actual) + + del config_loader + + def test_guess_and_set_preferred_status_origin_not_str(self) -> None: + """ + Tests the method which let us guess and set the preferred status origin. + """ + + config_loader = ConfigLoader() + config_loader.set_custom_config( + {"collection": {"preferred_status_origin": None}} + ).start() + + self.query_tool.guess_and_set_preferred_status_origin() + + expected = "frequent" + actual = self.query_tool.preferred_status_origin + + self.assertEqual(expected, actual) + + del config_loader + + @unittest.mock.patch.object(requests.Session, "post") + def test_collection_contain(self, request_mock) -> None: + """ + Tests the method which let us pull the subject from the collection. + """ + + response_dict = self.response_dataset + response_dict["subject"] = "example.com" + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = json.dumps(response_dict) + + response = requests.models.Response() + response.url = "https://example.org/v1/search" + response.status_code = 200 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + request_mock.side_effect = mocking + + expected = True + actual = "example.com" in self.query_tool + + self.assertEqual(expected, actual) + + @unittest.mock.patch.object(requests.Session, "post") + def test_collection_not_contain(self, request_mock) -> None: + """ + Tests the method which let us pull the subject from the collection. + """ + + response_dict = {"detail": "Invalid subject."} + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = json.dumps(response_dict) + + response = requests.models.Response() + response.url = "https://example.org/v1/search" + response.status_code = 404 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + request_mock.side_effect = mocking + + expected = False + actual = "example.com" in self.query_tool + + self.assertEqual(expected, actual) + + @unittest.mock.patch.object(requests.Session, "post") + def test_getitem(self, request_mock) -> None: + """ + Tests the method which let us pull the subject from the collection. + """ + + response_dict = self.response_dataset + response_dict["subject"] = "example.org" + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = json.dumps(response_dict) + + response = requests.models.Response() + response.url = "https://example.org/v1/search" + response.status_code = 200 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + request_mock.side_effect = mocking + + expected = response_dict + actual = self.query_tool["example.org"] + + self.assertEqual(expected, actual) + + @unittest.mock.patch.object(requests.Session, "post") + def test_getitem_not_found(self, request_mock) -> None: + """ + Tests the method which let us pull the subject from the collection. + """ + + response_dict = {"detail": "Invalid subject."} + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = json.dumps(response_dict) + + response = requests.models.Response() + response.url = "https://example.org/v1/search" + response.status_code = 404 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + request_mock.side_effect = mocking + + expected = None + actual = self.query_tool["example.de"] + + self.assertEqual(expected, actual) + + @unittest.mock.patch.object(requests.Session, "post") + def test_pull(self, request_mock) -> None: + """ + Tests the method which let us pull the subject from the collection. + """ + + response_dict = self.response_dataset + response_dict["subject"] = "example.net" + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = json.dumps(response_dict) + + response = requests.models.Response() + response.url = "https://example.org/v1/search" + response.status_code = 200 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + request_mock.side_effect = mocking + + expected = response_dict + actual = self.query_tool.pull("example.net") + + self.assertEqual(expected, actual) + + @unittest.mock.patch.object(requests.Session, "post") + def test_pull_subject_not_found(self, request_mock) -> None: + """ + Tests the method which let us pull the subject from the collection. + + In this test case we check what happens when a subject is not found. + """ + + response_dict = {"detail": "Invalid subject."} + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = json.dumps(response_dict) + + response = requests.models.Response() + response.url = "https://example.org/v1/search" + response.status_code = 404 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + request_mock.side_effect = mocking + + expected = None + actual = self.query_tool.pull("example.net") + + self.assertEqual(expected, actual) + + @unittest.mock.patch.object(requests.Session, "post") + def test_pull_subject_no_json_response(self, request_mock) -> None: + """ + Tests the method which let us pull the subject from the collection. + + In this test case we check what happens when no JSON response is given. + """ + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = "I'm a teapot." + + response = requests.models.Response() + response.url = "https://example.org/v1/search" + response.status_code = 418 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + request_mock.side_effect = mocking + + expected = None + actual = self.query_tool.pull("example.net") + + self.assertEqual(expected, actual) + + def test_pull_subject_not_str(self) -> None: + """ + Tests the method which let us pull the subject from the collection. + + In this test we test the case that the given subject is not a + :py:class:`str`. + """ + + self.query_tool.url_base = "https://example.org" + + self.assertRaises(TypeError, lambda: self.query_tool.pull(284)) + + @unittest.mock.patch.object(requests.Session, "post") + def test_push(self, request_mock) -> None: + """ + Tests the method which let us push some dataset into the collection. + """ + + response_dict = self.response_dataset + response_dict["subject"] = "example.net" + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = json.dumps(response_dict) + + response = requests.models.Response() + response.url = "https://example.org/v1/status/availability" + response.status_code = 200 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + self.query_tool.token = secrets.token_urlsafe(6) + + request_mock.side_effect = mocking + + expected = response_dict + actual = self.query_tool.push( + AvailabilityCheckerStatus(**self.availability_status_dataset) + ) + + self.assertEqual(expected, actual) + + @unittest.mock.patch.object(requests.Session, "post") + def test_push_no_json_response(self, request_mock) -> None: + """ + Tests the method which let us push some dataset into the collection. + + In this test case, we test the case that the response is not JSON + encoded. + """ + + response_dict = self.response_dataset + response_dict["subject"] = "example.net" + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = "I'm a teapot." + + response = requests.models.Response() + response.url = "https://example.org/v1/status/availability" + response.status_code = 418 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + request_mock.side_effect = mocking + + expected = None + actual = self.query_tool.push( + AvailabilityCheckerStatus(**self.availability_status_dataset) + ) + + self.assertEqual(expected, actual) + + @unittest.mock.patch.object(requests.Session, "post") + def test_push_with_whois(self, request_mock) -> None: + """ + Tests the method which let us push some dataset into the collection. + """ + + response_dict = self.response_dataset + response_dict["subject"] = "example.net" + self.availability_status_dataset["expiration_date"] = "23-nov-2090" + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = json.dumps(response_dict) + + response = requests.models.Response() + response.url = "https://example.org/v1/status/availability" + response.status_code = 200 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + self.query_tool.token = secrets.token_urlsafe(6) + request_mock.side_effect = mocking + + expected = response_dict + actual = self.query_tool.push( + AvailabilityCheckerStatus(**self.availability_status_dataset) + ) + + self.assertEqual(expected, actual) + + @unittest.mock.patch.object(requests.Session, "post") + def test_push_with_whois_no_json_response(self, request_mock) -> None: + """ + Tests the method which let us push some dataset into the collection. + """ + + response_dict = self.response_dataset + response_dict["subject"] = "example.net" + self.availability_status_dataset["expiration_date"] = "23-nov-2090" + + def mocking(*args, **kwargs): # pylint: disable=unused-argument + response_content = "I'm a teapot." + + response = requests.models.Response() + response.url = "https://example.org/v1/status/availability" + response.status_code = 418 + + # pylint: disable=protected-access + response._content = str.encode(response_content) + + response.history = [response] + + return response + + self.query_tool.url_base = "https://example.org" + self.query_tool.token = secrets.token_urlsafe(6) + request_mock.side_effect = mocking + + expected = None + actual = self.query_tool.push( + AvailabilityCheckerStatus(**self.availability_status_dataset) + ) + + self.assertEqual(expected, actual) + + def test_push_with_whois_token_not_given(self) -> None: + """ + Tests the method which let us push some dataset into the collection. + + In this test, we test the case that no token is given. + """ + + response_dict = self.response_dataset + response_dict["subject"] = "example.net" + self.availability_status_dataset["expiration_date"] = "23-nov-2090" + + if "PYFUNCEBLE_COLLECTION_API_TOKEN" in os.environ: + del os.environ["PYFUNCEBLE_COLLECTION_API_TOKEN"] + + self.query_tool.token = "" + + expected = None + actual = self.query_tool.push( + AvailabilityCheckerStatus(**self.availability_status_dataset) + ) + + self.assertEqual(expected, actual) + + def test_push_subject_not_str(self) -> None: + """ + Tests the method which let us push some dataset into the collection. + + In this test, we test the case that the given subject is not a string. + """ + + self.availability_status_dataset["subject"] = 293 + + self.assertRaises( + TypeError, + lambda: self.query_tool.push( + AvailabilityCheckerStatus(**self.availability_status_dataset) + ), + ) + + def test_push_checker_status_not_correct(self) -> None: + """ + Tests the method which let us push some dataset into the collection. + + In this test, we test the case that the given checker status is not + correct. + """ + + self.availability_status_dataset["subject"] = "foo.example.org" + + self.assertRaises( + TypeError, + lambda: self.query_tool.push(self.availability_status_dataset), + ) + + def test_push_subject_empty_str(self) -> None: + """ + Tests the method which let us push some dataset into the collection. + + In this test, we test the case that the given subject is an empty string. + """ + + self.availability_status_dataset["subject"] = "" + + self.assertRaises( + ValueError, + lambda: self.query_tool.push( + AvailabilityCheckerStatus(**self.availability_status_dataset) + ), + ) + + def test_push_checker_type_not_str(self) -> None: + """ + Tests the method which let us push some dataset into the collection. + + In this test, we test the case that the given subject is not a string. + """ + + self.availability_status_dataset["checker_type"] = 987 + + self.assertRaises( + TypeError, + lambda: self.query_tool.push( + AvailabilityCheckerStatus(**self.availability_status_dataset) + ), + ) + + def test_push_checker_type_not_supported(self) -> None: + """ + Tests the method which let us push some dataset into the collection. + + In this test, we test the case that the given subject is not a string. + """ + + self.availability_status_dataset["checker_type"] = "GIT" + self.query_tool.token = secrets.token_urlsafe(6) + + self.assertRaises( + ValueError, + lambda: self.query_tool.push( + AvailabilityCheckerStatus(**self.availability_status_dataset) + ), + ) + + def test_push_token_not_given(self) -> None: + """ + Tests the method which let us push some dataset into the collection. + + In this test, we test the case that no token is given. + """ + + if "PYFUNCEBLE_COLLECTION_API_TOKEN" in os.environ: + del os.environ["PYFUNCEBLE_COLLECTION_API_TOKEN"] + + self.query_tool.token = "" + + expected = None + actual = self.query_tool.push( + AvailabilityCheckerStatus(**self.availability_status_dataset) + ) + + self.assertEqual(expected, actual) diff --git a/version.yaml b/version.yaml index cb622b07..e0a3dded 100644 --- a/version.yaml +++ b/version.yaml @@ -1,4 +1,4 @@ -current_version: '4.0.1.dev (Blue Duckling: Chestnut)' +current_version: '4.0.2. (Blue Duckling: Grandiflora)' deprecated: - 3.0.21 - 3.1.20