From 31e409fe4d6ba52eec698cbca4bb02ded386a5b0 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Thu, 10 Dec 2020 00:47:47 -0800 Subject: [PATCH 01/11] Add bones of idea without actually running the script. --- cwltool/docker.py | 4 +++ cwltool/job.py | 60 ++++++++++++++++++++++++++++++++++++++++++ cwltool/singularity.py | 31 +++++++++++++++------- 3 files changed, 85 insertions(+), 10 deletions(-) diff --git a/cwltool/docker.py b/cwltool/docker.py index 38f2dd87d..f88e77403 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -7,6 +7,7 @@ import shutil import subprocess # nosec import sys +import tempfile import threading from distutils import spawn from io import StringIO, open # pylint: disable=redefined-builtin @@ -102,6 +103,9 @@ def __init__( super(DockerCommandLineJob, self).__init__( builder, joborder, make_path_mapper, requirements, hints, name ) + # TODO: Unused; Implement for docker as well. + self.universal_file_bindmount_dir = tempfile.mkdtemp(suffix='-cwl-docker-mnt') + self.bindings_map = [] @staticmethod def get_image( diff --git a/cwltool/job.py b/cwltool/job.py index c4132d550..be13d0915 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -4,10 +4,12 @@ import logging import os import re +import resource import shutil import subprocess # nosec import sys import tempfile +import textwrap import threading import time import uuid @@ -529,6 +531,15 @@ def run( tmpdir_lock: Optional[threading.Lock] = None, ) -> None: + # attempt to set an "unlimited" (-1) heap size for this process + # (& thus commandline size) on any system that supports it + # TODO: Do containers inherit the processes's limits? + # Can they be configured from outside of the container? 
+ try: + resource.setrlimit(resource.RLIMIT_DATA, (-1, -1)) + except Exception: + pass + if tmpdir_lock: with tmpdir_lock: if not os.path.exists(self.tmpdir): @@ -589,6 +600,20 @@ def run( class ContainerCommandLineJob(JobBase, metaclass=ABCMeta): """Commandline job using containers.""" + def __init__( + self, + builder: Builder, + joborder: CWLObjectType, + make_path_mapper: Callable[..., PathMapper], + requirements: List[CWLObjectType], + hints: List[CWLObjectType], + name: str, + ) -> None: + super(JobBase, self).__init__( + builder, joborder, make_path_mapper, requirements, hints, name + ) + self.universal_file_bindmount_dir = None + self.bindings_map = None @abstractmethod def get_from_requirements( @@ -710,6 +735,41 @@ def add_volumes( ) pathmapper.update(key, new_path, vol.target, vol.type, vol.staged) + # Dir of individual file inputs for the job (all named as uuid4). + # This creates the same dir inside of the container as exists outside of it, + # Overlayfs must be supported/enabled (which should always be true for CWL). + src = dst = self.universal_file_bindmount_dir + runtime.append(f"--bind={src}:{dst}:rw") + + # Make a TSV of the file mappings. + mapping_tsv = os.path.join(self.universal_file_bindmount_dir, 'mapping.tsv') + with open(mapping_tsv, 'w') as f: + # 1. Sort by the destination path, which should sort alphabetically + # and by shortest path first. + # 2. Then, when we go to hardlink the files, we + # should then just be able to hardlink them in order. + for (src, dst, writable) in sorted(self.bindings_map, key=lambda x: len(x[1])): + f.write('\t'.join((src, dst, writable)) + '\n') + + # Make the script that uses the TSV file mappings to hardlink everything + # inside of the container to where the job expects to find them. + # This script needs to be the first thing run inside of the container. + linking_script = os.path.join(self.universal_file_bindmount_dir, 'hard_linking_script.py') + # TODO: Write in bash instead. 
All images might not have python. + with open(linking_script, 'w') as f: + f.write(textwrap.dedent(f""" + import os + + with open('{mapping_tsv}', 'r') as f: + for line in f: + src, dst, writable = line.split('\\t') + os.makedirs(os.path.dirname(dst), exist_ok=True) + os.link(src, dst) + # TODO: set the permissions on the file here after linking + + """[1:])) + os.chmod(linking_script, 0o777) + def run( self, runtimeContext: RuntimeContext, diff --git a/cwltool/singularity.py b/cwltool/singularity.py index 9361bc016..d87e27084 100644 --- a/cwltool/singularity.py +++ b/cwltool/singularity.py @@ -5,6 +5,8 @@ import re import shutil import sys +import tempfile +from uuid import uuid4 from distutils import spawn from subprocess import ( # nosec DEVNULL, @@ -102,6 +104,8 @@ def __init__( super(SingularityCommandLineJob, self).__init__( builder, joborder, make_path_mapper, requirements, hints, name ) + self.universal_file_bindmount_dir = tempfile.mkdtemp(suffix='-cwl-singularity-mnt') + self.bindings_map = [] @staticmethod def get_image( @@ -278,18 +282,25 @@ def get_from_requirements( return os.path.abspath(cast(str, r["dockerImageId"])) - @staticmethod def append_volume( - runtime: List[str], source: str, target: str, writable: bool = False + self, runtime: List[str], source: str, target: str, writable: bool = False ) -> None: - runtime.append("--bind") - runtime.append( - "{}:{}:{}".format( - docker_windows_path_adjust(source), - docker_windows_path_adjust(target), - "rw" if writable else "ro", - ) - ) + src = docker_windows_path_adjust(source) + dst = docker_windows_path_adjust(target) + writable = "rw" if writable else "ro" + + # use only "os.path.isfile(source)" for Windows? check on this... 
+ if os.path.isfile(source) or os.path.isfile(src): + bindmount_path = os.path.join(self.universal_file_bindmount_dir, str(uuid4())) + os.link(src, bindmount_path) + self.bindings_map.append((bindmount_path, dst, writable)) + # don't add a bind arg for the shared self.universal_file_bindmount_dir + # here but at the very end + else: + # TODO: We can still bind enough dirs to exceed the max command line length. + # Not sure how to handle this, since outputs deposited in mounted dirs + # need to be there after the run. + runtime.append(f"--bind={src}:{dst}:{writable}") def add_file_or_directory_volume( self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str] From b44af89ed2a9718c4ebf7db15f9920d111b820f6 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Tue, 16 Feb 2021 23:59:32 -0800 Subject: [PATCH 02/11] Simple stage files. --- cwltool/docker.py | 4 -- cwltool/job.py | 92 ++---------------------------------------- cwltool/pathmapper.py | 10 ++++- cwltool/singularity.py | 31 +++++--------- 4 files changed, 23 insertions(+), 114 deletions(-) diff --git a/cwltool/docker.py b/cwltool/docker.py index f88e77403..38f2dd87d 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -7,7 +7,6 @@ import shutil import subprocess # nosec import sys -import tempfile import threading from distutils import spawn from io import StringIO, open # pylint: disable=redefined-builtin @@ -103,9 +102,6 @@ def __init__( super(DockerCommandLineJob, self).__init__( builder, joborder, make_path_mapper, requirements, hints, name ) - # TODO: Unused; Implement for docker as well. 
- self.universal_file_bindmount_dir = tempfile.mkdtemp(suffix='-cwl-docker-mnt') - self.bindings_map = [] @staticmethod def get_image( diff --git a/cwltool/job.py b/cwltool/job.py index be13d0915..206542bcc 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -4,12 +4,10 @@ import logging import os import re -import resource import shutil import subprocess # nosec import sys import tempfile -import textwrap import threading import time import uuid @@ -531,15 +529,6 @@ def run( tmpdir_lock: Optional[threading.Lock] = None, ) -> None: - # attempt to set an "unlimited" (-1) heap size for this process - # (& thus commandline size) on any system that supports it - # TODO: Do containers inherit the processes's limits? - # Can they be configured from outside of the container? - try: - resource.setrlimit(resource.RLIMIT_DATA, (-1, -1)) - except Exception: - pass - if tmpdir_lock: with tmpdir_lock: if not os.path.exists(self.tmpdir): @@ -600,20 +589,6 @@ def run( class ContainerCommandLineJob(JobBase, metaclass=ABCMeta): """Commandline job using containers.""" - def __init__( - self, - builder: Builder, - joborder: CWLObjectType, - make_path_mapper: Callable[..., PathMapper], - requirements: List[CWLObjectType], - hints: List[CWLObjectType], - name: str, - ) -> None: - super(JobBase, self).__init__( - builder, joborder, make_path_mapper, requirements, hints, name - ) - self.universal_file_bindmount_dir = None - self.bindings_map = None @abstractmethod def get_from_requirements( @@ -706,69 +681,10 @@ def add_volumes( any_path_okay: bool = False, ) -> None: """Append volume mappings to the runtime option list.""" - container_outdir = self.builder.outdir - for key, vol in (itm for itm in pathmapper.items() if itm[1].staged): - host_outdir_tgt = None # type: Optional[str] - if vol.target.startswith(container_outdir + "/"): - host_outdir_tgt = os.path.join( - self.outdir, vol.target[len(container_outdir) + 1 :] - ) - if not host_outdir_tgt and not any_path_okay: - raise 
WorkflowException( - "No mandatory DockerRequirement, yet path is outside " - "the designated output directory, also know as " - "$(runtime.outdir): {}".format(vol) - ) - if vol.type in ("File", "Directory"): - self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt) - elif vol.type == "WritableFile": - self.add_writable_file_volume( - runtime, vol, host_outdir_tgt, tmpdir_prefix - ) - elif vol.type == "WritableDirectory": - self.add_writable_directory_volume( - runtime, vol, host_outdir_tgt, tmpdir_prefix - ) - elif vol.type in ["CreateFile", "CreateWritableFile"]: - new_path = self.create_file_and_add_volume( - runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix - ) - pathmapper.update(key, new_path, vol.target, vol.type, vol.staged) - - # Dir of individual file inputs for the job (all named as uuid4). - # This creates the same dir inside of the container as exists outside of it, - # Overlayfs must be supported/enabled (which should always be true for CWL). - src = dst = self.universal_file_bindmount_dir - runtime.append(f"--bind={src}:{dst}:rw") - - # Make a TSV of the file mappings. - mapping_tsv = os.path.join(self.universal_file_bindmount_dir, 'mapping.tsv') - with open(mapping_tsv, 'w') as f: - # 1. Sort by the destination path, which should sort alphabetically - # and by shortest path first. - # 2. Then, when we go to hardlink the files, we - # should then just be able to hardlink them in order. - for (src, dst, writable) in sorted(self.bindings_map, key=lambda x: len(x[1])): - f.write('\t'.join((src, dst, writable)) + '\n') - - # Make the script that uses the TSV file mappings to hardlink everything - # inside of the container to where the job expects to find them. - # This script needs to be the first thing run inside of the container. - linking_script = os.path.join(self.universal_file_bindmount_dir, 'hard_linking_script.py') - # TODO: Write in bash instead. All images might not have python. 
- with open(linking_script, 'w') as f: - f.write(textwrap.dedent(f""" - import os - - with open('{mapping_tsv}', 'r') as f: - for line in f: - src, dst, writable = line.split('\\t') - os.makedirs(os.path.dirname(dst), exist_ok=True) - os.link(src, dst) - # TODO: set the permissions on the file here after linking - - """[1:])) - os.chmod(linking_script, 0o777) + staging_dir = tempfile.mkdtemp() + pathmapper.reset_stagedir(staging_dir) + stage_files(pathmapper, symlink=False) + self.append_volume(runtime, staging_dir, staging_dir) def run( self, diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index f1f871453..8d0203f72 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -64,10 +64,18 @@ def __init__( separateDirs: bool = True, ) -> None: """Initialize the PathMapper.""" + self.referenced_files = dedup(referenced_files) + self.basedir = basedir self._pathmap = {} # type: Dict[str, MapperEnt] self.stagedir = stagedir self.separateDirs = separateDirs - self.setup(dedup(referenced_files), basedir) + self.setup(self.referenced_files, basedir) + + def reset_stagedir(self, stagedir: str) -> None: + """Changes the target stagedir for mapped files.""" + self.stagedir = stagedir + self._pathmap = {} # type: Dict[str, MapperEnt] + self.setup(self.referenced_files, self.basedir) def visitlisting( self, diff --git a/cwltool/singularity.py b/cwltool/singularity.py index d87e27084..9361bc016 100644 --- a/cwltool/singularity.py +++ b/cwltool/singularity.py @@ -5,8 +5,6 @@ import re import shutil import sys -import tempfile -from uuid import uuid4 from distutils import spawn from subprocess import ( # nosec DEVNULL, @@ -104,8 +102,6 @@ def __init__( super(SingularityCommandLineJob, self).__init__( builder, joborder, make_path_mapper, requirements, hints, name ) - self.universal_file_bindmount_dir = tempfile.mkdtemp(suffix='-cwl-singularity-mnt') - self.bindings_map = [] @staticmethod def get_image( @@ -282,25 +278,18 @@ def get_from_requirements( return 
os.path.abspath(cast(str, r["dockerImageId"])) + @staticmethod def append_volume( - self, runtime: List[str], source: str, target: str, writable: bool = False + runtime: List[str], source: str, target: str, writable: bool = False ) -> None: - src = docker_windows_path_adjust(source) - dst = docker_windows_path_adjust(target) - writable = "rw" if writable else "ro" - - # use only "os.path.isfile(source)" for Windows? check on this... - if os.path.isfile(source) or os.path.isfile(src): - bindmount_path = os.path.join(self.universal_file_bindmount_dir, str(uuid4())) - os.link(src, bindmount_path) - self.bindings_map.append((bindmount_path, dst, writable)) - # don't add a bind arg for the shared self.universal_file_bindmount_dir - # here but at the very end - else: - # TODO: We can still bind enough dirs to exceed the max command line length. - # Not sure how to handle this, since outputs deposited in mounted dirs - # need to be there after the run. - runtime.append(f"--bind={src}:{dst}:{writable}") + runtime.append("--bind") + runtime.append( + "{}:{}:{}".format( + docker_windows_path_adjust(source), + docker_windows_path_adjust(target), + "rw" if writable else "ro", + ) + ) def add_file_or_directory_volume( self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str] From 22e3d4845616792301e53d832b133f0a859f5fb8 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Mon, 22 Feb 2021 21:22:52 -0800 Subject: [PATCH 03/11] Only bindmount basedirs. 
--- cwltool/command_line_tool.py | 7 ++++++- cwltool/docker.py | 8 ++++++-- cwltool/job.py | 32 ++++++++++++++++++++++++++++---- cwltool/pathmapper.py | 10 +--------- cwltool/singularity.py | 26 ++++++++++++-------------- 5 files changed, 53 insertions(+), 30 deletions(-) diff --git a/cwltool/command_line_tool.py b/cwltool/command_line_tool.py index 71d1e9866..a39942744 100644 --- a/cwltool/command_line_tool.py +++ b/cwltool/command_line_tool.py @@ -877,8 +877,13 @@ def update_status_output_callback( ) _logger.debug("[job %s] %s", j.name, json_dumps(builder.job, indent=4)) + separateDirs = True + # sufficient conditional for docker/singularity? + if runtimeContext.use_container: + separateDirs = False + builder.pathmapper = self.make_path_mapper( - reffiles, builder.stagedir, runtimeContext, True + reffiles, builder.stagedir, runtimeContext, separateDirs ) builder.requirements = j.requirements diff --git a/cwltool/docker.py b/cwltool/docker.py index 38f2dd87d..63955579b 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -251,6 +251,9 @@ def append_volume( runtime: List[str], source: str, target: str, writable: bool = False ) -> None: """Add binding arguments to the runtime list.""" + if os.path.isfile(source) and target.endswith(os.path.basename(source)) and not writable: + source = os.path.dirname(source) + target = os.path.dirname(target) options = [ "type=bind", "source=" + source, @@ -260,8 +263,9 @@ def append_volume( options.append("readonly") output = StringIO() csv.writer(output).writerow(options) - mount_arg = output.getvalue().strip() - runtime.append("--mount={}".format(mount_arg)) + mount_arg = f"--mount={output.getvalue().strip()}" + if mount_arg not in runtime: + runtime.append(mount_arg) # Unlike "--volume", "--mount" will fail if the volume doesn't already exist. 
if not os.path.exists(source): os.makedirs(source) diff --git a/cwltool/job.py b/cwltool/job.py index 206542bcc..c4132d550 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -681,10 +681,34 @@ def add_volumes( any_path_okay: bool = False, ) -> None: """Append volume mappings to the runtime option list.""" - staging_dir = tempfile.mkdtemp() - pathmapper.reset_stagedir(staging_dir) - stage_files(pathmapper, symlink=False) - self.append_volume(runtime, staging_dir, staging_dir) + container_outdir = self.builder.outdir + for key, vol in (itm for itm in pathmapper.items() if itm[1].staged): + host_outdir_tgt = None # type: Optional[str] + if vol.target.startswith(container_outdir + "/"): + host_outdir_tgt = os.path.join( + self.outdir, vol.target[len(container_outdir) + 1 :] + ) + if not host_outdir_tgt and not any_path_okay: + raise WorkflowException( + "No mandatory DockerRequirement, yet path is outside " + "the designated output directory, also know as " + "$(runtime.outdir): {}".format(vol) + ) + if vol.type in ("File", "Directory"): + self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt) + elif vol.type == "WritableFile": + self.add_writable_file_volume( + runtime, vol, host_outdir_tgt, tmpdir_prefix + ) + elif vol.type == "WritableDirectory": + self.add_writable_directory_volume( + runtime, vol, host_outdir_tgt, tmpdir_prefix + ) + elif vol.type in ["CreateFile", "CreateWritableFile"]: + new_path = self.create_file_and_add_volume( + runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix + ) + pathmapper.update(key, new_path, vol.target, vol.type, vol.staged) def run( self, diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index 8d0203f72..f1f871453 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -64,18 +64,10 @@ def __init__( separateDirs: bool = True, ) -> None: """Initialize the PathMapper.""" - self.referenced_files = dedup(referenced_files) - self.basedir = basedir self._pathmap = {} # type: Dict[str, MapperEnt] 
self.stagedir = stagedir self.separateDirs = separateDirs - self.setup(self.referenced_files, basedir) - - def reset_stagedir(self, stagedir: str) -> None: - """Changes the target stagedir for mapped files.""" - self.stagedir = stagedir - self._pathmap = {} # type: Dict[str, MapperEnt] - self.setup(self.referenced_files, self.basedir) + self.setup(dedup(referenced_files), basedir) def visitlisting( self, diff --git a/cwltool/singularity.py b/cwltool/singularity.py index 9361bc016..8d0cebd37 100644 --- a/cwltool/singularity.py +++ b/cwltool/singularity.py @@ -280,16 +280,17 @@ def get_from_requirements( @staticmethod def append_volume( - runtime: List[str], source: str, target: str, writable: bool = False + runtime: List[str], source: str, target: str, writable: bool = False ) -> None: - runtime.append("--bind") - runtime.append( - "{}:{}:{}".format( - docker_windows_path_adjust(source), - docker_windows_path_adjust(target), - "rw" if writable else "ro", - ) - ) + src = docker_windows_path_adjust(source) + dst = docker_windows_path_adjust(target) + writable = "rw" if writable else "ro" + if os.path.isfile(src) and dst.endswith(os.path.basename(src)) and writable == 'ro': + src = os.path.dirname(src) + dst = os.path.dirname(dst) + bind_arg = f"--bind={src}:{dst}:{writable}" + if bind_arg not in runtime: + runtime.append(bind_arg) def add_file_or_directory_volume( self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str] @@ -403,17 +404,14 @@ def create_runtime( ) ) else: - runtime.append("--bind") - runtime.append( - "{}:{}:rw".format( + runtime.append("--bind={}:{}:rw".format( docker_windows_path_adjust(os.path.realpath(self.outdir)), self.builder.outdir, ) ) - runtime.append("--bind") tmpdir = "/tmp" # nosec runtime.append( - "{}:{}:rw".format( + "--bind={}:{}:rw".format( docker_windows_path_adjust(os.path.realpath(self.tmpdir)), tmpdir ) ) From c86a5222b1315dc0e6061d7f49fcbc5a032103e1 Mon Sep 17 00:00:00 2001 From: "Michael R. 
Crusoe" Date: Thu, 11 Mar 2021 16:31:10 +0100 Subject: [PATCH 04/11] cleanups --- cwltool/docker.py | 6 +++++- cwltool/singularity.py | 11 ++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/cwltool/docker.py b/cwltool/docker.py index 63955579b..adac8e2dc 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -251,7 +251,11 @@ def append_volume( runtime: List[str], source: str, target: str, writable: bool = False ) -> None: """Add binding arguments to the runtime list.""" - if os.path.isfile(source) and target.endswith(os.path.basename(source)) and not writable: + if ( + os.path.isfile(source) + and target.endswith(os.path.basename(source)) + and not writable + ): source = os.path.dirname(source) target = os.path.dirname(target) options = [ diff --git a/cwltool/singularity.py b/cwltool/singularity.py index 8d0cebd37..f2a119c22 100644 --- a/cwltool/singularity.py +++ b/cwltool/singularity.py @@ -280,12 +280,16 @@ def get_from_requirements( @staticmethod def append_volume( - runtime: List[str], source: str, target: str, writable: bool = False + runtime: List[str], source: str, target: str, writable: bool = False ) -> None: src = docker_windows_path_adjust(source) dst = docker_windows_path_adjust(target) writable = "rw" if writable else "ro" - if os.path.isfile(src) and dst.endswith(os.path.basename(src)) and writable == 'ro': + if ( + os.path.isfile(src) + and dst.endswith(os.path.basename(src)) + and writable == "ro" + ): src = os.path.dirname(src) dst = os.path.dirname(dst) bind_arg = f"--bind={src}:{dst}:{writable}" @@ -404,7 +408,8 @@ def create_runtime( ) ) else: - runtime.append("--bind={}:{}:rw".format( + runtime.append( + "--bind={}:{}:rw".format( docker_windows_path_adjust(os.path.realpath(self.outdir)), self.builder.outdir, ) From abf2fc9be7bad6697ec6621770845b0fd45413e5 Mon Sep 17 00:00:00 2001 From: "Michael R. 
Crusoe" Date: Thu, 11 Mar 2021 16:34:58 +0100 Subject: [PATCH 05/11] type fix --- cwltool/singularity.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cwltool/singularity.py b/cwltool/singularity.py index f2a119c22..43093bfbb 100644 --- a/cwltool/singularity.py +++ b/cwltool/singularity.py @@ -284,15 +284,15 @@ def append_volume( ) -> None: src = docker_windows_path_adjust(source) dst = docker_windows_path_adjust(target) - writable = "rw" if writable else "ro" + writable_flag = "rw" if writable else "ro" if ( os.path.isfile(src) and dst.endswith(os.path.basename(src)) - and writable == "ro" + and writable_flag == "ro" ): src = os.path.dirname(src) dst = os.path.dirname(dst) - bind_arg = f"--bind={src}:{dst}:{writable}" + bind_arg = f"--bind={src}:{dst}:{writable_flag}" if bind_arg not in runtime: runtime.append(bind_arg) From 14814ae338484cc8e685d92d9c37f832ea29146e Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Mon, 15 Mar 2021 23:21:50 -0700 Subject: [PATCH 06/11] Update uuids. --- cwltool/command_line_tool.py | 7 +------ cwltool/pathmapper.py | 4 +++- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/cwltool/command_line_tool.py b/cwltool/command_line_tool.py index a39942744..71d1e9866 100644 --- a/cwltool/command_line_tool.py +++ b/cwltool/command_line_tool.py @@ -877,13 +877,8 @@ def update_status_output_callback( ) _logger.debug("[job %s] %s", j.name, json_dumps(builder.job, indent=4)) - separateDirs = True - # sufficient conditional for docker/singularity? 
- if runtimeContext.use_container: - separateDirs = False - builder.pathmapper = self.make_path_mapper( - reffiles, builder.stagedir, runtimeContext, separateDirs + reffiles, builder.stagedir, runtimeContext, True ) builder.requirements = j.requirements diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index f1f871453..8a65c4abc 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -171,7 +171,9 @@ def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None: stagedir = self.stagedir for fob in referenced_files: if self.separateDirs: - stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) + stagedir = os.path.join(self.stagedir, + "s5g%s" % uuid.uuid5(uuid.UUID('6a56ca02-b6f0-4c1a-a4b0-fb0068ce80ad'), + os.path.dirname(fob['location']))) self.visit( fob, stagedir, From c36150166dfefe73f1c3171b1b9a7023661958c2 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Tue, 24 Aug 2021 17:56:41 -0700 Subject: [PATCH 07/11] New approach. --- cwltool/docker.py | 66 +++------ cwltool/job.py | 307 ++++++++++++++++++++++++++--------------- cwltool/pathmapper.py | 16 +-- cwltool/process.py | 174 ++++++++++++++--------- cwltool/singularity.py | 93 ++++++------- 5 files changed, 372 insertions(+), 284 deletions(-) diff --git a/cwltool/docker.py b/cwltool/docker.py index adac8e2dc..e5e2ae45b 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -8,8 +8,7 @@ import subprocess # nosec import sys import threading -from distutils import spawn -from io import StringIO, open # pylint: disable=redefined-builtin +from io import StringIO # pylint: disable=redefined-builtin from typing import Callable, Dict, List, MutableMapping, Optional, Set, Tuple, cast import requests @@ -21,13 +20,7 @@ from .job import ContainerCommandLineJob from .loghandler import _logger from .pathmapper import MapperEnt, PathMapper -from .utils import ( - CWLObjectType, - create_tmp_dir, - docker_windows_path_adjust, - ensure_writable, - onWindows, -) +from .utils 
import CWLObjectType, create_tmp_dir, ensure_writable _IMAGES = set() # type: Set[str] _IMAGES_LOCK = threading.Lock() @@ -62,14 +55,10 @@ def _get_docker_machine_mounts() -> List[str]: def _check_docker_machine_path(path: Optional[str]) -> None: if path is None: return - if onWindows(): - path = path.lower() mounts = _get_docker_machine_mounts() found = False for mount in mounts: - if onWindows(): - mount = mount.lower() if path.startswith(mount): found = True break @@ -99,9 +88,7 @@ def __init__( name: str, ) -> None: """Initialize a command line builder using the Docker software container engine.""" - super(DockerCommandLineJob, self).__init__( - builder, joborder, make_path_mapper, requirements, hints, name - ) + super().__init__(builder, joborder, make_path_mapper, requirements, hints, name) @staticmethod def get_image( @@ -237,7 +224,7 @@ def get_from_requirements( force_pull: bool, tmp_outdir_prefix: str, ) -> Optional[str]: - if not spawn.find_executable("docker"): + if not shutil.which("docker"): raise WorkflowException("docker executable is not available") if self.get_image( @@ -251,13 +238,6 @@ def append_volume( runtime: List[str], source: str, target: str, writable: bool = False ) -> None: """Add binding arguments to the runtime list.""" - if ( - os.path.isfile(source) - and target.endswith(os.path.basename(source)) - and not writable - ): - source = os.path.dirname(source) - target = os.path.dirname(target) options = [ "type=bind", "source=" + source, @@ -267,9 +247,8 @@ def append_volume( options.append("readonly") output = StringIO() csv.writer(output).writerow(options) - mount_arg = f"--mount={output.getvalue().strip()}" - if mount_arg not in runtime: - runtime.append(mount_arg) + mount_arg = output.getvalue().strip() + runtime.append(f"--mount={mount_arg}") # Unlike "--volume", "--mount" will fail if the volume doesn't already exist. 
if not os.path.exists(source): os.makedirs(source) @@ -279,7 +258,7 @@ def add_file_or_directory_volume( ) -> None: """Append volume a file/dir mapping to the runtime option list.""" if not volume.resolved.startswith("_:"): - _check_docker_machine_path(docker_windows_path_adjust(volume.resolved)) + _check_docker_machine_path(volume.resolved) self.append_volume(runtime, volume.resolved, volume.target) def add_writable_file_volume( @@ -339,6 +318,15 @@ def add_writable_directory_volume( shutil.copytree(volume.resolved, host_outdir_tgt) ensure_writable(host_outdir_tgt or new_dir) + def _required_env(self) -> Dict[str, str]: + # spec currently says "HOME must be set to the designated output + # directory." but spec might change to designated temp directory. + # runtime.append("--env=HOME=/tmp") + return { + "TMPDIR": self.CONTAINER_TMPDIR, + "HOME": self.builder.outdir, + } + def create_runtime( self, env: MutableMapping[str, str], runtimeContext: RuntimeContext ) -> Tuple[List[str], Optional[str]]: @@ -356,9 +344,8 @@ def create_runtime( self.append_volume( runtime, os.path.realpath(self.outdir), self.builder.outdir, writable=True ) - tmpdir = "/tmp" # nosec self.append_volume( - runtime, os.path.realpath(self.tmpdir), tmpdir, writable=True + runtime, os.path.realpath(self.tmpdir), self.CONTAINER_TMPDIR, writable=True ) self.add_volumes( self.pathmapper, @@ -380,9 +367,7 @@ def create_runtime( runtime = [x.replace(":ro", "") for x in runtime] runtime = [x.replace(":rw", "") for x in runtime] - runtime.append( - "--workdir=%s" % (docker_windows_path_adjust(self.builder.outdir)) - ) + runtime.append("--workdir=%s" % (self.builder.outdir)) if not user_space_docker_cmd: if not runtimeContext.no_read_only: @@ -390,7 +375,7 @@ def create_runtime( if self.networkaccess: if runtimeContext.custom_net: - runtime.append("--net={0}".format(runtimeContext.custom_net)) + runtime.append(f"--net={runtimeContext.custom_net}") else: runtime.append("--net=none") @@ -398,9 +383,7 @@ def 
create_runtime( runtime.append("--log-driver=none") euid, egid = docker_vm_id() - if not onWindows(): - # MS Windows does not have getuid() or geteuid() functions - euid, egid = euid or os.geteuid(), egid or os.getgid() + euid, egid = euid or os.geteuid(), egid or os.getgid() if runtimeContext.no_match_user is False and ( euid is not None and egid is not None @@ -410,13 +393,6 @@ def create_runtime( if runtimeContext.rm_container: runtime.append("--rm") - runtime.append("--env=TMPDIR=/tmp") - - # spec currently says "HOME must be set to the designated output - # directory." but spec might change to designated temp directory. - # runtime.append("--env=HOME=/tmp") - runtime.append("--env=HOME=%s" % self.builder.outdir) - cidfile_path = None # type: Optional[str] # add parameters to docker to write a container ID file if runtimeContext.user_space_docker_cmd is None: @@ -445,7 +421,7 @@ def create_runtime( cidfile_path = os.path.join(cidfile_dir, cidfile_name) runtime.append("--cidfile=%s" % cidfile_path) for key, value in self.environment.items(): - runtime.append("--env=%s=%s" % (key, value)) + runtime.append(f"--env={key}={value}") if runtimeContext.strict_memory_limit and not user_space_docker_cmd: ram = self.builder.resources["ram"] diff --git a/cwltool/job.py b/cwltool/job.py index c4132d550..d5a7a75b3 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -3,6 +3,7 @@ import itertools import logging import os +import stat import re import shutil import subprocess # nosec @@ -17,8 +18,10 @@ from typing import ( IO, Callable, + Dict, Iterable, List, + Mapping, Match, MutableMapping, MutableSequence, @@ -36,6 +39,7 @@ from schema_salad.utils import json_dump, json_dumps from typing_extensions import TYPE_CHECKING +from . 
import env_to_stdout, run_job from .builder import Builder, HasReqsHints from .context import RuntimeContext from .errors import UnsupportedRequirement, WorkflowException @@ -49,11 +53,9 @@ DirectoryType, OutputCallbackType, bytes2str_in_dicts, - copytree_with_merge, create_tmp_dir, ensure_non_writable, ensure_writable, - onWindows, processes_to_kill, ) @@ -64,66 +66,7 @@ FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1" SHELL_COMMAND_TEMPLATE = """#!/bin/bash -python "run_job.py" "job.json" -""" - -PYTHON_RUN_SCRIPT = """ -import json -import os -import sys -if os.name == 'posix': - try: - import subprocess32 as subprocess # type: ignore - except Exception: - import subprocess -else: - import subprocess # type: ignore - -with open(sys.argv[1], "r") as f: - popen_description = json.load(f) - commands = popen_description["commands"] - cwd = popen_description["cwd"] - env = popen_description["env"] - env["PATH"] = os.environ.get("PATH") - stdin_path = popen_description["stdin_path"] - stdout_path = popen_description["stdout_path"] - stderr_path = popen_description["stderr_path"] - if stdin_path is not None: - stdin = open(stdin_path, "rb") - else: - stdin = subprocess.PIPE - if stdout_path is not None: - stdout = open(stdout_path, "wb") - else: - stdout = sys.stderr - if stderr_path is not None: - stderr = open(stderr_path, "wb") - else: - stderr = sys.stderr - if os.name == 'nt': - close_fds = False - for key, value in env.items(): - env[key] = str(value) - else: - close_fds = True - sp = subprocess.Popen(commands, - shell=False, - close_fds=close_fds, - stdin=stdin, - stdout=stdout, - stderr=stderr, - env=env, - cwd=cwd) - if sp.stdin: - sp.stdin.close() - rcode = sp.wait() - if stdin is not subprocess.PIPE: - stdin.close() - if stdout is not sys.stderr: - stdout.close() - if stderr is not sys.stderr: - stderr.close() - sys.exit(rcode) +python3 "run_job.py" "job.json" """ @@ -155,15 +98,7 @@ def relink_initialworkdir( pass elif 
os.path.isdir(host_outdir_tgt) and not vol.resolved.startswith("_:"): shutil.rmtree(host_outdir_tgt) - if onWindows(): - # If this becomes a big issue for someone then we could - # refactor the code to process output from a running container - # and avoid all the extra IO below - if vol.type in ("File", "WritableFile"): - shutil.copy(vol.resolved, host_outdir_tgt) - elif vol.type in ("Directory", "WritableDirectory"): - copytree_with_merge(vol.resolved, host_outdir_tgt) - elif not vol.resolved.startswith("_:"): + if not vol.resolved.startswith("_:"): try: os.symlink(vol.resolved, host_outdir_tgt) except FileExistsError: @@ -188,7 +123,7 @@ def __init__( name: str, ) -> None: """Initialize the job object.""" - super(JobBase, self).__init__() + super().__init__() self.builder = builder self.joborder = joborder self.stdin = None # type: Optional[str] @@ -241,13 +176,24 @@ def _setup(self, runtimeContext: RuntimeContext) -> None: if not os.path.exists(self.outdir): os.makedirs(self.outdir) + def is_streamable(file: str) -> bool: + if not runtimeContext.streaming_allowed: + return False + for inp in self.joborder.values(): + if isinstance(inp, dict) and inp.get("location", None) == file: + return inp.get("streamable", False) + return False + for knownfile in self.pathmapper.files(): p = self.pathmapper.mapper(knownfile) if p.type == "File" and not os.path.isfile(p[0]) and p.staged: - raise WorkflowException( - "Input file %s (at %s) not found or is not a regular " - "file." % (knownfile, self.pathmapper.mapper(knownfile)[0]) - ) + if not ( + is_streamable(knownfile) and stat.S_ISFIFO(os.stat(p[0]).st_mode) + ): + raise WorkflowException( + "Input file %s (at %s) not found or is not a regular " + "file." 
% (knownfile, self.pathmapper.mapper(knownfile)[0]) + ) if "listing" in self.generatefiles: runtimeContext = runtimeContext.copy() @@ -278,7 +224,18 @@ def _execute( runtimeContext: RuntimeContext, monitor_function=None, # type: Optional[Callable[[subprocess.Popen[str]], None]] ) -> None: + """Execute the tool, either directly or via script. + Note: we are now at the point where self.environment is + ignored. The caller is responsible for correctly splitting that + into the runtime and env arguments. + + `runtime` is the list of arguments to put at the start of the + command (e.g. docker run). + + `env` is the enviroment to be set for running the resulting + command line. + """ scr = self.get_requirement("ShellCommandRequirement")[0] shouldquote = needs_shell_quoting_re.search @@ -335,9 +292,7 @@ def _execute( if self.stdin is not None: rmap = self.pathmapper.reversemap(self.stdin) if rmap is None: - raise WorkflowException( - "{} missing from pathmapper".format(self.stdin) - ) + raise WorkflowException(f"{self.stdin} missing from pathmapper") else: stdin_path = rmap[1] @@ -489,6 +444,61 @@ def _execute( ) shutil.rmtree(self.tmpdir, True) + @abstractmethod + def _required_env(self) -> Dict[str, str]: + """Variables required by the CWL spec (HOME, TMPDIR, etc). + + Note that with containers, the paths will (likely) be those from + inside. + """ + pass + + def _preserve_environment_on_containers_warning( + self, varname: Optional[Iterable[str]] = None + ) -> None: + """When running in a container, issue a warning.""" + # By default, don't do anything; ContainerCommandLineJob below + # will issue a warning. + pass + + def prepare_environment( + self, runtimeContext: RuntimeContext, envVarReq: Mapping[str, str] + ) -> None: + """Set up environment variables. + + Here we prepare the environment for the job, based on any + preserved variables and `EnvVarRequirement`. 
Later, changes due + to `MPIRequirement`, `Secrets`, or `SoftwareRequirement` are + applied (in that order). + """ + # Start empty + env: Dict[str, str] = {} + + # Preserve any env vars + if runtimeContext.preserve_entire_environment: + self._preserve_environment_on_containers_warning() + env.update(os.environ) + elif runtimeContext.preserve_environment: + self._preserve_environment_on_containers_warning( + runtimeContext.preserve_environment + ) + for key in runtimeContext.preserve_environment: + try: + env[key] = os.environ[key] + except KeyError: + _logger.warning( + f"Attempting to preserve environment variable '{key}' which is not present" + ) + + # Set required env vars + env.update(self._required_env()) + + # Apply EnvVarRequirement + env.update(envVarReq) + + # Set on ourselves + self.environment = env + def process_monitor(self, sproc): # type: (subprocess.Popen[str]) -> None monitor = psutil.Process(sproc.pid) # Value must be list rather than integer to utilise pass-by-reference in python @@ -498,9 +508,9 @@ def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None: children = monitor.children() rss = monitor.memory_info().rss while len(children): - rss += sum([process.memory_info().rss for process in children]) + rss += sum(process.memory_info().rss for process in children) children = list( - itertools.chain(*[process.children() for process in children]) + itertools.chain(*(process.children() for process in children)) ) if memory_usage[0] is None or rss > memory_usage[0]: memory_usage[0] = rss @@ -539,26 +549,6 @@ def run( self._setup(runtimeContext) - env = self.environment - vars_to_preserve = runtimeContext.preserve_environment - if runtimeContext.preserve_entire_environment is not False: - vars_to_preserve = os.environ - if vars_to_preserve: - for key, value in os.environ.items(): - if key in vars_to_preserve and key not in env: - # On Windows, subprocess env can't handle unicode. 
- env[key] = str(value) if onWindows() else value - env["HOME"] = str(self.outdir) if onWindows() else self.outdir - env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir - if "PATH" not in env: - env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"] - if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ: - env["SYSTEMROOT"] = ( - str(os.environ["SYSTEMROOT"]) - if onWindows() - else os.environ["SYSTEMROOT"] - ) - stage_files( self.pathmapper, ignore_writable=True, @@ -581,7 +571,16 @@ def run( monitor_function = functools.partial(self.process_monitor) - self._execute([], env, runtimeContext, monitor_function) + self._execute([], self.environment, runtimeContext, monitor_function) + + def _required_env(self) -> Dict[str, str]: + env = {} + env["HOME"] = self.outdir + env["TMPDIR"] = self.tmpdir + env["PATH"] = os.environ["PATH"] + if "SYSTEMROOT" in os.environ: + env["SYSTEMROOT"] = os.environ["SYSTEMROOT"] + return env CONTROL_CODE_RE = r"\x1b\[[0-9;]*[a-zA-Z]" @@ -590,6 +589,8 @@ def run( class ContainerCommandLineJob(JobBase, metaclass=ABCMeta): """Commandline job using containers.""" + CONTAINER_TMPDIR: str = "/tmp" # nosec + @abstractmethod def get_from_requirements( self, @@ -641,6 +642,19 @@ def add_writable_directory_volume( ) -> None: """Append a writable directory mapping to the runtime option list.""" + def _preserve_environment_on_containers_warning( + self, varnames: Optional[Iterable[str]] = None + ) -> None: + """When running in a container, issue a warning.""" + if varnames is None: + flags = "--preserve-entire-environment" + else: + flags = "--preserve-environment={" + ", ".join(varnames) + "}" + + _logger.warning( + f"You have specified `{flags}` while running a container which will override variables set in the container. This may break the container, be non-portable, and/or affect reproducibility." 
+ ) + def create_file_and_add_volume( self, runtime: List[str], @@ -726,7 +740,6 @@ def run( (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") self.prov_obj = runtimeContext.prov_obj img_id = None - env = cast(MutableMapping[str, str], os.environ) user_space_docker_cmd = runtimeContext.user_space_docker_cmd if docker_req is not None and user_space_docker_cmd: # For user-space docker implementations, a local image name or ID @@ -807,7 +820,9 @@ def run( _logger.debug("%s error", container, exc_info=True) if docker_is_req: raise UnsupportedRequirement( - "%s is required to run this tool: %s" % (container, str(err)) + "{} is required to run this tool: {}".format( + container, str(err) + ) ) from err else: raise WorkflowException( @@ -818,7 +833,34 @@ def run( ) self._setup(runtimeContext) + + # Copy as don't want to modify our env + env = dict(os.environ) (runtime, cidfile) = self.create_runtime(env, runtimeContext) + + # if command is larger than this, we might exceed system limits + # 2097150 bytes is the Ubuntu system default + # check with: echo $(( $(ulimit -s)*1024 / 4 )) + # or: getconf ARG_MAX + # TODO: create a cross-platform function to check limit where it's run + if len(''.join(runtime)) >= 2097152 - 2: + if runtimeContext.singularity: + copy_these_into_container, new_runtime = self.filter_out_singularity_image_file_inputs(runtime) + if copy_these_into_container: + runtime = new_runtime + img_id = self.bake_inputs_into_docker_container(str(img_id), copy_these_into_container) + else: + copy_these_into_container, new_runtime = self.filter_out_docker_image_file_inputs(runtime) + if copy_these_into_container: + runtime = new_runtime + img_id = self.bake_inputs_into_docker_container(str(img_id), copy_these_into_container) + + # if not runtimeContext.singularity: + # copy_these_into_container, new_runtime = self.filter_out_docker_image_file_inputs(runtime, runtimeContext.docker_outdir) + # if copy_these_into_container: + # runtime = 
new_runtime + # img_id = self.bake_inputs_into_container(str(img_id), copy_these_into_container) + runtime.append(str(img_id)) monitor_function = None if cidfile: @@ -832,6 +874,42 @@ def run( monitor_function = functools.partial(self.process_monitor) self._execute(runtime, env, runtimeContext, monitor_function) + def bake_inputs_into_docker_container(self, image_id, inputs): + i = [f'FROM {image_id}'] + + # we do this to keep files in the build context for docker + staging_dir = tempfile.mkdtemp() + for input in inputs: + staged_file_uuid = str(uuid.uuid4()) + staged_file = os.path.join(staging_dir, staged_file_uuid) + shutil.copy(input['src'], staged_file) + i.append(f'COPY {staged_file_uuid} {input["dst"]}') + + with open(os.path.join(staging_dir, 'Dockerfile'), 'w') as f: + f.write('\n'.join(i)) + + docker_id = str(uuid.uuid4()) + _logger.critical(str(i)) + subprocess.run(['docker', 'build', '-f', os.path.join(staging_dir, 'Dockerfile'), '.', '-t', f'{docker_id}:cwl'], + cwd=staging_dir, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return f'{docker_id}:cwl' + + def filter_out_docker_image_file_inputs(self, runtime, outdir): + new_runtime = [] + copy_these_into_container = [] + for arg in runtime: + if arg.startswith('--mount=type=bind,source='): + src, dst = arg[len('--mount=type=bind,source='):].split(',target=') + if dst.endswith(',readonly'): + dst = dst[:-len(',readonly')] + if os.path.isfile(src) and not dst.startswith(outdir): + copy_these_into_container.append({'src': src, 'dst': dst}) + else: + new_runtime.append(arg) + else: + new_runtime.append(arg) + return copy_these_into_container, new_runtime + def docker_monitor( self, cidfile: str, @@ -880,7 +958,7 @@ def docker_monitor( return max_mem_percent = 0 # type: float mem_percent = 0 # type: float - with open(stats_file_name, mode="r") as stats: + with open(stats_file_name) as stats: while True: line = stats.readline() if not line: @@ -907,7 +985,7 @@ def _job_popen( stdin_path: 
Optional[str], stdout_path: Optional[str], stderr_path: Optional[str], - env: MutableMapping[str, str], + env: Mapping[str, str], cwd: str, make_job_dir: Callable[[], str], job_script_contents: Optional[str] = None, @@ -939,7 +1017,7 @@ def _job_popen( sproc = subprocess.Popen( commands, shell=False, # nosec - close_fds=not onWindows(), + close_fds=True, stdin=stdin, stdout=stdout, stderr=stderr, @@ -990,15 +1068,10 @@ def terminate(): # type: () -> None if job_script_contents is None: job_script_contents = SHELL_COMMAND_TEMPLATE - env_copy = {} - key = None # type: Optional[str] - for key in env: - env_copy[key] = env[key] - job_description = { "commands": commands, "cwd": cwd, - "env": env_copy, + "env": env, "stdout_path": stdout_path, "stderr_path": stderr_path, "stdin_path": stdin_path, @@ -1013,9 +1086,13 @@ def terminate(): # type: () -> None job_script = os.path.join(job_dir, "run_job.bash") with open(job_script, "wb") as _: _.write(job_script_contents.encode("utf-8")) + job_run = os.path.join(job_dir, "run_job.py") - with open(job_run, "wb") as _: - _.write(PYTHON_RUN_SCRIPT.encode("utf-8")) + shutil.copyfile(run_job.__file__, job_run) + + env_getter = os.path.join(job_dir, "env_to_stdout.py") + shutil.copyfile(env_to_stdout.__file__, env_getter) + sproc = subprocess.Popen( # nosec ["bash", job_script.encode("utf-8")], shell=False, # nosec diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index 8a65c4abc..c89fac5f4 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -12,14 +12,14 @@ from .loghandler import _logger from .stdfsaccess import abspath -from .utils import CWLObjectType, convert_pathsep_to_unix, dedup, downloadHttpFile +from .utils import CWLObjectType, dedup, downloadHttpFile MapperEnt = collections.namedtuple( "MapperEnt", ["resolved", "target", "type", "staged"] ) -class PathMapper(object): +class PathMapper: """ Mapping of files from relative path provided in the file to a tuple. 
@@ -95,11 +95,9 @@ def visit( staged: bool = False, ) -> None: stagedir = cast(Optional[str], obj.get("dirname")) or stagedir - tgt = convert_pathsep_to_unix( - os.path.join( - stagedir, - cast(str, obj["basename"]), - ) + tgt = os.path.join( + stagedir, + cast(str, obj["basename"]), ) if obj["location"] in self._pathmap: return @@ -171,9 +169,7 @@ def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None: stagedir = self.stagedir for fob in referenced_files: if self.separateDirs: - stagedir = os.path.join(self.stagedir, - "s5g%s" % uuid.uuid5(uuid.UUID('6a56ca02-b6f0-4c1a-a4b0-fb0068ce80ad'), - os.path.dirname(fob['location']))) + stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) self.visit( fob, stagedir, diff --git a/cwltool/process.py b/cwltool/process.py index ea2655e5d..6f286b102 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -12,7 +12,6 @@ import textwrap import urllib import uuid -from io import open from os import scandir from typing import ( Any, @@ -45,12 +44,12 @@ from schema_salad.ref_resolver import Loader, file_uri, uri_file_path from schema_salad.schema import load_schema, make_avro_schema, make_valid_avro from schema_salad.sourceline import SourceLine, strip_dup_lineno -from schema_salad.utils import convert_to_dict -from schema_salad.validate import validate_ex +from schema_salad.utils import ContextType, convert_to_dict +from schema_salad.validate import validate_ex, avro_type_name from typing_extensions import TYPE_CHECKING from . 
import expression -from .builder import Builder, HasReqsHints +from .builder import Builder, HasReqsHints, INPUT_OBJ_VOCAB from .context import LoadingContext, RuntimeContext, getdefault from .errors import UnsupportedRequirement, WorkflowException from .loghandler import _logger @@ -68,11 +67,9 @@ adjustDirObjs, aslist, cmp_like_py2, - copytree_with_merge, ensure_writable, get_listing, normalizeFilesDirs, - onWindows, random_outdir, visit_class, ) @@ -86,7 +83,7 @@ class LogAsDebugFilter(logging.Filter): def __init__(self, name: str, parent: logging.Logger) -> None: """Initialize.""" name = str(name) - super(LogAsDebugFilter, self).__init__(name) + super().__init__(name) self.parent = parent def filter(self, record: logging.LogRecord) -> bool: @@ -196,23 +193,23 @@ def get_schema( version = ".".join(version.split(".")[:-1]) for f in cwl_files: try: - res = resource_stream(__name__, "schemas/%s/%s" % (version, f)) + res = resource_stream(__name__, f"schemas/{version}/{f}") cache["https://w3id.org/cwl/" + f] = res.read().decode("UTF-8") res.close() - except IOError: + except OSError: pass for f in salad_files: try: res = resource_stream( __name__, - "schemas/{}/salad/schema_salad/metaschema/{}".format(version, f), + f"schemas/{version}/salad/schema_salad/metaschema/{f}", ) cache[ "https://w3id.org/cwl/salad/schema_salad/metaschema/" + f ] = res.read().decode("UTF-8") res.close() - except IOError: + except OSError: pass if version in custom_schemas: @@ -254,15 +251,10 @@ def checkRequirements( checkRequirements(entry2, supported_process_requirements) -def stage_files( - pathmapper: PathMapper, - stage_func: Optional[Callable[[str, str], None]] = None, - ignore_writable: bool = False, - symlink: bool = True, - secret_store: Optional[SecretStore] = None, - fix_conflicts: bool = False, -) -> None: - """Link or copy files to their targets. 
Create them as needed.""" +def check_pathmapper_conflicts(pathmapper: PathMapper, fix_conflicts: bool = False) -> None: + """ + All pathmapper resolved file paths should be unique. If any conflict, this will error early or fix them. + """ targets = {} # type: Dict[str, MapperEnt] for key, entry in pathmapper.items(): if "File" not in entry.type: @@ -274,10 +266,10 @@ def stage_files( # find first key that does not clash with an existing entry in targets # start with entry.target + '_' + 2 and then keep incrementing the number till there is no clash i = 2 - tgt = "%s_%s" % (entry.target, i) + tgt = f"{entry.target}_{i}" while tgt in targets: i += 1 - tgt = "%s_%s" % (entry.target, i) + tgt = f"{entry.target}_{i}" targets[tgt] = pathmapper.update( key, entry.resolved, tgt, entry.type, entry.staged ) @@ -287,6 +279,18 @@ def stage_files( % (targets[entry.target].resolved, entry.resolved, entry.target) ) + +def stage_files( + pathmapper: PathMapper, + stage_func: Optional[Callable[[str, str], None]] = None, + ignore_writable: bool = False, + symlink: bool = True, + secret_store: Optional[SecretStore] = None, + fix_conflicts: bool = False, +) -> None: + """Link or copy files to their targets. 
Create them as needed.""" + check_pathmapper_conflicts(pathmapper, fix_conflicts) + for key, entry in pathmapper.items(): if not entry.staged: continue @@ -294,15 +298,7 @@ def stage_files( os.makedirs(os.path.dirname(entry.target)) if entry.type in ("File", "Directory") and os.path.exists(entry.resolved): if symlink: # Use symlink func if allowed - if onWindows(): - if entry.type == "File": - shutil.copy(entry.resolved, entry.target) - elif entry.type == "Directory": - if os.path.exists(entry.target) and os.path.isdir(entry.target): - shutil.rmtree(entry.target) - copytree_with_merge(entry.resolved, entry.target) - else: - os.symlink(entry.resolved, entry.target) + os.symlink(entry.resolved, entry.target) elif stage_func is not None: stage_func(entry.resolved, entry.target) elif ( @@ -319,7 +315,7 @@ def stage_files( os.makedirs(entry.target) else: shutil.copytree(entry.resolved, entry.target) - ensure_writable(entry.target) + ensure_writable(entry.target, include_root=True) elif entry.type == "CreateFile" or entry.type == "CreateWritableFile": with open(entry.target, "wb") as new: if secret_store is not None: @@ -357,12 +353,10 @@ def _collectDirEntries( yield obj else: for sub_obj in obj.values(): - for dir_entry in _collectDirEntries(sub_obj): - yield dir_entry + yield from _collectDirEntries(sub_obj) elif isinstance(obj, MutableSequence): for sub_obj in obj: - for dir_entry in _collectDirEntries(sub_obj): - yield dir_entry + yield from _collectDirEntries(sub_obj) def _relocate(src: str, dst: str) -> None: if src == dst: @@ -470,30 +464,34 @@ def fill_in_defaults( def avroize_type( - field_type: Union[ - CWLObjectType, MutableSequence[CWLOutputType], CWLOutputType, None - ], + field_type: Union[CWLObjectType, MutableSequence[Any], CWLOutputType, None], name_prefix: str = "", -) -> None: +) -> Union[CWLObjectType, MutableSequence[Any], CWLOutputType, None]: """Add missing information to a type so that CWL types are valid.""" if isinstance(field_type, 
MutableSequence): - for field in field_type: - avroize_type(field, name_prefix) + for i, field in enumerate(field_type): + field_type[i] = avroize_type(field, name_prefix) elif isinstance(field_type, MutableMapping): if field_type["type"] in ("enum", "record"): if "name" not in field_type: field_type["name"] = name_prefix + str(uuid.uuid4()) if field_type["type"] == "record": - avroize_type( + field_type["fields"] = avroize_type( cast(MutableSequence[CWLOutputType], field_type["fields"]), name_prefix ) - if field_type["type"] == "array": - avroize_type( + elif field_type["type"] == "array": + field_type["items"] = avroize_type( cast(MutableSequence[CWLOutputType], field_type["items"]), name_prefix ) - if isinstance(field_type["type"], MutableSequence): - for ctype in field_type["type"]: - avroize_type(cast(CWLOutputType, ctype), name_prefix) + else: + field_type["type"] = avroize_type( + cast(CWLOutputType, field_type["type"]), name_prefix + ) + elif field_type == "File": + return "org.w3id.cwl.cwl.File" + elif field_type == "Directory": + return "org.w3id.cwl.cwl.Directory" + return field_type def get_overrides( @@ -568,7 +566,7 @@ def __init__( self, toolpath_object: CommentedMap, loadingContext: LoadingContext ) -> None: """Build a Process object from the provided dictionary.""" - super(Process, self).__init__() + super().__init__() self.metadata = getdefault(loadingContext.metadata, {}) # type: CWLObjectType self.provenance_object = None # type: Optional[ProvenanceProfile] self.parent_wf = None # type: Optional[ProvenanceProfile] @@ -591,7 +589,15 @@ def __init__( self.names = make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY], Loader({})) self.tool = toolpath_object self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, [])) - self.requirements.extend(self.tool.get("requirements", [])) + tool_requirements = self.tool.get("requirements", []) + if tool_requirements is None: + raise ValidationException( + SourceLine(self.tool, 
"requirements").makeError( + "If 'requirements' is present then it must be a list " + "or map/dictionary, not empty." + ) + ) + self.requirements.extend(tool_requirements) if "id" not in self.tool: self.tool["id"] = "_:" + str(uuid.uuid4()) self.requirements.extend( @@ -603,7 +609,15 @@ def __init__( ) ) self.hints = copy.deepcopy(getdefault(loadingContext.hints, [])) - self.hints.extend(self.tool.get("hints", [])) + tool_hints = self.tool.get("hints", []) + if tool_hints is None: + raise ValidationException( + SourceLine(self.tool, "hints").makeError( + "If 'hints' is present then it must be a list " + "or map/dictionary, not empty." + ) + ) + self.hints.extend(tool_hints) # Versions of requirements and hints which aren't mutated. self.original_requirements = copy.deepcopy(self.requirements) self.original_hints = copy.deepcopy(self.hints) @@ -626,12 +640,13 @@ def __init__( sd, _ = self.get_requirement("SchemaDefRequirement") if sd is not None: - sdtypes = cast(MutableSequence[CWLObjectType], sd["types"]) + sdtypes = copy.deepcopy(cast(MutableSequence[CWLObjectType], sd["types"])) avroize_type(cast(MutableSequence[CWLOutputType], sdtypes)) av = make_valid_avro( sdtypes, {cast(str, t["name"]): cast(Dict[str, Any], t) for t in sdtypes}, set(), + vocab=INPUT_OBJ_VOCAB, ) for i in av: self.schemaDefs[i["name"]] = i # type: ignore @@ -666,7 +681,8 @@ def __init__( c["type"] = nullable else: c["type"] = c["type"] - avroize_type(c["type"], c["name"]) + + c["type"] = avroize_type(c["type"], c["name"]) if key == "inputs": cast( List[CWLObjectType], self.inputs_record_schema["fields"] @@ -706,9 +722,13 @@ def __init__( ) raise if self.doc_schema is not None: + classname = toolpath_object["class"] + avroname = classname + if self.doc_loader and classname in self.doc_loader.vocab: + avroname = avro_type_name(self.doc_loader.vocab[classname]) validate_js_expressions( toolpath_object, - self.doc_schema.names[toolpath_object["class"]], + self.doc_schema.names[avroname], 
validate_js_options, ) @@ -775,7 +795,13 @@ def _init_job( raise WorkflowException( "Missing input record schema: " "{}".format(self.names) ) - validate_ex(schema, job, strict=False, logger=_logger_validation_warnings) + validate_ex( + schema, + job, + strict=False, + logger=_logger_validation_warnings, + vocab=INPUT_OBJ_VOCAB, + ) if load_listing and load_listing != "no_listing": get_listing(fs_access, job, recursive=(load_listing == "deep_listing")) @@ -1012,25 +1038,29 @@ def evalResources( def validate_hints( self, avsc_names: Names, hints: List[CWLObjectType], strict: bool ) -> None: + if self.doc_loader is None: + return for i, r in enumerate(hints): sl = SourceLine(hints, i, ValidationException) with sl: - if ( - avsc_names.get_name(cast(str, r["class"]), None) is not None - and self.doc_loader is not None - ): - plain_hint = dict( - (key, r[key]) + classname = cast(str, r["class"]) + avroname = classname + if classname in self.doc_loader.vocab: + avroname = avro_type_name(self.doc_loader.vocab[classname]) + if avsc_names.get_name(avroname, None) is not None: + plain_hint = { + key: r[key] for key in r if key not in self.doc_loader.identifiers - ) # strip identifiers + } # strip identifiers validate_ex( cast( Schema, - avsc_names.get_name(cast(str, plain_hint["class"]), None), + avsc_names.get_name(avroname, None), ), plain_hint, strict=strict, + vocab=self.doc_loader.vocab, ) elif r["class"] in ("NetworkAccess", "LoadListingRequirement"): pass @@ -1061,7 +1091,7 @@ def uniquename(stem: str, names: Optional[Set[str]] = None) -> str: u = stem while u in names: c += 1 - u = "%s_%s" % (stem, c) + u = f"{stem}_{c}" names.add(u) return u @@ -1153,7 +1183,21 @@ def scandeps( if doc["class"] == "Directory" and "listing" in doc: deps["listing"] = doc["listing"] if doc["class"] == "File" and "secondaryFiles" in doc: - deps["secondaryFiles"] = doc["secondaryFiles"] + deps["secondaryFiles"] = cast( + CWLOutputAtomType, + scandeps( + base, + cast( + Union[CWLObjectType, 
MutableSequence[CWLObjectType]], + doc["secondaryFiles"], + ), + reffields, + urlfields, + loadref, + urljoin=urljoin, + nestdirs=nestdirs, + ), + ) if nestdirs: deps = nestdir(base, deps) r.append(deps) diff --git a/cwltool/singularity.py b/cwltool/singularity.py index 43093bfbb..7de8ee565 100644 --- a/cwltool/singularity.py +++ b/cwltool/singularity.py @@ -5,7 +5,6 @@ import re import shutil import sys -from distutils import spawn from subprocess import ( # nosec DEVNULL, PIPE, @@ -20,17 +19,11 @@ from .builder import Builder from .context import RuntimeContext -from .errors import UnsupportedRequirement, WorkflowException +from .errors import WorkflowException from .job import ContainerCommandLineJob from .loghandler import _logger from .pathmapper import MapperEnt, PathMapper -from .utils import ( - CWLObjectType, - create_tmp_dir, - docker_windows_path_adjust, - ensure_non_writable, - ensure_writable, -) +from .utils import CWLObjectType, create_tmp_dir, ensure_non_writable, ensure_writable _USERNS = None # type: Optional[bool] _SINGULARITY_VERSION = "" @@ -50,6 +43,7 @@ def _singularity_supports_userns() -> bool: _USERNS = ( "No valid /bin/sh" in result or "/bin/sh doesn't exist in container" in result + or "executable file not found in" in result ) except TimeoutExpired: _USERNS = False @@ -99,9 +93,7 @@ def __init__( name: str, ) -> None: """Builder for invoking the Singularty software container engine.""" - super(SingularityCommandLineJob, self).__init__( - builder, joborder, make_path_mapper, requirements, hints, name - ) + super().__init__(builder, joborder, make_path_mapper, requirements, hints, name) @staticmethod def get_image( @@ -268,7 +260,7 @@ def get_from_requirements( (e.g. hello-world-latest.{img,sif}). 
""" - if not bool(spawn.find_executable("singularity")): + if not bool(shutil.which("singularity")): raise WorkflowException("singularity executable is not available") if not self.get_image(cast(Dict[str, str], r), pull_image, force_pull): @@ -282,19 +274,13 @@ def get_from_requirements( def append_volume( runtime: List[str], source: str, target: str, writable: bool = False ) -> None: - src = docker_windows_path_adjust(source) - dst = docker_windows_path_adjust(target) - writable_flag = "rw" if writable else "ro" - if ( - os.path.isfile(src) - and dst.endswith(os.path.basename(src)) - and writable_flag == "ro" - ): - src = os.path.dirname(src) - dst = os.path.dirname(dst) - bind_arg = f"--bind={src}:{dst}:{writable_flag}" - if bind_arg not in runtime: - runtime.append(bind_arg) + runtime.append("--bind") + # Mounts are writable by default, so 'rw' is optional and not + # supported (due to a bug) in some 3.6 series releases. + vol = f"{source}:{target}" + if not writable: + vol += ":ro" + runtime.append(vol) def add_file_or_directory_volume( self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str] @@ -383,6 +369,12 @@ def add_writable_directory_volume( self.append_volume(runtime, source, volume.target, writable=True) ensure_writable(source) + def _required_env(self) -> Dict[str, str]: + return { + "TMPDIR": self.CONTAINER_TMPDIR, + "HOME": self.builder.outdir, + } + def create_runtime( self, env: MutableMapping[str, str], runtime_context: RuntimeContext ) -> Tuple[List[str], Optional[str]]: @@ -394,31 +386,34 @@ def create_runtime( "exec", "--contain", "--ipc", + "--cleanenv", ] if _singularity_supports_userns(): runtime.append("--userns") else: runtime.append("--pid") + + container_HOME: Optional[str] = None if is_version_3_1_or_newer(): + # Remove HOME, as passed in a special way (restore it below) + container_HOME = self.environment.pop("HOME") runtime.append("--home") runtime.append( "{}:{}".format( - 
docker_windows_path_adjust(os.path.realpath(self.outdir)), - self.builder.outdir, + os.path.realpath(self.outdir), + container_HOME, ) ) else: - runtime.append( - "--bind={}:{}:rw".format( - docker_windows_path_adjust(os.path.realpath(self.outdir)), - self.builder.outdir, - ) - ) - tmpdir = "/tmp" # nosec - runtime.append( - "--bind={}:{}:rw".format( - docker_windows_path_adjust(os.path.realpath(self.tmpdir)), tmpdir + self.append_volume( + runtime, + os.path.realpath(self.outdir), + self.environment["HOME"], + writable=True, ) + + self.append_volume( + runtime, os.path.realpath(self.tmpdir), self.CONTAINER_TMPDIR, writable=True ) self.add_volumes( @@ -438,18 +433,18 @@ def create_runtime( ) runtime.append("--pwd") - runtime.append("%s" % (docker_windows_path_adjust(self.builder.outdir))) + runtime.append(self.builder.outdir) - if runtime_context.custom_net: - raise UnsupportedRequirement( - "Singularity implementation does not support custom networking" - ) - elif runtime_context.disable_net: - runtime.append("--net") - - env["SINGULARITYENV_TMPDIR"] = tmpdir - env["SINGULARITYENV_HOME"] = self.builder.outdir + if self.networkaccess: + if runtime_context.custom_net: + runtime.extend(["--net", "--network", runtime_context.custom_net]) + else: + runtime.extend(["--net", "--network", "none"]) for name, value in self.environment.items(): - env["SINGULARITYENV_{}".format(name)] = str(value) + env[f"SINGULARITYENV_{name}"] = str(value) + + if container_HOME: + # Restore HOME if we removed it above. + self.environment["HOME"] = container_HOME return (runtime, None) From 44116dd5dcfac2e2733fdeade7aae6e7091e1566 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Tue, 24 Aug 2021 17:59:47 -0700 Subject: [PATCH 08/11] Cruft. 
--- cwltool/job.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index d5a7a75b3..258dfda7f 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -855,12 +855,6 @@ def run( runtime = new_runtime img_id = self.bake_inputs_into_docker_container(str(img_id), copy_these_into_container) - # if not runtimeContext.singularity: - # copy_these_into_container, new_runtime = self.filter_out_docker_image_file_inputs(runtime, runtimeContext.docker_outdir) - # if copy_these_into_container: - # runtime = new_runtime - # img_id = self.bake_inputs_into_container(str(img_id), copy_these_into_container) - runtime.append(str(img_id)) monitor_function = None if cidfile: From af0c1da5ee45992224c2248c509ea798a3de0d2c Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Tue, 24 Aug 2021 18:01:10 -0700 Subject: [PATCH 09/11] Cruft. --- cwltool/job.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index 258dfda7f..c02d9bfb8 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -845,10 +845,12 @@ def run( # TODO: create a cross-platform function to check limit where it's run if len(''.join(runtime)) >= 2097152 - 2: if runtimeContext.singularity: - copy_these_into_container, new_runtime = self.filter_out_singularity_image_file_inputs(runtime) - if copy_these_into_container: - runtime = new_runtime - img_id = self.bake_inputs_into_docker_container(str(img_id), copy_these_into_container) + pass + # TODO: write the singularity equivalent + # copy_these_into_container, new_runtime = self.filter_out_singularity_image_file_inputs(runtime) + # if copy_these_into_container: + # runtime = new_runtime + # img_id = self.bake_inputs_into_docker_container(str(img_id), copy_these_into_container) else: copy_these_into_container, new_runtime = self.filter_out_docker_image_file_inputs(runtime) if copy_these_into_container: From abf614c58f3f3b110244af348e2d0b10e9090bb5 Mon Sep 17 00:00:00 2001 From: DailyDreaming 
Date: Tue, 24 Aug 2021 18:01:47 -0700 Subject: [PATCH 10/11] Cruft. --- cwltool/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwltool/job.py b/cwltool/job.py index c02d9bfb8..baf5dddea 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -845,12 +845,12 @@ def run( # TODO: create a cross-platform function to check limit where it's run if len(''.join(runtime)) >= 2097152 - 2: if runtimeContext.singularity: - pass # TODO: write the singularity equivalent # copy_these_into_container, new_runtime = self.filter_out_singularity_image_file_inputs(runtime) # if copy_these_into_container: # runtime = new_runtime # img_id = self.bake_inputs_into_docker_container(str(img_id), copy_these_into_container) + pass else: copy_these_into_container, new_runtime = self.filter_out_docker_image_file_inputs(runtime) if copy_these_into_container: From f171a22dd4c400f84b4491e0080e65c9e7ee9573 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Tue, 19 Oct 2021 18:29:53 -0400 Subject: [PATCH 11/11] Update with new staging. 
--- cwltool/builder.py | 2 ++ cwltool/docker.py | 5 +++-- cwltool/job.py | 13 ++++++++++--- cwltool/pathmapper.py | 4 +++- cwltool/process.py | 13 +++++++++++++ cwltool/singularity.py | 12 ++++++------ 6 files changed, 37 insertions(+), 12 deletions(-) diff --git a/cwltool/builder.py b/cwltool/builder.py index d4b68581e..c5b4bb2c1 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -179,6 +179,7 @@ def __init__( tmpdir: str, stagedir: str, cwlVersion: str, + dockerstagedir: str = None, ) -> None: """Initialize this Builder.""" self.job = job @@ -209,6 +210,7 @@ def __init__( self.outdir = outdir self.tmpdir = tmpdir self.stagedir = stagedir + self.dockerstagedir = dockerstagedir self.cwlVersion = cwlVersion diff --git a/cwltool/docker.py b/cwltool/docker.py index e5e2ae45b..716604220 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -247,8 +247,9 @@ def append_volume( options.append("readonly") output = StringIO() csv.writer(output).writerow(options) - mount_arg = output.getvalue().strip() - runtime.append(f"--mount={mount_arg}") + mount_arg = f"--mount={output.getvalue().strip()}" + if mount_arg not in runtime: + runtime.append(mount_arg) # Unlike "--volume", "--mount" will fail if the volume doesn't already exist. 
if not os.path.exists(source): os.makedirs(source) diff --git a/cwltool/job.py b/cwltool/job.py index baf5dddea..19c7adde0 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -546,7 +546,6 @@ def run( else: if not os.path.exists(self.tmpdir): os.makedirs(self.tmpdir) - self._setup(runtimeContext) stage_files( @@ -696,19 +695,27 @@ def add_volumes( ) -> None: """Append volume mappings to the runtime option list.""" container_outdir = self.builder.outdir + dockerstagedir = self.builder.dockerstagedir + if dockerstagedir: + pathmapper.update(dockerstagedir, dockerstagedir, self.builder.stagedir, "Directory", True) for key, vol in (itm for itm in pathmapper.items() if itm[1].staged): host_outdir_tgt = None # type: Optional[str] if vol.target.startswith(container_outdir + "/"): host_outdir_tgt = os.path.join( self.outdir, vol.target[len(container_outdir) + 1 :] ) - if not host_outdir_tgt and not any_path_okay: + if not host_outdir_tgt and not any_path_okay and key != dockerstagedir: raise WorkflowException( "No mandatory DockerRequirement, yet path is outside " "the designated output directory, also know as " "$(runtime.outdir): {}".format(vol) ) - if vol.type in ("File", "Directory"): + if vol.type == "File": + if dockerstagedir and vol.resolved.startswith(dockerstagedir): + pass + else: + self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt) + elif vol.type == "Directory": self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt) elif vol.type == "WritableFile": self.add_writable_file_volume( diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index c89fac5f4..fa54707db 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -169,7 +169,9 @@ def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None: stagedir = self.stagedir for fob in referenced_files: if self.separateDirs: - stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) + location = fob['location'] if not fob['location'].startswith('file://') else 
fob['location'][len('file://'):] + stagedir = os.path.join(self.stagedir, os.path.basename(os.path.dirname(location))) + # stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) self.visit( fob, stagedir, diff --git a/cwltool/process.py b/cwltool/process.py index 6f286b102..934bc4c44 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -772,6 +772,18 @@ def _init_job( % (self.metadata.get("cwlVersion"), INTERNAL_VERSION) ) + if os.environ.get('GROUP_STAGING') or True: + dockerstagedir = runtime_context.get_stagedir() + for input_keyname, input in joborder.items(): + if input.get('location') and input.get('class') == 'File': + location = str(input.get('location')) + location = location if not location.startswith('file://') else location[len('file://'):] + unique_staging_dir = os.path.join(dockerstagedir, str(uuid.uuid4())) + os.makedirs(unique_staging_dir, exist_ok=True) + new_uri = os.path.join(unique_staging_dir, input['basename']) + shutil.copyfile(location, new_uri) + input['location'] = new_uri + job = copy.deepcopy(joborder) make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess) @@ -911,6 +923,7 @@ def inc(d): # type: (List[int]) -> None tmpdir, stagedir, cwl_version, + dockerstagedir, ) bindings.extend( diff --git a/cwltool/singularity.py b/cwltool/singularity.py index 7de8ee565..ca4984f56 100644 --- a/cwltool/singularity.py +++ b/cwltool/singularity.py @@ -274,13 +274,13 @@ def get_from_requirements( def append_volume( runtime: List[str], source: str, target: str, writable: bool = False ) -> None: - runtime.append("--bind") - # Mounts are writable by default, so 'rw' is optional and not - # supported (due to a bug) in some 3.6 series releases. - vol = f"{source}:{target}" + mount_arg = f'--bind={source}:{target}' if not writable: - vol += ":ro" - runtime.append(vol) + # Mounts are writable by default, so 'rw' is optional and not + # supported (due to a bug) in some 3.6 series releases. 
+            mount_arg += ":ro"
+        if mount_arg not in runtime:
+            runtime.append(mount_arg)
 
     def add_file_or_directory_volume(
         self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]