Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create an enable_telemetry flag for clusters #1128

Merged
merged 1 commit into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/setup_rh_config/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ runs:
echo "username: ${{ inputs.username }}" >> ~/.rh/config.yaml
echo "api_server_url: ${{ inputs.api_server_url }}" >> ~/.rh/config.yaml
echo "autosave: false" >> ~/.rh/config.yaml
echo "disable_observability: false" >> ~/.rh/config.yaml
15 changes: 15 additions & 0 deletions docs/security-and-authentication.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,18 @@ local Runhouse config (:code:`~/.rh/config.yaml`), or in Python:
import runhouse as rh
rh.configs.disable_data_collection()
Cluster Observability
---------------------------------------
Runhouse collects various telemetry data by default on clusters. This data will be used to provide better observability
into logs, traces, and metrics associated with clusters. We will not sell or share any of the observability data collected.

To disable observability globally for all clusters, set the environment variable :code:`disable_observability`
to :code:`True`. Alternatively, set :code:`disable_observability` to :code:`true` in your
local Runhouse config (:code:`~/.rh/config.yaml`), or in Python:
BelSasha marked this conversation as resolved.
Show resolved Hide resolved

.. code-block:: python
import runhouse as rh
rh.configs.disable_observability()
12 changes: 11 additions & 1 deletion runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ def _sync_default_env_to_cluster(self):
self._default_env.add_env_var("RH_LOG_LEVEL", log_level)
logger.info(f"Using log level {log_level} on cluster's default env")

if not configs.observability_enabled:
self._default_env.add_env_var("disable_observability", "True")
logger.info("Disabling observability on the cluster")

logger.info(f"Syncing default env {self._default_env.name} to cluster")
for node in self.ips:
self._default_env.install(cluster=self, node=node)
Expand Down Expand Up @@ -828,7 +832,6 @@ def status(self, send_to_den: bool = False):
)

if send_to_den:

if den_resp_status_code == 404:
logger.info(
"Cluster has not yet been saved to Den, cannot update status or logs."
Expand All @@ -837,6 +840,13 @@ def status(self, send_to_den: bool = False):
elif den_resp_status_code != 200:
logger.warning("Failed to send cluster status to Den")

if not configs.observability_enabled and status.get("env_servlet_processes"):
logger.warning(
"Cluster observability is not currently enabled. Metrics are stale and will "
"no longer be collected. To re-enable observability, please "
"run `rh.configs.enable_observability()` and restart the server (`cluster.restart_server()`)."
)

return status

def ssh_tunnel(
Expand Down
13 changes: 13 additions & 0 deletions runhouse/rns/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Defaults:
"use_spot": False,
"use_local_configs": True,
"disable_data_collection": False,
BelSasha marked this conversation as resolved.
Show resolved Hide resolved
"disable_observability": False,
"use_rns": False,
"api_server_url": "https://api.run.house",
"dashboard_url": "https://run.house",
Expand Down Expand Up @@ -290,3 +291,15 @@ def data_collection_enabled(self) -> bool:
return False

return True

@property
def observability_enabled(self) -> bool:
    """Whether cluster observability (telemetry) is currently enabled.

    Returns the inverse of the ``disable_observability`` config flag
    (observability is on by default when the flag is absent).
    """
    disabled = self.get("disable_observability", False)
    return not disabled

def disable_observability(self):
    """Turn off cluster observability.

    Persists ``disable_observability: True`` in the local config and mirrors
    it into the process environment so already-running code sees the change.
    """
    self.set("disable_observability", True)
    os.environ["disable_observability"] = str(True)

def enable_observability(self):
    """Turn cluster observability back on.

    Clears the ``disable_observability`` flag in the local config and mirrors
    the new value into the process environment.
    """
    self.set("disable_observability", False)
    os.environ["disable_observability"] = str(False)
55 changes: 26 additions & 29 deletions runhouse/servers/cluster_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy
import datetime
import json
import os
import threading
from typing import Any, Dict, List, Optional, Set, Tuple, Union

Expand Down Expand Up @@ -63,7 +64,7 @@ async def __init__(

self._initialized_env_servlet_names: Set[str] = set()
self._key_to_env_servlet_name: Dict[Any, str] = {}
self._auth_cache: AuthCache = AuthCache(cluster_config)
self._auth_cache: AuthCache = AuthCache(self.cluster_config)
self.autostop_helper = None

if cluster_config.get("resource_subtype", None) == "OnDemandCluster":
Expand All @@ -82,9 +83,9 @@ async def __init__(
self.pid = get_pid()
self.process = psutil.Process(pid=self.pid)
self.gpu_metrics = None # will be updated only if this is a gpu cluster.
self.lock = (
threading.Lock()
) # will be used when self.gpu_metrics will be updated by different threads.

# will be used when self.gpu_metrics will be updated by different threads.
self.lock = threading.Lock()

if self.cluster_config.get("has_cuda"):
logger.debug("Creating _periodic_gpu_check thread.")
Expand Down Expand Up @@ -253,7 +254,6 @@ async def aclear_all_references_to_env_servlet_name(self, env_servlet_name: str)
##############################################
# Periodic Cluster Checks APIs
##############################################

async def asave_status_metrics_to_den(self, status: dict):
from runhouse.resources.hardware.utils import ResourceServerStatus

Expand Down Expand Up @@ -283,7 +283,6 @@ def save_status_metrics_to_den(self, status: dict):
return sync_function(self.asave_status_metrics_to_den)(status)

async def acheck_cluster_status(self, send_to_den: bool = True):

logger.debug("Performing cluster status checks")
status, den_resp_status_code = await self.astatus(send_to_den=send_to_den)

Expand All @@ -302,7 +301,6 @@ async def acheck_cluster_status(self, send_to_den: bool = True):
return status, den_resp_status_code

async def acheck_cluster_logs(self, interval_size: int):

logger.debug("Performing logs checks")

cluster_config = await self.aget_cluster_config()
Expand Down Expand Up @@ -335,7 +333,7 @@ async def acheck_cluster_logs(self, interval_size: int):
async def aperiodic_cluster_checks(self):
"""Periodically check the status of the cluster, gather metrics about the cluster's utilization & memory,
and save it to Den."""

disable_observability = os.getenv("disable_observability", False)
while True:
should_send_status_and_logs_to_den: bool = (
configs.token is not None and self._cluster_uri is not None
Expand All @@ -345,6 +343,11 @@ async def aperiodic_cluster_checks(self):
if not should_send_status_and_logs_to_den and not should_update_autostop:
break

cluster_config = await self.aget_cluster_config()
interval_size = cluster_config.get(
"status_check_interval", DEFAULT_STATUS_CHECK_INTERVAL
)

try:
status, den_resp_code = await self.acheck_cluster_status(
send_to_den=should_send_status_and_logs_to_den
Expand All @@ -354,16 +357,17 @@ async def aperiodic_cluster_checks(self):
logger.debug("Updating autostop")
await self._update_autostop(status)

cluster_config = await self.aget_cluster_config()
interval_size = cluster_config.get(
"status_check_interval", DEFAULT_STATUS_CHECK_INTERVAL
)

if interval_size == -1 or not should_send_status_and_logs_to_den:
continue

logger.debug("Performing cluster checks")

if disable_observability:
logger.info(
"Cluster observability not enabled, skipping metrics collection."
)
break

if den_resp_code == 404:
logger.info(
"Cluster has not yet been saved to Den, cannot update status or logs."
Expand Down Expand Up @@ -396,9 +400,7 @@ async def aperiodic_cluster_checks(self):
await self.aset_cluster_config_value(
key="end_log_line", value=new_end_log_line
)
# since we are setting a new values to the cluster_config, we need to reload it so the next
# cluster check iteration will reference to the updated cluster config.
cluster_config = await self.aget_cluster_config()

if den_resp_code == 200:
await self.acheck_cluster_logs(interval_size=interval_size)

Expand All @@ -409,6 +411,7 @@ async def aperiodic_cluster_checks(self):
"Temporarily increasing the interval between status checks."
)
await asyncio.sleep(INCREASED_STATUS_CHECK_INTERVAL)

finally:
# make sure that the thread will go to sleep, even if the interval size == -1
# (meaning that sending status to den is disabled).
Expand Down Expand Up @@ -468,12 +471,10 @@ async def _status_for_env_servlet(self, env_servlet_name):

async def _aperiodic_gpu_check(self):
"""periodically collects cluster gpu usage"""

pynvml.nvmlInit() # init nvidia ml info collection

while True:
try:

gpu_count = pynvml.nvmlDeviceGetCount()
with self.lock:
if not self.gpu_metrics:
Expand Down Expand Up @@ -505,23 +506,22 @@ async def _aperiodic_gpu_check(self):
}
)
self.gpu_metrics[gpu_index] = updated_gpu_info

except Exception as e:
logger.error(str(e))
pynvml.nvmlShutdown()
break

finally:
# collects gpu usage every 5 seconds.
await asyncio.sleep(GPU_COLLECTION_INTERVAL)

def _periodic_gpu_check(self):
# This is only ever called once in its own thread, so we can do asyncio.run here instead of
# sync_function.
# This is only ever called once in its own thread, so we can do asyncio.run here instead of `sync_function`.
asyncio.run(self._aperiodic_gpu_check())

def _get_node_gpu_usage(self, server_pid: int):

# currently works correctly for a single node GPU. Multinode-clusters will be supported shortly.

# TODO [SB] currently works correctly for a single node GPU. Multinode-clusters will be supported shortly.
collected_gpus_info = copy.deepcopy(self.gpu_metrics)

if collected_gpus_info is None or not collected_gpus_info[0]:
Expand All @@ -530,15 +530,13 @@ def _get_node_gpu_usage(self, server_pid: int):
cluster_gpu_usage = get_gpu_usage(
collected_gpus_info=collected_gpus_info, servlet_type=ServletType.cluster
)
cluster_gpu_usage[
"server_pid"
] = server_pid # will be useful for multi-node clusters.

# will be useful for multi-node clusters.
cluster_gpu_usage["server_pid"] = server_pid

return cluster_gpu_usage

async def astatus(self, send_to_den: bool = False) -> Tuple[Dict, Optional[int]]:
import psutil

config_cluster = copy.deepcopy(self.cluster_config)

# Popping out creds because we don't want to show them in the status
Expand Down Expand Up @@ -640,7 +638,6 @@ def status(self, send_to_den: bool = False):
# Save cluster logs to Den
##############################################
def _get_logs(self):

with open(SERVER_LOGFILE) as log_file:
log_lines = log_file.readlines()
cleaned_log_lines = [ColoredFormatter.format_log(line) for line in log_lines]
Expand Down
13 changes: 4 additions & 9 deletions runhouse/servers/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ async def ainitialize(
):
runtime_env = {"conda": conda_env} if conda_env else None

if not configs.observability_enabled:
logger.info("disabling cluster observability")

default_env_name = default_env_name or EMPTY_DEFAULT_ENV_NAME

# Ray and ClusterServlet should already be
Expand All @@ -154,13 +157,6 @@ async def ainitialize(
runtime_env=runtime_env,
)

# TODO disabling due to latency, figure out what to do with this
# try:
# # Collect metadata for the cluster immediately on init
# self._collect_cluster_stats()
# except Exception as e:
# logger.error(f"Failed to collect cluster stats: {str(e)}")

# We initialize a default env servlet where some things may run.
_ = obj_store.get_env_servlet(
env_name=default_env_name,
Expand Down Expand Up @@ -977,8 +973,7 @@ async def main():
# We connect this to the "base" env, which we'll initialize later,
# so writes to the obj_store within the server get proxied to the "base" env.
await obj_store.ainitialize(
default_env_name,
setup_cluster_servlet=ClusterServletSetupOption.FORCE_CREATE,
default_env_name, setup_cluster_servlet=ClusterServletSetupOption.FORCE_CREATE
)

cluster_config = await obj_store.aget_cluster_config()
Expand Down
11 changes: 11 additions & 0 deletions tests/test_resources/test_clusters/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,3 +936,14 @@ def test_switch_default_env(self, cluster):
# set it back
cluster.default_env = test_env
cluster.delete(new_env.name)

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_observability_enabled_by_default_on_cluster(self, cluster):
    """Disabling observability locally propagates to the cluster's default env
    (as the ``disable_observability`` env var) once the server is restarted.

    NOTE(review): the name says "enabled by default" but the test exercises
    the disable path — consider renaming in a follow-up.
    """
    # Disable observability locally, which will be reflected on the cluster
    # once the server is restarted.
    rh.configs.disable_observability()
    try:
        cluster.restart_server()

        if cluster._default_env:
            env_vars = cluster._default_env.env_vars
            assert env_vars.get("disable_observability") == "True"
    finally:
        # Restore the global config so subsequent tests in the session are
        # not run with observability silently disabled (the original test
        # leaked this state).
        rh.configs.enable_observability()
Loading