diff --git a/cwltool/builder.py b/cwltool/builder.py index 191ef9ee7..8d848ab14 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -180,6 +180,7 @@ def __init__( stagedir: str, cwlVersion: str, container_engine: str, + dockerstagedir: str = None, ) -> None: """Initialize this Builder.""" super().__init__() @@ -211,6 +212,7 @@ def __init__( self.outdir = outdir self.tmpdir = tmpdir self.stagedir = stagedir + self.dockerstagedir = dockerstagedir self.cwlVersion = cwlVersion diff --git a/cwltool/docker.py b/cwltool/docker.py index b1c1b7bbf..89247bf86 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -247,8 +247,9 @@ def append_volume( options.append("readonly") output = StringIO() csv.writer(output).writerow(options) - mount_arg = output.getvalue().strip() - runtime.append(f"--mount={mount_arg}") + mount_arg = f"--mount={output.getvalue().strip()}" + if mount_arg not in runtime: + runtime.append(mount_arg) # Unlike "--volume", "--mount" will fail if the volume doesn't already exist. if not os.path.exists(source): os.makedirs(source) diff --git a/cwltool/job.py b/cwltool/job.py index 9cee47302..21d701b50 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -546,7 +546,6 @@ def run( else: if not os.path.exists(self.tmpdir): os.makedirs(self.tmpdir) - self._setup(runtimeContext) stage_files( @@ -696,19 +695,27 @@ def add_volumes( ) -> None: """Append volume mappings to the runtime option list.""" container_outdir = self.builder.outdir + dockerstagedir = self.builder.dockerstagedir + if dockerstagedir: + pathmapper.update(dockerstagedir, dockerstagedir, self.builder.stagedir, "Directory", True) for key, vol in (itm for itm in pathmapper.items() if itm[1].staged): host_outdir_tgt = None # type: Optional[str] if vol.target.startswith(container_outdir + "/"): host_outdir_tgt = os.path.join( self.outdir, vol.target[len(container_outdir) + 1 :] ) - if not host_outdir_tgt and not any_path_okay: + if not host_outdir_tgt and not any_path_okay and key != dockerstagedir: raise WorkflowException( "No mandatory DockerRequirement, yet path is outside " "the designated output directory, also know as " "$(runtime.outdir): {}".format(vol) ) - if vol.type in ("File", "Directory"): + if vol.type == "File": + if dockerstagedir and vol.resolved.startswith(dockerstagedir): + pass + else: + self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt) + elif vol.type == "Directory": self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt) elif vol.type == "WritableFile": self.add_writable_file_volume( @@ -835,6 +842,25 @@ def run( env = dict(os.environ) (runtime, cidfile) = self.create_runtime(env, runtimeContext) + # if command is larger than this, we might exceed system limits + # 2097150 bytes is the Ubuntu system default + # check with: echo $(( $(ulimit -s)*1024 / 4 )) + # or: getconf ARG_MAX + # TODO: create a cross-platform function to check limit where it's run + if len(''.join(runtime)) >= 2097152 - 2: + if runtimeContext.singularity: + # TODO: write the singularity equivalent + # copy_these_into_container, new_runtime = self.filter_out_singularity_image_file_inputs(runtime) + # if copy_these_into_container: + # runtime = new_runtime + # img_id = self.bake_inputs_into_docker_container(str(img_id), copy_these_into_container) + pass + else: + copy_these_into_container, new_runtime = self.filter_out_docker_image_file_inputs(runtime) + if copy_these_into_container: + runtime = new_runtime + img_id = self.bake_inputs_into_docker_container(str(img_id), copy_these_into_container) + runtime.append(str(img_id)) monitor_function = None if cidfile: @@ -848,6 +874,42 @@ def run( monitor_function = functools.partial(self.process_monitor) self._execute(runtime, env, runtimeContext, monitor_function) + def bake_inputs_into_docker_container(self, image_id, inputs): + i = [f'FROM {image_id}'] + + # we do this to keep files in the build context for docker + staging_dir = tempfile.mkdtemp() + for input in inputs: + staged_file_uuid = str(uuid.uuid4()) + staged_file = os.path.join(staging_dir, staged_file_uuid) + shutil.copy(input['src'], staged_file) + i.append(f'COPY {staged_file_uuid} {input["dst"]}') + + with open(os.path.join(staging_dir, 'Dockerfile'), 'w') as f: + f.write('\n'.join(i)) + + docker_id = str(uuid.uuid4()) + _logger.critical(str(i)) + subprocess.run(['docker', 'build', '-f', os.path.join(staging_dir, 'Dockerfile'), '.', '-t', f'{docker_id}:cwl'], + cwd=staging_dir, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return f'{docker_id}:cwl' + + def filter_out_docker_image_file_inputs(self, runtime, outdir): + new_runtime = [] + copy_these_into_container = [] + for arg in runtime: + if arg.startswith('--mount=type=bind,source='): + src, dst = arg[len('--mount=type=bind,source='):].split(',target=') + if dst.endswith(',readonly'): + dst = dst[:-len(',readonly')] + if os.path.isfile(src) and not dst.startswith(outdir): + copy_these_into_container.append({'src': src, 'dst': dst}) + else: + new_runtime.append(arg) + else: + new_runtime.append(arg) + return copy_these_into_container, new_runtime + def docker_monitor( self, cidfile: str, diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index c89fac5f4..fa54707db 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -169,7 +169,9 @@ def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None: stagedir = self.stagedir for fob in referenced_files: if self.separateDirs: - stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) + location = fob['location'] if not fob['location'].startswith('file://') else fob['location'][len('file://'):] + stagedir = os.path.join(self.stagedir, os.path.basename(os.path.dirname(location))) + # stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) self.visit( fob, stagedir, diff --git a/cwltool/process.py b/cwltool/process.py index ddb6a9040..1f5f38361 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -252,15 +252,10 @@ def checkRequirements( checkRequirements(entry2, supported_process_requirements) -def stage_files( - pathmapper: PathMapper, - stage_func: Optional[Callable[[str, str], None]] = None, - ignore_writable: bool = False, - symlink: bool = True, - secret_store: Optional[SecretStore] = None, - fix_conflicts: bool = False, -) -> None: - """Link or copy files to their targets. Create them as needed.""" +def check_pathmapper_conflicts(pathmapper: PathMapper, fix_conflicts: bool = False) -> None: + """ + All pathmapper resolved file paths should be unique. If any conflict, this will error early or fix them. + """ targets = {} # type: Dict[str, MapperEnt] for key, entry in pathmapper.items(): if "File" not in entry.type: @@ -285,6 +280,18 @@ def stage_files( % (targets[entry.target].resolved, entry.resolved, entry.target) ) + +def stage_files( + pathmapper: PathMapper, + stage_func: Optional[Callable[[str, str], None]] = None, + ignore_writable: bool = False, + symlink: bool = True, + secret_store: Optional[SecretStore] = None, + fix_conflicts: bool = False, +) -> None: + """Link or copy files to their targets. Create them as needed.""" + check_pathmapper_conflicts(pathmapper, fix_conflicts) + for key, entry in pathmapper.items(): if not entry.staged: continue @@ -771,6 +778,18 @@ def _init_job( % (self.metadata.get("cwlVersion"), INTERNAL_VERSION) ) + if os.environ.get('GROUP_STAGING') or True: + dockerstagedir = runtime_context.get_stagedir() + for input_keyname, input in joborder.items(): + if input.get('location') and input.get('class') == 'File': + location = str(input.get('location')) + location = location if not location.startswith('file://') else location[len('file://'):] + unique_staging_dir = os.path.join(dockerstagedir, str(uuid.uuid4())) + os.makedirs(unique_staging_dir, exist_ok=True) + new_uri = os.path.join(unique_staging_dir, input['basename']) + shutil.copyfile(location, new_uri) + input['location'] = new_uri + job = copy.deepcopy(joborder) make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess) @@ -911,6 +930,7 @@ def inc(d): # type: (List[int]) -> None stagedir, cwl_version, self.container_engine, + dockerstagedir, ) bindings.extend( diff --git a/cwltool/singularity.py b/cwltool/singularity.py index 230c3a0cd..62b6f1d7d 100644 --- a/cwltool/singularity.py +++ b/cwltool/singularity.py @@ -259,13 +259,13 @@ def get_from_requirements( def append_volume( runtime: List[str], source: str, target: str, writable: bool = False ) -> None: - runtime.append("--bind") - # Mounts are writable by default, so 'rw' is optional and not - # supported (due to a bug) in some 3.6 series releases. - vol = f"{source}:{target}" + mount_arg = f'--bind={source}:{target}' if not writable: - vol += ":ro" - runtime.append(vol) + # Mounts are writable by default, so 'rw' is optional and not + # supported (due to a bug) in some 3.6 series releases. + mount_arg += ":ro" + if vol not in runtime: + runtime.append(vol) def add_file_or_directory_volume( self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]