# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterator
from contextlib import contextmanager
from tempfile import mkstemp
from threading import Event, Semaphore
from typing import (
IO,
TYPE_CHECKING,
Any,
Callable,
ContextManager,
Literal,
Optional,
Union,
cast,
overload,
)
import dill
from toil.common import Toil, cacheDirName, getDirSizeRecursively
from toil.fileStores import FileID
from toil.job import DebugStoppingPointReached, Job, JobDescription
from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.lib.compatibility import deprecated
from toil.lib.conversions import bytes2human
from toil.lib.io import WriteWatchingStream, mkdtemp
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from toil.fileStores.cachingFileStore import CachingFileStore
from toil.fileStores.nonCachingFileStore import NonCachingFileStore
class AbstractFileStore(ABC):
"""
Interface used to allow user code run by Toil to read and write files.
Also provides the interface to other Toil facilities used by user code,
including:
* normal (non-real-time) logging
* finding the correct temporary directory for scratch work
* importing and exporting files into and out of the workflow
Stores user files in the jobStore, but keeps them separate from actual
jobs.
May implement caching.
Passed as argument to the :meth:`toil.job.Job.run` method.
Access to files is only permitted inside the context manager provided by
:meth:`toil.fileStores.abstractFileStore.AbstractFileStore.open`.
Also responsible for committing completed jobs back to the job store with
an update operation, and allowing that commit operation to be waited for.
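Example (illustrative sketch of typical use from inside a job's run method;
``self.input_id`` is a hypothetical FileID stored on the job when it was
constructed)::

    class MyJob(Job):
        def run(self, fileStore):
            # Download an input, do scratch work locally, upload the result
            local_input = fileStore.readGlobalFile(self.input_id)
            scratch = fileStore.getLocalTempFile()
            # ... produce scratch from local_input ...
            return fileStore.writeGlobalFile(scratch, cleanup=True)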
"""
# Variables used for syncing reads/writes
_pendingFileWritesLock = Semaphore()
_pendingFileWrites: set[str] = set()
_terminateEvent = Event() # Used to signify crashes in threads
def __init__(
self,
jobStore: AbstractJobStore,
jobDesc: JobDescription,
file_store_dir: str,
waitForPreviousCommit: Callable[[], Any],
) -> None:
"""
Create a new file store object.
:param jobStore: the job store in use for the current Toil run.
:param jobDesc: the JobDescription object for the currently
running job.
:param file_store_dir: the per-worker local temporary directory where
the file store should store local files. Per-job directories will be
created under here by the file store.
:param waitForPreviousCommit: the waitForCommit method of the previous job's
file store, when jobs are running in sequence on the same worker. Used to
prevent this file store's startCommit and the previous job's
startCommit methods from running at the same time and racing. If
they did race, it might be possible for the later job to be fully
marked as completed in the job store before the earlier job was.
"""
self.jobStore = jobStore
self.jobDesc = jobDesc
# This gets replaced with a subdirectory of itself on open()
self.localTempDir: str = os.path.abspath(file_store_dir)
assert self.jobStore.config.workflowID is not None
self.workflow_dir: str = Toil.getLocalWorkflowDir(
self.jobStore.config.workflowID, self.jobStore.config.workDir
)
self.coordination_dir: str = Toil.get_local_workflow_coordination_dir(
self.jobStore.config.workflowID,
self.jobStore.config.workDir,
self.jobStore.config.coordination_dir,
)
self.jobName: str = str(self.jobDesc)
self.waitForPreviousCommit = waitForPreviousCommit
self.logging_messages: list[dict[str, Union[int, str]]] = []
self.logging_user_streams: list[dict[str, str]] = []
# Records file IDs of files deleted during the current job. Doesn't get
# committed back until the job is completely successful, because if the
# job is re-run it will need to be able to re-delete these files.
# This is a set of str objects, not FileIDs.
self.filesToDelete: set[str] = set()
# Holds records of file ID, or file ID and local path, for reporting
# the accessed files of failed jobs.
self._accessLog: list[tuple[str, ...]] = []
# Holds total bytes of observed disk usage for the last job run under open()
self._job_disk_used: Optional[int] = None
@staticmethod
def createFileStore(
jobStore: AbstractJobStore,
jobDesc: JobDescription,
file_store_dir: str,
waitForPreviousCommit: Callable[[], Any],
caching: Optional[bool],
) -> Union["NonCachingFileStore", "CachingFileStore"]:
"""Create a concreate FileStore."""
# Defer these imports until runtime, since these classes depend on us
from toil.fileStores.cachingFileStore import CachingFileStore
from toil.fileStores.nonCachingFileStore import NonCachingFileStore
fileStoreCls: Union[type["CachingFileStore"], type["NonCachingFileStore"]] = (
CachingFileStore if caching else NonCachingFileStore
)
return fileStoreCls(jobStore, jobDesc, file_store_dir, waitForPreviousCommit)
@staticmethod
def shutdownFileStore(
workflowID: str,
config_work_dir: Optional[str],
config_coordination_dir: Optional[str],
) -> None:
"""
Carry out any necessary filestore-specific cleanup.
This is a destructive operation and it is important to ensure that there
are no other running processes on the system that are modifying or using
the file store for this workflow.
This is intended to be the last call to the file store in a Toil run,
called by the batch system cleanup function upon batch system shutdown.
:param workflowID: The workflow ID for this invocation of the workflow
:param config_work_dir: The path to the work directory in the Toil Config.
:param config_coordination_dir: The path to the coordination directory in the Toil Config.
"""
# Defer these imports until runtime, since these classes depend on us
from toil.fileStores.cachingFileStore import CachingFileStore
from toil.fileStores.nonCachingFileStore import NonCachingFileStore
workflowDir = Toil.getLocalWorkflowDir(workflowID, config_work_dir)
coordination_dir = Toil.get_local_workflow_coordination_dir(
workflowID, config_work_dir, config_coordination_dir
)
cacheDir = os.path.join(workflowDir, cacheDirName(workflowID))
if os.path.exists(cacheDir):
# The presence of the cacheDir suggests this was a cached run. We don't need
# the cache lock for any of this since this is the final cleanup of a job
# and there should be no other conflicting processes using the cache.
CachingFileStore.shutdown((coordination_dir, cacheDir))
else:
# The absence of the cacheDir suggests this was not a cached run.
NonCachingFileStore.shutdown(coordination_dir)
@contextmanager
def open(self, job: Job) -> Generator[None, None, None]:
"""
Create the context manager that runs tasks before and after the job's user code.
File operations are only permitted inside the context manager.
Implementations must only yield from within `with super().open(job):`.
:param job: The job instance of the toil job to run.
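Illustrative sketch of the required pattern for a subclass (the helper
methods shown are hypothetical, not part of any real subclass)::

    @contextmanager
    def open(self, job: Job) -> Generator[None, None, None]:
        self._set_up_job_space()  # hypothetical per-job setup
        try:
            with super().open(job):
                yield
        finally:
            self._tear_down_job_space()  # hypothetical per-job cleanup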
"""
job_requested_disk = job.disk
try:
yield
failed = False
except BaseException as e:
if isinstance(e, DebugStoppingPointReached):
self._dumpAccessLogs(job_type="Debugged", log_level=logging.INFO)
else:
self._dumpAccessLogs()
raise
finally:
# See how much disk space is used at the end of the job.
# Not a real peak disk usage, but close enough to be useful for warning the user.
self._job_disk_used = getDirSizeRecursively(self.localTempDir)
# Report disk usage
percent: float = 0.0
if job_requested_disk and job_requested_disk > 0:
percent = float(self._job_disk_used) / job_requested_disk * 100
disk_usage: str = (
f"Job {self.jobName} used {percent:.2f}% disk ({bytes2human(self._job_disk_used)}B [{self._job_disk_used}B] used, "
f"{bytes2human(job_requested_disk)}B [{job_requested_disk}B] requested)."
)
if self._job_disk_used > job_requested_disk:
self.log_to_leader(
"Job used more disk than requested. For CWL, consider increasing the outdirMin "
f"requirement, otherwise, consider increasing the disk requirement. {disk_usage}",
level=logging.WARNING,
)
else:
self.log_to_leader(disk_usage, level=logging.DEBUG)
def get_disk_usage(self) -> Optional[int]:
"""
Get the number of bytes of disk used by the last job run under open().
Disk usage is measured at the end of the job.
TODO: Sample periodically and record peak usage.
"""
return self._job_disk_used
# Functions related to temp files and directories
def getLocalTempDir(self) -> str:
"""
Get a new local temporary directory in which to write files.
The directory will only persist for the duration of the job.
:return: The absolute path to a new local temporary directory. This directory
will exist for the duration of the job only, and is guaranteed
to be deleted once the job terminates, removing all files it
contains recursively.
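Example (illustrative; ``fileStore`` is the file store passed to
:meth:`toil.job.Job.run`, and ``result.txt`` is a hypothetical output name)::

    work_dir = fileStore.getLocalTempDir()
    output_path = os.path.join(work_dir, "result.txt")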
"""
return os.path.abspath(mkdtemp(dir=self.localTempDir))
def getLocalTempFile(
self, suffix: Optional[str] = None, prefix: Optional[str] = None
) -> str:
"""
Get a new local temporary file that will persist for the duration of the job.
:param suffix: If not None, the file name will end with this string.
Otherwise, default value ".tmp" will be used
:param prefix: If not None, the file name will start with this string.
Otherwise, default value "tmp" will be used
:return: The absolute path to a local temporary file. This file will exist
for the duration of the job only, and is guaranteed to be deleted
once the job terminates.
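Example (illustrative)::

    scratch = fileStore.getLocalTempFile(suffix=".txt")
    with open(scratch, "w") as out:
        out.write("intermediate data\n")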
"""
handle, tmpFile = mkstemp(
suffix=".tmp" if suffix is None else suffix,
prefix="tmp" if prefix is None else prefix,
dir=self.localTempDir,
)
os.close(handle)
return os.path.abspath(tmpFile)
def getLocalTempFileName(
self, suffix: Optional[str] = None, prefix: Optional[str] = None
) -> str:
"""
Get a valid name for a new local file. Don't actually create a file at the path.
:param suffix: If not None, the file name will end with this string.
Otherwise, default value ".tmp" will be used
:param prefix: If not None, the file name will start with this string.
Otherwise, default value "tmp" will be used
:return: Path to valid file
"""
# Create, and then delete, a temp file. Creating it guarantees a unique,
# unused file name. There is a very, very, very low chance that another
# job will create a file with the same name in the window between this file
# being deleted and the name being used by the caller.
tempFile = self.getLocalTempFile(suffix, prefix)
os.remove(tempFile)
return tempFile
# Functions related to reading, writing and removing files to/from the job store
@abstractmethod
def writeGlobalFile(self, localFileName: str, cleanup: bool = False) -> FileID:
"""
Upload a file (as a path) to the job store.
If the file is in a FileStore-managed temporary directory (i.e. from
:func:`toil.fileStores.abstractFileStore.AbstractFileStore.getLocalTempDir`),
it will become a local copy of the file, eligible for deletion by
:func:`toil.fileStores.abstractFileStore.AbstractFileStore.deleteLocalFile`.
If an executable file on the local filesystem is uploaded, its executability
will be preserved when it is downloaded again.
:param localFileName: The path to the local file to upload. The
last path component (basename of the file) will remain
associated with the file in the file store, if supported by the
backing JobStore, so that the file can be searched for by name
or name glob.
:param cleanup: if True then the copy of the global file will be deleted once
the job and all its successors have completed running. If not the global
file must be deleted manually.
:return: an ID that can be used to retrieve the file.
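Example (illustrative; ``results_path`` is a hypothetical file under the
job's local temp directory)::

    output_id = fileStore.writeGlobalFile(results_path, cleanup=True)
    # output_id can be returned from the job or passed to successors,
    # which can retrieve the file with readGlobalFile().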
"""
raise NotImplementedError()
@contextmanager
def writeGlobalFileStream(
self,
cleanup: bool = False,
basename: Optional[str] = None,
encoding: Optional[str] = None,
errors: Optional[str] = None,
) -> Iterator[tuple[WriteWatchingStream, FileID]]:
"""
Similar to writeGlobalFile, but allows the writing of a stream to the job store.
The yielded file handle does not need to and should not be closed explicitly.
:param encoding: The name of the encoding used to encode the file. Encodings
are the same as for encode(). Defaults to None which represents binary mode.
:param errors: Specifies how encoding errors are to be handled. Errors are the
same as for open(). Defaults to 'strict' when an encoding is specified.
:param cleanup: is as in
:func:`toil.fileStores.abstractFileStore.AbstractFileStore.writeGlobalFile`.
:param basename: If supported by the backing JobStore, use the given
file basename so that when searching the job store with a query
matching that basename, the file will be detected.
:return: A context manager yielding a tuple of
1) a file handle which can be written to and
2) the toil.fileStores.FileID of the resulting file in the job store.
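Example (illustrative)::

    with fileStore.writeGlobalFileStream(cleanup=True, basename="counts.txt") as (stream, file_id):
        stream.write(b"42\n")
    # file_id now identifies the streamed file in the job store.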
"""
with self.jobStore.write_file_stream(
str(self.jobDesc.jobStoreID), cleanup, basename, encoding, errors
) as (backingStream, fileStoreID):
# We have a string version of the file ID, and the backing stream.
# We need to yield a stream the caller can write to, and a FileID
# that accurately reflects the size of the data written to the
# stream. We assume the stream is not seekable.
# Make and keep a reference to the file ID, which is currently empty
fileID = FileID(fileStoreID, 0)
# Wrap the stream to increment the file ID's size for each byte written
wrappedStream = WriteWatchingStream(backingStream)
# When the stream is written to, count the bytes
def handle(numBytes: int) -> None:
# No scope problem here, because we don't assign to a fileID local
fileID.size += numBytes
wrappedStream.onWrite(handle)
yield wrappedStream, fileID
def _dumpAccessLogs(
self, job_type: str = "Failed", log_level: int = logging.WARNING
) -> None:
"""
Log a report of the files accessed.
Includes the files that were accessed while the file store was open.
:param job_type: Adjective to describe the job in the report.
"""
if len(self._accessLog) > 0:
logger.log(log_level, "%s job accessed files:", job_type)
for item in self._accessLog:
# For each access record
if len(item) == 2:
# If it has a name, dump with the name
file_id, dest_path = item
if os.path.exists(dest_path):
if os.path.islink(dest_path):
logger.log(
log_level,
"Symlinked file '%s' to path '%s'",
file_id,
dest_path,
)
else:
logger.log(
log_level,
"Downloaded file '%s' to path '%s'",
file_id,
dest_path,
)
else:
logger.log(
log_level,
"Downloaded file '%s' to path '%s' (gone!)",
file_id,
dest_path,
)
else:
# Otherwise dump without the name
logger.log(log_level, "Streamed file '%s'", *item)
def logAccess(
self, fileStoreID: Union[FileID, str], destination: Union[str, None] = None
) -> None:
"""
Record that the given file was read by the job.
(to be announced if the job fails)
If destination is not None, it gives the path that the file
was downloaded to. Otherwise, assumes that the file was streamed.
Must be called by :meth:`readGlobalFile` and :meth:`readGlobalFileStream`
implementations.
"""
if destination is not None:
self._accessLog.append((fileStoreID, destination))
else:
self._accessLog.append((fileStoreID,))
@abstractmethod
def readGlobalFile(
self,
fileStoreID: str,
userPath: Optional[str] = None,
cache: bool = True,
mutable: bool = False,
symlink: bool = False,
) -> str:
"""
Make the file associated with fileStoreID available locally.
If mutable is True, then a copy of the file will be created locally so
that the original is not modified and does not change the file for other
jobs. If mutable is False, then a link can be created to the file, saving
disk resources. The file that is downloaded will be executable if and only
if it was originally uploaded from an executable file on the local filesystem.
If a user path is specified, it is used as the destination. If a user path isn't
specified, the file is stored in the local temp directory with an encoded name.
The destination file must not be deleted by the user; it can only be
deleted through deleteLocalFile.
Implementations must call :meth:`logAccess` to report the download.
:param fileStoreID: job store id for the file
:param userPath: a path to the name of file to which the global file will
be copied or hard-linked (see below).
:param cache: Described in
:func:`toil.fileStores.CachingFileStore.readGlobalFile`
:param mutable: Described in
:func:`toil.fileStores.CachingFileStore.readGlobalFile`
:param symlink: True if caller can accept symlink, False if caller can only accept a normal file or hardlink
:return: An absolute path to a local, temporary copy of the file keyed
by fileStoreID.
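Example (illustrative; ``input_id`` is a FileID produced earlier, for
example by :meth:`writeGlobalFile`)::

    read_only_path = fileStore.readGlobalFile(input_id)
    editable_path = fileStore.readGlobalFile(input_id, mutable=True)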
"""
raise NotImplementedError()
@overload
def readGlobalFileStream(
self,
fileStoreID: str,
encoding: Literal[None] = None,
errors: Optional[str] = None,
) -> ContextManager[IO[bytes]]: ...
@overload
def readGlobalFileStream(
self, fileStoreID: str, encoding: str, errors: Optional[str] = None
) -> ContextManager[IO[str]]: ...
@abstractmethod
def readGlobalFileStream(
self,
fileStoreID: str,
encoding: Optional[str] = None,
errors: Optional[str] = None,
) -> ContextManager[Union[IO[bytes], IO[str]]]:
"""
Read a stream from the job store; similar to readGlobalFile.
The yielded file handle does not need to and should not be closed explicitly.
:param encoding: the name of the encoding used to decode the file. Encodings
are the same as for decode(). Defaults to None which represents binary mode.
:param errors: an optional string that specifies how encoding errors are
to be handled. Errors are the same as for open(). Defaults to 'strict'
when an encoding is specified.
Implementations must call :meth:`logAccess` to report the download.
:return: a context manager yielding a file handle which can be read from.
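Example (illustrative; ``handle_line`` is a hypothetical per-line handler)::

    with fileStore.readGlobalFileStream(input_id, encoding="utf-8") as stream:
        for line in stream:
            handle_line(line)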
"""
raise NotImplementedError()
def getGlobalFileSize(self, fileStoreID: Union[FileID, str]) -> int:
"""
Get the size of the file pointed to by the given ID, in bytes.
If given a FileID or something else with a non-None 'size' field, returns that size.
Otherwise, asks the job store to poll the file's size.
Note that the job store may overestimate the file's size, for example
if it is encrypted and had to be augmented with an IV or other
encryption framing.
:param fileStoreID: File ID for the file
:return: File's size in bytes, as stored in the job store
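Example (illustrative)::

    if fileStore.getGlobalFileSize(input_id) > 1024 ** 3:
        fileStore.log_to_leader("Input is larger than 1 GiB", level=logging.WARNING)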
"""
# First try and see if the size is still attached
size = getattr(fileStoreID, "size", None)
if size is None:
# It fell off
# Someone is mixing FileStore and JobStore file APIs,
# or serializing FileIDs as strings.
size = self.jobStore.get_file_size(fileStoreID)
return cast(int, size)
@abstractmethod
def deleteLocalFile(self, fileStoreID: Union[FileID, str]) -> None:
"""
Delete local copies of files associated with the provided job store ID.
Raises an OSError with an errno of errno.ENOENT if no such local copies
exist. Thus, cannot be called multiple times in succession.
The files deleted are all those previously read from this file ID via
readGlobalFile by the current job into the job's file-store-provided
temp directory, plus the file that was written to create the given file
ID, if it was written by the current job from the job's
file-store-provided temp directory.
:param fileStoreID: File Store ID of the file to be deleted.
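Example (illustrative; frees local disk space once a downloaded copy is no
longer needed)::

    local_path = fileStore.readGlobalFile(input_id)
    checksum = compute_checksum(local_path)  # hypothetical helper
    fileStore.deleteLocalFile(input_id)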
"""
raise NotImplementedError()
@abstractmethod
def deleteGlobalFile(self, fileStoreID: Union[FileID, str]) -> None:
"""
Delete local copies of the file, then permanently delete it from the job store.
To ensure that the job can be restarted if necessary, the delete will not
happen until after the job's run method has completed.
:param fileStoreID: the File Store ID of the file to be deleted.
"""
raise NotImplementedError()
# Functions used to read and write files directly between a source url
# and the job store.
@deprecated(new_function_name="import_file")
def importFile(
self, srcUrl: str, sharedFileName: Optional[str] = None
) -> Optional[FileID]:
return self.import_file(srcUrl, sharedFileName)
def import_file(
self, src_uri: str, shared_file_name: Optional[str] = None
) -> Optional[FileID]:
return self.jobStore.import_file(src_uri, shared_file_name=shared_file_name)
@deprecated(new_function_name="export_file")
def exportFile(self, jobStoreFileID: FileID, dstUrl: str) -> None:
return self.export_file(jobStoreFileID, dstUrl)
def export_file(self, file_id: FileID, dst_uri: str) -> None:
raise NotImplementedError()
# A utility method for accessing filenames
def _resolveAbsoluteLocalPath(self, filePath: str) -> str:
"""
Return the absolute path to filePath.
This is a wrapper for os.path.abspath because macOS
symlinks /tmp and /var (the most common places for a default tempdir) to
/private/tmp and /private/var respectively.
:param filePath: The absolute or relative path to the file. If relative,
it must be relative to the local temp working dir
:return: The absolute path to the file
"""
if os.path.isabs(filePath):
return os.path.abspath(filePath)
else:
return os.path.join(self.localTempDir, filePath)
class _StateFile:
"""Read and write dill-ed state dictionaries from/to a file into a namespace."""
def __init__(self, stateDict: dict[str, Any]):
assert isinstance(stateDict, dict)
self.__dict__.update(stateDict)
@classmethod
@abstractmethod
@contextmanager
def open(cls, outer: Optional[Any] = None) -> Iterator[Any]:
"""
This is a context manager that opens the state file and reads it into an object that is returned
to the user in the yield.
:param outer: Instance of the calling class (to use outer methods).
"""
raise NotImplementedError()
@classmethod
def _load(cls, fileName: str) -> Any:
"""
Load the state of the cache from the state file.
:param fileName: Path to the cache state file.
:return: An instance of the state as a namespace.
:rtype: _StateFile
"""
# Read the value from the cache state file, then initialize an instance of
# _CacheState with it.
with open(fileName, "rb") as fH:
infoDict = dill.load(fH)
return cls(infoDict)
def write(self, fileName: str) -> None:
"""
Write the current state to the main state file.
:param fileName: Path to the state file.
"""
with open(fileName + ".tmp", "wb") as fH:
# Based on answer by user "Mark" at:
# http://stackoverflow.com/questions/2709800/how-to-pickle-yourself
# We can't pickle nested classes. So we have to pickle the variables
# of the class.
# If we ever change this, we need to ensure it doesn't break FileID
dill.dump(self.__dict__, fH)
os.rename(fileName + ".tmp", fileName)
# Functions related to logging
def log_to_leader(self, text: str, level: int = logging.INFO) -> None:
"""
Send a logging message to the leader. The message will also be
logged by the worker at the same level.
:param text: The string to log.
:param level: The logging level.
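Example (illustrative)::

    fileStore.log_to_leader("Aligned 1000000 reads", level=logging.INFO)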
"""
logger.log(level=level, msg=("LOG-TO-MASTER: " + text))
self.logging_messages.append(dict(text=text, level=level))
@deprecated(new_function_name="export_file")
def logToMaster(self, text: str, level: int = logging.INFO) -> None:
self.log_to_leader(text, level)
def log_user_stream(self, name: str, stream: IO[bytes]) -> None:
"""
Send a stream of UTF-8 text to the leader as a named log stream.
Useful for things like the error logs of Docker containers. The leader
will show it to the user or organize it appropriately for user-level
log information.
:param name: A hierarchical, .-delimited string.
:param stream: A stream of encoded text. Encoding errors will be
tolerated.
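Example (illustrative; ``container_log_path`` is a hypothetical path to a
Docker container's captured output)::

    with open(container_log_path, "rb") as log_stream:
        fileStore.log_user_stream("my_workflow.align.docker", log_stream)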
"""
# Read the whole stream into memory
stream_data = stream.read().decode("utf-8", errors="replace")
# And remember it for the worker to fish out
self.logging_user_streams.append(dict(name=name, text=stream_data))
# Functions run after the completion of the job.
@abstractmethod
def startCommit(self, jobState: bool = False) -> None:
"""
Update the status of the job on the disk.
May bump the version number of the job.
May start an asynchronous process. Call waitForCommit() to wait on that
process. You must waitForCommit() before committing any further updates
to the job. During the asynchronous process, it is safe to modify the
job; modifications after this call will not be committed until the next
call.
:param jobState: If True, commit the state of the FileStore's job,
and file deletes. Otherwise, commit only file creates/updates.
"""
raise NotImplementedError()
@abstractmethod
def waitForCommit(self) -> bool:
"""
Blocks while startCommit is running.
This function is called by this job's successor to ensure that it does
not begin modifying the job store until after this job has finished doing so.
Might be called when startCommit is never called on a particular
instance, in which case it does not block.
:return: Always returns True
"""
raise NotImplementedError()
@classmethod
@abstractmethod
def shutdown(cls, shutdown_info: Any) -> None:
"""
Shutdown the filestore on this node.
This is intended to be called on batch system shutdown.
:param shutdown_info: The implementation-specific shutdown information,
for shutting down the file store and removing all its state and all job
local temp directories from the node.
"""
raise NotImplementedError()