Skip to content

Commit

Permalink
fix: PR issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Shalev Avhar committed Aug 14, 2024
1 parent e2f64fb commit 607f04e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 10 deletions.
14 changes: 12 additions & 2 deletions port_ocean/core/event_listener/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ async def _after_resync(self) -> None:
"""
await ocean.app.update_state_after_scheduled_sync()

async def _on_resync_failure(self, e: Exception) -> None:
"""
Can be used for event listeners that need to handle resync failures.
"""
await ocean.app.update_state_after_scheduled_sync("failed")

async def _resync(
self,
resync_args: dict[Any, Any],
Expand All @@ -57,8 +63,12 @@ async def _resync(
Triggers the "on_resync" event.
"""
await self._before_resync()
await self.events["on_resync"](resync_args)
await self._after_resync()
try:
await self.events["on_resync"](resync_args)
await self._after_resync()
except Exception as e:
await self._on_resync_failure(e)
raise e


class EventListenerSettings(BaseOceanModel, extra=Extra.allow):
Expand Down
33 changes: 29 additions & 4 deletions port_ocean/core/event_listener/once.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,15 @@ async def get_current_integration_cached(self) -> dict[str, Any]:
self.cached_integration = await ocean.port_client.get_current_integration()
return self.cached_integration

async def get_saas_integration_prediction_data(
async def get_saas_resync_initialization_and_interval(
self,
) -> tuple[int | None, datetime.datetime | None]:
"""
Get the scheduled resync interval and the last updated time of the integration config for the saas application.
interval is the saas configured resync interval time.
start_time is the last updated time of the integration config.
return: (interval, start_time)
"""
if not ocean.app.is_saas():
return (None, None)

Expand Down Expand Up @@ -95,7 +101,9 @@ async def _before_resync(self) -> None:
await super()._before_resync()
return

(interval, start_time) = await self.get_saas_integration_prediction_data()
(interval, start_time) = (
await self.get_saas_resync_initialization_and_interval()
)
await ocean.app.update_state_before_scheduled_sync(interval, start_time)

async def _after_resync(self) -> None:
Expand All @@ -104,8 +112,25 @@ async def _after_resync(self) -> None:
await super()._after_resync()
return

(interval, start_time) = await self.get_saas_integration_prediction_data()
await ocean.app.update_state_after_scheduled_sync(interval, start_time)
(interval, start_time) = (
await self.get_saas_resync_initialization_and_interval()
)
await ocean.app.update_state_after_scheduled_sync(
"completed", interval, start_time
)

async def _on_resync_failure(self, e: Exception) -> None:
if not ocean.app.is_saas():
# in case of non-saas, we still want to update the state before and after the resync
await super()._after_resync()
return

(interval, start_time) = (
await self.get_saas_resync_initialization_and_interval()
)
await ocean.app.update_state_after_scheduled_sync(
"failed", interval, start_time
)

async def _start(self) -> None:
"""
Expand Down
13 changes: 9 additions & 4 deletions port_ocean/ocean.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sys
import threading
from contextlib import asynccontextmanager
from typing import Callable, Any, Dict, AsyncIterator, Type
from typing import Callable, Any, Dict, AsyncIterator, Literal, Type

from fastapi import FastAPI, APIRouter
from loguru import logger
Expand Down Expand Up @@ -108,14 +108,15 @@ async def update_state_before_scheduled_sync(

async def update_state_after_scheduled_sync(
self,
status: Literal["completed", "failed"] = "completed",
interval: int | None = None,
custom_start_time: datetime.datetime | None = None,
) -> None:
_interval = interval or self.config.scheduled_resync_interval

integration = await self.port_client.update_integration_state(
{
"status": "completed",
"status": status,
"last_resync_start": (
self.last_resync_start.timestamp()
if self.last_resync_start
Expand All @@ -136,8 +137,12 @@ async def _setup_scheduled_resync(
async def execute_resync_all() -> None:
await self.update_state_before_scheduled_sync()
logger.info("Starting a new scheduled resync")
await self.integration.sync_raw_all()
await self.update_state_after_scheduled_sync()
try:
await self.integration.sync_raw_all()
await self.update_state_after_scheduled_sync()
except Exception as e:
await self.update_state_after_scheduled_sync("failed")
raise e

interval = self.config.scheduled_resync_interval
if interval is not None:
Expand Down

0 comments on commit 607f04e

Please sign in to comment.