Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add multiplayer mode for coupled runs #1335

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ runtime.rds
# ignore slurm logs
slurm-[0-9]*.log
slurm-[0-9]*.out

# ignore helper file and log for multiplayer mode
/scripts/multiplayer/log.txt
/scripts/multiplayer/slurmjobs.sh
6 changes: 3 additions & 3 deletions config/tests/scenario_config_coupled_shortCascade.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
title;start;qos;sbatch;magpie_scen;magpie_empty;no_ghgprices_land_until;max_iterations;oldrun;path_gdx;path_gdx_ref;path_gdx_bau;path_report;cm_nash_autoconverge_lastrun;path_mif_ghgprice_land
TESTTHAT-SSP2EU-Base;1;priority;--wait;SSP2|NPI;TRUE;y2150;2;;;;;;2;
TESTTHAT-SSP2EU-NDC;1;priority;--wait;SSP2|NDC;TRUE;y2150;2;;;;;;2;TESTTHAT-SSP2EU-Base
TESTTHAT-SSP2EU-Policy;2;priority;--wait;SSP2|NDC;TRUE;y2150;2;TESTTHAT-SSP2EU-Base;;;;;;output/C_TESTTHAT-SSP2EU-Base-rem-1/REMIND_generic_C_TESTTHAT-SSP2EU-Base-rem-1.mif
TESTTHAT-SSP2EU-Base;1;auto;--wait;SSP2|NPI;TRUE;y2150;2;;;;;;2;
TESTTHAT-SSP2EU-NDC;1;auto;--wait;SSP2|NDC;TRUE;y2150;2;;;;;;2;TESTTHAT-SSP2EU-Base
TESTTHAT-SSP2EU-Policy;2;auto;--wait;SSP2|NDC;TRUE;y2150;2;TESTTHAT-SSP2EU-Base;;;;;;output/C_TESTTHAT-SSP2EU-Base-rem-1/REMIND_generic_C_TESTTHAT-SSP2EU-Base-rem-1.mif
41 changes: 41 additions & 0 deletions scripts/multiplayer/run.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
folder <- getwd()
message("\n### ", Sys.info()[["user"]], " checking at ", Sys.time(), ".")
Rfile <- "run.R"
if (basename(folder) == "multiplayer" && file.exists(Rfile)) {
setwd(file.path("..", ".."))
}
multiplayerfolder <- file.path("scripts", "multiplayer")
lockID <- try(gms::model_lock(folder = multiplayerfolder, file = ".lock", timeout1 = 0.05), silent = TRUE)
if (inherits(lockID, "try-error")) {
message("Could not get lock within 3 minutes, skipping.")
} else {
bashfile <- file.path(multiplayerfolder, "slurmjobs.sh")
if (file.exists(bashfile)) {
sq <- system(paste0("squeue -u ", Sys.info()[["user"]], " -o '%q %j' | grep -v multiplayer"), intern = TRUE)
freepriority <- max(0, 4 - sum(grepl("^priority ", sq)))
code <- readLines(con = bashfile)
code <- code[code != ""]
start <- min(freepriority, length(code))
message("# With ", freepriority, " free priority slots and ", length(code), " runs waiting, starting ", start, " runs.")
if (start > 0) {
for (c in code[seq(start)]) {
message("\n", c)
exitCode <- system(c)
if (0 < exitCode) {
message("System call failed, not deleting the above line.")
}
code <- setdiff(code, c)
}
write(code, file = bashfile, append = FALSE)
message("\n# Still ", length(code), " runs left.")
}
} else {
stop("### ", bashfile, " does not exist, stopping multiplayer mode for ", Sys.info()[["user"]])
}
gms::model_unlock(lockID)
}
mins <- 30
setwd(multiplayerfolder)
system(paste0("sbatch --qos=short --wrap='Rscript --vanilla ", Rfile, "' --job-name=multiplayer ",
"--output=log.txt --error=log.txt --open-mode=append --time=5 --begin=now+", mins, "minutes"))
message("### ", Sys.info()[["user"]], " will be back in ", mins, " minutes.\n")
35 changes: 35 additions & 0 deletions scripts/multiplayer/start.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
folder <- getwd()
Rfile <- "run.R"
if (! basename(folder) == "multiplayer" || ! file.exists(Rfile)) {
setwd(file.path("scripts", "multiplayer"))
if (! basename(getwd()) == "multiplayer" || ! file.exists(Rfile)) {
stop("No idea where you are. Please run 'Rscript scripts/multiplayer/start.R' from your REMIND directory.")
}
}

