Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/ps-bulk-reprocess' into 1203-ind…
Browse files Browse the repository at this point in the history
…ividually-select-and-reprocess-DSRs-that-have-errored
  • Loading branch information
chriscalhoun1974 committed Oct 18, 2022
2 parents 28d3e33 + 68a8fa1 commit d446fe7
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 22 deletions.
110 changes: 91 additions & 19 deletions src/fides/api/ops/api/v1/endpoints/privacy_request_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from fides.api.ops.api.v1.urn_registry import (
PRIVACY_REQUEST_ACCESS_MANUAL_WEBHOOK_INPUT,
PRIVACY_REQUEST_APPROVE,
PRIVACY_REQUEST_BULK_RETRY,
PRIVACY_REQUEST_DENY,
PRIVACY_REQUEST_MANUAL_ERASURE,
PRIVACY_REQUEST_MANUAL_INPUT,
Expand Down Expand Up @@ -996,6 +997,68 @@ async def resume_with_erasure_confirmation(
)


@router.post(
PRIVACY_REQUEST_BULK_RETRY,
status_code=HTTP_200_OK,
response_model=BulkPostPrivacyRequests,
dependencies=[
Security(verify_oauth_client, scopes=[PRIVACY_REQUEST_CALLBACK_RESUME])
],
)
async def bulk_restart_privacy_request_from_failure(
privacy_request_ids: List[str],
*,
db: Session = Depends(deps.get_db),
) -> BulkPostPrivacyRequests:
"""Bulk restart a of privacy request from failure."""

succeeded: List[PrivacyRequestResponse] = []
failed: List[Dict[str, Any]] = []

# privacy_request = PrivacyRequest.get(db, object_id=request_id)

for privacy_request_id in privacy_request_ids:
privacy_request = PrivacyRequest.get(db, object_id=privacy_request_id)

if not privacy_request:
failed.append(
{
"message": f"No privacy request found with id '{privacy_request_id}'",
"data": {"privacy_request_id": privacy_request_id},
}
)
continue

if privacy_request.status != PrivacyRequestStatus.error:
failed.append(
{
"message": f"Cannot restart privacy request from failure: privacy request '{privacy_request.id}' status = {privacy_request.status.value}.",
"data": {"privacy_request_id": privacy_request_id},
}
)
continue

failed_details: Optional[
CheckpointActionRequired
] = privacy_request.get_failed_checkpoint_details()
if not failed_details:
failed.append(
{
"message": f"Cannot restart privacy request from failure '{privacy_request.id}'; no failed step or collection.",
"data": {"privacy_request_id": privacy_request_id},
}
)
continue

succeeded.append(
_process_privacy_request_restart(
privacy_request, failed_details.step, failed_details.collection, db
)
)

return BulkPostPrivacyRequests(succeeded=succeeded, failed=failed)


@router.post(
PRIVACY_REQUEST_RETRY,
status_code=HTTP_200_OK,
Expand Down Expand Up @@ -1029,27 +1092,10 @@ async def restart_privacy_request_from_failure(
detail=f"Cannot restart privacy request from failure '{privacy_request.id}'; no failed step or collection.",
)

failed_step: CurrentStep = failed_details.step
failed_collection: Optional[CollectionAddress] = failed_details.collection

logger.info(
"Restarting failed privacy request '%s' from '%s step, 'collection '%s'",
privacy_request_id,
failed_step,
failed_collection,
return _process_privacy_request_restart(
privacy_request, failed_details.step, failed_details.collection, db
)

privacy_request.status = PrivacyRequestStatus.in_processing
privacy_request.save(db=db)
queue_privacy_request(
privacy_request_id=privacy_request.id,
from_step=failed_step.value,
)

privacy_request.cache_failed_checkpoint_details() # Reset failed step and collection to None

return privacy_request


def review_privacy_request(
db: Session,
Expand Down Expand Up @@ -1434,3 +1480,29 @@ async def resume_privacy_request_from_requires_input(
)

return privacy_request


def _process_privacy_request_restart(
privacy_request: PrivacyRequest,
failed_step: CurrentStep,
failed_collection: Optional[CollectionAddress],
db: Session,
) -> PrivacyRequestResponse:

logger.info(
"Restarting failed privacy request '%s' from '%s step, 'collection '%s'",
privacy_request.id,
failed_step,
failed_collection,
)

privacy_request.status = PrivacyRequestStatus.in_processing
privacy_request.save(db=db)
queue_privacy_request(
privacy_request_id=privacy_request.id,
from_step=failed_step.value,
)

privacy_request.cache_failed_checkpoint_details() # Reset failed step and collection to None

return privacy_request
1 change: 1 addition & 0 deletions src/fides/api/ops/api/v1/urn_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
# Privacy request URLs
PRIVACY_REQUESTS = "/privacy-request"
PRIVACY_REQUEST_APPROVE = "/privacy-request/administrate/approve"
PRIVACY_REQUEST_BULK_RETRY = "/privacy-request/bulk/retry"
PRIVACY_REQUEST_DENY = "/privacy-request/administrate/deny"
REQUEST_STATUS_LOGS = "/privacy-request/{privacy_request_id}/log"
PRIVACY_REQUEST_VERIFY_IDENTITY = "/privacy-request/{privacy_request_id}/verify"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import math
import time
from typing import Any, Dict, Optional

import jwt.utils
from requests import PreparedRequest
Expand Down Expand Up @@ -42,13 +43,15 @@ def add_authentication(
Generate a Doordash JWT and add it as bearer auth
"""

secrets = connection_config.secrets
secrets: Optional[Dict[str, Any]] = connection_config.secrets

token = jwt.encode(
{
"aud": "doordash",
"iss": assign_placeholders(self.developer_id, secrets),
"kid": assign_placeholders(self.key_id, secrets),
"iss": assign_placeholders(self.developer_id, secrets)
if secrets
else None,
"kid": assign_placeholders(self.key_id, secrets) if secrets else None,
"exp": str(math.floor(time.time() + 60)),
"iat": str(math.floor(time.time())),
},
Expand Down
163 changes: 163 additions & 0 deletions tests/ops/api/v1/endpoints/test_privacy_request_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
DATASETS,
PRIVACY_REQUEST_ACCESS_MANUAL_WEBHOOK_INPUT,
PRIVACY_REQUEST_APPROVE,
PRIVACY_REQUEST_BULK_RETRY,
PRIVACY_REQUEST_DENY,
PRIVACY_REQUEST_MANUAL_ERASURE,
PRIVACY_REQUEST_MANUAL_INPUT,
Expand Down Expand Up @@ -2574,6 +2575,168 @@ def test_resume_with_manual_count(
privacy_request.delete(db)


class TestBulkRestartFromFailure:
@pytest.fixture(scope="function")
def url(self):
return f"{V1_URL_PREFIX}{PRIVACY_REQUEST_BULK_RETRY}"

def test_restart_from_failure_not_authenticated(self, api_client, url):
data = ["1234", "5678"]
response = api_client.post(url, json=data, headers={})
assert response.status_code == 401

def test_restart_from_failure_wrong_scope(
self, api_client, url, generate_auth_header
):
auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_READ])
data = ["1234", "5678"]

response = api_client.post(url, json=data, headers=auth_header)
assert response.status_code == 403

@pytest.mark.usefixtures("privacy_requests")
def test_restart_from_failure_not_errored(
self, api_client, url, generate_auth_header
):
auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_CALLBACK_RESUME])
data = ["1234", "5678"]

response = api_client.post(url, json=data, headers=auth_header)
assert response.status_code == 200

assert response.json()["succeeded"] == []

failed_ids = [
x["data"]["privacy_request_id"] for x in response.json()["failed"]
]
assert sorted(failed_ids) == sorted(data)

def test_restart_from_failure_no_stopped_step(
self, api_client, url, generate_auth_header, db, privacy_requests
):
auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_CALLBACK_RESUME])
data = [privacy_requests[0].id]

privacy_requests[0].status = PrivacyRequestStatus.error
privacy_requests[0].save(db)

response = api_client.post(url, json=data, headers=auth_header)

assert response.status_code == 200
assert response.json()["succeeded"] == []

failed_ids = [
x["data"]["privacy_request_id"] for x in response.json()["failed"]
]

assert privacy_requests[0].id in failed_ids

@mock.patch(
"fides.api.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
def test_restart_from_failure_from_specific_collection(
self, submit_mock, api_client, url, generate_auth_header, db, privacy_requests
):
auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_CALLBACK_RESUME])
data = [privacy_requests[0].id]
privacy_requests[0].status = PrivacyRequestStatus.error
privacy_requests[0].save(db)

privacy_requests[0].cache_failed_checkpoint_details(
step=CurrentStep.access,
collection=CollectionAddress("test_dataset", "test_collection"),
)

response = api_client.post(url, json=data, headers=auth_header)
assert response.status_code == 200

db.refresh(privacy_requests[0])
assert privacy_requests[0].status == PrivacyRequestStatus.in_processing
assert response.json()["failed"] == []

succeeded_ids = [x["id"] for x in response.json()["succeeded"]]

assert privacy_requests[0].id in succeeded_ids

submit_mock.assert_called_with(
privacy_request_id=privacy_requests[0].id,
from_step=CurrentStep.access.value,
from_webhook_id=None,
)

@mock.patch(
"fides.api.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
def test_restart_from_failure_outside_graph(
self, submit_mock, api_client, url, generate_auth_header, db, privacy_requests
):
auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_CALLBACK_RESUME])
data = [privacy_requests[0].id]
privacy_requests[0].status = PrivacyRequestStatus.error
privacy_requests[0].save(db)

privacy_requests[0].cache_failed_checkpoint_details(
step=CurrentStep.erasure_email_post_send,
collection=None,
)

response = api_client.post(url, json=data, headers=auth_header)
assert response.status_code == 200

db.refresh(privacy_requests[0])
assert privacy_requests[0].status == PrivacyRequestStatus.in_processing
assert response.json()["failed"] == []

succeeded_ids = [x["id"] for x in response.json()["succeeded"]]

assert privacy_requests[0].id in succeeded_ids

submit_mock.assert_called_with(
privacy_request_id=privacy_requests[0].id,
from_step=CurrentStep.erasure_email_post_send.value,
from_webhook_id=None,
)

@mock.patch(
"fides.api.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
def test_mixed_result(
self, submit_mock, api_client, url, generate_auth_header, db, privacy_requests
):
auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_CALLBACK_RESUME])
data = [privacy_requests[0].id, privacy_requests[1].id]
privacy_requests[0].status = PrivacyRequestStatus.error
privacy_requests[0].save(db)

privacy_requests[0].cache_failed_checkpoint_details(
step=CurrentStep.access,
collection=CollectionAddress("test_dataset", "test_collection"),
)

privacy_requests[1].status = PrivacyRequestStatus.error
privacy_requests[1].save(db)

response = api_client.post(url, json=data, headers=auth_header)
assert response.status_code == 200

db.refresh(privacy_requests[0])
assert privacy_requests[0].status == PrivacyRequestStatus.in_processing

succeeded_ids = [x["id"] for x in response.json()["succeeded"]]
failed_ids = [
x["data"]["privacy_request_id"] for x in response.json()["failed"]
]

assert privacy_requests[0].id in succeeded_ids
assert privacy_requests[1].id in failed_ids

submit_mock.assert_called_with(
privacy_request_id=privacy_requests[0].id,
from_step=CurrentStep.access.value,
from_webhook_id=None,
)


class TestRestartFromFailure:
@pytest.fixture(scope="function")
def url(self, db, privacy_request):
Expand Down

0 comments on commit d446fe7

Please sign in to comment.