# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions used for Toil's CWL interpreter."""
import logging
import os
from pathlib import PurePosixPath
import posixpath
import stat
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
MutableMapping,
MutableSequence,
Type,
TypeVar,
Union,
)
from toil.fileStores import FileID
from toil.fileStores.abstractFileStore import AbstractFileStore
from toil.jobStores.abstractJobStore import AbstractJobStore
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Customized CWL utilities

# Exit code we need to bail out with if this process, or any of the local jobs
# that parse workflow files, encounters an unsupported CWL feature.
CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE = 33
# And what error will make the worker exit with that code
[docs]
class CWLUnsupportedException(Exception):
    """
    Fallback exception for unsupported CWL requirements.

    Used as CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION when cwltool cannot be
    imported; otherwise cwltool's own UnsupportedRequirement is used instead.
    """
# The exception type that signals an unsupported CWL requirement (the error
# that should make the worker exit with CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE).
try:
    # Prefer cwltool's own exception type when cwltool is installed.
    import cwltool.errors

    CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION: Union[
        Type[cwltool.errors.UnsupportedRequirement], Type[CWLUnsupportedException]
    ] = cwltool.errors.UnsupportedRequirement
except ImportError:
    # cwltool is not importable; fall back to the locally defined exception.
    CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION = CWLUnsupportedException
[docs]
def visit_top_cwl_class(
    rec: Any, classes: Iterable[str], op: Callable[[Any], Any]
) -> None:
    """
    Apply the given operation to all top-level CWL objects with the given named CWL class.

    Like cwltool's visit_class but doesn't look inside any object visited.

    :param rec: The CWL document (or fragment) to walk. Mappings and
        sequences are descended into; any other value is ignored.
    :param classes: Names of the CWL classes to apply ``op`` to. May be any
        iterable, including a one-shot generator; it is consumed exactly once.
    :param op: Called on each matching mapping. Its return value is ignored,
        and the contents of a matching mapping are not visited.
    """
    # Snapshot the class names once. A generator would otherwise be exhausted
    # by the first ``in`` test, silently making every later membership test
    # fail.
    wanted = tuple(classes)

    def visit(node: Any) -> None:
        if isinstance(node, MutableMapping):
            if node.get("class", None) in wanted:
                # This is one of the classes requested, so process it and
                # don't look any deeper.
                op(node)
            else:
                # Look inside it instead.
                for key in node:
                    visit(node[key])
        elif isinstance(node, MutableSequence):
            # This item is actually a list of things, so look at all of them.
            for item in node:
                visit(item)

    visit(rec)
# Type of the value returned by the top-down callback (op_down) in
# visit_cwl_class_and_reduce; it is handed back to op_up for the same object.
DownReturnType = TypeVar("DownReturnType")
# Type of the value returned by the bottom-up callback (op_up) in
# visit_cwl_class_and_reduce.
UpReturnType = TypeVar("UpReturnType")
[docs]
def visit_cwl_class_and_reduce(
    rec: Any,
    classes: Iterable[str],
    op_down: Callable[[Any], DownReturnType],
    op_up: Callable[[Any, DownReturnType, List[UpReturnType]], UpReturnType],
) -> List[UpReturnType]:
    """
    Apply the given operations to all CWL objects with the given named CWL class.

    Applies the down operation top-down, and the up operation bottom-up, and
    passes the down operation's result and a list of the up operation results
    for all child keys (flattening across lists and collapsing nodes of
    non-matching classes) to the up operation.

    :param rec: The CWL document (or fragment) to walk. Mappings and
        sequences are descended into; any other value is ignored.
    :param classes: Names of the CWL classes to operate on. May be any
        iterable, including a one-shot generator; it is consumed exactly once.
    :param op_down: Called on each matching object before its children are
        visited.
    :param op_up: Called on each matching object after its children are
        visited, with the object, op_down's result for it, and the up results
        of its nearest matching descendants.
    :returns: The up operation results for the outermost matching objects
        (results for nested matches are passed to their ancestor's op_up
        instead), flattened into one list.
    """
    # Snapshot the class names once so a generator isn't exhausted by the
    # first membership test.
    wanted = tuple(classes)

    def visit(node: Any) -> List[UpReturnType]:
        if isinstance(node, MutableMapping):
            # Decide once whether this node matches, so the up pass always
            # pairs with the down pass (and down_result is always bound), even
            # if op_down changes the node's "class".
            matched = node.get("class", None) in wanted
            if matched:
                down_result = op_down(node)
            # Look inside and collect child results.
            child_results: List[UpReturnType] = []
            for key in node:
                child_results.extend(visit(node[key]))
            if matched:
                return [op_up(node, down_result, child_results)]
            # We aren't processing here, so pass up all the child results.
            return child_results
        if isinstance(node, MutableSequence):
            # This item is actually a list of things; flatten their results.
            flattened: List[UpReturnType] = []
            for item in node:
                flattened.extend(visit(item))
            return flattened
        return []

    return visit(rec)
# A virtual directory tree: maps each entry name to either a file URI string or
# a nested DirectoryStructure for a subdirectory. download_structure skips a
# "." key if one is present.
DirectoryStructure = Dict[str, Union[str, "DirectoryStructure"]]
[docs]
def get_from_structure(dir_dict: DirectoryStructure, path: str) -> Union[str, DirectoryStructure, None]:
    """
    Given a relative path, follow it in the given directory structure.

    Return the string URI for files, the directory dict for
    subdirectories, or None for nonexistent things.

    :param dir_dict: Directory structure to search; maps names to file URI
        strings or nested directory dicts.
    :param path: Relative POSIX path to look up. "." and internal ".."
        components are resolved lexically first; an empty path or "."
        refers to ``dir_dict`` itself.
    :returns: The file URI, the subdirectory dict, or None if nothing exists
        at that path (including when the path tries to descend into a file).
    :raises RuntimeError: If the path is absolute or escapes the top of the
        structure via a leading "..".
    """
    # Resolve "." and ".." lexically. normpath can leave leading ".."
    # components (which would escape the structure), and it preserves exactly
    # two leading slashes per POSIX, so "//x" yields a "//" anchor part that
    # must also be rejected as absolute.
    normalized = PurePosixPath(posixpath.normpath(path))
    parts = normalized.parts
    if not parts:
        return dir_dict
    if normalized.is_absolute() or parts[0] == "..":
        raise RuntimeError(f"Path {path} not resolvable in virtual directory")
    found: Union[str, DirectoryStructure] = dir_dict
    for part in parts:
        # Go down by each path component in turn.
        if isinstance(found, str):
            # Looking for a subdirectory of a file, which doesn't exist.
            return None
        if part not in found:
            return None
        found = found[part]
    # Now we're at the place we want to be.
    return found
[docs]
def download_structure(
    file_store: AbstractFileStore,
    index: Dict[str, str],
    existing: Dict[str, str],
    dir_dict: DirectoryStructure,
    into_dir: str,
) -> None:
    """
    Download nested dictionary from the Toil file store to a local path.

    Guaranteed to fill the structure with real files, and not symlinks out of
    it to elsewhere. File URIs may be toilfile: URIs or any other URI that
    Toil's job store system can read.

    :param file_store: The Toil file store to download from.
    :param index: Maps from downloaded file path back to input URI. Updated
        in place with an entry for every file downloaded.
    :param existing: Maps from file_store_id URI to downloaded file path.
        Updated in place with an entry for every file downloaded.
    :param dir_dict: a dict from string to string (for files) or dict (for
        subdirectories) describing a directory structure. A "." key, if
        present, is skipped.
    :param into_dir: The directory to download the top-level dict's files
        into. Subdirectories are created inside it with os.mkdir, so they
        must not already exist.
    :raises RuntimeError: If an entry is neither a dict nor a string.
    """
    logger.debug("Downloading directory with %s items", len(dir_dict))
    for name, value in dir_dict.items():
        if name == ".":
            # Skip this key that isn't a real child file.
            continue
        if isinstance(value, dict):
            # This is a subdirectory, so make it and download its contents.
            logger.debug("Downloading subdirectory '%s'", name)
            subdir = os.path.join(into_dir, name)
            os.mkdir(subdir)
            download_structure(file_store, index, existing, value, subdir)
            continue

        # This must be a file path uploaded to Toil.
        if not isinstance(value, str):
            raise RuntimeError(f"Did not find a file at {value}.")
        logger.debug("Downloading contained file '%s'", name)
        dest_path = os.path.join(into_dir, name)
        if value.startswith("toilfile:"):
            # So download the file into place.
            # Make sure to get a real copy of the file because we may need to
            # mount the directory into a container as a whole.
            file_store.readGlobalFile(
                FileID.unpack(value[len("toilfile:") :]), dest_path, symlink=False
            )
        else:
            # We need to download from some other kind of URL. Close the
            # destination deterministically so all bytes are flushed to disk
            # and the descriptor is not leaked, even if the read fails.
            with open(dest_path, "wb") as dest_file:
                _, executable = AbstractJobStore.read_from_url(value, dest_file)
            if executable:
                # Make the written file executable.
                os.chmod(dest_path, os.stat(dest_path).st_mode | stat.S_IXUSR)

        # Record the download in both directions in the caller's index dicts.
        # NOTE(review): an earlier TODO asked why these are kept; presumably
        # callers use them to map local paths back to URIs and to avoid
        # re-downloading — confirm against callers.
        index[dest_path] = value
        existing[value] = dest_path