toil.lib.pipes

Attributes

log

Classes

WritablePipe

An object-oriented wrapper for os.pipe. Clients should subclass it, implement

ReadablePipe

An object-oriented wrapper for os.pipe. Clients should subclass it, implement

ReadableTransformingPipe

A pipe which is constructed around a readable stream, and which provides a

HashingPipe

Class which checksums all the data read through it. If it

Module Contents

toil.lib.pipes.log
class toil.lib.pipes.WritablePipe(encoding=None, errors=None)

Bases: abc.ABC

An object-oriented wrapper for os.pipe. Clients should subclass it, implement readFrom() to consume the readable end of the pipe, then instantiate the class as a context manager to get the writable end. See the example below.

>>> import sys, shutil, codecs
>>> class MyPipe(WritablePipe):
...     def readFrom(self, readable):
...         shutil.copyfileobj(codecs.getreader('utf-8')(readable), sys.stdout)
>>> with MyPipe() as writable:
...     _ = writable.write('Hello, world!\n'.encode('utf-8'))
Hello, world!

Each instance of this class creates a thread and invokes the readFrom method in that thread. The thread will be join()ed upon normal exit from the context manager, i.e. the body of the with statement. If an exception occurs, the thread will not be joined but a well-behaved readFrom() implementation will terminate shortly thereafter due to the pipe having been closed.

Now, exceptions in the reader thread will be reraised in the main thread:

>>> class MyPipe(WritablePipe):
...     def readFrom(self, readable):
...         raise RuntimeError('Hello, world!')
>>> with MyPipe() as writable:
...     pass
Traceback (most recent call last):
...
RuntimeError: Hello, world!

More complicated, less illustrative tests:

Same as above, but proving that handles are closed:

>>> x = os.dup(0); os.close(x)
>>> class MyPipe(WritablePipe):
...     def readFrom(self, readable):
...         raise RuntimeError('Hello, world!')
>>> with MyPipe() as writable:
...     pass
Traceback (most recent call last):
...
RuntimeError: Hello, world!
>>> y = os.dup(0); os.close(y); x == y
True

Exceptions in the body of the with statement aren’t masked, and handles are closed:

>>> x = os.dup(0); os.close(x)
>>> class MyPipe(WritablePipe):
...     def readFrom(self, readable):
...         pass
>>> with MyPipe() as writable:
...     raise RuntimeError('Hello, world!')
Traceback (most recent call last):
...
RuntimeError: Hello, world!
>>> y = os.dup(0); os.close(y); x == y
True
Parameters:
  • encoding (str | None)

  • errors (str | None)

encoding: str | None = None
errors: str | None = None
readable_fh: int | None = None
writable: IO[Any] | None = None
thread: toil.lib.threading.ExceptionalThread | None = None
reader_done: bool = False
__enter__()
Return type:

IO[Any]

__exit__(exc_type, exc_val, exc_tb)
Parameters:
  • exc_type (str | None)

  • exc_val (str | None)

  • exc_tb (str | None)

Return type:

None

abstractmethod readFrom(readable)

Implement this method to read data from the pipe. This method should support both binary and text mode output.

Parameters:

readable (file) – the file object representing the readable end of the pipe. Do not explicitly invoke the close() method of the object; that will be done automatically.

Return type:

None

class toil.lib.pipes.ReadablePipe(encoding=None, errors=None)

Bases: abc.ABC

An object-oriented wrapper for os.pipe. Clients should subclass it, implement writeTo() to place data into the writable end of the pipe, then instantiate the class as a context manager to get the writable end. See the example below.

>>> import sys, shutil, codecs
>>> class MyPipe(ReadablePipe):
...     def writeTo(self, writable: IO[Any]) -> None:
...         writable.write('Hello, world!\n'.encode('utf-8'))
>>> with MyPipe() as readable:
...     shutil.copyfileobj(codecs.getreader('utf-8')(readable), sys.stdout)
Hello, world!

Each instance of this class creates a thread and invokes the writeTo() method in that thread. The thread will be join()ed upon normal exit from the context manager, i.e. the body of the with statement. If an exception occurs, the thread will not be joined but a well-behaved writeTo() implementation will terminate shortly thereafter due to the pipe having been closed.

Now, exceptions in the reader thread will be reraised in the main thread:

