From dc61e67ad823482a838819d1f2c7000cc40e84d2 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Thu, 3 Jul 2025 22:34:50 -0700 Subject: [PATCH 1/2] python(feat): Add support for start and end times to rule evaluation --- .../sift_py/rule_evaluation/_service_test.py | 97 +++++++++++++++++++ python/lib/sift_py/rule_evaluation/service.py | 76 +++++++++++++-- 2 files changed, 163 insertions(+), 10 deletions(-) diff --git a/python/lib/sift_py/rule_evaluation/_service_test.py b/python/lib/sift_py/rule_evaluation/_service_test.py index aa1a2a6c..51e40c7e 100644 --- a/python/lib/sift_py/rule_evaluation/_service_test.py +++ b/python/lib/sift_py/rule_evaluation/_service_test.py @@ -45,12 +45,16 @@ def test_evaluate_and_preview_rule_identifiers_against_run(rule_evaluation_servi assert request.run.id == run_id assert request.rules.rules.ids.ids[0] == rule_identifiers[0].rule_id assert request.rules.rules.ids.ids[1] == rule_identifiers[1].rule_id + assert request.run_time_range.start_time.seconds == 0 + assert request.run_time_range.end_time.seconds == 0 rule_evaluation_service.preview_against_run(run_id, rule_identifiers) request = mock_stub.mock_calls[1].args[0] assert request.run.id == run_id assert request.rules.rules.ids.ids[0] == rule_identifiers[0].rule_id assert request.rules.rules.ids.ids[1] == rule_identifiers[1].rule_id + assert request.run_time_range.start_time.seconds == 0 + assert request.run_time_range.end_time.seconds == 0 def test_evaluate_and_preview_report_template_against_run(rule_evaluation_service): @@ -68,11 +72,51 @@ def test_evaluate_and_preview_report_template_against_run(rule_evaluation_servic assert request.report_name == report_name assert request.run.id == run_id assert request.report_template.report_template.id == report_template.template_id + assert request.run_time_range.start_time.seconds == 0 + assert request.run_time_range.end_time.seconds == 0 rule_evaluation_service.preview_against_run(run_id, report_template) request = mock_stub.mock_calls[1].args[0] assert request.run.id == run_id assert request.report_template.report_template.id == report_template.template_id + assert request.run_time_range.start_time.seconds == 0 + assert request.run_time_range.end_time.seconds == 0 + + +def test_evaluate_and_preview_report_template_against_run_with_start_end_times( + rule_evaluation_service, +): + mock_stub = MagicMock() + rule_evaluation_service._rule_evaluation_stub = mock_stub + mock_stub.EvaluateRules.return_value = EvaluateRulesResponse(report_id="test_report_id") + + run_id = "test_run_id" + report_name = "test_report" + report_template = ReportTemplateConfig(name=report_name, template_id="template-id") + start_time = datetime(2025, 1, 1, 1, 1, 1, tzinfo=timezone.utc) + end_time = datetime(2025, 1, 2, 1, 1, 59, tzinfo=timezone.utc) + + report = rule_evaluation_service.evaluate_against_run( + run_id, + report_template, + report_name, + start_time=start_time, + end_time=end_time, + ) + request = mock_stub.mock_calls[0].args[0] + assert report.report_id == "test_report_id" + assert request.report_name == report_name + assert request.run_time_range.run.id == run_id + assert request.run_time_range.start_time.seconds == int(start_time.timestamp()) + assert request.run_time_range.end_time.seconds == int(end_time.timestamp()) + assert request.report_template.report_template.id == report_template.template_id + + rule_evaluation_service.preview_against_run(run_id, report_template, start_time, end_time) + request = mock_stub.mock_calls[1].args[0] + assert request.report_template.report_template.id == report_template.template_id + assert request.run_time_range.run.id == run_id + assert request.run_time_range.start_time.seconds == int(start_time.timestamp()) + assert request.run_time_range.end_time.seconds == int(end_time.timestamp()) def test_evaluate_and_preview_rule_configs_against_run(rule_evaluation_service): @@ -106,12 +150,65 @@ def test_evaluate_and_preview_rule_configs_against_run(rule_evaluation_service): assert request.run.id == run_id assert request.rules.rules.client_keys.client_keys[0] == rule_configs[0].rule_client_key assert request.rules.rules.client_keys.client_keys[1] == rule_configs[1].rule_client_key + assert request.run_time_range.start_time.seconds == 0 + assert request.run_time_range.end_time.seconds == 0 rule_evaluation_service.preview_against_run(run_id, rule_configs) request = mock_stub.mock_calls[1].args[0] assert request.run.id == run_id assert request.rules.rules.client_keys.client_keys[0] == rule_configs[0].rule_client_key assert request.rules.rules.client_keys.client_keys[1] == rule_configs[1].rule_client_key + assert request.run_time_range.start_time.seconds == 0 + assert request.run_time_range.end_time.seconds == 0 + + +def test_evaluate_and_preview_rule_configs_against_run_with_start_end_times( + rule_evaluation_service, +): + mock_stub = MagicMock() + rule_evaluation_service._rule_evaluation_stub = mock_stub + mock_stub.EvaluateRules.return_value = EvaluateRulesResponse(report_id="test_report_id") + + run_id = "test_run_id" + report_name = "test_report" + rule_configs = [ + RuleConfig( + name="rule1", + rule_client_key="key1", + channel_references=[], + expression="$1 == 1", + action=RuleActionCreateDataReviewAnnotation(), + ), + RuleConfig( + name="rule2", + rule_client_key="key2", + channel_references=[], + expression="$2 == 2", + action=RuleActionCreateDataReviewAnnotation(), + ), + ] + start_time = datetime(2025, 1, 1, 1, 1, 1, tzinfo=timezone.utc) + end_time = datetime(2025, 1, 2, 1, 1, 59, tzinfo=timezone.utc) + + report = rule_evaluation_service.evaluate_against_run( + run_id, rule_configs, report_name, start_time, end_time + ) + request = mock_stub.mock_calls[0].args[0] + assert report.report_id == "test_report_id" + assert request.report_name == report_name + assert request.rules.rules.client_keys.client_keys[0] == rule_configs[0].rule_client_key + assert request.rules.rules.client_keys.client_keys[1] == rule_configs[1].rule_client_key + assert request.run_time_range.run.id == run_id + assert request.run_time_range.start_time.seconds == int(start_time.timestamp()) + assert request.run_time_range.end_time.seconds == int(end_time.timestamp()) + + rule_evaluation_service.preview_against_run(run_id, rule_configs, start_time, end_time) + request = mock_stub.mock_calls[1].args[0] + assert request.rules.rules.client_keys.client_keys[0] == rule_configs[0].rule_client_key + assert request.rules.rules.client_keys.client_keys[1] == rule_configs[1].rule_client_key + assert request.run_time_range.run.id == run_id + assert request.run_time_range.start_time.seconds == int(start_time.timestamp()) + assert request.run_time_range.end_time.seconds == int(end_time.timestamp()) def test_evaluate_rules_against_assets(rule_evaluation_service): diff --git a/python/lib/sift_py/rule_evaluation/service.py b/python/lib/sift_py/rule_evaluation/service.py index 677cf667..06af93f0 100644 --- a/python/lib/sift_py/rule_evaluation/service.py +++ b/python/lib/sift_py/rule_evaluation/service.py @@ -20,6 +20,7 @@ EvaluateRulesPreviewResponse, EvaluateRulesRequest, EvaluateRulesResponse, + RunTimeRange, ) from sift.rule_evaluation.v1.rule_evaluation_pb2_grpc import RuleEvaluationServiceStub from sift_py._internal.time import to_timestamp_pb @@ -49,6 +50,8 @@ def evaluate_against_run( run_id: str, rules: Union[ReportTemplateConfig, List[RuleConfig], List[RuleIdentifier]], report_name: str = "", + start_time: Optional[Union[datetime, str, int]] = None, + end_time: Optional[Union[datetime, str, int]] = None, ) -> ReportService: """Evaluate a set of rules against a run. @@ -57,16 +60,19 @@ def evaluate_against_run( rules: Either a ReportTemplateConfig, a list of RuleConfigs, or a list of RuleIdentifiers (typically from `RuleService.create_external_rules`). report_name: Optional report name. + start_time: Optional start time to evaluate. + end_time: Optional end time to evaluate. Returns: A ReportService object that can be use to get the status of the executed report. """ - eval_kwargs = self._get_rules_kwargs(rules) + rules_kwargs = self._get_rules_kwargs(rules) + run_kwargs = self._get_run_kwargs(run_id, start_time, end_time) req = EvaluateRulesRequest( report_name=report_name, - run=ResourceIdentifier(id=run_id), - **eval_kwargs, + **rules_kwargs, + **run_kwargs, ) res = cast(EvaluateRulesResponse, self._rule_evaluation_stub.EvaluateRules(req)) @@ -98,12 +104,12 @@ def evaluate_against_assets( start_time=to_timestamp_pb(start_time), end_time=to_timestamp_pb(end_time), ) - eval_kwargs = self._get_rules_kwargs(rules) + rules_kwargs = self._get_rules_kwargs(rules) req = EvaluateRulesRequest( report_name=report_name, assets=asset_time_range, - **eval_kwargs, + **rules_kwargs, ) res = cast(EvaluateRulesResponse, self._rule_evaluation_stub.EvaluateRules(req)) @@ -113,6 +119,8 @@ def preview_against_run( self, run_id: str, rules: Union[ReportTemplateConfig, List[RuleConfig], List[RuleIdentifier]], + start_time: Optional[Union[datetime, str, int]] = None, + end_time: Optional[Union[datetime, str, int]] = None, ) -> EvaluateRulesPreviewResponse: """Preview the evaluation of a set of rules against a run. @@ -120,15 +128,18 @@ def preview_against_run( run_id: The Run ID to run against. rules: Either a ReportTemplateConfig, a list of RuleConfigs, or a list of RuleIdentifiers (typically from `RuleService.create_external_rules`). + start_time: Optional start time to evaluate. + end_time: Optional end time to evaluate. Returns: The EvaluateRulesPreviewResponse object. """ eval_kwargs = self._get_rules_kwargs(rules) + run_kwargs = self._get_run_kwargs(run_id, start_time, end_time) req = EvaluateRulesPreviewRequest( - run=ResourceIdentifier(id=run_id), **eval_kwargs, + **run_kwargs, ) return self._rule_evaluation_stub.EvaluateRulesPreview(req) @@ -138,6 +149,8 @@ def evaluate_external_rules( run_id: str, rules: List[RuleConfig], report_name: str = "", + start_time: Optional[Union[datetime, str, int]] = None, + end_time: Optional[Union[datetime, str, int]] = None, ) -> ReportService: """Evaluate a set of external rules against a run. @@ -145,12 +158,14 @@ def evaluate_external_rules( run_id: The Run ID to run against. rules: A list of RuleConfigs. These must be external rules. report_name: Optional report name. + start_time: Optional start time to evaluate. + end_time: Optional end time to evaluate. Returns: A Report object that can be use to get the status of the executed report. """ rule_ids = self._rule_service.create_external_rules(rules) - return self.evaluate_against_run(run_id, rule_ids, report_name) + return self.evaluate_against_run(run_id, rule_ids, report_name, start_time, end_time) def evaluate_external_rules_from_yaml( self, @@ -158,6 +173,8 @@ def evaluate_external_rules_from_yaml( paths: List[Path], named_expressions: Optional[Dict[str, str]] = None, report_name: str = "", + start_time: Optional[Union[datetime, str, int]] = None, + end_time: Optional[Union[datetime, str, int]] = None, ) -> ReportService: """Evaluate a set of external rules from a YAML config against a run. @@ -165,35 +182,43 @@ def evaluate_external_rules_from_yaml( run_id: The Run ID to run against. paths: The YAML paths to load rules from. report_name: Optional report name. + start_time: Optional start time to evaluate. + end_time: Optional end time to evaluate. Returns: A Report object that can be use to get the status of the executed report. """ rule_ids = self._rule_service.create_external_rules_from_yaml(paths, named_expressions) - return self.evaluate_against_run(run_id, rule_ids, report_name) + return self.evaluate_against_run(run_id, rule_ids, report_name, start_time, end_time) def preview_external_rules( self, run_id: str, rules: List[RuleConfig], + start_time: Optional[Union[datetime, str, int]] = None, + end_time: Optional[Union[datetime, str, int]] = None, ) -> EvaluateRulesPreviewResponse: """Preview the evaluation a set of external rules against a run. Args: run_id: The Run ID to run against. rules: A list of RuleConfigs. These must be external rules. + start_time: Optional start time to evaluate. + end_time: Optional end time to evaluate. Returns: The EvaluateRulesPreviewResponse object. """ rule_ids = self._rule_service.create_external_rules(rules) - return self.preview_against_run(run_id, rule_ids) + return self.preview_against_run(run_id, rule_ids, start_time, end_time) def preview_external_rules_from_yaml( self, run_id: str, paths: List[Path], named_expressions: Optional[Dict[str, str]] = None, + start_time: Optional[Union[datetime, str, int]] = None, + end_time: Optional[Union[datetime, str, int]] = None, ) -> EvaluateRulesPreviewResponse: """Preview the evaluation a set of external rules from a YAML config against a run. @@ -201,12 +226,14 @@ def preview_external_rules_from_yaml( run_id: The Run ID to run against. paths: The YAML paths to load rules from. named_expressions: The named expressions to substitute in the rules. + start_time: Optional start time to evaluate. + end_time: Optional end time to evaluate. Returns: The EvaluateRulesPreviewResponse object. """ rule_ids = self._rule_service.create_external_rules_from_yaml(paths, named_expressions) - return self.preview_against_run(run_id, rule_ids) + return self.preview_against_run(run_id, rule_ids, start_time, end_time) def _get_rules_kwargs( self, rules: Union[ReportTemplateConfig, List[RuleConfig], List[RuleIdentifier]] @@ -259,3 +286,32 @@ def _get_rules_kwargs( } raise ValueError("Invalid rules argument") + + def _get_run_kwargs( + self, + run_id: str, + start_time: Optional[Union[datetime, str, int]] = None, + end_time: Optional[Union[datetime, str, int]] = None, + ) -> dict: + """Returns the Run specific keyword arguments for a EvalutateRules request based on the input type. + + Args: + run_id: The Run ID to run against. + start_time: Optional start time to evaluate. + end_time: Optional end time to evaluate. + + Returns: + dict: The keyword arguments. + """ + run = ResourceIdentifier(id=run_id) + + if start_time or end_time: + return { + "run_time_range": RunTimeRange( + run=run, + start_time=to_timestamp_pb(start_time) if start_time else None, + end_time=to_timestamp_pb(end_time) if end_time else None, + ) + } + else: + return {"run": run} From 830362bf7c9ac2908c638df556f7702d3bb41093 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Mon, 7 Jul 2025 22:07:13 -0700 Subject: [PATCH 2/2] Update time args to support floats --- python/lib/sift_py/_internal/time.py | 4 +- python/lib/sift_py/rule_evaluation/service.py | 64 +++++++++---------- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/python/lib/sift_py/_internal/time.py b/python/lib/sift_py/_internal/time.py index 8ff926e9..cd277a6b 100644 --- a/python/lib/sift_py/_internal/time.py +++ b/python/lib/sift_py/_internal/time.py @@ -29,7 +29,7 @@ def to_timestamp_nanos(arg: Union[TimestampPb, pd.Timestamp, datetime, str, int] return cast(pd.Timestamp, pd.Timestamp(arg)) -def to_timestamp_pb(arg: Union[datetime, str, int]) -> TimestampPb: +def to_timestamp_pb(arg: Union[datetime, str, int, float]) -> TimestampPb: """ Mainly used for testing at the moment. If using this for non-testing purposes should probably make this more robust and support nano-second precision. @@ -40,7 +40,7 @@ def to_timestamp_pb(arg: Union[datetime, str, int]) -> TimestampPb: if isinstance(arg, datetime): ts.FromDatetime(arg) return ts - elif isinstance(arg, int): + elif isinstance(arg, (int, float)): ts.FromDatetime(datetime.fromtimestamp(arg)) return ts else: diff --git a/python/lib/sift_py/rule_evaluation/service.py b/python/lib/sift_py/rule_evaluation/service.py index 06af93f0..d5343168 100644 --- a/python/lib/sift_py/rule_evaluation/service.py +++ b/python/lib/sift_py/rule_evaluation/service.py @@ -50,8 +50,8 @@ def evaluate_against_run( run_id: str, rules: Union[ReportTemplateConfig, List[RuleConfig], List[RuleIdentifier]], report_name: str = "", - start_time: Optional[Union[datetime, str, int]] = None, - end_time: Optional[Union[datetime, str, int]] = None, + start_time: Optional[Union[datetime, str, int, float]] = None, + end_time: Optional[Union[datetime, str, int, float]] = None, ) -> ReportService: """Evaluate a set of rules against a run. @@ -60,8 +60,8 @@ def evaluate_against_run( rules: Either a ReportTemplateConfig, a list of RuleConfigs, or a list of RuleIdentifiers (typically from `RuleService.create_external_rules`). report_name: Optional report name. - start_time: Optional start time to evaluate. - end_time: Optional end time to evaluate. + start_time: Optional start time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). + end_time: Optional end time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). Returns: A ReportService object that can be use to get the status of the executed report. @@ -81,8 +81,8 @@ def evaluate_against_run( def evaluate_against_assets( self, asset_names: List[str], - start_time: Union[datetime, str, int], - end_time: Union[datetime, str, int], + start_time: Union[datetime, str, int, float], + end_time: Union[datetime, str, int, float], rules: Union[ReportTemplateConfig, List[RuleConfig], List[RuleIdentifier]], report_name: str = "", ) -> ReportService: @@ -90,8 +90,8 @@ def evaluate_against_assets( Args: asset_names: The list of assets to run against. - start_time: The start time to evaluate. - end_time: The end time to evaluate. + start_time: The start time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). + end_time: The end time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). rules: Either a ReportTemplateConfig, a list of RuleConfigs, or a list of RuleIdentifiers (typically from `RuleService.create_external_rules`). report_name: Optional report name. @@ -119,8 +119,8 @@ def preview_against_run( self, run_id: str, rules: Union[ReportTemplateConfig, List[RuleConfig], List[RuleIdentifier]], - start_time: Optional[Union[datetime, str, int]] = None, - end_time: Optional[Union[datetime, str, int]] = None, + start_time: Optional[Union[datetime, str, int, float]] = None, + end_time: Optional[Union[datetime, str, int, float]] = None, ) -> EvaluateRulesPreviewResponse: """Preview the evaluation of a set of rules against a run. @@ -128,8 +128,8 @@ def preview_against_run( run_id: The Run ID to run against. rules: Either a ReportTemplateConfig, a list of RuleConfigs, or a list of RuleIdentifiers (typically from `RuleService.create_external_rules`). - start_time: Optional start time to evaluate. - end_time: Optional end time to evaluate. + start_time: Optional start time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). + end_time: Optional end time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). Returns: The EvaluateRulesPreviewResponse object. @@ -149,8 +149,8 @@ def evaluate_external_rules( run_id: str, rules: List[RuleConfig], report_name: str = "", - start_time: Optional[Union[datetime, str, int]] = None, - end_time: Optional[Union[datetime, str, int]] = None, + start_time: Optional[Union[datetime, str, int, float]] = None, + end_time: Optional[Union[datetime, str, int, float]] = None, ) -> ReportService: """Evaluate a set of external rules against a run. @@ -158,8 +158,8 @@ def evaluate_external_rules( run_id: The Run ID to run against. rules: A list of RuleConfigs. These must be external rules. report_name: Optional report name. - start_time: Optional start time to evaluate. - end_time: Optional end time to evaluate. + start_time: Optional start time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). + end_time: Optional end time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). Returns: A Report object that can be use to get the status of the executed report. @@ -173,8 +173,8 @@ def evaluate_external_rules_from_yaml( paths: List[Path], named_expressions: Optional[Dict[str, str]] = None, report_name: str = "", - start_time: Optional[Union[datetime, str, int]] = None, - end_time: Optional[Union[datetime, str, int]] = None, + start_time: Optional[Union[datetime, str, int, float]] = None, + end_time: Optional[Union[datetime, str, int, float]] = None, ) -> ReportService: """Evaluate a set of external rules from a YAML config against a run. @@ -182,8 +182,8 @@ def evaluate_external_rules_from_yaml( run_id: The Run ID to run against. paths: The YAML paths to load rules from. report_name: Optional report name. - start_time: Optional start time to evaluate. - end_time: Optional end time to evaluate. + start_time: Optional start time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). + end_time: Optional end time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). Returns: A Report object that can be use to get the status of the executed report. @@ -195,16 +195,16 @@ def preview_external_rules( self, run_id: str, rules: List[RuleConfig], - start_time: Optional[Union[datetime, str, int]] = None, - end_time: Optional[Union[datetime, str, int]] = None, + start_time: Optional[Union[datetime, str, int, float]] = None, + end_time: Optional[Union[datetime, str, int, float]] = None, ) -> EvaluateRulesPreviewResponse: """Preview the evaluation a set of external rules against a run. Args: run_id: The Run ID to run against. rules: A list of RuleConfigs. These must be external rules. - start_time: Optional start time to evaluate. - end_time: Optional end time to evaluate. + start_time: Optional start time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). + end_time: Optional end time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). Returns: The EvaluateRulesPreviewResponse object. @@ -217,8 +217,8 @@ def preview_external_rules_from_yaml( run_id: str, paths: List[Path], named_expressions: Optional[Dict[str, str]] = None, - start_time: Optional[Union[datetime, str, int]] = None, - end_time: Optional[Union[datetime, str, int]] = None, + start_time: Optional[Union[datetime, str, int, float]] = None, + end_time: Optional[Union[datetime, str, int, float]] = None, ) -> EvaluateRulesPreviewResponse: """Preview the evaluation a set of external rules from a YAML config against a run. @@ -226,8 +226,8 @@ def preview_external_rules_from_yaml( run_id: The Run ID to run against. paths: The YAML paths to load rules from. named_expressions: The named expressions to substitute in the rules. - start_time: Optional start time to evaluate. - end_time: Optional end time to evaluate. + start_time: Optional start time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). + end_time: Optional end time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). Returns: The EvaluateRulesPreviewResponse object. @@ -290,15 +290,15 @@ def _get_rules_kwargs( def _get_run_kwargs( self, run_id: str, - start_time: Optional[Union[datetime, str, int]] = None, - end_time: Optional[Union[datetime, str, int]] = None, + start_time: Optional[Union[datetime, str, int, float]] = None, + end_time: Optional[Union[datetime, str, int, float]] = None, ) -> dict: """Returns the Run specific keyword arguments for a EvalutateRules request based on the input type. Args: run_id: The Run ID to run against. - start_time: Optional start time to evaluate. - end_time: Optional end time to evaluate. + start_time: Optional start time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). + end_time: Optional end time to evaluate (datetime, ISO 8601 formatted string, or POSIX timestamp). Returns: dict: The keyword arguments.