diff --git a/.github/workflows/build_docs.yaml b/.github/workflows/build_docs.yaml index 1c4850e76..f61cc373f 100644 --- a/.github/workflows/build_docs.yaml +++ b/.github/workflows/build_docs.yaml @@ -18,7 +18,7 @@ jobs: - name: Build docs run: cd docs && make html && cd .. - name: Upload artifacts - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: docs path: docs/_build/html diff --git a/.github/workflows/cluster_tests.yaml b/.github/workflows/cluster_tests.yaml index c2eb218ea..e3b520951 100644 --- a/.github/workflows/cluster_tests.yaml +++ b/.github/workflows/cluster_tests.yaml @@ -34,7 +34,7 @@ jobs: - name: Run all cluster unit tests tests env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} run: pytest -v tests/test_resources/test_cluster.py --level unit - name: Teardown all test clusters diff --git a/.github/workflows/local_den_unit_tests.yaml b/.github/workflows/local_den_unit_tests.yaml index 1c6d77c21..ab444f8ff 100644 --- a/.github/workflows/local_den_unit_tests.yaml +++ b/.github/workflows/local_den_unit_tests.yaml @@ -48,8 +48,8 @@ jobs: - name: Setup Runhouse Config uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.TEST_USERNAME }} - token: ${{ secrets.TEST_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_PROD_TOKEN }} - name: Update Server URL in Runhouse Config run: | @@ -63,6 +63,6 @@ jobs: - name: Run unit tests env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} run: pytest -v --level unit diff --git a/.github/workflows/local_tests.yaml b/.github/workflows/local_tests.yaml index eee9ea873..291e44bb5 100644 --- a/.github/workflows/local_tests.yaml +++ b/.github/workflows/local_tests.yaml @@ -40,14 +40,14 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_PROD_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local tests/test_servers/ env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} run: pytest -v --level local tests/test_servers/ timeout-minutes: 60 @@ -78,15 +78,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_PROD_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "not servertest and not secrettest and not moduletest and not functiontest and not envtest and not clustertest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "not servertest and not secrettest 
and not moduletest and not functiontest and not envtest and not clustertest" timeout-minutes: 60 @@ -118,15 +118,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_PROD_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "secrettest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "secrettest" timeout-minutes: 60 @@ -143,15 +143,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_PROD_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "moduletest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "moduletest" timeout-minutes: 60 @@ -168,15 +168,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_PROD_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "functiontest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "functiontest" timeout-minutes: 60 @@ -193,15 +193,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_PROD_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "envtest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "envtest" timeout-minutes: 60 @@ -218,15 +218,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ 
secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_PROD_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "clustertest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "clustertest" timeout-minutes: 60 diff --git a/.github/workflows/local_tests_den_dev.yaml b/.github/workflows/local_tests_den_dev.yaml index d328aa8dc..a35b09b24 100644 --- a/.github/workflows/local_tests_den_dev.yaml +++ b/.github/workflows/local_tests_den_dev.yaml @@ -21,8 +21,8 @@ jobs: # - name: pytest -v --level local tests/test_servers/ # env: - # TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - # TEST_USERNAME: ${{ secrets.TEST_USERNAME }} + # KITCHEN_TESTER_DEV_TOKEN: ${{ secrets.KITCHEN_TESTER_DEV_TOKEN }} + # KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} # run: pytest -v --level local tests/test_servers/ server-tests-logged-in-level-local: @@ -37,15 +37,14 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_DEV_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - - name: pytest -v --level local tests/test_servers/ env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_DEV_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} run: pytest -v --level local tests/test_servers/ --api-server-url $API_SERVER_URL timeout-minutes: 60 @@ -60,8 +59,8 @@ jobs: # - name: pytest -v --level local -k "not servertest and not secrettest" # env: - # TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - # TEST_USERNAME: ${{ secrets.TEST_USERNAME }} + # KITCHEN_TESTER_DEV_TOKEN: ${{ secrets.KITCHEN_TESTER_DEV_TOKEN }} + # KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} # run: pytest -v --level local -k "not servertest and not secrettest" most-tests-logged-in-level-local: @@ -76,15 +75,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_DEV_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "not servertest and not secrettest and not moduletest and not functiontest and not envtest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_DEV_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_DEV_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "not servertest and not secrettest and not moduletest and not functiontest and not envtest" --api-server-url $API_SERVER_URL timeout-minutes: 60 @@ -100,8 +99,8 @@ jobs: # - name: pytest -v --level 
local -k "secrettest" # env: - # TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - # TEST_USERNAME: ${{ secrets.TEST_USERNAME }} + # KITCHEN_TESTER_DEV_TOKEN: ${{ secrets.KITCHEN_TESTER_DEV_TOKEN }} + # KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} # run: pytest -v --level local -k "secrettest" secret-tests-logged-in-level-local: @@ -116,15 +115,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_DEV_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "secrettest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_DEV_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_DEV_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "secrettest" --api-server-url $API_SERVER_URL timeout-minutes: 60 @@ -141,15 +140,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_DEV_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "moduletest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_DEV_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_DEV_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "moduletest" --api-server-url $API_SERVER_URL timeout-minutes: 60 @@ -166,15 +165,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_DEV_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "functiontest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_DEV_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_DEV_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "functiontest" --api-server-url $API_SERVER_URL timeout-minutes: 60 @@ -191,15 +190,15 @@ jobs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ secrets.CI_ACCOUNT_USERNAME }} - token: ${{ secrets.CI_ACCOUNT_TOKEN }} + username: ${{ secrets.DEN_TESTER_USERNAME }} + token: ${{ secrets.DEN_TESTER_DEV_TOKEN }} api_server_url: ${{ env.API_SERVER_URL }} - name: pytest -v --level local -k "envtest" env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_DEV_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ 
secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_DEV_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest -v --level local -k "envtest" --api-server-url $API_SERVER_URL timeout-minutes: 60 diff --git a/.github/workflows/nightly_release_testing.yaml b/.github/workflows/nightly_release_testing.yaml index ef1e5d111..6f6a6b7fe 100644 --- a/.github/workflows/nightly_release_testing.yaml +++ b/.github/workflows/nightly_release_testing.yaml @@ -25,15 +25,15 @@ jobs: KUBECONFIG: ${{ secrets.KUBECONFIG }} GCP_SERVICE_ACCOUNT_KEY: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }} GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }} - CI_ACCOUNT_TOKEN: ${{ secrets.CI_ACCOUNT_TOKEN }} - CI_ACCOUNT_USERNAME: ${{ secrets.CI_ACCOUNT_USERNAME }} + DEN_TESTER_TOKEN: ${{ secrets.DEN_TESTER_PROD_TOKEN }} + DEN_TESTER_USERNAME: ${{ secrets.DEN_TESTER_USERNAME }} API_SERVER_URL: ${{ env.API_SERVER_URL }} - name: Run not cluster tests env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest --level release tests -k "not cluster" --detached timeout-minutes: 180 @@ -63,15 +63,15 @@ jobs: KUBECONFIG: ${{ secrets.KUBECONFIG }} GCP_SERVICE_ACCOUNT_KEY: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }} GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }} - CI_ACCOUNT_TOKEN: ${{ secrets.CI_ACCOUNT_TOKEN }} - CI_ACCOUNT_USERNAME: ${{ secrets.CI_ACCOUNT_USERNAME }} + DEN_TESTER_TOKEN: ${{ secrets.DEN_TESTER_PROD_TOKEN }} + DEN_TESTER_USERNAME: ${{ secrets.DEN_TESTER_USERNAME }} API_SERVER_URL: ${{ env.API_SERVER_URL }} - name: Run cluster and not on-demand tests env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest --level release tests -k "cluster and not ondemand" --detached timeout-minutes: 180 @@ -101,17 +101,17 @@ jobs: KUBECONFIG: ${{ secrets.KUBECONFIG }} GCP_SERVICE_ACCOUNT_KEY: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }} GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }} - CI_ACCOUNT_TOKEN: ${{ secrets.CI_ACCOUNT_TOKEN }} - CI_ACCOUNT_USERNAME: ${{ secrets.CI_ACCOUNT_USERNAME }} + DEN_TESTER_TOKEN: ${{ secrets.DEN_TESTER_PROD_TOKEN }} + DEN_TESTER_USERNAME: ${{ secrets.DEN_TESTER_USERNAME }} API_SERVER_URL: ${{ env.API_SERVER_URL }} - name: Run on-demand aws tests env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} - run: pytest --level release tests -k "ondemand_aws_cluster" --detached + run: pytest --level release tests -k "ondemand_aws_docker_cluster" --detached timeout-minutes: 180 - name: Teardown all ondemand-aws-tests clusters @@ -139,15 +139,15 @@ jobs: AWS_OSS_ROLE_ARN: ${{ 
secrets.AWS_OSS_ROLE_ARN }} DEV_AWS_ACCESS_KEY: ${{ secrets.DEV_AWS_ACCESS_KEY }} DEV_AWS_SECRET_KEY: ${{ secrets.DEV_AWS_SECRET_KEY }} - CI_ACCOUNT_TOKEN: ${{ secrets.CI_ACCOUNT_TOKEN }} - CI_ACCOUNT_USERNAME: ${{ secrets.CI_ACCOUNT_USERNAME }} + DEN_TESTER_TOKEN: ${{ secrets.DEN_TESTER_PROD_TOKEN }} + DEN_TESTER_USERNAME: ${{ secrets.DEN_TESTER_USERNAME }} API_SERVER_URL: ${{ env.API_SERVER_URL }} - name: Run on-demand gcp tests env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest --level release tests -k "ondemand_gcp_cluster" --detached timeout-minutes: 180 @@ -177,15 +177,15 @@ jobs: DEV_AWS_SECRET_KEY: ${{ secrets.DEV_AWS_SECRET_KEY }} GCP_SERVICE_ACCOUNT_KEY: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }} GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }} - CI_ACCOUNT_TOKEN: ${{ secrets.CI_ACCOUNT_TOKEN }} - CI_ACCOUNT_USERNAME: ${{ secrets.CI_ACCOUNT_USERNAME }} + DEN_TESTER_TOKEN: ${{ secrets.DEN_TESTER_PROD_TOKEN }} + DEN_TESTER_USERNAME: ${{ secrets.DEN_TESTER_USERNAME }} API_SERVER_URL: ${{ env.API_SERVER_URL }} - name: Run kubernetes tests env: - TEST_TOKEN: ${{ secrets.TEST_TOKEN }} - TEST_USERNAME: ${{ secrets.TEST_USERNAME }} - ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_TOKEN }} + KITCHEN_TESTER_TOKEN: ${{ secrets.KITCHEN_TESTER_PROD_TOKEN }} + KITCHEN_TESTER_USERNAME: ${{ secrets.KITCHEN_TESTER_USERNAME }} + ORG_MEMBER_TOKEN: ${{ secrets.ORG_MEMBER_PROD_TOKEN }} ORG_MEMBER_USERNAME: ${{ secrets.ORG_MEMBER_USERNAME }} run: pytest --level release tests -k "ondemand_k8s_cluster" --detached timeout-minutes: 180 @@ -222,8 +222,8 @@ jobs: DEV_AWS_SECRET_KEY: ${{ secrets.DEV_AWS_SECRET_KEY }} GCP_SERVICE_ACCOUNT_KEY: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }} GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }} - CI_ACCOUNT_TOKEN: ${{ secrets.CI_ACCOUNT_TOKEN }} - CI_ACCOUNT_USERNAME: ${{ secrets.CI_ACCOUNT_USERNAME }} + DEN_TESTER_TOKEN: ${{ secrets.DEN_TESTER_PROD_TOKEN }} + DEN_TESTER_USERNAME: ${{ secrets.DEN_TESTER_USERNAME }} API_SERVER_URL: ${{ env.API_SERVER_URL }} - name: Wait to check cluster status diff --git a/.github/workflows/release_precheck.yaml b/.github/workflows/release_precheck.yaml index 1fca4a759..cedd1b739 100644 --- a/.github/workflows/release_precheck.yaml +++ b/.github/workflows/release_precheck.yaml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11'] + python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] steps: - name: Checkout repository @@ -26,12 +26,11 @@ jobs: - name: Create Conda environment run: conda create --yes --name test-env python=${{ matrix.python-version }} - - name: Install current package in editable mode and importlib_metadata for python 3.7 + - name: Install current package in editable mode run: | source $CONDA/etc/profile.d/conda.sh conda activate test-env pip install -e . 
- pip install importlib_metadata - name: Test package import run: | diff --git a/.github/workflows/sagemaker_tests.yaml b/.github/workflows/sagemaker_tests.yaml deleted file mode 100644 index 25c1c81ec..000000000 --- a/.github/workflows/sagemaker_tests.yaml +++ /dev/null @@ -1,37 +0,0 @@ -name: sagemaker-tests - -on: workflow_dispatch - -jobs: - sagemaker-tests: - runs-on: ubuntu-latest - steps: - - name: Check out repository code - uses: actions/checkout@v3 - - - name: setup python - uses: actions/setup-python@v4 - with: - python-version: '3.10' - - - name: Configure aws credentials - run: | - aws sts assume-role --role-arn ${{ secrets.AWS_ROLE_ARN }} --role-session-name "GitHubAction" > assumed-role.json - export AWS_ACCESS_KEY_ID=$(jq -r .Credentials.AccessKeyId assumed-role.json) - export AWS_SECRET_ACCESS_KEY=$(jq -r .Credentials.SecretAccessKey assumed-role.json) - export AWS_SESSION_TOKEN=$(jq -r .Credentials.SessionToken assumed-role.json) - - - name: Install python packages & dependencies - run: | - pip install runhouse[sagemaker] - pip install tests/requirements.txt - - - name: Run all sagemaker tests - env: - AWS_ROLE_ARN: ${{ secrets.AWS_ROLE_ARN }} - run: pytest tests/test_resources/test_clusters/test_sagemaker_cluster/test_sagemaker.py --ignore-filters - - - name: Teardown all SageMaker clusters - run: | - aws sagemaker stop-training-job --training-job-name "rh-sagemaker" - aws sagemaker stop-training-job --training-job-name "rh-sagemaker-2" diff --git a/.github/workflows/setup_release_testing/action.yaml b/.github/workflows/setup_release_testing/action.yaml index fada8c122..ec3945c8d 100644 --- a/.github/workflows/setup_release_testing/action.yaml +++ b/.github/workflows/setup_release_testing/action.yaml @@ -20,11 +20,11 @@ inputs: KUBECONFIG: description: 'Kubeconfig' required: false - CI_ACCOUNT_TOKEN: - description: 'CI Token' + DEN_TESTER_TOKEN: + description: 'Den Tester Token' required: true - CI_ACCOUNT_USERNAME: - description: 'CI Username' + DEN_TESTER_USERNAME: + description: 'Den Tester Username' required: true API_SERVER_URL: description: 'API Server URL' @@ -85,6 +85,6 @@ runs: - name: Setup ~/.rh/config.yaml uses: ./.github/workflows/setup_rh_config with: - username: ${{ inputs.CI_ACCOUNT_USERNAME }} - token: ${{ inputs.CI_ACCOUNT_TOKEN }} + username: ${{ inputs.DEN_TESTER_USERNAME }} + token: ${{ inputs.DEN_TESTER_TOKEN }} api_server_url: ${{ inputs.API_SERVER_URL }} diff --git a/docs/api/python/function.rst b/docs/api/python/function.rst index 21a202d77..5a0ee2134 100644 --- a/docs/api/python/function.rst +++ b/docs/api/python/function.rst @@ -17,7 +17,7 @@ Function Class .. autoclass:: runhouse.Function :members: - :exclude-members: map, starmap, get_or_call, send_secrets + :exclude-members: map, starmap, get_or_call .. automethod:: __init__ diff --git a/docs/tutorials/quick-start-local.rst b/docs/tutorials/quick-start-local.rst index 12f7a64dd..41c955533 100644 --- a/docs/tutorials/quick-start-local.rst +++ b/docs/tutorials/quick-start-local.rst @@ -39,28 +39,28 @@ and details of the server. 
For printing cluster's status outside the cluser, its /sashab/rh-basic-cpu 😈 Runhouse Daemon is running 🏃 - Runhouse v0.0.28 - server pid: 29395 + Runhouse v0.0.34 + 🤖 aws m6i.large cluster | 🌍 us-east-1 | 💸 $0.096/hr + server pid: 29477 • server port: 32300 - • den auth: True + • den auth: False • server connection type: ssh • backend config: - • resource subtype: OnDemandCluster - • domain: None - • server host: 0.0.0.0 - • ips: ['52.207.212.159'] - • resource subtype: OnDemandCluster - • autostop mins: autostop disabled + • resource subtype: OnDemandCluster + • domain: None + • server host: 0.0.0.0 + • ips: ['52.91.194.125'] + • autostop mins: autostop disabled + CPU Utilization: 5.4% Serving 🍦 : • _cluster_default_env (runhouse.Env) - This environment has only python packages installed, if such provided. No resources were found. - • sd_env (runhouse.Env) | pid: 29716 | node: head (52.207.212.159) - CPU: 0.0% | Memory: 0.13 / 8 Gb (1.65%) - This environment has only python packages installed, if such provided. No resources were found. - • np_pd_env (runhouse.Env) | pid: 29578 | node: head (52.207.212.159) - CPU: 0.0% | Memory: 0.13 / 8 Gb (1.71%) - • /sashab/summer (runhouse.Function) - • mult (runhouse.Function) + This environment has only python packages installed, if provided. No resources were found. + • np_pd_env (runhouse.Env) | pid: 29621 | node: head (52.91.194.125) + CPU: 0.3% | Memory: 0.1 / 8 Gb (0.01%) + • np_pd_env (runhouse.Env) + • summer (runhouse.Function) Currently not running + • mult (runhouse.Function) Running for 2.484918 seconds + *GPU cluster* @@ -69,29 +69,33 @@ and details of the server. For printing cluster's status outside the cluser, its /sashab/rh-basic-gpu 😈 Runhouse Daemon is running 🏃 - Runhouse v0.0.28 - server pid: 29486 + Runhouse v0.0.34 + 🤖 aws g5.xlarge cluster | 🌍 us-east-1 | 💰 $1.006/hr + server pid: 29657 • server port: 32300 - • den auth: True + • den auth: False • server connection type: ssh • backend config: - • resource subtype: OnDemandCluster - • domain: None - • server host: 0.0.0.0 - • ips: ['35.171.157.49'] - • resource subtype: OnDemandCluster - • autostop mins: autostop disabled + • resource subtype: OnDemandCluster + • domain: None + • server host: 0.0.0.0 + • ips: ['3.92.223.118'] + • autostop mins: autostop disabled + CPU Utilization: 12.8% | GPU Utilization: 7.07% Serving 🍦 : • _cluster_default_env (runhouse.Env) - This environment has only python packages installed, if such provided. No resources were found. - • np_pd_env (runhouse.Env) | pid: 29672 | node: head (35.171.157.49) - CPU: 0.0% | Memory: 0.13 / 16 Gb (0.85%) - • /sashab/summer (runhouse.Function) - • mult (runhouse.Function) - • sd_env (runhouse.Env) | pid: 29812 | node: head (35.171.157.49) - CPU: 1.0% | Memory: 4.47 / 16 Gb (28.95%) - GPU: 0.0% | Memory: 6.89 / 23 Gb (29.96%) - • sd_generate (runhouse.Function) + This environment has only python packages installed, if provided. No resources were found. 
+ • np_pd_env (runhouse.Env) | pid: 29809 | node: head (3.92.223.118) + CPU: 0.4% | Memory: 0.1 / 16 Gb (0.01%) + • np_pd_env (runhouse.Env) + • summer (runhouse.Function) Currently not running + • mult (runhouse.Function) Currently not running + • sd_env (runhouse.Env) | pid: 32054 | node: head (3.92.223.118) + CPU: 40.1% | Memory: 2.87 / 16 Gb (0.19%) + GPU Memory: 3.38 / 23 Gb (14.7%) + • sd_env (runhouse.Env) + • sd_generate (runhouse.Function) Running for 26.578614 seconds + Local Python Function --------------------- diff --git a/examples/lora-example-with-notebook/readme.md b/examples/lora-example-with-notebook/readme.md new file mode 100644 index 000000000..e90c1e2b9 --- /dev/null +++ b/examples/lora-example-with-notebook/readme.md @@ -0,0 +1,7 @@ +## LoRA Fine-Tuning Class with Example of Notebook Usage +In this example, we define a Fine Tuner class (LoraFineTuner.py) in **regular Python** and launch remote GPU compute to do the fine-tuning. + +In particular, we show how you can start the fine tuning and interact with the fine-tuning class (a remote object) through regular Python or a Notebook. Runhouse lets you work *locally* with *remote objects* defined by regular code and edited locally, compared to tooling like hosted notebooks which let you *work locally while SSH'ed into a remote setting.* This offers a few distinct advantages: +* **Real compute and real data:** ML Engineers and data scientists do not need to launch projects on toy compute offered in a research environment. +* **Real code:** Rather than working on Notebooks (because they have to), your team is writing code and developing locally just like a normal software team. The only difference is dispatching the work for remote computation since the local machine doesn't have the right hardware. +* **Fast research to production:** The work done while writing and testing the class is essentially enough to bring the work to production as well. There is no costly rebuilding of the same code a second time to work in a Pipeline. diff --git a/examples/torch-training/TorchBasicExample.py b/examples/torch-training/TorchBasicExample.py index 1b3307baf..ac6421452 100644 --- a/examples/torch-training/TorchBasicExample.py +++ b/examples/torch-training/TorchBasicExample.py @@ -33,18 +33,39 @@ import torch.nn as nn import torch.nn.functional as F import torch.optim as optim +from PIL import Image from torch.utils.data import DataLoader from torchvision import datasets, transforms # Let's define a function that downloads the data. You can imagine this as a generic function to access data. -def DownloadData(path="./data"): +def download_data(path="./data"): datasets.MNIST(path, train=True, download=True) datasets.MNIST(path, train=False, download=True) print("Done with data download") +def preprocess_data(path): + transform = transforms.Compose( + [ + transforms.Resize( + (28, 28), interpolation=Image.BILINEAR + ), # Resize to 28x28 using bilinear interpolation + transforms.ToTensor(), + transforms.Normalize( + (0.5,), (0.5,) + ), # Normalize with mean=0.5, std=0.5 for general purposes + ] + ) + + train = datasets.MNIST(path, train=False, download=False, transform=transform) + test = datasets.MNIST(path, train=False, download=False, transform=transform) + print("Done with data preprocessing") + print(f"Number of training samples: {len(train)}") + print(f"Number of test samples: {len(test)}") + + # Next, we define a model class. We define a very basic feedforward neural network with three fully connected layers. 
class TorchExampleBasic(nn.Module): def __init__(self): @@ -78,9 +99,7 @@ def __init__(self): self.train_loader = None self.test_loader = None - self.transform = transforms.Compose( - [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))] - ) + self.transform = transforms.Compose([transforms.ToTensor()]) self.accuracy = None self.test_loss = None @@ -216,7 +235,8 @@ def return_status(self): remote_torch_example = rh.module(SimpleTrainer).to( cluster, env=env, name="torch-basic-training" ) - remote_download = rh.function(DownloadData).to(cluster, env=env) + remote_download = rh.function(download_data).to(cluster, env=env) + remote_preprocess = rh.function(preprocess_data).to(cluster, env=env) # ## Calling our remote Trainer # We instantiate the remote class @@ -235,6 +255,7 @@ def return_status(self): # We create the datasets remotely, and then send them to the remote model / remote .load_train() method. The "preprocessing" happens remotely. # They become instance variables of the remote Trainer. remote_download() + remote_preprocess() model.load_train("./data", batch_size) model.load_test("./data", batch_size) diff --git a/examples/torch-training/airflow-multicloud/DataProcessing.py b/examples/torch-training/airflow-multicloud/DataProcessing.py new file mode 100644 index 000000000..987d120b7 --- /dev/null +++ b/examples/torch-training/airflow-multicloud/DataProcessing.py @@ -0,0 +1,40 @@ +import os + +import boto3 + + +# Download data from S3 +def download_folder_from_s3(bucket_name, s3_folder_prefix, local_folder_path): + s3 = boto3.client("s3") + + paginator = s3.get_paginator("list_objects_v2") + for page in paginator.paginate(Bucket=bucket_name, Prefix=s3_folder_prefix): + if "Contents" in page: + for obj in page["Contents"]: + s3_key = obj["Key"] + relative_path = os.path.relpath(s3_key, s3_folder_prefix) + local_path = os.path.join(local_folder_path, relative_path) + + os.makedirs(os.path.dirname(local_path), exist_ok=True) + s3.download_file(bucket_name, s3_key, local_path) + print(f"Downloaded {s3_key} to {local_path}") + + +# download_folder_from_s3('rh-demo-external', 'your/s3/folder/prefix', '/path/to/local/folder', 'your-access-key-id', 'your-secret-access-key') + + +# Upload data to S3 bucket +def upload_folder_to_s3(local_folder_path, bucket_name, s3_folder_prefix): + s3 = boto3.client("s3") + + for root, dirs, files in os.walk(local_folder_path): + for file in files: + local_path = os.path.join(root, file) + relative_path = os.path.relpath(local_path, local_folder_path) + s3_path = os.path.join(s3_folder_prefix, relative_path) + + s3.upload_file(local_path, bucket_name, s3_path) + print(f"Uploaded {local_path} to s3://{bucket_name}/{s3_path}") + + +# upload_folder_to_s3('/path/to/local/folder', 'rh-demo-external', 'your/s3/folder/prefix', 'your-access-key-id', 'your-secret-access-key') diff --git a/examples/torch-training/airflow-multicloud/airflow_multicloud_torch_train.py b/examples/torch-training/airflow-multicloud/airflow_multicloud_torch_train.py new file mode 100644 index 000000000..19f56b95e --- /dev/null +++ b/examples/torch-training/airflow-multicloud/airflow_multicloud_torch_train.py @@ -0,0 +1,264 @@ +# # Using Multi-Cloud with Airflow + +# This basic Airflow example combines CPU processing on AWS, storage on S3, and GPU processing on Google Cloud. +# There are several reasons why you might want to use multiple clouds in a single workflow: +# * Your data lives on AWS, but you are quota limited on GPUs there or don't have access to powerful GPUs. 
You can do pre-processing on AWS and then train on a second cloud where you have GPUs. +# * You don't want GPUs to sit idle while you are doing pre-processing or post-processing. You can use a GPU cluster on-demand and then down it when you are done. +# * You want to right-size instances for each step of execution rather than bringing up a box that is the maximum of each of your memory, CPU, and GPU requirements. + +# The Torch model is identical to our [Torch training example](https://www.run.house/examples/torch-vision-mnist-basic-model-train-test). +# +# ## The usage pattern for Runhouse with Airflow: +# * Write Python classes and functions using normal, ordinary coding best practices. Do not think about DAGs or DSLs at all, just write great code. +# * Send the code for remote execution with Runhouse, and figure out whether the code works, debugging it interactively. Runhouse lets you send the code in seconds, and streams logs back. You can work on remote as if it were local. +# * Once you are satisfied with your code, you can write the callables for an Airflow PythonOperator. The code that is actually in the Airflow DAG is the **minimal code** to call out to already working Classes and Functions, defining the order of the steps (or you can even have a one-step Airflow DAG, making Airflow purely for scheduling and observability) +# * And you can easily iterate further on your code, or test the pipeline end-to-end from local with no Airflow participation + +# ## Setup credentials and dependencies +# +# Optionally, set up a virtual environment: +# ```shell +# $ conda create -n demo-runhouse python=3.10 +# $ conda activate demo-runhouse +# ``` +# Install the required dependencies: +# ```shell +# $ pip install "runhouse[aws]" torch torchvision airflow +# ``` +# +# We'll be launching an AWS EC2 instance and a GCP instance via [SkyPilot](https://github.com/skypilot-org/skypilot), so we need to +# make sure our AWS and GCP credentials are set up: +# ```shell +# $ aws configure +# $ gcloud init +# $ gcloud auth application-default login +# $ sky check +# ``` +# + +# Import some other libraries we need - namely Airflow, Runhouse, and a few others. +import logging +import os + +import sys +from datetime import datetime, timedelta + +import runhouse as rh +from airflow import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python import PythonOperator + +# ## Import the model class from the parent folder +# This class is the same as the example in https://www.run.house/examples/torch-vision-mnist-basic-model-train-test +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) +sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) + +from DataProcessing import download_folder_from_s3, upload_folder_to_s3 +from TorchBasicExample import download_data, preprocess_data, SimpleTrainer + + +logger = logging.getLogger(__name__) + +cpu_cluster_name = "cpu-cluster" # To be used later to name the clusters we raise +gpu_cluster_name = "gpu-cluster" + + +def get_cluster(**kwargs): + return rh.cluster( + name=kwargs.get("cluster_name", "rh-cluster"), + instance_type=kwargs.get("instance_type"), + provider=kwargs.get("provider", "aws"), + ).up_if_not() + + +# ## Define the callable functions. +# These will be called in sequence by the Airflow PythonOperator. Each task in the Airflow DAG becomes minimal. +# These callables define both *what tasks are run* and also *where the tasks are run* - essentially, programmatically controlling the dispatch. 
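For orientation, here is a minimal sketch of the dispatch pattern these callables wrap, using only calls that appear in this example (`rh.cluster(...).up_if_not()`, `rh.env`, `rh.function(...).to(...)`); the cluster parameters are illustrative placeholders rather than a prescribed configuration.

```python
import runhouse as rh

from TorchBasicExample import download_data

# Bring up (or reuse) a named cluster, send a plain Python function to it, and call it remotely.
cluster = rh.cluster(
    name="cpu-cluster", instance_type="CPU:4+", provider="aws"
).up_if_not()
env = rh.env(name="test_env", reqs=["torch", "torchvision"])

remote_download = rh.function(download_data).to(cluster, env=env)
remote_download()  # runs on the remote cluster; logs stream back locally
```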
+ +# We can bring up an on-demand cluster using Runhouse. You can access powerful usage patterns by defining compute in code. All subsequent steps connect to this cluster by name, but you can bring up other clusters for other steps. +def bring_up_cluster_callable(**kwargs): + logger.info("Connecting to remote cluster") + cluster = get_cluster(**kwargs) + print(cluster.is_up()) + + # cluster.save() ## Use if you have a Runhouse Den account to save and monitor the resource. + + +# We will send the function to download data to the remote cluster and then invoke it to download the data to the remote machine. You can imagine that this is a data access or pre-processing step after which data is prepared. +def access_data_callable(**kwargs): + logger.info("Step 3: Preprocess the Data") + + env = rh.env(name="test_env", reqs=["torch", "torchvision"]) + + cluster = get_cluster(**kwargs) + + remote_download = rh.function(download_data).to(cluster, env=env) + logger.info("Download function sent to remote") + remote_download() + logger.info("Data downloaded") + + +# Now execute the preprocessing on the CPU-only cluster +def preprocess_data_callable(**kwargs): + cluster = get_cluster(**kwargs) + + env = rh.env(name="test_env", secrets=["aws"], reqs=["torch", "torchvision"]) + + remote_preprocess_data = rh.function(preprocess_data).to(cluster, env=env) + remote_upload = rh.function(upload_folder_to_s3).to(cluster, env=env) + logger.info("Data preprocessing and upload functions sent to cluster") + + remote_preprocess_data("./data") + logger.info("Data preprocessed") + remote_upload("./data", "rh-demo-external", "torch-training-example") + + logger.info("Saved to S3") + + +# Download the data from S3, onto a newly launched cluster. +def download_s3_data_callable(**kwargs): + cluster = get_cluster(**kwargs) + env = rh.env(name="test_env", secrets=["aws"], reqs=["torch", "torchvision"]) + s3_download = rh.function(download_folder_from_s3).to(cluster, env=env) + s3_download("rh-demo-external", "torch-training-example", "./data") + + +# Then we instantiate the trainer, and then invoke the training on the remote machine. On the remote, we have a GPU. This is also a natural point to split the workflow if we want to do some tasks on GPU and some on CPU. +def train_model_callable(**kwargs): + logger.info("Step 4: Train Model") + cluster = get_cluster(**kwargs) + + env = rh.env(name="test_env", reqs=["torch", "torchvision"]) + + remote_torch_example = rh.module(SimpleTrainer).to( + cluster, env=env, name="torch-basic-training" + ) + + model = remote_torch_example() + + batch_size = 64 + epochs = 5 + learning_rate = 0.01 + cluster.run(["ls"]) + model.load_train("./data", batch_size) + model.load_test("./data", batch_size) + + for epoch in range(epochs): + model.train_model(learning_rate=learning_rate) + model.test_model() + model.save_model( + bucket_name="my-simple-torch-model-example", + s3_file_path=f"checkpoints/model_epoch_{epoch + 1}.pth", + ) + + +# We programatically down the cluster, but we can also reuse this cluster by name. +def down_cluster(**kwargs): + cluster = get_cluster(**kwargs) + cluster.teardown() + + +# ## Define the Airflow DAG +# This is a simple DAG with multiple steps. Each step is a PythonOperator that calls a function defined above. 
+default_args = { + "owner": "paul", + "depends_on_past": False, + "start_date": datetime(2024, 8, 6), + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), +} +cpu_cluster_config = { + "cluster_name": "cpu-cluster", + "instance_type": "CPU:4+", + "provider": "aws", +} +gpu_cluster_config = { + "cluster_name": "gpu-cluster", + "instance_type": "L4:1", + "provider": "gcp", +} + + +dag = DAG( + "pytorch_training_pipeline_example_multicloud", + default_args=default_args, + description="A simple PyTorch training DAG with multiple steps", + schedule=timedelta(days=1), +) +run_sky_status = BashOperator( + task_id="run_sky_status", + bash_command="sky status", + dag=dag, +) + +bring_up_cluster_task = PythonOperator( + task_id="bring_up_cluster_task", + python_callable=bring_up_cluster_callable, + op_kwargs=cpu_cluster_config, + dag=dag, +) + +access_data_task = PythonOperator( + task_id="access_data_task", + python_callable=access_data_callable, + op_kwargs=cpu_cluster_config, + dag=dag, +) + +preprocess_data_task = PythonOperator( + task_id="preprocess_data_task", + python_callable=preprocess_data_callable, + op_kwargs=cpu_cluster_config, + dag=dag, +) + +bring_up_gpu_cluster_task = PythonOperator( + task_id="bring_up_gpu_cluster_task", + python_callable=bring_up_cluster_callable, + op_kwargs=gpu_cluster_config, + dag=dag, +) + +download_data_from_s3_task = PythonOperator( + task_id="download_data_from_s3_task", + python_callable=download_s3_data_callable, + op_kwargs=gpu_cluster_config, + dag=dag, +) + +train_model_task = PythonOperator( + task_id="train_model_task", + python_callable=train_model_callable, + op_kwargs=gpu_cluster_config, + dag=dag, +) + +down_cluster_task = PythonOperator( + task_id="down_cluster_task", + python_callable=down_cluster, + op_kwargs=cpu_cluster_config, + dag=dag, +) + +down_gpu_cluster_task = PythonOperator( + task_id="down_gpu_cluster_task", + python_callable=down_cluster, + op_kwargs=gpu_cluster_config, + dag=dag, +) + + +# You can see that this is an incredibly minimal amount of code in Airflow. The callables are callable from the DAG. But you can also run them from a Python script, from a notebook, or anywhere else - so you can instantly iterate on the underlying classes, the functions, and by the time they run locally, they are ready for prime time in your DAG. +( + run_sky_status + >> bring_up_cluster_task + >> access_data_task + >> preprocess_data_task + >> bring_up_gpu_cluster_task + >> download_data_from_s3_task + >> train_model_task + >> down_cluster_task + >> down_gpu_cluster_task +) diff --git a/examples/torch-training/airflow-multicloud/local_run_of_callables.py b/examples/torch-training/airflow-multicloud/local_run_of_callables.py new file mode 100644 index 000000000..c505a4818 --- /dev/null +++ b/examples/torch-training/airflow-multicloud/local_run_of_callables.py @@ -0,0 +1,56 @@ +# You can easily test both the Airflow flow, and the underlying components and code by calling them from local + +# Because the execution has been offloaded to GPU compute on remote, you can call each step from local, or from a notebook +# You can imagine that a DS or MLE might write a pipeline and interactively debug from local. +# Then, only when they are confident all the functions work, do they upload the Airflow pipeline which is minimal + +# Airflow is used to schedule, monitor, and retry jobs while providing observability over runs. 
+# However, the code that is the substance of the program is not packed into the Airflow DAG. + +import logging + +from airflow_multicloud_torch_train import ( + access_data_callable, + bring_up_cluster_callable, + down_cluster, + download_s3_data_callable, + preprocess_data_callable, + train_model_callable, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +if __name__ == "__main__": + logger.info("Starting the pipeline...") + + logger.info("Step 1: Bring up cluster") + cpu_cluster_config = { + "cluster_name": "cpu-cluster", + "instance_type": "CPU:4+", + "provider": "aws", + } + gpu_cluster_config = { + "cluster_name": "gpu-cluster", + "instance_type": "L4:1", + "provider": "gcp", + } + + bring_up_cluster_callable(**cpu_cluster_config) + + logger.info("Step 2: Access data") + access_data_callable(**cpu_cluster_config) + + logger.info("Step 3: Preprocess data") + preprocess_data_callable(**cpu_cluster_config) + + logger.info("Step 4: Train model") + bring_up_cluster_callable(**gpu_cluster_config) + download_s3_data_callable(**gpu_cluster_config) + train_model_callable(**gpu_cluster_config) + + logger.info("Pipeline completed.") + + down_cluster(**gpu_cluster_config) + down_cluster(**cpu_cluster_config) + logger.info("Cluster successfully downed.") diff --git a/examples/torch-training/airflow/airflow_example_torch_train.py b/examples/torch-training/airflow/airflow_example_torch_train.py index 698fee5df..24d1f3c7e 100644 --- a/examples/torch-training/airflow/airflow_example_torch_train.py +++ b/examples/torch-training/airflow/airflow_example_torch_train.py @@ -1,4 +1,4 @@ -# ## A Basic Airflow Example Using PyTorch to Train an Image Classification NN with the MNIST Dataset +# # A Basic Airflow Example Using PyTorch to Train an Image Classification NN with the MNIST Dataset # ## Using Airflow and Runhouse together # This example demonstrates how to use Airflow along with Runhouse to dispatch the work of training a basic Torch model to a remote GPU. @@ -49,7 +49,7 @@ from airflow.operators.python import PythonOperator sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) -from TorchBasicExample import DownloadData, SimpleTrainer +from TorchBasicExample import download_data, preprocess_data, SimpleTrainer logger = logging.getLogger(__name__) @@ -74,14 +74,17 @@ def access_data_callable(**kwargs): env = rh.env(name="test_env", reqs=["torch", "torchvision"]) cluster = rh.cluster(name="a10g-cluster").up_if_not() - remote_download = rh.function(DownloadData).to(cluster, env=env) + remote_download = rh.function(download_data).to(cluster, env=env) + remote_preprocess = rh.function(preprocess_data).to(cluster, env=env) logger.info("Download function sent to remote") remote_download() + remote_preprocess() logger.info("Downloaded") # Then we instantiate the trainer, and then invoke the training on the remote machine. On the remote, we have a GPU. This is also a natural point to split the workflow if we want to do some tasks on GPU and some on CPU. 
def train_model_callable(**kwargs): + logger.info("Step 3: Train Model") cluster = rh.cluster(name="a10g-cluster").up_if_not() env = rh.env(name="test_env", reqs=["torch", "torchvision"]) diff --git a/runhouse/__init__.py b/runhouse/__init__.py index 8ae83f2f8..003e35ba6 100644 --- a/runhouse/__init__.py +++ b/runhouse/__init__.py @@ -59,4 +59,4 @@ def __getattr__(name): raise AttributeError(f"module {__name__!r} has no attribute {name!r}") -__version__ = "0.0.34" +__version__ = "0.0.35" diff --git a/runhouse/constants.py b/runhouse/constants.py index ba0bd41c1..ea31a46be 100644 --- a/runhouse/constants.py +++ b/runhouse/constants.py @@ -59,6 +59,9 @@ "bash ~/miniconda.sh -b -p ~/miniconda", "source $HOME/miniconda/bin/activate", ] +# TODO should default to user's local Python version? +# from platform import python_version; python_version() +CONDA_PREFERRED_PYTHON_VERSION = "3.10.9" TEST_ORG = "test-org" TESTING_LOG_LEVEL = "INFO" diff --git a/runhouse/main.py b/runhouse/main.py index f9a65e603..3e5738ad4 100644 --- a/runhouse/main.py +++ b/runhouse/main.py @@ -1,3 +1,4 @@ +import datetime import importlib import logging import math @@ -11,6 +12,7 @@ import ray import requests +import rich.markdown import typer import yaml @@ -39,7 +41,6 @@ kill_actors, ) - # create an explicit Typer application app = typer.Typer(add_completion=False) @@ -221,13 +222,7 @@ def _print_cluster_config(cluster_config: Dict): "server_connection_type", ] - backend_config = [ - "resource_subtype", - "domain", - "server_host", - "ips", - "resource_subtype", - ] + backend_config = ["resource_subtype", "domain", "server_host", "ips"] if cluster_config.get("resource_subtype") != "Cluster": backend_config.append("autostop_mins") @@ -282,7 +277,7 @@ def _print_envs_info( # case where the default env doesn't hve any other resources, apart from the default env itself. console.print(f"{BULLET_UNICODE} {default_env_name} (runhouse.Env)") console.print( - f"{DOUBLE_SPACE_UNICODE}This environment has only python packages installed, if such provided. No " + f"{DOUBLE_SPACE_UNICODE}This environment has only python packages installed, if provided. No " "resources were found." ) @@ -384,7 +379,7 @@ def _print_envs_info( if len(resources_in_env) == 0: # No resources were found in the env, only the associated installed python reqs were installed. console.print( - f"{DOUBLE_SPACE_UNICODE}This environment has only python packages installed, if such provided. No resources were " + f"{DOUBLE_SPACE_UNICODE}This environment has only python packages installed, if provided. No resources were " "found." ) @@ -392,20 +387,95 @@ def _print_envs_info( for resource in resources_in_env: for resource_name, resource_info in resource.items(): resource_type = _adjust_resource_type( - resource_info["resource_type"] - ) - console.print( - f"{DOUBLE_SPACE_UNICODE}{BULLET_UNICODE} {resource_name} ({resource_type})" + resource_info.get("resource_type") ) + active_function_calls = resource_info.get("active_function_calls") + resource_info_str = f"{DOUBLE_SPACE_UNICODE}{BULLET_UNICODE} {resource_name} ({resource_type})" + + if resource_type == "runhouse.Function" and active_function_calls: + func_start_time_utc = active_function_calls[0].get( + "start_time", None + ) + + # casting func_start_time_utc to datetime format + func_start_time_utc = datetime.datetime.fromtimestamp( + func_start_time_utc, tz=datetime.timezone.utc + ) + + # func_end_time_utc = current time. 
Making sure it is in the same format as func_start_time_utc, + # so we could calculate function's running time. + func_end_time_utc = datetime.datetime.fromtimestamp( + time.time(), tz=datetime.timezone.utc + ) + + func_running_time = ( + func_end_time_utc - func_start_time_utc + ).total_seconds() + + is_func_running: str = f" [italic bright_green]Running for {func_running_time} seconds[/italic bright_green]" + + elif ( + resource_type == "runhouse.Function" + and not active_function_calls + ): + is_func_running: str = " [italic bright_yellow]Currently not running[/italic bright_yellow]" + + else: + is_func_running: str = "" + + resource_info_str = resource_info_str + is_func_running + + console.print(resource_info_str) + + +def _print_cloud_properties(cluster_config: dict): + cloud_properties = cluster_config.get("launched_properties", None) + if not cloud_properties: + return + cloud = cloud_properties.get("cloud") + instance_type = cloud_properties.get("instance_type") + region = cloud_properties.get("region") + cost_per_hour = cloud_properties.get("cost_per_hour") + + has_cuda = cluster_config.get("has_cuda", False) + cost_emoji = "💰" if has_cuda else "💸" + + num_of_instances = len(cluster_config.get("ips")) + num_of_instances_str = f"{num_of_instances}x " if num_of_instances > 1 else "" + + print( + f"🤖 {num_of_instances_str}{cloud} {instance_type} cluster | 🌍 {region} | {cost_emoji} ${cost_per_hour}/hr" + ) + + +def _get_resource_link_in_den_ui(cluster_name: str, api_server_url: str): + cluster_uri = rns_client.format_rns_address(cluster_name) + link_to_den_dashboard = f"{api_server_url}/resources/{cluster_uri}" + return link_to_den_dashboard + def _print_status(status_data: dict, current_cluster: Cluster) -> None: """Prints the status of the cluster to the console""" cluster_config = status_data.get("cluster_config") env_servlet_processes = status_data.get("env_servlet_processes") - if "name" in cluster_config.keys(): - console.print(cluster_config.get("name")) + cluster_name = cluster_config.get("name", None) + + if cluster_name: + api_server_url = cluster_config.get( + "api_server_url", rh.configs.get("api_server_url") + ) + api_server_url = api_server_url.replace( + "api", "www" + ) # convert the api link to the ui link. + cluster_link_in_den_ui = _get_resource_link_in_den_ui( + cluster_name=cluster_name, api_server_url=api_server_url + ) + cluster_name_hyperlink = rich.markdown.Text( + cluster_name, style=f"link {cluster_link_in_den_ui} white" + ) + console.print(cluster_name_hyperlink) has_cuda: bool = cluster_config.get("has_cuda") @@ -416,6 +486,7 @@ def _print_status(status_data: dict, current_cluster: Cluster) -> None: console.print(daemon_headline_txt, style="bold royal_blue1") console.print(f"Runhouse v{status_data.get('runhouse_version')}") + _print_cloud_properties(cluster_config) console.print(f"server pid: {status_data.get('server_pid')}") # Print relevant info from cluster config. 
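As a rough illustration of what the new `_print_cloud_properties` helper consumes and emits (assuming the private helper is importable from `runhouse.main`), the sketch below uses the sample values from the CPU cluster status output shown in the docs change above.

```python
from runhouse.main import _print_cloud_properties

# Illustrative cluster config containing only the keys the helper reads; values mirror the docs example.
cluster_config = {
    "launched_properties": {
        "cloud": "aws",
        "instance_type": "m6i.large",
        "region": "us-east-1",
        "cost_per_hour": 0.096,
    },
    "has_cuda": False,          # no GPU, so the 💸 emoji is used instead of 💰
    "ips": ["52.91.194.125"],   # single node, so no "Nx " prefix is printed
}

_print_cloud_properties(cluster_config)
# Prints: 🤖 aws m6i.large cluster | 🌍 us-east-1 | 💸 $0.096/hr
```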
@@ -431,7 +502,7 @@ def _print_status(status_data: dict, current_cluster: Cluster) -> None: cluster_cpu_utilization: float = status_data.get("server_cpu_utilization") server_util_info = ( - f"CPU Utilization: {round(cluster_cpu_utilization, 2)}% | GPU Utilization: {round(cluster_gpu_utilization,2)}%" + f"CPU Utilization: {round(cluster_cpu_utilization, 2)}% | GPU Utilization: {round(cluster_gpu_utilization, 2)}%" if has_cuda else f"CPU Utilization: {round(cluster_cpu_utilization, 2)}%" ) @@ -491,9 +562,7 @@ def status( current_cluster = cluster_or_local # cluster_or_local = rh.here try: - cluster_status = current_cluster.status( - resource_address=current_cluster.rns_address, send_to_den=send_to_den - ) + cluster_status = current_cluster.status(send_to_den=send_to_den) except ValueError: console.print("Failed to load status for cluster.") diff --git a/runhouse/resources/envs/conda_env.py b/runhouse/resources/envs/conda_env.py index b33ada436..ccdd2ccd4 100644 --- a/runhouse/resources/envs/conda_env.py +++ b/runhouse/resources/envs/conda_env.py @@ -5,7 +5,7 @@ import yaml -from runhouse.constants import ENVS_DIR +from runhouse.constants import CONDA_PREFERRED_PYTHON_VERSION, ENVS_DIR from runhouse.globals import obj_store from runhouse.logger import get_logger @@ -59,18 +59,21 @@ def config(self, condensed=True): def env_name(self): return self.conda_yaml["name"] - def _create_conda_env(self, force: bool = False, cluster: "Cluster" = None): + def _create_conda_env( + self, force: bool = False, cluster: "Cluster" = None, node: Optional[str] = None + ): yaml_path = Path(ENVS_DIR) / f"{self.env_name}.yml" env_exists = ( f"\n{self.env_name} " - in run_setup_command("conda info --envs", cluster=cluster)[1] + in run_setup_command("conda info --envs", cluster=cluster, node=node)[1] ) - run_setup_command(f"mkdir -p {ENVS_DIR}", cluster=cluster) + run_setup_command(f"mkdir -p {ENVS_DIR}", cluster=cluster, node=node) yaml_exists = ( (Path(ENVS_DIR).expanduser() / f"{self.env_name}.yml").exists() if not cluster - else run_setup_command(f"ls {yaml_path}", cluster=cluster)[0] == 0 + else run_setup_command(f"ls {yaml_path}", cluster=cluster, node=node)[0] + == 0 ) if force or not (yaml_exists and env_exists): @@ -87,19 +90,25 @@ def _create_conda_env(self, force: bool = False, cluster: "Cluster" = None): subprocess.run(f'python -c "{python_commands}"', shell=True) else: contents = yaml.dump(self.conda_yaml) - run_setup_command(f"echo $'{contents}' > {yaml_path}", cluster=cluster) + run_setup_command( + f"echo $'{contents}' > {yaml_path}", cluster=cluster, node=node + ) # create conda env from yaml file - run_setup_command(f"conda env create -f {yaml_path}", cluster=cluster) + run_setup_command( + f"conda env create -f {yaml_path}", cluster=cluster, node=node + ) env_exists = ( f"\n{self.env_name} " - in run_setup_command("conda info --envs", cluster=cluster)[1] + in run_setup_command("conda info --envs", cluster=cluster, node=node)[1] ) if not env_exists: raise RuntimeError(f"conda env {self.env_name} not created properly.") - def install(self, force: bool = False, cluster: "Cluster" = None): + def install( + self, force: bool = False, cluster: "Cluster" = None, node: Optional[str] = None + ): """Locally install packages and run setup commands. Args: @@ -109,32 +118,42 @@ def install(self, force: bool = False, cluster: "Cluster" = None): on the cluster using SSH. 
(default: ``None``) """ if not any(["python" in dep for dep in self.conda_yaml["dependencies"]]): - status_codes = run_setup_command("python --version", cluster=cluster) + status_codes = run_setup_command( + "python --version", cluster=cluster, node=node + ) base_python_version = ( - status_codes[1].split()[1] if status_codes[0] == 0 else "3.10.9" + status_codes[1].split()[1] + if status_codes[0] == 0 + else CONDA_PREFERRED_PYTHON_VERSION ) self.conda_yaml["dependencies"].append(f"python=={base_python_version}") install_conda(cluster=cluster) local_env_exists = ( f"\n{self.env_name} " - in run_setup_command("conda info --envs", cluster=cluster)[1] + in run_setup_command("conda info --envs", cluster=cluster, node=node)[1] ) - # Hash the config_for_rns to check if we need to create/install the conda env - env_config = self.config() - # Remove the name because auto-generated names will be different, but the installed components are the same - env_config.pop("name") - install_hash = hash(str(env_config)) - # Check the existing hash - if local_env_exists and install_hash in obj_store.installed_envs and not force: - logger.debug("Env already installed, skipping") - return - obj_store.installed_envs[install_hash] = self.name - - self._create_conda_env(force=force, cluster=cluster) - - self._install_reqs(cluster=cluster) - self._run_setup_cmds(cluster=cluster) + # If we're doing the install remotely via SSH (e.g. for default_env), there is no cache + if not cluster: + # Hash the config_for_rns to check if we need to create/install the conda env + env_config = self.config() + # Remove the name because auto-generated names will be different, but the installed components are the same + env_config.pop("name") + install_hash = hash(str(env_config)) + # Check the existing hash + if ( + local_env_exists + and install_hash in obj_store.installed_envs + and not force + ): + logger.debug("Env already installed, skipping") + return + obj_store.installed_envs[install_hash] = self.name + + self._create_conda_env(force=force, cluster=cluster, node=node) + + self._install_reqs(cluster=cluster, node=node) + self._run_setup_cmds(cluster=cluster, node=node) return diff --git a/runhouse/resources/envs/env.py b/runhouse/resources/envs/env.py index 41e011f40..2e626a7dc 100644 --- a/runhouse/resources/envs/env.py +++ b/runhouse/resources/envs/env.py @@ -140,7 +140,9 @@ def _secrets_to(self, system: Union[str, Cluster]): new_secrets.append(secret.to(system=system, env=self)) return new_secrets - def _install_reqs(self, cluster: Cluster = None, reqs: List = None): + def _install_reqs( + self, cluster: Cluster = None, reqs: List = None, node: str = "all" + ): reqs = reqs or self.reqs if reqs: for package in reqs: @@ -154,9 +156,11 @@ def _install_reqs(self, cluster: Cluster = None, reqs: List = None): raise ValueError(f"package {package} not recognized") logger.debug(f"Installing package: {str(pkg)}") - pkg._install(env=self, cluster=cluster) + pkg._install(env=self, cluster=cluster, node=node) - def _run_setup_cmds(self, cluster: Cluster = None, setup_cmds: List = None): + def _run_setup_cmds( + self, cluster: Cluster = None, setup_cmds: List = None, node: str = "all" + ): setup_cmds = setup_cmds or self.setup_cmds if not setup_cmds: @@ -165,10 +169,13 @@ def _run_setup_cmds(self, cluster: Cluster = None, setup_cmds: List = None): for cmd in setup_cmds: cmd = self._full_command(cmd) run_setup_command( - cmd, cluster=cluster, env_vars=_process_env_vars(self.env_vars) + cmd, + cluster=cluster, + 
env_vars=_process_env_vars(self.env_vars), + node=node, ) - def install(self, force: bool = False, cluster: Cluster = None): + def install(self, force: bool = False, cluster: Cluster = None, node: str = "all"): """Locally install packages and run setup commands. Args: @@ -176,20 +183,23 @@ def install(self, force: bool = False, cluster: Cluster = None): on the cluster. (Default: ``False``) cluster (Cluster, optional): Cluster to install the env on. If not provided, env is installed on the current cluster. (Default: ``None``) + node (str, optional): Node to install the env on. (Default: ``"all"``) """ - # Hash the config_for_rns to check if we need to install - env_config = self.config() - # Remove the name because auto-generated names will be different, but the installed components are the same - env_config.pop("name") - install_hash = hash(str(env_config)) - # Check the existing hash - if install_hash in obj_store.installed_envs and not force: - logger.debug("Env already installed, skipping") - return - obj_store.installed_envs[install_hash] = self.name - - self._install_reqs(cluster=cluster) - self._run_setup_cmds(cluster=cluster) + # If we're doing the install remotely via SSH (e.g. for default_env), there is no cache + if not cluster: + # Hash the config_for_rns to check if we need to install + env_config = self.config() + # Remove the name because auto-generated names will be different, but the installed components are the same + env_config.pop("name") + install_hash = hash(str(env_config)) + # Check the existing hash + if install_hash in obj_store.installed_envs and not force: + logger.debug("Env already installed, skipping") + return + obj_store.installed_envs[install_hash] = self.name + + self._install_reqs(cluster=cluster, node=node) + self._run_setup_cmds(cluster=cluster, node=node) def _full_command(self, command: str): if self._run_cmd: @@ -205,7 +215,7 @@ def _run_command(self, command: str, **kwargs): def to( self, system: Union[str, Cluster], - node_idx: int = None, + node_idx: Optional[int] = None, path: str = None, force_install: bool = False, ): @@ -227,19 +237,21 @@ def to( >>> s3_env = env.to("s3", path="s3_bucket/my_env") """ system = _get_cluster_from(system) + if ( + isinstance(system, Cluster) + and node_idx is not None + and node_idx >= len(system.ips) + ): + raise ValueError( + f"Cluster {system.name} has only {len(system.ips)} nodes. Requested node index {node_idx} is out of bounds." + ) + new_env = copy.deepcopy(self) new_env.reqs, new_env.working_dir = self._reqs_to(system, path) if isinstance(system, Cluster): if node_idx is not None: - if node_idx >= len(system.ips): - raise ValueError( - f"Cluster {system.name} has only {len(system.ips)} nodes. Requested node index {node_idx} is out of bounds." - ) - - if new_env.compute is None: - new_env.compute = {} - + new_env.compute = new_env.compute or {} new_env.compute["node_idx"] = node_idx key = ( diff --git a/runhouse/resources/envs/utils.py b/runhouse/resources/envs/utils.py index 1c4182627..5ef8465e2 100644 --- a/runhouse/resources/envs/utils.py +++ b/runhouse/resources/envs/utils.py @@ -2,7 +2,7 @@ import subprocess from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional import yaml @@ -131,6 +131,7 @@ def run_setup_command( cluster: "Cluster" = None, env_vars: Dict = None, stream_logs: bool = True, + node: Optional[str] = None, ): """ Helper function to run a command, possibly during the cluster default env setup.
If a cluster is provided, @@ -152,14 +153,14 @@ def run_setup_command( return run_with_logs(cmd, stream_logs=stream_logs, require_outputs=True)[:2] return cluster._run_commands_with_runner( - [cmd], stream_logs=stream_logs, env_vars=env_vars + [cmd], stream_logs=stream_logs, env_vars=env_vars, node=node )[0] -def install_conda(cluster: "Cluster" = None): - if run_setup_command("conda --version", cluster=cluster)[0] != 0: +def install_conda(cluster: "Cluster" = None, node: Optional[str] = None): + if run_setup_command("conda --version", cluster=cluster, node=node)[0] != 0: logging.info("Conda is not installed. Installing...") for cmd in CONDA_INSTALL_CMDS: - run_setup_command(cmd, cluster=cluster, stream_logs=True) - if run_setup_command("conda --version", cluster=cluster)[0] != 0: + run_setup_command(cmd, cluster=cluster, node=node, stream_logs=True) + if run_setup_command("conda --version", cluster=cluster, node=node)[0] != 0: raise RuntimeError("Could not install Conda.") diff --git a/runhouse/resources/functions/function.py b/runhouse/resources/functions/function.py index b9e0e5530..6bae8af21 100644 --- a/runhouse/resources/functions/function.py +++ b/runhouse/resources/functions/function.py @@ -203,35 +203,6 @@ def config(self, condensed=True): ) return config - def send_secrets(self, providers: Optional[List[str]] = None): - """Send secrets to the system. - - Args: - providers (List[str], optional): List of secret names to send over to the system. If none are provided, - syncs over all locally detected provider secrets. (Default: ``None``) - - Example: - >>> remote_fn.send_secrets(providers=["aws", "lambda"]) - """ - self.system.sync_secrets(providers=providers) - - def http_url(self, curl_command=False, *args, **kwargs) -> str: - """ - Return the endpoint needed to run the Function on the remote cluster, or provide the curl command if requested. - """ - raise NotImplementedError("http_url not yet implemented for Function") - - def notebook(self, persist=False, sync_package_on_close=None, port_forward=8888): - """Tunnel into and launch notebook from the system.""" - if self.system is None: - raise RuntimeError("Cannot SSH, running locally") - - self.system.notebook( - persist=persist, - sync_package_on_close=sync_package_on_close, - port_forward=port_forward, - ) - def get_or_call( self, run_name: str, load_from_den: bool = True, *args, **kwargs ) -> Any: @@ -269,29 +240,6 @@ def get_or_call( return self.call(*args, **kwargs, run_name=run_name) - def keep_warm( - self, - autostop_mins=None, - ): - """Keep the system warm for autostop_mins. - - Args: - autostop_mins (int): Keep the cluster warm for this amount of time. - If ``None`` or -1, keep warm indefinitely. 
- - Example: - >>> # keep gpu warm for 30 mins - >>> remote_fn = rh.function(local_fn).to(gpu) - >>> remote_fn.keep_warm(autostop_mins=30) - """ - if autostop_mins is None: - logger.info(f"Keeping {self.name} indefinitely warm") - # keep indefinitely warm if user doesn't specify - autostop_mins = -1 - self.system.keep_warm(autostop_mins=autostop_mins) - - return self - @staticmethod def _handle_nb_fn(fn, fn_pointers, serialize_notebook_fn, name): """Handle the case where the user passes in a notebook function""" diff --git a/runhouse/resources/functions/function_factory.py b/runhouse/resources/functions/function_factory.py index 7e09ba3e3..a21959b4c 100644 --- a/runhouse/resources/functions/function_factory.py +++ b/runhouse/resources/functions/function_factory.py @@ -17,7 +17,6 @@ def function( env: Optional[Union[List[str], Env, str]] = None, load_from_den: bool = True, dryrun: bool = False, - load_secrets: bool = False, serialize_notebook_fn: bool = False, ): """runhouse.function(fn: str | Callable | None = None, name: str | None = None, system: str | Cluster | None = None, env: str | List[str] | Env | None = None, dryrun: bool = False, load_secrets: bool = False, serialize_notebook_fn: bool = False) @@ -26,15 +25,13 @@ def function( Args: fn (Optional[str or Callable]): The function to execute on the remote system when the function is called. - name (Optional[str], optional): Name of the Function to create or retrieve. + name (Optional[str]): Name of the Function to create or retrieve. This can be either from a local config or from the RNS. (Default: ``None``) env (Optional[List[str] or Env or str], optional): List of requirements to install on the remote cluster, or path to the requirements.txt file, or Env object or string name of an Env object. (Default: ``None``) load_from_den (bool, optional): Whether to try loading the function from Den. (Default: ``True``) dryrun (bool, optional): Whether to create the Function if it doesn't exist, or load the Function object as a dryrun. (Default: ``False``) - load_secrets (bool, optional): Whether or not to send secrets; only applicable if `dryrun` is set to ``False``. - (Default: ``False``) serialize_notebook_fn (bool, optional): If the function is in a notebook setting, whether or not to serialize the function.
(Default: ``False``) @@ -122,7 +119,4 @@ def function( new_function = Function(fn_pointers=fn_pointers, name=name, dryrun=dryrun, env=env) - if load_secrets and not dryrun: - new_function.send_secrets() - return new_function diff --git a/runhouse/resources/hardware/cluster.py b/runhouse/resources/hardware/cluster.py index b7e5e096c..7debb8cf8 100644 --- a/runhouse/resources/hardware/cluster.py +++ b/runhouse/resources/hardware/cluster.py @@ -438,12 +438,19 @@ def is_shared(self) -> bool: return f"{self._creds.name}/" in ssh_creds.get("ssh_private_key", "") - def _command_runner(self, node: Optional[str] = None) -> "CommandRunner": + def _command_runner( + self, node: Optional[str] = None, use_docker_exec: Optional[bool] = False + ) -> "CommandRunner": from runhouse.resources.hardware.sky_command_runner import ( SkyKubernetesRunner, SkySSHRunner, ) + if node == "all": + raise ValueError( + "CommandRunner can only be instantiated for individual nodes" + ) + node = node or self.address if ( @@ -468,7 +475,8 @@ def _command_runner(self, node: Optional[str] = None) -> "CommandRunner": ssh_private_key=ssh_credentials.get("ssh_private_key"), ssh_proxy_command=ssh_credentials.get("ssh_proxy_command"), ssh_control_name=ssh_control_name, - docker_user=self.docker_user, + docker_user=self.docker_user if not use_docker_exec else None, + use_docker_exec=use_docker_exec, ) return runner @@ -518,7 +526,8 @@ def _sync_default_env_to_cluster(self): logger.info(f"Using log level {log_level} on cluster's default env") logger.info(f"Syncing default env {self._default_env.name} to cluster") - self._default_env.install(cluster=self) + for node in self.ips: + self._default_env.install(cluster=self, node=node) def _sync_runhouse_to_cluster( self, @@ -557,7 +566,7 @@ def _sync_runhouse_to_cluster( contents=True, filter_options="- docs/", ) - rh_install_cmd = "python3 -m pip install ./runhouse" + rh_install_cmd = f"python3 -m pip install {dest_path}" else: # Package is installed in site-packages @@ -823,11 +832,10 @@ def connect_server_client(self, force_reconnect=False): system=self, ) - def status(self, resource_address: str = None, send_to_den: bool = False): + def status(self, send_to_den: bool = False): """Load the status of the Runhouse daemon running on a cluster. Args: - resource_address (str, optional): send_to_den (bool, optional): Whether to send and update the status in Den. Only applies to clusters that are saved to Den. 
(Default: ``False``) """ @@ -839,7 +847,6 @@ def status(self, resource_address: str = None, send_to_den: bool = False): else: status, den_resp_status_code = self.call_client_method( "status", - resource_address=resource_address or self.rns_address, send_to_den=send_to_den, ) @@ -1032,7 +1039,7 @@ def restart_server( user_config = yaml.safe_dump( { "token": rns_client.cluster_token( - rns_client.token, rns_client.username + resource_address=rns_client.username ), "username": rns_client.username, "default_folder": rns_client.default_folder, @@ -1165,7 +1172,6 @@ def call( method_to_call, module_name, method_name, - resource_address=self.rns_address, stream_logs=stream_logs, data={"args": args, "kwargs": kwargs}, run_name=run_name, @@ -1513,7 +1519,9 @@ def _run_commands_with_runner( ssh_credentials.pop("private_key", None) ssh_credentials.pop("public_key", None) - runner = self._command_runner(node=node) + runner = self._command_runner( + node=node, use_docker_exec=self.docker_user is not None + ) env_var_prefix = ( " ".join(f"{key}={val}" for key, val in env_vars.items()) diff --git a/runhouse/resources/hardware/on_demand_cluster.py b/runhouse/resources/hardware/on_demand_cluster.py index 6c8253864..6bdbac438 100644 --- a/runhouse/resources/hardware/on_demand_cluster.py +++ b/runhouse/resources/hardware/on_demand_cluster.py @@ -1,8 +1,10 @@ +import asyncio import contextlib import json import subprocess import time import warnings +from concurrent.futures import ProcessPoolExecutor from pathlib import Path from typing import Any, Dict, List, Union @@ -27,7 +29,11 @@ from runhouse.globals import configs, obj_store, rns_client from runhouse.logger import get_logger -from runhouse.resources.hardware.utils import ResourceServerStatus, ServerConnectionType +from runhouse.resources.hardware.utils import ( + ResourceServerStatus, + ServerConnectionType, + up_cluster_helper, +) from .cluster import Cluster @@ -403,12 +409,21 @@ def _populate_connection_from_status_dict(self, cluster_dict: Dict[str, Any]): instance_type = launched_resource.instance_type region = launched_resource.region cost_per_hr = launched_resource.get_cost(60 * 60) + disk_size = launched_resource.disk_size + num_cpus = launched_resource.cpus + self.launched_properties = { "cloud": cloud, "instance_type": instance_type, "region": region, "cost_per_hour": str(cost_per_hr), + "disk_size": disk_size, + "num_cpus": num_cpus, } + if launched_resource.accelerators: + self.launched_properties[ + "accelerators" + ] = launched_resource.accelerators if handle.ssh_user: self.launched_properties["ssh_user"] = handle.ssh_user if handle.docker_user: @@ -478,6 +493,27 @@ def num_cpus(self): return None + async def a_up(self, capture_output: Union[bool, str] = True): + """Up the cluster asynchronously in another process, so it can be parallelized and logs can be captured sanely. + + capture_output: If True, suppress the output of the cluster creation process. If False, print the output + normally. If a string, write the output to the file at that path. + """ + + with ProcessPoolExecutor() as executor: + loop = asyncio.get_running_loop() + await loop.run_in_executor( + executor, up_cluster_helper, self, capture_output + ) + return self + + async def a_up_if_not(self, capture_output: Union[bool, str] = True): + if not self.is_up(): + # Don't store stale IPs + self.ips = None + await self.a_up(capture_output=capture_output) + return self + def up(self): """Up the cluster.
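A hedged usage sketch for the new a_up / a_up_if_not helpers added above; the cluster names, instance type, and log file paths are hypothetical, and it assumes the rh.ondemand_cluster factory with these arguments plus a running asyncio event loop:

    import asyncio
    import runhouse as rh

    async def launch():
        # Hypothetical clusters; any OnDemandCluster objects would work here
        c1 = rh.ondemand_cluster(name="cpu-a", instance_type="CPU:2+", provider="aws")
        c2 = rh.ondemand_cluster(name="cpu-b", instance_type="CPU:2+", provider="aws")
        # Up both clusters in parallel, writing each launch's output to its own log file
        await asyncio.gather(
            c1.a_up_if_not(capture_output="cpu-a-up.log"),
            c2.a_up_if_not(capture_output="cpu-b-up.log"),
        )

    asyncio.run(launch())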
diff --git a/runhouse/resources/hardware/sky_command_runner.py b/runhouse/resources/hardware/sky_command_runner.py index bff7503c4..8831d2ea7 100644 --- a/runhouse/resources/hardware/sky_command_runner.py +++ b/runhouse/resources/hardware/sky_command_runner.py @@ -66,6 +66,7 @@ def __init__( docker_user: Optional[str] = None, disable_control_master: Optional[bool] = False, local_bind_port: Optional[int] = None, + use_docker_exec: Optional[bool] = False, ): super().__init__( node, @@ -81,6 +82,7 @@ def __init__( self.docker_user = docker_user self.local_bind_port = local_bind_port self.remote_bind_port = None + self.use_docker_exec = use_docker_exec def _ssh_base_command( self, @@ -176,7 +178,10 @@ def run( if quiet_ssh: # RH MODIFIED base_ssh_command.append("-q") - if self.docker_user: # RH MODIFIED + if self.use_docker_exec: # RH MODIFIED + cmd = " ".join(cmd) if isinstance(cmd, list) else cmd + cmd = f"sudo docker exec {DEFAULT_DOCKER_CONTAINER_NAME} bash -c {shlex.quote(cmd)}" + elif self.docker_user: cmd = " ".join(cmd) if isinstance(cmd, list) else cmd cmd = f"conda deactivate && {cmd}" diff --git a/runhouse/resources/hardware/utils.py b/runhouse/resources/hardware/utils.py index 5105c9552..e98af49be 100644 --- a/runhouse/resources/hardware/utils.py +++ b/runhouse/resources/hardware/utils.py @@ -4,7 +4,7 @@ from enum import Enum from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union from runhouse.constants import ( CLUSTER_CONFIG_PATH, @@ -125,18 +125,18 @@ def _unnamed_default_env_name(cluster_name): return f"{cluster_name}_default_env" -def detect_cuda_version_or_cpu(cluster: "Cluster" = None): +def detect_cuda_version_or_cpu(cluster: "Cluster" = None, node: Optional[str] = None): """Return the CUDA version on the cluster. If we are on a CPU-only cluster return 'cpu'. Note: A cpu-only machine may have the CUDA toolkit installed, which means nvcc will still return a valid version. 
Also check if the NVIDIA driver is installed to confirm we are on a GPU.""" - status_codes = run_setup_command("nvcc --version", cluster=cluster) + status_codes = run_setup_command("nvcc --version", cluster=cluster, node=node) if not status_codes[0] == 0: return "cpu" cuda_version = status_codes[1].split("release ")[1].split(",")[0] - if run_setup_command("nvidia-smi", cluster=cluster)[0] == 0: + if run_setup_command("nvidia-smi", cluster=cluster, node=node)[0] == 0: return cuda_version return "cpu" @@ -227,3 +227,24 @@ def _ssh_base_command( def _generate_ssh_control_hash(ssh_control_name): return hashlib.md5(ssh_control_name.encode()).hexdigest()[:_HASH_MAX_LENGTH] + + +def up_cluster_helper(cluster, capture_output: Union[bool, str] = True): + from runhouse.utils import SuppressStd + + if capture_output: + try: + with SuppressStd() as outfile: + cluster.up() + except Exception as e: + if isinstance(capture_output, str): + logger.error( + f"Error starting cluster {cluster.name}, logs written to {capture_output}" + ) + raise e + finally: + if isinstance(capture_output, str): + with open(capture_output, "w") as f: + f.write(outfile.output) + else: + cluster.up() diff --git a/runhouse/resources/packages/package.py b/runhouse/resources/packages/package.py index 03f44f65e..fb5099c76 100644 --- a/runhouse/resources/packages/package.py +++ b/runhouse/resources/packages/package.py @@ -223,7 +223,12 @@ def _reqs_install_cmd( install_cmd = self._prepend_env_command(install_cmd, env=env) return install_cmd - def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): + def _install( + self, + env: Union[str, "Env"] = None, + cluster: "Cluster" = None, + node: Optional[str] = None, + ): """Install package. Args: @@ -240,6 +245,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): dest=str(self.install_target.path_to_sync_to_on_cluster), up=True, contents=True, + node=node, ) self.install_target.local_path = ( @@ -265,6 +271,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): retcode = run_setup_command( f"python -c \"import importlib.util; exit(0) if importlib.util.find_spec('{self.install_target}') else exit(1)\"", cluster=cluster, + node=node, )[0] if retcode != 0: self.install_target = ( @@ -273,7 +280,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): install_cmd = self._pip_install_cmd(env=env, cluster=cluster) logger.info(f"Running via install_method pip: {install_cmd}") - retcode = run_setup_command(install_cmd, cluster=cluster)[0] + retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0] if retcode != 0: raise RuntimeError( f"Pip install {install_cmd} failed, check that the package exists and is available for your platform." 
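The node argument threaded through the pip/conda/reqs install branches above ends up in run_setup_command, which hands the command to the cluster's command runner for that node. A minimal per-node setup sketch under those assumptions (the cluster object and the command are hypothetical):

    from runhouse.resources.envs.utils import run_setup_command

    # Run a setup command on every node of a multinode cluster, one node at a time
    for node_ip in cluster.ips:
        status = run_setup_command("pip install numpy", cluster=cluster, node=node_ip)
        if status[0] != 0:  # first element is the return code
            raise RuntimeError(f"Setup failed on node {node_ip}")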
@@ -282,7 +289,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): elif self.install_method == "conda": install_cmd = self._conda_install_cmd(env=env, cluster=cluster) logger.info(f"Running via install_method conda: {install_cmd}") - retcode = run_setup_command(install_cmd, cluster=cluster)[0] + retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0] if retcode != 0: raise RuntimeError( f"Conda install {install_cmd} failed, check that the package exists and is " @@ -293,7 +300,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): install_cmd = self._reqs_install_cmd(env=env, cluster=cluster) if install_cmd: logger.info(f"Running via install_method reqs: {install_cmd}") - retcode = run_setup_command(install_cmd, cluster=cluster)[0] + retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0] if retcode != 0: raise RuntimeError( f"Reqs install {install_cmd} failed, check that the package exists and is available for your platform." @@ -315,6 +322,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): ) if not cluster else run_setup_command( f"export PATH=$PATH;{self.install_target.full_local_path_str()}", cluster=cluster, + node=node, ) elif not cluster: if Path(self.install_target).resolve().expanduser().exists(): diff --git a/runhouse/rns/rns_client.py b/runhouse/rns/rns_client.py index 721030623..f50652b0f 100644 --- a/runhouse/rns/rns_client.py +++ b/runhouse/rns/rns_client.py @@ -1,4 +1,3 @@ -import hashlib import importlib import json import os @@ -197,7 +196,6 @@ def request_headers( headers: dict = self._configs.request_headers if not headers: - # TODO: allow this? means we failed to load token from configs return None if "Authorization" not in headers: @@ -220,18 +218,52 @@ def request_headers( "Failed to extract token from request auth header. Expected in format: Bearer " ) - hashed_token = self.cluster_token(den_token, resource_address) + hashed_token = self.cluster_token(resource_address) return {"Authorization": f"Bearer {hashed_token}"} - def cluster_token(self, den_token: str, resource_address: str): + def cluster_token( + self, resource_address: str, username: str = None, den_token: str = None + ): + """Load the hashed token as generated in Den. Cache the token value in-memory for a given resource address. 
+ Optionally provide a username and den token instead of using the default values stored in local configs.""" if resource_address and "/" in resource_address: # If provided as a full rns address, extract the top level directory resource_address = self.base_folder(resource_address) - hash_input = (den_token + resource_address).encode("utf-8") - hash_hex = hashlib.sha256(hash_input).hexdigest() - return f"{hash_hex}+{resource_address}+{self._configs.username}" + uri = f"{self.api_server_url}/auth/token/cluster" + token_payload = { + "resource_address": resource_address, + "username": username or self._configs.username, + } + + headers = ( + {"Authorization": f"Bearer {den_token}"} + if den_token + else self._configs.request_headers + ) + resp = self.session.post( + uri, + data=json.dumps(token_payload), + headers=headers, + ) + if resp.status_code != 200: + raise Exception( + f"Received [{resp.status_code}] from Den POST '{uri}': Failed to load cluster token: {load_resp_content(resp)}" + ) + + resp_data = read_resp_data(resp) + return resp_data.get("token") + + def validate_cluster_token(self, cluster_token: str, cluster_uri: str) -> bool: + """Checks whether a particular cluster token is valid for the given cluster URI""" + request_uri = self.resource_uri(cluster_uri) + uri = f"{self.api_server_url}/auth/token/cluster/{request_uri}" + resp = self.session.get( + uri, + headers={"Authorization": f"Bearer {cluster_token}"}, + ) + return resp.status_code == 200 def resource_request_payload(self, payload) -> dict: payload = remove_null_values_from_dict(payload) diff --git a/runhouse/servers/cluster_servlet.py b/runhouse/servers/cluster_servlet.py index 4720702f4..1ac95dd3a 100644 --- a/runhouse/servers/cluster_servlet.py +++ b/runhouse/servers/cluster_servlet.py @@ -93,11 +93,11 @@ async def __init__( ) collect_gpu_thread.start() - logger.info("Creating periodic_cluster_checks thread.") - cluster_checks_thread = threading.Thread( + logger.debug("Creating periodic_cluster_checks thread.") + self.cluster_checks_thread = threading.Thread( target=self.periodic_cluster_checks, daemon=True ) - cluster_checks_thread.start() + self.cluster_checks_thread.start() ############################################## # Cluster config state storage methods @@ -155,11 +155,16 @@ async def aresource_access_level( ) -> Union[str, None]: # If the token in this request matches that of the owner of the cluster, # they have access to everything - if configs.token and ( - configs.token == token - or rns_client.cluster_token(configs.token, resource_uri) == token - ): - return ResourceAccess.WRITE + config_token = configs.token + if config_token: + if config_token == token: + return ResourceAccess.WRITE + + if resource_uri and rns_client.validate_cluster_token( + cluster_token=token, cluster_uri=resource_uri + ): + return ResourceAccess.WRITE + return self._auth_cache.lookup_access_level(token, resource_uri) async def aget_username(self, token: str) -> str: @@ -259,7 +264,9 @@ async def asave_status_metrics_to_den(self, status: dict): status_data = { "status": ResourceServerStatus.running, - "resource_type": status_copy.get("cluster_config").pop("resource_type"), + "resource_type": status_copy.get("cluster_config").pop( + "resource_type", "cluster" + ), "resource_info": status_copy, "env_servlet_processes": env_servlet_processes, } @@ -275,6 +282,56 @@ async def asave_status_metrics_to_den(self, status: dict): def save_status_metrics_to_den(self, status: dict): return sync_function(self.asave_status_metrics_to_den)(status) 
+ async def acheck_cluster_status(self, send_to_den: bool = True): + + logger.debug("Performing cluster status checks") + status, den_resp_status_code = await self.astatus(send_to_den=send_to_den) + + if not send_to_den: + return status, den_resp_status_code + + if den_resp_status_code == 404: + logger.info( + "Cluster has not yet been saved to Den, cannot update status or logs" + ) + elif den_resp_status_code != 200: + logger.error("Failed to send cluster status to Den") + else: + logger.debug("Successfully sent cluster status to Den") + + return status, den_resp_status_code + + async def acheck_cluster_logs(self, interval_size: int): + + logger.debug("Performing logs checks") + + cluster_config = await self.aget_cluster_config() + prev_end_log_line = cluster_config.get("end_log_line", 0) + ( + logs_den_resp, + new_start_log_line, + new_end_log_line, + ) = await self.send_cluster_logs_to_den( + prev_end_log_line=prev_end_log_line, + ) + if not logs_den_resp: + logger.debug( + f"No logs were generated in the past {interval_size} minute(s), logs were not sent to Den" + ) + + elif logs_den_resp.status_code == 200: + logger.debug("Successfully sent cluster logs to Den") + await self.aset_cluster_config_value( + key="start_log_line", value=new_start_log_line + ) + await self.aset_cluster_config_value( + key="end_log_line", value=new_end_log_line + ) + else: + logger.error("Failed to send logs to Den") + + return logs_den_resp, new_start_log_line, new_end_log_line + async def aperiodic_cluster_checks(self): """Periodically check the status of the cluster, gather metrics about the cluster's utilization & memory, and save it to Den.""" @@ -284,6 +341,16 @@ async def aperiodic_cluster_checks(self): "status_check_interval", DEFAULT_STATUS_CHECK_INTERVAL ) while True: + should_send_status_and_logs_to_den: bool = ( + configs.token is not None + and interval_size != -1 + and self._cluster_uri is not None + ) + should_update_autostop: bool = self.autostop_helper is not None + + if not should_send_status_and_logs_to_den and not should_update_autostop: + break + try: # Only if one of these is true, do we actually need to get the status from each EnvServlet should_send_status_and_logs_to_den: bool = ( @@ -300,7 +367,7 @@ async def aperiodic_cluster_checks(self): break logger.debug("Performing cluster checks") - status, status_code = await self.astatus( + status, den_resp_code = await self.acheck_cluster_status( send_to_den=should_send_status_and_logs_to_den ) @@ -311,13 +378,13 @@ async def aperiodic_cluster_checks(self): if not should_send_status_and_logs_to_den: break - if status_code == 404: + if den_resp_code == 404: logger.info( "Cluster has not yet been saved to Den, cannot update status or logs." ) - elif status_code != 200: + elif den_resp_code != 200: logger.error( - f"Failed to send cluster status to Den, status_code: {status_code}" + f"Failed to send cluster status to Den, status_code: {den_resp_code}" ) else: logger.debug("Successfully sent cluster status to Den") @@ -346,6 +413,8 @@ async def aperiodic_cluster_checks(self): # since we are setting a new values to the cluster_config, we need to reload it so the next # cluster check iteration will reference to the updated cluster config. 
cluster_config = await self.aget_cluster_config() + if den_resp_code == 200: + await self.acheck_cluster_logs(interval_size=interval_size) except Exception: logger.error( @@ -585,6 +654,7 @@ def status(self, send_to_den: bool = False): # Save cluster logs to Den ############################################## def _get_logs(self): + with open(SERVER_LOGFILE) as log_file: log_lines = log_file.readlines() cleaned_log_lines = [ColoredFormatter.format_log(line) for line in log_lines] @@ -596,7 +666,7 @@ def _generate_logs_file_name(self): async def send_cluster_logs_to_den( self, prev_end_log_line: int - ) -> Tuple[Optional[int], Optional[int], Optional[int]]: + ) -> Tuple[Optional[requests.Response], Optional[int], Optional[int]]: """Load the most recent logs from the server's log file and send them to Den.""" # setting to a list, so it will be easier to get the end line num + the logs delta to send to den. latest_logs = self._get_logs().split("\n") @@ -632,4 +702,4 @@ async def send_cluster_logs_to_den( f"{resp_status_code}: Failed to send cluster logs to Den: {post_logs_resp.json()}" ) - return resp_status_code, prev_end_log_line, new_end_log_line + return post_logs_resp, prev_end_log_line, new_end_log_line diff --git a/runhouse/servers/env_servlet.py b/runhouse/servers/env_servlet.py index d15c8ce50..b454bc856 100644 --- a/runhouse/servers/env_servlet.py +++ b/runhouse/servers/env_servlet.py @@ -229,7 +229,7 @@ def _get_env_cpu_usage(self, cluster_config: dict = None): if not cluster_config.get("resource_subtype") == "Cluster": stable_internal_external_ips = cluster_config.get( - "stable_internal_external_ips" + "stable_internal_external_ips", [] ) for ips_set in stable_internal_external_ips: internal_ip, external_ip = ips_set[0], ips_set[1] @@ -241,7 +241,7 @@ def _get_env_cpu_usage(self, cluster_config: dict = None): node_name = f"worker_{stable_internal_external_ips.index(ips_set)} ({external_ip})" else: # a case it is a BYO cluster, assume that first ip in the ips list is the head. - ips = cluster_config.get("ips") + ips = cluster_config.get("ips", []) if len(ips) == 1 or node_ip == ips[0]: node_name = f"head ({node_ip})" else: diff --git a/runhouse/servers/http/auth.py b/runhouse/servers/http/auth.py index 501e86be2..7ca1a7f65 100644 --- a/runhouse/servers/http/auth.py +++ b/runhouse/servers/http/auth.py @@ -85,13 +85,14 @@ async def averify_cluster_access( from runhouse.globals import configs, obj_store # The logged-in user always has full access to the cluster. This is especially important if they flip on - # Den Auth without saving the cluster. We may need to generate a subtoken here to check. + # Den Auth without saving the cluster. Note: The token saved in the cluster config is a hashed cluster token, + # which may match the token provided in the request headers. 
if configs.token: if configs.token == token: return True - if ( - cluster_uri - and rns_client.cluster_token(configs.token, cluster_uri) == token + + if cluster_uri and rns_client.validate_cluster_token( + cluster_token=token, cluster_uri=cluster_uri ): return True diff --git a/runhouse/servers/http/http_client.py b/runhouse/servers/http/http_client.py index 75dab47d3..6f141a15d 100644 --- a/runhouse/servers/http/http_client.py +++ b/runhouse/servers/http/http_client.py @@ -99,6 +99,7 @@ def __init__( ) self.log_formatter = ClusterLogsFormatter(self.system) + self._request_headers = rns_client.request_headers(self.resource_address) def _certs_are_self_signed(self) -> bool: """Checks whether the cert provided is self-signed. If it is, all client requests will include the path @@ -165,7 +166,6 @@ def request( self, endpoint, req_type="post", - resource_address=None, data=None, env=None, stream_logs=True, @@ -175,7 +175,7 @@ def request( timeout=None, headers: Union[Dict, None] = None, ): - headers = rns_client.request_headers(resource_address, headers) + headers = headers or self._request_headers json_dict = { "data": data, "env": env, @@ -202,11 +202,7 @@ def request_json( headers: Union[Dict, None] = None, ): # Support use case where we explicitly do not want to provide headers (e.g. requesting a cert) - headers = ( - rns_client.request_headers(self.resource_address) - if headers != {} - else headers - ) + headers = self._request_headers if headers != {} else headers req_fn = ( session.get if req_type == "get" @@ -276,13 +272,11 @@ def check_server(self): f"but local Runhouse version is ({runhouse.__version__})" ) - def status(self, resource_address: str, send_to_den: bool = False): + def status(self, send_to_den: bool = False): """Load the remote cluster's status.""" - # Note: Resource address must be specified in order to construct the cluster subtoken return self.request( f"status?send_to_den={send_to_den}", req_type="get", - resource_address=resource_address, ) def folder_ls(self, path: Union[str, Path], full_paths: bool, sort: bool): @@ -390,11 +384,11 @@ def call( method_name: str, data: Any = None, serialization: Optional[str] = None, - resource_address=None, run_name: Optional[str] = None, stream_logs: bool = True, remote: bool = False, save=False, + headers=None, ): """wrapper to temporarily support cluster's call signature""" return self.call_module_method( @@ -402,12 +396,12 @@ def call( method_name, data=data, serialization=serialization, - resource_address=resource_address or self.resource_address, run_name=run_name, stream_logs=stream_logs, remote=remote, save=save, system=self.system, + headers=headers, ) def call_module_method( @@ -416,12 +410,12 @@ def call_module_method( method_name: str, data: Any = None, serialization: Optional[str] = None, - resource_address=None, run_name: Optional[str] = None, stream_logs: bool = True, remote: bool = False, save=False, system=None, + headers=None, ): """ Client function to call the rpc for call_module_method @@ -451,7 +445,7 @@ def call_module_method( remote=remote, ).model_dump(), stream=True, - headers=rns_client.request_headers(resource_address), + headers=headers or self._request_headers, auth=self.auth, verify=self.verify, ) @@ -504,7 +498,6 @@ async def acall( method_name: str, data: Any = None, serialization: Optional[str] = None, - resource_address=None, run_name: Optional[str] = None, stream_logs: bool = True, remote: bool = False, @@ -517,7 +510,6 @@ async def acall( method_name, data=data, serialization=serialization, - 
resource_address=resource_address or self.resource_address, run_name=run_name, stream_logs=stream_logs, remote=remote, @@ -532,7 +524,6 @@ async def acall_module_method( method_name: str, data: Any = None, serialization: Optional[str] = None, - resource_address=None, run_name: Optional[str] = None, stream_logs: bool = True, remote: bool = False, @@ -569,7 +560,7 @@ async def acall_module_method( remote=remote, run_async=run_async, ).model_dump(), - headers=rns_client.request_headers(resource_address), + headers=self._request_headers, ) as res: if res.status_code != 200: raise ValueError( @@ -675,7 +666,7 @@ def set_settings(self, new_settings: Dict[str, Any]): res = retry_with_exponential_backoff(session.post)( self._formatted_url("settings"), json=new_settings, - headers=rns_client.request_headers(self.resource_address), + headers=self._request_headers, auth=self.auth, verify=self.verify, ) diff --git a/runhouse/servers/obj_store.py b/runhouse/servers/obj_store.py index 2016c2236..bc3e5046f 100644 --- a/runhouse/servers/obj_store.py +++ b/runhouse/servers/obj_store.py @@ -522,13 +522,14 @@ async def ahas_resource_access(self, token: str, resource_uri=None) -> bool: # The logged-in user always has full access to the cluster and its resources. This is especially # important if they flip on Den Auth without saving the cluster. - # configs.token is the token stored on the cluster itself - if configs.token: - if configs.token == token: + # configs.token is the token stored on the cluster itself, which is itself a hashed subtoken + config_token = configs.token + if config_token: + if config_token == token: return True - if ( - resource_uri - and rns_client.cluster_token(configs.token, resource_uri) == token + + if resource_uri and rns_client.validate_cluster_token( + cluster_token=token, cluster_uri=resource_uri ): return True diff --git a/runhouse/utils.py b/runhouse/utils.py index 833a28f4f..538752e00 100644 --- a/runhouse/utils.py +++ b/runhouse/utils.py @@ -2,7 +2,8 @@ import contextvars import functools import logging -from io import StringIO +import tempfile +from io import SEEK_SET, StringIO try: import importlib.metadata as metadata @@ -32,7 +33,7 @@ import pexpect -from runhouse.constants import LOGS_DIR +from runhouse.constants import RH_LOGFILE_PATH from runhouse.logger import get_logger, init_logger logger = get_logger(__name__) @@ -481,7 +482,7 @@ def _stderr_path(self) -> str: @staticmethod def _base_local_folder_path(name: str): """Path to the base folder for this Run on a local system.""" - return f"{LOGS_DIR}/{name}" + return f"{RH_LOGFILE_PATH}/{name}" @staticmethod def _filter_files_by_ext(files: list, ext: str): @@ -516,6 +517,58 @@ def _path_to_file_by_ext(self, ext: str) -> str: return path_to_ext +class SuppressStd(object): + """Context to capture stderr and stdout at C-level.""" + + def __init__(self, outfile=None): + self.orig_stdout_fileno = sys.__stdout__.fileno() + self.orig_stderr_fileno = sys.__stderr__.fileno() + self.output = None + + def __enter__(self): + # Redirect the stdout/stderr fd to temp file + self.orig_stdout_dup = os.dup(self.orig_stdout_fileno) + self.orig_stderr_dup = os.dup(self.orig_stderr_fileno) + self.tfile = tempfile.TemporaryFile(mode="w+b") + os.dup2(self.tfile.fileno(), self.orig_stdout_fileno) + os.dup2(self.tfile.fileno(), self.orig_stderr_fileno) + + # Store the stdout object and replace it by the temp file. 
+ self.stdout_obj = sys.stdout + self.stderr_obj = sys.stderr + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + + return self + + def __exit__(self, exc_class, value, traceback): + + # Make sure to flush stdout + print(flush=True) + + # Restore the stdout/stderr object. + sys.stdout = self.stdout_obj + sys.stderr = self.stderr_obj + + # Close capture file handle + os.close(self.orig_stdout_fileno) + os.close(self.orig_stderr_fileno) + + # Restore original stderr and stdout + os.dup2(self.orig_stdout_dup, self.orig_stdout_fileno) + os.dup2(self.orig_stderr_dup, self.orig_stderr_fileno) + + # Close duplicate file handle. + os.close(self.orig_stdout_dup) + os.close(self.orig_stderr_dup) + + # Copy contents of temporary file to the given stream + self.tfile.flush() + self.tfile.seek(0, SEEK_SET) + self.output = self.tfile.read().decode() + self.tfile.close() + + #################################################################################################### # Name generation #################################################################################################### diff --git a/setup.py b/setup.py index edd99dfa0..50bc33586 100644 --- a/setup.py +++ b/setup.py @@ -62,6 +62,7 @@ def parse_readme(readme: str) -> str: install_requires = [ + "annotated-types>=0.6.0", "python-dotenv", "fastapi", "pexpect", @@ -141,11 +142,11 @@ def parse_readme(readme: str) -> str: }, include_package_data=True, classifiers=[ - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", "Topic :: Software Development :: Libraries :: Python Modules", diff --git a/tests/conftest.py b/tests/conftest.py index f2a06cdb2..89adc19ba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -38,10 +38,10 @@ class TestCluster(tests.test_resources.test_resource.TestResource): "docker_cluster_pwd_ssh_no_auth", ] } - MINIMAL = {"cluster": ["ondemand_aws_cluster"]} + MINIMAL = {"cluster": ["ondemand_aws_docker_cluster"]} RELEASE = { "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "static_cpu_pwd_cluster", ] } @@ -50,9 +50,9 @@ class TestCluster(tests.test_resources.test_resource.TestResource): "docker_cluster_pk_ssh_no_auth", "docker_cluster_pk_ssh_den_auth", "docker_cluster_pwd_ssh_no_auth", - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "static_cpu_pwd_cluster", - "multinode_cpu_cluster" + "multinode_cpu_docker_conda_cluster" ] } @@ -241,9 +241,9 @@ def event_loop(): from tests.fixtures.on_demand_cluster_fixtures import ( a10g_gpu_cluster, # noqa: F401 k80_gpu_cluster, # noqa: F401 - multinode_cpu_cluster, # noqa: F401 + multinode_cpu_docker_conda_cluster, # noqa: F401 multinode_gpu_cluster, # noqa: F401 - ondemand_aws_cluster, # noqa: F401 + ondemand_aws_docker_cluster, # noqa: F401 ondemand_aws_https_cluster_with_auth, # noqa: F401 ondemand_cluster, # noqa: F401 ondemand_gcp_cluster, # noqa: F401 @@ -356,12 +356,12 @@ def event_loop(): } default_fixtures[TestLevels.MINIMAL] = { "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", ] } default_fixtures[TestLevels.RELEASE] = { "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "ondemand_gcp_cluster", "ondemand_k8s_cluster", "ondemand_k8s_docker_cluster", @@ -374,12 +374,12 @@ def event_loop(): 
"docker_cluster_pk_ssh_no_auth", "docker_cluster_pk_ssh_den_auth", "docker_cluster_pwd_ssh_no_auth", - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "ondemand_gcp_cluster", "ondemand_k8s_cluster", "ondemand_k8s_docker_cluster", "ondemand_aws_https_cluster_with_auth", - "multinode_cpu_cluster", + "multinode_cpu_docker_conda_cluster", "static_cpu_pwd_cluster", "multinode_gpu_cluster", # for testing cluster status on multinode gpu. ] diff --git a/tests/fixtures/folder_fixtures.py b/tests/fixtures/folder_fixtures.py index b51f2e072..ee0930380 100644 --- a/tests/fixtures/folder_fixtures.py +++ b/tests/fixtures/folder_fixtures.py @@ -49,13 +49,13 @@ def docker_cluster_folder(docker_cluster_pk_ssh_no_auth): @pytest.fixture -def cluster_folder(ondemand_aws_cluster): +def cluster_folder(ondemand_aws_docker_cluster): args = { "name": "test_cluster_folder", "path": "rh-folder", } - cluster_folder = rh.folder(**args).to(system=ondemand_aws_cluster) + cluster_folder = rh.folder(**args).to(system=ondemand_aws_docker_cluster) init_args[id(cluster_folder)] = args cluster_folder.put( {f"sample_file_{i}.txt": f"file{i}".encode() for i in range(3)}, overwrite=True diff --git a/tests/fixtures/on_demand_cluster_fixtures.py b/tests/fixtures/on_demand_cluster_fixtures.py index 6309e2cb6..cf4aed1de 100644 --- a/tests/fixtures/on_demand_cluster_fixtures.py +++ b/tests/fixtures/on_demand_cluster_fixtures.py @@ -38,7 +38,7 @@ def setup_test_cluster(args, request, create_env=False): @pytest.fixture( params=[ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "ondemand_gcp_cluster", "ondemand_k8s_cluster", "ondemand_k8s_docker_cluster", @@ -53,7 +53,7 @@ def ondemand_cluster(request): @pytest.fixture(scope="session") -def ondemand_aws_cluster(request): +def ondemand_aws_docker_cluster(request): """ Note: Also used to test docker and default env with alternate Ray version. 
""" @@ -169,12 +169,16 @@ def a10g_gpu_cluster(request): @pytest.fixture(scope="session") -def multinode_cpu_cluster(request): +def multinode_cpu_docker_conda_cluster(request): args = { "name": "rh-cpu-multinode", "num_instances": NUM_OF_INSTANCES, "image_id": "docker:rayproject/ray:latest-py311-cpu", - "default_env": rh.env(reqs=["ray==2.30.0"], working_dir=None), + "default_env": rh.conda_env( + name="default_env", + reqs=test_env().reqs + ["ray==2.30.0"], + conda_env={"dependencies": ["python=3.11"], "name": "default_env"}, + ), "provider": "aws", "instance_type": "CPU:2+", } diff --git a/tests/test_den/test_rns.py b/tests/test_den/test_rns.py index b29cd4e86..d9e712a92 100644 --- a/tests/test_den/test_rns.py +++ b/tests/test_den/test_rns.py @@ -79,7 +79,7 @@ def test_ls(): rh.set_folder("@") -def test_from_name(ondemand_aws_cluster): +def test_from_name(ondemand_aws_docker_cluster): f = rh.folder(name="~/tests/bert_ft") assert f.path - assert ondemand_aws_cluster.instance_type == "CPU:2+" + assert ondemand_aws_docker_cluster.instance_type == "CPU:2+" diff --git a/tests/test_login.py b/tests/test_login.py index 05fe7d236..68c72e4d3 100644 --- a/tests/test_login.py +++ b/tests/test_login.py @@ -34,7 +34,7 @@ def add_secrets_to_vault(headers): def test_login_flow_in_new_env(): - token = os.getenv("TEST_TOKEN") + token = os.getenv("KITCHEN_TESTER_TOKEN") headers = {"Authorization": f"Bearer {token}"} add_secrets_to_vault(headers) diff --git a/tests/test_obj_store.py b/tests/test_obj_store.py index dfb853afc..b5fcf49b4 100644 --- a/tests/test_obj_store.py +++ b/tests/test_obj_store.py @@ -23,10 +23,10 @@ "docker_cluster_pk_ssh_no_auth", ] } -MINIMAL = {"cluster": ["ondemand_aws_cluster"]} +MINIMAL = {"cluster": ["ondemand_aws_docker_cluster"]} RELEASE = { "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "ondemand_gcp_cluster", "ondemand_k8s_cluster", "ondemand_k8s_docker_cluster", @@ -38,12 +38,12 @@ "cluster": [ "docker_cluster_pwd_ssh_no_auth", "docker_cluster_pk_ssh_no_auth", - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "ondemand_gcp_cluster", "ondemand_k8s_cluster", "ondemand_k8s_docker_cluster", "ondemand_aws_https_cluster_with_auth", - "multinode_cpu_cluster", + "multinode_cpu_docker_conda_cluster", "static_cpu_pwd_cluster", ] } diff --git a/tests/test_resources/test_clusters/cluster_tests.py b/tests/test_resources/test_clusters/cluster_tests.py index afbee45a0..cb115d56b 100644 --- a/tests/test_resources/test_clusters/cluster_tests.py +++ b/tests/test_resources/test_clusters/cluster_tests.py @@ -26,14 +26,14 @@ def sd_generate_image(prompt): return model(prompt).images[0] -def test_cluster_config(ondemand_aws_cluster): - config = ondemand_aws_cluster.config() +def test_cluster_config(ondemand_aws_docker_cluster): + config = ondemand_aws_docker_cluster.config() cluster2 = OnDemandCluster.from_config(config) - assert cluster2.address == ondemand_aws_cluster.address + assert cluster2.address == ondemand_aws_docker_cluster.address -def test_cluster_sharing(ondemand_aws_cluster): - ondemand_aws_cluster.share( +def test_cluster_sharing(ondemand_aws_docker_cluster): + ondemand_aws_docker_cluster.share( users=["donny@run.house", "josh@run.house"], access_level="write", notify_users=False, @@ -41,8 +41,10 @@ def test_cluster_sharing(ondemand_aws_cluster): assert True -def test_read_shared_cluster(ondemand_aws_cluster): - res = ondemand_aws_cluster.run_python(["import numpy", "print(numpy.__version__)"]) +def 
test_read_shared_cluster(ondemand_aws_docker_cluster): + res = ondemand_aws_docker_cluster.run_python( + ["import numpy", "print(numpy.__version__)"] + ) assert res[0][1] @@ -78,8 +80,8 @@ def test_on_same_cluster(cluster): assert func_hw(hw_copy) -def test_on_diff_cluster(ondemand_aws_cluster, static_cpu_pwd_cluster): - func_hw = rh.function(is_on_cluster).to(ondemand_aws_cluster) +def test_on_diff_cluster(ondemand_aws_docker_cluster, static_cpu_pwd_cluster): + func_hw = rh.function(is_on_cluster).to(ondemand_aws_docker_cluster) assert not func_hw(static_cpu_pwd_cluster) diff --git a/tests/test_resources/test_clusters/test_cluster.py b/tests/test_resources/test_clusters/test_cluster.py index fd63d2805..767d66a19 100644 --- a/tests/test_resources/test_clusters/test_cluster.py +++ b/tests/test_resources/test_clusters/test_cluster.py @@ -124,7 +124,7 @@ class TestCluster(tests.test_resources.test_resource.TestResource): "docker_cluster_pk_ssh_den_auth", "docker_cluster_pwd_ssh_no_auth", "static_cpu_pwd_cluster", - "multinode_cpu_cluster", + "multinode_cpu_docker_conda_cluster", ] } @@ -425,16 +425,20 @@ def test_caller_token_propagated(self, cluster): remote_assume_caller_and_get_token.share( users=["info@run.house"], notify_users=False ) - with friend_account(): unassumed_token, assumed_token = remote_assume_caller_and_get_token() # "Local token" is the token the cluster accesses in rh.configs.token; this is what will be used # in subsequent rns_client calls - assert assumed_token == rh.globals.rns_client.cluster_token( - rh.configs.token, cluster.rns_address - ) assert unassumed_token != rh.configs.token + # Both tokens should be valid for the cluster + assert rh.globals.rns_client.validate_cluster_token( + assumed_token, cluster.rns_address + ) + assert rh.globals.rns_client.validate_cluster_token( + unassumed_token, cluster.rns_address + ) + # Docker clusters are logged out, ondemand clusters are logged in output = cluster.run("sed -n 's/.*token: *//p' ~/.rh/config.yaml") # No config file @@ -591,7 +595,12 @@ def status_cli_test_logic(self, cluster, status_cli_command: str): assert "node: " in status_output_string assert status_output_string.count("node: ") >= 1 - # if it is a GPU cluster, check GPU print as well + cloud_properties = cluster.config().get("launched_properties", None) + if cloud_properties: + properties_to_check = ["cloud", "instance_type", "region", "cost_per_hour"] + for p in properties_to_check: + property_value = cloud_properties.get(p) + assert property_value in status_output_string @pytest.mark.level("local") @pytest.mark.clustertest diff --git a/tests/test_resources/test_clusters/test_multinode_cluster.py b/tests/test_resources/test_clusters/test_multinode_cluster.py index e826e6546..fde6d4309 100644 --- a/tests/test_resources/test_clusters/test_multinode_cluster.py +++ b/tests/test_resources/test_clusters/test_multinode_cluster.py @@ -9,15 +9,15 @@ class TestMultiNodeCluster: @pytest.mark.level("release") - def test_rsync_and_ssh_onto_worker_node(self, multinode_cpu_cluster): - worker_node = multinode_cpu_cluster.ips[-1] + def test_rsync_and_ssh_onto_worker_node(self, multinode_cpu_docker_conda_cluster): + worker_node = multinode_cpu_docker_conda_cluster.ips[-1] local_rh_package_path = Path(importlib.util.find_spec("runhouse").origin).parent local_rh_package_path = local_rh_package_path.parent dest_path = f"~/{local_rh_package_path.name}" # Rsync Runhouse package onto the worker node - multinode_cpu_cluster.rsync( + multinode_cpu_docker_conda_cluster.rsync( 
source=str(local_rh_package_path), dest=dest_path, up=True, @@ -25,45 +25,43 @@ def test_rsync_and_ssh_onto_worker_node(self, multinode_cpu_cluster): contents=True, ) - status_codes = multinode_cpu_cluster.run(["ls -l", dest_path], node=worker_node) + status_codes = multinode_cpu_docker_conda_cluster.run( + [f"ls -l {dest_path}"], node=worker_node + ) assert status_codes[0][0] == 0 assert "runhouse" in status_codes[0][1] @pytest.mark.level("release") def test_ray_started_on_worker_node_after_cluster_restart( - self, multinode_cpu_cluster + self, multinode_cpu_docker_conda_cluster ): - head_node = multinode_cpu_cluster.ips[0] + head_node = multinode_cpu_docker_conda_cluster.ips[0] - status_codes = multinode_cpu_cluster.run(["ray status"], node=head_node) + status_codes = multinode_cpu_docker_conda_cluster.run( + ["ray status"], node=head_node + ) assert status_codes[0][0] == 0 - status_code_strings = [] - - for status_code in status_codes: - # Convert each element of the tuple to a string and join them with ", " - status_code_string = ", ".join(map(str, status_code)) - status_code_strings.append(status_code_string) - - return_value = ", ".join(status_code_strings) + status_output = status_codes[0][1] node_marker = "1 node_" - num_nodes = return_value.count(node_marker) - + num_nodes = status_output.count(node_marker) assert num_nodes == 2 @pytest.mark.level("release") - def test_send_envs_to_specific_worker_node(self, multinode_cpu_cluster): + def test_send_envs_to_specific_worker_node( + self, multinode_cpu_docker_conda_cluster + ): env_0 = rh.env( name="worker_env_0", reqs=["langchain", "pytest"], - ).to(multinode_cpu_cluster, node_idx=0) + ).to(multinode_cpu_docker_conda_cluster, node_idx=0) env_1 = rh.env( name="worker_env_1", reqs=["torch", "pytest"], - ).to(multinode_cpu_cluster, node_idx=1) + ).to(multinode_cpu_docker_conda_cluster, node_idx=1) env_2 = rh.env( name="worker_env_2", @@ -71,54 +69,57 @@ def test_send_envs_to_specific_worker_node(self, multinode_cpu_cluster): ) with pytest.raises(ValueError): - env_2.to(multinode_cpu_cluster, node_idx=len(multinode_cpu_cluster.ips)) + env_2.to( + multinode_cpu_docker_conda_cluster, + node_idx=len(multinode_cpu_docker_conda_cluster.ips), + ) - env_2.to(multinode_cpu_cluster, node_idx=1) + env_2.to(multinode_cpu_docker_conda_cluster, node_idx=1) get_pid_0 = rh.function(get_pid_and_ray_node).to( - name="get_pid_0", system=multinode_cpu_cluster, env=env_0 + name="get_pid_0", system=multinode_cpu_docker_conda_cluster, env=env_0 ) get_pid_1 = rh.function(get_pid_and_ray_node).to( - name="get_pid_1", system=multinode_cpu_cluster, env=env_1 + name="get_pid_1", system=multinode_cpu_docker_conda_cluster, env=env_1 ) get_pid_2 = rh.function(get_pid_and_ray_node).to( - name="get_pid_2", system=multinode_cpu_cluster, env=env_2 + name="get_pid_2", system=multinode_cpu_docker_conda_cluster, env=env_2 ) assert get_pid_0()[1] != get_pid_1()[1] assert get_pid_1()[1] == get_pid_2()[1] @pytest.mark.level("release") - def test_specifying_resources(self, multinode_cpu_cluster): + def test_specifying_resources(self, multinode_cpu_docker_conda_cluster): env0 = rh.env( name="worker_env_0", compute={"CPU": 1.75}, - ).to(multinode_cpu_cluster) + ).to(multinode_cpu_docker_conda_cluster) env1 = rh.env( name="worker_env_1", compute={"CPU": 0.5}, - ).to(multinode_cpu_cluster) + ).to(multinode_cpu_docker_conda_cluster) env2 = rh.env( name="worker_env_2", compute={"memory": 4 * 1024 * 1024 * 1024}, - ).to(multinode_cpu_cluster) + ).to(multinode_cpu_docker_conda_cluster) 
env3 = rh.env( name="worker_env_3", compute={"CPU": 0.1, "memory": 2 * 1024 * 1024 * 1024}, - ).to(multinode_cpu_cluster) + ).to(multinode_cpu_docker_conda_cluster) - status = multinode_cpu_cluster.status() + status = multinode_cpu_docker_conda_cluster.status() env0_node = status["env_servlet_processes"][env0.name]["node_ip"] env1_node = status["env_servlet_processes"][env1.name]["node_ip"] env2_node = status["env_servlet_processes"][env2.name]["node_ip"] env3_node = status["env_servlet_processes"][env3.name]["node_ip"] - assert env0_node in multinode_cpu_cluster.internal_ips - assert env1_node in multinode_cpu_cluster.internal_ips - assert env2_node in multinode_cpu_cluster.internal_ips - assert env3_node in multinode_cpu_cluster.internal_ips + assert env0_node in multinode_cpu_docker_conda_cluster.internal_ips + assert env1_node in multinode_cpu_docker_conda_cluster.internal_ips + assert env2_node in multinode_cpu_docker_conda_cluster.internal_ips + assert env3_node in multinode_cpu_docker_conda_cluster.internal_ips assert env0_node != env1_node # Too much CPU assert env2_node != env3_node # Too much memory diff --git a/tests/test_resources/test_clusters/test_on_demand_cluster.py b/tests/test_resources/test_clusters/test_on_demand_cluster.py index 903740959..7df71f149 100644 --- a/tests/test_resources/test_clusters/test_on_demand_cluster.py +++ b/tests/test_resources/test_clusters/test_on_demand_cluster.py @@ -2,9 +2,13 @@ import time import pytest +import requests import runhouse as rh +from runhouse.globals import rns_client +from runhouse.resources.hardware.utils import ResourceServerStatus + import tests.test_resources.test_clusters.test_cluster from tests.utils import friend_account @@ -60,14 +64,14 @@ class TestOnDemandCluster(tests.test_resources.test_clusters.test_cluster.TestCl LOCAL = {"cluster": []} MINIMAL = { "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "ondemand_gcp_cluster", "ondemand_k8s_cluster", ] } RELEASE = { "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "ondemand_gcp_cluster", "ondemand_aws_https_cluster_with_auth", "ondemand_k8s_cluster", @@ -76,7 +80,7 @@ class TestOnDemandCluster(tests.test_resources.test_clusters.test_cluster.TestCl } MAXIMAL = { "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "ondemand_gcp_cluster", "ondemand_k8s_cluster", "ondemand_k8s_docker_cluster", @@ -85,7 +89,7 @@ class TestOnDemandCluster(tests.test_resources.test_clusters.test_cluster.TestCl "k80_gpu_cluster", "a10g_gpu_cluster", "static_cpu_pwd_cluster", - "multinode_cpu_cluster", + "multinode_cpu_docker_conda_cluster", "multinode_gpu_cluster", ] } @@ -189,11 +193,45 @@ def test_cluster_ping_and_is_up(self, cluster): assert cluster.ips == original_ips @pytest.mark.level("release") - def test_docker_container_reqs(self, ondemand_aws_cluster): - ret_code = ondemand_aws_cluster.run("pip freeze | grep torch")[0][0] + def test_docker_container_reqs(self, ondemand_aws_docker_cluster): + ret_code = ondemand_aws_docker_cluster.run("pip freeze | grep torch")[0][0] assert ret_code == 0 @pytest.mark.level("release") - def test_fn_to_docker_container(self, ondemand_aws_cluster): - remote_torch_exists = rh.function(torch_exists).to(ondemand_aws_cluster) + def test_fn_to_docker_container(self, ondemand_aws_docker_cluster): + remote_torch_exists = rh.function(torch_exists).to(ondemand_aws_docker_cluster) assert remote_torch_exists() + + 
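# The two docker-container tests above rely on a torch_exists helper defined elsewhere
# in this module; a plausible, hypothetical sketch of it for context (the real
# implementation may differ):
def torch_exists() -> bool:
    try:
        import torch  # noqa: F401
    except ImportError:
        return False
    return True


# Sent to the docker cluster via rh.function(torch_exists).to(ondemand_aws_docker_cluster),
# it returns True only if the cluster's container image ships torch -- the same property
# the `pip freeze | grep torch` return-code check above verifies over SSH.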
#################################################################################################### + # Status tests + #################################################################################################### + + # TODO: Affects cluster state, causes other tests to fail with ssh connection errors + @pytest.mark.skip() + @pytest.mark.level("minimal") + def test_set_status_after_teardown(self, cluster, mocker): + mock_function = mocker.patch("sky.down") + response = cluster.teardown() + assert isinstance(response, int) + assert ( + response == 200 + ) # that means that the call to post status endpoint in den was successful + mock_function.assert_called_once() + + cluster_config = cluster.config() + cluster_uri = rns_client.format_rns_address(cluster.rns_address) + api_server_url = cluster_config.get("api_server_url", rns_client.api_server_url) + cluster.teardown() + get_status_data_resp = requests.get( + f"{api_server_url}/resource/{cluster_uri}/cluster/status", + headers=rns_client.request_headers(), + ) + + assert get_status_data_resp.status_code == 200 + # For UI displaying purposes, the cluster/status endpoint returns cluster status history. + # The latest status info is the first element in the list returned by the endpoint. + get_status_data = get_status_data_resp.json()["data"][0] + assert get_status_data["resource_type"] == cluster_config.get("resource_type") + assert get_status_data["status"] == ResourceServerStatus.terminated + + assert cluster.is_up() diff --git a/tests/test_resources/test_data/test_package.py b/tests/test_resources/test_data/test_package.py index 030425a58..37be17801 100644 --- a/tests/test_resources/test_data/test_package.py +++ b/tests/test_resources/test_data/test_package.py @@ -39,12 +39,12 @@ class TestPackage(tests.test_resources.test_resource.TestResource): MINIMAL = { "package": packages, - "cluster": ["ondemand_aws_cluster"], + "cluster": ["ondemand_aws_docker_cluster"], } RELEASE = { "package": packages, - "cluster": ["ondemand_aws_cluster"], + "cluster": ["ondemand_aws_docker_cluster"], } @pytest.mark.level("unit") diff --git a/tests/test_resources/test_envs/test_env.py b/tests/test_resources/test_envs/test_env.py index 605b3ede0..b7b173c85 100644 --- a/tests/test_resources/test_envs/test_env.py +++ b/tests/test_resources/test_envs/test_env.py @@ -65,7 +65,7 @@ class TestEnv(tests.test_resources.test_resource.TestResource): "base_conda_env", "named_conda_env_from_dict", ], - "cluster": ["ondemand_aws_cluster"], + "cluster": ["ondemand_aws_docker_cluster"], } RELEASE = { "env": [ @@ -75,7 +75,7 @@ class TestEnv(tests.test_resources.test_resource.TestResource): "named_conda_env_from_dict", ], "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "static_cpu_pwd_cluster", ], } @@ -89,13 +89,13 @@ class TestEnv(tests.test_resources.test_resource.TestResource): "conda_env_from_path", ], "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "ondemand_gcp_cluster", "ondemand_k8s_cluster", "ondemand_k8s_docker_cluster", "ondemand_aws_https_cluster_with_auth", "static_cpu_pwd_cluster", - "multinode_cpu_cluster", + "multinode_cpu_docker_conda_cluster", "docker_cluster_pk_ssh_no_auth", "docker_cluster_pwd_ssh_no_auth", "docker_cluster_pk_ssh_den_auth", diff --git a/tests/test_resources/test_modules/test_functions/conftest.py b/tests/test_resources/test_modules/test_functions/conftest.py index 98dd2945b..db5c66648 100644 --- a/tests/test_resources/test_modules/test_functions/conftest.py +++ 
b/tests/test_resources/test_modules/test_functions/conftest.py @@ -24,9 +24,9 @@ def slow_running_func(a, b): @pytest.fixture(scope="session") -def summer_func(ondemand_aws_cluster): +def summer_func(ondemand_aws_docker_cluster): args = {"name": "summer_func", "fn": summer} - f = rh.function(**args).to(ondemand_aws_cluster, env=["pytest"]) + f = rh.function(**args).to(ondemand_aws_docker_cluster, env=["pytest"]) init_args[id(f)] = args return f @@ -44,14 +44,14 @@ def summer_func_shared(shared_cluster): @pytest.fixture(scope="session") -def func_with_artifacts(ondemand_aws_cluster): +def func_with_artifacts(ondemand_aws_docker_cluster): return rh.function(save_and_load_artifacts, name="artifacts_func").to( - ondemand_aws_cluster, env=["pytest"] + ondemand_aws_docker_cluster, env=["pytest"] ) @pytest.fixture(scope="session") -def slow_func(ondemand_aws_cluster): +def slow_func(ondemand_aws_docker_cluster): return rh.function(slow_running_func, name="slow_func").to( - ondemand_aws_cluster, env=["pytest"] + ondemand_aws_docker_cluster, env=["pytest"] ) diff --git a/tests/test_resources/test_modules/test_functions/test_function.py b/tests/test_resources/test_modules/test_functions/test_function.py index 5199e02f5..8b08e5930 100644 --- a/tests/test_resources/test_modules/test_functions/test_function.py +++ b/tests/test_resources/test_modules/test_functions/test_function.py @@ -259,7 +259,7 @@ def test_function_external_fn(self, cluster): assert int(res) == 10 @pytest.mark.skip("Runs indefinitely.") - # originally used ondemand_aws_cluster, therefore marked as minimal + # originally used ondemand_aws_docker_cluster, therefore marked as minimal @pytest.mark.level("minimal") def test_notebook(self, cluster): nb_sum = lambda x: multiproc_np_sum(x) @@ -307,14 +307,16 @@ def test_share_and_revoke_function(self, cluster, test_rns_folder): @pytest.mark.level("release") def test_load_function_in_new_cluster( - self, ondemand_aws_cluster, static_cpu_pwd_cluster, test_rns_folder + self, ondemand_aws_docker_cluster, static_cpu_pwd_cluster, test_rns_folder ): remote_func_name = get_remote_func_name(test_rns_folder) - ondemand_aws_cluster.save( - f"@/{ondemand_aws_cluster.name}" + ondemand_aws_docker_cluster.save( + f"@/{ondemand_aws_docker_cluster.name}" ) # Needs to be saved to rns, right now has a local name by default - remote_sum = rh.function(summer).to(ondemand_aws_cluster).save(remote_func_name) + remote_sum = ( + rh.function(summer).to(ondemand_aws_docker_cluster).save(remote_func_name) + ) static_cpu_pwd_cluster.sync_secrets(["sky"]) remote_python = ( @@ -329,8 +331,10 @@ def test_load_function_in_new_cluster( remote_sum.delete_configs() @pytest.mark.level("release") - def test_nested_diff_clusters(self, ondemand_aws_cluster, static_cpu_pwd_cluster): - summer_cpu = rh.function(summer).to(ondemand_aws_cluster) + def test_nested_diff_clusters( + self, ondemand_aws_docker_cluster, static_cpu_pwd_cluster + ): + summer_cpu = rh.function(summer).to(ondemand_aws_docker_cluster) call_function_diff_cpu = rh.function(call_function).to(static_cpu_pwd_cluster) kwargs = {"a": 1, "b": 5} @@ -423,9 +427,6 @@ def test_get_or_call(self, cluster): # TODO: should not use .resolved_state(), need to be fixed. assert get_or_call_res == 17 - # TODO: should consider testing Function.send_secrets after secrets refactor. - # TODO: should consider testing Function.keep_warm. 
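# For context on the save/reload flow exercised by the release-level tests above
# (e.g. test_load_function_in_new_cluster): a minimal sketch, assuming a Den account
# is configured and that "my-cluster" / "my_summer" are placeholder names for a
# previously saved cluster and function.
import runhouse as rh


def summer(a, b):
    return a + b


cluster = rh.cluster(name="my-cluster")  # reload an already-saved cluster by name
remote_sum = rh.function(summer).to(cluster).save("my_summer")

# Later -- potentially from a different machine, or from inside another cluster:
reloaded = rh.function(name="my_summer")
assert reloaded(1, 5) == 6

reloaded.delete_configs()  # clean up the saved config, as the test above does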
- # ---------- Unittests ---------- @pytest.fixture(autouse=True) def init_fixtures(self): @@ -439,75 +440,6 @@ def test_get_unittest(self, mocker): assert response == 5 mock_function.assert_called_once_with(run_key="my_mocked_run") - @pytest.mark.level("unit") - def test_keep_warm_byo_unittest(self, mocker): - # Create a Mock instance for Function - mock_function = mocker.patch("runhouse.Function.keep_warm") - mock_function.return_value = self.function - - # Create a Mock instance for cluster - mock_cluster = mocker.patch("runhouse.cluster") - - # BYO cluster - regular_cluster = mock_cluster(name="regular_cluster") - regular_cluster.autostop_mins.return_value = 1 - - # Set the system attribute - self.function.system = regular_cluster - - # Call the method under test - response_regular = self.function.keep_warm(autostop_mins=1) - - # Assertions - mock_function.assert_called_once_with(autostop_mins=1) - assert ( - response_regular.system.autostop_mins.return_value - == self.function.system.autostop_mins.return_value - ) - assert self.function.system.autostop_mins.return_value == 1 - assert response_regular.system.autostop_mins.return_value == 1 - - # Reset the system attribute - self.function.system = None - - @pytest.mark.level("unit") - def test_keep_warm_on_demand_unittest(self, mocker): - mock_function = mocker.patch("runhouse.Function.keep_warm") - mock_function.return_value = self.function - - # Create a Mock instance for cluster - mock_cluster = mocker.patch("runhouse.OnDemandCluster") - - # on demand cluster - on_demand_cluster = mock_cluster(name="on_demand_cluster", instance_type="aws") - on_demand_cluster.autostop_mins.return_value = 2 - - # Set the system attribute - self.function.system = on_demand_cluster - - # Call the method under test - response_on_demand = self.function.keep_warm(autostop_mins=2) - - # Assertions - mock_function.assert_called_once_with(autostop_mins=2) - assert ( - response_on_demand.system.autostop_mins.return_value - == self.function.system.autostop_mins.return_value - ) - assert self.function.system.autostop_mins.return_value == 2 - assert response_on_demand.system.autostop_mins.return_value == 2 - - # Reset the system attribute - self.function.system = None - - @pytest.mark.level("unit") - def test_notebook_unittest(self, mocker): - mock_function = mocker.patch("runhouse.Function.notebook") - mock_function.return_value = None - response = self.function.notebook() - mock_function.assert_called_once_with() - assert response is None - @pytest.mark.level("unit") def test_call_unittest(self, mocker): mock_function = mocker.patch("runhouse.Function.call") @@ -565,12 +497,6 @@ def test_get_or_call_unittest(self, mocker): second_response = self.function.get_or_call("my_run_first_time") assert second_response == 17 - @pytest.mark.level("unit") - @pytest.mark.skip("Maybe send secrets is not relevant") - def test_send_secrets_unittest(self, mock_get): - # TODO: need to think if send_secrets is a relevant Function method - pass - @pytest.mark.level("local") @pytest.mark.asyncio async def test_run_async_permutations(self, cluster): diff --git a/tests/test_resources/test_resource_sharing.py b/tests/test_resources/test_resource_sharing.py index 960bd4f10..0a1515db1 100644 --- a/tests/test_resources/test_resource_sharing.py +++ b/tests/test_resources/test_resource_sharing.py @@ -63,7 +63,6 @@ def call_cluster_methods(cluster, valid_token): @pytest.mark.level("local") def test_calling_shared_resource(self, resource): - current_token = rh.configs.token cluster = 
resource.system # Run commands on cluster with current token @@ -71,7 +70,7 @@ def test_calling_shared_resource(self, resource): assert return_codes[0][0] == 0 # Call function with current token via CURL - cluster_token = rns_client.cluster_token(current_token, cluster.rns_address) + cluster_token = rns_client.cluster_token(cluster.rns_address) res = self.call_func_with_curl( cluster, resource.name, cluster_token, **{"a": 1, "b": 2} ) @@ -119,14 +118,18 @@ def test_use_resource_apis(self, resource): # Use invalid token to confirm no function access rh.configs.token = "abc123" - try: - resource(2, 2) == 4 - except Exception as e: - assert "Unauthorized access to resource summer." in str(e) + with pytest.raises(Exception): + # cluster will throw error since the cluster token is invalid + # note: use the "call" method directly in order to pass new request headers with invalid token + cluster._http_client.call( + key=reloaded_func.name, + method_name="call", + headers=rns_client.request_headers(resource.rns_address), + ) # Reset back to valid token and confirm we can call function again rh.configs.token = current_token - cluster_token = rns_client.cluster_token(current_token, cluster.rns_address) + cluster_token = rns_client.cluster_token(cluster.rns_address) res = self.call_func_with_curl( cluster, resource.name, cluster_token, **{"a": 1, "b": 2} @@ -138,7 +141,6 @@ def test_calling_resource_with_cluster_write_access(self, resource): """Check that a user with write access to a cluster can call a function on that cluster, even without having explicit access to the function.""" current_username = rh.configs.username - current_token = rh.configs.token cluster = resource.system cluster_uri = rns_client.resource_uri(cluster.rns_address) @@ -170,7 +172,7 @@ def test_calling_resource_with_cluster_write_access(self, resource): ) # Confirm user can still call the function with write access to the cluster - cluster_token = rns_client.cluster_token(current_token, cluster.rns_address) + cluster_token = rns_client.cluster_token(cluster.rns_address) res = self.call_func_with_curl( cluster, resource.name, @@ -188,7 +190,6 @@ def test_calling_resource_with_no_cluster_access(self, resource): """Check that a user with no access to the cluster can still call a function on that cluster if they were given explicit access to the function.""" current_username = rh.configs.username - current_token = rh.configs.token cluster = resource.system cluster_uri = rns_client.resource_uri(cluster.rns_address) @@ -205,7 +206,7 @@ def test_calling_resource_with_no_cluster_access(self, resource): ), f"Failed to remove access to the cluster for user: {current_username}: {resp.text}" # Confirm current user can still call the function (which they still have explicit access to) - cluster_token = rns_client.cluster_token(current_token, cluster.rns_address) + cluster_token = rns_client.cluster_token(cluster.rns_address) res = self.call_func_with_curl( cluster, resource.name, cluster_token, **{"a": 1, "b": 2} ) @@ -219,7 +220,6 @@ def test_calling_resource_with_cluster_read_access(self, resource): """Check that a user with read only access to the cluster cannot call a function on that cluster if they do not explicitly have access to the function itself.""" current_username = rh.configs.username - current_token = rh.configs.token cluster = resource.system cluster_uri = rns_client.resource_uri(cluster.rns_address) @@ -259,7 +259,7 @@ def test_calling_resource_with_cluster_read_access(self, resource): cluster.enable_den_auth(flush=True) # 
Confirm user can no longer call the function with read only access to the cluster and no function access - cluster_token = rns_client.cluster_token(current_token, cluster.rns_address) + cluster_token = rns_client.cluster_token(cluster.rns_address) res = self.call_func_with_curl( cluster, resource.name, cluster_token, **{"a": 1, "b": 2} ) diff --git a/tests/test_resources/test_secrets/test_secret.py b/tests/test_resources/test_secrets/test_secret.py index c533ec75d..0cfa546f3 100644 --- a/tests/test_resources/test_secrets/test_secret.py +++ b/tests/test_resources/test_secrets/test_secret.py @@ -78,25 +78,25 @@ class TestSecret(tests.test_resources.test_resource.TestResource): "openai_secret", "custom_provider_secret", ], - "cluster": ["ondemand_aws_cluster"], + "cluster": ["ondemand_aws_docker_cluster"], } RELEASE = { "secret": ["test_secret"] + provider_secrets, "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "static_cpu_pwd_cluster", ], } MAXIMAL = { "secret": ["test_secret"] + provider_secrets, "cluster": [ - "ondemand_aws_cluster", + "ondemand_aws_docker_cluster", "ondemand_gcp_cluster", "ondemand_k8s_cluster", "ondemand_k8s_docker_cluster", "ondemand_aws_https_cluster_with_auth", "static_cpu_pwd_cluster", - "multinode_cpu_cluster", + "multinode_cpu_docker_conda_cluster", "docker_cluster_pk_ssh_no_auth", "docker_cluster_pwd_ssh_no_auth", "docker_cluster_pk_ssh_den_auth", diff --git a/tests/test_servers/conftest.py b/tests/test_servers/conftest.py index 0838ec9b9..63778eafc 100644 --- a/tests/test_servers/conftest.py +++ b/tests/test_servers/conftest.py @@ -14,7 +14,11 @@ from runhouse.servers.http.certs import TLSCertConfig from runhouse.servers.http.http_server import app, HTTPServer -from tests.utils import friend_account, get_ray_servlet_and_obj_store +from tests.utils import ( + friend_account, + get_ray_cluster_servlet, + get_ray_env_servlet_and_obj_store, +) logger = get_logger(__name__) @@ -170,9 +174,15 @@ def local_client_with_den_auth(logged_in_account): @pytest.fixture(scope="session") -def test_servlet(): - servlet, _ = get_ray_servlet_and_obj_store("test_servlet") - yield servlet +def test_env_servlet(): + env_servlet, _ = get_ray_env_servlet_and_obj_store("test_env_servlet") + yield env_servlet + + +@pytest.fixture(scope="session") +def test_cluster_servlet(request): + cluster_servlet = get_ray_cluster_servlet() + yield cluster_servlet @pytest.fixture(scope="function") @@ -180,7 +190,7 @@ def obj_store(request): # Use the parameter to set the name of the servlet actor to use env_servlet_name = request.param - _, test_obj_store = get_ray_servlet_and_obj_store(env_servlet_name) + _, test_obj_store = get_ray_env_servlet_and_obj_store(env_servlet_name) # Clears everything, not just what's in this env servlet test_obj_store.clear() diff --git a/tests/test_servers/test_http_client.py b/tests/test_servers/test_http_client.py index 304a95c6f..a48b4094d 100644 --- a/tests/test_servers/test_http_client.py +++ b/tests/test_servers/test_http_client.py @@ -1,4 +1,5 @@ import json +from unittest.mock import patch import pytest @@ -82,7 +83,11 @@ def test_get_certificate(self, mocker): mock_file_open().write.assert_called_once_with(b"certificate_content") @pytest.mark.level("unit") - def test_use_cert_verification(self, mocker): + @patch("runhouse.globals.rns_client.request_headers") + def test_use_cert_verification(self, mock_request_headers, mocker): + # Mock the request_headers to avoid actual HTTP requests in the test for loading the cluster token + 
mock_request_headers.return_value = {"Authorization": "Bearer mock_token"} + # Mock a certificate where the issuer is different from the subject mock_cert = mocker.MagicMock() mock_cert.issuer = "issuer" @@ -136,6 +141,9 @@ def test_use_cert_verification(self, mocker): @pytest.mark.level("unit") def test_call_module_method(self, mocker): + expected_headers = rns_client.request_headers( + resource_address=self.local_cluster.rns_address + ) response_sequence = [ json.dumps({"output_type": "stdout", "data": "Log message"}), json.dumps( @@ -161,7 +169,6 @@ def test_call_module_method(self, mocker): result = self.client.call( module_name, method_name, - resource_address=self.local_cluster.rns_address, run_name="test_run_name", ) @@ -177,9 +184,7 @@ def test_call_module_method(self, mocker): "save": False, "remote": False, } - expected_headers = rns_client.request_headers( - resource_address=self.local_cluster.rns_address - ) + expected_verify = self.client.verify mock_post.assert_called_once_with( @@ -193,6 +198,8 @@ def test_call_module_method(self, mocker): @pytest.mark.level("unit") def test_call_module_method_with_args_kwargs(self, mocker): + expected_headers = rns_client.request_headers(self.local_cluster.rns_address) + mock_response = mocker.MagicMock() mock_response.status_code = 200 # Set up iter_lines to return an iterator @@ -212,7 +219,6 @@ def test_call_module_method_with_args_kwargs(self, mocker): module_name, method_name, data=data, - resource_address=self.local_cluster.rns_address, run_name="test_run_name", ) @@ -226,7 +232,6 @@ def test_call_module_method_with_args_kwargs(self, mocker): "remote": False, } expected_url = f"http://localhost:32300/{module_name}/{method_name}" - expected_headers = rns_client.request_headers(self.local_cluster.rns_address) expected_verify = self.client.verify mock_post.assert_called_with( @@ -246,12 +251,12 @@ def test_call_module_method_error_handling(self, mocker, local_cluster): mocker.patch("requests.Session.post", return_value=mock_response) with pytest.raises(ValueError): - self.client.call( - "module", "method", resource_address=local_cluster.rns_address - ) + self.client.call("module", "method") @pytest.mark.level("unit") def test_call_module_method_config(self, mocker, local_cluster): + request_headers = rns_client.request_headers(local_cluster.rns_address) + test_data = self.local_cluster.config() mock_response = mocker.Mock() mock_response.status_code = 200 @@ -265,7 +270,7 @@ def test_call_module_method_config(self, mocker, local_cluster): cluster = self.client.call( EMPTY_DEFAULT_ENV_NAME, "install", - resource_address=local_cluster.rns_address, + headers=request_headers, ) assert cluster.config() == test_data diff --git a/tests/test_servers/test_http_server.py b/tests/test_servers/test_http_server.py index 94298442c..16be8636a 100644 --- a/tests/test_servers/test_http_server.py +++ b/tests/test_servers/test_http_server.py @@ -611,7 +611,8 @@ def test_no_access_to_cluster(self, http_client, cluster): import requests - with friend_account(): # Test accounts with Den auth are created under test_account + with friend_account(): + # Test accounts with Den auth are created under test_account res = requests.get( f"{rns_client.api_server_url}/resource", headers=rns_client.request_headers(cluster.rns_address), diff --git a/tests/test_servers/test_server_obj_store.py b/tests/test_servers/test_server_obj_store.py index f80e675dd..de2fdb81f 100644 --- a/tests/test_servers/test_server_obj_store.py +++ b/tests/test_servers/test_server_obj_store.py @@ 
-2,7 +2,7 @@ from runhouse.servers.obj_store import ObjStoreError -from tests.utils import friend_account, get_ray_servlet_and_obj_store +from tests.utils import friend_account, get_ray_env_servlet_and_obj_store def list_compare(list1, list2): @@ -126,7 +126,7 @@ def test_clear(self, obj_store): def test_many_env_servlets(self, obj_store): assert obj_store.keys() == [] - _, obj_store_2 = get_ray_servlet_and_obj_store("other") + _, obj_store_2 = get_ray_env_servlet_and_obj_store("other") assert obj_store_2.keys() == [] obj_store.put("k1", "v1") @@ -298,7 +298,7 @@ def test_many_env_servlets(self, obj_store): assert obj_store.keys_for_env_servlet_name(obj_store_2.servlet_name) == [] # Testing of maintaining envs - _, obj_store_3 = get_ray_servlet_and_obj_store("third") + _, obj_store_3 = get_ray_env_servlet_and_obj_store("third") assert obj_store_3.keys() == ["k1"] obj_store_3.put("k2", "v2") obj_store_3.put("k3", "v3") @@ -312,7 +312,7 @@ def test_many_env_servlets(self, obj_store): @pytest.mark.level("unit") def test_delete_env_servlet(self, obj_store): - _, obj_store_2 = get_ray_servlet_and_obj_store("obj_store_2") + _, obj_store_2 = get_ray_env_servlet_and_obj_store("obj_store_2") assert obj_store.keys() == [] assert obj_store_2.keys() == [] diff --git a/tests/test_servers/test_servlet.py b/tests/test_servers/test_servlet.py index 99d1eed6a..9514bf15b 100644 --- a/tests/test_servers/test_servlet.py +++ b/tests/test_servers/test_servlet.py @@ -8,11 +8,11 @@ @pytest.mark.servertest class TestServlet: @pytest.mark.level("unit") - def test_put_resource(self, test_servlet): + def test_put_resource(self, test_env_servlet): resource = Resource(name="local-resource") state = {} resp = ObjStore.call_actor_method( - test_servlet, + test_env_servlet, "aput_resource_local", data=serialize_data( (resource.config(condensed=False), state, resource.dryrun), "pickle" @@ -23,10 +23,10 @@ def test_put_resource(self, test_servlet): assert deserialize_data(resp.data, resp.serialization) == resource.name @pytest.mark.level("unit") - def test_put_obj_local(self, test_servlet): + def test_put_obj_local(self, test_env_servlet): resource = Resource(name="local-resource") resp = ObjStore.call_actor_method( - test_servlet, + test_env_servlet, "aput_local", key="key1", data=serialize_data(resource, "pickle"), @@ -35,9 +35,9 @@ def test_put_obj_local(self, test_servlet): assert resp.output_type == "success" @pytest.mark.level("unit") - def test_get_obj(self, test_servlet): + def test_get_obj(self, test_env_servlet): resp = ObjStore.call_actor_method( - test_servlet, + test_env_servlet, "aget_local", key="key1", default=KeyError, @@ -49,9 +49,9 @@ def test_get_obj(self, test_servlet): assert isinstance(resource, Resource) @pytest.mark.level("unit") - def test_get_obj_remote(self, test_servlet): + def test_get_obj_remote(self, test_env_servlet): resp = ObjStore.call_actor_method( - test_servlet, + test_env_servlet, "aget_local", key="key1", default=KeyError, @@ -63,9 +63,9 @@ def test_get_obj_remote(self, test_servlet): assert isinstance(resource_config, dict) @pytest.mark.level("unit") - def test_get_obj_does_not_exist(self, test_servlet): + def test_get_obj_does_not_exist(self, test_env_servlet): resp = ObjStore.call_actor_method( - test_servlet, + test_env_servlet, "aget_local", key="abcdefg", default=KeyError, diff --git a/tests/utils.py b/tests/utils.py index b12d8011d..bb9cecbe5 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -11,21 +11,33 @@ from runhouse.constants import TESTING_LOG_LEVEL from 
runhouse.globals import rns_client -from runhouse.servers.obj_store import ObjStore, RaySetupOption +from runhouse.servers.obj_store import get_cluster_servlet, ObjStore, RaySetupOption -def get_ray_servlet_and_obj_store(env_name): - """Helper method for getting auth servlet and base env servlet""" +def get_ray_env_servlet_and_obj_store(env_name): + """Helper method for getting object store""" test_obj_store = ObjStore() test_obj_store.initialize(env_name, setup_ray=RaySetupOption.GET_OR_FAIL) - servlet = test_obj_store.get_env_servlet( + test_env_servlet = test_obj_store.get_env_servlet( env_name=env_name, create=True, ) - return servlet, test_obj_store + return test_env_servlet, test_obj_store + + +def get_ray_cluster_servlet(cluster_config=None): + """Helper method for getting base cluster servlet""" + cluster_servlet = get_cluster_servlet(create_if_not_exists=True) + + if cluster_config: + ObjStore.call_actor_method( + cluster_servlet, "aset_cluster_config", cluster_config + ) + + return cluster_servlet def get_pid_and_ray_node(a=0): @@ -59,12 +71,14 @@ def friend_account(): try: account = rns_client.load_account_from_env( - token_env_var="TEST_TOKEN", - usr_env_var="TEST_USERNAME", + token_env_var="KITCHEN_TESTER_TOKEN", + usr_env_var="KITCHEN_TESTER_USERNAME", dotenv_path=dotenv_path, ) if account is None: - pytest.skip("`TEST_TOKEN` or `TEST_USERNAME` not set, skipping test.") + pytest.skip( + "`KITCHEN_TESTER_TOKEN` or `KITCHEN_TESTER_USERNAME` not set, skipping test." + ) yield account finally: