-
Notifications
You must be signed in to change notification settings - Fork 45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core] ocean core next resync #835
Changes from 47 commits
f1c9d90
446dbb0
329245c
d73a3ec
076cbae
383d99b
b7b9c4a
79fde73
3f82fc2
d208f84
b33fbfb
521c4cb
44b2c68
253440a
a3b1961
89d5e27
b756c67
47312d4
a98e22f
048b687
dda5eef
0b1cc58
1bb8ed1
d38271c
14c4da0
bde30d9
becf0b1
058fd22
b939443
baba43e
6abb7b7
bab26f3
98978ff
7aad044
996be85
b0b44d4
1325efa
930ea51
8f9a22e
4316fde
c5d20d3
e6b3626
d9a21c8
4628fe4
425b9ec
c272e5c
b4d6fc5
8f66686
53a40c3
5595405
f614823
e442985
a8e022c
6d1e81f
35c5688
0ef8b52
23d4ee9
975f226
adb7967
661bea8
7b229fd
0a50bff
36774ea
e793681
19f93df
21848be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -13,6 +13,7 @@ | |||||
get_internal_http_client, | ||||||
) | ||||||
from port_ocean.exceptions.clients import KafkaCredentialsNotFound | ||||||
from typing import Any | ||||||
|
||||||
|
||||||
class PortClient( | ||||||
|
@@ -75,3 +76,13 @@ async def get_org_id(self) -> str: | |||||
handle_status_code(response) | ||||||
|
||||||
return response.json()["organization"]["id"] | ||||||
|
||||||
async def update_integration_state(self, state: dict[str, Any]) -> dict[str, Any]: | ||||||
logger.debug(f"Updating integration state with: {state}") | ||||||
response = await self.client.patch( | ||||||
f"{self.api_url}/integration/{self.integration_identifier}/state", | ||||||
headers=await self.auth.headers(), | ||||||
json=state, | ||||||
) | ||||||
handle_status_code(response) | ||||||
return response.json().get("integration", {}) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
import datetime | ||
import signal | ||
from typing import Literal, Any | ||
|
||
|
@@ -9,6 +10,8 @@ | |
EventListenerSettings, | ||
) | ||
from port_ocean.utils.repeat import repeat_every | ||
from port_ocean.context.ocean import ocean | ||
from port_ocean.utils.time import convert_str_to_utc_datetime, convert_to_minutes | ||
|
||
|
||
class OnceEventListenerSettings(EventListenerSettings): | ||
|
@@ -41,6 +44,93 @@ def __init__( | |
): | ||
super().__init__(events) | ||
self.event_listener_config = event_listener_config | ||
self.cached_integration: dict[str, Any] | None = None | ||
|
||
async def get_current_integration_cached(self) -> dict[str, Any]: | ||
if self.cached_integration: | ||
return self.cached_integration | ||
|
||
self.cached_integration = await ocean.port_client.get_current_integration() | ||
return self.cached_integration | ||
|
||
async def get_saas_resync_initialization_and_interval( | ||
self, | ||
) -> tuple[int | None, datetime.datetime | None]: | ||
""" | ||
Get the scheduled resync interval and the last updated time of the integration config for the saas application. | ||
interval is the saas configured resync interval time. | ||
start_time is the last updated time of the integration config. | ||
return: (interval, start_time) | ||
""" | ||
if not ocean.app.is_saas(): | ||
return (None, None) | ||
|
||
try: | ||
integration = await self.get_current_integration_cached() | ||
except Exception: | ||
logger.exception("Error occurred while getting current integration") | ||
return (None, None) | ||
|
||
interval_str = ( | ||
integration.get("spec", {}) | ||
.get("appSpec", {}) | ||
.get("scheduledResyncInterval") | ||
) | ||
|
||
if not interval_str: | ||
logger.error("scheduledResyncInterval not found for integration") | ||
return (None, None) | ||
|
||
last_updated_saas_integration_config_str = integration.get( | ||
"statusInfo", {} | ||
).get("updatedAt") | ||
|
||
# we use the last updated time of the integration config as the start time since in saas application the interval is configured by the user from the portal | ||
if not last_updated_saas_integration_config_str: | ||
logger.error("updatedAt not found for integration") | ||
return (None, None) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the effect and whether there are any action items? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for example, more detailed log for whether this was expected or not There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
|
||
return ( | ||
convert_to_minutes(interval_str), | ||
convert_str_to_utc_datetime(last_updated_saas_integration_config_str), | ||
) | ||
|
||
async def _before_resync(self) -> None: | ||
if not ocean.app.is_saas(): | ||
# in case of non-saas, we still want to update the state before and after the resync | ||
await super()._before_resync() | ||
return | ||
|
||
(interval, start_time) = ( | ||
await self.get_saas_resync_initialization_and_interval() | ||
) | ||
await ocean.app.update_state_before_scheduled_sync(interval, start_time) | ||
|
||
async def _after_resync(self) -> None: | ||
if not ocean.app.is_saas(): | ||
# in case of non-saas, we still want to update the state before and after the resync | ||
await super()._after_resync() | ||
return | ||
|
||
(interval, start_time) = ( | ||
await self.get_saas_resync_initialization_and_interval() | ||
) | ||
await ocean.app.update_state_after_scheduled_sync( | ||
"completed", interval, start_time | ||
) | ||
|
||
async def _on_resync_failure(self, e: Exception) -> None: | ||
if not ocean.app.is_saas(): | ||
# in case of non-saas, we still want to update the state before and after the resync | ||
await super()._after_resync() | ||
return | ||
|
||
(interval, start_time) = ( | ||
await self.get_saas_resync_initialization_and_interval() | ||
) | ||
await ocean.app.update_state_after_scheduled_sync( | ||
"failed", interval, start_time | ||
) | ||
|
||
async def _start(self) -> None: | ||
""" | ||
|
@@ -53,7 +143,7 @@ async def _start(self) -> None: | |
async def resync_and_exit() -> None: | ||
logger.info("Once event listener started") | ||
try: | ||
await self.events["on_resync"]({}) | ||
await self._resync({}) | ||
except Exception: | ||
# we catch all exceptions here to make sure the application will exit gracefully | ||
logger.exception("Error occurred while resyncing") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sure that
should_log
andshould_raise
are parameters that can be passedThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
We don't need this to be changed, these parameters stay constant across event-listener
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is code in the client class, therefor it should be an interface for other uses as well. and therefor each call to the client should decide whether it wants to raise the error or not