toil.deferred

Attributes

logger

Classes

RealtimeLogger

Provide a logger that logs over UDP to the leader.

ModuleDescriptor

A path to a Python module decomposed into a namedtuple of three elements

DeferredFunction

A callable captured together with its arguments for later invocation.

DeferredFunctionManager

Implements a deferred function system. Each Toil worker will have an instance of this class.

Functions

robust_rmtree(path)

Robustly tries to delete paths.

Module Contents

toil.deferred.robust_rmtree(path)[source]

Robustly tries to delete paths.

Continues silently if the path to be removed is already gone, or if it goes away while this function is executing.

May raise an error if a path changes between file and directory while the function is executing, or if a permission error is encountered.

Parameters:

path (Union[str, bytes])

Return type:

None
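The tolerant-deletion behavior described above can be sketched with the standard library alone. This is a minimal illustration, not Toil's implementation: `remove_tree_tolerantly` is a hypothetical helper, and it treats only an already-missing path as success, so permission errors still propagate to the caller.

```python
import os
import shutil
import tempfile


def remove_tree_tolerantly(path: str) -> None:
    """Delete a file or directory tree, ignoring paths that are already gone.

    Hypothetical helper for illustration; not Toil's robust_rmtree.
    """
    try:
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)
        else:
            os.unlink(path)
    except FileNotFoundError:
        # The path vanished before or during deletion; treat that as success.
        pass


scratch = tempfile.mkdtemp()
with open(os.path.join(scratch, "data.txt"), "w") as handle:
    handle.write("temporary")

remove_tree_tolerantly(scratch)  # removes the tree
remove_tree_tolerantly(scratch)  # second call is a silent no-op
still_exists = os.path.exists(scratch)
```

Calling the helper twice shows the intended idempotence: the second call finds nothing to delete and returns without raising.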

class toil.deferred.RealtimeLogger(batchSystem, level=defaultLevel)[source]

Provide a logger that logs over UDP to the leader.

To use in a Toil job, do:

>>> from toil.realtimeLogger import RealtimeLogger
>>> RealtimeLogger.info("This logging message goes straight to the leader")

That’s all a user of Toil would need to do. On the leader, Job.Runner.startToil() automatically starts the UDP server by using an instance of this class as a context manager.
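To make the idea of logging over UDP concrete, here is a minimal sketch built only on the standard `logging` and `socket` modules. It is not how RealtimeLogger is implemented: `UdpTextHandler`, the logger name `sketch.realtime`, and the plain-text message format are all assumptions made for illustration, and the "leader" is simulated by a socket bound to the loopback interface.

```python
import logging
import socket


class UdpTextHandler(logging.Handler):
    """Hypothetical handler that sends each formatted record as one UDP datagram."""

    def __init__(self, host: str, port: int) -> None:
        super().__init__()
        self._address = (host, port)
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def emit(self, record: logging.LogRecord) -> None:
        try:
            payload = self.format(record).encode("utf-8")
            self._socket.sendto(payload, self._address)
        except OSError:
            self.handleError(record)

    def close(self) -> None:
        self._socket.close()
        super().close()


# "Leader" side: a UDP socket bound to an ephemeral loopback port.
receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
receiver.bind(("127.0.0.1", 0))
receiver.settimeout(5.0)
host, port = receiver.getsockname()

# "Worker" side: a logger whose records are shipped to the leader's address.
worker_log = logging.getLogger("sketch.realtime")
worker_log.setLevel(logging.INFO)
worker_log.propagate = False
handler = UdpTextHandler(host, port)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
worker_log.addHandler(handler)

worker_log.info("This logging message goes straight to the leader")
received, _ = receiver.recvfrom(4096)
received_text = received.decode("utf-8")

worker_log.removeHandler(handler)
handler.close()
receiver.close()
```

UDP is connectionless, so a worker never blocks on the leader; the trade-off is that a datagram can be lost without either side noticing.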

Parameters:
  • batchSystem

  • level

envPrefix = 'TOIL_RT_LOGGING_'
defaultLevel = 'INFO'
lock
loggingServer = None
serverThread = None
initialized = 0
logger = None
classmethod getLogger()[source]

Get the logger that logs real-time to the leader.

Note that if the returned logger is used on the leader, you will see the message twice, since it still goes to the normal log handlers, too.

Return type:

logging.Logger

__enter__()[source]
Return type:

None

__exit__(exc_type, exc_val, exc_tb)[source]
Parameters:
  • exc_type

  • exc_val

  • exc_tb

Return type:

None

class toil.deferred.ModuleDescriptor[source]

Bases: namedtuple('ModuleDescriptor', ('dirPath', 'name', 'fromVirtualEnv'))

A path to a Python module decomposed into a namedtuple of three elements

  • dirPath, the path to the directory that should be added to sys.path before importing the module,

  • name, the fully qualified name of the module with leading package names separated by dots, and

  • fromVirtualEnv, a boolean flag indicating whether the module comes from a virtual environment.

>>> import toil.resource
>>> ModuleDescriptor.forModule('toil.resource') 
ModuleDescriptor(dirPath='/.../src', name='toil.resource', fromVirtualEnv=False)
>>> import subprocess, sys, tempfile, os
>>> dirPath = tempfile.mkdtemp()
>>> path = os.path.join( dirPath, 'foo.py' )
>>> with open(path,'w') as f:
...     _ = f.write('from toil.resource import ModuleDescriptor\n'
...                 'print(ModuleDescriptor.forModule(__name__))')
>>> subprocess.check_output([ sys.executable, path ]) 
b"ModuleDescriptor(dirPath='...', name='foo', fromVirtualEnv=False)\n"
>>> from shutil import rmtree
>>> rmtree( dirPath )

Now test a collision. 'collections' is part of the standard library in Python 2 and 3.

>>> dirPath = tempfile.mkdtemp()
>>> path = os.path.join( dirPath, 'collections.py' )
>>> with open(path,'w') as f:
...     _ = f.write('from toil.resource import ModuleDescriptor\n'
...                 'ModuleDescriptor.forModule(__name__)')

This should fail and return exit status 1 due to the collision with the built-in module:

>>> subprocess.call([ sys.executable, path ])
1

Clean up

>>> rmtree( dirPath )

dirPath: str
name: str
classmethod forModule(name)[source]

Return an instance of this class representing the module of the given name.

If the given module name is “__main__”, it will be translated to the actual file name of the top-level script without the .py or .pyc extension. This method assumes that the module with the specified name has already been loaded.

Parameters:

name (str)

Return type:

ModuleDescriptor
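As a rough illustration of what decomposing a module into a directory and a qualified name involves, the sketch below works on a module that is already loaded. It is not the forModule implementation: `ModulePath` and `decompose_loaded_module` are hypothetical names, the "__main__" translation is not handled, and the fromVirtualEnv field is left out.

```python
import json.decoder  # any already-imported module inside a package will do
import os
import sys
from typing import NamedTuple


class ModulePath(NamedTuple):
    """Hypothetical stand-in carrying two of the fields described above."""

    dirPath: str
    name: str


def decompose_loaded_module(name: str) -> ModulePath:
    module = sys.modules[name]  # assumes the module has already been imported
    file_path = os.path.abspath(module.__file__)
    dir_path = os.path.dirname(file_path)
    # A package lives in <pkg>/__init__.py, so step out of its own directory first.
    if os.path.basename(file_path).startswith("__init__."):
        dir_path = os.path.dirname(dir_path)
    # Walk up one directory per leading package name.
    for _ in name.split(".")[:-1]:
        dir_path = os.path.dirname(dir_path)
    return ModulePath(dirPath=dir_path, name=name)


descriptor = decompose_loaded_module("json.decoder")
package_dir = os.path.join(descriptor.dirPath, "json")
```

Adding `descriptor.dirPath` to sys.path is what would make `import json.decoder` resolvable in a fresh interpreter, which is why the walk stops at the directory that contains the top-level package.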

property belongsToToil: bool

True if this module is part of the Toil distribution

Return type:

bool

saveAsResourceTo(jobStore)[source]

Store the file containing this module (or even the Python package directory hierarchy containing that file) as a resource to the given job store and return the corresponding resource object. Should only be called on a leader node.

Parameters:

jobStore (toil.jobStores.abstractJobStore.AbstractJobStore)

Return type:

Resource

localize()[source]

Check if this module was saved as a resource.

If it was, return a new module descriptor that points to a local copy of that resource. Should only be called on a worker node. On the leader, this method returns this resource, i.e. self.

Return type:

ModuleDescriptor

globalize()[source]

Reverse the effect of localize().

Return type:

ModuleDescriptor

toCommand()[source]
Return type:

Sequence[str]

classmethod fromCommand(command)[source]
Parameters:

command (Sequence[str])

Return type:

ModuleDescriptor

makeLoadable()[source]
Return type:

ModuleDescriptor

load()[source]
Return type:

Optional[types.ModuleType]

toil.deferred.logger
class toil.deferred.DeferredFunction[source]

Bases: namedtuple('DeferredFunction', 'function args kwargs name module')

>>> from collections import defaultdict
>>> df = DeferredFunction.create(defaultdict, None, {'x':1}, y=2)
>>> df
DeferredFunction(defaultdict, ...)
>>> df.invoke() == defaultdict(None, x=1, y=2)
True
classmethod create(function, *args, **kwargs)[source]

Capture the given callable and arguments as an instance of this class.

Parameters:
  • function (callable) – The deferred action to take in the form of a function

  • args (tuple) – Non-keyword arguments to the function

  • kwargs (dict) – Keyword arguments to the function

invoke()[source]

Invoke the captured function with the captured arguments.
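The capture-then-invoke pattern can be sketched with a plain namedtuple subclass. This is an illustration under assumptions, not Toil's class: `CapturedCall` is a hypothetical name, and it stores the callable and arguments directly, whereas the real class may hold them in some other form.

```python
from collections import defaultdict, namedtuple


class CapturedCall(namedtuple("CapturedCall", "function args kwargs name module")):
    """Hypothetical stand-in that stores the callable and its arguments directly."""

    @classmethod
    def create(cls, function, *args, **kwargs):
        # Record the name and module alongside the call for readable diagnostics.
        return cls(function, args, kwargs, function.__name__, function.__module__)

    def invoke(self):
        # Replay the call exactly as it was captured.
        return self.function(*self.args, **self.kwargs)

    def __repr__(self):
        return f"CapturedCall({self.name}, ...)"


call = CapturedCall.create(defaultdict, None, {"x": 1}, y=2)
result = call.invoke()
```

Nothing runs at capture time; the call happens only when `invoke()` is called, which is what lets cleanup work be scheduled up front and executed later.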

__str__()[source]

Return str(self).

__repr__
class toil.deferred.DeferredFunctionManager(stateDirBase)[source]

Implements a deferred function system. Each Toil worker will have an instance of this class. When a job is executed, it will happen inside a context manager from this class. If the job registers any “deferred” functions, they will be executed when the context manager is exited.

If the Python process terminates before properly exiting the context manager and running the deferred functions, and some other worker process enters or exits the per-job context manager of this class at a later time, or when the DeferredFunctionManager is shut down on the worker, the earlier job’s deferred functions will be picked up and run.

Note that deferred function cleanup is on a best-effort basis, and deferred functions may end up getting executed multiple times.

Internally, deferred functions are serialized into files in the given directory, which are locked by the owning process.

If that process dies, other processes can detect that the files can now be locked, and will take them over.
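The lock-based ownership check can be illustrated with a non-blocking advisory lock. The sketch below is POSIX-only and rests on assumptions: it uses `fcntl.flock`, which may not be the locking call Toil uses, the file name `func-example` is made up, and a process death is simulated by closing the owner's file handle.

```python
import fcntl
import os
import tempfile


def try_lock(path: str):
    """Return an open file holding an exclusive lock on path, or None if it is held elsewhere."""
    handle = open(path, "a+")
    try:
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        handle.close()
        return None
    return handle


state_dir = tempfile.mkdtemp()
state_file = os.path.join(state_dir, "func-example")  # hypothetical file name

owner = try_lock(state_file)       # the "owning process" takes the lock
blocked = try_lock(state_file)     # a second open of the same file cannot lock it
owner.close()                      # closing releases the lock, as process exit would
taken_over = try_lock(state_file)  # the file is lockable again, so it looks orphaned

owner_had_lock = owner is not None
was_blocked = blocked is None
takeover_succeeded = taken_over is not None

taken_over.close()
os.remove(state_file)
os.rmdir(state_dir)
```

Because the operating system drops the lock when the holder exits, a successful non-blocking lock attempt is a cheap signal that the previous owner is gone and its recorded functions still need to run.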

Parameters:

stateDirBase (str)

STATE_DIR_STEM = 'deferred'
PREFIX = 'func'
WIP_SUFFIX = '.tmp'
__del__()[source]

Clean up our state on disk. We assume that the deferred functions we manage have all been executed, and none are currently recorded.

open()[source]

Yields a single-argument function that allows for deferred functions of type toil.DeferredFunction to be registered. We use this design so deferred functions can be registered only inside this context manager.

Not thread safe.
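The shape of this interface, a context manager that yields a one-argument registration function, can be sketched with `contextlib`. This is a simplified in-memory illustration: `deferred_scope` and `defer` are hypothetical names, the registered actions are plain zero-argument callables rather than DeferredFunction instances, and nothing is written to disk.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, List


@contextmanager
def deferred_scope() -> Iterator[Callable[[Callable[[], None]], None]]:
    """Hypothetical context manager that yields a one-argument registration function."""
    pending: List[Callable[[], None]] = []

    def defer(action: Callable[[], None]) -> None:
        pending.append(action)

    try:
        yield defer
    finally:
        # Run everything that was registered, even if the body raised.
        for action in pending:
            try:
                action()
            except Exception:
                # Best effort: one failing action must not block the rest.
                pass


events: List[str] = []
with deferred_scope() as defer:
    defer(lambda: events.append("cleanup"))
    events.append("job body")
```

Since `defer` exists only as the value yielded by the context manager, code outside the `with` block has no way to register work, which matches the design rationale given above.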

classmethod cleanupWorker(stateDirBase)[source]

Called by the batch system when it shuts down the node, after all workers are done, if the batch system supports worker cleanup. Checks once more for orphaned deferred functions and runs them.

Parameters:

stateDirBase (str)

Return type:

None