diff --git a/cwlref-runner/README b/cwlref-runner/README
index 324751be2..bb1920fa1 100644
--- a/cwlref-runner/README
+++ b/cwlref-runner/README
@@ -1,4 +1,4 @@
-This an optional companion package to "cwltool" which provides provides an
+This is an optional companion package to "cwltool" which provides an
 additional entry point under the alias "cwl-runner", which is the
 implementation-agnostic name for the default CWL interpreter installed on a
 host.
diff --git a/cwlref-runner/setup.py b/cwlref-runner/setup.py
index 98db70e36..1f6e4a914 100644
--- a/cwlref-runner/setup.py
+++ b/cwlref-runner/setup.py
@@ -4,22 +4,19 @@ from setuptools import setup, find_packages
 
 SETUP_DIR = os.path.dirname(__file__)
-README = os.path.join(SETUP_DIR, 'README')
+README = os.path.join(SETUP_DIR, "README")
 
-setup(name='cwlref-runner',
-      version='1.0',
-      description='Common workflow language reference implementation',
-      long_description=open(README).read(),
-      author='Common workflow language working group',
-      author_email='common-workflow-language@googlegroups.com',
-      url="http://www.commonwl.org",
-      download_url="https://github.com/common-workflow-language/common-workflow-language",
-      license='Apache 2.0',
-      install_requires=[
-          'cwltool'
-      ],
-      entry_points={
-          'console_scripts': [ "cwl-runner=cwltool.main:main" ]
-      },
-      zip_safe=True
+setup(
+    name="cwlref-runner",
+    version="1.0",
+    description="Common workflow language reference implementation",
+    long_description=open(README).read(),
+    author="Common workflow language working group",
+    author_email="common-workflow-language@googlegroups.com",
+    url="http://www.commonwl.org",
+    download_url="https://github.com/common-workflow-language/common-workflow-language",
+    license="Apache 2.0",
+    install_requires=["cwltool"],
+    entry_points={"console_scripts": ["cwl-runner=cwltool.main:main"]},
+    zip_safe=True,
 )
diff --git a/cwltool/argparser.py b/cwltool/argparser.py
index efced5386..0673425b5 100644
--- a/cwltool/argparser.py
+++ b/cwltool/argparser.py
@@ -287,6 +287,24 @@ def arg_parser() -> argparse.ArgumentParser:
         type=str,
     )
 
+    # TODO: Not yet implemented
+    provgroup.add_argument(
+        "--no-data",  # Maybe change to no-input and no-intermediate to ignore those kinds of files?
+        default=False,
+        action="store_true",
+        help="Disables the storage of input and output data files in the provenance folder",
+        dest="no_data",
+    )
+
+    # TODO: Not yet implemented
+    provgroup.add_argument(
+        "--no-input",  # Maybe change to no-input and no-intermediate to ignore those kinds of files?
+        default=False,
+        action="store_true",
+        help="Disables the storage of input data files in the provenance folder",
+        dest="no_input",
+    )
+
     printgroup = parser.add_mutually_exclusive_group()
     printgroup.add_argument(
         "--print-rdf",
diff --git a/cwltool/builder.py b/cwltool/builder.py
index 2ba1e6543..50f435e6d 100644
--- a/cwltool/builder.py
+++ b/cwltool/builder.py
@@ -573,6 +573,10 @@ def addsf(
                 datum = cast(CWLObjectType, datum)
                 ll = schema.get("loadListing") or self.loadListing
                 if ll and ll != "no_listing":
+                    # Debug: show the datum contents
+                    for k in datum:
+                        _logger.debug("Datum: %s: %s", k, datum[k])
+                    _logger.debug("----------------------------------------")
                     get_listing(
                         self.fs_access,
                         datum,
diff --git a/cwltool/cwlprov/__init__.py b/cwltool/cwlprov/__init__.py
index b8ff8d14d..fbac64240 100644
--- a/cwltool/cwlprov/__init__.py
+++ b/cwltool/cwlprov/__init__.py
@@ -6,7 +6,11 @@
 import re
 import uuid
 from getpass import getuser
-from typing import IO, Any, Callable, Dict, List, Optional, Tuple, TypedDict, Union
+from typing import IO, Any, Dict, List, Optional, Tuple, TypedDict, Union
+
+from cwltool.cwlprov.provenance_constants import Hasher
+
+from ..loghandler import _logger
 
 
 def _whoami() -> Tuple[str, str]:
@@ -135,17 +139,16 @@ class AuthoredBy(TypedDict, total=False):
 def checksum_copy(
     src_file: IO[Any],
     dst_file: Optional[IO[Any]] = None,
-    hasher: Optional[Callable[[], "hashlib._Hash"]] = None,
+    hasher: Optional[str] = Hasher,
     buffersize: int = 1024 * 1024,
 ) -> str:
     """Compute checksums while copying a file."""
-    # TODO: Use hashlib.new(Hasher_str) instead?
     if hasher:
-        checksum = hasher()
+        checksum = hashlib.new(hasher)
     else:
         from .provenance_constants import Hasher
 
-        checksum = Hasher()
+        checksum = hashlib.new(Hasher)
     contents = src_file.read(buffersize)
     if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"):
         temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4()))
@@ -158,6 +161,34 @@ def checksum_copy(
             pass
         if os.path.exists(temp_location):
             os.rename(temp_location, dst_file.name)  # type: ignore
+
+    return content_processor(contents, src_file, dst_file, checksum, buffersize)
+
+
+def checksum_only(
+    src_file: IO[Any],
+    dst_file: Optional[IO[Any]] = None,
+    hasher: str = Hasher,
+    buffersize: int = 1024 * 1024,
+) -> str:
+    """Calculate the checksum only; do not copy the data files."""
+    if dst_file is not None:
+        _logger.error(
+            "[Debug Checksum Only] Destination file should be None but it is %s", dst_file
+        )
+    checksum = hashlib.new(hasher)
+    contents = src_file.read(buffersize)
+    return content_processor(contents, src_file, dst_file, checksum, buffersize)
+
+
+def content_processor(
+    contents: Any,
+    src_file: IO[Any],
+    dst_file: Optional[IO[Any]],
+    checksum: "hashlib._Hash",
+    buffersize: int,
+) -> str:
+    """Calculate the checksum based on the content."""
     while contents != b"":
         if dst_file is not None:
             dst_file.write(contents)
diff --git a/cwltool/cwlprov/provenance_constants.py b/cwltool/cwlprov/provenance_constants.py
index ec047df38..bce87337a 100644
--- a/cwltool/cwlprov/provenance_constants.py
+++ b/cwltool/cwlprov/provenance_constants.py
@@ -1,4 +1,3 @@
-import hashlib
 import os
 import uuid
 
@@ -18,7 +17,12 @@
 # Research Object folders
 METADATA = "metadata"
+# sub-folders for data
 DATA = "data"
+INPUT_DATA = "data/input"
+INTM_DATA = "data/intermediate"
+OUTPUT_DATA = "data/output"
+
 WORKFLOW = "workflow"
 SNAPSHOT = "snapshot"
 # sub-folders
@@ -43,10 +47,11 @@
 # sha1, compatible with the File type's "checksum" field
 # e.g. "checksum" = "sha1$47a013e660d408619d894b20806b1d5086aab03b"
 # See ./cwltool/schemas/v1.0/Process.yml
-Hasher = hashlib.sha1
 SHA1 = "sha1"
 SHA256 = "sha256"
 SHA512 = "sha512"
+# Set the default hash function for hashlib.new() to SHA1
+Hasher = SHA1
 
 # TODO: Better identifiers for user, at least
 # these should be preserved in ~/.config/cwl for every execution
diff --git a/cwltool/cwlprov/provenance_profile.py b/cwltool/cwlprov/provenance_profile.py
index ce8d63ad4..16be3a9ad 100644
--- a/cwltool/cwlprov/provenance_profile.py
+++ b/cwltool/cwlprov/provenance_profile.py
@@ -31,6 +31,8 @@
 from ..stdfsaccess import StdFsAccess
 from ..utils import CWLObjectType, JobsType, get_listing, posix_path, versionstring
 from ..workflow_job import WorkflowJob
+
 from .provenance_constants import (
     ACCOUNT_UUID,
     CWLPROV,
@@ -43,11 +45,14 @@
     SCHEMA,
     SHA1,
     SHA256,
+    Hasher,
     TEXT_PLAIN,
     UUID,
     WF4EVER,
     WFDESC,
     WFPROV,
+    INPUT_DATA,
+    OUTPUT_DATA,
 )
 from .writablebagfile import create_job, write_bag_file  # change this later
@@ -111,6 +116,8 @@ def __init__(
         _logger.debug("[provenance] Creator Full name: %s", self.full_name)
         self.workflow_run_uuid = run_uuid or uuid.uuid4()
         self.workflow_run_uri = self.workflow_run_uuid.urn
+        # Default to input data; currently INPUT_DATA and OUTPUT_DATA are the only possible values
+        self.current_data_source = INPUT_DATA
         self.generate_prov_doc()
 
     def __str__(self) -> str:
@@ -118,7 +125,15 @@ def __str__(self) -> str:
         return f"ProvenanceProfile <{self.workflow_run_uri}> in <{self.research_object}>"
 
     def generate_prov_doc(self) -> Tuple[str, ProvDocument]:
-        """Add basic namespaces."""
+        """Generate a provenance document.
+
+        This method adds basic namespaces to the provenance document and records host provenance.
+        It also adds information about the cwltool version, namespaces for various entities,
+        and creates agents, activities, and associations to represent the workflow execution.
+
+        Returns:
+            A tuple containing the workflow run URI and the generated ProvDocument.
+        """
 
         def host_provenance(document: ProvDocument) -> None:
             """Record host provenance."""
@@ -152,7 +167,7 @@ def host_provenance(document: ProvDocument) -> None:
         # https://tools.ietf.org/html/draft-thiemann-hash-urn-01
         # TODO: Change to nih:sha-256; hashes
         # https://tools.ietf.org/html/rfc6920#section-7
-        self.document.add_namespace("data", "urn:hash::sha1:")
+        self.document.add_namespace("data", f"urn:hash::{Hasher}:")
 
         # Also needed for docker images
         self.document.add_namespace(SHA256, "nih:sha-256;")
@@ -287,6 +302,7 @@ def record_process_end(
         process_run_id: str,
         outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
         when: datetime.datetime,
+        # load_listing: None,
     ) -> None:
         self.generate_output_prov(outputs, process_run_id, process_name)
         self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)
@@ -300,14 +316,19 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
         if "checksum" in value:
             csum = cast(str, value["checksum"])
             (method, checksum) = csum.split("$", 1)
-            if method == SHA1 and self.research_object.has_data_file(checksum):
+            # TODO: intermediate files?
+            if method == SHA1 and self.research_object.has_data_file(
+                self.current_data_source, checksum
+            ):
                 entity = self.document.entity("data:" + checksum)
 
         if not entity and "location" in value:
             location = str(value["location"])
             # If we made it here, we'll have to add it to the RO
             with self.fsaccess.open(location, "rb") as fhandle:
-                relative_path = self.research_object.add_data_file(fhandle)
+                relative_path = self.research_object.add_data_file(
+                    fhandle, current_source=self.current_data_source
+                )
                 # FIXME: This naively relies on add_data_file setting hash as filename
                 checksum = PurePath(relative_path).name
                 entity = self.document.entity("data:" + checksum, {PROV_TYPE: WFPROV["Artifact"]})
@@ -408,8 +429,10 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
         # a later call to this method will sort that
         is_empty = True
 
-        if "listing" not in value:
-            get_listing(self.fsaccess, value)
+        # Read loadListing and, unless it is no_listing, populate the listing
+        # of value (recursively when deep_listing)
+        ll = value.get("loadListing")
+        if ll and ll != "no_listing":
+            get_listing(self.fsaccess, value, (ll == "deep_listing"))
         for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])):
             is_empty = False
             # Declare child-artifacts
@@ -472,7 +495,9 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
     def declare_string(self, value: str) -> Tuple[ProvEntity, str]:
         """Save as string in UTF-8."""
         byte_s = BytesIO(str(value).encode(ENCODING))
-        data_file = self.research_object.add_data_file(byte_s, content_type=TEXT_PLAIN)
+        data_file = self.research_object.add_data_file(
+            byte_s, current_source=self.current_data_source, content_type=TEXT_PLAIN
+        )
         checksum = PurePosixPath(data_file).name
         # FIXME: Don't naively assume add_data_file uses hash in filename!
         data_id = f"data:{PurePosixPath(data_file).stem}"
@@ -505,7 +530,9 @@ def declare_artefact(self, value: Any) -> ProvEntity:
             if isinstance(value, bytes):
                 # If we got here then we must be in Python 3
                 byte_s = BytesIO(value)
-                data_file = self.research_object.add_data_file(byte_s)
+                data_file = self.research_object.add_data_file(
+                    byte_s, current_source=self.current_data_source
+                )
                 # FIXME: Don't naively assume add_data_file uses hash in filename!
                 data_id = f"data:{PurePosixPath(data_file).stem}"
                 return self.document.entity(
@@ -604,6 +631,7 @@ def used_artefacts(
         job_order: Union[CWLObjectType, List[CWLObjectType]],
         process_run_id: str,
         name: Optional[str] = None,
+        # load_listing=None,
     ) -> None:
         """Add used() for each data artefact."""
         if isinstance(job_order, list):
@@ -634,7 +662,17 @@ def generate_output_prov(
         process_run_id: Optional[str],
         name: Optional[str],
     ) -> None:
-        """Call wasGeneratedBy() for each output,copy the files into the RO."""
+        """Call wasGeneratedBy() for each output and copy the files into the RO.
+
+        To save output data via the ro.py add_data_file() method, the attribute
+        current_data_source tracks whether the data is input or output (perhaps
+        intermediate in the future). It is later passed to add_data_file() so
+        the data is saved in the correct folder, which avoids changing the
+        provenance_constants DATA constant.
+        """
+        self.current_data_source = OUTPUT_DATA
+
         if isinstance(final_output, MutableSequence):
             for entry in final_output:
                 self.generate_output_prov(entry, process_run_id, name)
@@ -733,6 +772,8 @@ def finalize_prov_profile(self, name: Optional[str]) -> List[QualifiedName]:
         # TODO: Also support other profiles than CWLProv, e.g. ProvOne
 
         # list of prov identifiers of provenance files
+        # NOTE: prov_ids are the file names prepared for the provenance/RO files in
+        # metadata/provenance, one for each sub-workflow of the main workflow
         prov_ids = []
 
         # https://www.w3.org/TR/prov-xml/
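An illustrative sketch (not part of the patch) of the reworked checksum helpers from cwltool/cwlprov/__init__.py above, which the ro.py changes below rely on. It assumes the patched module is importable; the expected digest is the `echo -n foo | sha1sum` value already used in tests/test_provenance.py:

    import io

    from cwltool.cwlprov import checksum_copy, checksum_only
    from cwltool.cwlprov.provenance_constants import SHA1

    src = io.BytesIO(b"foo")
    # checksum_only hashes the stream without writing a copy anywhere
    assert checksum_only(src, hasher=SHA1) == "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"

    src.seek(0)
    dst = io.BytesIO()
    # checksum_copy hashes while also copying the content into dst
    assert checksum_copy(src, dst, hasher=SHA1) == "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
    assert dst.getvalue() == b"foo"

Passing the hasher as a hashlib.new() name string (rather than a constructor) is what lets Hasher live in provenance_constants as plain data.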
diff --git a/cwltool/cwlprov/ro.py b/cwltool/cwlprov/ro.py
index 7c6eaf5d6..1c2df9591 100644
--- a/cwltool/cwlprov/ro.py
+++ b/cwltool/cwlprov/ro.py
@@ -1,7 +1,8 @@
 """Stores class definition of ResearchObject and WritableBagFile."""
 
 import datetime
-import hashlib
 import os
 import shutil
 import tempfile
@@ -35,11 +36,19 @@
     posix_path,
     versionstring,
 )
-from . import Aggregate, Annotation, AuthoredBy, _valid_orcid, _whoami, checksum_copy
+from . import (
+    Aggregate,
+    Annotation,
+    AuthoredBy,
+    _valid_orcid,
+    _whoami,
+    checksum_copy,
+    checksum_only,
+    provenance_constants,
+)
 from .provenance_constants import (
     ACCOUNT_UUID,
     CWLPROV_VERSION,
-    DATA,
     ENCODING,
     FOAF,
     LOGS,
@@ -55,6 +64,9 @@
     UUID,
     WORKFLOW,
     Hasher,
+    INPUT_DATA,
+    INTM_DATA,  # reserved; not yet populated
+    OUTPUT_DATA,
 )
 
 
@@ -67,6 +79,8 @@ def __init__(
         temp_prefix_ro: str = "tmp",
         orcid: str = "",
         full_name: str = "",
+        no_data: bool = False,
+        no_input: bool = False,
     ) -> None:
         """Initialize the ResearchObject."""
         self.temp_prefix = temp_prefix_ro
@@ -89,6 +103,8 @@ def __init__(
         self.cwltool_version = f"cwltool {versionstring().split()[-1]}"
         self.has_manifest = False
         self.relativised_input_object: CWLObjectType = {}
+        self.no_data = no_data
+        self.no_input = no_input
         self._initialize()
         _logger.debug("[provenance] Temporary research object: %s", self.folder)
@@ -109,7 +125,9 @@ def _initialize(self) -> None:
         """Initialize the bagit folder structure."""
         for research_obj_folder in (
             METADATA,
-            DATA,
+            INPUT_DATA,
+            INTM_DATA,  # not yet populated
+            OUTPUT_DATA,
             WORKFLOW,
             SNAPSHOT,
             PROVENANCE,
@@ -182,13 +200,13 @@ def add_tagfile(self, path: str, timestamp: Optional[datetime.datetime] = None) -> None:
 
         # Below probably OK for now as metadata files
         # are not too large..?
-        checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)
+        checksums[SHA1] = checksum_copy(tag_file, hasher=SHA1)
 
         tag_file.seek(0)
-        checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)
+        checksums[SHA256] = checksum_copy(tag_file, hasher=SHA256)
 
         tag_file.seek(0)
-        checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)
+        checksums[SHA512] = checksum_copy(tag_file, hasher=SHA512)
 
         rel_path = posix_path(os.path.relpath(path, self.folder))
         self.tagfiles.add(rel_path)
@@ -336,6 +354,7 @@ def guess_mediatype(
         return aggregates
 
     def add_uri(self, uri: str, timestamp: Optional[datetime.datetime] = None) -> Aggregate:
+        """Add an external URI to the Research Object."""
         self.self_check()
         aggr: Aggregate = {"uri": uri}
         aggr["createdOn"], aggr["createdBy"] = self._self_made(timestamp=timestamp)
@@ -450,7 +469,7 @@ def generate_snapshot(self, prov_dep: CWLObjectType) -> None:
                 else:
                     shutil.copy(filepath, path)
                 timestamp = datetime.datetime.fromtimestamp(os.path.getmtime(filepath))
-                self.add_tagfile(path, timestamp)
+                self.add_tagfile(path, timestamp)  # add snapshots as tag files to the RO
             except PermissionError:
                 pass  # FIXME: avoids duplicate snapshotting; need better solution
         elif key in ("secondaryFiles", "listing"):
@@ -460,50 +479,93 @@ def generate_snapshot(self, prov_dep: CWLObjectType) -> None:
         else:
             pass
 
-    def has_data_file(self, sha1hash: str) -> bool:
+    def has_data_file(self, location: str, sha1hash: str) -> bool:
         """Confirm the presence of the given file in the RO."""
-        folder = os.path.join(self.folder, DATA, sha1hash[0:2])
+        folder = os.path.join(self.folder, location, sha1hash[0:2])
         hash_path = os.path.join(folder, sha1hash)
         return os.path.isfile(hash_path)
 
     def add_data_file(
         self,
         from_fp: IO[Any],
+        current_source: str = INPUT_DATA,
         timestamp: Optional[datetime.datetime] = None,
         content_type: Optional[str] = None,
     ) -> str:
-        """Copy inputs to data/ folder."""
+        """Copy data files into the data/ folder.
+
+        current_source is the destination of the incoming file,
+        e.g. "data/input" or "data/output". This is also called for outputs
+        (via declare_artefact -> generate_output_prov), and copying is skipped
+        when --no-data or --no-input is in effect.
+        """
         self.self_check()
         tmp_dir, tmp_prefix = os.path.split(self.temp_prefix)
-        with tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp:
-            checksum = checksum_copy(from_fp, tmp)
-
-        # Calculate hash-based file path
-        folder = os.path.join(self.folder, DATA, checksum[0:2])
-        path = os.path.join(folder, checksum)
-        # os.rename assumed safe, as our temp file should
-        # be in same file system as our temp folder
-        if not os.path.isdir(folder):
-            os.makedirs(folder)
-        os.rename(tmp.name, path)
-
-        # Relative posix path
-        rel_path = posix_path(os.path.relpath(path, self.folder))
-
-        # Register in bagit checksum
-        if Hasher == hashlib.sha1:
-            self._add_to_bagit(rel_path, sha1=checksum)
+        if self.no_data or (self.no_input and current_source == INPUT_DATA):
+            # Checksum only; the file content is not copied into the RO
+            checksum = checksum_only(from_fp)
+            # Compute the hash-based relative path anyway, mirroring the copying branch
+            folder = os.path.join(self.folder, current_source, checksum[0:2])
+            path = os.path.join(folder, checksum)
+            # Relative posix path
+            rel_path = posix_path(os.path.relpath(path, self.folder))
         else:
-            _logger.warning("[provenance] Unknown hash method %s for bagit manifest", Hasher)
-            # Inefficient, bagit support need to checksum again
-            self._add_to_bagit(rel_path)
-        _logger.debug("[provenance] Added data file %s", path)
+            # Calculate the checksum while copying the file to a temporary location
+            with tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp:
+                checksum = checksum_copy(from_fp, tmp)
+            # Move it to "<current_source>/<first two hex digits>/<checksum>"
+            folder = os.path.join(self.folder, current_source, checksum[0:2])
+            path = os.path.join(folder, checksum)
+            if not os.path.isdir(folder):
+                os.makedirs(folder)
+            _logger.debug("Renaming %s to %s", tmp.name, path)
+            os.rename(tmp.name, path)
+
+            # Relative posix path
+            rel_path = posix_path(os.path.relpath(path, self.folder))
+
+            # Register in the bagit checksum manifest (this is where the file
+            # is actually copied into the provenance RO folder)
+            if Hasher == SHA1:
+                self._add_to_bagit(rel_path, sha1=checksum)
+            else:
+                _logger.warning(
+                    "[provenance] Unknown hash method %s for bagit manifest", Hasher
+                )
+                # Inefficient; bagit support needs to checksum again
+                self._add_to_bagit(rel_path)
+            if isinstance(self.relativised_input_object, MutableMapping):
+                # Only safe to access "basename" when "dir" exists and is a dict
+                if isinstance(self.relativised_input_object.get("dir"), MutableMapping):
+                    _logger.debug(
+                        "[provenance] Directory: %s",
+                        self.relativised_input_object["dir"]["basename"],
+                    )
+            else:
+                _logger.debug("[provenance] Added data file %s", path)
+
         if timestamp is not None:
             createdOn, createdBy = self._self_made(timestamp)
             self._file_provenance[rel_path] = cast(
                 Aggregate, {"createdOn": createdOn, "createdBy": createdBy}
             )
         _logger.debug("[provenance] Relative path for data file %s", rel_path)
 
+        # This is an output hash
         if content_type is not None:
             self._content_types[rel_path] = content_type
@@ -520,7 +582,11 @@ def _self_made(
         )
 
     def add_to_manifest(self, rel_path: str, checksums: Dict[str, str]) -> None:
-        """Add files to the research object manifest."""
+        """Add files to the research object manifest.
+
+        Data files are added to the manifest regardless of the state of the
+        no_data/no_input flags.
+        """
         self.self_check()
         if PurePosixPath(rel_path).is_absolute():
             raise ValueError(f"rel_path must be relative: {rel_path}")
@@ -545,7 +611,11 @@ def add_to_manifest(self, rel_path: str, checksums: Dict[str, str]) -> None:
             checksum_file.write(line)
 
     def _add_to_bagit(self, rel_path: str, **checksums: str) -> None:
-        """Compute file size and checksums and adds to bagit manifest."""
+        """Compute the data file size and checksums and add them to the bagit manifest.
+
+        Note: this is where the data file copying actually happens, via checksum_copy().
+        """
         if PurePosixPath(rel_path).is_absolute():
             raise ValueError(f"rel_path must be relative: {rel_path}")
         lpath = os.path.join(self.folder, local_path(rel_path))
@@ -562,7 +632,15 @@ def _add_to_bagit(self, rel_path: str, **checksums: str) -> None:
         checksums = dict(checksums)
         with open(lpath, "rb") as file_path:
             # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
-            checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)
+            if (
+                self.no_input
+                and os.path.commonprefix([provenance_constants.INPUT_DATA, rel_path])
+                == provenance_constants.INPUT_DATA
+            ):
+                checksums[SHA1] = checksum_only(file_path, hasher=SHA1)
+                _logger.debug("[provenance] No input - skipped copying: %s", rel_path)
+            else:
+                checksums[SHA1] = checksum_copy(file_path, hasher=SHA1)
 
         self.add_to_manifest(rel_path, checksums)
 
@@ -570,6 +648,8 @@ def _relativise_files(
         self,
         structure: Union[CWLObjectType, CWLOutputType, MutableSequence[CWLObjectType]],
     ) -> None:
+        # TODO: do only input files arrive here?
         """Save any file objects into the RO and update the local paths."""
         # Base case - we found a File we need to update
         _logger.debug("[provenance] Relativising: %s", structure)
@@ -584,14 +664,19 @@ def _relativise_files(
                     raise TypeError(
                         f"Only SHA1 CWL checksums are currently supported: {structure}"
                     )
-                if self.has_data_file(checksum):
+
+                if self.has_data_file(provenance_constants.INPUT_DATA, checksum):
                     prefix = checksum[0:2]
-                    relative_path = PurePosixPath("data") / prefix / checksum
+                    relative_path = PurePosixPath("data/input") / prefix / checksum
 
         if not (relative_path is not None and "location" in structure):
             # Register in RO; but why was this not picked
             # up by used_artefacts?
-            _logger.info("[provenance] Adding to RO %s", structure["location"])
+            _logger.info(
+                "[provenance] Adding to RO '%s' > %s",
+                structure["basename"],
+                structure["location"],
+            )
             with self.fsaccess.open(cast(str, structure["location"]), "rb") as fp:
                 relative_path = self.add_data_file(fp)
                 checksum = PurePosixPath(relative_path).name
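As an aside, the hash-based layout shared by add_data_file() and has_data_file() above is easy to predict from a checksum; a minimal sketch (not part of the patch), reusing the sha1 of "foo" from the existing tests:

    import posixpath

    checksum = "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"  # sha1 of "foo"
    # Files land under <RO folder>/<current_source>/<first two hex digits>/<checksum>
    rel_path = posixpath.join("data/input", checksum[0:2], checksum)
    assert rel_path == "data/input/0b/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"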
diff --git a/cwltool/cwlprov/writablebagfile.py b/cwltool/cwlprov/writablebagfile.py
index d5ff3c731..313c87120 100644
--- a/cwltool/cwlprov/writablebagfile.py
+++ b/cwltool/cwlprov/writablebagfile.py
@@ -55,14 +55,14 @@ def __init__(self, research_object: "ResearchObject", rel_path: str) -> None:
     def write(self, b: Any) -> int:
         """Write some content to the Bag."""
         real_b = b if isinstance(b, (bytes, mmap, array)) else b.encode("utf-8")
-        total = 0
+        total = 0  # to record the total bytes written
         length = len(real_b)
         while total < length:
             ret = super().write(real_b)
             if ret:
                 total += ret
         for val in self.hashes.values():
-            val.update(real_b)
+            val.update(real_b)  # update each hash with the written content
         return total
 
     def close(self) -> None:
@@ -238,13 +238,17 @@ def packed_workflow(research_object: "ResearchObject", packed: str) -> None:
 
 
 def create_job(
-    research_object: "ResearchObject", builder_job: CWLObjectType, is_output: bool = False
+    research_object: "ResearchObject",
+    builder_job: CWLObjectType,
+    is_output: bool = False,
 ) -> CWLObjectType:
     # TODO: customise the file
     """Generate the new job object with RO specific relative paths."""
     copied = copy.deepcopy(builder_job)
     relativised_input_objecttemp: CWLObjectType = {}
-    research_object._relativise_files(copied)
+    # Skip relativising inputs when either no_input or no_data is set
+    if not research_object.no_input and not research_object.no_data:
+        research_object._relativise_files(copied)
 
     def jdefault(o: Any) -> Dict[Any, Any]:
         return dict(o)
diff --git a/cwltool/cwlviewer.py b/cwltool/cwlviewer.py
index e544a568e..24eccc17c 100644
--- a/cwltool/cwlviewer.py
+++ b/cwltool/cwlviewer.py
@@ -64,6 +64,12 @@ def _set_inner_edges(self) -> None:
             )
             n.set_name(str(inner_edge_row["source_step"]))
             self._dot_graph.add_node(n)
+
+        # Add the edge with a label naming the source output and target input
+        edge_from_output_of = urlparse(inner_edge_row["output_ref"]).fragment
+        edge_to_input_of = urlparse(inner_edge_row["target_ref_input"]).fragment
+        edge_label = f"{edge_from_output_of} -> {edge_to_input_of}"
+
         target_label = (
             inner_edge_row["target_label"]
             if inner_edge_row["target_label"] is not None
@@ -91,6 +97,11 @@ def _set_inner_edges(self) -> None:
             pydot.Edge(
                 str(inner_edge_row["source_step"]),
                 str(inner_edge_row["target_step"]),
+                label=edge_label,
+                fontsize="10",  # font size for edge labels
+                fontcolor="blue",  # font color for edge labels
+                color="black",  # edge color
+                arrowsize="0.7",  # arrow size
             )
         )
 
@@ -175,8 +186,29 @@ def _init_dot_graph(cls) -> pydot.Graph:
         graph.set("bgcolor", "#eeeeee")
         graph.set("clusterrank", "local")
         graph.set("labelloc", "bottom")
-        graph.set("labelloc", "bottom")
         graph.set("labeljust", "right")
+        graph.set("nodesep", "0.75")  # increase the space between nodes
+        graph.set("ranksep", "1.25")  # increase the space between ranks
+        graph.set("splines", "true")  # use smooth splines for edges
+
+        # Additional styling for nodes
+        graph.set_node_defaults(
+            style="filled",
+            fillcolor="lightgrey",
+            shape="box",
+            fontname="Helvetica",
+            fontsize="14",
+            margin="0.2,0.1",
+        )
+
+        # Additional styling for edges
+        graph.set_edge_defaults(
+            color="black",
+            arrowhead="normal",
+            fontname="Helvetica",
+            fontsize="10",
+            fontcolor="blue",
+        )
 
         return graph
diff --git a/cwltool/job.py b/cwltool/job.py
index 817cb04c0..8a0d83090 100644
--- a/cwltool/job.py
+++ b/cwltool/job.py
@@ -287,7 +287,10 @@ def _execute(
             and isinstance(job_order, (list, dict))
         ):
             runtimeContext.prov_obj.used_artefacts(
-                job_order, runtimeContext.process_run_id, str(self.name)
+                job_order,
+                runtimeContext.process_run_id,
+                str(self.name),
+                # load_listing=self.builder.loadListing,
             )
         else:
             _logger.warning(
@@ -413,6 +416,7 @@ def stderr_stdout_log_path(
                     runtimeContext.process_run_id,
                     outputs,
                     datetime.datetime.now(),
+                    # builder.loadListing  # TODO: fix this
                 )
             if processStatus != "success":
                 _logger.warning("[job %s] completed %s", self.name, processStatus)
diff --git a/cwltool/main.py b/cwltool/main.py
index 30f299f09..41d258f74 100755
--- a/cwltool/main.py
+++ b/cwltool/main.py
@@ -11,6 +11,7 @@
 import signal
 import subprocess  # nosec
 import sys
+import tempfile
 import time
 import urllib
 import warnings
@@ -702,6 +703,8 @@ def setup_provenance(
         temp_prefix_ro=args.tmpdir_prefix,
         orcid=args.orcid,
         full_name=args.cwl_full_name,
+        no_data=args.no_data,
+        no_input=args.no_input,
     )
     runtimeContext.research_obj = ro
     log_file_io = open_log_file_for_activity(ro, ro.engine_uuid)
@@ -1152,12 +1155,28 @@ def main(
             print(f"{args.workflow} is valid CWL.", file=stdout)
             return 0
 
-        if args.print_rdf:
+        if args.print_rdf or args.provenance:
+            output = stdout
+            if args.provenance:
+                # Set up a Turtle file for the workflow information in a
+                # temporary directory: the provenance folder does not exist
+                # yet at this point, and creating it early causes issues.
+                temp_workflow_dir = tempfile.TemporaryDirectory()
+                workflow_provenance = os.path.join(temp_workflow_dir.name, "workflow.ttl")
+                output = open(workflow_provenance, "w")
+                _logger.info("Writing workflow rdf to %s", workflow_provenance)
             print(
                 printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer),
-                file=stdout,
+                file=output,
             )
-            return 0
+            if args.provenance:
+                output.close()
+            # Only --print-rdf exits this way
+            if args.print_rdf:
+                return 0
 
         if args.print_dot:
             printdot(tool, loadingContext.loader.ctx, stdout)
diff --git a/docs/conf.py b/docs/conf.py
index 6e04b5d64..4be78f80f 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -7,6 +7,7 @@
 # -- Path setup --------------------------------------------------------------
 
 import importlib.metadata
+
 # If extensions (or modules to document with autodoc) are in another directory,
 # add these directories to sys.path here. If the directory is relative to the
 # documentation root, use os.path.abspath to make it absolute, like shown here.
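Before the test changes below, a hedged end-to-end sketch (not part of the patch) of how the new flags are meant to be driven, mirroring the helpers in tests/test_provenance.py. Both flags are still marked "not yet implemented" in argparser.py, so the exact behaviour may change; paths are placeholders:

    from cwltool.main import main

    status = main(
        [
            "--enable-ext",
            "--no-input",  # new flag from this patch: skip storing input files
            "--provenance",
            "./prov",  # folder where the research object will be written
            "tests/wf/directory_no_listing.cwl",
            "--dir_deep_listing", "./dir_deep_listing",
            "--dir_no_listing", "./dir_no_listing",
            "--dir_no_info", "./dir_no_info",
        ]
    )
    assert status == 0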
diff --git a/tests/test_provenance.py b/tests/test_provenance.py
index 83eb61c22..2f0359ae6 100644
--- a/tests/test_provenance.py
+++ b/tests/test_provenance.py
@@ -4,7 +4,7 @@
 import sys
 import urllib
 from pathlib import Path
-from typing import IO, Any, Generator, cast
+from typing import IO, Any, Dict, Generator, Tuple, cast
 
 import arcp
 import bagit
@@ -191,6 +191,7 @@ def test_directory_workflow(tmp_path: Path) -> None:
     file_list = (
         folder
         / "data"
+        / "output"
         / "3c"
         / "3ca69e8d6c234a469d16ac28a4a658c92267c423"
         # checksum as returned from:
@@ -203,9 +204,12 @@ def test_directory_workflow(tmp_path: Path) -> None:
     # even if they were inside a class: Directory
     for letter, l_hash in sha1.items():
         prefix = l_hash[:2]  # first 2 letters
-        p = folder / "data" / prefix / l_hash
+        p = folder / "data" / "output" / prefix / l_hash
         assert p.is_file(), f"Could not find {letter} as {p}"
 
+    # List the folder contents
+    list_files(tmp_path)
+
 
 @needs_docker
 def test_no_data_files(tmp_path: Path) -> None:
@@ -219,7 +223,7 @@ def test_no_data_files(tmp_path: Path) -> None:
 def check_output_object(base_path: Path) -> None:
     output_obj = base_path / "workflow" / "primary-output.json"
     compare_checksum = "sha1$b9214658cc453331b62c2282b772a5c063dbd284"
-    compare_location = "../data/b9/b9214658cc453331b62c2282b772a5c063dbd284"
+    compare_location = "../data/input/b9/b9214658cc453331b62c2282b772a5c063dbd284"
     with open(output_obj) as fp:
         out_json = json.load(fp)
     f1 = out_json["sorted_output"]
@@ -231,13 +235,14 @@ def check_secondary_files(base_path: Path) -> None:
     foo_data = (
         base_path
         / "data"
+        / "input"
         / "0b"
         / "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
         # checksum as returned from:
         # $ echo -n foo | sha1sum
         # 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
     )
-    bar_data = base_path / "data" / "62" / "62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
+    bar_data = base_path / "data" / "input" / "62" / "62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
     assert foo_data.is_file(), "Did not capture file.txt 'foo'"
     assert bar_data.is_file(), "Did not capture secondary file.txt.idx 'bar'"
 
@@ -246,13 +251,13 @@ def check_secondary_files(base_path: Path) -> None:
         job_json = json.load(fp)
     # TODO: Verify secondaryFile in primary-job.json
     f1 = job_json["file1"]
-    assert f1["location"] == "../data/0b/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
+    assert f1["location"] == "../data/input/0b/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
     assert f1["basename"] == "foo1.txt"
     secondaries = f1["secondaryFiles"]
     assert secondaries
     f1idx = secondaries[0]
-    assert f1idx["location"] == "../data/62/62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
+    assert f1idx["location"] == "../data/input/62/62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
     assert f1idx["basename"], "foo1.txt.idx"
 
 
@@ -787,3 +792,207 @@ def test_research_object() -> None:
 def test_research_object_picklability(research_object: ResearchObject) -> None:
     """Research object may need to be pickled (for Toil)."""
     assert pickle.dumps(research_object) is not None
+
+
+def list_files(startpath: Path) -> None:
+    """Print the file structure below startpath, indented by depth."""
+    start = str(startpath)
+    print("Root: ", start)
+    for root, _dirs, files in os.walk(start):
+        level = root.replace(start, "").count(os.sep)
+        indent = " " * 4 * level
+        print("{}{}/".format(indent, os.path.basename(root)))
+        subindent = " " * 4 * (level + 1)
+        for f in files:
+            print("{}{}".format(subindent, f))
+
+
+@needs_docker
+def test_directory_workflow_no_listing(tmp_path: Path) -> None:
+    """Check for three files that should be present and three that should not be."""
+    sha1, dir2, dir3, dir4 = prepare_input_files(tmp_path)
+
+    # Run the workflow
+    folder = cwltool(
+        tmp_path,
+        # CWL arguments
+        "--debug",
+        # Workflow arguments
+        get_data("tests/wf/directory_no_listing.cwl"),
+        "--dir_deep_listing",
+        str(dir2),
+        "--dir_no_listing",
+        str(dir3),
+        "--dir_no_info",
+        str(dir4),
+    )
+
+    # Visualize the path structure
+    list_files(tmp_path)
+
+    # Output should include ls stdout of filenames a b c on each line
+    file_list = (
+        folder
+        / "data"
+        / "output"
+        / "3c"
+        / "3ca69e8d6c234a469d16ac28a4a658c92267c423"
+        # checksum as returned from:
+        # echo -e "a\nb\nc" | sha1sum
+        # 3ca69e8d6c234a469d16ac28a4a658c92267c423  -
+    )
+
+    assert file_list.is_file()
+
+    # Input files should be captured by hash value,
+    # even if they were inside a class: Directory
+    for f, file_hash in sha1.items():
+        prefix = file_hash[:2]  # first 2 letters
+        p = folder / "data" / "input" / prefix / file_hash
+        # Files from the no_listing and unspecified directories (d-i) must not
+        # have been captured; files from the deep_listing directory (a-c) must be.
+        if f in ["d", "e", "f", "g", "h", "i"]:
+            print(f"Analysing file {f}")
+            assert not p.is_file(), f"Unexpectedly found {f} as {p}"
+        else:
+            print(f"Analysing file {f}")
+            assert p.is_file(), f"Could not find {f} as {p}"
+
+
+def prepare_input_files(tmp_path: Path) -> Tuple[Dict[str, str], Path, Path, Path]:
+    """Create three directories of single-letter files with predictable sha1 hashes."""
+    sha1 = {
+        # Expected hashes of ASCII letters (no linefeed)
+        # as returned from:
+        # for x in a b c ; do echo -n $x | sha1sum ; done
+        "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8",
+        "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98",
+        "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
+        # for x in d e f ; do echo -n $x | sha1sum ; done
+        "d": "3c363836cf4e16666669a25da280a1865c2d2874",
+        "e": "58e6b3a414a1e090dfc6029add0f3555ccba127f",
+        "f": "4a0a19218e082a343a1b17e5333409af9d98f0f5",
+        # for x in g h i ; do echo -n $x | sha1sum ; done
+        "g": "54fd1711209fb1c0781092374132c66e79e2241b",
+        "h": "27d5482eebd075de44389774fce28c69f45c8a75",
+        "i": "042dc4512fa3d391c5170cf3aa61e6a638f84342",
+    }
+
+    dir2 = tmp_path / "dir_deep_listing"
+    dir2.mkdir()
+    for x in "abc":
+        # Make test files with predictable hashes
+        with open(dir2 / x, "w", encoding="ascii") as f:
+            f.write(x)
+
+    dir3 = tmp_path / "dir_no_listing"
+    dir3.mkdir()
+    for x in "def":
+        # Make test files with predictable hashes
+        with open(dir3 / x, "w", encoding="ascii") as f:
+            f.write(x)
+    # Temporarily generate 10,000 files to test performance:
+    # for i in range(10_000):
+    #     with open(dir3 / f"{x}_{i}", "w", encoding="ascii") as f:
+    #         f.write(x)
+
+    dir4 = tmp_path / "dir_no_info"
+    dir4.mkdir()
+    for x in "ghi":
+        # Make test files with predictable hashes
+        with open(dir4 / x, "w", encoding="ascii") as f:
+            f.write(x)
+
+    return sha1, dir2, dir3, dir4
+
+
+@needs_docker
+def test_directory_workflow_no_listing_no_input(tmp_path: Path) -> None:
+    """Check for three files that should be present and three that should not be.
+
+    In addition, the input files are not copied into the RO because of the
+    --no-input flag.
+    """
+    # TODO: the no-data/no-input option is currently set manually here
+    data_option = "--no-input"
+
+    sha1, dir2, dir3, dir4 = prepare_input_files(tmp_path)
+
+    # Run the workflow
+    folder = cwltool(
+        tmp_path,
+        # CWL arguments
+        "--debug",
+        # Data-handling argument
+        data_option,
+        # Workflow arguments
+        get_data("tests/wf/directory_no_listing.cwl"),
+        "--dir_deep_listing",
+        str(dir2),
+        "--dir_no_listing",
+        str(dir3),
+        "--dir_no_info",
+        str(dir4),
+    )
+
+    # Visualize the path structure
+    list_files(tmp_path)
+
+    # Output should include ls stdout of filenames a b c on each line
+    file_list = (
+        folder
+        / "data"
+        / "output"
+        / "84"
+        / "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4"
+        # checksum as returned from:
+        # echo -n c | sha1sum
+        # 84a516841ba77a5b4648de2cd0dfcb30ea46dbb4  -
+    )
+
+    assert file_list.is_file()
+
+    # Input files should not be in the provenance location
+    for f, file_hash in sha1.items():
+        prefix = file_hash[:2]  # first 2 letters
+        p = folder / "data" / prefix / file_hash
+        if p.is_file():
+            # Show the unexpected content before failing the assertion below
+            with open(p, encoding="ascii") as fh:
+                print(f"Unexpected content for {f!r}: {fh.read()!r}")
+        assert not p.is_file(), f"Unexpectedly found {f!r} as {p!r}"
+
+
+def cwltool_no_data(tmp_path: Path, *args: Any) -> Path:
+    """Run cwltool with provenance and --no-data enabled, in a temporary directory."""
+    prov_folder = tmp_path / "provenance"
+    prov_folder.mkdir()
+    new_args = ["--enable-ext", "--no-data", "--provenance", str(prov_folder)]
+    new_args.extend(args)
+    # Run within a temporary directory to not pollute git checkout
+    tmp_dir = tmp_path / "cwltool-run"
+    tmp_dir.mkdir()
+    with working_directory(tmp_dir):
+        status = main(new_args)
+        assert status == 0, f"Failed: cwltool.main({new_args})"
+    return prov_folder
diff --git a/tests/wf/directory_no_listing.cwl b/tests/wf/directory_no_listing.cwl
new file mode 100644
index 000000000..b8fce7758
--- /dev/null
+++ b/tests/wf/directory_no_listing.cwl
@@ -0,0 +1,79 @@
+#!/usr/bin/env cwl-runner
+cwlVersion: v1.2
+class: Workflow
+
+doc: >
+  Inspect the provided directory and return the filenames.
+  Generate a new directory and return it (including its content).
+
+hints:
+  - class: DockerRequirement
+    dockerPull: docker.io/debian:stable-slim
+
+inputs:
+  dir_deep_listing:
+    type: Directory
+    loadListing: deep_listing
+  dir_no_listing:
+    type: Directory
+    loadListing: no_listing
+  dir_no_info:
+    type: Directory
+
+steps:
+  ls:
+    in:
+      dir: dir_deep_listing
+      ignore: dir_no_listing
+    out:
+      [listing]
+    run:
+      class: CommandLineTool
+      baseCommand: ls
+      inputs:
+        dir:
+          type: Directory
+          inputBinding:
+            position: 1
+#        ignore:
+#          type: Directory
+#          inputBinding:
+#            position: 2
+      outputs:
+        listing:
+          type: stdout
+
+  generate:
+    in: []
+    out:
+      [dir1]
+    run:
+      class: CommandLineTool
+      requirements:
+        ShellCommandRequirement: {}
+        LoadListingRequirement:
+          loadListing: deep_listing
+
+      arguments:
+        - shellQuote: false
+          valueFrom: >
+            pwd;
+            mkdir -p dir1/x/y;
+            echo -n x > dir1/x.txt;
+            echo -n y > dir1/x/y.txt;
+            echo -n z > dir1/x/y/z.txt;
+      inputs: []
+      outputs:
+        dir1:
+          type: Directory
+          outputBinding:
+            glob: "dir1"
+
+outputs:
+  output_1:
+    type: File
+    outputSource: ls/listing
+  output_2:
+    type: Directory
+    outputSource: generate/dir1
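Closing note: the cwltool_no_data() helper added above is not yet exercised by any test. A hypothetical companion test (sketch only; since --no-data is still marked "not yet implemented", the assertions may need adjusting) could look like this, reusing the fixtures from tests/test_provenance.py:

    @needs_docker
    def test_directory_workflow_no_data(tmp_path: Path) -> None:
        """Sketch: with --no-data, neither inputs nor outputs should be materialised."""
        sha1, dir2, dir3, dir4 = prepare_input_files(tmp_path)
        folder = cwltool_no_data(
            tmp_path,
            get_data("tests/wf/directory_no_listing.cwl"),
            "--dir_deep_listing",
            str(dir2),
            "--dir_no_listing",
            str(dir3),
            "--dir_no_info",
            str(dir4),
        )
        # Neither input nor output data files should have been copied into the RO
        for file_hash in sha1.values():
            for source in ("input", "output"):
                p = folder / "data" / source / file_hash[:2] / file_hash
                assert not p.is_file(), f"--no-data should have skipped {p}"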