Skip to content

Commit

Permalink
separate autostop check into dedicated thread
Browse files Browse the repository at this point in the history
  • Loading branch information
jlewitt1 committed Sep 23, 2024
1 parent 7816d66 commit 982a115
Showing 1 changed file with 48 additions and 9 deletions.
57 changes: 48 additions & 9 deletions runhouse/servers/cluster_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ async def __init__(
)
self.cluster_checks_thread.start()

logger.debug("Creating autostop check thread.")
self.autostop_check_thread = threading.Thread(
target=self.periodic_autostop_check, daemon=True
)
self.autostop_check_thread.start()

##############################################
# Cluster config state storage methods
##############################################
Expand Down Expand Up @@ -330,6 +336,42 @@ async def acheck_cluster_logs(self, interval_size: int):

return logs_den_resp, new_start_log_line, new_end_log_line

async def aperiodic_autostop_check(self):
    """Periodically refresh the cluster's autostop state.

    The loop only runs while ``self.autostop_helper`` is set; when it is
    ``None`` the coroutine returns, which ends the loop for good. On every
    iteration the ``status_check_interval`` is re-read from the cluster
    config (falling back to ``DEFAULT_STATUS_CHECK_INTERVAL``), the cluster
    status is fetched with ``send_to_den=False``, and the result is handed
    to ``_update_autostop``. Any exception raised by the check itself is
    logged and the loop continues after the usual sleep.
    """
    while True:
        # Decide whether to run at all before doing any other work, so a
        # cluster without autostop never pays for a config lookup.
        if self.autostop_helper is None:
            logger.debug(
                "Not updating autostop: no autostop helper is configured. "
                "Stopping the periodic autostop check."
            )
            break

        cluster_config = await self.aget_cluster_config()
        interval_size = cluster_config.get(
            "status_check_interval", DEFAULT_STATUS_CHECK_INTERVAL
        )

        try:
            # Only the status payload is needed here; the second element of
            # the returned pair is ignored because nothing is sent to Den.
            status, _ = await self.acheck_cluster_status(send_to_den=False)

            logger.debug("Updating autostop")
            await self._update_autostop(status)

        except Exception as e:
            logger.error(
                f"Autostop check has failed: {e}.\n"
                "Please check cluster logs for more info.\n"
            )

        finally:
            # asyncio.sleep() returns immediately for a non-positive delay,
            # so an interval of -1 (which the cluster-checks loop appears to
            # treat as "periodic status reporting disabled" - confirm) would
            # turn this loop into a busy spin. Fall back to the default
            # interval so the thread always actually sleeps.
            sleep_for = (
                interval_size
                if interval_size > 0
                else DEFAULT_STATUS_CHECK_INTERVAL
            )
            await asyncio.sleep(sleep_for)

async def aperiodic_cluster_checks(self):
"""Periodically check the status of the cluster, gather metrics about the cluster's utilization & memory,
and save it to Den."""
Expand All @@ -338,9 +380,8 @@ async def aperiodic_cluster_checks(self):
should_send_status_and_logs_to_den: bool = (
configs.token is not None and self._cluster_uri is not None
)
should_update_autostop: bool = self.autostop_helper is not None

if not should_send_status_and_logs_to_den and not should_update_autostop:
if not should_send_status_and_logs_to_den:
break

cluster_config = await self.aget_cluster_config()
Expand All @@ -353,10 +394,6 @@ async def aperiodic_cluster_checks(self):
send_to_den=should_send_status_and_logs_to_den
)

if should_update_autostop:
logger.debug("Updating autostop")
await self._update_autostop(status)

if interval_size == -1 or not should_send_status_and_logs_to_den:
continue

Expand Down Expand Up @@ -404,10 +441,9 @@ async def aperiodic_cluster_checks(self):
if den_resp_code == 200:
await self.acheck_cluster_logs(interval_size=interval_size)

except Exception:
except Exception as e:
logger.error(
"Cluster checks have failed.\n"
"Please check cluster logs for more info.\n"
f"Cluster checks have failed: {e}.\n"
"Temporarily increasing the interval between status checks."
)
await asyncio.sleep(INCREASED_STATUS_CHECK_INTERVAL)
Expand All @@ -427,6 +463,9 @@ def periodic_cluster_checks(self):
# sync_function.
asyncio.run(self.aperiodic_cluster_checks())

def periodic_autostop_check(self):
    """Synchronous entry point that drives ``aperiodic_autostop_check``.

    Creates the coroutine and runs it to completion with ``asyncio.run``,
    so it can be used as a plain (non-async) callable. Returns ``None``.
    """
    autostop_loop = self.aperiodic_autostop_check()
    asyncio.run(autostop_loop)

async def _update_autostop(self, status: dict):
function_running = any(
any(
Expand Down

0 comments on commit 982a115

Please sign in to comment.