toil.deferred
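Attributes
- logger
Classes
- RealtimeLogger: Provide a logger that logs over UDP to the leader.
- ModuleDescriptor: A path to a Python module decomposed into a namedtuple of three elements.
- DeferredFunction: A namedtuple of (function, args, kwargs, name, module) that captures a callable and its arguments.
- DeferredFunctionManager: Implements a deferred function system. Each Toil worker will have an instance of this class.
Functions
- robust_rmtree(path): Robustly tries to delete paths.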
Module Contents
- toil.deferred.robust_rmtree(path)
Robustly tries to delete paths.
Continues silently if the path to be removed is already gone, or if it goes away while this function is executing.
May raise an error if a path changes between file and directory while the function is executing, or if a permission error is encountered.
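The behavior described above can be sketched with the standard library alone. This is a minimal illustration, not Toil's implementation, and the name `robust_rmtree_sketch` is hypothetical. It uses `os.lstat` so that symlinks are unlinked rather than followed. It treats `FileNotFoundError` as success at every step and lets other errors propagate, such as a `PermissionError`, or an `OSError` when a file is replaced by a directory mid-call.

```python
import os
import stat


def robust_rmtree_sketch(path):
    """Delete a file, symlink, or directory tree, ignoring entries that vanish.

    Hypothetical helper illustrating the documented behavior of robust_rmtree.
    """
    path = os.fspath(path)
    try:
        mode = os.lstat(path).st_mode  # lstat: do not follow symlinks
    except FileNotFoundError:
        return  # already gone
    if stat.S_ISDIR(mode):
        try:
            children = os.listdir(path)
        except FileNotFoundError:
            return  # removed by someone else while we were working
        for child in children:
            robust_rmtree_sketch(os.path.join(path, child))
        try:
            os.rmdir(path)
        except FileNotFoundError:
            pass
    else:
        try:
            os.unlink(path)  # files and symlinks (including links to directories)
        except FileNotFoundError:
            pass
```

Calling it a second time on the same path is a no-op. Because of the `lstat` check, a symlink inside the tree that points at an outside directory removes only the link, not the directory it points to.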
- class toil.deferred.RealtimeLogger(batchSystem, level=defaultLevel)
Provide a logger that logs over UDP to the leader.
To use in a Toil job, do:
>>> from toil.realtimeLogger import RealtimeLogger
>>> RealtimeLogger.info("This logging message goes straight to the leader")
That’s all a user of Toil would need to do. On the leader, Job.Runner.startToil() automatically starts the UDP server by using an instance of this class as a context manager.
- Parameters:
batchSystem (toil.batchSystems.abstractBatchSystem.AbstractBatchSystem)
level (str)
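One way to picture "logging over UDP to the leader" is the standard library's `logging.handlers.DatagramHandler`. It sends each record as a single UDP datagram: a 4-byte big-endian length prefix followed by a pickled dict of the record's attributes. This sketch does not claim to reproduce Toil's wire format. The function names and the `udp-sketch.worker` logger name are hypothetical.

```python
import logging
import logging.handlers
import pickle
import socket


def open_leader_socket(host="127.0.0.1"):
    """Leader side (hypothetical): bind a UDP socket on a free port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((host, 0))  # port 0 lets the OS pick a free port
    sock.settimeout(5.0)  # don't block forever if a datagram is lost
    return sock


def worker_logger(host, port):
    """Worker side (hypothetical): a logger whose records go out as UDP datagrams."""
    logger = logging.getLogger("udp-sketch.worker")
    for old in list(logger.handlers):  # avoid stacking handlers on repeat calls
        logger.removeHandler(old)
        old.close()
    logger.setLevel(logging.INFO)
    logger.propagate = False  # send only over UDP, no local output
    logger.addHandler(logging.handlers.DatagramHandler(host, port))
    return logger


def receive_record(sock):
    """Leader side: read one datagram and rebuild the LogRecord."""
    data, _addr = sock.recvfrom(65535)
    # DatagramHandler sends a 4-byte length prefix followed by a pickled dict.
    return logging.makeLogRecord(pickle.loads(data[4:]))
```

For example, open a socket with `open_leader_socket()`, pass `sock.getsockname()` to `worker_logger`, and call `.info(...)`. `receive_record(sock).getMessage()` on the leader then returns the same text. Unpickling data from the network is only safe between trusted hosts.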
- envPrefix = 'TOIL_RT_LOGGING_'
- defaultLevel = 'INFO'
- lock
- loggingServer = None
- serverThread = None
- initialized = 0
- logger = None
- classmethod getLogger()
Get the logger that logs real-time to the leader.
Note that if the returned logger is used on the leader, you will see the message twice, since it still goes to the normal log handlers, too.
- Return type:
logging.Logger
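The duplicated messages mentioned above are what ordinary `logging` propagation produces. This assumes, though the source does not say so, that the real-time logger is a named logger whose records also propagate to the root logger's handlers. In this hypothetical sketch, one list-collecting handler stands in for the UDP handler and another for the leader's normal log output. A single logging call reaches both.

```python
import logging
from typing import List, Tuple


class ListHandler(logging.Handler):
    """Collects each record's message in a list (stands in for a real handler)."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def log_on_leader(message: str) -> Tuple[List[str], List[str]]:
    """Log once through a propagating child logger; return what each handler saw."""
    normal, realtime = ListHandler(), ListHandler()
    root = logging.getLogger()
    old_level = root.level
    root.addHandler(normal)  # the leader's ordinary log output
    root.setLevel(logging.INFO)
    rt = logging.getLogger("leader-sketch.realtime")
    rt.addHandler(realtime)  # stands in for the UDP handler
    try:
        rt.info(message)  # propagate=True by default, so the root handler sees it too
    finally:
        rt.removeHandler(realtime)
        root.removeHandler(normal)
        root.setLevel(old_level)
    return normal.messages, realtime.messages
```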
- __exit__(exc_type, exc_val, exc_tb)
- Parameters:
exc_type (Optional[Type[BaseException]])
exc_val (Optional[BaseException])
exc_tb (Optional[types.TracebackType])
- Return type:
None
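The leader keeps a UDP server running for the duration of a `with` block, as the class description notes. That context-manager pattern can be sketched with `socketserver.UDPServer` served from a background thread. The class and helper names here are hypothetical, and the `__exit__` signature simply mirrors the documented one. This is not Toil's server.

```python
import socket
import socketserver
import threading
from types import TracebackType
from typing import Optional, Tuple, Type


class _CollectDatagrams(socketserver.BaseRequestHandler):
    """Appends each received datagram to the server's `received` list."""

    def handle(self) -> None:
        data, _sock = self.request  # for UDP servers the request is (data, socket)
        self.server.received.append(data)


class UDPServerContext:
    """Hypothetical sketch: run a UDP server only for the duration of a with-block."""

    def __init__(self, host: str = "127.0.0.1") -> None:
        # Port 0 asks the OS for a free port; see self.server.server_address.
        self.server = socketserver.UDPServer((host, 0), _CollectDatagrams)
        self.server.received = []
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)

    def __enter__(self) -> "UDPServerContext":
        self.thread.start()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Stop serve_forever(), close the socket, and wait for the thread to finish.
        self.server.shutdown()
        self.server.server_close()
        self.thread.join()


def send_datagram(address: Tuple[str, int], payload: bytes) -> None:
    """Send one UDP datagram, as a worker-side logger might."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, address)
```

Returning `None` from `__exit__` means any exception raised inside the `with` block still propagates after the server is shut down. The daemon thread keeps a forgotten server from blocking interpreter exit.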
- class toil.deferred.ModuleDescriptor
Bases: namedtuple('ModuleDescriptor', ('dirPath', 'name', 'fromVirtualEnv'))
A path to a Python module decomposed into a namedtuple of three elements:
dirPath, the path to the directory that should be added to sys.path before importing the module,
name, the fully qualified name of the module, with leading package names separated by dots, and
fromVirtualEnv, a boolean flag (False in the examples below).
>>> import toil.resource
>>> ModuleDescriptor.forModule('toil.resource')
ModuleDescriptor(dirPath='/.../src', name='toil.resource', fromVirtualEnv=False)
>>> import subprocess, tempfile, os, sys
>>> dirPath = tempfile.mkdtemp()
>>> path = os.path.join(dirPath, 'foo.py')
>>> with open(path, 'w') as f:
...     _ = f.write('from toil.resource import ModuleDescriptor\n'
...                 'print(ModuleDescriptor.forModule(__name__))')
>>> subprocess.check_output([sys.executable, path])
b"ModuleDescriptor(dirPath='...', name='foo', fromVirtualEnv=False)\n"
>>> from shutil import rmtree
>>> rmtree(dirPath)
Now test a collision. 'collections' is part of the standard library in Python 2 and 3.
>>> dirPath = tempfile.mkdtemp()
>>> path = os.path.join(dirPath, 'collections.py')
>>> with open(path, 'w') as f:
...     _ = f.write('from toil.resource import ModuleDescriptor\n'
...                 'ModuleDescriptor.forModule(__name__)')
This should fail and return exit status 1 because of the collision with the standard-library module:
>>> subprocess.call([sys.executable, path])
1
Clean up:
>>> rmtree(dirPath)
- classmethod forModule(name)
Return an instance of this class representing the module of the given name.
If the given module name is “__main__”, it will be translated to the actual file name of the top-level script without the .py or .pyc extension. This method assumes that the module with the specified name has already been loaded.
- Parameters:
name (str)
- Return type:
ModuleDescriptor
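Here is a rough sketch of how an already-loaded module could be decomposed into the `dirPath` and `name` fields, including the `__main__` translation described above. The helper `describe_module` is hypothetical. It ignores `fromVirtualEnv`, assumes POSIX-style paths, and is not `forModule` itself. It walks up one directory per dotted name component, plus one more when the module is a package's `__init__` file.

```python
import os
import sys
import types
from typing import Mapping, Tuple


def describe_module(name: str,
                    modules: Mapping[str, types.ModuleType] = sys.modules) -> Tuple[str, str]:
    """Hypothetical helper: return (dirPath, name) for an already-loaded module."""
    module = modules[name]  # like forModule, assume the module is already loaded
    file_path = os.path.abspath(module.__file__)
    base = os.path.splitext(os.path.basename(file_path))[0]  # drops .py or .pyc
    if name == "__main__":
        name = base  # use the top-level script's file name instead
    # One directory level per dotted component, plus one for a package __init__.
    levels = name.count(".") + 1 + (1 if base == "__init__" else 0)
    dir_path = file_path
    for _ in range(levels):
        dir_path = os.path.dirname(dir_path)
    return dir_path, name
```

For a module `toil.resource` loaded from `/x/src/toil/resource.py`, this yields `('/x/src', 'toil.resource')`, the directory that would go on `sys.path`.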
- saveAsResourceTo(jobStore)
Store the file containing this module (or even the Python package directory hierarchy containing that file) as a resource in the given job store, and return the corresponding resource object. Should only be called on a leader node.
- Parameters:
jobStore (toil.jobStores.abstractJobStore.AbstractJobStore)
- Return type:
toil.resource.Resource
- localize()
Check whether this module was saved as a resource.
If it was, return a new module descriptor that points to a local copy of that resource. Should only be called on a worker node. On the leader, this method returns this module descriptor itself, i.e. self.
- Return type:
ModuleDescriptor
- load()
- Return type:
Optional[types.ModuleType]
- toil.deferred.logger
- class toil.deferred.DeferredFunction
Bases: namedtuple('DeferredFunction', 'function args kwargs name module')
>>> from collections import defaultdict
>>> df = DeferredFunction.create(defaultdict, None, {'x':1}, y=2)
>>> df
DeferredFunction(defaultdict, ...)
>>> df.invoke() == defaultdict(None, x=1, y=2)
True
- classmethod create(function, *args, **kwargs)
Capture the given callable and arguments as an instance of this class.
- __repr__
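The fields `name` and `module` suggest that a deferred call is recorded by reference, so it can be written to disk and run later, possibly by another process. The following is a minimal sketch of that idea under that assumption, not Toil's implementation. The hypothetical `DeferredCallSketch` stores the callable as a module name plus attribute name, so only top-level callables work, and pickles the arguments to bytes. It reproduces the `defaultdict` example from the class docstring.

```python
import importlib
import pickle
from collections import namedtuple


class DeferredCallSketch(namedtuple("DeferredCallSketch", "args kwargs name module")):
    """Hypothetical sketch of a deferred call stored as plain, serializable data."""

    __slots__ = ()

    @classmethod
    def create(cls, function, *args, **kwargs):
        # Record the callable by reference and the arguments as pickled bytes,
        # so the whole tuple could be written to a file and read back later.
        return cls(
            args=pickle.dumps(args),
            kwargs=pickle.dumps(kwargs),
            name=function.__name__,
            module=function.__module__,
        )

    def invoke(self):
        # Re-import the defining module and look the callable up by name.
        function = getattr(importlib.import_module(self.module), self.name)
        return function(*pickle.loads(self.args), **pickle.loads(self.kwargs))
```

Storing the callable by reference keeps the record small. The trade-off is that the defining module must be importable wherever `invoke` runs.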
- class toil.deferred.DeferredFunctionManager(stateDirBase)
Implements a deferred function system. Each Toil worker will have an instance of this class. When a job is executed, it will happen inside a context manager from this class. If the job registers any “deferred” functions, they will be executed when the context manager is exited.
If the Python process terminates before it properly exits the context manager and runs the deferred functions, the earlier job’s deferred functions will be picked up and run later. This happens either when some other worker process enters or exits the per-job context manager of this class, or when the DeferredFunctionManager is shut down on the worker.
Note that deferred function cleanup is on a best-effort basis, and deferred functions may end up getting executed multiple times.
Internally, deferred functions are serialized into files in the given directory, and each file is locked by its owning process.
If that process dies, its locks are released. Other processes can then detect that the files are no longer locked, and will take them over.
- Parameters:
stateDirBase (str)
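The locking scheme described above can be sketched with `fcntl.flock` (Unix only). Each owner keeps an exclusive lock on its state file while it is alive. The kernel releases that lock when the process dies, so a non-blocking lock attempt by another process succeeds only for orphaned files. The helper names are hypothetical, and applying the documented `PREFIX` ('func') and `WIP_SUFFIX` ('.tmp') values this way is an assumption.

```python
import fcntl
import os
from typing import IO, Iterator, Optional, Tuple


def try_claim(path: str) -> Optional[IO[bytes]]:
    """Open path and take an exclusive, non-blocking lock; None if someone holds it."""
    handle = open(path, "a+b")  # creates the file if it does not exist yet
    try:
        fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        handle.close()
        return None  # a live owner still holds the lock
    return handle  # we hold the lock until handle is closed (or we exit)


def find_orphans(state_dir: str, prefix: str = "func",
                 wip_suffix: str = ".tmp") -> Iterator[Tuple[str, IO[bytes]]]:
    """Yield (file name, locked handle) for state files whose owner is gone."""
    for entry in sorted(os.listdir(state_dir)):
        if not entry.startswith(prefix) or entry.endswith(wip_suffix):
            continue  # not a state file, or one still being written
        handle = try_claim(os.path.join(state_dir, entry))
        if handle is not None:
            yield entry, handle
```

`flock` locks belong to the open file description, so two separate `open()` calls in one process also conflict. That makes the takeover logic easy to exercise without spawning a second process.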
- STATE_DIR_STEM = 'deferred'
- PREFIX = 'func'
- WIP_SUFFIX = '.tmp'
- __del__()
Clean up our state on disk. We assume that the deferred functions we manage have all been executed, and none are currently recorded.