From a82e62bdfb9440e2e1e22bd711db498a69840789 Mon Sep 17 00:00:00 2001 From: arupcsedu Date: Thu, 3 Aug 2023 10:34:43 -0400 Subject: [PATCH] [Cylon-RP] Cylon scaling test with radical-pilot (#661) * [Cylon-RP] Cylon scaling test with radical-pilot Signed-off-by: Arup Sarker * [Cylon-RP] Cylon scaling test with radical-pilot Signed-off-by: Arup Sarker * [Cylon] Fix panda build error Signed-off-by: Arup Sarker * [Cylon] Update numpy and panda version Signed-off-by: Arup Sarker * [Cylon] Update panda version Signed-off-by: Arup Sarker * [Cylon-radical pilot] Fix updated radical pilot script for scaling test Signed-off-by: Arup Sarker * [Cylon] Fix Cylon CI system and add slice operation on scaling test Signed-off-by: Arup Sarker * [Cylon] Fix cython version for CI build Signed-off-by: Arup Sarker * [Cylon] Update new version Signed-off-by: Arup Sarker * fix native build * [Cylon] Fix numpy and pyarrow dependencies Signed-off-by: Arup Sarker * [Pyarrow] Update workspace file for CI build Signed-off-by: Arup Sarker * update native script Signed-off-by: Arup Sarker * Update build.sh Co-authored-by: niranda perera * Update c-cpp.yml --------- Signed-off-by: Arup Sarker Co-authored-by: niranda perera --- build.sh | 4 +- conda/environments/cylon.yml | 6 +- conda/environments/cylon_MacOS.yml | 6 +- conda/environments/cylon_rivanna_1.yml | 2 +- conda/environments/cylon_rivanna_2.yml | 2 +- conda/environments/gcylon.yml | 6 +- conda/environments/windows.yml | 6 +- requirements.txt | 6 +- rivanna/README.md | 60 +++-- rivanna/rp-scripts/README.md | 51 +++++ rivanna/rp-scripts/config.json | 223 +++++++++++++++++++ rivanna/rp-scripts/raptor.cfg | 55 +++++ rivanna/rp-scripts/raptor_master.py | 210 ++++++++++++++++++ rivanna/rp-scripts/raptor_worker.py | 42 ++++ rivanna/rp-scripts/resource_uva.json | 39 ++++ rivanna/rp-scripts/rp-cylon.slurm | 21 ++ rivanna/rp-scripts/rp_scaling.py | 294 +++++++++++++++++++++++++ rivanna/scripts/README.md | 49 +++-- rivanna/scripts/cylon_scaling.py | 62 +++++- rivanna/scripts/scaling_job.slurm | 46 ++-- 20 files changed, 1103 insertions(+), 87 deletions(-) create mode 100644 rivanna/rp-scripts/README.md create mode 100644 rivanna/rp-scripts/config.json create mode 100644 rivanna/rp-scripts/raptor.cfg create mode 100755 rivanna/rp-scripts/raptor_master.py create mode 100755 rivanna/rp-scripts/raptor_worker.py create mode 100644 rivanna/rp-scripts/resource_uva.json create mode 100644 rivanna/rp-scripts/rp-cylon.slurm create mode 100644 rivanna/rp-scripts/rp_scaling.py diff --git a/build.sh b/build.sh index 288736e9b..c9f6b0a21 100755 --- a/build.sh +++ b/build.sh @@ -358,10 +358,10 @@ build_pyarrow() { build_python_pyarrow() { print_line - echo "Building Python" + echo "Building Pycylon" source "${PYTHON_ENV_PATH}"/bin/activate || exit 1 read_python_requirements - pip install pyarrow==5.0.0 || exit 1 + pip install pyarrow==9.0.0 || exit 1 ARROW_LIB=$(python3 -c 'import pyarrow as pa; import os; print(os.path.dirname(pa.__file__))') || exit 1 LD_LIBRARY_PATH="${ARROW_LIB}:${BUILD_PATH}/lib:${LD_LIBRARY_PATH}" || exit 1 diff --git a/conda/environments/cylon.yml b/conda/environments/cylon.yml index ab616bd1a..35849906a 100644 --- a/conda/environments/cylon.yml +++ b/conda/environments/cylon.yml @@ -10,9 +10,9 @@ dependencies: - glog - openmpi=4.1.3=ha1ae619_105 - ucx>=1.12.1 - - cython>=0.29,<0.30 - - numpy - - pandas>=1.0,<1.6.0dev0 + - cython>=0.29.31,<3 + - numpy<1.24.4 + - pandas>=1.0,<2.0.0 - fsspec>=0.6.0 - setuptools # they are not needed for using pygcylon or compiling it diff --git a/conda/environments/cylon_MacOS.yml b/conda/environments/cylon_MacOS.yml index 7aa0099ee..e41c71556 100644 --- a/conda/environments/cylon_MacOS.yml +++ b/conda/environments/cylon_MacOS.yml @@ -9,9 +9,9 @@ dependencies: - pyarrow=9.0.0 - glog - openmpi>=4.1.2 - - cython>=0.29,<0.30 - - numpy - - pandas>=1.0,<1.6.0dev0 + - cython>=0.29.31,<3 + - numpy<1.24.4 + - pandas>=1.0,<2.0.0 - fsspec>=0.6.0 - setuptools # they are not needed for using pygcylon or compiling it diff --git a/conda/environments/cylon_rivanna_1.yml b/conda/environments/cylon_rivanna_1.yml index f7f166894..d53400eab 100644 --- a/conda/environments/cylon_rivanna_1.yml +++ b/conda/environments/cylon_rivanna_1.yml @@ -10,7 +10,7 @@ dependencies: - glog - openmpi=4.1.3=ha1ae619_105 - ucx>=1.12.1 - - cython>=0.29,<0.30 + - cython>=0.29.31,<3 - numpy - pandas>=1.0,<1.6.0dev0 - fsspec>=0.6.0 diff --git a/conda/environments/cylon_rivanna_2.yml b/conda/environments/cylon_rivanna_2.yml index 4bccf625a..1b61f2e63 100644 --- a/conda/environments/cylon_rivanna_2.yml +++ b/conda/environments/cylon_rivanna_2.yml @@ -10,7 +10,7 @@ dependencies: - glog #- openmpi=4.1.3=ha1ae619_105 - ucx>=1.12.1 - - cython>=0.29,<0.30 + - cython>=0.29.31,<3 - numpy - pandas>=1.0,<1.6.0dev0 - fsspec>=0.6.0 diff --git a/conda/environments/gcylon.yml b/conda/environments/gcylon.yml index 885201337..3f692ed81 100644 --- a/conda/environments/gcylon.yml +++ b/conda/environments/gcylon.yml @@ -9,14 +9,14 @@ dependencies: - cmake>=3.23.1,!=3.25.0 - arrow-cpp=9 - pyarrow=9.0.0 - - cython>=0.29,<0.30 + - cython>=0.29.31,<3 - cudf=22.12.01 - cudatoolkit=11.5 - glog - openmpi=4.1.3=ha1ae619_105 - ucx>=1.12.1 - - numpy - - pandas>=1.0,<1.6.0dev0 + - numpy<1.24.4 + - pandas>=1.0,<2.0.0 - fsspec>=0.6.0 - setuptools # these are for running tests only, diff --git a/conda/environments/windows.yml b/conda/environments/windows.yml index 63042ae7e..4c6411bcf 100644 --- a/conda/environments/windows.yml +++ b/conda/environments/windows.yml @@ -9,9 +9,9 @@ dependencies: - pyarrow=9.0.0 - glog - msmpi - - cython>=0.29,<0.30 - - numpy - - pandas>=1.0,<1.6.0dev0 + - cython>=0.29.31,<3 + - numpy<1.24.4 + - pandas>=1.0,<2.0.0 - fsspec>=0.6.0 - setuptools # they are not needed for using pygcylon or compiling it diff --git a/requirements.txt b/requirements.txt index 59cfbdf78..6deb409ec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ wheel -numpy -cython +cython>=0.29.31,<3 +numpy<1.24.4 +pandas>=1.0,<2.0.0 fsspec # required for pandas-parquet -pandas # setuptools==60.0.0 # required for mpi4py # mpi4py==3.1.3 mpi4py diff --git a/rivanna/README.md b/rivanna/README.md index 8e5492015..6e4eee9d1 100644 --- a/rivanna/README.md +++ b/rivanna/README.md @@ -4,43 +4,57 @@ Arup Sarker (arupcsedu@gmail.com, djy8hg@virginia.edu) -## Intsall instructions +## Install instructions Rivanna is an HPC system offerbed by University of Virginia. -There are two ways you can build cylon on Rivanna. +This will use custom dependencies of the system gcc, openmpi version. +```shell -### 1. Custom gcc conda install +git clone https://github.com/cylondata/cylon.git +cd cylon -This will use custom dependencies of the system gcc, openmpi version and run slurm script. +module load gcc/9.2.0 openmpi/3.1.6 python/3.7.7 cmake/3.23.3 +python -m venv $PWD/cy-rp-env -```shell -git clone https://github.com/cylondata/cylon.git -cd cylon -conda env create -f conda/environments/cylon_rivanna_1.yml -sbatch rivanna/job_cylon_rivanna_1.slurm -``` +source $PWD/cy-rp-env/bin/activate -For more details of the dependent libraries and Slurm scripts, Please checkout the following links: -* -* +pip install pip -U +pip install pytest -### 2. Module based conda install. +export CC=`which gcc` +export CXX=`which g++` +CC=gcc MPICC=mpicc pip install --no-binary mpi4py install mpi4py +pip install -U pytest-mpi +pip install numpy +pip install pyarrow==9.0.0 -This will build Cylon by using the loaded module of openmpi and gcc. -Create virtual environment +rm -rf build +BUILD_PATH=$PWD/build +export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH + +./build.sh -pyenv $PWD/cy-rp-env -bpath $(pwd)/build --cpp --python_with_pyarrow --cython --test --cmake-flags "-DMPI_C_COMPILER=$(which mpicc) -DMPI_CXX_COMPILER=$(which mpicxx)" + +``` +It will take some time to build. So, grab a coffee!!! + +Let's perform a scaling operation with join. Before that, we have to install the dependencies as follow. ```shell -git clone https://github.com/cylondata/cylon.git -cd cylon -conda env create -f conda/environments/cylon_rivanna_2.yml -sbatch rivanna/job_cylon_rivanna_2.slurm +pip install cloudmesh-common +pip install openssl-python +python3 -m pip install urllib3==1.26.6 ``` -For more details of the dependent libraries and Slurm scripts, Please checkout below links: +We will slum script to run the scaling operation. + +```shell +sbatch rivanna/scripts/scaling_job.slurm +``` + +For more details of the dependent libraries and Slurm scripts, Please checkout the following links: - - +* diff --git a/rivanna/rp-scripts/README.md b/rivanna/rp-scripts/README.md new file mode 100644 index 000000000..28a496536 --- /dev/null +++ b/rivanna/rp-scripts/README.md @@ -0,0 +1,51 @@ +# Running Cylon on Rivanna + +Arup Sarker (arupcsedu@gmail.com, djy8hg@virginia.edu) + + + +## Install instructions for Radical Pilot + +Rivanna is an HPC system offerbed by University of Virginia. +This will use custom dependencies of the system gcc, openmpi version. +Use the same python environment "cylon_rct" for radical-pilot + +```shell +module load gcc/9.2.0 openmpi/3.1.6 python/3.7.7 cmake/3.23.3 +source $HOME/cylon_rct/bin/activate +pip install radical.pilot +``` +For checking all dependent library version: + +```shell +radical-stack +``` +You need to export mongo-db url: + +```shell +export RADICAL_PILOT_DBURL="mongodb:ADD_YOUR_URL" +``` +Setup is done. Now let's execute scaling with cylon. + +```shell +cd /some_path_to/cylon/rivanna/rp-scripts +python rp_scaling.py +``` + +If you want to make any change in the uva resource file(/some_path_to/radical.pilot/src/radical/pilot/configs) or any other places in the radical pilot source, + +```shell +git clone https://github.com/radical-cybertools/radical.pilot.git +cd radical.pilot +``` +For reflecting those change, you need to upgrade radical-pilot by, + +```shell +pip install . --upgrade +``` + +To uninstall radical pilot, execute + +```shell +pip uninstall radical.pilot +``` \ No newline at end of file diff --git a/rivanna/rp-scripts/config.json b/rivanna/rp-scripts/config.json new file mode 100644 index 000000000..ceed11256 --- /dev/null +++ b/rivanna/rp-scripts/config.json @@ -0,0 +1,223 @@ + +{ + "local.localhost" : { + "project" : null, + "queue" : null, + "schema" : null, + "cores" : 32, + "gpus" : 0 + }, + + "local.localhost_test" : { + "project" : null, + "queue" : null, + "schema" : null, + "cores" : 1, + "gpus" : 0 + }, + + "local.localhost_flux" : { + "project" : null, + "queue" : null, + "schema" : null, + "cores" : 128, + "gpus" : 0 + }, + + "local.localhost_prte" : { + "project" : null, + "queue" : null, + "schema" : null, + "cores" : 64 + }, + + "local.local" : { + "project" : null, + "queue" : null, + "schema" : null, + "cores" : 64 + }, + + "anl.arcticus" : { + "project" : null, + "queue" : "arcticus", + "schema" : "local", + "cores" : 96, + "gpus" : 2 + }, + + "nersc.edison" : { + "project" : null, + "queue" : "debug", + "schema" : "ssh", + "cores" : 64 + }, + + "access.expanse": { + "project" : "UNC100", + "queue" : "compute", + "schema" : "local", + "cores" : 128 + }, + + "access.supermic_ssh" : { + "project" : "TG-MCB090174", + "queue" : "workq", + "schema" : "gsissh", + "cores" : 64 + }, + + "access.supermic" : { + "project" : "TG-MCB090174", + "queue" : "workq", + "schema" : "gsissh", + "cores" : 80 + }, + + "access.supermic_orte" : { + "project" : "TG-CCR140028", + "queue" : "workq", + "schema" : "gsissh", + "cores" : 80 + }, + + "princeton.traverse" : { + "project" : "tromp", + "queue" : "all", + "schema" : "local", + "cores" : 32, + "gpus" : 4 + }, + + "princeton.tiger_cpu" : { + "project" : "geo", + "queue" : "cpu", + "schema" : "ssh", + "cores" : 40 + }, + + "princeton.tiger_gpu" : { + "project" : "geo", + "queue" : "gpu", + "schema" : "local", + "cores" : 56 + }, + + "tacc.frontera" : { + "project" : "MCB20006", + "queue" : "development", + "schema" : "ssh", + "cores" : 64 + }, + + "access.longhorn" : { + "project" : "FTA-Jha", + "queue" : "development", + "schema" : "local", + "cores" : 40 + }, + + "access.bridges2" : { + "project" : null, + "queue" : "RM", + "schema" : "local", + "cores" : 128 + }, + + "access.wrangler" : { + "project" : "TG-MCB090174", + "queue" : "debug", + "schema" : "gsissh", + "cores" : 24 + }, + + "access.stampede2_ssh" : { + "project" : "TG-MCB090174", + "queue" : "development", + "schema" : "local", + "cores" : 204 + }, + + "access.stampede2_srun" : { + "project" : "TG-MCB090174", + "queue" : "development", + "schema" : "local", + "cores" : 204 + }, + + "access.stampede2_ibrun" : { + "project" : "TG-MCB090174", + "queue" : "development", + "schema" : "local", + "cores" : 204 + }, + + "ncar.cheyenne": { + "project" : "URTG0014", + "queue" : "regular", + "schema" : "local", + "cores" : 72 + }, + + "llnl.lassen" : { + "project" : "CV_DDMD", + "queue" : "pdebug", + # "queue" : "pbatch", + "schema" : "local", + "cores" : 30, + "gpus" : 0 + }, + + "ornl.spock" : { + "project" : "csc449", + "queue" : "ecp", + "schema" : "local", + "cores" : 64, + "gpus" : 4 + }, + + "ornl.summit" : { + "project" : "CSC343", + "queue" : "batch", + "schema" : "local", + "cores" : 168, + "gpus" : 6 + }, + + "ornl.summit_prte" : { + "project" : "geo111", + "queue" : "batch", + "schema" : "local", + "cores" : 168, + "gpus" : 6 + }, + + "ornl.rhea_aprun" : { + "project" : "BIP149", + "queue" : "batch", + "schema" : "local", + "cores" : 64 + }, + + "radical.three" : { + "project" : null, + "queue" : null, + "schema" : "ssh", + "cores" : 8 + }, + + "rutgers.amarel" : { + "project" : null, + "queue" : null, + "schema" : "ssh", + "cores" : 8 + }, + + "uva.rivanna" : { + "project" : null, + "queue" : "standard", + "schema" : "local", + "cores" : 8 + } +} + diff --git a/rivanna/rp-scripts/raptor.cfg b/rivanna/rp-scripts/raptor.cfg new file mode 100644 index 000000000..bf202a248 --- /dev/null +++ b/rivanna/rp-scripts/raptor.cfg @@ -0,0 +1,55 @@ +{ + # resource configuration + "cores_per_node" : 398, + "gpus_per_node" : 0, + + # raptor configuration + "n_masters" : 1, + "n_workers" : 1, + "masters_per_node" : 1, + "nodes_per_worker" : 1, + + # extra nodes for non-raptor rp tasks + "nodes_rp" : 1, + # extra resources for the rp agent (optional) + "nodes_agent" : 0, + + # pilot runtime in min + "runtime" : 60, + + # task configuration + "cores_per_task" : 1, + "sleep" : 3, + # These are used as the range of the for loops for defining and submitting + # non-raptor and raptor tasks, respectively. + "tasks_rp" : 1, + "tasks_raptor" : 1, + + "pilot_descr": { + "resource" : "uva.rivanna", + "runtime" : 60, + "access_schema": "interactive", + "queue" : "bii", + "project" : "bii_dsc_community" + }, + + "master_descr": { + "mode" : "raptor.master", + "named_env" : "cylon_rp_venv", + "executable" : "./raptor_master.py" + }, + + "worker_descr": { + "mode" : "raptor.worker", + "named_env" : "cylon_rp_venv", + "pre_exec" : ["module load gcc/9.2.0", + "module load openmpi/3.1.6", + "module load python/3.7.7", + "export LD_LIBRARY_PATH=$HOME/rc_arup/cylon/build/arrow/install/lib64:$HOME/rc_arup/cylon/build/glog/install/lib64:$HOME/rc_arup/cylon/build/lib64:$HOME/rc_arup/cylon/build/lib:$LD_LIBRARY_PATH" + ], + + # custom worker class + "raptor_class" : "MyWorker", + "raptor_file" : "./raptor_worker.py" + } +} \ No newline at end of file diff --git a/rivanna/rp-scripts/raptor_master.py b/rivanna/rp-scripts/raptor_master.py new file mode 100755 index 000000000..6e417682e --- /dev/null +++ b/rivanna/rp-scripts/raptor_master.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 + +import os +import sys +import time + +from collections import defaultdict + +import radical.utils as ru +import radical.pilot as rp + + +def out(msg): + sys.stdout.write('==== %s\n' % msg) + sys.stdout.flush() + + +# This script has to run as a task within a pilot allocation, and is +# a demonstration of a task overlay within the RCT framework. It is expected +# to be staged and launched by the `raptor.py` script in the radical.pilot +# examples/misc directory. +# This master task will: +# +# - create a master which bootstraps a specific communication layer +# - insert n workers into the pilot (again as a task) +# - perform RPC handshake with those workers +# - send RPC requests to the workers +# - terminate the worker +# +# The worker itself is an external program which is not covered in this code. + +RANKS = 2 + + + +# ------------------------------------------------------------------------------ +# +class MyMaster(rp.raptor.Master): + ''' + This class provides the communication setup for the task overlay: it will + set up the request / response communication queues and provide the endpoint + information to be forwarded to the workers. + ''' + + # -------------------------------------------------------------------------- + # + def __init__(self, cfg): + + self._cnt = 0 + self._submitted = defaultdict(int) + self._collected = defaultdict(int) + + # initialize the task overlay base class. That base class will ensure + # proper communication channels to the pilot agent. + super().__init__(cfg=cfg) + + self._sleep = self._cfg.sleep + + + # -------------------------------------------------------------------------- + # + def submit(self): + + # self._prof.prof('create_start') + + # create additional tasks to be distributed to the workers. + + #tds = list() + + #self._prof.prof('create_stop') + + # wait for outstanding tasks to complete + while True: + + completed = sum(self._collected.values()) + submitted = sum(self._submitted.values()) + + if submitted: + # request_cb has been called, so we can wait for completion + + self._log.info('=== submit done?: %d >= %d ', completed, submitted) + + if completed >= submitted: + break + + time.sleep(1) + + self._log.info('=== submit done!') + + + # -------------------------------------------------------------------------- + # + def request_cb(self, tasks): + + for task in tasks: + + self._log.debug('=== request_cb %s\n', task['uid']) + + mode = task['description']['mode'] + uid = task['description']['uid'] + + self._submitted[mode] += 1 + + # for each `function` mode task, submit one more `proc` mode request + if mode == rp.TASK_FUNC: + self.submit_tasks(rp.TaskDescription( + {'uid' : 'extra' + uid, + # 'timeout' : 10, + 'mode' : rp.TASK_PROC, + 'ranks' : RANKS, + 'executable': '/bin/sh', + 'arguments' : ['-c', + 'sleep %d; ' % self._sleep + + 'echo "hello $RP_RANK/$RP_RANKS: ' + '$RP_TASK_ID"'], + 'raptor_id' : 'master.000000'})) + + return tasks + + + # -------------------------------------------------------------------------- + # + def result_cb(self, tasks): + ''' + Log results. + + Log file is named by the master tasks UID. + ''' + for task in tasks: + + mode = task['description']['mode'] + self._collected[mode] += 1 + + # NOTE: `state` will be `AGENT_EXECUTING` + self._log.info('=== result_cb %s: %s [%s] [%s]', + task['uid'], + task['state'], + task['stdout'], + task['return_value']) + + # Note that stdout is part of the master task result. + print('id: %s [%s]:\n out: %s\n ret: %s\n' + % (task['uid'], task['state'], task['stdout'], + task['return_value'])) + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + # This master script runs as a task within a pilot allocation. The purpose + # of this master is to (a) spawn a set or workers within the same + # allocation, (b) to distribute work items (`hello` function calls) to those + # workers, and (c) to collect the responses again. + cfg_fname = str(sys.argv[1]) + cfg = ru.Config(cfg=ru.read_json(cfg_fname)) + cfg.rank = int(sys.argv[2]) + + n_workers = cfg.n_workers + nodes_per_worker = cfg.nodes_per_worker + cores_per_node = cfg.cores_per_node + gpus_per_node = cfg.gpus_per_node + descr = cfg.worker_descr + pwd = os.getcwd() + + # one node is used by master. Alternatively (and probably better), we could + # reduce one of the worker sizes by one core. But it somewhat depends on + # the worker type and application workload to judge if that makes sense, so + # we leave it for now. + + # create a master class instance - this will establish communication to the + # pilot agent + master = MyMaster(cfg) + + # insert `n` worker tasks into the agent. The agent will schedule (place) + # those workers and execute them. Insert one smaller worker (see above) + # NOTE: this assumes a certain worker size / layout + out('workers: %d' % n_workers) + descr['ranks'] = nodes_per_worker * cores_per_node + descr['gpus_per_rank'] = nodes_per_worker * gpus_per_node + worker_ids = master.submit_workers( + [rp.TaskDescription(descr) for _ in range(n_workers)]) + + # wait until `m` of those workers are up + # This is optional, work requests can be submitted before and will wait in + # a work queue. + # FIXME + master.wait_workers(count=1) + + out('start') + master.start() + out('submit') + master.submit() + + # let some time pass for client side tasks to complete + time.sleep(600) + + out('stop') + # TODO: can be run from thread? + master.stop() + out('join') + + # TODO: worker state callback + master.join() + out('done') + + # TODO: expose RPC hooks + + +# ------------------------------------------------------------------------------ diff --git a/rivanna/rp-scripts/raptor_worker.py b/rivanna/rp-scripts/raptor_worker.py new file mode 100755 index 000000000..d1ae066dc --- /dev/null +++ b/rivanna/rp-scripts/raptor_worker.py @@ -0,0 +1,42 @@ + +import time +import random + +import radical.pilot as rp + +RANKS = 2 + + +# ------------------------------------------------------------------------------ +# +class MyWorker(rp.raptor.MPIWorker): + ''' + This class provides the required functionality to execute work requests. + In this simple example, the worker only implements a single call: `hello`. + ''' + + # -------------------------------------------------------------------------- + # + def __init__(self, cfg): + + super().__init__(cfg) + + self.register_mode('foo', self._dispatch_foo) + + + # -------------------------------------------------------------------------- + # + def _dispatch_foo(self, task): + + import pprint + self._log.debug('==== running foo\n%s', + pprint.pformat(task['description'])) + + return 'out', 'err', 0, None, None + + + # -------------------------------------------------------------------------- + # + +# ------------------------------------------------------------------------------ + diff --git a/rivanna/rp-scripts/resource_uva.json b/rivanna/rp-scripts/resource_uva.json new file mode 100644 index 000000000..de4cdc193 --- /dev/null +++ b/rivanna/rp-scripts/resource_uva.json @@ -0,0 +1,39 @@ +{ + "description" : "Heterogeneous community-model Linux cluster", + "notes" : "Access from registered UVA IP address. See https://www.rc.virginia.edu/userinfo/rivanna/login/", + "schemas" : ["local", "ssh", "interactive"], + "local" : + { + "job_manager_endpoint" : "slurm://rivanna.hpc.virginia.edu/", + "filesystem_endpoint" : "file://rivanna.hpc.virginia.edu/" + }, + "ssh" : + { + "job_manager_endpoint" : "slurm+ssh://rivanna.hpc.virginia.edu/", + "filesystem_endpoint" : "sftp://rivanna.hpc.virginia.edu/" + }, + "interactive" : + { + "job_manager_endpoint" : "fork://localhost/", + "filesystem_endpoint" : "file://localhost/" + }, + "default_queue" : "standard", + "resource_manager" : "SLURM", + "agent_scheduler" : "CONTINUOUS", + "agent_spawner" : "POPEN", + "launch_methods" : { + "order": ["MPIRUN"], + "MPIRUN" : {} + }, + "pre_bootstrap_0" : [ + "module load gcc/9.2.0", + "module load openmpi/3.1.6", + "module load python/3.7.7", + "export LD_LIBRARY_PATH=$HOME/rc_arup/cylon/build/arrow/install/lib64:$HOME/rc_arup/cylon/build/glog/install/lib64:$HOME/rc_arup/cylon/build/lib64:$HOME/rc_arup/cylon/build/lib:$LD_LIBRARY_PATH" + ], + "default_remote_workdir" : "/scratch/$USER", + "python_dist" : "default", + "virtenv_dist" : "default", + "virtenv_mode" : "create", + "rp_version" : "local" +} \ No newline at end of file diff --git a/rivanna/rp-scripts/rp-cylon.slurm b/rivanna/rp-scripts/rp-cylon.slurm new file mode 100644 index 000000000..fe360d5d4 --- /dev/null +++ b/rivanna/rp-scripts/rp-cylon.slurm @@ -0,0 +1,21 @@ +#!/bin/bash +#SBATCH --nodes=10 +#SBATCH --ntasks-per-node=40 +#SBATCH --exclusive +#SBATCH --time=2:00:00 +#SBATCH --partition=bii +#SBATCH -A bii_dsc_community +#SBATCH --output=rivanna/scripts/cylogs/rp-cylon-3n-40w-5m-%x-%j.out +#SBATCH --error=rivanna/scripts/cylogs/rp-cylon-3n-40w-5m-%x-%j.err + + +module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 + + +source $HOME/cylon_rp_venv/bin/activate + +export RADICAL_LOG_LVL="DEBUG" +export RADICAL_PROFILE="TRUE" +export RADICAL_PILOT_DBURL="mongodb://rct-tutorial:HXH7vExF7GvCeMWn@95.217.193.116:27017/rct-tutorial" + +python rivanna/rp-scripts/rp_scaling.py diff --git a/rivanna/rp-scripts/rp_scaling.py b/rivanna/rp-scripts/rp_scaling.py new file mode 100644 index 000000000..f65e1eb65 --- /dev/null +++ b/rivanna/rp-scripts/rp_scaling.py @@ -0,0 +1,294 @@ +#!/usr/bin/env python3 + +''' +Demonstrate the "raptor" features for remote Task management. + +This script and its supporting files may use relative file paths. Run from the +directory in which you found it. + +Refer to the ``raptor.cfg`` file in the same directory for configurable run time +details. + +By default, this example uses the ``local.localhost`` resource with the +``local`` access scheme where RP oversubscribes resources to emulate multiple +nodes. + +In this example, we + - Launch one or more raptor "master" task(s), which self-submits additional + tasks (results are logged in the master's `result_cb` callback). + - Stage scripts to be used by a raptor "Worker" + - Provision a Python virtual environment with + :py:func:`~radical.pilot.prepare_env` + - Submit several tasks that will be routed through the master(s) to the + worker(s). + - Submit a non-raptor task in the same Pilot environment + +''' + +import os +import sys + +import radical.utils as ru +import radical.pilot as rp + + +# To enable logging, some environment variables need to be set. +# Ref +# * https://radicalpilot.readthedocs.io/en/stable/overview.html#what-about-logging +# * https://radicalpilot.readthedocs.io/en/stable/developer.html#debugging +# For terminal output, set RADICAL_LOG_TGT=stderr or RADICAL_LOG_TGT=stdout +logger = ru.Logger('raptor') +PWD = os.path.abspath(os.path.dirname(__file__)) +RANKS = 398 + + +# ------------------------------------------------------------------------------ +# +@rp.pythontask +def cylon_join(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':20000000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + df3 = df1.merge(df2, on=[0], algorithm='sort', env=env) + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + + + +def cylon_slice(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':20000000, 'it': 10}): + + import time + import argparse + + import pandas as pd + + from numpy.random import default_rng + from pycylon.frame import CylonEnv, DataFrame + from pycylon.net import MPIConfig + + comm = comm + data = data + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + + for i in range(data['it']): + env.barrier() + t1 = time.time() + df3 = df1[0:20000000, env] # distributed slice + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + + if env.rank == 0: + pass + env.finalize() + + +# ------------------------------------------------------------------------------ +# +def task_state_cb(task, state): + logger.info('task %s: %s', task.uid, state) + if state == rp.FAILED: + logger.error('task %s failed', task.uid) + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + if len(sys.argv) < 2: + cfg_file = '%s/raptor.cfg' % PWD + + else: + cfg_file = sys.argv[1] + + cfg = ru.Config(cfg=ru.read_json(cfg_file)) + + cores_per_node = cfg.cores_per_node + gpus_per_node = cfg.gpus_per_node + n_masters = cfg.n_masters + n_workers = cfg.n_workers + masters_per_node = cfg.masters_per_node + nodes_per_worker = cfg.nodes_per_worker + + # we use a reporter class for nicer output + report = ru.Reporter(name='radical.pilot') + report.title('Raptor example (RP version %s)' % rp.version) + + session = rp.Session() + try: + pd = rp.PilotDescription(cfg.pilot_descr) + + pd.cores = 400 + pd.gpus = 0 + pd.runtime = 60 + + pmgr = rp.PilotManager(session=session) + tmgr = rp.TaskManager(session=session) + tmgr.register_callback(task_state_cb) + + pilot = pmgr.submit_pilots(pd) + tmgr.add_pilots(pilot) + + pmgr.wait_pilots(uids=pilot.uid, state=[rp.PMGR_ACTIVE]) + + report.info('Stage files for the worker `my_hello` command.\n') + # See raptor_worker.py. + pilot.stage_in({'source': ru.which('radical-pilot-hello.sh'), + 'target': 'radical-pilot-hello.sh', + 'action': rp.TRANSFER}) + + # Issue an RPC to provision a Python virtual environment for the later + # raptor tasks. Note that we are telling prepare_env to install + # radical.pilot and radical.utils from sdist archives on the local + # filesystem. This only works for the default resource, local.localhost. + report.info('Call pilot.prepare_env()... ') + pilot.prepare_env(env_name='cylon_rp_venv', + env_spec={'type' : 'venv', + 'path' : '$HOME/cylon_rp_venv', + 'setup': []}) + report.info('done\n') + + # Launch a raptor master task, which will launch workers and self-submit + # some additional tasks for illustration purposes. + + master_ids = [ru.generate_id('master.%(item_counter)06d', + ru.ID_CUSTOM, ns=session.uid) + for _ in range(n_masters)] + + tds = list() + for i in range(n_masters): + td = rp.TaskDescription(cfg.master_descr) + td.mode = rp.RAPTOR_MASTER + td.uid = master_ids[i] + td.arguments = [cfg_file, i] + td.cpu_processes = 1 + td.cpu_threads = 1 + td.named_env = 'rp' + td.input_staging = [{'source': '%s/raptor_master.py' % PWD, + 'target': 'raptor_master.py', + 'action': rp.TRANSFER, + 'flags' : rp.DEFAULT_FLAGS}, + {'source': '%s/raptor_worker.py' % PWD, + 'target': 'raptor_worker.py', + 'action': rp.TRANSFER, + 'flags' : rp.DEFAULT_FLAGS}, + {'source': cfg_file, + 'target': os.path.basename(cfg_file), + 'action': rp.TRANSFER, + 'flags' : rp.DEFAULT_FLAGS} + ] + tds.append(td) + + if len(tds) > 0: + report.info('Submit raptor master(s) %s\n' + % str([t.uid for t in tds])) + task = tmgr.submit_tasks(tds) + if not isinstance(task, list): + task = [task] + + states = tmgr.wait_tasks( + uids=[t.uid for t in task], + state=rp.FINAL + [rp.AGENT_EXECUTING], + timeout=60 + ) + logger.info('Master states: %s', str(states)) + + tds = list() + for i in range(1): + + bson = cylon_join(comm=None, data={'unique': 0.9, 'scaling': 'w', 'rows':20000000, 'it': 10}) + tds.append(rp.TaskDescription({ + 'uid' : 'task.cylon.w.%06d' % i, + 'mode' : rp.TASK_FUNC, + 'ranks' : RANKS, + 'function' : bson, + 'raptor_id' : master_ids[i % n_masters]})) + + + if len(tds) > 0: + report.info('Submit tasks %s.\n' % str([t.uid for t in tds])) + tasks = tmgr.submit_tasks(tds) + + logger.info('Wait for tasks %s', [t.uid for t in tds]) + tmgr.wait_tasks(uids=[t.uid for t in tasks]) + + for task in tasks: + report.info('id: %s [%s]:\n out: %s\n ret: %s\n' + % (task.uid, task.state, task.stdout, task.return_value)) + + finally: + session.close(download=True) + + report.info('Logs from the master task should now be in local files \n') + report.info('like %s/%s/%s.log\n' % (session.uid, pilot.uid, master_ids[0])) + +# ------------------------------------------------------------------------------ \ No newline at end of file diff --git a/rivanna/scripts/README.md b/rivanna/scripts/README.md index f8e4899a0..6eaa056b7 100644 --- a/rivanna/scripts/README.md +++ b/rivanna/scripts/README.md @@ -2,35 +2,42 @@ ``` pip install cloudmesh-common +pip install openssl-python +python3 -m pip install urllib3==1.26.6 ``` 2. Run the scripts in set of **compute nodes** as follows. ```bash #!/bin/bash +#SBATCH --nodes=4 +#SBATCH --ntasks-per-node=40 +#SBATCH --exclusive +#SBATCH --time=1:00:00 +#SBATCH --partition=bii #SBATCH -A bii_dsc_community -#SBATCH -p standard -#SBATCH -N 1 -#SBATCH -c 32 -#SBATCH -t 10:00:00 - -PARENT=$HOME/.conda/envs # parent directory of conda env -ENV=cylon_rivanna_1 # name of env - -#---- DO NOT MODIFY THIS SECTION ---- -DIR=$PARENT/$ENV -module purge -module load anaconda -source activate cylon_rivanna_1 - -export OMPI_MCA_pml="ucx" OMPI_MCA_osc="ucx" \ - PATH=$DIR/bin:$DIR/libexec/gcc/x86_64-conda-linux-gnu/12.2.0:$PATH \ - LD_LIBRARY_PATH=$DIR/lib:$LD_LIBRARY_PATH \ - PYTHONPATH=$DIR/lib/python3.10/site-packages \ - CC=$(which mpicc) CXX=$(which mpicxx) +#SBATCH --output=rivanna/scripts/cylogs/mpirun-96t-4n-160w-35m-%x-%j.out +#SBATCH --error=rivanna/scripts/cylogs/mpirun-96t-4n-160w-35m-%x-%j.err + + +module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 + +#module load gcc/11.2.0 +#module load openmpi/4.1.4 +#module load python/3.11.1 + +#source $HOME/CYLON/bin/activate +source $HOME/cylon_rp_venv/bin/activate + +BUILD_PATH=$PWD/build + +export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH + which python gcc g++ -#---- (END) ---- -python cylon_scaling.py -n 8 + +#srun -n 160 python $PWD/rivanna/scripts/cylon_scaling.py -n 35000000 +mpirun -np 160 python rivanna/scripts/cylon_scaling.py -n 35000000 + ``` \ No newline at end of file diff --git a/rivanna/scripts/cylon_scaling.py b/rivanna/scripts/cylon_scaling.py index 870a2e512..180dafc8e 100644 --- a/rivanna/scripts/cylon_scaling.py +++ b/rivanna/scripts/cylon_scaling.py @@ -34,7 +34,10 @@ def join(data=None): df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) - + + if env.rank == 0: + print("Task# ", data['task']) + for i in range(data['it']): env.barrier() StopWatch.start(f"join_{i}_{data['host']}_{data['rows']}_{data['it']}") @@ -42,7 +45,7 @@ def join(data=None): df3 = df1.merge(df2, on=[0], algorithm='sort', env=env) env.barrier() t2 = time.time() - t = (t2 - t1) * 1000 + t = (t2 - t1) sum_t = comm.reduce(t) tot_l = comm.reduce(len(df3)) @@ -58,6 +61,57 @@ def join(data=None): env.finalize() +def slice(data=None): + StopWatch.start(f"slice_total_{data['host']}_{data['rows']}_{data['it']}") + + comm = MPI.COMM_WORLD + + config = MPIConfig(comm) + env = CylonEnv(config=config, distributed=True) + + u = data['unique'] + + if data['scaling'] == 'w': # weak + num_rows = data['rows'] + max_val = num_rows * env.world_size + else: # 's' strong + max_val = data['rows'] + num_rows = int(data['rows'] / env.world_size) + + rng = default_rng(seed=env.rank) + data1 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + data2 = rng.integers(0, int(max_val * u), size=(num_rows, 2)) + + df1 = DataFrame(pd.DataFrame(data1).add_prefix("col")) + df2 = DataFrame(pd.DataFrame(data2).add_prefix("col")) + + if env.rank == 0: + print("Task# ", data['task']) + + for i in range(data['it']): + env.barrier() + StopWatch.start(f"slice_{i}_{data['host']}_{data['rows']}_{data['it']}") + t1 = time.time() + df3 = df1[0:20000000, env] # distributed slice + #print(df3) + #df3 = df1.merge(df2, on=[0], algorithm='sort', env=env) + env.barrier() + t2 = time.time() + t = (t2 - t1) + sum_t = comm.reduce(t) + tot_l = comm.reduce(len(df3)) + + if env.rank == 0: + avg_t = sum_t / env.world_size + print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l) + StopWatch.stop(f"slice_{i}_{data['host']}_{data['rows']}_{data['it']}") + + StopWatch.stop(f"slice_total_{data['host']}_{data['rows']}_{data['it']}") + + if env.rank == 0: + StopWatch.benchmark(tag=str(data)) + + env.finalize() if __name__ == "__main__": parser = argparse.ArgumentParser(description="weak scaling") @@ -69,7 +123,9 @@ def join(data=None): args = vars(parser.parse_args()) args['host'] = "rivanna" - join(args) + for i in range(160): + args['task'] = i + join(args) # os.system(f"{git} branch | fgrep '*' ") # os.system(f"{git} rev-parse HEAD") diff --git a/rivanna/scripts/scaling_job.slurm b/rivanna/scripts/scaling_job.slurm index 5dc1f8945..22ec1fb80 100644 --- a/rivanna/scripts/scaling_job.slurm +++ b/rivanna/scripts/scaling_job.slurm @@ -1,26 +1,30 @@ #!/bin/bash +#SBATCH --nodes=4 +#SBATCH --ntasks-per-node=40 +#SBATCH --exclusive +#SBATCH --time=1:00:00 +#SBATCH --partition=bii #SBATCH -A bii_dsc_community -#SBATCH -p standard -#SBATCH -N 1 -#SBATCH -c 10 -#SBATCH -t 10:00:00 - -PARENT=$HOME/.conda/envs # parent directory of conda env -ENV=cylon_rivanna_1 # name of env - -#---- DO NOT MODIFY THIS SECTION ---- -DIR=$PARENT/$ENV -module purge -module load anaconda -source activate cylon_rivanna_1 - -export OMPI_MCA_pml="ucx" OMPI_MCA_osc="ucx" \ - PATH=$DIR/bin:$DIR/libexec/gcc/x86_64-conda-linux-gnu/12.2.0:$PATH \ - LD_LIBRARY_PATH=$DIR/lib:$LD_LIBRARY_PATH \ - PYTHONPATH=$DIR/lib/python3.10/site-packages \ - CC=$(which mpicc) CXX=$(which mpicxx) +#SBATCH --output=rivanna/scripts/cylogs/mpirun-96t-4n-160w-35m-%x-%j.out +#SBATCH --error=rivanna/scripts/cylogs/mpirun-96t-4n-160w-35m-%x-%j.err + + +module load gcc/9.2.0 openmpi/3.1.6 cmake/3.23.3 python/3.7.7 + +#module load gcc/11.2.0 +#module load openmpi/4.1.4 +#module load python/3.11.1 + +#source $HOME/CYLON/bin/activate +source $HOME/cylon_rp_venv/bin/activate + +BUILD_PATH=$PWD/build + +export LD_LIBRARY_PATH=$BUILD_PATH/arrow/install/lib64:$BUILD_PATH/glog/install/lib64:$BUILD_PATH/lib64:$BUILD_PATH/lib:$LD_LIBRARY_PATH + which python gcc g++ -#---- (END) ---- -python rivanna/scripts/cylon_scaling.py -n 8 + +#srun -n 160 python $PWD/rivanna/scripts/cylon_scaling.py -n 35000000 +mpirun -np 160 python rivanna/scripts/cylon_scaling.py -n 35000000