bashfile <- "slurmjobs.sh"
if (! file.exists(bashfile)) {
lockID <- try(gms::model_lock(file = ".lock", timeout1 = 0.05), silent = TRUE)
if (inherits(lockID, "try-error")) {
stop("Could not get lock within 3 minutes, skipping.")
} else {
write("", file = bashfile, append = TRUE)
gms::model_unlock(lockID)
}
}

command <- paste0("sbatch --qos=short --wrap='Rscript --vanilla ", Rfile, "' --job-name=multiplayer --output=log.txt ",
"--error=log.txt --open-mode=append --time=5")

squeueresult <- suppressWarnings(system(paste0("squeue -u ", Sys.info()[["user"]], " -h -o '%j %Z' | grep multiplayer"), intern = TRUE))
if (any(squeueresult == paste("multiplayer", getwd()))) {
message("\n### A multiplayer job is already running for your user in this folder. Skipping.")
} else {
message("\n### Thanks for entering multiplayer mode.")
message("A slurm job named 'multiplayer' on the 'short' qos will be started.")
message("It tries to start new runs on 'priority' slots regularly and starts itself again.")
message("If you are ready, delete 'scripts/multiplayer/slurmjobs.sh' which will stop all multiplayer runs.")
message("Check 'scripts/multiplayer/log.txt' to see the progress.")
system(command)
}

