toil.jobStores.utils
Attributes
Exceptions
JobStoreUnavailableException | Raised when a particular type of job store is requested but can't be used.
Classes
ExceptionalThread | A thread whose join() method re-raises exceptions raised during run().
WritablePipe | An object-oriented wrapper for os.pipe.
ReadablePipe | An object-oriented wrapper for os.pipe.
ReadableTransformingPipe | A pipe which is constructed around a readable stream, and which provides a context manager that gives a readable stream.
Functions
generate_locator | Generate a random locator for a job store of the given type.
Module Contents
- class toil.jobStores.utils.ExceptionalThread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
Bases: threading.Thread
A thread whose join() method re-raises exceptions raised during run(). While join() is idempotent, the exception is only re-raised during the first invocation of join() that successfully joins the thread. If join() times out, no exception will be re-raised, even though an exception might already have occurred in run().
When subclassing this thread, override tryRun() instead of run().
>>> def f():
...     assert 0
>>> t = ExceptionalThread(target=f)
>>> t.start()
>>> t.join()
Traceback (most recent call last):
...
AssertionError
>>> class MyThread(ExceptionalThread):
...     def tryRun(self):
...         assert 0
>>> t = MyThread()
>>> t.start()
>>> t.join()
Traceback (most recent call last):
...
AssertionError
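The re-raising behaviour described above can be sketched with the standard library alone. The class below is a hypothetical illustration: `ReraisingThread` and its `_captured` attribute are invented names, and Toil's actual implementation may differ. It stores whatever run() raises and re-raises it from the first join() that finds the thread finished.

```python
import threading


class ReraisingThread(threading.Thread):
    """Hypothetical sketch; not Toil's ExceptionalThread."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._captured = None  # exception raised inside run(), if any

    def run(self):
        try:
            super().run()  # invokes the target callable, if one was given
        except BaseException as e:
            # Keep the exception so join() can hand it to the caller.
            self._captured = e

    def join(self, timeout=None):
        super().join(timeout)
        # Re-raise only once the thread has actually finished.
        if not self.is_alive() and self._captured is not None:
            error, self._captured = self._captured, None  # raise at most once
            raise error


def fail():
    raise ValueError("boom")


t = ReraisingThread(target=fail)
t.start()
try:
    t.join()
    outcome = "no exception"
except ValueError as e:
    outcome = f"re-raised: {e}"

t.join()  # a second join() is harmless and raises nothing
print(outcome)
```

Clearing the stored exception after raising it is one way to get the "only during the first successful join()" behaviour; a join() that times out leaves the exception in place for a later call.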
- exc_info = None
- run()
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- Return type:
None
- join(*args, **kwargs)
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread, as that would cause a deadlock. It is also an error to join() a thread before it has been started, and attempts to do so raise the same exception.
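Because join() always returns None, a timeout can only be detected by calling is_alive() afterwards. A minimal sketch using a plain threading.Thread; the `worker` function and the `release` event are illustrative names, not part of this module:

```python
import threading

release = threading.Event()


def worker():
    # Block until the main thread signals that we may finish.
    release.wait()


t = threading.Thread(target=worker)
t.start()

t.join(timeout=0.05)      # returns None whether or not the thread ended
timed_out = t.is_alive()  # still alive, so the join() call timed out

release.set()
t.join()                  # no timeout: blocks until the thread terminates
finished = not t.is_alive()

print(timed_out, finished)
```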
- class toil.jobStores.utils.WritablePipe(encoding=None, errors=None)
Bases: abc.ABC
An object-oriented wrapper for os.pipe. Clients should subclass it, implement readFrom() to consume the readable end of the pipe, then instantiate the class as a context manager to get the writable end. See the example below.
>>> import codecs, sys, shutil
>>> class MyPipe(WritablePipe):
...     def readFrom(self, readable):
...         shutil.copyfileobj(codecs.getreader('utf-8')(readable), sys.stdout)
>>> with MyPipe() as writable:
...     _ = writable.write('Hello, world!\n'.encode('utf-8'))
Hello, world!
Each instance of this class creates a thread and invokes the readFrom() method in that thread. The thread will be join()ed upon normal exit from the context manager, i.e. the body of the with statement. If an exception occurs, the thread will not be joined, but a well-behaved readFrom() implementation will terminate shortly thereafter due to the pipe having been closed.
Exceptions in the reader thread are re-raised in the main thread:
>>> class MyPipe(WritablePipe):
...     def readFrom(self, readable):
...         raise RuntimeError('Hello, world!')
>>> with MyPipe() as writable:
...     pass
Traceback (most recent call last):
...
RuntimeError: Hello, world!
More complicated, less illustrative tests:
Same as above, but proving that handles are closed:
>>> import os
>>> x = os.dup(0); os.close(x)
>>> class MyPipe(WritablePipe):
...     def readFrom(self, readable):
...         raise RuntimeError('Hello, world!')
>>> with MyPipe() as writable:
...     pass
Traceback (most recent call last):
...
RuntimeError: Hello, world!
>>> y = os.dup(0); os.close(y); x == y
True
Exceptions in the body of the with statement aren’t masked, and handles are closed:
>>> x = os.dup(0); os.close(x)
>>> class MyPipe(WritablePipe):
...     def readFrom(self, readable):
...         pass
>>> with MyPipe() as writable:
...     raise RuntimeError('Hello, world!')
Traceback (most recent call last):
...
RuntimeError: Hello, world!
>>> y = os.dup(0); os.close(y); x == y
True
- abstract readFrom(readable)
Implement this method to read data from the pipe. This method should support both binary and text mode output.
- Parameters:
readable (file) – the file object representing the readable end of the pipe. Do not explicitly invoke the close() method of the object; that will be done automatically.
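The underlying pattern (a thread draining the readable end of an os.pipe while the caller writes to the other end) can be sketched with the standard library. This is an illustration under stated assumptions, not Toil's implementation: `consume`, `reader_main`, and `received` are hypothetical names, and `consume` merely plays the role that readFrom() plays in a subclass.

```python
import os
import threading


def consume(readable, sink):
    # Plays the role of readFrom(): drain the readable end until EOF.
    for chunk in iter(lambda: readable.read(4096), b""):
        sink.append(chunk)


received = []
read_fd, write_fd = os.pipe()


def reader_main():
    # The reader thread owns the readable end and closes it when done.
    with os.fdopen(read_fd, "rb") as readable:
        consume(readable, received)


reader = threading.Thread(target=reader_main)
reader.start()

# The caller gets the writable end, as the body of the with statement would.
with os.fdopen(write_fd, "wb") as writable:
    writable.write(b"Hello, world!\n")

# Closing the writable end delivers EOF to the reader, so join() returns.
reader.join()

data = b"".join(received)
print(data)
```

Note that the consumer never closes the file object itself; the surrounding code does, which mirrors the instruction not to call close() inside readFrom().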
- class toil.jobStores.utils.ReadablePipe(encoding=None, errors=None)
Bases: abc.ABC
An object-oriented wrapper for os.pipe. Clients should subclass it, implement writeTo() to place data into the writable end of the pipe, then instantiate the class as a context manager to get the readable end. See the example below.
>>> import codecs, sys, shutil
>>> class MyPipe(ReadablePipe):
...     def writeTo(self, writable):
...         writable.write('Hello, world!\n'.encode('utf-8'))
>>> with MyPipe() as readable:
...     shutil.copyfileobj(codecs.getreader('utf-8')(readable), sys.stdout)
Hello, world!
Each instance of this class creates a thread and invokes the writeTo() method in that thread. The thread will be join()ed upon normal exit from the context manager, i.e. the body of the with statement. If an exception occurs, the thread will not be joined, but a well-behaved writeTo() implementation will terminate shortly thereafter due to the pipe having been closed.
Exceptions in the writer thread are re-raised in the main thread:
>>> class MyPipe(ReadablePipe):
...     def writeTo(self, writable):
...         raise RuntimeError('Hello, world!')
>>> with MyPipe() as readable:
...     pass
Traceback (most recent call last):
...
RuntimeError: Hello, world!
More complicated, less illustrative tests:
Same as above, but proving that handles are closed:
>>> import os
>>> x = os.dup(0); os.close(x)
>>> class MyPipe(ReadablePipe):
...     def writeTo(self, writable):
...         raise RuntimeError('Hello, world!')
>>> with MyPipe() as readable:
...     pass
Traceback (most recent call last):
...
RuntimeError: Hello, world!
>>> y = os.dup(0); os.close(y); x == y
True
Exceptions in the body of the with statement aren’t masked, and handles are closed:
>>> x = os.dup(0); os.close(x)
>>> class MyPipe(ReadablePipe):
...     def writeTo(self, writable):
...         pass
>>> with MyPipe() as readable:
...     raise RuntimeError('Hello, world!')
Traceback (most recent call last):
...
RuntimeError: Hello, world!
>>> y = os.dup(0); os.close(y); x == y
True
- abstract writeTo(writable)
Implement this method to write data to the pipe. This method should support both binary and text mode input.
- Parameters:
writable (file) – the file object representing the writable end of the pipe. Do not explicitly invoke the close() method of the object; that will be done automatically.
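The remark about a well-behaved writeTo() terminating once the pipe is closed can be illustrated with a raw os.pipe. The sketch below assumes a POSIX platform, where writing to a pipe whose readable end is closed raises BrokenPipeError in Python; `write_to`, `writer_main`, and `outcome` are hypothetical names, and the explicit join() is there only so the result can be observed.

```python
import os
import threading

read_fd, write_fd = os.pipe()
outcome = []


def write_to(writable):
    # Plays the role of writeTo(): keep producing data until the pipe breaks.
    try:
        for _ in range(10_000):  # far more data than a pipe buffer holds
            writable.write(b"x" * 4096)
        outcome.append("wrote everything")
    except BrokenPipeError:
        # The readable end was closed; terminate instead of propagating.
        outcome.append("stopped after broken pipe")


def writer_main():
    # Unbuffered, so a failed write surfaces immediately as BrokenPipeError.
    with os.fdopen(write_fd, "wb", buffering=0) as writable:
        write_to(writable)


writer = threading.Thread(target=writer_main)
writer.start()

# The consumer reads a little and then abandons the pipe early.
with os.fdopen(read_fd, "rb") as readable:
    head = readable.read(16)

writer.join()  # returns promptly because the writer noticed the closed pipe
print(head, outcome)
```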
- class toil.jobStores.utils.ReadableTransformingPipe(source, encoding=None, errors=None)
Bases: ReadablePipe
A pipe which is constructed around a readable stream, and which provides a context manager that gives a readable stream.
Useful as a base class for pipes which have to transform or otherwise visit bytes that flow through them, instead of just consuming or producing data.
Clients should subclass it and implement transform(), like so:
>>> import codecs, sys, shutil
>>> class MyPipe(ReadableTransformingPipe):
...     def transform(self, readable, writable):
...         writable.write(readable.read().decode('utf-8').upper().encode('utf-8'))
>>> class SourcePipe(ReadablePipe):
...     def writeTo(self, writable):
...         writable.write('Hello, world!\n'.encode('utf-8'))
>>> with SourcePipe() as source:
...     with MyPipe(source) as transformed:
...         shutil.copyfileobj(codecs.getreader('utf-8')(transformed), sys.stdout)
HELLO, WORLD!
The transform() method runs in its own thread, and should move data chunk by chunk instead of all at once. It should finish normally if it encounters either an EOF on the readable, or a BrokenPipeError on the writable. This means that it should make sure to actually catch a BrokenPipeError when writing.
See also: toil.lib.misc.WriteWatchingStream.
- abstract transform(readable, writable)
Implement this method to ship data through the pipe.
- Parameters:
readable (file) – the input stream file object to transform.
writable (file) – the file object representing the writable end of the pipe. Do not explicitly invoke the close() method of the object; that will be done automatically.
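A chunk-by-chunk transform that ends normally on either EOF or BrokenPipeError might look like the sketch below. It is written as a free function with a hypothetical `chunk_size` parameter rather than as a ReadableTransformingPipe subclass, so that it runs with the standard library alone; the upper-casing is only a stand-in transformation, and the broken-pipe check assumes a POSIX platform.

```python
import io
import os


def transform(readable, writable, chunk_size=4096):
    """Upper-case bytes chunk by chunk (illustrative transformation)."""
    try:
        while True:
            chunk = readable.read(chunk_size)
            if not chunk:
                break  # EOF on the readable: finish normally
            writable.write(chunk.upper())
    except BrokenPipeError:
        pass  # the consumer went away: also finish normally


# Normal case: in-memory streams stand in for the two ends of the pipe.
source = io.BytesIO(b"Hello, world!\n" * 1000)
sink = io.BytesIO()
transform(source, sink, chunk_size=64)
result = sink.getvalue()

# Broken-pipe case: the read end is closed before anything is written.
read_fd, write_fd = os.pipe()
os.close(read_fd)
with os.fdopen(write_fd, "wb", buffering=0) as writable:
    transform(io.BytesIO(b"ignored"), writable)  # returns without raising

print(len(result))
```

Reading a bounded chunk at a time keeps memory use independent of the stream length, unlike the readable.read() call in the short doctest above, which loads the whole input at once.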
- writeTo(writable)
Implement this method to write data to the pipe. This method should support both binary and text mode input.
- Parameters:
writable (file) – the file object representing the writable end of the pipe. Do not explicitly invoke the close() method of the object; that will be done automatically.
- exception toil.jobStores.utils.JobStoreUnavailableException
Bases: RuntimeError
Raised when a particular type of job store is requested but can’t be used.
- toil.jobStores.utils.generate_locator(job_store_type, local_suggestion=None, decoration=None)
Generate a random locator for a job store of the given type. Raises a JobStoreUnavailableException if that job store cannot be used.
- Parameters:
- Return str:
Job store locator for a usable job store.
- Return type: