Source code for toil.job

# Copyright (C) 2015-2018 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import, print_function

from future import standard_library
from builtins import zip
from builtins import map
from builtins import str
from builtins import object
from builtins import super
import collections
import importlib
import inspect
import logging
import sys
import os
import time
import dill
import tempfile

    import cPickle as pickle
except ImportError:
    import pickle

from abc import ABCMeta, abstractmethod
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from contextlib import contextmanager
from io import BytesIO

# Python 3 compatibility imports
from six import iteritems, string_types

from toil.lib.expando import Expando
from toil.lib.humanize import human2bytes

from toil.common import Toil, addOptions, safeUnpickleFromStream
from toil.deferred import DeferredFunction
from toil.fileStores import FileID
from toil.lib.bioio import (setLoggingFromOptions,
from toil.resource import ModuleDescriptor
from future.utils import with_metaclass

logger = logging.getLogger( __name__ )

class BaseJob(object):
    """
    Inherit from this class to add job properties to an object.

    If the object doesn't specify explicit requirements, these properties will fall back
    to the configured defaults. If the value cannot be determined, an AttributeError is raised.
    """
    def __init__(self, requirements, unitName, displayName=None, jobName=None):
        """
        :param dict requirements: Maps 'cores', 'memory', 'disk' and 'preemptable' to the
               requested values. Missing keys are treated as unset (None).
        :param unitName: Optional name; must be a string (or bytes) when truthy.
        :param displayName: Optional display name, defaults to the name of the class.
        :param jobName: Optional job name, defaults to the name of the class. Must be a
               string (or bytes) when truthy.
        """
        cores = requirements.get('cores')
        memory = requirements.get('memory')
        disk = requirements.get('disk')
        preemptable = requirements.get('preemptable')
        if unitName:
            assert isinstance(unitName, (str, bytes))
        if jobName:
            assert isinstance(jobName, (str, bytes))
        self.unitName = unitName
        self.displayName = displayName if displayName else self.__class__.__name__
        self.jobName = jobName if jobName else self.__class__.__name__
        self._cores = self._parseResource('cores', cores)
        self._memory = self._parseResource('memory', memory)
        self._disk = self._parseResource('disk', disk)
        # 'preemptable' is stored as given, it is not a parsed resource amount
        self._preemptable = preemptable
        # Filled in later with the workflow configuration that supplies the defaults
        self._config = None

    @property
    def disk(self):
        """
        The maximum number of bytes of disk the job will require to run.

        :raises AttributeError: if neither an explicit value nor a config is available.
        """
        if self._disk is not None:
            return self._disk
        elif self._config is not None:
            return self._config.defaultDisk
        else:
            raise AttributeError("Default value for 'disk' cannot be determined")

    @property
    def memory(self):
        """
        The maximum number of bytes of memory the job will require to run.

        :raises AttributeError: if neither an explicit value nor a config is available.
        """
        if self._memory is not None:
            return self._memory
        elif self._config is not None:
            return self._config.defaultMemory
        else:
            raise AttributeError("Default value for 'memory' cannot be determined")

    @property
    def cores(self):
        """
        The number of CPU cores required.

        :raises AttributeError: if neither an explicit value nor a config is available.
        """
        if self._cores is not None:
            return self._cores
        elif self._config is not None:
            return self._config.defaultCores
        else:
            raise AttributeError("Default value for 'cores' cannot be determined")

    @property
    def preemptable(self):
        """
        Whether the job can be run on a preemptable node.

        :raises AttributeError: if neither an explicit value nor a config is available.
        """
        if self._preemptable is not None:
            return self._preemptable
        elif self._config is not None:
            return self._config.defaultPreemptable
        else:
            raise AttributeError("Default value for 'preemptable' cannot be determined")

    # NOTE(review): the decorator was not visible in the reviewed source; exposing this as
    # a property (like the accessors it reads) is an assumption - confirm against callers.
    @property
    def _requirements(self):
        """
        Gets a dictionary of all the object's resource requirements. Unset values are defaulted to None
        """
        # getattr() swallows the AttributeError raised by an undeterminable property,
        # which is what turns an unset requirement into None here.
        return {'memory': getattr(self, 'memory', None),
                'cores': getattr(self, 'cores', None),
                'disk': getattr(self, 'disk', None),
                'preemptable': getattr(self, 'preemptable', None)}

    @staticmethod
    def _parseResource(name, value):
        """
        Parse a Toil job's resource requirement value and apply resource-specific type checks. If the
        value is a string, a binary or metric unit prefix in it will be evaluated and the
        corresponding integral value will be returned.

        :param str name: The name of the resource
        :param None|str|float|int value: The resource value
        :rtype: int|float|None

        >>> Job._parseResource('cores', None)
        >>> Job._parseResource('cores', 1), Job._parseResource('disk', 1), \
        Job._parseResource('memory', 1)
        (1, 1, 1)
        >>> Job._parseResource('cores', '1G'), Job._parseResource('disk', '1G'), \
        Job._parseResource('memory', '1G')
        (1073741824, 1073741824, 1073741824)
        >>> Job._parseResource('cores', 1.1)
        1.1
        >>> Job._parseResource('disk', 1.1) # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ...
        TypeError: The 'disk' requirement does not accept values that are of <type 'float'>
        >>> Job._parseResource('memory', object()) # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ...
        TypeError: The 'memory' requirement does not accept values that are of ...
        """
        assert name in ('memory', 'disk', 'cores')
        if value is None:
            return value
        elif isinstance(value, (str, bytes)):
            value = human2bytes(value)
        if isinstance(value, int):
            return value
        elif isinstance(value, float) and name == 'cores':
            # Only 'cores' may be fractional
            return value
        else:
            raise TypeError("The '%s' requirement does not accept values that are of %s"
                            % (name, type(value)))

    def __str__(self):
        """
        Return the quoted job name followed by the unit name, or by a note when the
        unit name is an empty string.
        """
        printedName = "'" + self.jobName + "'"
        if self.unitName:
            printedName += ' ' + self.unitName
        elif self.unitName == '':
            printedName += ' ' + 'user passed empty string for name'
        return printedName

class JobNode(BaseJob):
    """
    This object bridges the job graph, job, and batchsystem classes
    """
    def __init__(self, requirements, jobName, unitName, jobStoreID,
                 command, displayName=None, predecessorNumber=1):
        """
        :param dict requirements: Resource requirements, passed on to BaseJob
        :param jobName: Name of the job, passed on to BaseJob
        :param unitName: Unit name of the job, passed on to BaseJob
        :param jobStoreID: ID of the job in the job store, may be None (see fromJob)
        :param command: The command associated with this node
        :param displayName: Optional display name, passed on to BaseJob
        :param int predecessorNumber: Number of predecessors of the job
        """
        super().__init__(requirements=requirements, displayName=displayName,
                         unitName=unitName, jobName=jobName)
        self.jobStoreID = jobStoreID
        self.predecessorNumber = predecessorNumber
        self.command = command

    def __str__(self):
        # Format instead of concatenating: jobStoreID is None for nodes made by fromJob()
        # and '+' would raise a TypeError for them.
        return '%s %s' % (super().__str__(), self.jobStoreID)

    def __hash__(self):
        return hash(self.jobStoreID)

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, self.__class__):
            return not self.__eq__(other)
        return NotImplemented

    def __repr__(self):
        return '%s( **%r )' % (self.__class__.__name__, self.__dict__)

    @classmethod
    def fromJobGraph(cls, jobGraph):
        """
        Takes a job graph object and returns a job node object

        :param toil.jobGraph.JobGraph jobGraph: A job graph object to be transformed into a job node
        :return: A job node object
        :rtype: toil.job.JobNode
        """
        # NOTE(review): only the jobStoreID argument survived in the reviewed source. The
        # remaining arguments mirror __init__'s parameters one-to-one and assume the job
        # graph exposes attributes of the same names - confirm against the upstream file.
        return cls(jobStoreID=jobGraph.jobStoreID,
                   requirements=jobGraph._requirements,
                   command=jobGraph.command,
                   jobName=jobGraph.jobName,
                   unitName=jobGraph.unitName,
                   displayName=jobGraph.displayName,
                   predecessorNumber=jobGraph.predecessorNumber)

    @classmethod
    def fromJob(cls, job, command, predecessorNumber):
        """
        Build a job node from a job object

        :param toil.job.Job job: the job object to be transformed into a job node
        :param str command: the JobNode's command
        :param int predecessorNumber: the number of predecessors that must finish
            successfully before the job can be scheduled
        :return: a JobNode object representing the job object parameter
        :rtype: toil.job.JobNode
        """
        # NOTE(review): only the jobStoreID argument survived in the reviewed source. The
        # remaining arguments mirror __init__'s parameters one-to-one and assume
        # job._requirements is a property yielding the requirements dict - confirm.
        return cls(jobStoreID=None,
                   requirements=job._requirements,
                   command=command,
                   jobName=job.jobName,
                   unitName=job.unitName,
                   displayName=job.displayName,
                   predecessorNumber=predecessorNumber)

class Job(BaseJob):
    """
    Class represents a unit of work in toil.
    """
def __init__(self, memory=None, cores=None, disk=None, preemptable=None, unitName=None,
             checkpoint=False, displayName=None):
    """
    This method must be called by any overriding constructor.

    :param memory: the maximum number of bytes of memory the job will require to run.
    :param cores: the number of CPU cores required.
    :param disk: the amount of local disk space required by the job, expressed in bytes.
    :param preemptable: if the job can be run on a preemptable node.
    :param unitName: optional unit name of the job, passed on to the base class.
    :param checkpoint: if any of this job's successor jobs completely fails,
        exhausting all their retries, remove any successor jobs and rerun this job to restart the
        subtree. Job must be a leaf vertex in the job graph when initially defined, see
        :func:`toil.job.Job.checkNewCheckpointsAreLeafVertices`.
    :param displayName: optional display name of the job, defaults to the class name.
    :type cores: int or string convertible by toil.lib.humanize.human2bytes to an int
    :type disk: int or string convertible by toil.lib.humanize.human2bytes to an int
    :type preemptable: bool
    :type memory: int or string convertible by toil.lib.humanize.human2bytes to an int
    """
    requirements = {'memory': memory, 'cores': cores, 'disk': disk,
                    'preemptable': preemptable}
    super().__init__(requirements=requirements, unitName=unitName, displayName=displayName)
    self.checkpoint = checkpoint
    # Re-applied with an 'is not None' test, so an explicit falsy display name (e.g. '')
    # is kept rather than replaced by the class name.
    self.displayName = displayName if displayName is not None else self.__class__.__name__
    #Private class variables

    #See Job.addChild
    self._children = []
    #See Job.addFollowOn
    self._followOns = []
    #See Job.addService
    self._services = []
    #A follow-on, service or child of a job A, is a "direct successor" of A; if B
    #is a direct successor of A, then A is a "direct predecessor" of B.
    self._directPredecessors = set()
    # Note that self.__module__ is not necessarily this module, i.e. It is the module
    # defining the class self is an instance of, which may be a subclass of Job that may be
    # defined in a different module.
    self.userModule = ModuleDescriptor.forModule(self.__module__).globalize()
    # Maps index paths into composite return values to lists of IDs of files containing
    # promised values for those return value items. An index path is a tuple of indices that
    # traverses a nested data structure of lists, dicts, tuples or any other type supporting
    # the __getitem__() protocol.. The special key `()` (the empty tuple) represents the
    # entire return value.
    self._rvs = collections.defaultdict(list)
    self._promiseJobStore = None
    self._fileStore = None
    self._defer = None
    self._tempDir = None
    self._succeeded = True
def run(self, fileStore):
    """
    Override this function to perform work and dynamically create successor jobs.

    :param toil.fileStores.abstractFileStore.AbstractFileStore fileStore: Used to create local and
           globally sharable temporary files and to send log messages to the leader process.

    :return: The return value of the function can be passed to other jobs by means of
             :func:`toil.job.Job.rv`.
    """
    pass
def addChild(self, childJob):
    """
    Adds childJob to be run as child of this job. Child jobs will be run \
    directly after this job's :func:`toil.job.Job.run` method has completed.

    :param toil.job.Job childJob:
    :return: childJob
    :rtype: toil.job.Job
    :raises RuntimeError: raised by childJob._addPredecessor, e.g. when this job is
            already a predecessor of childJob.
    """
    # Register the predecessor first: if it is rejected, the child list must not end up
    # holding an entry for an edge that was never created.
    childJob._addPredecessor(self)
    self._children.append(childJob)
    return childJob
def hasChild(self, childJob):
    """
    Check if childJob is already a child of this job.

    :param toil.job.Job childJob:
    :return: True if childJob is a child of the job, else False.
    :rtype: bool
    """
    return childJob in self._children
def addFollowOn(self, followOnJob):
    """
    Adds a follow-on job, follow-on jobs will be run after the child jobs and \
    their successors have been run.

    :param toil.job.Job followOnJob:
    :return: followOnJob
    :rtype: toil.job.Job
    :raises RuntimeError: raised by followOnJob._addPredecessor, e.g. when this job is
            already a predecessor of followOnJob.
    """
    # Register the predecessor first: if it is rejected, the follow-on list must not end
    # up holding an entry for an edge that was never created.
    followOnJob._addPredecessor(self)
    self._followOns.append(followOnJob)
    return followOnJob
def hasFollowOn(self, followOnJob):
    """
    Check if given job is already a follow-on of this job.

    :param toil.job.Job followOnJob:
    :return: True if the followOnJob is a follow-on of this job, else False.
    :rtype: bool
    """
    return followOnJob in self._followOns
def addService(self, service, parentService=None):
    """
    Add a service.

    The :func:`toil.job.Job.Service.start` method of the service will be called
    after the run method has completed but before any successors are run.
    The service's :func:`toil.job.Job.Service.stop` method will be called once
    the successors of the job have been run.

    Services allow things like databases and servers to be started and accessed
    by jobs in a workflow.

    :raises toil.job.JobException: If service has already been made the child of a job or another service.
    :param toil.job.Job.Service service: Service to add.
    :param toil.job.Job.Service parentService: Service that will be started before 'service' is
        started. Allows trees of services to be established. parentService must be a service
        of this job.
    :return: a promise that will be replaced with the return value from
        :func:`toil.job.Job.Service.start` of service in any successor of the job.
    :rtype: toil.job.Promise
    """
    if parentService is not None:
        # Do check to ensure that parentService is a service of this job, either
        # directly or nested anywhere below one of this job's services.
        def isServiceOfJob(serviceJobs):
            return any(jS.service == parentService or isServiceOfJob(jS.service._childServices)
                       for jS in serviceJobs)
        if not isServiceOfJob(self._services):
            raise JobException("Parent service is not a service of the given job")
        return parentService._addChild(service)
    else:
        if service._hasParent:
            raise JobException("The service already has a parent service")
        service._hasParent = True
        jobService = ServiceJob(service)
        self._services.append(jobService)
        return jobService.rv()
##Convenience functions for creating jobs
def addChildFn(self, fn, *args, **kwargs):
    """
    Adds a function as a child job.

    :param fn: Function to be run as a child job with ``*args`` and ``**kwargs`` as \
    arguments to this function. See toil.job.FunctionWrappingJob for reserved \
    keyword arguments used to specify resource requirements.
    :return: The new child job that wraps fn.
    :rtype: toil.job.FunctionWrappingJob
    """
    # Which wrapper class is used depends on PromisedRequirement.convertPromises(kwargs)
    if PromisedRequirement.convertPromises(kwargs):
        return self.addChild(PromisedRequirementFunctionWrappingJob.create(fn, *args, **kwargs))
    else:
        return self.addChild(FunctionWrappingJob(fn, *args, **kwargs))
def addFollowOnFn(self, fn, *args, **kwargs):
    """
    Adds a function as a follow-on job.

    :param fn: Function to be run as a follow-on job with ``*args`` and ``**kwargs`` as \
    arguments to this function. See toil.job.FunctionWrappingJob for reserved \
    keyword arguments used to specify resource requirements.
    :return: The new follow-on job that wraps fn.
    :rtype: toil.job.FunctionWrappingJob
    """
    # Which wrapper class is used depends on PromisedRequirement.convertPromises(kwargs)
    if PromisedRequirement.convertPromises(kwargs):
        return self.addFollowOn(PromisedRequirementFunctionWrappingJob.create(fn, *args, **kwargs))
    else:
        return self.addFollowOn(FunctionWrappingJob(fn, *args, **kwargs))
def addChildJobFn(self, fn, *args, **kwargs):
    """
    Adds a job function as a child job. See :class:`toil.job.JobFunctionWrappingJob`
    for a definition of a job function.

    :param fn: Job function to be run as a child job with ``*args`` and ``**kwargs`` as \
    arguments to this function. See toil.job.JobFunctionWrappingJob for reserved \
    keyword arguments used to specify resource requirements.
    :return: The new child job that wraps fn.
    :rtype: toil.job.JobFunctionWrappingJob
    """
    # Which wrapper class is used depends on PromisedRequirement.convertPromises(kwargs)
    if PromisedRequirement.convertPromises(kwargs):
        return self.addChild(PromisedRequirementJobFunctionWrappingJob.create(fn, *args, **kwargs))
    else:
        return self.addChild(JobFunctionWrappingJob(fn, *args, **kwargs))
def addFollowOnJobFn(self, fn, *args, **kwargs):
    """
    Add a follow-on job function. See :class:`toil.job.JobFunctionWrappingJob`
    for a definition of a job function.

    :param fn: Job function to be run as a follow-on job with ``*args`` and ``**kwargs`` as \
    arguments to this function. See toil.job.JobFunctionWrappingJob for reserved \
    keyword arguments used to specify resource requirements.
    :return: The new follow-on job that wraps fn.
    :rtype: toil.job.JobFunctionWrappingJob
    """
    # Which wrapper class is used depends on PromisedRequirement.convertPromises(kwargs)
    if PromisedRequirement.convertPromises(kwargs):
        return self.addFollowOn(PromisedRequirementJobFunctionWrappingJob.create(fn, *args, **kwargs))
    else:
        return self.addFollowOn(JobFunctionWrappingJob(fn, *args, **kwargs))
@property
def tempDir(self):
    """
    Shortcut to calling :func:`job.fileStore.getLocalTempDir`. Temp dir is created on first call
    and will be returned for first and future calls

    :return: Path to tempDir. See `job.fileStore.getLocalTempDir`
    :rtype: str
    """
    # Ask the file store only once and cache the answer on the job
    if self._tempDir is None:
        self._tempDir = self._fileStore.getLocalTempDir()
    return self._tempDir
def log(self, text, level=logging.INFO):
    """
    convenience wrapper for :func:`fileStore.logToMaster`

    :param text: the message, passed on unchanged
    :param level: the log level, passed on unchanged; defaults to logging.INFO
    """
    self._fileStore.logToMaster(text, level)
@staticmethod
def wrapFn(fn, *args, **kwargs):
    """
    Makes a Job out of a function. \
    Convenience function for constructor of :class:`toil.job.FunctionWrappingJob`.

    :param fn: Function to be run with ``*args`` and ``**kwargs`` as arguments. \
    See toil.job.JobFunctionWrappingJob for reserved keyword arguments used \
    to specify resource requirements.
    :return: The new function that wraps fn.
    :rtype: toil.job.FunctionWrappingJob
    """
    # Which wrapper class is used depends on PromisedRequirement.convertPromises(kwargs)
    if PromisedRequirement.convertPromises(kwargs):
        return PromisedRequirementFunctionWrappingJob.create(fn, *args, **kwargs)
    else:
        return FunctionWrappingJob(fn, *args, **kwargs)
@staticmethod
def wrapJobFn(fn, *args, **kwargs):
    """
    Makes a Job out of a job function. \
    Convenience function for constructor of :class:`toil.job.JobFunctionWrappingJob`.

    :param fn: Job function to be run with ``*args`` and ``**kwargs`` as arguments. \
    See toil.job.JobFunctionWrappingJob for reserved keyword arguments used \
    to specify resource requirements.
    :return: The new job function that wraps fn.
    :rtype: toil.job.JobFunctionWrappingJob
    """
    # Which wrapper class is used depends on PromisedRequirement.convertPromises(kwargs)
    if PromisedRequirement.convertPromises(kwargs):
        return PromisedRequirementJobFunctionWrappingJob.create(fn, *args, **kwargs)
    else:
        return JobFunctionWrappingJob(fn, *args, **kwargs)
def encapsulate(self):
    """
    Encapsulates the job, see :class:`toil.job.EncapsulatedJob`.
    Convenience function for constructor of :class:`toil.job.EncapsulatedJob`.

    :return: an encapsulated version of this job.
    :rtype: toil.job.EncapsulatedJob
    """
    return EncapsulatedJob(self)
#################################################### #The following function is used for passing return values between #job run functions ####################################################
def rv(self, *path):
    """
    Creates a *promise* (:class:`toil.job.Promise`) representing a return value of the job's
    run method, or, in case of a function-wrapping job, the wrapped function's return value.

    :param (Any) path: Optional path for selecting a component of the promised return value.
           If absent or empty, the entire return value will be used. Otherwise, the first
           element of the path is used to select an individual item of the return value. For
           that to work, the return value must be a list, dictionary or of any other type
           implementing the `__getitem__()` magic method. If the selected item is yet another
           composite value, the second element of the path can be used to select an item from
           it, and so on. For example, if the return value is `[6,{'a':42}]`, `.rv(0)` would
           select `6` , `rv(1)` would select `{'a':42}` while `rv(1,'a')` would select `42`. To
           select a slice from a return value that is slicable, e.g. tuple or list, the path
           element should be a `slice` object. For example, assuming that the return value is
           `[6, 7, 8, 9]` then `.rv(slice(1, 3))` would select `[7, 8]`. Note that slicing
           really only makes sense at the end of path.

    :return: A promise representing the return value of this jobs :meth:`toil.job.Job.run`
             method.

    :rtype: toil.job.Promise
    """
    # path arrives as a (possibly empty) tuple of selectors
    return Promise(self, path)
def registerPromise(self, path):
    """
    Create a job store file holding a placeholder for the promised value at the given
    path and remember its ID under that path in self._rvs.

    :param path: index path into the return value; used as the key into self._rvs
    :return: a tuple of the job store locator taken from the job store's config and the
             ID of the newly written file
    :raises RuntimeError: if no job store has been set for promise registration (see
            prepareForPromiseRegistration)
    """
    if self._promiseJobStore is None:
        raise RuntimeError('Trying to pass a promise from a promising job that is not a ' +
                           'predecessor of the job receiving the promise')
    # TODO: can we guarantee self.jobStoreID is populated and so pass that here?
    with self._promiseJobStore.writeFileStream() as (fileHandle, jobStoreFileID):
        # The file initially holds a sentinel object in place of the real value
        promise = UnfulfilledPromiseSentinel(str(self), False)
        pickle.dump(promise, fileHandle, pickle.HIGHEST_PROTOCOL)
    self._rvs[path].append(jobStoreFileID)
    return self._promiseJobStore.config.jobStore, jobStoreFileID
def prepareForPromiseRegistration(self, jobStore):
    """
    Ensure that a promise by this job (the promissor) can register with the promissor when
    another job referring to the promise (the promissee) is being serialized. The promissee
    holds the reference to the promise (usually as part of the job arguments) and when it
    is being pickled, so will the promises it refers to. Pickling a promise triggers it to be
    registered with the promissor.

    :param jobStore: the job store that registerPromise will write promise files to
    :return: None
    """
    self._promiseJobStore = jobStore
#################################################### #Cycle/connectivity checking ####################################################
def checkJobGraphForDeadlocks(self):
    """
    See :func:`toil.job.Job.checkJobGraphConnected`,
    :func:`toil.job.Job.checkJobGraphAcylic` and
    :func:`toil.job.Job.checkNewCheckpointsAreLeafVertices` for more info.

    :raises toil.job.JobGraphDeadlockException: if the job graph
        is cyclic, contains multiple roots or contains checkpoint jobs that are
        not leaf vertices when defined (see :func:`toil.job.Job.checkNewCheckpointsAreLeafVertices`).
    """
    self.checkJobGraphConnected()
    self.checkJobGraphAcylic()
    self.checkNewCheckpointsAreLeafVertices()
def getRootJobs(self):
    """
    :return: The roots of the connected component of jobs that contains this job. \
    A root is a job with no predecessors.

    :rtype : set of toil.job.Job instances
    """
    roots = set()
    visited = set()
    # Explicit stack instead of recursion, so that long chains of jobs cannot exhaust
    # the interpreter's recursion limit.
    pending = [self]
    while pending:
        job = pending.pop()
        if job in visited:
            continue
        visited.add(job)
        if len(job._directPredecessors) > 0:
            pending.extend(job._directPredecessors)
        else:
            roots.add(job)
        #The following ensures we explore all successor edges.
        pending.extend(job._children)
        pending.extend(job._followOns)
    return roots
def checkJobGraphConnected(self):
    """
    :raises toil.job.JobGraphDeadlockException: if :func:`toil.job.Job.getRootJobs` does \
    not contain exactly one root job.

    As execution always starts from one root job, having multiple root jobs will \
    cause a deadlock to occur.
    """
    rootJobs = self.getRootJobs()
    if len(rootJobs) != 1:
        raise JobGraphDeadlockException("Graph does not contain exactly one"
                                        " root job: %s" % rootJobs)
def checkJobGraphAcylic(self):
    """
    :raises toil.job.JobGraphDeadlockException: if the connected component \
    of jobs containing this job contains any cycles of child/followOn dependencies \
    in the *augmented job graph* (see below). Such cycles are not allowed \
    in valid job graphs.

    A follow-on edge (A, B) between two jobs A and B is equivalent \
    to adding a child edge to B from (1) A, (2) from each child of A, \
    and (3) from the successors of each child of A. We call each such edge \
    an "implied" edge. The augmented job graph is a job graph including \
    all the implied edges.

    For a job graph G = (V, E) the algorithm is ``O(|V|^2)``. It is ``O(|V| + |E|)`` for \
    a graph with no follow-ons. The former follow-on case could be improved!
    """
    #Get the root jobs
    roots = self.getRootJobs()
    if len(roots) == 0:
        raise JobGraphDeadlockException("Graph contains no root jobs due to cycles")

    #Get implied edges
    extraEdges = self._getImpliedEdges(roots)

    #Check for directed cycles in the augmented graph
    # The visited set is shared by all roots; every root starts with a fresh stack
    visited = set()
    for root in roots:
        root._checkJobGraphAcylicDFS([], visited, extraEdges)
def checkNewCheckpointsAreLeafVertices(self):
    """
    A checkpoint job is a job that is restarted if either it fails, or if any of \
    its successors completely fails, exhausting their retries.

    A job is a leaf if it has no successors.

    A checkpoint job must be a leaf when initially added to the job graph. When its \
    run method is invoked it can then create direct successors. This restriction is made
    to simplify implementation.

    :raises toil.job.JobGraphDeadlockException: if there exists a job being added to the graph for which \
    checkpoint=True and which is not a leaf.
    """
    roots = self.getRootJobs() # Roots jobs of component, these are preexisting jobs in the graph

    # All jobs in the component of the job graph containing self
    jobs = set()
    for root in roots:
        root._dfs(jobs)

    # Check for each job for which checkpoint is true that it is a leaf
    for y in [x for x in jobs if x.checkpoint]:
        if y not in roots: # The roots are the preexisting jobs
            if not Job._isLeafVertex(y):
                raise JobGraphDeadlockException("New checkpoint job %s is not a leaf in the job graph" % y)
def defer(self, function, *args, **kwargs):
    """
    Register a deferred function, i.e. a callable that will be invoked after the current
    attempt at running this job concludes. A job attempt is said to conclude when the job
    function (or the :meth:`toil.job.Job.run` method for class-based jobs) returns, raises an
    exception or after the process running it terminates abnormally. A deferred function will
    be called on the node that attempted to run the job, even if a subsequent attempt is made
    on another node. A deferred function should be idempotent because it may be called
    multiple times on the same node or even in the same process. More than one deferred
    function may be registered per job attempt by calling this method repeatedly with
    different arguments. If the same function is registered twice with the same or different
    arguments, it will be called twice per job attempt.

    Examples for deferred functions are ones that handle cleanup of resources external to
    Toil, like Docker containers, files outside the work directory, etc.

    :param callable function: The function to be called after this job concludes.

    :param list args: The arguments to the function

    :param dict kwargs: The keyword arguments to the function

    :raises Exception: if the job has no registration hook (self._defer is None)
    """
    if self._defer is None:
        raise Exception('A deferred function may only be registered with a job while that job is running.')
    self._defer(DeferredFunction.create(function, *args, **kwargs))
#################################################### #The following nested classes are used for #creating jobtrees (Job.Runner), #and defining a service (Job.Service) ####################################################
class Runner(object):
    """
    Used to setup and run Toil workflow.
    """
    @staticmethod
    def getDefaultArgumentParser():
        """
        Get argument parser with added toil workflow options.

        :returns: The argument parser used by a toil workflow with added Toil options.
        :rtype: :class:`argparse.ArgumentParser`
        """
        parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
        Job.Runner.addToilOptions(parser)
        return parser

    @staticmethod
    def getDefaultOptions(jobStore):
        """
        Get default options for a toil workflow.

        :param string jobStore: A string describing the jobStore \
        for the workflow.
        :returns: The options used by a toil workflow.
        :rtype: argparse.ArgumentParser values object
        """
        parser = Job.Runner.getDefaultArgumentParser()
        # The job store string is the only argument handed to the parser
        return parser.parse_args(args=[jobStore])

    @staticmethod
    def addToilOptions(parser):
        """
        Adds the default toil options to an :mod:`optparse` or :mod:`argparse`
        parser object.

        :param parser: Options object to add toil options to.
        :type parser: optparse.OptionParser or argparse.ArgumentParser
        """
        addOptions(parser)

    @staticmethod
    def startToil(job, options):
        """
        Deprecated by toil.common.Toil.start. Runs the toil workflow using the given options
        (see Job.Runner.getDefaultOptions and Job.Runner.addToilOptions) starting with this
        job.

        :param toil.job.Job job: root job of the workflow
        :param options: the workflow options; its ``restart`` attribute selects between
               starting the workflow with ``job`` and restarting it
        :raises: toil.leader.FailedJobsException if at the end of function \
        there remain failed jobs.
        :return: The return value of the root job's run function.
        :rtype: Any
        """
        setLoggingFromOptions(options)
        with Toil(options) as toil:
            if not options.restart:
                return toil.start(job)
            else:
                return toil.restart()
class Service(with_metaclass(ABCMeta, BaseJob)):
    """
    Abstract class used to define the interface to a service.
    """
    def __init__(self, memory=None, cores=None, disk=None, preemptable=None, unitName=None):
        """
        Memory, core and disk requirements are specified identically to as in \
        :func:`toil.job.Job.__init__`.
        """
        requirements = {'memory': memory, 'cores': cores, 'disk': disk,
                        'preemptable': preemptable}
        super().__init__(requirements=requirements, unitName=unitName)
        # Service jobs wrapping the child services added through _addChild
        self._childServices = []
        # Set once the service has been attached to a job or to a parent service
        self._hasParent = False

    @abstractmethod
    def start(self, job):
        """
        Start the service.

        :param toil.job.Job job: The underlying job that is being run. Can be used to register
        deferred functions, or to access the fileStore for creating temporary files.

        :returns: An object describing how to access the service. The object must be pickleable
                  and will be used by jobs to access the service (see :func:`toil.job.Job.addService`).
        """
        pass

    @abstractmethod
    def stop(self, job):
        """
        Stops the service. Function can block until complete.

        :param toil.job.Job job: The underlying job that is being run. Can be used to register
        deferred functions, or to access the fileStore for creating temporary files.
        """
        pass

    def check(self):
        """
        Checks the service is still running.

        :raise exceptions.RuntimeError: If the service failed, this will cause the service job to be labeled failed.
        :returns: True if the service is still running, else False. If False then the service job will be terminated,
            and considered a success. Important point: if the service job exits due to a failure, it should raise a
            RuntimeError, not return False!
        """
        pass

    def _addChild(self, service):
        """
        Add a child service to start up after this service has started. This should not be
        called by the user, instead use :func:`toil.job.Job.addService` with the
        ``parentService`` option.

        :raises toil.job.JobException: If service has already been made the child of a job or another service.
        :param toil.job.Job.Service service: Service to add as a "child" of this service
        :return: a promise that will be replaced with the return value from \
        :func:`toil.job.Job.Service.start` of service after the service has started.
        :rtype: toil.job.Promise
        """
        if service._hasParent:
            raise JobException("The service already has a parent service")
        # Record the attachment on the flag that the guard above reads; writing to a
        # different attribute would let the same service be attached more than once.
        service._hasParent = True
        jobService = ServiceJob(service)
        self._childServices.append(jobService)
        return jobService.rv()
####################################################
# Private functions
####################################################

def _addPredecessor(self, predecessorJob):
    """
    Adds a predecessor job to the set of predecessor jobs.

    :param predecessorJob: the job to record as a direct predecessor.
    :raises RuntimeError: if the job is already a predecessor.
    """
    if predecessorJob in self._directPredecessors:
        raise RuntimeError("The given job is already a predecessor of this job")
    self._directPredecessors.add(predecessorJob)

@staticmethod
def _isLeafVertex(job):
    """
    :returns: True if the job has no children, follow-ons or services.
    :rtype: bool
    """
    return len(job._children) == 0 \
        and len(job._followOns) == 0 \
        and len(job._services) == 0

@classmethod
def _loadUserModule(cls, userModule):
    """
    Imports and returns the module object represented by the given module descriptor.

    :type userModule: ModuleDescriptor
    """
    return userModule.load()

@classmethod
def _loadJob(cls, command, jobStore):
    """
    Unpickles a :class:`toil.job.Job` instance by decoding command.

    The command is a reference to a jobStoreFileID containing the
    pickle file for the job and a list of modules which must be imported so that
    the Job can be successfully unpickled.
    See :func:`toil.job.Job._serialiseFirstJob` and
    :func:`toil.job.Job._makeJobGraphs` to see precisely how the Job is encoded
    in the command.

    :param string command: encoding of the job in the job store.
    :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore: The job store.
    :returns: The job referenced by the command.
    :rtype: toil.job.Job
    """
    # Command layout: "_toil <pickleFile> <module descriptor tokens...>"
    commandTokens = command.split()
    assert "_toil" == commandTokens[0]
    userModule = ModuleDescriptor.fromCommand(commandTokens[2:])
    logger.debug('Loading user module %s.', userModule)
    userModule = cls._loadUserModule(userModule)
    pickleFile = commandTokens[1]

    # Get a directory to download the job in
    directory = tempfile.mkdtemp()
    # Initialize a blank filename so the finally below can't fail due to a
    # missing variable
    filename = ''

    try:
        # Get a filename to download the job to.
        # Don't use mkstemp because we would need to delete and replace the
        # file.
        # Don't use a NamedTemporaryFile context manager because its
        # context manager exit will crash if we deleted it.
        filename = os.path.join(directory, 'job')

        # Download the job
        if pickleFile == "firstJob":
            jobStore.readSharedFile(pickleFile, filename)
        else:
            jobStore.readFile(pickleFile, filename)

        # Open and unpickle
        with open(filename, 'rb') as fileHandle:
            return cls._unpickle(userModule, fileHandle, jobStore.config)

        # TODO: We ought to just unpickle straight from a streaming read
    finally:
        # Clean up the file
        if os.path.exists(filename):
            os.unlink(filename)
        # Clean up the directory we put it in
        # TODO: we assume nobody else put anything in the directory
        if os.path.exists(directory):
            os.rmdir(directory)

@classmethod
def _unpickle(cls, userModule, fileHandle, config):
    """
    Unpickles an object graph from the given file handle while loading symbols
    referencing the __main__ module from the given userModule instead.

    :param userModule: the loaded module that stands in for ``__main__``.
    :param fileHandle: An open, binary-mode file handle.
    :param config: stored on the unpickled object as ``_config``.
    :returns: the unpickled object, which must be a ``BaseJob``.
    """

    def filter_main(module_name, class_name):
        try:
            if module_name == '__main__':
                return getattr(userModule, class_name)
            else:
                return getattr(importlib.import_module(module_name), class_name)
        except Exception:
            # Log which lookup failed, then let the original error propagate.
            if module_name == '__main__':
                logger.debug('Failed getting %s from module %s.', class_name, userModule)
            else:
                logger.debug('Failed getting %s from module %s.', class_name, module_name)
            raise

    try:
        unpickler = pickle.Unpickler(fileHandle)
        # In Python 2 with cPickle we set "find_global"
        unpickler.find_global = filter_main
    except AttributeError:
        # In Python 3 find_global isn't real and we are supposed to
        # subclass unpickler and override find_class. We can't just replace
        # it. But with cPickle in Python 2 we can't subclass Unpickler.

        class FilteredUnpickler(pickle.Unpickler):
            def find_class(self, module, name):
                return filter_main(module, name)

        unpickler = FilteredUnpickler(fileHandle)

    runnable = unpickler.load()
    assert isinstance(runnable, BaseJob)
    runnable._config = config
    return runnable

def getUserScript(self):
    """
    :returns: the ``userModule`` attribute of this job.
    """
    return self.userModule

def _fulfillPromises(self, returnValues, jobStore):
    """
    Sets the values for promises using the return values from this job's run() function.

    :param returnValues: the value returned by run(); may itself be a Promise.
    :param jobStore: the job store holding the promise files to update.
    """
    for path, promiseFileStoreIDs in iteritems(self._rvs):
        if not path:
            # Note that its possible for returnValues to be a promise, not an actual return
            # value. This is the case if the job returns a promise from another job. In
            # either case, we just pass it on.
            promisedValue = returnValues
        else:
            # If there is a path ...
            if isinstance(returnValues, Promise):
                # ... and the value itself is a Promise, we need to create a new, narrower
                # promise and pass it on.
                promisedValue = Promise(returnValues.job, path)
            else:
                # Otherwise, we just select the desired component of the return value.
                promisedValue = returnValues
                for index in path:
                    promisedValue = promisedValue[index]
        for promiseFileStoreID in promiseFileStoreIDs:
            # File may be gone if the job is a service being re-run and the accessing job is
            # already complete.
            if jobStore.fileExists(promiseFileStoreID):
                with jobStore.updateFileStream(promiseFileStoreID) as fileHandle:
                    pickle.dump(promisedValue, fileHandle, pickle.HIGHEST_PROTOCOL)

# Functions associated with Job.checkJobGraphAcyclic to establish that the job graph does not
# contain any cycles of dependencies:

def _dfs(self, visited):
    """
    Adds the job and all jobs reachable on a directed path from current node to the given set.

    :param set visited: accumulator that is updated in place.
    """
    if self not in visited:
        visited.add(self)
        for successor in self._children + self._followOns:
            successor._dfs(visited)

def _checkJobGraphAcylicDFS(self, stack, visited, extraEdges):
    """
    DFS traversal to detect cycles in augmented job graph.

    :param list stack: jobs on the current DFS path.
    :param set visited: jobs already expanded.
    :param dict extraEdges: map of jobs to lists of jobs connected by an implied edge.
    :raises JobGraphDeadlockException: if this job is already on the current path.
    """
    if self not in visited:
        visited.add(self)
        stack.append(self)
        for successor in self._children + self._followOns + extraEdges[self]:
            successor._checkJobGraphAcylicDFS(stack, visited, extraEdges)
        # The pop must happen unconditionally: keeping it inside the assert
        # would skip it under ``python -O`` and leave this job on the stack,
        # which the check below would then report as a cycle.
        popped = stack.pop()
        assert popped == self
    if self in stack:
        stack.append(self)
        raise JobGraphDeadlockException("A cycle of job dependencies has been detected '%s'" % stack)

@staticmethod
def _getImpliedEdges(roots):
    """
    Gets the set of implied edges. See Job.checkJobGraphAcylic

    :param roots: the root jobs from which the job graph is explored.
    :returns: map of jobs to lists of jobs connected by an implied edge.
    :rtype: dict
    """
    # Get nodes in job graph
    nodes = set()
    for root in roots:
        root._dfs(nodes)

    # For each follow-on edge calculate the extra implied edges
    # Adjacency list of implied edges, i.e. map of jobs to lists of jobs
    # connected by an implied edge
    extraEdges = {n: [] for n in nodes}
    for job in nodes:
        if len(job._followOns) > 0:
            # Get set of jobs connected by a directed path to job, starting
            # with a child edge
            reacheable = set()
            for child in job._children:
                child._dfs(reacheable)
            # Now add extra edges
            for descendant in reacheable:
                extraEdges[descendant] += job._followOns[:]
    return extraEdges

####################################################
# The following functions are used to serialise
# a job graph to the jobStore
####################################################

def _createEmptyJobGraphForJob(self, jobStore, command=None, predecessorNumber=0):
    """
    Create an empty job for the job.

    :returns: whatever ``jobStore.create`` returns for the JobNode built from this job.
    """
    # set _config to determine user determined default values for resource requirements
    self._config = jobStore.config
    return jobStore.create(JobNode.fromJob(self, command=command,
                                           predecessorNumber=predecessorNumber))

def _makeJobGraphs(self, jobGraph, jobStore):
    """
    Creates a jobGraph for each job in the job graph, recursively.

    :returns: dict mapping each job to its jobGraph, seeded with ``{self: jobGraph}``.
    """
    jobsToJobGraphs = {self: jobGraph}
    # Follow-ons are pushed first, then children.
    for successors in (self._followOns, self._children):
        jobs = [successor._makeJobGraphs2(jobStore, jobsToJobGraphs) for successor in successors]
        jobGraph.stack.append(jobs)
    return jobsToJobGraphs

def _makeJobGraphs2(self, jobStore, jobsToJobGraphs):
    """
    Recursive helper of :func:`_makeJobGraphs`: makes the jobGraph for this job if it
    does not have one yet, and returns the JobNode built from that jobGraph.
    """
    # Make the jobGraph for the job, if necessary
    if self not in jobsToJobGraphs:
        jobGraph = self._createEmptyJobGraphForJob(jobStore, predecessorNumber=len(self._directPredecessors))
        jobsToJobGraphs[self] = jobGraph
        # Add followOns/children to be run after the current job.
        for successors in (self._followOns, self._children):
            jobs = [successor._makeJobGraphs2(jobStore, jobsToJobGraphs) for successor in successors]
            jobGraph.stack.append(jobs)
    else:
        jobGraph = jobsToJobGraphs[self]
    # The return is a tuple stored within a job.stack
    # The tuple is jobStoreID, memory, cores, disk,
    # The predecessorID is used to establish which predecessors have been
    # completed before running the given Job - it is just a unique ID
    # per predecessor
    return JobNode.fromJobGraph(jobGraph)
def getTopologicalOrderingOfJobs(self):
    """
    :returns: a list of jobs such that for all pairs of indices i, j for which i < j,
              the job at index i can be run before the job at index j.
    :rtype: list
    """
    placed = set()
    runOrder = []

    def visit(job):
        # A job may only be placed once every direct predecessor has been placed;
        # it will be reached again later through its last remaining predecessor.
        if any(predecessor not in placed for predecessor in job._directPredecessors):
            return
        if job in placed:
            return
        placed.add(job)
        runOrder.append(job)
        for successor in job._children + job._followOns:
            visit(successor)

    visit(self)
    return runOrder
def _serialiseJob(self, jobStore, jobsToJobGraphs, rootJobGraph):
    """
    Pickle a job and its jobGraph to disk.

    :param jobStore: the job store to write to.
    :param dict jobsToJobGraphs: map of jobs to jobGraphs; must contain ``self``.
    :param rootJobGraph: jobGraph whose jobStoreID the pickle file is associated with.
    """
    # Pickle the job so that its run method can be run at a later time.
    # Drop out the children/followOns/predecessors/services - which are
    # all recorded within the jobStore and do not need to be stored within
    # the job
    self._children, self._followOns, self._services = [], [], []
    self._directPredecessors, self._promiseJobStore = set(), None
    # The pickled job is "run" as the command of the job, see worker
    # for the mechanism which unpickles the job and executes the
    # method.
    with jobStore.writeFileStream(rootJobGraph.jobStoreID, cleanup=True) as (fileHandle, fileStoreID):
        pickle.dump(self, fileHandle, pickle.HIGHEST_PROTOCOL)
    # Note that getUserScript() may have been overridden. This is intended. If we used
    # self.userModule directly, we'd be getting a reference to job.py if the job was
    # specified as a function (as opposed to a class) since that is where FunctionWrappingJob
    # is defined. What we really want is the module that was loaded as __main__,
    # and FunctionWrappingJob overrides getUserScript() to give us just that. Only then can
    # filter_main() in _unpickle( ) do its job of resolving any user-defined type or function.
    userScript = self.getUserScript().globalize()
    jobsToJobGraphs[self].command = ' '.join(('_toil', fileStoreID) + userScript.toCommand())
    # Update the status of the jobGraph on disk
    jobStore.update(jobsToJobGraphs[self])

def _serialiseServices(self, jobStore, jobGraph, rootJobGraph):
    """
    Serialises the services for a job.

    :param jobStore: the job store to write to.
    :param jobGraph: jobGraph of this job; its ``services`` list is extended per depth.
    :param rootJobGraph: passed through to :func:`_serialiseJob` for each service job.
    """
    def processService(serviceJob, depth):
        # Extend the depth of the services if necessary
        # NOTE(review): ``jobGraph.services`` was stripped from the extracted
        # source here and below; restored from the upstream module - confirm.
        if depth == len(jobGraph.services):
            jobGraph.services.append([])

        # Recursively call to process child services
        for childServiceJob in serviceJob.service._childServices:
            processService(childServiceJob, depth + 1)

        # Make a job wrapper
        serviceJobGraph = serviceJob._createEmptyJobGraphForJob(jobStore, predecessorNumber=1)

        # Create the start and terminate flags.
        # We can't associate these with the job they belong to because
        # that job hasn't necessarily been saved yet.
        serviceJobGraph.startJobStoreID = jobStore.getEmptyFileStoreID()
        serviceJobGraph.terminateJobStoreID = jobStore.getEmptyFileStoreID()
        serviceJobGraph.errorJobStoreID = jobStore.getEmptyFileStoreID()
        assert jobStore.fileExists(serviceJobGraph.startJobStoreID)
        assert jobStore.fileExists(serviceJobGraph.terminateJobStoreID)
        assert jobStore.fileExists(serviceJobGraph.errorJobStoreID)

        # Create the service job tuple
        j = ServiceJobNode(jobStoreID=serviceJobGraph.jobStoreID,
                           memory=serviceJobGraph.memory, cores=serviceJobGraph.cores,
                           disk=serviceJobGraph.disk, preemptable=serviceJobGraph.preemptable,
                           startJobStoreID=serviceJobGraph.startJobStoreID,
                           terminateJobStoreID=serviceJobGraph.terminateJobStoreID,
                           errorJobStoreID=serviceJobGraph.errorJobStoreID,
                           jobName=serviceJobGraph.jobName,
                           unitName=serviceJobGraph.unitName,
                           command=serviceJobGraph.command,
                           predecessorNumber=serviceJobGraph.predecessorNumber)

        # Add the service job tuple to the list of services to run
        jobGraph.services[depth].append(j)

        # Break the links between the services to stop them being serialised together
        serviceJob.service._childServices = None
        assert serviceJob._services == []

        # Pickle the job
        serviceJob.pickledService = pickle.dumps(serviceJob.service, protocol=pickle.HIGHEST_PROTOCOL)
        serviceJob.service = None

        # Serialise the service job and job wrapper
        serviceJob._serialiseJob(jobStore, {serviceJob: serviceJobGraph}, rootJobGraph)

    for serviceJob in self._services:
        processService(serviceJob, 0)

    self._services = []

def _serialiseJobGraph(self, jobGraph, jobStore, returnValues, firstJob):
    """
    Pickle the graph of jobs in the jobStore. The graph is not fully serialised
    until the jobGraph itself is written to disk, this is not performed by this
    function because of the need to coordinate this operation with other updates.

    :param jobGraph: jobGraph of this (root) job.
    :param jobStore: the job store to write to.
    :param returnValues: return value of run(); only used when ``firstJob`` is false.
    :param bool firstJob: if true, the root job itself is serialised as well.
    """
    # Check if the job graph has created
    # any cycles of dependencies or has multiple roots
    self.checkJobGraphForDeadlocks()

    # Create the jobGraphs for followOns/children
    with jobStore.batch():
        jobsToJobGraphs = self._makeJobGraphs(jobGraph, jobStore)
    # Get an ordering on the jobs which we use for pickling the jobs in the
    # correct order to ensure the promises are properly established
    ordering = self.getTopologicalOrderingOfJobs()
    assert len(ordering) == len(jobsToJobGraphs)

    def setForServices(serviceJob):
        # Prepare a service job and, recursively, all of its child services.
        serviceJob.prepareForPromiseRegistration(jobStore)
        for childServiceJob in serviceJob.service._childServices:
            setForServices(childServiceJob)

    with jobStore.batch():
        # Temporarily set the jobStore locators for the promise call back functions
        for job in ordering:
            job.prepareForPromiseRegistration(jobStore)
            for serviceJob in job._services:
                setForServices(serviceJob)

        # Successors are serialised before their predecessors; the root comes last.
        ordering.reverse()
        assert self == ordering[-1]
        if firstJob:
            # If the first job we serialise all the jobs, including the root job
            for job in ordering:
                # Pickle the services for the job
                job._serialiseServices(jobStore, jobsToJobGraphs[job], jobGraph)
                # Now pickle the job
                job._serialiseJob(jobStore, jobsToJobGraphs, jobGraph)
        else:
            # We store the return values at this point, because if a return value
            # is a promise from another job, we need to register the promise
            # before we serialise the other jobs
            self._fulfillPromises(returnValues, jobStore)
            # Pickle the non-root jobs
            for job in ordering[:-1]:
                # Pickle the services for the job
                job._serialiseServices(jobStore, jobsToJobGraphs[job], jobGraph)
                # Pickle the job itself
                job._serialiseJob(jobStore, jobsToJobGraphs, jobGraph)
            # Pickle any services for the job
            self._serialiseServices(jobStore, jobGraph, jobGraph)

def _serialiseFirstJob(self, jobStore):
    """
    Serialises the root job. Returns the wrapping job.

    :param toil.jobStores.abstractJobStore.AbstractJobStore jobStore:
    :raises JobGraphDeadlockException: if this job is a checkpoint but not a leaf vertex.
    """
    # Check if the workflow root is a checkpoint but not a leaf vertex.
    # All other job vertices in the graph are checked by checkNewCheckpointsAreLeafVertices
    if self.checkpoint and not Job._isLeafVertex(self):
        raise JobGraphDeadlockException(
            'New checkpoint job %s is not a leaf in the job graph' % self)

    # Create first jobGraph
    jobGraph = self._createEmptyJobGraphForJob(jobStore=jobStore, predecessorNumber=0)
    # Write the graph of jobs to disk
    self._serialiseJobGraph(jobGraph, jobStore, None, True)
    jobStore.update(jobGraph)
    # Store the name of the first job in a file in case of restart. Up to this point the
    # root job is not recoverable. FIXME: "root job" or "first job", which one is it?
    jobStore.setRootJob(jobGraph.jobStoreID)
    return jobGraph

def _serialiseExistingJob(self, jobGraph, jobStore, returnValues):
    """
    Serialise an existing job.

    :param jobGraph: jobGraph of this job; its ``command`` and ``stack`` are rewritten.
    :param jobStore: the job store to write to.
    :param returnValues: return value of run(), used to fulfil promises.
    """
    self._serialiseJobGraph(jobGraph, jobStore, returnValues, False)
    # Drop the completed command, if not dropped already
    jobGraph.command = None
    # Merge any children (follow-ons) created in the initial serialisation
    # with children (follow-ons) created in the subsequent scale-up.
    assert len(jobGraph.stack) >= 4
    combinedChildren = jobGraph.stack[-1] + jobGraph.stack[-3]
    combinedFollowOns = jobGraph.stack[-2] + jobGraph.stack[-4]
    jobGraph.stack = jobGraph.stack[:-4]
    if len(combinedFollowOns) > 0:
        jobGraph.stack.append(combinedFollowOns)
    if len(combinedChildren) > 0:
        jobGraph.stack.append(combinedChildren)

####################################################
# Function which worker calls to ultimately invoke
# a jobs method, and then handle created
# children/followOn jobs
####################################################

def _run(self, jobGraph, fileStore):
    """
    Invoke the job's run method and return its result.
    """
    # NOTE(review): the call expression was stripped from the extracted source
    # (only a bare ``return`` remained); restored from the upstream module - confirm.
    return self.run(fileStore)

@contextmanager
def _executor(self, jobGraph, stats, fileStore):
    """
    This is the core wrapping method for running the job within a worker. It sets up the stats
    and logging before yielding. After completion of the body, the function will finish up the
    stats and logging, and starts the async update process for the job.

    :param jobGraph: receives ``checkpointFilesToDelete`` when this job is a checkpoint.
    :param stats: stats collector, or None to skip timing and memory accounting.
    :param fileStore: used to delete promise files and to start the commit.
    """
    if stats is not None:
        startTime = time.time()
        startClock = getTotalCpuTime()
    baseDir = os.getcwd()

    yield

    # If the job is not a checkpoint job, add the promise files to delete
    # to the list of jobStoreFileIDs to delete
    if not self.checkpoint:
        for jobStoreFileID in Promise.filesToDelete:
            # Make sure to wrap the job store ID in a FileID object so the file store will accept it
            # TODO: talk directly to the job store here instead.
            fileStore.deleteGlobalFile(FileID(jobStoreFileID, 0))
    else:
        # Else copy them to the job wrapper to delete later
        jobGraph.checkpointFilesToDelete = list(Promise.filesToDelete)
    Promise.filesToDelete.clear()
    # Now indicate the asynchronous update of the job can happen
    fileStore.startCommit(jobState=True)
    # Change dir back to cwd dir, if changed by job (this is a safety issue)
    if os.getcwd() != baseDir:
        os.chdir(baseDir)
    # Finish up the stats
    if stats is not None:
        totalCpuTime, totalMemoryUsage = getTotalCpuTimeAndMemoryUsage()
        # NOTE(review): ``stats.jobs.append(`` was stripped from the extracted
        # source; restored from the upstream module - confirm.
        stats.jobs.append(
            Expando(
                time=str(time.time() - startTime),
                clock=str(totalCpuTime - startClock),
                class_name=self._jobName(),
                memory=str(totalMemoryUsage)
            )
        )

def _runner(self, jobGraph, jobStore, fileStore, defer):
    """
    This method actually runs the job, and serialises the next jobs.

    :param class jobGraph: Instance of a jobGraph object
    :param class jobStore: Instance of the job store
    :param toil.fileStores.abstractFileStore.AbstractFileStore fileStore: Instance of a cached
           or uncached filestore
    :param defer: Function yielded by open() context
           manager of :class:`toil.DeferredFunctionManager`, which is called to
           register deferred functions.
    :return:
    """
    # Make deferred function registration available during run().
    self._defer = defer
    # Make fileStore available as an attribute during run() ...
    self._fileStore = fileStore
    # ... but also pass it to run() as an argument for backwards compatibility.
    returnValues = self._run(jobGraph, fileStore)
    # Clean up state changes made for run()
    self._defer = None
    self._fileStore = None
    # Serialize the new jobs defined by the run method to the jobStore
    self._serialiseExistingJob(jobGraph, jobStore, returnValues)

def _jobName(self):
    """
    :rtype : string, used as identifier of the job class in the stats report.
    """
    return self.displayName
class JobException(Exception):
    """
    Base class for job-related errors.
    """

    def __init__(self, message):
        """
        :param message: description of the error, forwarded to :class:`Exception`.
        """
        super(JobException, self).__init__(message)
class JobGraphDeadlockException(JobException):
    """
    Raised when a workflow contains an unresolvable dependency, such as a cycle.
    See :func:`toil.job.Job.checkJobGraphForDeadlocks`.
    """

    def __init__(self, string):
        """
        :param string: description of the deadlock, forwarded to :class:`JobException`.
        """
        super(JobGraphDeadlockException, self).__init__(string)
class FunctionWrappingJob(Job):
    """
    Job used to wrap a function. In its `run` method the wrapped function is called.
    """

    def __init__(self, userFunction, *args, **kwargs):
        """
        :param callable userFunction: The function to wrap. It will be called with ``*args`` and
               ``**kwargs`` as arguments.

        The keywords ``memory``, ``cores``, ``disk``, ``preemptable`` and ``checkpoint`` are
        reserved keyword arguments that if specified will be used to determine the resources
        required for the job, as :func:`toil.job.Job.__init__`. If they are keyword arguments to
        the function they will be extracted from the function definition, but may be overridden
        by the user (as you would expect). The keyword ``name`` is also consumed and passed on
        as the job's ``unitName``.
        """
        # Use the user-specified requirements, if specified, else grab the default argument
        # from the function, if specified, else default to None
        if sys.version_info >= (3, 0):
            argSpec = inspect.getfullargspec(userFunction)
        else:
            argSpec = inspect.getargspec(userFunction)

        if argSpec.defaults is None:
            argDict = {}
        else:
            # Defaults align with the trailing positional parameters.
            argDict = dict(zip(argSpec.args[-len(argSpec.defaults):], argSpec.defaults))

        def resolve(key, default=None, dehumanize=False):
            try:
                # First, try constructor arguments, ...
                value = kwargs.pop(key)
            except KeyError:
                try:
                    # ..., then try default value for function keyword arguments, ...
                    value = argDict[key]
                except KeyError:
                    # ... and finally fall back to a default value.
                    value = default
            # Optionally, convert strings with metric or binary prefixes.
            if dehumanize and isinstance(value, string_types):
                value = human2bytes(value)
            return value

        Job.__init__(self,
                     memory=resolve('memory', dehumanize=True),
                     cores=resolve('cores', dehumanize=True),
                     disk=resolve('disk', dehumanize=True),
                     preemptable=resolve('preemptable'),
                     checkpoint=resolve('checkpoint', default=False),
                     unitName=resolve('name', default=None))

        self.userFunctionModule = ModuleDescriptor.forModule(userFunction.__module__).globalize()
        self.userFunctionName = str(userFunction.__name__)
        self.jobName = self.userFunctionName
        self._args = args
        # Reserved keywords have been popped by resolve(); the rest go to the function.
        self._kwargs = kwargs

    def _getUserFunction(self):
        """
        Load the module that defines the wrapped function and return the function object.
        """
        logger.debug('Loading user function %s from module %s.',
                     self.userFunctionName,
                     self.userFunctionModule)
        userFunctionModule = self._loadUserModule(self.userFunctionModule)
        return getattr(userFunctionModule, self.userFunctionName)

    def run(self, fileStore):
        """
        Call the wrapped function with the stored arguments and return its result.
        ``fileStore`` is not passed on to the function.
        """
        userFunction = self._getUserFunction()
        return userFunction(*self._args, **self._kwargs)

    def getUserScript(self):
        """
        :returns: the descriptor of the module defining the wrapped function.
        """
        return self.userFunctionModule

    def _jobName(self):
        """
        :returns: dotted identifier built from the class name, module name and function name.
        """
        # NOTE(review): the middle element was stripped from the extracted source
        # (leaving ``,,``); restored from the upstream module - confirm.
        return ".".join((self.__class__.__name__,
                         self.userFunctionModule.name,
                         self.userFunctionName))
class JobFunctionWrappingJob(FunctionWrappingJob):
    """
    A job function is a function whose first argument is a :class:`.Job`
    instance that is the wrapping job for the function. This can be used to
    add successor jobs for the function and perform all the functions the
    :class:`.Job` class provides.

    To enable the job function to get access to the
    :class:`toil.fileStores.abstractFileStore.AbstractFileStore` instance (see
    :func:`toil.job.Job.run`), it is made a variable of the wrapping job called
    fileStore.

    To specify a job's resource requirements the following default keyword arguments
    can be specified:

        - memory
        - disk
        - cores

    For example to wrap a function into a job we would call::

        Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1)

    """

    @property
    def fileStore(self):
        """The file store set on this job as ``_fileStore``."""
        return self._fileStore

    def run(self, fileStore):
        """
        Call the wrapped function with this job prepended to the stored positional
        arguments, and return its result.
        """
        jobFunction = self._getUserFunction()
        positional = (self,) + tuple(self._args)
        return jobFunction(*positional, **self._kwargs)
class PromisedRequirementFunctionWrappingJob(FunctionWrappingJob):
    """
    Handles dynamic resource allocation using :class:`toil.job.Promise` instances.
    Spawns child function using parent function parameters and fulfilled promised
    resource requirements.
    """

    def __init__(self, userFunction, *args, **kwargs):
        # Keep the caller's keyword arguments (promised requirements included)
        # for the child that run() creates later.
        self._promisedKwargs = kwargs.copy()
        # Replace resource requirements in intermediate job with small values.
        kwargs.update(disk='1M', memory='32M', cores=0.1)
        super(PromisedRequirementFunctionWrappingJob, self).__init__(userFunction, *args, **kwargs)

    @classmethod
    def create(cls, userFunction, *args, **kwargs):
        """
        Creates an encapsulated Toil job function with unfulfilled promised resource
        requirements. After the promises are fulfilled, a child job function is created
        using updated resource values. The subgraph is encapsulated to ensure that this
        child job function is run before other children in the workflow. Otherwise, a
        different child may try to use an unresolved promise return value from the parent.
        """
        intermediateJob = cls(userFunction, *args, **kwargs)
        return EncapsulatedJob(intermediateJob)

    def run(self, fileStore):
        # Assumes promises are fulfilled when parent job is run
        self.evaluatePromisedRequirements()
        wrapped = self._getUserFunction()
        child = self.addChildFn(wrapped, *self._args, **self._promisedKwargs)
        return child.rv()

    def evaluatePromisedRequirements(self):
        """
        Replace every PromisedRequirement stored under ``disk``, ``memory`` or ``cores``
        with the value it evaluates to. Requirements that were not supplied are skipped.
        """
        for name in ("disk", "memory", "cores"):
            try:
                promised = self._promisedKwargs[name]
                if isinstance(promised, PromisedRequirement):
                    self._promisedKwargs[name] = promised.getValue()
            except KeyError:
                pass


class PromisedRequirementJobFunctionWrappingJob(PromisedRequirementFunctionWrappingJob):
    """
    Handles dynamic resource allocation for job functions.
    See :class:`toil.job.JobFunctionWrappingJob`
    """

    def run(self, fileStore):
        self.evaluatePromisedRequirements()
        wrapped = self._getUserFunction()
        child = self.addChildJobFn(wrapped, *self._args, **self._promisedKwargs)
        return child.rv()
class EncapsulatedJob(Job):
    """
    A convenience Job class used to make a job subgraph appear to be a single job.

    Let A be the root job of a job subgraph and B be another job we'd like to run after A
    and all its successors have completed, for this use encapsulate::

        #  Job A and subgraph, Job B
        A, B = A(), B()
        A' = A.encapsulate()
        A'.addChild(B)
        #  B will run after A and all its successors have completed, A and its subgraph of
        # successors in effect appear to be just one job.

    If the job being encapsulated has predecessors (e.g. is not the root job), then the
    encapsulated job will inherit these predecessors. If predecessors are added to the
    job being encapsulated after the encapsulated job is created then the encapsulating job
    will NOT inherit these predecessors automatically. Care should be exercised to ensure the
    encapsulated job has the proper set of predecessors.

    The return value of an encapsulated job (as accessed by the :func:`toil.job.Job.rv` function)
    is the return value of the root job, e.g. A().encapsulate().rv() and A().rv() will resolve to
    the same value after A or A.encapsulate() has been run.
    """

    def __init__(self, job):
        """
        :param toil.job.Job job: the job to encapsulate.
        """
        # Giving the root of the subgraph the same resources as the first job in the subgraph.
        Job.__init__(self, **job._requirements)
        # Ensure that the encapsulated job has the same direct predecessors as the job
        # being encapsulated.
        for predecessor in job._directPredecessors:
            predecessor.addChild(self)
        self.encapsulatedJob = job
        Job.addChild(self, job)
        # Use small resource requirements for dummy Job instance.
        # But not too small, or the job won't have enough resources to safely start up Toil.
        followOn = Job(disk='100M', memory='512M', cores=0.1)
        self.encapsulatedFollowOn = followOn
        Job.addFollowOn(self, followOn)

    # Successors and services added to the encapsulating job are attached to the
    # internal follow-on rather than to this job.

    def addChild(self, childJob):
        target = self.encapsulatedFollowOn
        return Job.addChild(target, childJob)

    def addService(self, service, parentService=None):
        target = self.encapsulatedFollowOn
        return Job.addService(target, service, parentService=parentService)

    def addFollowOn(self, followOnJob):
        target = self.encapsulatedFollowOn
        return Job.addFollowOn(target, followOnJob)

    def rv(self, *path):
        # The return value is that of the encapsulated root job.
        return self.encapsulatedJob.rv(*path)

    def prepareForPromiseRegistration(self, jobStore):
        # Prepare both this wrapper and the job it wraps.
        super(EncapsulatedJob, self).prepareForPromiseRegistration(jobStore)
        self.encapsulatedJob.prepareForPromiseRegistration(jobStore)

    def getUserScript(self):
        return self.encapsulatedJob.getUserScript()
class ServiceJobNode(JobNode):
    """
    A JobNode for a service job that additionally carries the job store IDs of its
    start, terminate and error flag files.
    """

    def __init__(self, jobStoreID, memory, cores, disk, preemptable, startJobStoreID,
                 terminateJobStoreID, errorJobStoreID, unitName, jobName, command,
                 predecessorNumber):
        requirements = dict(memory=memory, cores=cores, disk=disk, preemptable=preemptable)
        super().__init__(unitName=unitName, jobName=jobName,
                         requirements=requirements,
                         jobStoreID=jobStoreID,
                         command=command,
                         predecessorNumber=predecessorNumber)
        self.startJobStoreID = startJobStoreID
        self.terminateJobStoreID = terminateJobStoreID
        self.errorJobStoreID = errorJobStoreID


class ServiceJob(Job):
    """
    Job used to wrap a :class:`toil.job.Job.Service` instance.
    """

    def __init__(self, service):
        """
        This constructor should not be called by a user.

        :param service: The service to wrap in a job.
        :type service: toil.job.Job.Service
        """
        Job.__init__(self, **service._requirements)
        # service.__module__ is the module defining the class service is an instance of.
        self.serviceModule = ModuleDescriptor.forModule(service.__module__).globalize()

        # The service to run - this will be replaced before serialization with a pickled version
        self.service = service
        self.pickledService = None
        self.jobName = service.jobName
        # This references the parent job wrapper. It is initialised just before
        # the job is run. It is used to access the start and terminate flags.
        self.jobGraph = None

    @property
    def fileStore(self):
        """The file store set on this job as ``_fileStore``."""
        return self._fileStore

    def run(self, fileStore):
        """
        Start the pickled service, signal that it is up, then poll until it is told to
        terminate, finishes, or fails. The service's ``stop`` is always called.

        :raises RuntimeError: if the terminate flag is removed and the error flag is
                also gone; a RuntimeError from ``service.check()`` is re-raised as well.
        """
        # Unpickle the service
        logger.debug('Loading service module %s.', self.serviceModule)
        userModule = self._loadUserModule(self.serviceModule)
        service = self._unpickle(userModule, BytesIO(self.pickledService), fileStore.jobStore.config)
        # Start the service
        startCredentials = service.start(self)
        try:
            # The start credentials must be communicated to processes connecting to
            # the service, to do this while the run method is running we
            # cheat and set the return value promise within the run method
            self._fulfillPromises(startCredentials, fileStore.jobStore)
            # Set this to avoid the return values being updated after the
            # run method has completed!
            self._rvs = {}

            # Now flag that the service is running jobs can connect to it
            logger.debug("Removing the start jobStoreID to indicate that establishment of the service")
            assert self.jobGraph.startJobStoreID is not None
            if fileStore.jobStore.fileExists(self.jobGraph.startJobStoreID):
                fileStore.jobStore.deleteFile(self.jobGraph.startJobStoreID)
            assert not fileStore.jobStore.fileExists(self.jobGraph.startJobStoreID)

            # Now block until we are told to stop, which is indicated by the removal
            # of a file
            assert self.jobGraph.terminateJobStoreID is not None
            while True:
                # Check for the terminate signal
                if not fileStore.jobStore.fileExists(self.jobGraph.terminateJobStoreID):
                    logger.debug("Detected that the terminate jobStoreID has been removed so exiting")
                    if not fileStore.jobStore.fileExists(self.jobGraph.errorJobStoreID):
                        raise RuntimeError("Detected the error jobStoreID has been removed so exiting with an error")
                    break

                # Check the service's status and exit if failed or complete
                try:
                    if not service.check():
                        logger.debug("The service has finished okay, exiting")
                        break
                except RuntimeError:
                    logger.debug("Detected termination of the service")
                    raise

                # Avoid excessive polling
                time.sleep(fileStore.jobStore.config.servicePollingInterval)

            # Remove link to the jobGraph
            self.jobGraph = None

            logger.debug("Service is done")
        finally:
            # The stop function is always called
            service.stop(self)

    def _run(self, jobGraph, fileStore):
        """
        Run the service with ``self.jobGraph`` set for the duration of the call.
        """
        # Set the jobGraph for the job
        self.jobGraph = jobGraph
        # Run the job
        # NOTE(review): ``self.run(fileStore)`` and ``jobGraph.services`` were stripped
        # from the extracted source; restored from the upstream module - confirm.
        returnValues = self.run(fileStore)
        assert jobGraph.stack == []
        assert jobGraph.services == []
        # Unset the jobGraph for the job
        self.jobGraph = None
        # Set the stack to mimic what would be expected for a non-service job (this is a hack)
        jobGraph.stack = [[], []]
        return returnValues

    def getUserScript(self):
        """
        :returns: the descriptor of the module defining the service's class.
        """
        return self.serviceModule
class Promise(object):
    """
    References a return value from a :meth:`toil.job.Job.run` or
    :meth:`toil.job.Job.Service.start` method as a *promise* before the method itself is run.

    Let T be a job. Instances of :class:`.Promise` (termed a *promise*) are returned by T.rv(),
    which is used to reference the return value of T's run function. When the promise is passed
    to the constructor (or as an argument to a wrapped function) of a different, successor job
    the promise will be replaced by the actual referenced return value. This mechanism allows a
    return values from one job's run method to be input argument to job before the former job's
    run function has been executed.
    """

    # Caches the job store instance used during unpickling to prevent it from being
    # instantiated for each promise.
    # :type: toil.jobStores.abstractJobStore.AbstractJobStore
    _jobstore = None

    # A set of IDs of files containing promised values when we know we won't need them anymore
    filesToDelete = set()

    def __init__(self, job, path):
        """
        :param Job job: the job whose return value this promise references
        :param path: see :meth:`.Job.rv`
        """
        self.job = job
        self.path = path

    def __reduce__(self):
        """
        Called during pickling when a promise (an instance of this class) is about to be
        pickled. Returns the Promise class and construction arguments that will be evaluated
        during unpickling, namely the job store coordinates of a file that will hold the promised
        return value. By the time the promise is about to be unpickled, that file should be
        populated.
        """
        # The allocation of the file in the job store is intentionally lazy, we only allocate an
        # empty file in the job store if the promise is actually being pickled. This is done so
        # that we do not allocate files for promises that are never used.
        coordinates = self.job.registerPromise(self.path)
        jobStoreLocator, jobStoreFileID = coordinates
        # Returning a class object here causes the pickling machinery to attempt to instantiate
        # the class. We will catch that with __new__ and return the actual return value instead.
        return self.__class__, (jobStoreLocator, jobStoreFileID)

    @staticmethod
    def __new__(cls, *args):
        assert len(args) == 2
        if not isinstance(args[0], Job):
            # Attempted instantiation during unpickling, return promised value instead
            return cls._resolve(*args)
        # Regular instantiation when promise is created, before it is being pickled
        return super().__new__(cls)

    @classmethod
    def _resolve(cls, jobStoreLocator, jobStoreFileID):
        """
        Read and return the promised value stored in the given job store file.
        """
        # Initialize the cached job store if it was never initialized in the current process or
        # if it belongs to a different workflow that was run earlier in the current process.
        if cls._jobstore is None or cls._jobstore.config.jobStore != jobStoreLocator:
            cls._jobstore = Toil.resumeJobStore(jobStoreLocator)
        cls.filesToDelete.add(jobStoreFileID)
        with cls._jobstore.readFileStream(jobStoreFileID) as fileHandle:
            # If this doesn't work then the file containing the promise may not exist or be
            # corrupted
            return safeUnpickleFromStream(fileHandle)
class PromisedRequirement(object):
    def __init__(self, valueOrCallable, *args):
        r"""
        Class for dynamically allocating job function resource requirements involving
        :class:`toil.job.Promise` instances.

        Use when resource requirements depend on the return value of a parent function.
        PromisedRequirements can be modified by passing a function that takes the
        :class:`.Promise` as input.

        For example, let f, g, and h be functions. Then a Toil workflow can be
        defined as follows::

            A = Job.wrapFn(f)
            B = A.addChildFn(g, cores=PromisedRequirement(A.rv()))
            C = B.addChildFn(h, cores=PromisedRequirement(lambda x: 2*x, B.rv()))

        :param valueOrCallable: A single Promise instance or a function that
                                takes \*args as input parameters.
        :param \*args: variable length argument list
        :type \*args: int or .Promise
        """
        if not hasattr(valueOrCallable, '__call__'):
            assert len(args) == 0, 'Define a PromisedRequirement function to handle multiple arguments.'
            # A bare value is wrapped so getValue() simply hands it back.
            func = lambda x: x
            args = [valueOrCallable]
        else:
            assert len(args) != 0, 'Need parameters for PromisedRequirement function.'
            func = valueOrCallable

        # The callable is stored serialised with dill; the arguments are stored as a list.
        self._func = dill.dumps(func)
        self._args = list(args)

    def getValue(self):
        """
        Returns PromisedRequirement value
        """
        storedFunction = dill.loads(self._func)
        return storedFunction(*self._args)

    @staticmethod
    def convertPromises(kwargs):
        """
        Returns True if reserved resource keyword is a Promise or
        PromisedRequirement instance. Converts Promise instance
        to PromisedRequirement.

        The scan stops at the first of ``disk``, ``memory``, ``cores`` that matches.

        :param kwargs: function keyword arguments
        :return: bool
        """
        for requirement in ("disk", "memory", "cores"):
            candidate = kwargs.get(requirement)
            if isinstance(candidate, Promise):
                kwargs[requirement] = PromisedRequirement(candidate)
                return True
            if isinstance(candidate, PromisedRequirement):
                return True
        return False
class UnfulfilledPromiseSentinel(object):
    """
    Placeholder that should be overwritten by a proper promised value.
    Raises an exception when unpickled.
    """

    def __init__(self, fulfillingJobName, unpickled):
        # ``unpickled`` is accepted but not stored.
        self.fulfillingJobName = fulfillingJobName

    @staticmethod
    def __setstate__(stateDict):
        """
        Only called when unpickling. This won't be unpickled unless the promise
        wasn't resolved, so we throw an exception.

        :raises RuntimeError: always, naming the job that should have fulfilled the promise.
        """
        template = ("This job was passed a promise that wasn't yet resolved when it "
                    "ran. The job {jobName} that fulfills this promise hasn't yet "
                    "finished. This means that there aren't enough constraints to "
                    "ensure the current job always runs after {jobName}. Consider adding a "
                    "follow-on indirection between this job and its parent, or adding "
                    "this job as a child/follow-on of {jobName}.")
        raise RuntimeError(template.format(jobName=stateDict['fulfillingJobName']))