Skip to content

Commit

Permalink
[GH-25] Address review comments
Browse files Browse the repository at this point in the history
Retrieve all inputs and analyze their mount points.  If the work
directory is on S3, we need to mount all S3 buckets used by the inputs.

Address review comments.  Copy the code that retrieves S3 credentials
from Nextflow's `Global` class, because that class is changing in recent
Nextflow releases.
  • Loading branch information
jealous committed Jul 26, 2023
1 parent cfd89ce commit 79f5460
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 56 deletions.
1 change: 0 additions & 1 deletion plugins/nf-float/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ dependencies {
testImplementation ("org.spockframework:spock-core:2.0-M3-groovy-3.0") { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' }
testImplementation ('org.spockframework:spock-junit4:2.0-M3-groovy-3.0') { exclude group: 'org.codehaus.groovy'; exclude group: 'net.bytebuddy' }
testImplementation ('com.google.jimfs:jimfs:1.1')
testImplementation ('com.google.jimfs:jimfs:1.1')

// see https://docs.gradle.org/4.1/userguide/dependency_management.html#sec:module_replacement
modules {
Expand Down
28 changes: 20 additions & 8 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package com.memverge.nextflow

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.exception.AbortOperationException
import nextflow.io.BucketParser
import org.apache.commons.lang.StringUtils

/**
Expand All @@ -30,6 +30,7 @@ class FloatConf {
static final String MMC_ADDRESS = "MMC_ADDRESS"
static final String MMC_USERNAME = "MMC_USERNAME"
static final String MMC_PASSWORD = "MMC_PASSWORD"
static final String S3_SCHEMA = "s3"
static final String ADDR_SEP = ","
static final String NF_JOB_ID = "nf-job-id"

Expand Down Expand Up @@ -64,19 +65,30 @@ class FloatConf {
return ret
}

String getDataVolume(URI workDir) {
final scheme = workDir.getScheme()
if (scheme == "s3") {
/**
 * Tells whether the given URI uses the s3 scheme.
 *
 * @param input the URI to inspect
 * @return true when the scheme of {@code input} equals {@code S3_SCHEMA}
 */
private static boolean isS3(URI input) {
    final scheme = input.scheme
    return S3_SCHEMA == scheme
}

String getInputVolume(URI input) {
if (isS3(input)) {
def options = ["mode=rw"]
if (s3accessKey && s3secretKey) {
options.add("accesskey=" + s3accessKey)
options.add("secret=" + s3secretKey)
}
final optionsStr = options.join(",")
final path = workDir.host ?
"/${workDir.host}${workDir.path}" :
workDir.path
return "[$optionsStr]s3:/$path:$path"

// the s3 URI may contain 3 slashes; replace them with 2
def string = input.toString().replaceAll("///", "//")
final bucket = BucketParser.from(string).bucket
return "[$optionsStr]$S3_SCHEMA://$bucket:/$bucket"
}
return ""
}

String getWorkDirVol(URI workDir) {
if (isS3(workDir)) {
return getInputVolume(workDir)
}
// local directory, need nfs support
if (!nfs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import groovy.transform.Memoized
import groovy.util.logging.Slf4j
import nextflow.executor.AbstractGridExecutor
import nextflow.file.FileHelper
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskRun
import nextflow.util.Escape
import nextflow.util.ServiceName
Expand Down Expand Up @@ -73,7 +72,7 @@ class FloatGridExecutor extends AbstractGridExecutor {

protected String getHeaderScript(TaskRun task) {
log.info "[float] switch task ${task.id} to ${task.workDirStr}"
floatJobs.setWorkDir(task.id, task.workDirStr)
floatJobs.setWorkDir(task.id, task.workDir)

final path = Escape.path(task.workDir)
def result = "NXF_CHDIR=${path}\n"
Expand Down Expand Up @@ -118,7 +117,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
final mem = task.config.getMemory()
final giga = mem?.toGiga()
if (!giga) {
log.warn "memory $mem is too small. " +
log.debug "memory $mem is too small. " +
"will use default $DFT_MEM_GB"
}
return giga ? giga.toString() : DFT_MEM_GB
Expand Down Expand Up @@ -153,7 +152,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
return floatConf.getCliPrefix(address)
}

String toCmdString(List<String> floatCmd) {
String toLogStr(List<String> floatCmd) {
def ret = floatCmd.join(" ")
final toReplace = [
("-p " + floatConf.password): "-p ***",
Expand All @@ -170,7 +169,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
}

private static def warnDeprecated(String deprecated, String replacement) {
log.warn "[flaot] process `$deprecated` " +
log.warn1 "[float] process `$deprecated` " +
"is no longer supported, " +
"use $replacement instead"
}
Expand Down Expand Up @@ -222,25 +221,36 @@ class FloatGridExecutor extends AbstractGridExecutor {
return ''
}

/**
 * Builds the list of data volumes to mount for a task.
 *
 * The work directory volume comes first, followed by one volume per
 * input file of the task.  Duplicates and empty entries are dropped
 * before the list is logged and returned.
 *
 * @param task the task whose input files are inspected
 * @return the distinct, non-empty volume strings
 */
private List<String> getMountVols(TaskRun task) {
    final List<String> mounts = [floatConf.getWorkDirVol(workDir.uri)]
    for (input in task.getInputFilesMap().values()) {
        mounts.add(floatConf.getInputVolume(input.uri))
    }

    // unique() de-duplicates the local list; subtracting "" removes the
    // empty string getInputVolume returns for non-s3 inputs
    final List<String> result = mounts.unique() - ""

    // NOTE(review): volume strings may carry s3 accesskey/secret options;
    // confirm that toLogStr masks them before they reach the log
    log.info "[float] volumes to mount for ${task.id}: ${toLogStr(result)}"
    return result
}

@Override
List<String> getSubmitCommandLine(TaskRun task, Path scriptFile) {
validate(task)

String tag = "${FloatConf.NF_JOB_ID}:${floatJobs.getJobName(task.id)}"
def volume = floatConf.getDataVolume(workDir.toUri())

final jobName = floatJobs.getJobName(task.id)
final String tag = "${FloatConf.NF_JOB_ID}:${jobName}"
def cmd = getSubmitCmdPrefix()
cmd << 'sbatch'
if (volume) {
cmd << '--dataVolume' << volume
for (def vol : getMountVols(task)) {
cmd << '--dataVolume' << vol
}
cmd << '--image' << getContainer(task)
cmd << '--cpu' << task.config.getCpus().toString()
cmd << '--mem' << getMemory(task)
cmd << '--job' << getScriptFilePath(scriptFile)
cmd << '--customTag' << tag
cmd.addAll(getExtra(task))
log.info "[float] submit job: ${toCmdString(cmd)}"
log.info "[float] submit job: ${toLogStr(cmd)}"
return cmd
}

Expand Down Expand Up @@ -289,7 +299,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
if (ret != 0) {
def m = """\
Unable to kill pending jobs
- cmd executed: ${toCmdString(cmd)}}
- cmd executed: ${toLogStr(cmd)}}
- exit status : $ret
- output :
""".stripIndent()
Expand All @@ -306,7 +316,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
* @param jobId The job ID to be kill
* @return The command line to be used to kill the specified job
*/
protected List<List<String>> killTaskCommands(def jobId) {
List<List<String>> killTaskCommands(def jobId) {
def jobIds
if (jobId instanceof Collection) {
jobIds = jobId
Expand All @@ -320,7 +330,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
cmd << 'scancel'
cmd << '-j'
cmd << id
log.info "[float] cancel job: ${toCmdString(cmd)}"
log.info "[float] cancel job: ${toLogStr(cmd)}"
ret.add(cmd)
}
return ret
Expand All @@ -337,7 +347,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
def cmd = getCmdPrefix0()
cmd << 'scancel'
cmd << '-j'
log.info "[float] cancel job: ${toCmdString(cmd)}"
log.info "[float] cancel job: ${toLogStr(cmd)}"
return cmd
}

Expand Down Expand Up @@ -375,7 +385,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
cmd << 'squeue'
cmd << '--format'
cmd << 'json'
log.info "[float] query job status: ${toCmdString(cmd)}"
log.info "[float] query job status: ${toLogStr(cmd)}"
return cmd
}

Expand Down Expand Up @@ -419,9 +429,4 @@ class FloatGridExecutor extends AbstractGridExecutor {
boolean isContainerNative() {
return true
}

@Override
boolean isFusionEnabled() {
return FusionHelper.isFusionEnabled(session)
}
}
29 changes: 17 additions & 12 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
package com.memverge.nextflow

import groovy.util.logging.Slf4j
import nextflow.file.FileHelper
import nextflow.processor.TaskId
import org.apache.commons.lang.RandomStringUtils
import org.apache.commons.lang.StringUtils

import java.nio.file.NoSuchFileException
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.ConcurrentHashMap

Expand All @@ -30,7 +33,7 @@ class FloatJobs {

private Map<String, String> job2status
private Map<String, String> job2oc
private Map<String, String> task2workDir
private Map<String, Path> task2workDir
private Collection<String> ocs
private String taskPrefix

Expand All @@ -42,13 +45,13 @@ class FloatJobs {
job2oc = new ConcurrentHashMap<>()
task2workDir = new ConcurrentHashMap<>()
ocs = ocAddresses
def charset = (('a'..'z')+('0'..'9')).join('')
def charset = (('a'..'z') + ('0'..'9')).join('')
taskPrefix = RandomStringUtils.random(
6, charset.toCharArray())
}

def setTaskPrefix(String prefix) {
taskPrefix = prefix
taskPrefix = prefix
}

String getJobName(TaskId id) {
Expand All @@ -63,7 +66,7 @@ class FloatJobs {
return job2status
}

def setWorkDir(TaskId taskID, String dir) {
def setWorkDir(TaskId taskID, Path dir) {
def name = getJobName(taskID)
task2workDir[name] = dir
}
Expand All @@ -85,23 +88,25 @@ class FloatJobs {

def currentSt = job2status.get(jobID, Unknown)
def workDir = task2workDir.get(taskID)
if (StringUtils.length(workDir) == 0) {
if (!workDir) {
return
}
// check the availability of result files
// call list files to update the folder cache
new File(workDir).listFiles()
FileHelper.listDirectory(workDir)
def files = ['.command.out', '.command.err', '.exitcode']
if (currentSt != Completed && st == Completed) {
for (filename in files) {
def name = Paths.get(workDir, filename).toString()
def file = new File(name)
if (!file.exists()) {
log.warn("job $jobID completed " +
"but file not found: $name")
def name = workDir.resolve(filename)
try {
!FileHelper.checkIfExists(name, [checkIfExists: true])
} catch (NoSuchFileException ex) {
log.info("[float] job $jobID completed " +
"but file not found: $ex")
return
}
}
log.info("found $files in: $workDir")
log.debug("[float] found $files in: $workDir")
}
job2status.put(jobID, st)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.memverge.nextflow

import java.nio.file.FileSystems
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.executor.GridTaskHandler
Expand All @@ -41,9 +42,15 @@ class FloatTaskHandler extends GridTaskHandler {
log.trace "start process ${task.name} > cli: ${cli}"

// -- prepare submit process
return new ProcessBuilder()
final builder = new ProcessBuilder()
.command( cli as String[] )
.redirectErrorStream(true)

if( task.workDir.fileSystem == FileSystems.default ) {
builder.directory(task.workDir.toFile())
}

return builder
}

}
Loading

0 comments on commit 79f5460

Please sign in to comment.