From 627e00d16bbf4a9228287448746f31f946c2e8ae Mon Sep 17 00:00:00 2001 From: Cedric Zhuang Date: Tue, 3 Sep 2024 15:52:07 -0700 Subject: [PATCH] [GH-82] Enable concurrent stage in When `scratch` is enabled, use `nxf_parallel` to pull the input files. --- .../nextflow/FloatFileCopyStrategy.groovy | 21 +++++++++++-- .../nextflow/FloatGridExecutor.groovy | 2 +- .../memverge/nextflow/FloatBaseTest.groovy | 9 ++++++ .../nextflow/FloatFileCopyStrategyTest.groovy | 30 +++++++++++++++---- 4 files changed, 53 insertions(+), 9 deletions(-) diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatFileCopyStrategy.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatFileCopyStrategy.groovy index f3b403e..7f36032 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatFileCopyStrategy.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatFileCopyStrategy.groovy @@ -19,6 +19,7 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.executor.SimpleFileCopyStrategy import nextflow.file.FileSystemPathFactory +import nextflow.processor.TaskBean import nextflow.util.Escape import java.nio.file.Path @@ -29,11 +30,27 @@ import java.nio.file.Path class FloatFileCopyStrategy extends SimpleFileCopyStrategy { private FloatConf conf - FloatFileCopyStrategy(FloatConf conf) { - super() + FloatFileCopyStrategy(FloatConf conf, TaskBean bean) { + super(bean) this.conf = conf } + @Override + String getStageInputFilesScript(Map inputFiles) { + def result = 'downloads=(true)\n' + result += super.getStageInputFilesScript(inputFiles) + '\n' + result += 'nxf_parallel "${downloads[@]}"\n' + return result + } + + /** + * {@inheritDoc} + */ + @Override + String stageInputFile( Path path, String targetName ) { + return """downloads+=("${super.stageInputFile(path, targetName)}")""" + } + @Override String getBeforeStartScript() { def script = FloatBashLib.script(conf) diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy index eef9c23..29f9e56 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy @@ -74,7 +74,7 @@ class FloatGridExecutor extends AbstractGridExecutor { protected BashWrapperBuilder createBashWrapperBuilder(TaskRun task) { final bean = new TaskBean(task) - final strategy = new FloatFileCopyStrategy(floatConf) + final strategy = new FloatFileCopyStrategy(floatConf, bean) // creates the wrapper script final builder = new BashWrapperBuilder(bean, strategy) // job directives headers diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy index 06adddf..fedb6ed 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy @@ -16,6 +16,7 @@ package com.memverge.nextflow import nextflow.Session +import nextflow.processor.TaskBean import nextflow.processor.TaskConfig import nextflow.processor.TaskId import nextflow.processor.TaskProcessor @@ -85,6 +86,7 @@ class FloatBaseTest extends BaseTest { task.processor = Mock(TaskProcessor) task.processor.getSession() >> Mock(Session) task.processor.getExecutor() >> exec + task.processor.getProcessEnvironment() >> [:] task.config = conf task.id = new TaskId(id) task.index = taskSerial.incrementAndGet() @@ -93,6 +95,13 @@ class FloatBaseTest extends BaseTest { return task } + def newTaskBean(FloatTestExecutor exec, int id, TaskConfig conf = null) { + def task = newTask(exec, id, conf) + def bean = new TaskBean(task) + bean.stageInMode = 'copy' + return bean + } + def jobID(TaskId id) { return "${FloatConf.NF_JOB_ID}:$tJob-$id" } diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatFileCopyStrategyTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatFileCopyStrategyTest.groovy index 5d671ec..7e52cdb 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatFileCopyStrategyTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatFileCopyStrategyTest.groovy @@ -17,17 +17,16 @@ package com.memverge.nextflow import java.nio.file.Paths -class FloatFileCopyStrategyTest extends BaseTest { +class FloatFileCopyStrategyTest extends FloatBaseTest { def "get before script with conf"() { given: - def conf = [ - float: [maxParallelTransfers: 10] - ] + def conf = [float: [maxParallelTransfers: 10]] + def exec = newTestExecutor() when: def fConf = FloatConf.getConf(conf) - def strategy = new FloatFileCopyStrategy(fConf) + def strategy = new FloatFileCopyStrategy(fConf, newTaskBean(exec, 1)) final script = strategy.beforeStartScript then: @@ -35,18 +34,37 @@ class FloatFileCopyStrategyTest extends BaseTest { !script.contains('\nnull') } + def "get stage input file script"() { + given: + def conf = [float:[]] + def exec = newTestExecutor() + + when: + def fConf = FloatConf.getConf(conf) + def strategy = new FloatFileCopyStrategy(fConf, newTaskBean(exec, 1)) + final script = strategy.getStageInputFilesScript( + ['a': Paths.get('/target/A')]) + + then: + script.contains('downloads+=("cp -fRL /target/A a")') + script.contains('nxf_parallel') + !script.contains('\nnull') + } + def "get unstage output file script"() { given: def conf = [float:[]] + def exec = newTestExecutor() when: def fConf = FloatConf.getConf(conf) - def strategy = new FloatFileCopyStrategy(fConf) + def strategy = new FloatFileCopyStrategy(fConf, newTaskBean(exec, 1)) final script = strategy.getUnstageOutputFilesScript( ['a',], Paths.get('/target/A')) then: script.contains('eval "ls -1d a"') + script.contains('nxf_parallel') script.contains('uploads+=("nxf_fs_move "$name" /target/A")') } }