>>> class MyPipe(ReadablePipe):
...     def writeTo(self, writable):
...         raise RuntimeError('Hello, world!')
>>> with MyPipe() as readable:
...     pass
Traceback (most recent call last):
...
RuntimeError: Hello, world!

More complicated, less illustrative tests:

Same as above, but proving that handles are closed:

>>> x = os.dup(0); os.close(x)
>>> class MyPipe(ReadablePipe):
...     def writeTo(self, writable: IO[Any]) -> None:
...         raise RuntimeError('Hello, world!')
>>> with MyPipe() as readable:
...     pass
Traceback (most recent call last):
...
RuntimeError: Hello, world!
>>> y = os.dup(0); os.close(y); x == y
True

Exceptions in the body of the with statement aren’t masked, and handles are closed:

>>> x = os.dup(0); os.close(x)
>>> class MyPipe(ReadablePipe):
...     def writeTo(self, writable):
...         pass
>>> with MyPipe() as readable:
...     raise RuntimeError('Hello, world!')
Traceback (most recent call last):
...
RuntimeError: Hello, world!
>>> y = os.dup(0); os.close(y); x == y
True
Parameters:
  • encoding (str | None)

  • errors (str | None)

abstractmethod writeTo(writable)

Implement this method to write data from the pipe. This method should support both binary and text mode input.

Parameters:

writable (file) – the file object representing the writable end of the pipe. Do not explicitly invoke the close() method of the object, that will be done automatically.

Return type:

None

encoding: str | None = None
errors: str | None = None
writable_fh: int | None = None
readable: IO[Any] | None = None
thread: toil.lib.threading.ExceptionalThread | None = None
__enter__()
Return type:

IO[Any]

__exit__(exc_type, exc_val, exc_tb)
Parameters:
  • exc_type (str | None)

  • exc_val (str | None)

  • exc_tb (str | None)

Return type:

None

class toil.lib.pipes.ReadableTransformingPipe(source, encoding=None, errors=None)

Bases: ReadablePipe

A pipe which is constructed around a readable stream, and which provides a context manager that gives a readable stream.

Useful as a base class for pipes which have to transform or otherwise visit bytes that flow through them, instead of just consuming or producing data.

Clients should subclass it and implement transform(), like so:

>>> import sys, shutil, codecs
>>> class MyPipe(ReadableTransformingPipe):
...     def transform(self, readable, writable):
...         writable.write(readable.read().decode('utf-8').upper().encode('utf-8'))
>>> class SourcePipe(ReadablePipe):
...     def writeTo(self, writable):
...         writable.write('Hello, world!\n'.encode('utf-8'))
>>> with SourcePipe() as source:
...     with MyPipe(source) as transformed:
...         shutil.copyfileobj(codecs.getreader('utf-8')(transformed), sys.stdout)
HELLO, WORLD!

The transform() method runs in its own thread, and should move data chunk by chunk instead of all at once. It should finish normally if it encounters either an EOF on the readable, or a BrokenPipeError on the writable. This means that it should make sure to actually catch a BrokenPipeError when writing.

See also: toil.lib.misc.WriteWatchingStream.

Parameters:
  • source (IO[Any])

  • encoding (str | None)

  • errors (str | None)

source
abstractmethod transform(readable, writable)

Implement this method to ship data through the pipe.

Parameters:
  • readable (file) – the input stream file object to transform.

  • writable (file) – the file object representing the writable end of the pipe. Do not explicitly invoke the close() method of the object, that will be done automatically.

Return type:

None

writeTo(writable)

Implement this method to write data from the pipe. This method should support both binary and text mode input.

Parameters:

writable (file) – the file object representing the writable end of the pipe. Do not explicitly invoke the close() method of the object, that will be done automatically.

Return type:

None

class toil.lib.pipes.HashingPipe(source, encoding=None, errors=None, checksum_to_verify=None)

Bases: ReadableTransformingPipe

Class which checksums all the data read through it. If it reaches EOF and the checksum isn’t correct, raises ChecksumError.

Assumes info actually has a checksum.

Parameters:
  • source (IO[Any])

  • encoding (str | None)

  • errors (str | None)

  • checksum_to_verify (str | None)

checksum_to_verify = None
transform(readable, writable)

Implement this method to ship data through the pipe.

Parameters:
  • readable (file) – the input stream file object to transform.

  • writable (file) – the file object representing the writable end of the pipe. Do not explicitly invoke the close() method of the object, that will be done automatically.

Return type:

None