2 changes: 1 addition & 1 deletion scripts/start/modelSummary.R
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ modelSummary <- function(folder = ".", gams_runtime = NULL) {
message(" Modelstat after ", as.numeric(names(modelstat)), " iterations: ", modelstat,
if (modelstat %in% names(explain_modelstat)) paste0(" (", explain_modelstat[modelstat], ")"))
}
logStatus <- grep("*** Status", readLines(file.path(folder, "full.log")), fixed = TRUE, value = TRUE)
logStatus <- grep("*** Status", readLines(file.path(folder, "full.log"), warn = FALSE), fixed = TRUE, value = TRUE)
message(" full.log states: ", paste(logStatus, collapse = ", "))
if (! all("*** Status: Normal completion" == logStatus)) stoprun <- TRUE
}
Expand Down
6 changes: 3 additions & 3 deletions scripts/start/prepare.R
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ prepare <- function() {
paste0("rev",cfg$inputRevision,"_", madrat::regionscode(cfg$regionmapping),ifelse(cfg$extramappings_historic == "","",paste0("-", madrat::regionscode(cfg$extramappings_historic))),"_", tolower(cfg$validationmodel_name),".tgz"),
paste0("CESparametersAndGDX_",cfg$CESandGDXversion,".tgz"))
# download and distribute needed data
if(!setequal(input_new, input_old) | cfg$force_download) {
message(if (cfg$force_download) "You set 'cfg$force_download = TRUE'"
if (! setequal(input_new, input_old) || isTRUE(cfg$force_download)) {
message(if (isTRUE(cfg$force_download)) "You set 'cfg$force_download = TRUE'"
else "Your input data are outdated or in a different regional resolution",
". New input data are downloaded and distributed.")
download_distribute(files = input_new,
Expand All @@ -277,7 +277,7 @@ prepare <- function() {
}

# extract BAU emissions for NDC runs to set up emission goals for region where only some countries have a target
if ((!is.null(cfg$gms$carbonprice) && (cfg$gms$carbonprice == "NDC")) | (!is.null(cfg$gms$carbonpriceRegi) && (cfg$gms$carbonpriceRegi == "NDC")) ){
if (isTRUE(cfg$gms$carbonprice == "NDC") || isTRUE(cfg$gms$carbonpriceRegi == "NDC")) {
cat("\nRun scripts/input/prepare_NDC.R.\n")
source("scripts/input/prepare_NDC.R")
prepare_NDC(as.character(cfg$files2export$start["input_bau.gdx"]), cfg)
Expand Down
16 changes: 10 additions & 6 deletions start.R
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ config.file <- NULL
if(!exists("argv")) argv <- commandArgs(trailingOnly = TRUE)
argv <- argv[! grepl("^-", argv) & ! grepl("=", argv)]
# check if user provided any unknown arguments or config files that do not exist
if (length(argv) > 0) {
file_exists <- file.exists(argv)
if (sum(file_exists) > 1) stop("You provided more than one file, start.R can only handle one.")
if (!all(file_exists)) stop("Unknown parameter provided: ", paste(argv[!file_exists], collapse = ", "))
# set config file to not known parameter where the file actually exists
config.file <- argv[[1]]
if (length(argv) == 1) {
if (file.exists(argv)) {
config.file <- argv
} else if (file.exists(file.path("config", argv)) {
config.file <- file.path("config", argv)
} else {
stop("Unknown parameter provided: ", paste(argv, collapse = ", "))
}
} else {
stop("You provided more than one file or other command line argument, start.R can only handle one.")
}

if ("--help" %in% flags) {
Expand Down
76 changes: 54 additions & 22 deletions start_bundle_coupled.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ helpText <- "

# Please provide all files and paths relative to the folder where start_coupled is executed
path_remind <- getwd() # provide path to REMIND. Default: the actual path which the script is started from
path_magpie <- normalizePath(file.path(getwd(), "..", "magpie"))
path_magpie <- normalizePath(file.path(getwd(), "magpie"), mustWork = FALSE)
if (! dir.exists(path_magpie)) path_magpie <- normalizePath(file.path(getwd(), "..", "magpie"))

# Paths to the files where scenarios are defined
# path_settings_remind contains the detailed configuration of the REMIND scenarios
Expand Down Expand Up @@ -177,6 +178,7 @@ errorsfound <- 0
startedRuns <- 0
finishedRuns <- 0
waitingRuns <- 0
qosRuns <- NULL
deletedFolders <- 0

stamp <- format(Sys.time(), "_%Y-%m-%d_%H.%M.%S")
Expand Down Expand Up @@ -262,10 +264,12 @@ for (scen in common) {
}
}

if (file.exists("/p") && "qos" %in% names(scenarios_coupled)
&& sum(scenarios_coupled[common, "qos"] == "priority", na.rm = TRUE) > 4) {
message("\nAttention, you want to start more than 4 runs with qos=priority mode.")
message("They may not be able to run in parallel on the PIK cluster.")
qos_default <- "auto"
if (! "qos" %in% names(scenarios_coupled)) scenarios_coupled[, "qos"] <- qos_default
scenarios_coupled[, "qos"] <- ifelse(is.na(scenarios_coupled[, "qos"]), qos_default, scenarios_coupled[, "qos"])
if (file.exists("/p") && sum(scenarios_coupled[common, "qos"] == "priority", na.rm = TRUE) > 4) {
message("\nAttention, you want to start more than 4 runs with qos=priority mode.")
message("They may not be able to run in parallel on the PIK cluster.")
}

####################################################
Expand All @@ -280,7 +284,7 @@ for(scen in common){
runname <- paste0(prefix_runname, scen) # name of the run that is used for the folder names
path_report <- NULL # sets the path to the report REMIND is started with in the first loop
qos <- scenarios_coupled[scen, "qos"] # set the SLURM quality of service (priority/short/medium/...)
if(is.null(qos) || is.na(qos)) qos <- "auto" # if qos could not be found in scenarios_coupled use short/priority
qosRuns[qos] <- if (is.null(qosRuns[qos])) 1 else qosRuns[qos] + 1 # count
sbatch <- scenarios_coupled[scen, "sbatch"] # retrieve sbatch options from scenarios_coupled
if (is.null(sbatch) || is.na(sbatch)) sbatch <- "" # if sbatch could not be found in scenarios_coupled use empty string
start_iter_first <- 1 # iteration to start the coupling with
Expand Down Expand Up @@ -523,10 +527,14 @@ for(scen in common){
}
foldername <- file.path("output", fullrunname)
if ((i > start_iter_first || !scenarios_coupled[scen, "start_magpie"]) && file.exists(foldername)) {
if (errorsfound == 0) {
if (! "--test" %in% flags) unlink(foldername, recursive = TRUE, force = TRUE)
message("Delete ", foldername, if ("--test" %in% flags) " if not in test mode", ". ", appendLF = FALSE)
deletedFolders <- deletedFolders + 1
if (errorsfound == 0 && ! any(c("--test", "--gamscompile") %in% flags)) {
message("Folder ", foldername, " exists but incomplete. Delete it and rerun (else will be skipped)? y/N")
if (tolower(gms::getLine()) %in% c("y", "yes")) {
unlink(foldername, recursive = TRUE, force = TRUE)
deletedFolders <- deletedFolders + 1
} else {
start_now <- FALSE
}
}
}

Expand Down Expand Up @@ -612,18 +620,30 @@ for (scen in common) {
logfile <- file.path("output", fullrunname, paste0("log", if (scenarios_coupled[scen, "start_magpie"]) "-mag", ".txt"))
if (! file.exists(dirname(logfile))) dir.create(dirname(logfile))
message("Find logging in ", logfile)
if (isTRUE(runEnv$qos == "auto")) {
starthere <- TRUE
if (isTRUE(runEnv$qos %in% c("auto", "multiplayer"))) {
sq <- system(paste0("squeue -u ", Sys.info()[["user"]], " -o '%q %j'"), intern = TRUE)
runEnv$qos <- if (is.null(attr(sq, "status")) && sum(grepl("^priority ", sq)) < 4) "priority" else "short"
starthereprio <- is.null(attr(sq, "status")) && sum(grepl("^priority ", sq)) < 4
starthere <- runEnv$qos == "auto" || starthereprio
runEnv$qos <- if (starthereprio || runEnv$qos == "multiplayer") "priority" else "short"
}
slurm_command <- paste0("sbatch --qos=", runEnv$qos, " --mem=8000 --job-name=", fullrunname,
" --output=", logfile, " --mail-type=END --comment=REMIND-MAgPIE --tasks-per-node=", runEnv$numberOfTasks,
" ", runEnv$sbatch, " --wrap=\"Rscript start_coupled.R coupled_config=", Rdatafile, "\"")
slurm_command <- paste0("sbatch --qos=", runEnv$qos, " --job-name=", fullrunname, " --output=", logfile,
" --mail-type=END --comment=REMIND-MAgPIE --tasks-per-node=", runEnv$numberOfTasks,
if (runEnv$numberOfTasks == 1) " --mem=8000", " ", runEnv$sbatch,
" --wrap=\"Rscript start_coupled.R coupled_config=", Rdatafile, "\"")
message(slurm_command)
exitCode <- system(slurm_command)
if (0 < exitCode) {
errorsfound <- errorsfound + 1
message("sbatch command failed, check logs.")
if (starthere) {
exitCode <- system(slurm_command)
if (0 < exitCode) {
errorsfound <- errorsfound + 1
message("sbatch command failed, check logs.")
}
} else {
lockID <- gms::model_lock(folder = file.path("scripts", "multiplayer"), file = ".lock")
multiplayersh <- file.path("scripts", "multiplayer", "slurmjobs.sh")
write(slurm_command, file = multiplayersh, append = TRUE)
message("Run not started, but written to ", multiplayersh)
gms::model_unlock(lockID)
}
}
}
Expand All @@ -646,9 +666,21 @@ if (! "--test" %in% flags && ! "--gamscompile" %in% flags) {
message(cs_command)
}

message("\nDone: ", finishedRuns, " runs already finished. ", deletedFolders, " folders deleted. ",
startedRuns, " runs started. ", waitingRuns, " runs are waiting.",
if("--test" %in% flags) "\nYou are in TEST mode, only RData files were written.")
message("#### Summary ####")
message("\nDone.", if(any(c("--test", "--gamscompile") %in% flags)) " You are in TEST or gamscompile mode, no runs were actually started.")
message("- ", finishedRuns, " runs already finished.")
message("- ", deletedFolders, " folders deleted.")
message("- ", startedRuns, " runs started.")
message("- ", waitingRuns, " runs are waiting.")
message("qos statistics: ", paste0(names(qosRuns), ": ", qosRuns, collapse = ", "))
if (file.exists("/p") && isTRUE(qosRuns["multiplayer"] > 0)) {
startfile <- file.path("scripts", "multiplayer", "start.R")
message("Some runs use multiplayer mode. Ask your colleagues to run 'Rscript ", startfile, "' in this folder.")
message("This creates a recurrent slurm job that starts the runs for which no free priority slot was available.")
message("Terminate all these multiplayer jobs by deleting the file 'scripts/multiplayer/slurmjobs.sh'.")
message("Starting such a job for you as well to use all 'priority' slots to guarantee that the cascades finishes even if nobody wants to help you.")
system(paste("Rscript", startfile)) # better than source to avoid changes in working directory etc
}
# make sure we have a non-zero exit status if there were any errors
if (0 < errorsfound) {
stop(red, errorsfound, NC, " errors were identified, check logs above for details.")
Expand Down
41 changes: 27 additions & 14 deletions start_coupled.R
Original file line number Diff line number Diff line change
Expand Up @@ -297,23 +297,35 @@ start_coupled <- function(path_remind, path_magpie, cfg_rem, cfg_mag, runname, m
message("Starting subsequent run ", run)
logfile <- file.path("output", subseq.env$fullrunname, "log.txt")
if (! file.exists(dirname(logfile))) dir.create(dirname(logfile))
if (isTRUE(subseq.env$qos == "auto")) {
starthere <- TRUE
if (isTRUE(subseq.env$qos %in% c("auto", "multiplayer"))) {
sq <- system(paste0("squeue -u ", Sys.info()[["user"]], " -o '%q %j' | grep -v ", fullrunname), intern = TRUE)
subseq.env$qos <- if (is.null(attr(sq, "status")) && sum(grepl("^priority ", sq)) < 4) "priority" else "short"
starthereprio <- is.null(attr(sq, "status")) && sum(grepl("^priority ", sq)) < 4
starthere <- subseq.env$qos == "auto" || starthereprio
subseq.env$qos <- if (starthereprio || subseq.env$qos == "multiplayer") "priority" else "short"
}
subsequentcommand <- paste0("sbatch --qos=", subseq.env$qos, " --mem=8000 --job-name=", subseq.env$fullrunname, " --output=", logfile,
subsequentcommand <- paste0("sbatch --qos=", subseq.env$qos, " --job-name=", subseq.env$fullrunname, " --output=", logfile,
" --mail-type=END --comment=REMIND-MAgPIE --tasks-per-node=", subseq.env$numberOfTasks,
" ", subseq.env$sbatch, " --wrap=\"Rscript start_coupled.R coupled_config=", RData_file, "\"")
if (subseq.env$numberOfTasks == 1) " --mem=8000", " ", subseq.env$sbatch,
" --wrap=\"Rscript start_coupled.R coupled_config=", RData_file, "\"")
message(subsequentcommand)
if (length(needfulldatagdx) > 0) {
exitCode <- system(subsequentcommand)
if (0 < exitCode) {
message("sbatch command failed, check logs")
errorsfound <- errorsfound + 1
# if sbatch has the --wait argument, the user is likely interactively
# waiting for the result of the run (like in a test). In that case,
# fail immediately so that the user knows about the failure asap.
stopifnot(! grepl("--wait", subsequentcommand))
if (starthere) {
exitCode <- system(subsequentcommand)
if (0 < exitCode) {
message("sbatch command failed, check logs")
errorsfound <- errorsfound + 1
# if sbatch has the --wait argument, the user is likely interactively
# waiting for the result of the run (like in a test). In that case,
# fail immediately so that the user knows about the failure asap.
stopifnot(! grepl("--wait", subsequentcommand))
}
} else {
lockID <- gms::model_lock(folder = file.path("scripts", "multiplayer"), file = ".lock")
multiplayersh <- file.path("scripts", "multiplayer", "slurmjobs.sh")
write(subsequentcommand, file = multiplayersh, append = TRUE)
message("Subsequent run not started, but written to ", multiplayersh)
gms::model_unlock(lockID)
}
} else {
message(RData_file, " already contained a gdx for this run. To avoid runs to be started twice, I'm not starting it. You can start it by running the command directly above.")
Expand Down Expand Up @@ -345,7 +357,8 @@ start_coupled <- function(path_remind, path_magpie, cfg_rem, cfg_mag, runname, m
source_include <- TRUE
runs <- runname
folder <- "./output"
source("scripts/output/comparison/plot_compare_iterations.R", local = TRUE)
pci <- try(source("scripts/output/comparison/plot_compare_iterations.R", local = TRUE))
if (inherits(pci, "try-error")) errorsfound <- errorsfound + 1
cs_runs <- findIterations(runname, modelpath = remindpath, latest = FALSE)
cs_name <- paste0("compScen-rem-1-", max_iterations, "_", runname)
cs_qos <- if (!isFALSE(run_compareScenarios)) run_compareScenarios else "short"
Expand All @@ -363,8 +376,8 @@ start_coupled <- function(path_remind, path_magpie, cfg_rem, cfg_mag, runname, m
}
}
}
if (errorsfound > 0) stop(errorsfound, " errors found, check the logs.")
message("### start_coupled() finished. ###")
if (errorsfound > 0) stop(errorsfound, " errors found, check the logs.")
}

##################################################################
Expand Down
Loading