Skip to content

Commit

Permalink
[GH-82] Concurrrent output upload
Browse files Browse the repository at this point in the history
The `unstage` procedure upload the outputs synchronously when
`scratch` is enabled.
We can use `nxf_parallel` to increase the upload concurrency and
reduce the total job time.

Basically, we are updating the script from:
```
nxf_unstage() {
    true
    cp .command.out /cedric-memverge-test/nf-work/work/d7/5baf3a5eade9c1b917b43ac31c011c/.command.out || true
    cp .command.err /cedric-memverge-test/nf-work/work/d7/5baf3a5eade9c1b917b43ac31c011c/.command.err || true
    [[ ${nxf_main_ret:=0} != 0 ]] && return
    IFS=$'\n'
    for name in $(eval "ls -1d *.txt" | sort | uniq); do
        nxf_s3_upload $name s3://cedric-memverge-test/nf-work/work/d7/5baf3a5eade9c1b917b43ac31c011c || true
    done
    unset IFS
}
```
to
```
nxf_unstage() {
    true
    cp .command.out /cedric-memverge-test/nf-work/work/84/52d2105b401b8531669aeca026bbbd/.command.out || true
    cp .command.err /cedric-memverge-test/nf-work/work/84/52d2105b401b8531669aeca026bbbd/.command.err || true
    [[ ${nxf_main_ret:=0} != 0 ]] && return
    uploads=()
    IFS=$'\n'
    for name in $(eval "ls -1d *.txt" | sort | uniq); do
        uploads+=("nxf_s3_upload $name s3://cedric-memverge-test/nf-work/work/84/52d2105b401b8531669aeca026bbbd")
    done
    unset IFS
    nxf_parallel "${uploads[@]}"
}
```

Here is the test result for uploading 200 files to s3
before change:
```
executor >  float (1)
[42/74bdb4] process > CreateFiles [100%] 1 of 1 ✔
Waiting for file transfers to complete (17 files)
Completed at: 01-Sep-2024 18:35:58
Duration    : 14m 17s
CPU hours   : 1.6
Succeeded   : 1
```
with the fix:
```
executor >  float (1)
[84/52d210] process > CreateFiles [100%] 1 of 1 ✔
Waiting for file transfers to complete (131 files)
Completed at: 01-Sep-2024 18:43:52
Duration    : 6m 26s
CPU hours   : 0.5
Succeeded   : 1
```
  • Loading branch information
jealous committed Sep 2, 2024
1 parent 465a4a3 commit 4ea7d44
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.memverge.nextflow

import nextflow.executor.BashFunLib

class FloatBashLib extends BashFunLib<FloatBashLib> {
static String script(FloatConf conf) {
new FloatBashLib()
.includeCoreFun(true)
.withMaxParallelTransfers(conf.maxParallelTransfers)
.render()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class FloatConf {
float timeFactor = 1
float cpuFactor = 1
float memoryFactory = 1
int maxParallelTransfers = 4

float maxCpuFactor = DFT_MAX_CPU_FACTOR
float maxMemoryFactor = DFT_MAX_MEM_FACTOR
Expand Down Expand Up @@ -167,6 +168,9 @@ class FloatConf {
if (floatNode.memoryFactor) {
this.memoryFactory = floatNode.memoryFactor as Float
}
if (floatNode.maxParallelTransfers) {
this.maxParallelTransfers = floatNode.maxParallelTransfers as Integer
}
if (floatNode.containsKey('ignoreTimeFactor')) {
this.ignoreTimeFactor = floatNode.ignoreTimeFactor as Boolean
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2024, MemVerge Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.memverge.nextflow

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.executor.SimpleFileCopyStrategy
import nextflow.file.FileSystemPathFactory
import nextflow.util.Escape

import java.nio.file.Path


@Slf4j
@CompileStatic
class FloatFileCopyStrategy extends SimpleFileCopyStrategy {
private FloatConf conf

FloatFileCopyStrategy(FloatConf conf) {
super()
this.conf = conf
}

@Override
String getBeforeStartScript() {
def script = FloatBashLib.script(conf)
final lib = FileSystemPathFactory.bashLib(targetDir)
if (lib) {
script += '\n' + lib
}
return script.leftTrim()
}

/**
* Creates the script to unstage the result output files from the scratch directory
* to the shared working directory
*/
@Override
String getUnstageOutputFilesScript(List<String> outputFiles, Path targetDir) {
final patterns = normalizeGlobStarPaths(outputFiles)
// create a bash script that will copy the out file to the working directory
log.info "Unstaging file path: $patterns, outputFiles: $outputFiles targetDir: $targetDir"

if (!patterns)
return null

final escape = new ArrayList(outputFiles.size())
for (String it : patterns)
escape.add(Escape.path(it))

final mode = stageoutMode ?: (workDir == targetDir ? 'copy' : 'move')
return """\
uploads=()
IFS=\$'\\n'
for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
uploads+=("${stageOutCommand('$name', targetDir, mode)}")
done
unset IFS
nxf_parallel "\${uploads[@]}"
""".stripIndent(true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.exception.AbortOperationException
import nextflow.executor.AbstractGridExecutor
import nextflow.executor.BashWrapperBuilder
import nextflow.file.FileHelper
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskBean
import nextflow.processor.TaskId
import nextflow.processor.TaskRun
import nextflow.util.Escape
Expand Down Expand Up @@ -70,6 +72,16 @@ class FloatGridExecutor extends AbstractGridExecutor {
new FloatTaskHandler(task, this)
}

protected BashWrapperBuilder createBashWrapperBuilder(TaskRun task) {
final bean = new TaskBean(task)
final strategy = new FloatFileCopyStrategy(floatConf)
// creates the wrapper script
final builder = new BashWrapperBuilder(bean, strategy)
// job directives headers
builder.headerScript = getHeaderScript(task)
return builder
}

protected String getHeaderScript(TaskRun task) {
log.info "[FLOAT] switch task ${task.id} to ${task.workDirStr}"
floatJobs.setWorkDir(task.id, task.workDir)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024, MemVerge Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.memverge.nextflow

import java.nio.file.Paths

class FloatFileCopyStrategyTest extends BaseTest {

def "get before script with conf"() {
given:
def conf = [
float: [maxParallelTransfers: 10]
]

when:
def fConf = FloatConf.getConf(conf)
def strategy = new FloatFileCopyStrategy(fConf)
final script = strategy.beforeStartScript

then:
script.contains('cpus>10')
!script.contains('\nnull')
}

def "get unstage output file script"() {
given:
def conf = [float:[]]

when:
def fConf = FloatConf.getConf(conf)
def strategy = new FloatFileCopyStrategy(fConf)
final script = strategy.getUnstageOutputFilesScript(
['a',], Paths.get('/target/A'))

then:
script.contains('eval "ls -1d a"')
script.contains('uploads+=("nxf_fs_move "$name" /target/A")')
}
}

0 comments on commit 4ea7d44

Please sign in to comment.