Group status tests together #899

Merged
merged 1 commit on Jun 17, 2024
339 changes: 165 additions & 174 deletions tests/test_resources/test_clusters/test_cluster.py
@@ -200,19 +200,6 @@ def test_cluster_endpoint(self, cluster):
            verify=verify,
            headers=rh.globals.rns_client.request_headers(),
        )
        assert r.status_code == 200
        assert r.json().get("cluster_config")["resource_type"] == "cluster"

    @pytest.mark.level("local")
    def test_load_cluster_status(self, cluster):
        endpoint = cluster.endpoint()
        verify = cluster.client.verify
        r = requests.get(
            f"{endpoint}/status",
            verify=verify,
            headers=rh.globals.rns_client.request_headers(),
        )

        assert r.status_code == 200
        status_data = r.json()
        assert status_data["cluster_config"]["resource_type"] == "cluster"
@@ -276,6 +263,171 @@ def test_rh_here_objects(self, cluster):
assert "test_table" in cluster.keys()
assert isinstance(cluster.get("test_table"), rh.Table)

    @pytest.mark.level("local")
    def test_condensed_config_for_cluster(self, cluster):
        remote_cluster_config = rh.function(cluster_config).to(cluster)
        on_cluster_config = remote_cluster_config()
        local_cluster_config = cluster.config()

        keys_to_skip = [
            "creds",
            "client_port",
            "server_host",
            "api_server_url",
            "ssl_keyfile",
            "ssl_certfile",
        ]
        on_cluster_config = remove_config_keys(on_cluster_config, keys_to_skip)
        local_cluster_config = remove_config_keys(local_cluster_config, keys_to_skip)

        if local_cluster_config.get("stable_internal_external_ips", False):
            cluster_ips = local_cluster_config.pop(
                "stable_internal_external_ips", None
            )[0]
            on_cluster_ips = on_cluster_config.pop(
                "stable_internal_external_ips", None
            )[0]
            assert tuple(cluster_ips) == tuple(on_cluster_ips)

        assert on_cluster_config == local_cluster_config
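
For reference, remove_config_keys is a helper imported from the shared test utilities rather than defined in this file; a minimal sketch consistent with how it is called above, assuming it simply drops the listed keys from a config dict:

def remove_config_keys(config: dict, keys_to_skip: list) -> dict:
    # Drop each key if present; absent keys are ignored, since the
    # comparison above only looks at whatever remains.
    for key in keys_to_skip:
        config.pop(key, None)
    return config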

    @pytest.mark.level("local")
    def test_sharing(self, cluster, friend_account_logged_in_docker_cluster_pk_ssh):
        # Skip this test for ondemand clusters, because making
        # it compatible with ondemand_cluster requires changes
        # that break CI.
        # TODO: Remove this by doing some CI-specific logic.
        if cluster.__class__.__name__ == "OnDemandCluster":
            return

        if cluster.rns_address.startswith("~"):
            # For `local_named_resource`, resolve the rns address so it can be shared and loaded
            from runhouse.globals import rns_client

            cluster.rns_address = rns_client.local_to_remote_address(
                cluster.rns_address
            )

        cluster.share(
            users=["info@run.house"],
            access_level="read",
            notify_users=False,
        )

        # First try loading in the same process/filesystem because it's more
        # debuggable, though not as thorough
        resource_class_name = cluster.config().get("resource_type").capitalize()
        config = cluster.config()

        with friend_account():
            curr_config = load_shared_resource_config(
                resource_class_name, cluster.rns_address
            )
            new_creds = curr_config.get("creds", None)
            assert f'{config["name"]}-ssh-secret' in new_creds
            assert curr_config == config

        # TODO: If we are testing with an ondemand_cluster, we need to sync the
        # sky key so loading the ondemand_cluster from config works.
        # We also need the aws secret to load availability zones.
        # secrets=["sky", "aws"],
        load_shared_resource_config_cluster = rh.function(
            load_shared_resource_config
        ).to(friend_account_logged_in_docker_cluster_pk_ssh)
        new_config = load_shared_resource_config_cluster(
            resource_class_name, cluster.rns_address
        )
        new_creds = new_config.get("creds", None)  # check creds on the freshly loaded config
        assert f'{config["name"]}-ssh-secret' in new_creds
        assert new_config == config
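
load_shared_resource_config is likewise a shared test helper; a plausible sketch, under the assumption that the capitalized resource type (e.g. "Cluster") is exposed as a class on the top-level rh namespace:

def load_shared_resource_config(resource_class_name, address):
    # Hypothetical reconstruction: resolve the class, load the shared
    # resource by its rns address without bringing it up, and return
    # its config for comparison against the sharer's copy.
    resource_class = getattr(rh, resource_class_name)
    loaded_resource = resource_class.from_name(address, dryrun=True)
    return loaded_resource.config()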

    @pytest.mark.level("local")
    def test_access_to_shared_cluster(self, cluster):
        # TODO: Remove this by doing some CI-specific logic.
        if cluster.__class__.__name__ == "OnDemandCluster":
            return

        if cluster.rns_address.startswith("~"):
            # For `local_named_resource`, resolve the rns address so it can be shared and loaded
            from runhouse.globals import rns_client

            cluster.rns_address = rns_client.local_to_remote_address(
                cluster.rns_address
            )

        cluster.share(
            users=["support@run.house"],
            access_level="write",
            notify_users=False,
        )

        cluster_name = cluster.rns_address
        cluster_creds = cluster.creds_values
        cluster_creds.pop("private_key", None)
        cluster_creds.pop("public_key", None)

        with friend_account_in_org():
            shared_cluster = rh.cluster(name=cluster_name)
            assert shared_cluster.rns_address == cluster_name
            assert shared_cluster.creds_values.keys() == cluster_creds.keys()
            echo_msg = "hello from shared cluster"
            run_res = shared_cluster.run([f"echo {echo_msg}"])
            assert echo_msg in run_res[0][1]
            # First element is the return code
            assert shared_cluster.run(["echo hello"])[0][0] == 0
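
The two assertions above rely on the shape of cluster.run results: a list with one entry per command, where index 0 of each entry is the return code and index 1 is stdout. A minimal usage sketch:

run_res = shared_cluster.run(["echo hello"])
return_code, stdout = run_res[0][0], run_res[0][1]
assert return_code == 0
assert "hello" in stdout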

    @pytest.mark.level("local")
    def test_changing_name_and_saving_in_between(self, cluster):
        remote_summer = rh.function(summer).to(cluster)
        assert remote_summer(3, 4) == 7
        old_name = cluster.name

        cluster.save(name="new_testing_name")

        assert remote_summer(3, 4) == 7
        remote_sub = rh.function(sub).to(cluster)
        assert remote_sub(3, 4) == -1

        cluster_keys_remote = rh.function(cluster_keys).to(cluster)

        # If save did not update the name, this will attempt to create a connection
        # when the cluster is used remotely. However, if the name was updated,
        # `on_this_cluster` will work correctly and the remote function will just
        # call the object store when it calls .keys()
        assert cluster.keys() == cluster_keys_remote(cluster)

        # Restore the original name
        cluster.save(name=old_name)
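
summer, sub, and cluster_keys are helpers from the shared test utilities; minimal sketches consistent with the assertions above:

def summer(a, b):
    # remote_summer(3, 4) == 7
    return a + b


def sub(a, b):
    # remote_sub(3, 4) == -1
    return a - b


def cluster_keys(cluster):
    # When run on the cluster itself, `on_this_cluster` short-circuits to
    # the local object store instead of opening a new connection.
    return cluster.keys()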

    @pytest.mark.level("local")
    def test_caller_token_propagated(self, cluster):
        remote_assume_caller_and_get_token = rh.function(
            assume_caller_and_get_token
        ).to(cluster)

        remote_assume_caller_and_get_token.share(
            users=["info@run.house"], notify_users=False
        )

        with friend_account():
            unassumed_token, assumed_token = remote_assume_caller_and_get_token()
            # The "local token" is the token the cluster reads from rh.configs.token;
            # this is what will be used in subsequent rns_client calls
            assert assumed_token == rh.globals.rns_client.cluster_token(
                rh.configs.token, cluster.rns_address
            )
            assert unassumed_token != rh.configs.token

        # Docker clusters are logged out, ondemand clusters are logged in
        output = cluster.run("sed -n 's/.*token: *//p' ~/.rh/config.yaml")
        # sed exits with code 2 when it cannot open the config file
        if output[0][0] == 2:
            assert unassumed_token is None
        elif output[0][0] == 0:
            assert unassumed_token == output[0][1].strip()
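
assume_caller_and_get_token is also a shared helper; its real implementation uses runhouse's request-context internals, but its contract per the assertions above is roughly the following (as_caller is a hypothetical stand-in for that mechanism):

def assume_caller_and_get_token():
    # Return the token visible on the cluster without assuming the caller's
    # identity, followed by the token after assuming it.
    token_default = rh.configs.token
    with as_caller():  # hypothetical: enter the calling user's auth context
        token_as_caller = rh.configs.token
    return token_default, token_as_caller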

    ####################################################################################################
    # Status tests
    ####################################################################################################

    @pytest.mark.level("local")
    def test_rh_status_pythonic(self, cluster):
        sleep_remote = rh.function(sleep_fn).to(
@@ -505,167 +657,6 @@ def test_rh_status_stopped(self, cluster):
        finally:
            cluster.run(["runhouse restart"])

    @pytest.mark.level("local")
    def test_condensed_config_for_cluster(self, cluster):
        remote_cluster_config = rh.function(cluster_config).to(cluster)
        on_cluster_config = remote_cluster_config()
        local_cluster_config = cluster.config()

        keys_to_skip = [
            "creds",
            "client_port",
            "server_host",
            "api_server_url",
            "ssl_keyfile",
            "ssl_certfile",
        ]
        on_cluster_config = remove_config_keys(on_cluster_config, keys_to_skip)
        local_cluster_config = remove_config_keys(local_cluster_config, keys_to_skip)

        if local_cluster_config.get("stable_internal_external_ips", False):
            cluster_ips = local_cluster_config.pop(
                "stable_internal_external_ips", None
            )[0]
            on_cluster_ips = on_cluster_config.pop(
                "stable_internal_external_ips", None
            )[0]
            assert tuple(cluster_ips) == tuple(on_cluster_ips)

        assert on_cluster_config == local_cluster_config

    @pytest.mark.level("local")
    def test_sharing(self, cluster, friend_account_logged_in_docker_cluster_pk_ssh):
        # Skip this test for ondemand clusters, because making
        # it compatible with ondemand_cluster requires changes
        # that break CI.
        # TODO: Remove this by doing some CI-specific logic.
        if cluster.__class__.__name__ == "OnDemandCluster":
            return

        if cluster.rns_address.startswith("~"):
            # For `local_named_resource`, resolve the rns address so it can be shared and loaded
            from runhouse.globals import rns_client

            cluster.rns_address = rns_client.local_to_remote_address(
                cluster.rns_address
            )

        cluster.share(
            users=["info@run.house"],
            access_level="read",
            notify_users=False,
        )

        # First try loading in the same process/filesystem because it's more
        # debuggable, though not as thorough
        resource_class_name = cluster.config().get("resource_type").capitalize()
        config = cluster.config()

        with friend_account():
            curr_config = load_shared_resource_config(
                resource_class_name, cluster.rns_address
            )
            new_creds = curr_config.get("creds", None)
            assert f'{config["name"]}-ssh-secret' in new_creds
            assert curr_config == config

        # TODO: If we are testing with an ondemand_cluster, we need to sync the
        # sky key so loading the ondemand_cluster from config works.
        # We also need the aws secret to load availability zones.
        # secrets=["sky", "aws"],
        load_shared_resource_config_cluster = rh.function(
            load_shared_resource_config
        ).to(friend_account_logged_in_docker_cluster_pk_ssh)
        new_config = load_shared_resource_config_cluster(
            resource_class_name, cluster.rns_address
        )
        new_creds = new_config.get("creds", None)  # check creds on the freshly loaded config
        assert f'{config["name"]}-ssh-secret' in new_creds
        assert new_config == config

    @pytest.mark.level("local")
    def test_access_to_shared_cluster(self, cluster):
        # TODO: Remove this by doing some CI-specific logic.
        if cluster.__class__.__name__ == "OnDemandCluster":
            return

        if cluster.rns_address.startswith("~"):
            # For `local_named_resource`, resolve the rns address so it can be shared and loaded
            from runhouse.globals import rns_client

            cluster.rns_address = rns_client.local_to_remote_address(
                cluster.rns_address
            )

        cluster.share(
            users=["support@run.house"],
            access_level="write",
            notify_users=False,
        )

        cluster_name = cluster.rns_address
        cluster_creds = cluster.creds_values
        cluster_creds.pop("private_key", None)
        cluster_creds.pop("public_key", None)

        with friend_account_in_org():
            shared_cluster = rh.cluster(name=cluster_name)
            assert shared_cluster.rns_address == cluster_name
            assert shared_cluster.creds_values.keys() == cluster_creds.keys()
            echo_msg = "hello from shared cluster"
            run_res = shared_cluster.run([f"echo {echo_msg}"])
            assert echo_msg in run_res[0][1]
            # First element is the return code
            assert shared_cluster.run(["echo hello"])[0][0] == 0

    @pytest.mark.level("local")
    def test_changing_name_and_saving_in_between(self, cluster):
        remote_summer = rh.function(summer).to(cluster)
        assert remote_summer(3, 4) == 7
        old_name = cluster.name

        cluster.save(name="new_testing_name")

        assert remote_summer(3, 4) == 7
        remote_sub = rh.function(sub).to(cluster)
        assert remote_sub(3, 4) == -1

        cluster_keys_remote = rh.function(cluster_keys).to(cluster)

        # If save did not update the name, this will attempt to create a connection
        # when the cluster is used remotely. However, if the name was updated,
        # `on_this_cluster` will work correctly and the remote function will just
        # call the object store when it calls .keys()
        assert cluster.keys() == cluster_keys_remote(cluster)

        # Restore the original name
        cluster.save(name=old_name)

    @pytest.mark.level("local")
    def test_caller_token_propagated(self, cluster):
        remote_assume_caller_and_get_token = rh.function(
            assume_caller_and_get_token
        ).to(cluster)

        remote_assume_caller_and_get_token.share(
            users=["info@run.house"], notify_users=False
        )

        with friend_account():
            unassumed_token, assumed_token = remote_assume_caller_and_get_token()
            # The "local token" is the token the cluster reads from rh.configs.token;
            # this is what will be used in subsequent rns_client calls
            assert assumed_token == rh.globals.rns_client.cluster_token(
                rh.configs.token, cluster.rns_address
            )
            assert unassumed_token != rh.configs.token

        # Docker clusters are logged out, ondemand clusters are logged in
        output = cluster.run("sed -n 's/.*token: *//p' ~/.rh/config.yaml")
        # sed exits with code 2 when it cannot open the config file
        if output[0][0] == 2:
            assert unassumed_token is None
        elif output[0][0] == 0:
            assert unassumed_token == output[0][1].strip()

    @pytest.mark.level("local")
    def test_send_status_to_db(self, cluster):
        import json