From 33016fc8421597b387f5d0d51f8d9d276adedbb2 Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Mon, 30 Sep 2024 10:43:49 -0700 Subject: [PATCH 01/13] Create forward shims for sentry apps tasks --- src/sentry/sentry_apps/tasks/sentry_apps.py | 116 ++++++++++++++++++ src/sentry/sentry_apps/tasks/service_hooks.py | 14 +++ 2 files changed, 130 insertions(+) create mode 100644 src/sentry/sentry_apps/tasks/sentry_apps.py create mode 100644 src/sentry/sentry_apps/tasks/service_hooks.py diff --git a/src/sentry/sentry_apps/tasks/sentry_apps.py b/src/sentry/sentry_apps/tasks/sentry_apps.py new file mode 100644 index 00000000000000..f14c7545582cab --- /dev/null +++ b/src/sentry/sentry_apps/tasks/sentry_apps.py @@ -0,0 +1,116 @@ +from collections.abc import Mapping +from typing import Any + +from sentry.eventstore.models import Event +from sentry.tasks.base import instrumented_task +from sentry.tasks.sentry_apps import CONTROL_TASK_OPTIONS, TASK_OPTIONS +from sentry.tasks.sentry_apps import build_comment_webhook as old_build_comment_webhook +from sentry.tasks.sentry_apps import clear_region_cache as old_clear_region_cache +from sentry.tasks.sentry_apps import ( + create_or_update_service_hooks_for_sentry_app as old_create_or_update_service_hooks_for_sentry_app, +) +from sentry.tasks.sentry_apps import installation_webhook as old_installation_webhook +from sentry.tasks.sentry_apps import ( + process_resource_change_bound as old_process_resource_change_bound, +) +from sentry.tasks.sentry_apps import retry_decorator +from sentry.tasks.sentry_apps import send_alert_event as old_send_alert_event +from sentry.tasks.sentry_apps import ( + send_resource_change_webhook as old_send_resource_change_webhook, +) +from sentry.tasks.sentry_apps import workflow_notification as old_workflow_notification + + +@instrumented_task(name="sentry.sentry_apps.tasks.sentry_apps.send_alert_event", **TASK_OPTIONS) +@retry_decorator +def send_alert_event( + event: Event, + rule: str, + sentry_app_id: int, + additional_payload_key: str | None = None, + additional_payload: Mapping[str, Any] | None = None, +) -> None: + old_send_alert_event( + event=event, + rule=rule, + sentry_app_id=sentry_app_id, + additional_payload_key=additional_payload_key, + additional_payload=additional_payload, + ) + + +@instrumented_task( + "sentry.sentry_apps.tasks.sentry_apps.process_resource_change_bound", bind=True, **TASK_OPTIONS +) +@retry_decorator +def process_resource_change_bound(self, action, sender, instance_id, *args, **kwargs): + old_process_resource_change_bound( + self=self, action=action, sender=sender, instance_id=instance_id, *args, **kwargs + ) + + +@instrumented_task( + name="sentry.sentry_apps.tasks.sentry_apps.installation_webhook", **CONTROL_TASK_OPTIONS +) +@retry_decorator +def installation_webhook(installation_id, user_id, *args, **kwargs): + old_installation_webhook(installation_id=installation_id, user_id=user_id, *args, **kwargs) + + +@instrumented_task( + name="sentry.sentry_apps.tasks.sentry_apps.clear_region_cache", **CONTROL_TASK_OPTIONS +) +def clear_region_cache(sentry_app_id: int, region_name: str) -> None: + old_clear_region_cache(sentry_app_id=sentry_app_id, region_name=region_name) + + +@instrumented_task( + name="sentry.sentry_apps.tasks.sentry_apps.workflow_notification", **TASK_OPTIONS +) +@retry_decorator +def workflow_notification(installation_id, issue_id, type, user_id, *args, **kwargs): + old_workflow_notification( + installation_id=installation_id, + issue_id=issue_id, + type=type, + user_id=user_id, + *args, + **kwargs, + ) + + +@instrumented_task( + name="sentry.sentry_apps.tasks.sentry_apps.build_comment_webhook", **TASK_OPTIONS +) +@retry_decorator +def build_comment_webhook(installation_id, issue_id, type, user_id, *args, **kwargs): + old_build_comment_webhook( + installation_id=installation_id, + issue_id=issue_id, + type=type, + user_id=user_id, + *args, + **kwargs, + ) + + +@instrumented_task( + "sentry.sentry_apps.tasks.sentry_apps.send_process_resource_change_webhook", **TASK_OPTIONS +) +@retry_decorator +def send_resource_change_webhook(installation_id, event, data, *args, **kwargs): + old_send_resource_change_webhook( + installation_id=installation_id, event=event, data=data, *args, **kwargs + ) + + +@instrumented_task( + "sentry.sentry_apps.tasks.sentry_apps.create_or_update_service_hooks_for_sentry_app", + **CONTROL_TASK_OPTIONS, +) +def create_or_update_service_hooks_for_sentry_app( + sentry_app_id: int, webhook_url: str, events: list[str], **kwargs: dict +) -> None: + old_create_or_update_service_hooks_for_sentry_app( + sentry_app_id=sentry_app_id, webhook_url=webhook_url, events=events, **kwargs + ) diff --git a/src/sentry/sentry_apps/tasks/service_hooks.py b/src/sentry/sentry_apps/tasks/service_hooks.py new file mode 100644 index 00000000000000..cd9def6898102c --- /dev/null +++ b/src/sentry/sentry_apps/tasks/service_hooks.py @@ -0,0 +1,14 @@ +from sentry.silo.base import SiloMode +from sentry.tasks.base import instrumented_task, retry +from sentry.tasks.servicehooks import process_service_hook as old_process_service_hook + + +@instrumented_task( + name="sentry.sentry_apps.tasks.service_hooks.process_service_hook", + default_retry_delay=60 * 5, + max_retries=5, + silo_mode=SiloMode.REGION, +) +@retry +def process_service_hook(servicehook_id, event, **kwargs): + old_process_service_hook(servicehook_id=servicehook_id, event=event, **kwargs) From 5fffaa4810cca9e5289eea471f7fc31bd6ca343c Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Mon, 30 Sep 2024 11:28:55 -0700 Subject: [PATCH 02/13] move logic to new tasks --- src/sentry/sentry_apps/tasks/sentry_apps.py | 454 ++++++++++++++++++-- src/sentry/tasks/sentry_apps.py | 454 ++------------------ 2 files changed, 454 insertions(+), 454 deletions(-) diff --git a/src/sentry/sentry_apps/tasks/sentry_apps.py b/src/sentry/sentry_apps/tasks/sentry_apps.py index f14c7545582cab..c51ee6cf9b4968 100644 --- a/src/sentry/sentry_apps/tasks/sentry_apps.py +++ b/src/sentry/sentry_apps/tasks/sentry_apps.py @@ -1,24 +1,97 @@ +from __future__ import annotations + +import logging +from collections import defaultdict from collections.abc import Mapping from typing import Any -from sentry.eventstore.models import Event -from sentry.tasks.base import instrumented_task -from sentry.tasks.sentry_apps import CONTROL_TASK_OPTIONS, TASK_OPTIONS -from sentry.tasks.sentry_apps import build_comment_webhook as old_build_comment_webhook -from sentry.tasks.sentry_apps import clear_region_cache as old_clear_region_cache -from sentry.tasks.sentry_apps import ( - create_or_update_service_hooks_for_sentry_app as old_create_or_update_service_hooks_for_sentry_app, +from celery import current_task +from django.urls import reverse +from requests.exceptions import RequestException + +from sentry import analytics +from sentry.api.serializers import serialize +from sentry.constants import SentryAppInstallationStatus +from sentry.eventstore.models import Event, GroupEvent +from sentry.hybridcloud.rpc.caching import region_caching_service +from sentry.models.activity import Activity +from sentry.models.group import Group +from sentry.models.organization import Organization +from sentry.models.organizationmapping import OrganizationMapping +from sentry.models.project import Project +from sentry.sentry_apps.api.serializers.app_platform_event import AppPlatformEvent +from sentry.sentry_apps.models.sentry_app import VALID_EVENTS, SentryApp +from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation +from sentry.sentry_apps.models.servicehook import ServiceHook, ServiceHookProject +from sentry.sentry_apps.services.app.service import ( + app_service, + get_by_application_id, + get_installation, ) -from sentry.tasks.sentry_apps import installation_webhook as old_installation_webhook -from sentry.tasks.sentry_apps import ( - process_resource_change_bound as old_process_resource_change_bound, +from sentry.shared_integrations.exceptions import ApiHostError, ApiTimeoutError, ClientError +from sentry.silo.base import SiloMode +from sentry.tasks.base import instrumented_task, retry +from sentry.users.services.user.service import user_service +from sentry.utils import metrics +from sentry.utils.http import absolute_uri +from sentry.utils.sentry_apps import send_and_save_webhook_request +from sentry.utils.sentry_apps.service_hook_manager import ( + create_or_update_service_hooks_for_installation, ) -from sentry.tasks.sentry_apps import retry_decorator -from sentry.tasks.sentry_apps import send_alert_event as old_send_alert_event -from sentry.tasks.sentry_apps import ( - send_resource_change_webhook as old_send_resource_change_webhook, + +logger = logging.getLogger("sentry.tasks.sentry_apps") + +TASK_OPTIONS = { + "queue": "app_platform", + "default_retry_delay": (60 * 5), # Five minutes. + "max_retries": 3, + "record_timing": True, + "silo_mode": SiloMode.REGION, +} +CONTROL_TASK_OPTIONS = { + "queue": "app_platform.control", + "default_retry_delay": (60 * 5), # Five minutes. + "max_retries": 3, + "silo_mode": SiloMode.CONTROL, +} + +retry_decorator = retry( + on=(RequestException, ApiHostError, ApiTimeoutError), + ignore=(ClientError,), ) -from sentry.tasks.sentry_apps import workflow_notification as old_workflow_notification + +# We call some models by a different name, publicly, than their class name. +# For example the model Group is called "Issue" in the UI. We want the Service +# Hook events to match what we externally call these primitives. +RESOURCE_RENAMES = {"Group": "issue"} + +TYPES = {"Group": Group, "Error": Event, "Comment": Activity} + + +def _webhook_event_data(event, group_id, project_id): + project = Project.objects.get_from_cache(id=project_id) + organization = Organization.objects.get_from_cache(id=project.organization_id) + + event_context = event.as_dict() + event_context["url"] = absolute_uri( + reverse( + "sentry-api-0-project-event-details", + args=[project.organization.slug, project.slug, event.event_id], + ) + ) + + event_context["web_url"] = absolute_uri( + reverse( + "sentry-organization-event-detail", args=[organization.slug, group_id, event.event_id] + ) + ) + + # The URL has a regex OR in it ("|") which means `reverse` cannot generate + # a valid URL (it can't know which option to pick). We have to manually + # create this URL for, that reason. + event_context["issue_url"] = absolute_uri(f"/api/0/issues/{group_id}/") + event_context["issue_id"] = str(group_id) + return event_context @instrumented_task(name="sentry.sentry_apps.tasks.sentry_apps.send_alert_event", **TASK_OPTIONS) @@ -30,13 +103,129 @@ def send_alert_event( additional_payload_key: str | None = None, additional_payload: Mapping[str, Any] | None = None, ) -> None: - old_send_alert_event( - event=event, - rule=rule, - sentry_app_id=sentry_app_id, - additional_payload_key=additional_payload_key, - additional_payload=additional_payload, + """ + When an incident alert is triggered, send incident data to the SentryApp's webhook. + :param event: The `Event` for which to build a payload. + :param rule: The AlertRule that was triggered. + :param sentry_app_id: The SentryApp to notify. + :param additional_payload_key: The key used to attach additional data to the webhook payload + :param additional_payload: The extra data attached to the payload body at the key specified by `additional_payload_key`. + :return: + """ + group = event.group + project = Project.objects.get_from_cache(id=group.project_id) + organization = Organization.objects.get_from_cache(id=project.organization_id) + + extra = { + "sentry_app_id": sentry_app_id, + "project_slug": project.slug, + "organization_slug": organization.slug, + "rule": rule, + } + + sentry_app = app_service.get_sentry_app_by_id(id=sentry_app_id) + if sentry_app is None: + logger.info("event_alert_webhook.missing_sentry_app", extra=extra) + return + + installations = app_service.get_many( + filter=dict( + organization_id=organization.id, + app_ids=[sentry_app.id], + status=SentryAppInstallationStatus.INSTALLED, + ) ) + if not installations: + logger.info("event_alert_webhook.missing_installation", extra=extra) + return + (install,) = installations + + event_context = _webhook_event_data(event, group.id, project.id) + + data = {"event": event_context, "triggered_rule": rule} + + # Attach extra payload to the webhook + if additional_payload_key and additional_payload: + data[additional_payload_key] = additional_payload + + request_data = AppPlatformEvent( + resource="event_alert", action="triggered", install=install, data=data + ) + + send_and_save_webhook_request(sentry_app, request_data) + + # On success, record analytic event for Alert Rule UI Component + if request_data.data.get("issue_alert"): + analytics.record( + "alert_rule_ui_component_webhook.sent", + organization_id=organization.id, + sentry_app_id=sentry_app_id, + event=f"{request_data.resource}.{request_data.action}", + ) + + +def _process_resource_change(action, sender, instance_id, retryer=None, *args, **kwargs): + # The class is serialized as a string when enqueueing the class. + model = TYPES[sender] + # The Event model has different hooks for the different event types. The sender + # determines which type eg. Error and therefore the 'name' eg. error + if issubclass(model, Event): + if not kwargs.get("instance"): + extra = {"sender": sender, "action": action, "event_id": instance_id} + logger.info("process_resource_change.event_missing_event", extra=extra) + return + name = sender.lower() + else: + # Some resources are named differently than their model. eg. Group vs Issue. + # Looks up the human name for the model. Defaults to the model name. + name = RESOURCE_RENAMES.get(model.__name__, model.__name__.lower()) + + # By default, use Celery's `current_task` but allow a value to be passed for the + # bound Task. + retryer = retryer or current_task + + # We may run into a race condition where this task executes before the + # transaction that creates the Group has committed. + try: + if issubclass(model, Event): + # XXX:(Meredith): Passing through the entire event was an intentional choice + # to avoid having to query NodeStore again for data we had previously in + # post_process. While this is not ideal, changing this will most likely involve + # an overhaul of how we do things in post_process, not just this task alone. + instance = kwargs.get("instance") + else: + instance = model.objects.get(id=instance_id) + except model.DoesNotExist as e: + # Explicitly requeue the task, so we don't report this to Sentry until + # we hit the max number of retries. + return retryer.retry(exc=e) + + event = f"{name}.{action}" + + if event not in VALID_EVENTS: + return + + org = None + + if isinstance(instance, (Group, Event, GroupEvent)): + org = Organization.objects.get_from_cache( + id=Project.objects.get_from_cache(id=instance.project_id).organization_id + ) + + installations = filter( + lambda i: event in i.sentry_app.events, + app_service.get_installed_for_organization(organization_id=org.id), + ) + + for installation in installations: + data = {} + if isinstance(instance, Event) or isinstance(instance, GroupEvent): + data[name] = _webhook_event_data(instance, instance.group_id, instance.project_id) + else: + data[name] = serialize(instance) + + # Trigger a new task for each webhook + send_resource_change_webhook.delay(installation_id=installation.id, event=event, data=data) @instrumented_task( @@ -44,9 +233,7 @@ def send_alert_event( ) @retry_decorator def process_resource_change_bound(self, action, sender, instance_id, *args, **kwargs): - old_process_resource_change_bound( - self=self, action=action, sender=sender, instance_id=instance_id, *args, **kwargs - ) + _process_resource_change(action, sender, instance_id, retryer=self, *args, **kwargs) @instrumented_task( @@ -54,14 +241,60 @@ def process_resource_change_bound(self, action, sender, instance_id, *args, **kw ) @retry_decorator def installation_webhook(installation_id, user_id, *args, **kwargs): - old_installation_webhook(installation_id=installation_id, user_id=user_id, *args, **kwargs) + from sentry.mediators.sentry_app_installations.installation_notifier import InstallationNotifier + + extra = {"installation_id": installation_id, "user_id": user_id} + try: + # we should send the webhook for pending installations on the install event in case that's part of the workflow + install = SentryAppInstallation.objects.get(id=installation_id) + except SentryAppInstallation.DoesNotExist: + logger.info("installation_webhook.missing_installation", extra=extra) + return + + user = user_service.get_user(user_id=user_id) + if not user: + logger.info("installation_webhook.missing_user", extra=extra) + return + + InstallationNotifier.run(install=install, user=user, action="created") @instrumented_task( name="sentry.sentry_apps.tasks.sentry_apps.clear_region_cache", **CONTROL_TASK_OPTIONS ) def clear_region_cache(sentry_app_id: int, region_name: str) -> None: - old_clear_region_cache(sentry_app_id=sentry_app_id, region_name=region_name) + try: + sentry_app = SentryApp.objects.get(id=sentry_app_id) + except SentryApp.DoesNotExist: + return + + # When a sentry app's definition changes purge cache for all the installations. + # This could get slow for large applications, but generally big applications don't change often. + install_query = SentryAppInstallation.objects.filter( + sentry_app=sentry_app, + ).values("id", "organization_id") + + # There isn't a constraint on org : sentryapp so we have to handle lists + install_map: dict[int, list[int]] = defaultdict(list) + for install_row in install_query: + install_map[install_row["organization_id"]].append(install_row["id"]) + + # Clear application_id cache + region_caching_service.clear_key( + key=get_by_application_id.key_from(sentry_app.application_id), region_name=region_name + ) + + # Limit our operations to the region this outbox is for. + # This could be a single query if we use raw_sql. + region_query = OrganizationMapping.objects.filter( + organization_id__in=list(install_map.keys()), region_name=region_name + ).values("organization_id") + for region_row in region_query: + installs = install_map[region_row["organization_id"]] + for install_id in installs: + region_caching_service.clear_key( + key=get_installation.key_from(install_id), region_name=region_name + ) @instrumented_task( @@ -69,13 +302,18 @@ def clear_region_cache(sentry_app_id: int, region_name: str) -> None: ) @retry_decorator def workflow_notification(installation_id, issue_id, type, user_id, *args, **kwargs): - old_workflow_notification( - installation_id=installation_id, - issue_id=issue_id, - type=type, + webhook_data = get_webhook_data(installation_id, issue_id, user_id) + if not webhook_data: + return + install, issue, user = webhook_data + data = kwargs.get("data", {}) + data.update({"issue": serialize(issue)}) + send_webhooks(installation=install, event=f"issue.{type}", data=data, actor=user) + analytics.record( + f"sentry_app.issue.{type}", user_id=user_id, - *args, - **kwargs, + group_id=issue_id, + installation_id=installation_id, ) @@ -84,24 +322,148 @@ def workflow_notification(installation_id, issue_id, type, user_id, *args, **kwa ) @retry_decorator def build_comment_webhook(installation_id, issue_id, type, user_id, *args, **kwargs): - old_build_comment_webhook( - installation_id=installation_id, - issue_id=issue_id, - type=type, + webhook_data = get_webhook_data(installation_id, issue_id, user_id) + if not webhook_data: + return + install, _, user = webhook_data + data = kwargs.get("data", {}) + project_slug = data.get("project_slug") + comment_id = data.get("comment_id") + payload = { + "comment_id": data.get("comment_id"), + "issue_id": issue_id, + "project_slug": data.get("project_slug"), + "timestamp": data.get("timestamp"), + "comment": data.get("comment"), + } + send_webhooks(installation=install, event=type, data=payload, actor=user) + # `type` is comment.created, comment.updated, or comment.deleted + analytics.record( + type, user_id=user_id, - *args, - **kwargs, + group_id=issue_id, + project_slug=project_slug, + installation_id=installation_id, + comment_id=comment_id, ) +def get_webhook_data(installation_id, issue_id, user_id): + extra = {"installation_id": installation_id, "issue_id": issue_id} + install = app_service.installation_by_id(id=installation_id) + if not install: + logger.info("workflow_notification.missing_installation", extra=extra) + return + + try: + issue = Group.objects.get(id=issue_id) + except Group.DoesNotExist: + logger.info("workflow_notification.missing_issue", extra=extra) + return + + user = None + if user_id: + user = user_service.get_user(user_id=user_id) + if not user: + logger.info("workflow_notification.missing_user", extra=extra) + + return (install, issue, user) + + @instrumented_task( "sentry.sentry_apps.tasks.sentry_apps.send_process_resource_change_webhook", **TASK_OPTIONS ) @retry_decorator def send_resource_change_webhook(installation_id, event, data, *args, **kwargs): - old_send_resource_change_webhook( - installation_id=installation_id, event=event, data=data, *args, **kwargs - ) + installation = app_service.installation_by_id(id=installation_id) + if not installation: + logger.info( + "send_process_resource_change_webhook.missing_installation", + extra={"installation_id": installation_id, "event": event}, + ) + return + + send_webhooks(installation, event, data=data) + + metrics.incr("resource_change.processed", sample_rate=1.0, tags={"change_event": event}) + + +def notify_sentry_app(event, futures): + for f in futures: + if not f.kwargs.get("sentry_app"): + continue + + extra_kwargs = { + "additional_payload_key": None, + "additional_payload": None, + } + # If the future comes from a rule with a UI component form in the schema, append the issue alert payload + settings = f.kwargs.get("schema_defined_settings") + if settings: + extra_kwargs["additional_payload_key"] = "issue_alert" + extra_kwargs["additional_payload"] = { + "id": f.rule.id, + "title": f.rule.label, + "sentry_app_id": f.kwargs["sentry_app"].id, + "settings": settings, + } + + send_alert_event.delay( + event=event, + rule=f.rule.label, + sentry_app_id=f.kwargs["sentry_app"].id, + **extra_kwargs, + ) + + +def send_webhooks(installation, event, **kwargs): + try: + servicehook = ServiceHook.objects.get( + organization_id=installation.organization_id, actor_id=installation.id + ) + except ServiceHook.DoesNotExist: + logger.info( + "send_webhooks.missing_servicehook", + extra={"installation_id": installation.id, "event": event}, + ) + return + + if event not in servicehook.events: + return + + # The service hook applies to all projects if there are no + # ServiceHookProject records. Otherwise we want check if + # the event is within the allowed projects. + project_limited = ServiceHookProject.objects.filter(service_hook_id=servicehook.id).exists() + + # TODO(nola): This is disabled for now, because it could potentially affect internal integrations w/ error.created + # # If the event is error.created & the request is going out to the Org that owns the Sentry App, + # # Make sure we don't send the request, to prevent potential infinite loops + # if ( + # event == "error.created" + # and installation.organization_id == installation.sentry_app.owner_id + # ): + # # We just want to exclude error.created from the project that the integration lives in + # # Need to first implement project mapping for integration partners + # metrics.incr( + # "webhook_request.dropped", + # tags={"sentry_app": installation.sentry_app.id, "event": event}, + # ) + # return + + if not project_limited: + resource, action = event.split(".") + + kwargs["resource"] = resource + kwargs["action"] = action + kwargs["install"] = installation + + request_data = AppPlatformEvent(**kwargs) + send_and_save_webhook_request( + installation.sentry_app, + request_data, + servicehook.sentry_app.webhook_url, + ) @instrumented_task( @@ -111,6 +473,10 @@ def send_resource_change_webhook(installation_id, event, data, *args, **kwargs): def create_or_update_service_hooks_for_sentry_app( sentry_app_id: int, webhook_url: str, events: list[str], **kwargs: dict ) -> None: - old_create_or_update_service_hooks_for_sentry_app( - sentry_app_id=sentry_app_id, webhook_url=webhook_url, events=events, **kwargs - ) + installations = SentryAppInstallation.objects.filter(sentry_app_id=sentry_app_id) + for installation in installations: + create_or_update_service_hooks_for_installation( + installation=installation, + events=events, + webhook_url=webhook_url, + ) diff --git a/src/sentry/tasks/sentry_apps.py b/src/sentry/tasks/sentry_apps.py index 8becf3e2f66b65..70162fe6d16b56 100644 --- a/src/sentry/tasks/sentry_apps.py +++ b/src/sentry/tasks/sentry_apps.py @@ -1,97 +1,24 @@ -from __future__ import annotations - -import logging -from collections import defaultdict from collections.abc import Mapping from typing import Any -from celery import current_task -from django.urls import reverse -from requests.exceptions import RequestException - -from sentry import analytics -from sentry.api.serializers import serialize -from sentry.constants import SentryAppInstallationStatus -from sentry.eventstore.models import Event, GroupEvent -from sentry.hybridcloud.rpc.caching import region_caching_service -from sentry.models.activity import Activity -from sentry.models.group import Group -from sentry.models.organization import Organization -from sentry.models.organizationmapping import OrganizationMapping -from sentry.models.project import Project -from sentry.sentry_apps.api.serializers.app_platform_event import AppPlatformEvent -from sentry.sentry_apps.models.sentry_app import VALID_EVENTS, SentryApp -from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation -from sentry.sentry_apps.models.servicehook import ServiceHook, ServiceHookProject -from sentry.sentry_apps.services.app.service import ( - app_service, - get_by_application_id, - get_installation, +from sentry.eventstore.models import Event +from sentry.sentry_apps.tasks.sentry_apps import CONTROL_TASK_OPTIONS, TASK_OPTIONS +from sentry.sentry_apps.tasks.sentry_apps import build_comment_webhook as new_build_comment_webhook +from sentry.sentry_apps.tasks.sentry_apps import clear_region_cache as new_clear_region_cache +from sentry.sentry_apps.tasks.sentry_apps import ( + create_or_update_service_hooks_for_sentry_app as new_create_or_update_service_hooks_for_sentry_app, ) -from sentry.shared_integrations.exceptions import ApiHostError, ApiTimeoutError, ClientError -from sentry.silo.base import SiloMode -from sentry.tasks.base import instrumented_task, retry -from sentry.users.services.user.service import user_service -from sentry.utils import metrics -from sentry.utils.http import absolute_uri -from sentry.utils.sentry_apps import send_and_save_webhook_request -from sentry.utils.sentry_apps.service_hook_manager import ( - create_or_update_service_hooks_for_installation, +from sentry.sentry_apps.tasks.sentry_apps import installation_webhook as new_installation_webhook +from sentry.sentry_apps.tasks.sentry_apps import ( + process_resource_change_bound as new_process_resource_change_bound, ) - -logger = logging.getLogger("sentry.tasks.sentry_apps") - -TASK_OPTIONS = { - "queue": "app_platform", - "default_retry_delay": (60 * 5), # Five minutes. - "max_retries": 3, - "record_timing": True, - "silo_mode": SiloMode.REGION, -} -CONTROL_TASK_OPTIONS = { - "queue": "app_platform.control", - "default_retry_delay": (60 * 5), # Five minutes. - "max_retries": 3, - "silo_mode": SiloMode.CONTROL, -} - -retry_decorator = retry( - on=(RequestException, ApiHostError, ApiTimeoutError), - ignore=(ClientError,), +from sentry.sentry_apps.tasks.sentry_apps import retry_decorator +from sentry.sentry_apps.tasks.sentry_apps import send_alert_event as new_send_alert_event +from sentry.sentry_apps.tasks.sentry_apps import ( + send_resource_change_webhook as new_send_resource_change_webhook, ) - -# We call some models by a different name, publicly, than their class name. -# For example the model Group is called "Issue" in the UI. We want the Service -# Hook events to match what we externally call these primitives. -RESOURCE_RENAMES = {"Group": "issue"} - -TYPES = {"Group": Group, "Error": Event, "Comment": Activity} - - -def _webhook_event_data(event, group_id, project_id): - project = Project.objects.get_from_cache(id=project_id) - organization = Organization.objects.get_from_cache(id=project.organization_id) - - event_context = event.as_dict() - event_context["url"] = absolute_uri( - reverse( - "sentry-api-0-project-event-details", - args=[project.organization.slug, project.slug, event.event_id], - ) - ) - - event_context["web_url"] = absolute_uri( - reverse( - "sentry-organization-event-detail", args=[organization.slug, group_id, event.event_id] - ) - ) - - # The URL has a regex OR in it ("|") which means `reverse` cannot generate - # a valid URL (it can't know which option to pick). We have to manually - # create this URL for, that reason. - event_context["issue_url"] = absolute_uri(f"/api/0/issues/{group_id}/") - event_context["issue_id"] = str(group_id) - return event_context +from sentry.sentry_apps.tasks.sentry_apps import workflow_notification as new_workflow_notification +from sentry.tasks.base import instrumented_task @instrumented_task(name="sentry.tasks.sentry_apps.send_alert_event", **TASK_OPTIONS) @@ -103,357 +30,68 @@ def send_alert_event( additional_payload_key: str | None = None, additional_payload: Mapping[str, Any] | None = None, ) -> None: - """ - When an incident alert is triggered, send incident data to the SentryApp's webhook. - :param event: The `Event` for which to build a payload. - :param rule: The AlertRule that was triggered. - :param sentry_app_id: The SentryApp to notify. - :param additional_payload_key: The key used to attach additional data to the webhook payload - :param additional_payload: The extra data attached to the payload body at the key specified by `additional_payload_key`. - :return: - """ - group = event.group - project = Project.objects.get_from_cache(id=group.project_id) - organization = Organization.objects.get_from_cache(id=project.organization_id) - - extra = { - "sentry_app_id": sentry_app_id, - "project_slug": project.slug, - "organization_slug": organization.slug, - "rule": rule, - } - - sentry_app = app_service.get_sentry_app_by_id(id=sentry_app_id) - if sentry_app is None: - logger.info("event_alert_webhook.missing_sentry_app", extra=extra) - return - - installations = app_service.get_many( - filter=dict( - organization_id=organization.id, - app_ids=[sentry_app.id], - status=SentryAppInstallationStatus.INSTALLED, - ) + new_send_alert_event( + event=event, + rule=rule, + sentry_app_id=sentry_app_id, + additional_payload_key=additional_payload_key, + additional_payload=additional_payload, ) - if not installations: - logger.info("event_alert_webhook.missing_installation", extra=extra) - return - (install,) = installations - - event_context = _webhook_event_data(event, group.id, project.id) - - data = {"event": event_context, "triggered_rule": rule} - - # Attach extra payload to the webhook - if additional_payload_key and additional_payload: - data[additional_payload_key] = additional_payload - - request_data = AppPlatformEvent( - resource="event_alert", action="triggered", install=install, data=data - ) - - send_and_save_webhook_request(sentry_app, request_data) - - # On success, record analytic event for Alert Rule UI Component - if request_data.data.get("issue_alert"): - analytics.record( - "alert_rule_ui_component_webhook.sent", - organization_id=organization.id, - sentry_app_id=sentry_app_id, - event=f"{request_data.resource}.{request_data.action}", - ) - - -def _process_resource_change(action, sender, instance_id, retryer=None, *args, **kwargs): - # The class is serialized as a string when enqueueing the class. - model = TYPES[sender] - # The Event model has different hooks for the different event types. The sender - # determines which type eg. Error and therefore the 'name' eg. error - if issubclass(model, Event): - if not kwargs.get("instance"): - extra = {"sender": sender, "action": action, "event_id": instance_id} - logger.info("process_resource_change.event_missing_event", extra=extra) - return - name = sender.lower() - else: - # Some resources are named differently than their model. eg. Group vs Issue. - # Looks up the human name for the model. Defaults to the model name. - name = RESOURCE_RENAMES.get(model.__name__, model.__name__.lower()) - - # By default, use Celery's `current_task` but allow a value to be passed for the - # bound Task. - retryer = retryer or current_task - - # We may run into a race condition where this task executes before the - # transaction that creates the Group has committed. - try: - if issubclass(model, Event): - # XXX:(Meredith): Passing through the entire event was an intentional choice - # to avoid having to query NodeStore again for data we had previously in - # post_process. While this is not ideal, changing this will most likely involve - # an overhaul of how we do things in post_process, not just this task alone. - instance = kwargs.get("instance") - else: - instance = model.objects.get(id=instance_id) - except model.DoesNotExist as e: - # Explicitly requeue the task, so we don't report this to Sentry until - # we hit the max number of retries. - return retryer.retry(exc=e) - - event = f"{name}.{action}" - - if event not in VALID_EVENTS: - return - - org = None - - if isinstance(instance, (Group, Event, GroupEvent)): - org = Organization.objects.get_from_cache( - id=Project.objects.get_from_cache(id=instance.project_id).organization_id - ) - - installations = filter( - lambda i: event in i.sentry_app.events, - app_service.get_installed_for_organization(organization_id=org.id), - ) - - for installation in installations: - data = {} - if isinstance(instance, Event) or isinstance(instance, GroupEvent): - data[name] = _webhook_event_data(instance, instance.group_id, instance.project_id) - else: - data[name] = serialize(instance) - - # Trigger a new task for each webhook - send_resource_change_webhook.delay(installation_id=installation.id, event=event, data=data) @instrumented_task("sentry.tasks.process_resource_change_bound", bind=True, **TASK_OPTIONS) @retry_decorator def process_resource_change_bound(self, action, sender, instance_id, *args, **kwargs): - _process_resource_change(action, sender, instance_id, retryer=self, *args, **kwargs) + new_process_resource_change_bound( + self=self, action=action, sender=sender, instance_id=instance_id, *args, **kwargs + ) @instrumented_task(name="sentry.tasks.sentry_apps.installation_webhook", **CONTROL_TASK_OPTIONS) @retry_decorator def installation_webhook(installation_id, user_id, *args, **kwargs): - from sentry.mediators.sentry_app_installations.installation_notifier import InstallationNotifier - - extra = {"installation_id": installation_id, "user_id": user_id} - try: - # we should send the webhook for pending installations on the install event in case that's part of the workflow - install = SentryAppInstallation.objects.get(id=installation_id) - except SentryAppInstallation.DoesNotExist: - logger.info("installation_webhook.missing_installation", extra=extra) - return - - user = user_service.get_user(user_id=user_id) - if not user: - logger.info("installation_webhook.missing_user", extra=extra) - return - - InstallationNotifier.run(install=install, user=user, action="created") + new_installation_webhook(installation_id=installation_id, user_id=user_id, *args, **kwargs) @instrumented_task( name="sentry.sentry_apps.tasks.installations.clear_region_cache", **CONTROL_TASK_OPTIONS ) def clear_region_cache(sentry_app_id: int, region_name: str) -> None: - try: - sentry_app = SentryApp.objects.get(id=sentry_app_id) - except SentryApp.DoesNotExist: - return - - # When a sentry app's definition changes purge cache for all the installations. - # This could get slow for large applications, but generally big applications don't change often. - install_query = SentryAppInstallation.objects.filter( - sentry_app=sentry_app, - ).values("id", "organization_id") - - # There isn't a constraint on org : sentryapp so we have to handle lists - install_map: dict[int, list[int]] = defaultdict(list) - for install_row in install_query: - install_map[install_row["organization_id"]].append(install_row["id"]) - - # Clear application_id cache - region_caching_service.clear_key( - key=get_by_application_id.key_from(sentry_app.application_id), region_name=region_name - ) - - # Limit our operations to the region this outbox is for. - # This could be a single query if we use raw_sql. - region_query = OrganizationMapping.objects.filter( - organization_id__in=list(install_map.keys()), region_name=region_name - ).values("organization_id") - for region_row in region_query: - installs = install_map[region_row["organization_id"]] - for install_id in installs: - region_caching_service.clear_key( - key=get_installation.key_from(install_id), region_name=region_name - ) + new_clear_region_cache(sentry_app_id=sentry_app_id, region_name=region_name) @instrumented_task(name="sentry.tasks.sentry_apps.workflow_notification", **TASK_OPTIONS) @retry_decorator def workflow_notification(installation_id, issue_id, type, user_id, *args, **kwargs): - webhook_data = get_webhook_data(installation_id, issue_id, user_id) - if not webhook_data: - return - install, issue, user = webhook_data - data = kwargs.get("data", {}) - data.update({"issue": serialize(issue)}) - send_webhooks(installation=install, event=f"issue.{type}", data=data, actor=user) - analytics.record( - f"sentry_app.issue.{type}", - user_id=user_id, - group_id=issue_id, + new_workflow_notification( installation_id=installation_id, + issue_id=issue_id, + type=type, + user_id=user_id, + *args, + **kwargs, ) @instrumented_task(name="sentry.tasks.sentry_apps.build_comment_webhook", **TASK_OPTIONS) @retry_decorator def build_comment_webhook(installation_id, issue_id, type, user_id, *args, **kwargs): - webhook_data = get_webhook_data(installation_id, issue_id, user_id) - if not webhook_data: - return - install, _, user = webhook_data - data = kwargs.get("data", {}) - project_slug = data.get("project_slug") - comment_id = data.get("comment_id") - payload = { - "comment_id": data.get("comment_id"), - "issue_id": issue_id, - "project_slug": data.get("project_slug"), - "timestamp": data.get("timestamp"), - "comment": data.get("comment"), - } - send_webhooks(installation=install, event=type, data=payload, actor=user) - # `type` is comment.created, comment.updated, or comment.deleted - analytics.record( - type, - user_id=user_id, - group_id=issue_id, - project_slug=project_slug, + new_build_comment_webhook( installation_id=installation_id, - comment_id=comment_id, + issue_id=issue_id, + type=type, + user_id=user_id, + *args, + **kwargs, ) -def get_webhook_data(installation_id, issue_id, user_id): - extra = {"installation_id": installation_id, "issue_id": issue_id} - install = app_service.installation_by_id(id=installation_id) - if not install: - logger.info("workflow_notification.missing_installation", extra=extra) - return - - try: - issue = Group.objects.get(id=issue_id) - except Group.DoesNotExist: - logger.info("workflow_notification.missing_issue", extra=extra) - return - - user = None - if user_id: - user = user_service.get_user(user_id=user_id) - if not user: - logger.info("workflow_notification.missing_user", extra=extra) - - return (install, issue, user) - - @instrumented_task("sentry.tasks.send_process_resource_change_webhook", **TASK_OPTIONS) @retry_decorator def send_resource_change_webhook(installation_id, event, data, *args, **kwargs): - installation = app_service.installation_by_id(id=installation_id) - if not installation: - logger.info( - "send_process_resource_change_webhook.missing_installation", - extra={"installation_id": installation_id, "event": event}, - ) - return - - send_webhooks(installation, event, data=data) - - metrics.incr("resource_change.processed", sample_rate=1.0, tags={"change_event": event}) - - -def notify_sentry_app(event, futures): - for f in futures: - if not f.kwargs.get("sentry_app"): - continue - - extra_kwargs = { - "additional_payload_key": None, - "additional_payload": None, - } - # If the future comes from a rule with a UI component form in the schema, append the issue alert payload - settings = f.kwargs.get("schema_defined_settings") - if settings: - extra_kwargs["additional_payload_key"] = "issue_alert" - extra_kwargs["additional_payload"] = { - "id": f.rule.id, - "title": f.rule.label, - "sentry_app_id": f.kwargs["sentry_app"].id, - "settings": settings, - } - - send_alert_event.delay( - event=event, - rule=f.rule.label, - sentry_app_id=f.kwargs["sentry_app"].id, - **extra_kwargs, - ) - - -def send_webhooks(installation, event, **kwargs): - try: - servicehook = ServiceHook.objects.get( - organization_id=installation.organization_id, actor_id=installation.id - ) - except ServiceHook.DoesNotExist: - logger.info( - "send_webhooks.missing_servicehook", - extra={"installation_id": installation.id, "event": event}, - ) - return - - if event not in servicehook.events: - return - - # The service hook applies to all projects if there are no - # ServiceHookProject records. Otherwise we want check if - # the event is within the allowed projects. - project_limited = ServiceHookProject.objects.filter(service_hook_id=servicehook.id).exists() - - # TODO(nola): This is disabled for now, because it could potentially affect internal integrations w/ error.created - # # If the event is error.created & the request is going out to the Org that owns the Sentry App, - # # Make sure we don't send the request, to prevent potential infinite loops - # if ( - # event == "error.created" - # and installation.organization_id == installation.sentry_app.owner_id - # ): - # # We just want to exclude error.created from the project that the integration lives in - # # Need to first implement project mapping for integration partners - # metrics.incr( - # "webhook_request.dropped", - # tags={"sentry_app": installation.sentry_app.id, "event": event}, - # ) - # return - - if not project_limited: - resource, action = event.split(".") - - kwargs["resource"] = resource - kwargs["action"] = action - kwargs["install"] = installation - - request_data = AppPlatformEvent(**kwargs) - send_and_save_webhook_request( - installation.sentry_app, - request_data, - servicehook.sentry_app.webhook_url, - ) + new_send_resource_change_webhook( + installation_id=installation_id, event=event, data=data, *args, **kwargs + ) @instrumented_task( @@ -462,10 +100,6 @@ def send_webhooks(installation, event, **kwargs): def create_or_update_service_hooks_for_sentry_app( sentry_app_id: int, webhook_url: str, events: list[str], **kwargs: dict ) -> None: - installations = SentryAppInstallation.objects.filter(sentry_app_id=sentry_app_id) - for installation in installations: - create_or_update_service_hooks_for_installation( - installation=installation, - events=events, - webhook_url=webhook_url, - ) + new_create_or_update_service_hooks_for_sentry_app( + sentry_app_id=sentry_app_id, webhook_url=webhook_url, events=events, **kwargs + ) From 96e9775178349908740a94a5f57b6b029978b67c Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Mon, 30 Sep 2024 13:09:35 -0700 Subject: [PATCH 03/13] add barrel file --- src/sentry/sentry_apps/tasks/__init__.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 src/sentry/sentry_apps/tasks/__init__.py diff --git a/src/sentry/sentry_apps/tasks/__init__.py b/src/sentry/sentry_apps/tasks/__init__.py new file mode 100644 index 00000000000000..62d7eee5405b7d --- /dev/null +++ b/src/sentry/sentry_apps/tasks/__init__.py @@ -0,0 +1,23 @@ +from .sentry_apps import ( + build_comment_webhook, + clear_region_cache, + create_or_update_service_hooks_for_sentry_app, + installation_webhook, + process_resource_change_bound, + send_alert_event, + send_resource_change_webhook, + workflow_notification, +) +from .service_hooks import process_service_hook + +__all__ = ( + "send_alert_event", + "build_comment_webhook", + "clear_region_cache", + "create_or_update_service_hooks_for_sentry_app", + "installation_webhook", + "process_resource_change_bound", + "send_resource_change_webhook", + "workflow_notification", + "process_service_hook", +) From c1ba2b1a4b590508dc9f196c3cef46481e82d039 Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Mon, 30 Sep 2024 13:33:56 -0700 Subject: [PATCH 04/13] add to celery imports --- src/sentry/conf/server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index dde74ea409b50e..1a42edeef8df37 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -751,6 +751,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: "sentry.integrations.github.tasks.pr_comment", "sentry.integrations.jira.tasks", "sentry.integrations.opsgenie.tasks", + "sentry.sentry_apps.tasks", "sentry.snuba.tasks", "sentry.replays.tasks", "sentry.monitors.tasks.clock_pulse", From bb32bda856611cf4cc112d606ebc8587e85e263c Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Mon, 30 Sep 2024 13:40:21 -0700 Subject: [PATCH 05/13] update imports --- src/sentry/rules/actions/sentry_apps/notify_event.py | 2 +- src/sentry/tasks/post_process.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sentry/rules/actions/sentry_apps/notify_event.py b/src/sentry/rules/actions/sentry_apps/notify_event.py index 4fad36d35a71ff..4ddb8e7c049e06 100644 --- a/src/sentry/rules/actions/sentry_apps/notify_event.py +++ b/src/sentry/rules/actions/sentry_apps/notify_event.py @@ -15,7 +15,7 @@ RpcSentryAppEventData, app_service, ) -from sentry.tasks.sentry_apps import notify_sentry_app +from sentry.sentry_apps.tasks.sentry_apps import notify_sentry_app ValidationError = serializers.ValidationError diff --git a/src/sentry/tasks/post_process.py b/src/sentry/tasks/post_process.py index a1aca3af81357a..7b697a25cfc3e9 100644 --- a/src/sentry/tasks/post_process.py +++ b/src/sentry/tasks/post_process.py @@ -1179,7 +1179,7 @@ def process_resource_change_bounds(job: PostProcessJob) -> None: if job["is_reprocessed"]: return - from sentry.tasks.sentry_apps import process_resource_change_bound + from sentry.sentry_apps.tasks.sentry_apps import process_resource_change_bound event, is_new = job["event"], job["group_state"]["is_new"] From 973ad422a574014d927a2354f7e7fe76d3df432a Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Mon, 30 Sep 2024 14:02:27 -0700 Subject: [PATCH 06/13] move tests --- tests/sentry/sentry_apps/tasks/__init__.py | 0 tests/sentry/{ => sentry_apps}/tasks/test_sentry_apps.py | 6 +++--- tests/sentry/{ => sentry_apps}/tasks/test_servicehooks.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) create mode 100644 tests/sentry/sentry_apps/tasks/__init__.py rename tests/sentry/{ => sentry_apps}/tasks/test_sentry_apps.py (99%) rename tests/sentry/{ => sentry_apps}/tasks/test_servicehooks.py (96%) diff --git a/tests/sentry/sentry_apps/tasks/__init__.py b/tests/sentry/sentry_apps/tasks/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/tests/sentry/tasks/test_sentry_apps.py b/tests/sentry/sentry_apps/tasks/test_sentry_apps.py similarity index 99% rename from tests/sentry/tasks/test_sentry_apps.py rename to tests/sentry/sentry_apps/tasks/test_sentry_apps.py index 5cc8ca5b653f1d..68ba388532464b 100644 --- a/tests/sentry/tasks/test_sentry_apps.py +++ b/tests/sentry/sentry_apps/tasks/test_sentry_apps.py @@ -20,9 +20,7 @@ from sentry.models.rule import Rule from sentry.sentry_apps.models.sentry_app import SentryApp from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation -from sentry.shared_integrations.exceptions import ClientError -from sentry.tasks.post_process import post_process_group -from sentry.tasks.sentry_apps import ( +from sentry.sentry_apps.tasks.sentry_apps import ( build_comment_webhook, installation_webhook, notify_sentry_app, @@ -31,6 +29,8 @@ send_webhooks, workflow_notification, ) +from sentry.shared_integrations.exceptions import ClientError +from sentry.tasks.post_process import post_process_group from sentry.testutils.cases import TestCase from sentry.testutils.helpers import with_feature from sentry.testutils.helpers.datetime import before_now, freeze_time, iso_format diff --git a/tests/sentry/tasks/test_servicehooks.py b/tests/sentry/sentry_apps/tasks/test_servicehooks.py similarity index 96% rename from tests/sentry/tasks/test_servicehooks.py rename to tests/sentry/sentry_apps/tasks/test_servicehooks.py index e7a790a1d7daaf..b3ca386f3f19ee 100644 --- a/tests/sentry/tasks/test_servicehooks.py +++ b/tests/sentry/sentry_apps/tasks/test_servicehooks.py @@ -2,7 +2,7 @@ import responses -from sentry.tasks.servicehooks import get_payload_v0, process_service_hook +from sentry.sentry_apps.tasks.service_hooks import get_payload_v0, process_service_hook from sentry.testutils.cases import TestCase from sentry.testutils.helpers.datetime import before_now, iso_format from sentry.testutils.skips import requires_snuba From d15e09d9354f7cb6b8ceac9aa6cea66d3facce26 Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Mon, 30 Sep 2024 14:20:05 -0700 Subject: [PATCH 07/13] im mega trolling, i was filtering only files in tasks/ ;-; --- src/sentry/rules/actions/notify_event_service.py | 2 +- tests/sentry/rules/actions/test_notify_event_sentry_app.py | 2 +- tests/sentry/rules/actions/test_notify_event_service.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sentry/rules/actions/notify_event_service.py b/src/sentry/rules/actions/notify_event_service.py index 3e5f8e31d8b172..41802166746102 100644 --- a/src/sentry/rules/actions/notify_event_service.py +++ b/src/sentry/rules/actions/notify_event_service.py @@ -20,7 +20,7 @@ from sentry.rules.base import CallbackFuture from sentry.sentry_apps.api.serializers.app_platform_event import AppPlatformEvent from sentry.sentry_apps.services.app import RpcSentryAppService, app_service -from sentry.tasks.sentry_apps import notify_sentry_app +from sentry.sentry_apps.tasks.sentry_apps import notify_sentry_app from sentry.utils import json, metrics from sentry.utils.forms import set_field_choices diff --git a/tests/sentry/rules/actions/test_notify_event_sentry_app.py b/tests/sentry/rules/actions/test_notify_event_sentry_app.py index 6dd1715c5a68f2..84c58745cf9c95 100644 --- a/tests/sentry/rules/actions/test_notify_event_sentry_app.py +++ b/tests/sentry/rules/actions/test_notify_event_sentry_app.py @@ -4,8 +4,8 @@ from rest_framework import serializers from sentry.rules.actions.sentry_apps import NotifyEventSentryAppAction +from sentry.sentry_apps.tasks.sentry_apps import notify_sentry_app from sentry.silo.base import SiloMode -from sentry.tasks.sentry_apps import notify_sentry_app from sentry.testutils.cases import RuleTestCase from sentry.testutils.silo import assume_test_silo_mode from sentry.testutils.skips import requires_snuba diff --git a/tests/sentry/rules/actions/test_notify_event_service.py b/tests/sentry/rules/actions/test_notify_event_service.py index ddaf6af2cb94ba..f10158da709290 100644 --- a/tests/sentry/rules/actions/test_notify_event_service.py +++ b/tests/sentry/rules/actions/test_notify_event_service.py @@ -3,8 +3,8 @@ from django.utils import timezone from sentry.rules.actions.notify_event_service import NotifyEventServiceAction +from sentry.sentry_apps.tasks.sentry_apps import notify_sentry_app from sentry.silo.base import SiloMode -from sentry.tasks.sentry_apps import notify_sentry_app from sentry.testutils.cases import RuleTestCase from sentry.testutils.silo import assume_test_silo_mode from sentry.testutils.skips import requires_snuba From 4dc849a4939a50204480b45aa327bfc73b23cbb9 Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Thu, 3 Oct 2024 15:23:26 -0700 Subject: [PATCH 08/13] mvoe over logic to new tasks and update import paths --- src/sentry/celery.py | 2 +- src/sentry/receivers/outbox/control.py | 2 +- src/sentry/receivers/sentry_apps.py | 2 +- src/sentry/sentry_apps/installations.py | 2 +- src/sentry/sentry_apps/logic.py | 2 +- src/sentry/sentry_apps/tasks/sentry_apps.py | 122 +++-- src/sentry/sentry_apps/tasks/service_hooks.py | 50 ++- src/sentry/tasks/post_process.py | 2 +- src/sentry/tasks/sentry_apps.py | 415 ++---------------- src/sentry/tasks/servicehooks.py | 50 +-- tests/sentry/receivers/outbox/test_control.py | 2 +- tests/sentry/receivers/test_sentry_apps.py | 8 +- .../sentry_apps/tasks/test_sentry_apps.py | 2 +- .../sentry_apps/tasks/test_servicehooks.py | 4 +- .../sentry_apps/test_sentry_app_creator.py | 2 +- tests/sentry/tasks/test_post_process.py | 22 +- 16 files changed, 186 insertions(+), 503 deletions(-) diff --git a/src/sentry/celery.py b/src/sentry/celery.py index d3d723a08a3152..384ec1802abaaa 100644 --- a/src/sentry/celery.py +++ b/src/sentry/celery.py @@ -15,7 +15,7 @@ # basic tasks that must be passed models still "sentry.tasks.process_buffer.process_incr", "sentry.tasks.process_resource_change_bound", - "sentry.tasks.sentry_apps.send_alert_event", + "sentry.sentry_apps.tasks.sentry_apps.send_alert_event", "sentry.tasks.unmerge", "src.sentry.notifications.utils.async_send_notification", # basic tasks that can already deal with primary keys passed diff --git a/src/sentry/receivers/outbox/control.py b/src/sentry/receivers/outbox/control.py index 46931fe263dbc5..a9b7d9c3f7db43 100644 --- a/src/sentry/receivers/outbox/control.py +++ b/src/sentry/receivers/outbox/control.py @@ -24,7 +24,7 @@ from sentry.receivers.outbox import maybe_process_tombstone from sentry.relocation.services.relocation_export.service import region_relocation_export_service from sentry.sentry_apps.models.sentry_app import SentryApp -from sentry.tasks.sentry_apps import clear_region_cache +from sentry.sentry_apps.tasks.sentry_apps import clear_region_cache logger = logging.getLogger(__name__) diff --git a/src/sentry/receivers/sentry_apps.py b/src/sentry/receivers/sentry_apps.py index 8bb125988d9606..5a46a0878eeea3 100644 --- a/src/sentry/receivers/sentry_apps.py +++ b/src/sentry/receivers/sentry_apps.py @@ -12,6 +12,7 @@ from sentry.models.team import Team from sentry.sentry_apps.logic import consolidate_events from sentry.sentry_apps.services.app import RpcSentryAppInstallation, app_service +from sentry.sentry_apps.tasks.sentry_apps import build_comment_webhook, workflow_notification from sentry.signals import ( comment_created, comment_deleted, @@ -22,7 +23,6 @@ issue_resolved, issue_unresolved, ) -from sentry.tasks.sentry_apps import build_comment_webhook, workflow_notification from sentry.users.models.user import User from sentry.users.services.user import RpcUser diff --git a/src/sentry/sentry_apps/installations.py b/src/sentry/sentry_apps/installations.py index b28a3aa5c7ddcb..7096db18d85fad 100644 --- a/src/sentry/sentry_apps/installations.py +++ b/src/sentry/sentry_apps/installations.py @@ -17,7 +17,7 @@ from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation from sentry.sentry_apps.models.sentry_app_installation_token import SentryAppInstallationToken from sentry.sentry_apps.services.hook import hook_service -from sentry.tasks.sentry_apps import installation_webhook +from sentry.sentry_apps.tasks.sentry_apps import installation_webhook from sentry.users.models.user import User from sentry.users.services.user.model import RpcUser from sentry.utils import metrics diff --git a/src/sentry/sentry_apps/logic.py b/src/sentry/sentry_apps/logic.py index f99226d4b4b5e9..bcdfa205037213 100644 --- a/src/sentry/sentry_apps/logic.py +++ b/src/sentry/sentry_apps/logic.py @@ -37,7 +37,7 @@ ) from sentry.sentry_apps.models.sentry_app_component import SentryAppComponent from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation -from sentry.tasks.sentry_apps import create_or_update_service_hooks_for_sentry_app +from sentry.sentry_apps.tasks.sentry_apps import create_or_update_service_hooks_for_sentry_app from sentry.users.models.user import User from sentry.users.services.user.model import RpcUser from sentry.utils.sentry_apps.service_hook_manager import ( diff --git a/src/sentry/sentry_apps/tasks/sentry_apps.py b/src/sentry/sentry_apps/tasks/sentry_apps.py index c51ee6cf9b4968..0e775ace10912e 100644 --- a/src/sentry/sentry_apps/tasks/sentry_apps.py +++ b/src/sentry/sentry_apps/tasks/sentry_apps.py @@ -5,14 +5,16 @@ from collections.abc import Mapping from typing import Any -from celery import current_task +from celery import Task, current_task from django.urls import reverse from requests.exceptions import RequestException from sentry import analytics from sentry.api.serializers import serialize from sentry.constants import SentryAppInstallationStatus +from sentry.db.models.base import Model from sentry.eventstore.models import Event, GroupEvent +from sentry.eventtypes.base import BaseEvent from sentry.hybridcloud.rpc.caching import region_caching_service from sentry.models.activity import Activity from sentry.models.group import Group @@ -23,6 +25,7 @@ from sentry.sentry_apps.models.sentry_app import VALID_EVENTS, SentryApp from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation from sentry.sentry_apps.models.servicehook import ServiceHook, ServiceHookProject +from sentry.sentry_apps.services.app.model import RpcSentryAppInstallation from sentry.sentry_apps.services.app.service import ( app_service, get_by_application_id, @@ -31,6 +34,7 @@ from sentry.shared_integrations.exceptions import ApiHostError, ApiTimeoutError, ClientError from sentry.silo.base import SiloMode from sentry.tasks.base import instrumented_task, retry +from sentry.users.services.user.model import RpcUser from sentry.users.services.user.service import user_service from sentry.utils import metrics from sentry.utils.http import absolute_uri @@ -39,7 +43,7 @@ create_or_update_service_hooks_for_installation, ) -logger = logging.getLogger("sentry.tasks.sentry_apps") +logger = logging.getLogger("sentry.sentry_apps.tasks.sentry_apps") TASK_OPTIONS = { "queue": "app_platform", @@ -68,7 +72,9 @@ TYPES = {"Group": Group, "Error": Event, "Comment": Activity} -def _webhook_event_data(event, group_id, project_id): +def _webhook_event_data( + event: Event | GroupEvent, group_id: int, project_id: int +) -> dict[str, Any]: project = Project.objects.get_from_cache(id=project_id) organization = Organization.objects.get_from_cache(id=project.organization_id) @@ -113,6 +119,7 @@ def send_alert_event( :return: """ group = event.group + assert group, "Group must exist to get related attributes" project = Project.objects.get_from_cache(id=group.project_id) organization = Organization.objects.get_from_cache(id=project.organization_id) @@ -164,9 +171,17 @@ def send_alert_event( ) -def _process_resource_change(action, sender, instance_id, retryer=None, *args, **kwargs): +def _process_resource_change( + *, + action: str, + sender: str, + instance_id: int, + retryer: Task | None = None, + **kwargs: Any, +) -> None: # The class is serialized as a string when enqueueing the class. - model = TYPES[sender] + model: type[Event] | type[Model] = TYPES[sender] + instance: Event | Model | None = None # The Event model has different hooks for the different event types. The sender # determines which type eg. Error and therefore the 'name' eg. error if issubclass(model, Event): @@ -186,19 +201,19 @@ def _process_resource_change(action, sender, instance_id, retryer=None, *args, * # We may run into a race condition where this task executes before the # transaction that creates the Group has committed. - try: - if issubclass(model, Event): - # XXX:(Meredith): Passing through the entire event was an intentional choice - # to avoid having to query NodeStore again for data we had previously in - # post_process. While this is not ideal, changing this will most likely involve - # an overhaul of how we do things in post_process, not just this task alone. - instance = kwargs.get("instance") - else: + if issubclass(model, Event): + # XXX:(Meredith): Passing through the entire event was an intentional choice + # to avoid having to query NodeStore again for data we had previously in + # post_process. While this is not ideal, changing this will most likely involve + # an overhaul of how we do things in post_process, not just this task alone. + instance = kwargs.get("instance") + else: + try: instance = model.objects.get(id=instance_id) - except model.DoesNotExist as e: - # Explicitly requeue the task, so we don't report this to Sentry until - # we hit the max number of retries. - return retryer.retry(exc=e) + except model.DoesNotExist as e: + # Explicitly requeue the task, so we don't report this to Sentry until + # we hit the max number of retries. + return retryer.retry(exc=e) event = f"{name}.{action}" @@ -211,36 +226,44 @@ def _process_resource_change(action, sender, instance_id, retryer=None, *args, * org = Organization.objects.get_from_cache( id=Project.objects.get_from_cache(id=instance.project_id).organization_id ) - - installations = filter( - lambda i: event in i.sentry_app.events, - app_service.get_installed_for_organization(organization_id=org.id), - ) - - for installation in installations: - data = {} - if isinstance(instance, Event) or isinstance(instance, GroupEvent): - data[name] = _webhook_event_data(instance, instance.group_id, instance.project_id) - else: - data[name] = serialize(instance) - - # Trigger a new task for each webhook - send_resource_change_webhook.delay(installation_id=installation.id, event=event, data=data) + assert org, "organization must exist to get related sentry app installations" + installations: list[RpcSentryAppInstallation] = [ + installation + for installation in app_service.get_installed_for_organization(organization_id=org.id) + if event in installation.sentry_app.events + ] + + for installation in installations: + data = {} + if isinstance(instance, (Event, GroupEvent)): + assert instance.group_id, "group id is required to create webhook event data" + data[name] = _webhook_event_data(instance, instance.group_id, instance.project_id) + else: + data[name] = serialize(instance) + + # Trigger a new task for each webhook + send_resource_change_webhook.delay( + installation_id=installation.id, event=event, data=data + ) @instrumented_task( "sentry.sentry_apps.tasks.sentry_apps.process_resource_change_bound", bind=True, **TASK_OPTIONS ) @retry_decorator -def process_resource_change_bound(self, action, sender, instance_id, *args, **kwargs): - _process_resource_change(action, sender, instance_id, retryer=self, *args, **kwargs) +def process_resource_change_bound( + self: Task, action: str, sender: str, instance_id: int, **kwargs: Any +) -> None: + _process_resource_change( + action=action, sender=sender, instance_id=instance_id, retryer=self, **kwargs + ) @instrumented_task( name="sentry.sentry_apps.tasks.sentry_apps.installation_webhook", **CONTROL_TASK_OPTIONS ) @retry_decorator -def installation_webhook(installation_id, user_id, *args, **kwargs): +def installation_webhook(installation_id: int, user_id: int, *args: Any, **kwargs: Any) -> None: from sentry.mediators.sentry_app_installations.installation_notifier import InstallationNotifier extra = {"installation_id": installation_id, "user_id": user_id} @@ -301,7 +324,9 @@ def clear_region_cache(sentry_app_id: int, region_name: str) -> None: name="sentry.sentry_apps.tasks.sentry_apps.workflow_notification", **TASK_OPTIONS ) @retry_decorator -def workflow_notification(installation_id, issue_id, type, user_id, *args, **kwargs): +def workflow_notification( + installation_id: int, issue_id: int, type: str, user_id: int, *args: Any, **kwargs: Any +) -> None: webhook_data = get_webhook_data(installation_id, issue_id, user_id) if not webhook_data: return @@ -321,10 +346,12 @@ def workflow_notification(installation_id, issue_id, type, user_id, *args, **kwa name="sentry.sentry_apps.tasks.sentry_apps.build_comment_webhook", **TASK_OPTIONS ) @retry_decorator -def build_comment_webhook(installation_id, issue_id, type, user_id, *args, **kwargs): +def build_comment_webhook( + installation_id: int, issue_id: int, type: str, user_id: int, *args: Any, **kwargs: Any +) -> None: webhook_data = get_webhook_data(installation_id, issue_id, user_id) if not webhook_data: - return + return None install, _, user = webhook_data data = kwargs.get("data", {}) project_slug = data.get("project_slug") @@ -348,18 +375,20 @@ def build_comment_webhook(installation_id, issue_id, type, user_id, *args, **kwa ) -def get_webhook_data(installation_id, issue_id, user_id): +def get_webhook_data( + installation_id: int, issue_id: int, user_id: int +) -> tuple[RpcSentryAppInstallation, Group, RpcUser | None] | None: extra = {"installation_id": installation_id, "issue_id": issue_id} install = app_service.installation_by_id(id=installation_id) if not install: logger.info("workflow_notification.missing_installation", extra=extra) - return + return None try: issue = Group.objects.get(id=issue_id) except Group.DoesNotExist: logger.info("workflow_notification.missing_issue", extra=extra) - return + return None user = None if user_id: @@ -374,7 +403,9 @@ def get_webhook_data(installation_id, issue_id, user_id): "sentry.sentry_apps.tasks.sentry_apps.send_process_resource_change_webhook", **TASK_OPTIONS ) @retry_decorator -def send_resource_change_webhook(installation_id, event, data, *args, **kwargs): +def send_resource_change_webhook( + installation_id: int, event: str, data: dict[str, Any], *args: Any, **kwargs: Any +) -> None: installation = app_service.installation_by_id(id=installation_id) if not installation: logger.info( @@ -388,12 +419,12 @@ def send_resource_change_webhook(installation_id, event, data, *args, **kwargs): metrics.incr("resource_change.processed", sample_rate=1.0, tags={"change_event": event}) -def notify_sentry_app(event, futures): +def notify_sentry_app(event: BaseEvent, futures): for f in futures: if not f.kwargs.get("sentry_app"): continue - extra_kwargs = { + extra_kwargs: dict[str, Any] = { "additional_payload_key": None, "additional_payload": None, } @@ -417,6 +448,7 @@ def notify_sentry_app(event, futures): def send_webhooks(installation, event, **kwargs): + servicehook: ServiceHook try: servicehook = ServiceHook.objects.get( organization_id=installation.organization_id, actor_id=installation.id @@ -462,7 +494,7 @@ def send_webhooks(installation, event, **kwargs): send_and_save_webhook_request( installation.sentry_app, request_data, - servicehook.sentry_app.webhook_url, + installation.sentry_app.webhook_url, ) diff --git a/src/sentry/sentry_apps/tasks/service_hooks.py b/src/sentry/sentry_apps/tasks/service_hooks.py index cd9def6898102c..76685918e9f6b0 100644 --- a/src/sentry/sentry_apps/tasks/service_hooks.py +++ b/src/sentry/sentry_apps/tasks/service_hooks.py @@ -1,6 +1,29 @@ +from time import time + +from sentry.api.serializers import serialize +from sentry.http import safe_urlopen +from sentry.sentry_apps.models.servicehook import ServiceHook from sentry.silo.base import SiloMode from sentry.tasks.base import instrumented_task, retry -from sentry.tasks.servicehooks import process_service_hook as old_process_service_hook +from sentry.tsdb.base import TSDBModel +from sentry.utils import json + + +def get_payload_v0(event): + group = event.group + project = group.project + + group_context = serialize(group) + group_context["url"] = group.get_absolute_url() + + event_context = serialize(event) + event_context["url"] = f"{group.get_absolute_url()}events/{event.event_id}/" + data = { + "project": {"slug": project.slug, "name": project.name}, + "group": group_context, + "event": event_context, + } + return data @instrumented_task( @@ -11,4 +34,27 @@ ) @retry def process_service_hook(servicehook_id, event, **kwargs): - old_process_service_hook(servicehook_id=servicehook_id, event=event, **kwargs) + try: + servicehook = ServiceHook.objects.get(id=servicehook_id) + except ServiceHook.DoesNotExist: + return + + if servicehook.version == 0: + payload = get_payload_v0(event) + else: + raise NotImplementedError + + from sentry import tsdb + + tsdb.backend.incr(TSDBModel.servicehook_fired, servicehook.id) + + headers = { + "Content-Type": "application/json", + "X-ServiceHook-Timestamp": str(int(time())), + "X-ServiceHook-GUID": servicehook.guid, + "X-ServiceHook-Signature": servicehook.build_signature(json.dumps(payload)), + } + + safe_urlopen( + url=servicehook.url, data=json.dumps(payload), headers=headers, timeout=5, verify_ssl=False + ) diff --git a/src/sentry/tasks/post_process.py b/src/sentry/tasks/post_process.py index 7b697a25cfc3e9..f3aa26f85933fc 100644 --- a/src/sentry/tasks/post_process.py +++ b/src/sentry/tasks/post_process.py @@ -1160,7 +1160,7 @@ def process_service_hooks(job: PostProcessJob) -> None: if job["is_reprocessed"]: return - from sentry.tasks.servicehooks import process_service_hook + from sentry.sentry_apps.tasks.service_hooks import process_service_hook event, has_alert = job["event"], job["has_alert"] diff --git a/src/sentry/tasks/sentry_apps.py b/src/sentry/tasks/sentry_apps.py index 2c875d774d7e76..a144e7d35ebefc 100644 --- a/src/sentry/tasks/sentry_apps.py +++ b/src/sentry/tasks/sentry_apps.py @@ -1,94 +1,26 @@ -import logging from collections.abc import Mapping from typing import Any -from celery import Task, current_task -from django.urls import reverse -from requests.exceptions import RequestException +from celery import Task -from sentry import analytics -from sentry.api.serializers import serialize -from sentry.constants import SentryAppInstallationStatus -from sentry.db.models.base import Model -from sentry.eventstore.models import BaseEvent, Event, GroupEvent -from sentry.models.activity import Activity -from sentry.models.group import Group -from sentry.models.organization import Organization -from sentry.models.project import Project -from sentry.sentry_apps.api.serializers.app_platform_event import AppPlatformEvent -from sentry.sentry_apps.models.sentry_app import VALID_EVENTS -from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation -from sentry.sentry_apps.models.servicehook import ServiceHook, ServiceHookProject -from sentry.sentry_apps.services.app.model import RpcSentryAppInstallation -from sentry.sentry_apps.services.app.service import app_service +from sentry.eventstore.models import Event +from sentry.sentry_apps.tasks.sentry_apps import CONTROL_TASK_OPTIONS, TASK_OPTIONS +from sentry.sentry_apps.tasks.sentry_apps import build_comment_webhook as new_build_comment_webhook from sentry.sentry_apps.tasks.sentry_apps import clear_region_cache as new_clear_region_cache from sentry.sentry_apps.tasks.sentry_apps import ( create_or_update_service_hooks_for_sentry_app as new_create_or_update_service_hooks_for_sentry_app, ) -from sentry.shared_integrations.exceptions import ApiHostError, ApiTimeoutError, ClientError -from sentry.silo.base import SiloMode -from sentry.tasks.base import instrumented_task, retry -from sentry.users.services.user.model import RpcUser -from sentry.users.services.user.service import user_service -from sentry.utils import metrics -from sentry.utils.http import absolute_uri -from sentry.utils.sentry_apps import send_and_save_webhook_request - -logger = logging.getLogger("sentry.tasks.sentry_apps") - -TASK_OPTIONS = { - "queue": "app_platform", - "default_retry_delay": (60 * 5), # Five minutes. - "max_retries": 3, - "record_timing": True, - "silo_mode": SiloMode.REGION, -} -CONTROL_TASK_OPTIONS = { - "queue": "app_platform.control", - "default_retry_delay": (60 * 5), # Five minutes. - "max_retries": 3, - "silo_mode": SiloMode.CONTROL, -} - -retry_decorator = retry( - on=(RequestException, ApiHostError, ApiTimeoutError), - ignore=(ClientError,), +from sentry.sentry_apps.tasks.sentry_apps import installation_webhook as new_installation_webhook +from sentry.sentry_apps.tasks.sentry_apps import ( + process_resource_change_bound as new_process_resource_change_bound, ) - -# We call some models by a different name, publicly, than their class name. -# For example the model Group is called "Issue" in the UI. We want the Service -# Hook events to match what we externally call these primitives. -RESOURCE_RENAMES = {"Group": "issue"} - -TYPES = {"Group": Group, "Error": Event, "Comment": Activity} - - -def _webhook_event_data( - event: Event | GroupEvent, group_id: int, project_id: int -) -> dict[str, Any]: - project = Project.objects.get_from_cache(id=project_id) - organization = Organization.objects.get_from_cache(id=project.organization_id) - - event_context = event.as_dict() - event_context["url"] = absolute_uri( - reverse( - "sentry-api-0-project-event-details", - args=[project.organization.slug, project.slug, event.event_id], - ) - ) - - event_context["web_url"] = absolute_uri( - reverse( - "sentry-organization-event-detail", args=[organization.slug, group_id, event.event_id] - ) - ) - - # The URL has a regex OR in it ("|") which means `reverse` cannot generate - # a valid URL (it can't know which option to pick). We have to manually - # create this URL for, that reason. - event_context["issue_url"] = absolute_uri(f"/api/0/issues/{group_id}/") - event_context["issue_id"] = str(group_id) - return event_context +from sentry.sentry_apps.tasks.sentry_apps import retry_decorator +from sentry.sentry_apps.tasks.sentry_apps import send_alert_event as new_send_alert_event +from sentry.sentry_apps.tasks.sentry_apps import ( + send_resource_change_webhook as new_send_resource_change_webhook, +) +from sentry.sentry_apps.tasks.sentry_apps import workflow_notification as new_workflow_notification +from sentry.tasks.base import instrumented_task @instrumented_task(name="sentry.tasks.sentry_apps.send_alert_event", **TASK_OPTIONS) @@ -100,173 +32,29 @@ def send_alert_event( additional_payload_key: str | None = None, additional_payload: Mapping[str, Any] | None = None, ) -> None: - """ - When an incident alert is triggered, send incident data to the SentryApp's webhook. - :param event: The `Event` for which to build a payload. - :param rule: The AlertRule that was triggered. - :param sentry_app_id: The SentryApp to notify. - :param additional_payload_key: The key used to attach additional data to the webhook payload - :param additional_payload: The extra data attached to the payload body at the key specified by `additional_payload_key`. - :return: - """ - group = event.group - assert group, "Group must exist to get related attributes" - project = Project.objects.get_from_cache(id=group.project_id) - organization = Organization.objects.get_from_cache(id=project.organization_id) - - extra = { - "sentry_app_id": sentry_app_id, - "project_slug": project.slug, - "organization_slug": organization.slug, - "rule": rule, - } - - sentry_app = app_service.get_sentry_app_by_id(id=sentry_app_id) - if sentry_app is None: - logger.info("event_alert_webhook.missing_sentry_app", extra=extra) - return - - installations = app_service.get_many( - filter=dict( - organization_id=organization.id, - app_ids=[sentry_app.id], - status=SentryAppInstallationStatus.INSTALLED, - ) - ) - if not installations: - logger.info("event_alert_webhook.missing_installation", extra=extra) - return - (install,) = installations - - event_context = _webhook_event_data(event, group.id, project.id) - - data = {"event": event_context, "triggered_rule": rule} - - # Attach extra payload to the webhook - if additional_payload_key and additional_payload: - data[additional_payload_key] = additional_payload - - request_data = AppPlatformEvent( - resource="event_alert", action="triggered", install=install, data=data + new_send_alert_event( + event=event, + rule=rule, + sentry_app_id=sentry_app_id, + additional_payload_key=additional_payload_key, + additional_payload=additional_payload, ) - send_and_save_webhook_request(sentry_app, request_data) - - # On success, record analytic event for Alert Rule UI Component - if request_data.data.get("issue_alert"): - analytics.record( - "alert_rule_ui_component_webhook.sent", - organization_id=organization.id, - sentry_app_id=sentry_app_id, - event=f"{request_data.resource}.{request_data.action}", - ) - - -def _process_resource_change( - *, - action: str, - sender: str, - instance_id: int, - retryer: Task | None = None, - **kwargs: Any, -) -> None: - # The class is serialized as a string when enqueueing the class. - model: type[Event] | type[Model] = TYPES[sender] - instance: Event | Model | None = None - # The Event model has different hooks for the different event types. The sender - # determines which type eg. Error and therefore the 'name' eg. error - if issubclass(model, Event): - if not kwargs.get("instance"): - extra = {"sender": sender, "action": action, "event_id": instance_id} - logger.info("process_resource_change.event_missing_event", extra=extra) - return - name = sender.lower() - else: - # Some resources are named differently than their model. eg. Group vs Issue. - # Looks up the human name for the model. Defaults to the model name. - name = RESOURCE_RENAMES.get(model.__name__, model.__name__.lower()) - - # By default, use Celery's `current_task` but allow a value to be passed for the - # bound Task. - retryer = retryer or current_task - - # We may run into a race condition where this task executes before the - # transaction that creates the Group has committed. - if issubclass(model, Event): - # XXX:(Meredith): Passing through the entire event was an intentional choice - # to avoid having to query NodeStore again for data we had previously in - # post_process. While this is not ideal, changing this will most likely involve - # an overhaul of how we do things in post_process, not just this task alone. - instance = kwargs.get("instance") - else: - try: - instance = model.objects.get(id=instance_id) - except model.DoesNotExist as e: - # Explicitly requeue the task, so we don't report this to Sentry until - # we hit the max number of retries. - return retryer.retry(exc=e) - - event = f"{name}.{action}" - - if event not in VALID_EVENTS: - return - - org = None - - if isinstance(instance, (Group, Event, GroupEvent)): - org = Organization.objects.get_from_cache( - id=Project.objects.get_from_cache(id=instance.project_id).organization_id - ) - assert org, "organization must exist to get related sentry app installations" - installations: list[RpcSentryAppInstallation] = [ - installation - for installation in app_service.get_installed_for_organization(organization_id=org.id) - if event in installation.sentry_app.events - ] - - for installation in installations: - data = {} - if isinstance(instance, (Event, GroupEvent)): - assert instance.group_id, "group id is required to create webhook event data" - data[name] = _webhook_event_data(instance, instance.group_id, instance.project_id) - else: - data[name] = serialize(instance) - - # Trigger a new task for each webhook - send_resource_change_webhook.delay( - installation_id=installation.id, event=event, data=data - ) - @instrumented_task("sentry.tasks.process_resource_change_bound", bind=True, **TASK_OPTIONS) @retry_decorator def process_resource_change_bound( self: Task, action: str, sender: str, instance_id: int, **kwargs: Any ) -> None: - _process_resource_change( - action=action, sender=sender, instance_id=instance_id, retryer=self, **kwargs + new_process_resource_change_bound( + action=action, sender=sender, instance_id=instance_id, **kwargs ) @instrumented_task(name="sentry.tasks.sentry_apps.installation_webhook", **CONTROL_TASK_OPTIONS) @retry_decorator def installation_webhook(installation_id: int, user_id: int, *args: Any, **kwargs: Any) -> None: - from sentry.mediators.sentry_app_installations.installation_notifier import InstallationNotifier - - extra = {"installation_id": installation_id, "user_id": user_id} - try: - # we should send the webhook for pending installations on the install event in case that's part of the workflow - install = SentryAppInstallation.objects.get(id=installation_id) - except SentryAppInstallation.DoesNotExist: - logger.info("installation_webhook.missing_installation", extra=extra) - return - - user = user_service.get_user(user_id=user_id) - if not user: - logger.info("installation_webhook.missing_user", extra=extra) - return - - InstallationNotifier.run(install=install, user=user, action="created") + new_installation_webhook(installation_id=installation_id, user_id=user_id, *args, **kwargs) @instrumented_task( @@ -281,17 +69,7 @@ def clear_region_cache(sentry_app_id: int, region_name: str) -> None: def workflow_notification( installation_id: int, issue_id: int, type: str, user_id: int, *args: Any, **kwargs: Any ) -> None: - webhook_data = get_webhook_data(installation_id, issue_id, user_id) - if not webhook_data: - return - install, issue, user = webhook_data - data = kwargs.get("data", {}) - data.update({"issue": serialize(issue)}) - send_webhooks(installation=install, event=f"issue.{type}", data=data, actor=user) - analytics.record( - f"sentry_app.issue.{type}", - user_id=user_id, - group_id=issue_id, + new_workflow_notification( installation_id=installation_id, issue_id=issue_id, type=type, @@ -306,151 +84,24 @@ def workflow_notification( def build_comment_webhook( installation_id: int, issue_id: int, type: str, user_id: int, *args: Any, **kwargs: Any ) -> None: - webhook_data = get_webhook_data(installation_id, issue_id, user_id) - if not webhook_data: - return None - install, _, user = webhook_data - data = kwargs.get("data", {}) - project_slug = data.get("project_slug") - comment_id = data.get("comment_id") - payload = { - "comment_id": data.get("comment_id"), - "issue_id": issue_id, - "project_slug": data.get("project_slug"), - "timestamp": data.get("timestamp"), - "comment": data.get("comment"), - } - send_webhooks(installation=install, event=type, data=payload, actor=user) - # `type` is comment.created, comment.updated, or comment.deleted - analytics.record( - type, - user_id=user_id, - group_id=issue_id, - project_slug=project_slug, + new_build_comment_webhook( installation_id=installation_id, - comment_id=comment_id, + issue_id=issue_id, + type=type, + user_id=user_id, + *args, + **kwargs, ) -def get_webhook_data( - installation_id: int, issue_id: int, user_id: int -) -> tuple[RpcSentryAppInstallation, Group, RpcUser | None] | None: - extra = {"installation_id": installation_id, "issue_id": issue_id} - install = app_service.installation_by_id(id=installation_id) - if not install: - logger.info("workflow_notification.missing_installation", extra=extra) - return None - - try: - issue = Group.objects.get(id=issue_id) - except Group.DoesNotExist: - logger.info("workflow_notification.missing_issue", extra=extra) - return None - - user = None - if user_id: - user = user_service.get_user(user_id=user_id) - if not user: - logger.info("workflow_notification.missing_user", extra=extra) - - return (install, issue, user) - - @instrumented_task("sentry.tasks.send_process_resource_change_webhook", **TASK_OPTIONS) @retry_decorator def send_resource_change_webhook( installation_id: int, event: str, data: dict[str, Any], *args: Any, **kwargs: Any ) -> None: - installation = app_service.installation_by_id(id=installation_id) - if not installation: - logger.info( - "send_process_resource_change_webhook.missing_installation", - extra={"installation_id": installation_id, "event": event}, - ) - return - - send_webhooks(installation, event, data=data) - - metrics.incr("resource_change.processed", sample_rate=1.0, tags={"change_event": event}) - - -def notify_sentry_app(event: BaseEvent, futures): - for f in futures: - if not f.kwargs.get("sentry_app"): - continue - - extra_kwargs: dict[str, Any] = { - "additional_payload_key": None, - "additional_payload": None, - } - # If the future comes from a rule with a UI component form in the schema, append the issue alert payload - settings = f.kwargs.get("schema_defined_settings") - if settings: - extra_kwargs["additional_payload_key"] = "issue_alert" - extra_kwargs["additional_payload"] = { - "id": f.rule.id, - "title": f.rule.label, - "sentry_app_id": f.kwargs["sentry_app"].id, - "settings": settings, - } - - send_alert_event.delay( - event=event, - rule=f.rule.label, - sentry_app_id=f.kwargs["sentry_app"].id, - **extra_kwargs, - ) - - -def send_webhooks(installation: RpcSentryAppInstallation, event: str, **kwargs: Any) -> None: - servicehook: ServiceHook - try: - servicehook = ServiceHook.objects.get( - organization_id=installation.organization_id, actor_id=installation.id - ) - except ServiceHook.DoesNotExist: - logger.info( - "send_webhooks.missing_servicehook", - extra={"installation_id": installation.id, "event": event}, - ) - return - - if event not in servicehook.events: - return - - # The service hook applies to all projects if there are no - # ServiceHookProject records. Otherwise we want check if - # the event is within the allowed projects. - project_limited = ServiceHookProject.objects.filter(service_hook_id=servicehook.id).exists() - - # TODO(nola): This is disabled for now, because it could potentially affect internal integrations w/ error.created - # # If the event is error.created & the request is going out to the Org that owns the Sentry App, - # # Make sure we don't send the request, to prevent potential infinite loops - # if ( - # event == "error.created" - # and installation.organization_id == installation.sentry_app.owner_id - # ): - # # We just want to exclude error.created from the project that the integration lives in - # # Need to first implement project mapping for integration partners - # metrics.incr( - # "webhook_request.dropped", - # tags={"sentry_app": installation.sentry_app.id, "event": event}, - # ) - # return - - if not project_limited: - resource, action = event.split(".") - - kwargs["resource"] = resource - kwargs["action"] = action - kwargs["install"] = installation - - request_data = AppPlatformEvent(**kwargs) - send_and_save_webhook_request( - installation.sentry_app, - request_data, - installation.sentry_app.webhook_url, - ) + new_send_resource_change_webhook( + installation_id=installation_id, event=event, data=data, *args, **kwargs + ) @instrumented_task( diff --git a/src/sentry/tasks/servicehooks.py b/src/sentry/tasks/servicehooks.py index 8fb42b5d30b9d2..bb780370321f3b 100644 --- a/src/sentry/tasks/servicehooks.py +++ b/src/sentry/tasks/servicehooks.py @@ -1,29 +1,6 @@ -from time import time - -from sentry.api.serializers import serialize -from sentry.http import safe_urlopen -from sentry.sentry_apps.models.servicehook import ServiceHook +from sentry.sentry_apps.tasks.service_hooks import process_service_hook as new_process_service_hook from sentry.silo.base import SiloMode from sentry.tasks.base import instrumented_task, retry -from sentry.tsdb.base import TSDBModel -from sentry.utils import json - - -def get_payload_v0(event): - group = event.group - project = group.project - - group_context = serialize(group) - group_context["url"] = group.get_absolute_url() - - event_context = serialize(event) - event_context["url"] = f"{group.get_absolute_url()}events/{event.event_id}/" - data = { - "project": {"slug": project.slug, "name": project.name}, - "group": group_context, - "event": event_context, - } - return data @instrumented_task( @@ -34,27 +11,4 @@ def get_payload_v0(event): ) @retry def process_service_hook(servicehook_id, event, **kwargs): - try: - servicehook = ServiceHook.objects.get(id=servicehook_id) - except ServiceHook.DoesNotExist: - return - - if servicehook.version == 0: - payload = get_payload_v0(event) - else: - raise NotImplementedError - - from sentry import tsdb - - tsdb.backend.incr(TSDBModel.servicehook_fired, servicehook.id) - - headers = { - "Content-Type": "application/json", - "X-ServiceHook-Timestamp": str(int(time())), - "X-ServiceHook-GUID": servicehook.guid, - "X-ServiceHook-Signature": servicehook.build_signature(json.dumps(payload)), - } - - safe_urlopen( - url=servicehook.url, data=json.dumps(payload), headers=headers, timeout=5, verify_ssl=False - ) + new_process_service_hook(servicehook_id=servicehook_id, event=event, **kwargs) diff --git a/tests/sentry/receivers/outbox/test_control.py b/tests/sentry/receivers/outbox/test_control.py index 0c653d100b2704..7af298ddb6541a 100644 --- a/tests/sentry/receivers/outbox/test_control.py +++ b/tests/sentry/receivers/outbox/test_control.py @@ -36,7 +36,7 @@ def test_process_api_application_updates(self, mock_maybe_process): ApiApplication, self.identifier, region_name=_TEST_REGION.name ) - @patch("sentry.tasks.sentry_apps.region_caching_service") + @patch("sentry.sentry_apps.tasks.sentry_apps.region_caching_service") def test_process_sentry_app_updates(self, mock_caching): org = self.create_organization() sentry_app = self.create_sentry_app() diff --git a/tests/sentry/receivers/test_sentry_apps.py b/tests/sentry/receivers/test_sentry_apps.py index fd49ae74368c91..57b41fa2cfa181 100644 --- a/tests/sentry/receivers/test_sentry_apps.py +++ b/tests/sentry/receivers/test_sentry_apps.py @@ -24,7 +24,7 @@ from sentry.types.group import GroupSubStatus -@patch("sentry.tasks.sentry_apps.workflow_notification.delay") +@patch("sentry.sentry_apps.tasks.sentry_apps.workflow_notification.delay") class TestIssueWorkflowNotifications(APITestCase): def setUp(self): self.issue = self.create_group(project=self.project) @@ -230,7 +230,7 @@ def test_notify_pending_installation(self, delay): assert not delay.called -@patch("sentry.tasks.sentry_apps.workflow_notification.delay") +@patch("sentry.sentry_apps.tasks.sentry_apps.workflow_notification.delay") class TestIssueAssigned(APITestCase): def setUp(self): self.issue = self.create_group(project=self.project) @@ -301,7 +301,7 @@ def test_after_issue_assigned_with_enhanced_privacy(self, delay): ) -@patch("sentry.tasks.sentry_apps.build_comment_webhook.delay") +@patch("sentry.sentry_apps.tasks.sentry_apps.build_comment_webhook.delay") class TestComments(APITestCase): def setUp(self): self.issue = self.create_group(project=self.project) @@ -373,7 +373,7 @@ def test_comment_deleted(self, delay): ) -@patch("sentry.tasks.sentry_apps.workflow_notification.delay") +@patch("sentry.sentry_apps.tasks.sentry_apps.workflow_notification.delay") class TestIssueWorkflowNotificationsForSubscriptionFamily(APITestCase): def setUp(self): self.issue = self.create_group(project=self.project) diff --git a/tests/sentry/sentry_apps/tasks/test_sentry_apps.py b/tests/sentry/sentry_apps/tasks/test_sentry_apps.py index 68ba388532464b..89694e6d683f89 100644 --- a/tests/sentry/sentry_apps/tasks/test_sentry_apps.py +++ b/tests/sentry/sentry_apps/tasks/test_sentry_apps.py @@ -272,7 +272,7 @@ def test_does_not_process_sentry_apps_without_issue_webhooks(self, safe_urlopen) assert len(safe_urlopen.mock_calls) == 0 - @patch("sentry.tasks.sentry_apps._process_resource_change") + @patch("sentry.sentry_apps.tasks.sentry_apps._process_resource_change") def test_process_resource_change_bound_passes_retry_object(self, process, safe_urlopen): group = self.create_group(project=self.project) diff --git a/tests/sentry/sentry_apps/tasks/test_servicehooks.py b/tests/sentry/sentry_apps/tasks/test_servicehooks.py index b3ca386f3f19ee..8c0544a6479d7d 100644 --- a/tests/sentry/sentry_apps/tasks/test_servicehooks.py +++ b/tests/sentry/sentry_apps/tasks/test_servicehooks.py @@ -15,7 +15,7 @@ class TestServiceHooks(TestCase): def setUp(self): self.hook = self.create_service_hook(project=self.project, events=("issue.created",)) - @patch("sentry.tasks.servicehooks.safe_urlopen") + @patch("sentry.sentry_apps.tasks.service_hooks.safe_urlopen") @responses.activate def test_verify_sentry_hook_signature(self, safe_urlopen): import hmac @@ -36,7 +36,7 @@ def test_verify_sentry_hook_signature(self, safe_urlopen): ((_, kwargs),) = safe_urlopen.call_args_list assert expected == kwargs["headers"]["X-ServiceHook-Signature"] - @patch("sentry.tasks.servicehooks.safe_urlopen") + @patch("sentry.sentry_apps.tasks.service_hooks.safe_urlopen") @responses.activate def test_event_created_sends_service_hook(self, safe_urlopen): self.hook.update(events=["event.created", "event.alert"]) diff --git a/tests/sentry/sentry_apps/test_sentry_app_creator.py b/tests/sentry/sentry_apps/test_sentry_app_creator.py index 8452020fb0bbd8..9e623a3691885d 100644 --- a/tests/sentry/sentry_apps/test_sentry_app_creator.py +++ b/tests/sentry/sentry_apps/test_sentry_app_creator.py @@ -203,7 +203,7 @@ def test_author(self): sentry_app = self.run_creator(author="custom") assert sentry_app.author == "custom" - @patch("sentry.tasks.sentry_apps.installation_webhook.delay") + @patch("sentry.sentry_apps.tasks.sentry_apps.installation_webhook.delay") def test_does_not_notify_service(self, delay): self.run_creator() assert not len(delay.mock_calls) diff --git a/tests/sentry/tasks/test_post_process.py b/tests/sentry/tasks/test_post_process.py index 530d383dcc3853..a588cef983884d 100644 --- a/tests/sentry/tasks/test_post_process.py +++ b/tests/sentry/tasks/test_post_process.py @@ -116,8 +116,8 @@ def call_post_process_group( class CorePostProcessGroupTestMixin(BasePostProgressGroupMixin): @patch("sentry.rules.processing.processor.RuleProcessor") - @patch("sentry.tasks.servicehooks.process_service_hook") - @patch("sentry.tasks.sentry_apps.process_resource_change_bound.delay") + @patch("sentry.sentry_apps.tasks.service_hooks.process_service_hook") + @patch("sentry.sentry_apps.tasks.sentry_apps.process_resource_change_bound.delay") @patch("sentry.signals.event_processed.send_robust") def test_issueless( self, @@ -543,7 +543,7 @@ def test_group_last_seen_buffer(self, mock_processor): class ServiceHooksTestMixin(BasePostProgressGroupMixin): - @patch("sentry.tasks.servicehooks.process_service_hook") + @patch("sentry.sentry_apps.tasks.service_hooks.process_service_hook") def test_service_hook_fires_on_new_event(self, mock_process_service_hook): event = self.create_event(data={}, project_id=self.project.id) hook = self.create_service_hook( @@ -565,7 +565,7 @@ def test_service_hook_fires_on_new_event(self, mock_process_service_hook): servicehook_id=hook.id, event=EventMatcher(event) ) - @patch("sentry.tasks.servicehooks.process_service_hook") + @patch("sentry.sentry_apps.tasks.service_hooks.process_service_hook") @patch("sentry.rules.processing.processor.RuleProcessor") def test_service_hook_fires_on_alert(self, mock_processor, mock_process_service_hook): event = self.create_event(data={}, project_id=self.project.id) @@ -594,7 +594,7 @@ def test_service_hook_fires_on_alert(self, mock_processor, mock_process_service_ servicehook_id=hook.id, event=EventMatcher(event) ) - @patch("sentry.tasks.servicehooks.process_service_hook") + @patch("sentry.sentry_apps.tasks.service_hooks.process_service_hook") @patch("sentry.rules.processing.processor.RuleProcessor") def test_service_hook_does_not_fire_without_alert( self, mock_processor, mock_process_service_hook @@ -620,7 +620,7 @@ def test_service_hook_does_not_fire_without_alert( assert not mock_process_service_hook.delay.mock_calls - @patch("sentry.tasks.servicehooks.process_service_hook") + @patch("sentry.sentry_apps.tasks.service_hooks.process_service_hook") def test_service_hook_does_not_fire_without_event(self, mock_process_service_hook): event = self.create_event(data={}, project_id=self.project.id) @@ -640,7 +640,7 @@ def test_service_hook_does_not_fire_without_event(self, mock_process_service_hoo class ResourceChangeBoundsTestMixin(BasePostProgressGroupMixin): - @patch("sentry.tasks.sentry_apps.process_resource_change_bound.delay") + @patch("sentry.sentry_apps.tasks.sentry_apps.process_resource_change_bound.delay") def test_processes_resource_change_task_on_new_group(self, delay): event = self.create_event(data={}, project_id=self.project.id) group = event.group @@ -654,7 +654,7 @@ def test_processes_resource_change_task_on_new_group(self, delay): delay.assert_called_once_with(action="created", sender="Group", instance_id=group.id) @with_feature("organizations:integrations-event-hooks") - @patch("sentry.tasks.sentry_apps.process_resource_change_bound.delay") + @patch("sentry.sentry_apps.tasks.sentry_apps.process_resource_change_bound.delay") def test_processes_resource_change_task_on_error_events(self, delay): event = self.create_event( data={ @@ -689,7 +689,7 @@ def test_processes_resource_change_task_on_error_events(self, delay): ) @with_feature("organizations:integrations-event-hooks") - @patch("sentry.tasks.sentry_apps.process_resource_change_bound.delay") + @patch("sentry.sentry_apps.tasks.sentry_apps.process_resource_change_bound.delay") def test_processes_resource_change_task_not_called_for_non_errors(self, delay): event = self.create_event( data={ @@ -710,7 +710,7 @@ def test_processes_resource_change_task_not_called_for_non_errors(self, delay): assert not delay.called - @patch("sentry.tasks.sentry_apps.process_resource_change_bound.delay") + @patch("sentry.sentry_apps.tasks.sentry_apps.process_resource_change_bound.delay") def test_processes_resource_change_task_not_called_without_feature_flag(self, delay): event = self.create_event( data={ @@ -732,7 +732,7 @@ def test_processes_resource_change_task_not_called_without_feature_flag(self, de assert not delay.called @with_feature("organizations:integrations-event-hooks") - @patch("sentry.tasks.sentry_apps.process_resource_change_bound.delay") + @patch("sentry.sentry_apps.tasks.sentry_apps.process_resource_change_bound.delay") def test_processes_resource_change_task_not_called_without_error_created(self, delay): event = self.create_event( data={ From 92bfa2ccd0e4f67a0fe393352e7aba4e895fddf3 Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Fri, 4 Oct 2024 09:43:35 -0700 Subject: [PATCH 09/13] update path for pickle task --- src/sentry/celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/celery.py b/src/sentry/celery.py index 384ec1802abaaa..d3ce67dcaeeec9 100644 --- a/src/sentry/celery.py +++ b/src/sentry/celery.py @@ -14,7 +14,7 @@ [ # basic tasks that must be passed models still "sentry.tasks.process_buffer.process_incr", - "sentry.tasks.process_resource_change_bound", + "sentry.sentry_apps.tasks.sentry_apps.process_resource_change_bound", "sentry.sentry_apps.tasks.sentry_apps.send_alert_event", "sentry.tasks.unmerge", "src.sentry.notifications.utils.async_send_notification", From 7edc098c56424c0dd6594bcc4df23f4b796e3d03 Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Fri, 4 Oct 2024 11:59:45 -0700 Subject: [PATCH 10/13] union type on notify_sentry_app instead of parent type --- src/sentry/sentry_apps/tasks/sentry_apps.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/sentry/sentry_apps/tasks/sentry_apps.py b/src/sentry/sentry_apps/tasks/sentry_apps.py index 0e775ace10912e..e068f0336a03e0 100644 --- a/src/sentry/sentry_apps/tasks/sentry_apps.py +++ b/src/sentry/sentry_apps/tasks/sentry_apps.py @@ -14,7 +14,6 @@ from sentry.constants import SentryAppInstallationStatus from sentry.db.models.base import Model from sentry.eventstore.models import Event, GroupEvent -from sentry.eventtypes.base import BaseEvent from sentry.hybridcloud.rpc.caching import region_caching_service from sentry.models.activity import Activity from sentry.models.group import Group @@ -103,7 +102,7 @@ def _webhook_event_data( @instrumented_task(name="sentry.sentry_apps.tasks.sentry_apps.send_alert_event", **TASK_OPTIONS) @retry_decorator def send_alert_event( - event: Event, + event: Event | GroupEvent, rule: str, sentry_app_id: int, additional_payload_key: str | None = None, @@ -419,7 +418,7 @@ def send_resource_change_webhook( metrics.incr("resource_change.processed", sample_rate=1.0, tags={"change_event": event}) -def notify_sentry_app(event: BaseEvent, futures): +def notify_sentry_app(event: Event | GroupEvent, futures): for f in futures: if not f.kwargs.get("sentry_app"): continue From cf873c19cae042b9f378213f10f81392364535d2 Mon Sep 17 00:00:00 2001 From: Christinarlong <60594860+Christinarlong@users.noreply.github.com> Date: Fri, 4 Oct 2024 15:13:53 -0700 Subject: [PATCH 11/13] Update src/sentry/sentry_apps/tasks/sentry_apps.py Co-authored-by: Mark Story --- src/sentry/sentry_apps/tasks/sentry_apps.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/sentry_apps/tasks/sentry_apps.py b/src/sentry/sentry_apps/tasks/sentry_apps.py index e068f0336a03e0..9c3978b482e8fa 100644 --- a/src/sentry/sentry_apps/tasks/sentry_apps.py +++ b/src/sentry/sentry_apps/tasks/sentry_apps.py @@ -446,7 +446,7 @@ def notify_sentry_app(event: Event | GroupEvent, futures): ) -def send_webhooks(installation, event, **kwargs): +def send_webhooks(installation: RpcSentryAppInstallation, event: str, **kwargs: Any): servicehook: ServiceHook try: servicehook = ServiceHook.objects.get( From 4104a2edec8a409ed8266b060c4d1deeb13ec42a Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Fri, 4 Oct 2024 15:15:07 -0700 Subject: [PATCH 12/13] update name to match mster --- src/sentry/sentry_apps/tasks/sentry_apps.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sentry/sentry_apps/tasks/sentry_apps.py b/src/sentry/sentry_apps/tasks/sentry_apps.py index 9c3978b482e8fa..e86cd4373a2429 100644 --- a/src/sentry/sentry_apps/tasks/sentry_apps.py +++ b/src/sentry/sentry_apps/tasks/sentry_apps.py @@ -399,7 +399,7 @@ def get_webhook_data( @instrumented_task( - "sentry.sentry_apps.tasks.sentry_apps.send_process_resource_change_webhook", **TASK_OPTIONS + "sentry.sentry_apps.tasks.sentry_apps.send_resource_change_webhook", **TASK_OPTIONS ) @retry_decorator def send_resource_change_webhook( @@ -408,7 +408,7 @@ def send_resource_change_webhook( installation = app_service.installation_by_id(id=installation_id) if not installation: logger.info( - "send_process_resource_change_webhook.missing_installation", + "send_resource_change_webhook.missing_installation", extra={"installation_id": installation_id, "event": event}, ) return From 21f519295c3e8cf928bb0067f6a3e2f315e6639a Mon Sep 17 00:00:00 2001 From: Christinarlong Date: Fri, 4 Oct 2024 15:17:23 -0700 Subject: [PATCH 13/13] more typing --- src/sentry/sentry_apps/tasks/sentry_apps.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sentry/sentry_apps/tasks/sentry_apps.py b/src/sentry/sentry_apps/tasks/sentry_apps.py index e86cd4373a2429..ffb0b1d4b2a8a5 100644 --- a/src/sentry/sentry_apps/tasks/sentry_apps.py +++ b/src/sentry/sentry_apps/tasks/sentry_apps.py @@ -446,7 +446,7 @@ def notify_sentry_app(event: Event | GroupEvent, futures): ) -def send_webhooks(installation: RpcSentryAppInstallation, event: str, **kwargs: Any): +def send_webhooks(installation: RpcSentryAppInstallation, event: str, **kwargs: Any) -> None: servicehook: ServiceHook try: servicehook = ServiceHook.objects.get( @@ -457,10 +457,10 @@ def send_webhooks(installation: RpcSentryAppInstallation, event: str, **kwargs: "send_webhooks.missing_servicehook", extra={"installation_id": installation.id, "event": event}, ) - return + return None if event not in servicehook.events: - return + return None # The service hook applies to all projects if there are no # ServiceHookProject records. Otherwise we want check if