From 982a11513a2a31f76ef571a0d0de510d69abbd89 Mon Sep 17 00:00:00 2001 From: jlewitt1 Date: Mon, 23 Sep 2024 18:33:04 +0300 Subject: [PATCH] separate autostop check into dedicated thread --- runhouse/servers/cluster_servlet.py | 57 ++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/runhouse/servers/cluster_servlet.py b/runhouse/servers/cluster_servlet.py index 7f225b740..0cbc66519 100644 --- a/runhouse/servers/cluster_servlet.py +++ b/runhouse/servers/cluster_servlet.py @@ -100,6 +100,12 @@ async def __init__( ) self.cluster_checks_thread.start() + logger.debug("Creating autostop check thread.") + self.autostop_check_thread = threading.Thread( + target=self.periodic_autostop_check, daemon=True + ) + self.autostop_check_thread.start() + ############################################## # Cluster config state storage methods ############################################## @@ -330,6 +336,42 @@ async def acheck_cluster_logs(self, interval_size: int): return logs_den_resp, new_start_log_line, new_end_log_line + async def aperiodic_autostop_check(self): + """Periodically check the autostop of the cluster""" + while True: + should_update_autostop: bool = self.autostop_helper is not None + cluster_config = await self.aget_cluster_config() + interval_size = cluster_config.get( + "status_check_interval", DEFAULT_STATUS_CHECK_INTERVAL + ) + + if not should_update_autostop: + logger.debug( + f"Not updating autostop, checking again in {interval_size} minutes" + ) + break + + try: + status, den_resp_code = await self.acheck_cluster_status( + send_to_den=False + ) + + if should_update_autostop: + logger.debug("Updating autostop") + await self._update_autostop(status) + + if interval_size == -1: + continue + + except Exception as e: + logger.error( + f"Autostop check has failed: {e}.\n" + "Please check cluster logs for more info.\n" + ) + + finally: + await asyncio.sleep(interval_size) + async def aperiodic_cluster_checks(self): """Periodically check the status of the cluster, gather metrics about the cluster's utilization & memory, and save it to Den.""" @@ -338,9 +380,8 @@ async def aperiodic_cluster_checks(self): should_send_status_and_logs_to_den: bool = ( configs.token is not None and self._cluster_uri is not None ) - should_update_autostop: bool = self.autostop_helper is not None - if not should_send_status_and_logs_to_den and not should_update_autostop: + if not should_send_status_and_logs_to_den: break cluster_config = await self.aget_cluster_config() @@ -353,10 +394,6 @@ async def aperiodic_cluster_checks(self): send_to_den=should_send_status_and_logs_to_den ) - if should_update_autostop: - logger.debug("Updating autostop") - await self._update_autostop(status) - if interval_size == -1 or not should_send_status_and_logs_to_den: continue @@ -404,10 +441,9 @@ async def aperiodic_cluster_checks(self): if den_resp_code == 200: await self.acheck_cluster_logs(interval_size=interval_size) - except Exception: + except Exception as e: logger.error( - "Cluster checks have failed.\n" - "Please check cluster logs for more info.\n" + f"Cluster checks have failed: {e}.\n" "Temporarily increasing the interval between status checks." ) await asyncio.sleep(INCREASED_STATUS_CHECK_INTERVAL) @@ -427,6 +463,9 @@ def periodic_cluster_checks(self): # sync_function. asyncio.run(self.aperiodic_cluster_checks()) + def periodic_autostop_check(self): + asyncio.run(self.aperiodic_autostop_check()) + async def _update_autostop(self, status: dict): function_running = any( any(