toil.bus
Message types and message bus for leader component coordination.
Historically, the Toil Leader has been organized around functions calling other functions to “handle” different things happening. Over time, it has become very brittle: exactly the right handling functions need to be called in exactly the right order, or it gets confused and does the wrong thing.
The MessageBus is meant to let the leader avoid this by coupling its components more loosely: they communicate by sending messages instead of by calling functions.
When an event occurs (like a job coming back from the batch system with a failed exit status), it is translated into a message and sent to the bus. All the leader components that need to react to that message in some way (say, by decrementing the retry count) listen for the relevant messages on the bus and react to them. If a new component needs to be added, it can be plugged into the message bus and receive and react to messages without interfering with existing components’ ability to react to the same messages.
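To make the decoupling concrete, here is a minimal, synchronous publish/subscribe sketch. It is not Toil's implementation, which is built on pypubsub and delivers messages in the leader's main thread. The bus class, the `FailedExitMessage` type and the two components are hypothetical names chosen for illustration.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List, NamedTuple, Type


class FailedExitMessage(NamedTuple):
    """Hypothetical message: a job came back with a failed exit status."""
    job_id: str
    exit_code: int


class ToyBus:
    """Synchronous toy bus: publishers never call listeners directly."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[type, List[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, message_type: Type[Any], handler: Callable[[Any], None]) -> None:
        self._handlers[message_type].append(handler)

    def publish(self, message: Any) -> None:
        # Every handler registered for this message's exact type receives it.
        for handler in self._handlers[type(message)]:
            handler(message)


class RetryCounter:
    """Hypothetical component: decrements the remaining retries on failure."""

    def __init__(self, bus: ToyBus, retries: int) -> None:
        self.remaining = retries
        bus.subscribe(FailedExitMessage, self.on_failure)

    def on_failure(self, message: FailedExitMessage) -> None:
        self.remaining -= 1


class FailureLog:
    """A second, independent component plugged into the same bus."""

    def __init__(self, bus: ToyBus) -> None:
        self.lines: List[str] = []
        bus.subscribe(FailedExitMessage, self.on_failure)

    def on_failure(self, message: FailedExitMessage) -> None:
        self.lines.append(f"{message.job_id} exited with {message.exit_code}")


bus = ToyBus()
retries = RetryCounter(bus, retries=3)
log = FailureLog(bus)
bus.publish(FailedExitMessage("job-1", 1))
print(retries.remaining, log.lines)  # 2 ['job-1 exited with 1']
```

Adding `FailureLog` required no change to `RetryCounter` or to the code that publishes the failure. That independence is the property the MessageBus is meant to provide.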
Eventually, the different aspects of the Leader could become separate objects.
By default, messages stay entirely within the Toil leader process, and are not persisted anywhere, not even in the JobStore.
The MessageBus also provides an extension point: the leader can serialize its messages to a file (see the --writeMessages option), and they can then be decoded using MessageBus.scan_bus_messages() (as is done in the Toil WES server backend). By replaying the messages and tracking their effects on job state, you can get an up-to-date view of the state of the jobs in a workflow. This includes information that is not persisted in the JobStore, such as whether jobs are issued or running, or which jobs have completely finished.
The MessageBus instance for the leader process is owned by the Toil leader, but the BatchSystem has an opportunity to connect to it and can send (or listen for) messages. Right now the BatchSystem does not have to send or receive any messages; the leader is responsible for polling it via the BatchSystem API and generating the events. However, a BatchSystem implementation may send additional events (like JobAnnotationMessage).
Currently, the MessageBus is implemented using pypubsub, so messages are always handled in a single thread: the Toil leader’s main loop thread. If components running in other threads send events, the MessageBus ships them over to that thread. Communication between processes is possible using MessageBus.connect_output_file() and MessageBus.scan_bus_messages().
Module Contents
Classes
- JobIssuedMessage: Produced when a job is issued to run on the batch system.
- JobUpdatedMessage: Produced when a job is "updated" and ready to have something happen to it.
- JobCompletedMessage: Produced when a job is completed, whether successful or not.
- JobFailedMessage: Produced when a job has completely failed and will not be retried again.
- JobMissingMessage: Produced when a job goes missing and should be in the batch system but isn’t.
- JobAnnotationMessage: Produced when extra information (such as an AWS Batch job ID from the AWSBatchBatchSystem) is available that goes with a job.
- ExternalBatchIdMessage: Produced when using a batch system; links the Toil-assigned batch ID to the batch system’s own ID.
- QueueSizeMessage: Produced to describe the size of the queue of jobs issued but not yet completed.
- ClusterSizeMessage: Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type in a cluster.
- ClusterDesiredSizeMessage: Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type that it thinks will be needed.
- MessageBus: Holds messages that should cause jobs to change their scheduling states.
- MessageBusClient: Base class for clients (inboxes and outboxes) of a message bus. Handles keeping a reference to the message bus.
- MessageInbox: A buffered connection to a message bus that lets us receive messages.
- MessageOutbox: A connection to a message bus that lets us publish messages.
- MessageBusConnection: A two-way connection to a message bus. Buffers incoming messages until you are ready for them, and lets you send messages.
- JobStatus: Records the status of a job.
Functions
- message_to_bytes(message): Convert a plain-old-data named tuple into a byte string.
- bytes_to_message(message_type, data): Convert bytes from message_to_bytes back to a message of the given type.
- replay_message_bus(path): Replay all the messages and work out what they mean for jobs.
- Return a file path in tmp to store the message bus at.
Attributes
- toil.bus.logger
- class toil.bus.JobIssuedMessage
Bases: NamedTuple
Produced when a job is issued to run on the batch system.
- class toil.bus.JobUpdatedMessage
Bases: NamedTuple
Produced when a job is “updated” and ready to have something happen to it.
- class toil.bus.JobCompletedMessage
Bases: NamedTuple
Produced when a job is completed, whether successful or not.
- class toil.bus.JobFailedMessage
Bases: NamedTuple
Produced when a job is completely failed, and will not be retried again.
- class toil.bus.JobMissingMessage
Bases: NamedTuple
Produced when a job goes missing and should be in the batch system but isn’t.
- class toil.bus.JobAnnotationMessage
Bases: NamedTuple
Produced when extra information (such as an AWS Batch job ID from the AWSBatchBatchSystem) is available that goes with a job.
- class toil.bus.ExternalBatchIdMessage
Bases: NamedTuple
Produced when using a batch system; links the Toil-assigned batch ID to the batch system’s own ID (whatever the batch system implementation returns, such as a PID or a batch job ID).
- class toil.bus.QueueSizeMessage
Bases: NamedTuple
Produced to describe the size of the queue of jobs issued but not yet completed. Theoretically recoverable from other messages.
- class toil.bus.ClusterSizeMessage
Bases: NamedTuple
Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type in a cluster.
- class toil.bus.ClusterDesiredSizeMessage
Bases: NamedTuple
Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type that it thinks will be needed.
- toil.bus.message_to_bytes(message)
Convert a plain-old-data named tuple into a byte string.
- Parameters:
message (NamedTuple)
- Return type:
bytes
- toil.bus.MessageType
- toil.bus.bytes_to_message(message_type, data)
Convert bytes from message_to_bytes back to a message of the given type.
- Parameters:
message_type (Type[MessageType])
data (bytes)
- Return type:
MessageType
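message_to_bytes() and bytes_to_message() form a round trip for plain-old-data named tuples. The sketch below shows the same round-trip idea using JSON. The JSON encoding is an assumption for illustration and is not Toil's actual wire format; `ExampleIssued`, `to_bytes` and `from_bytes` are hypothetical names.

```python
import json
from typing import NamedTuple, Type, TypeVar

# Any named-tuple message class; named tuples are tuple subclasses.
MessageT = TypeVar("MessageT", bound=tuple)


class ExampleIssued(NamedTuple):
    """Hypothetical message type; not the real JobIssuedMessage fields."""
    job_type: str
    job_id: str
    cores: float
    preemptible: bool


def to_bytes(message: NamedTuple) -> bytes:
    # A named tuple iterates over its field values, so they serialize as a JSON array.
    return json.dumps(list(message)).encode("utf-8")


def from_bytes(message_type: Type[MessageT], data: bytes) -> MessageT:
    # Rebuild positionally: field order is fixed by the class definition.
    return message_type(*json.loads(data.decode("utf-8")))


original = ExampleIssued("align", "kind-1/instance-abc", 2.0, True)
encoded = to_bytes(original)
assert from_bytes(ExampleIssued, encoded) == original
```

`from_bytes` needs the target type to rebuild the tuple, so the caller must already know which message type the bytes hold. This mirrors bytes_to_message() taking message_type as its first argument. The JSON approach only works for field values that JSON can represent: strings, numbers, booleans and None.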
- class toil.bus.MessageBus
Holds messages that should cause jobs to change their scheduling states. Messages are put in and buffered, and can be taken out and handled as batches when convenient.
All messages are NamedTuple objects of various subtypes.
Message order is guaranteed to be preserved within a type.
- MessageType
- publish(message)
Put a message onto the bus. Can be called from any thread.
- Parameters:
message (Any)
- Return type:
None
- check()
If we are in the owning thread, deliver any messages that are in the queue for us. Must be called every once in a while in the main thread, possibly through inbox objects.
- Return type:
None
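publish() may be called from any thread, while check() delivers queued messages only when it is called from the owning thread. The following is a minimal sketch of that contract, using a thread-safe `queue.Queue` in place of pypubsub. `ThreadAwareBus` is a hypothetical name. Its single FIFO queue gives global ordering, which is stronger than the per-type ordering MessageBus promises.

```python
import queue
import threading
from typing import Any, Callable, List, Tuple


class ThreadAwareBus:
    """Sketch: publish() from any thread; handlers run only in the owning thread."""

    def __init__(self) -> None:
        self._owner = threading.get_ident()  # the creating thread owns the bus
        self._pending: "queue.Queue[Any]" = queue.Queue()  # thread-safe FIFO
        self._handlers: List[Callable[[Any], None]] = []

    def subscribe(self, handler: Callable[[Any], None]) -> None:
        self._handlers.append(handler)

    def publish(self, message: Any) -> None:
        # Safe from any thread: only enqueue, never run handlers here.
        self._pending.put(message)

    def check(self) -> None:
        # Calls from other threads are ignored, so handlers never run concurrently.
        if threading.get_ident() != self._owner:
            return
        while True:
            try:
                message = self._pending.get_nowait()
            except queue.Empty:
                return
            for handler in self._handlers:
                handler(message)


bus = ThreadAwareBus()
received: List[Tuple[Any, int]] = []
bus.subscribe(lambda m: received.append((m, threading.get_ident())))


def produce() -> None:
    for n in range(3):
        bus.publish(n)
    bus.check()  # not the owning thread: no delivery happens here


worker = threading.Thread(target=produce)
worker.start()
worker.join()
assert received == []  # nothing has been delivered yet
bus.check()  # the owning (main) thread drains the queue
print([m for m, _ in received])  # [0, 1, 2]
```

In this design, other threads never touch handler state, so components can stay single-threaded. The trade-off is that the owning thread must keep calling check() regularly, just as the documentation above requires.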
- subscribe(message_type, handler)
Register the given callable to be called when messages of the given type are sent. It will be called only with messages sent after the subscription is created. Returns a subscription object; when the subscription object is garbage-collected, the subscription ends.
- Parameters:
message_type (Type[MessageType])
handler (Callable[[MessageType], Any])
- Return type:
pubsub.core.listener.Listener
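The subscription lifetime rule follows the weak-reference pattern: the subscription ends when the returned object is garbage-collected. Below is a minimal sketch, assuming the bus keeps only a `weakref.ref` to each subscription handle. `WeakBus` and `Subscription` are hypothetical names, not pypubsub's classes.

```python
import weakref
from typing import Any, Callable, List


class Subscription:
    """Handle returned to the subscriber; the bus holds it only weakly."""

    def __init__(self, handler: Callable[[Any], None]) -> None:
        self.handler = handler


class WeakBus:
    def __init__(self) -> None:
        self._subs: List["weakref.ref[Subscription]"] = []

    def subscribe(self, handler: Callable[[Any], None]) -> Subscription:
        sub = Subscription(handler)
        self._subs.append(weakref.ref(sub))
        return sub  # the caller must keep this object alive

    def publish(self, message: Any) -> None:
        alive = []
        for ref in self._subs:
            sub = ref()  # None once the subscriber has dropped its handle
            if sub is not None:
                alive.append(ref)
                sub.handler(message)
        self._subs = alive  # forget dead subscriptions


bus = WeakBus()
got: List[Any] = []
handle = bus.subscribe(got.append)
bus.publish("first")
del handle  # CPython frees the handle immediately (reference counting)
bus.publish("second")
print(got)  # ['first']
```

On interpreters without reference counting, the handle may survive until the next collection cycle. Callers should therefore not rely on the exact moment delivery stops. The reliable rule is to keep the handle for as long as you want messages.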
- connect(wanted_types)
Get a connection object that serves as an inbox for messages of the given types. Messages of those types will accumulate in the inbox until it is destroyed. You can check for them at any time.
- Parameters:
wanted_types (List[type])
- Return type:
MessageBusConnection
- connect_output_file(file_path)
Send copies of all messages to the given output file.
Returns connection data that must be kept alive for the connection to persist. The data is opaque: callers should only store it or delete it, never inspect or modify it.
- Parameters:
file_path (str)
- Return type:
Any
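connect_output_file() mirrors every message into a file so that another process can read them later. The sketch below shows one way to do that with a line-per-message text format. The format (type name, a tab, then the field values as JSON), `ListenerBus` and `ExampleUpdated` are all assumptions for illustration. Unlike the documented method, this sketch keeps writing even if the returned handle is dropped, so it does not reproduce the keep-alive requirement.

```python
import json
import os
import tempfile
from typing import Any, Callable, List, NamedTuple


class ExampleUpdated(NamedTuple):
    """Hypothetical message type used only in this sketch."""
    job_id: str
    result_status: int


class ListenerBus:
    """Minimal bus that hands every published message to every listener."""

    def __init__(self) -> None:
        self.listeners: List[Callable[[Any], None]] = []

    def publish(self, message: Any) -> None:
        for listener in self.listeners:
            listener(message)


def connect_output_file(bus: ListenerBus, file_path: str) -> Any:
    """Mirror every message to file_path, one line each; the result is opaque."""
    stream = open(file_path, "a", encoding="utf-8")

    def write(message: Any) -> None:
        # Self-describing line: type name, a tab, then field values as JSON.
        stream.write(f"{type(message).__name__}\t{json.dumps(list(message))}\n")
        stream.flush()  # readers in other processes see complete lines promptly

    bus.listeners.append(write)
    return stream


path = os.path.join(tempfile.mkdtemp(), "bus.log")
bus = ListenerBus()
handle = connect_output_file(bus, path)
bus.publish(ExampleUpdated("job-1", 0))
handle.close()  # finish writing before reading the file back
with open(path, encoding="utf-8") as reader:
    lines = reader.read().splitlines()
print(lines)
```

Writing each message as one self-describing line lets a reader in another process decode the stream incrementally without knowing the message order in advance.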
- class toil.bus.MessageBusClient
Base class for clients (inboxes and outboxes) of a message bus. Handles keeping a reference to the message bus.
- class toil.bus.MessageInbox
Bases: MessageBusClient
A buffered connection to a message bus that lets us receive messages. Buffers incoming messages until you are ready for them. Does not preserve ordering between messages of different types.
- MessageType
- for_each(message_type)
Loop over all messages currently pending of the given type. Each message that is handled without raising an exception is removed.
Messages sent while this function is running are not yielded by the current call.
- Parameters:
message_type (Type[MessageType])
- Return type:
Iterator[MessageType]
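The for_each() contract has two parts: snapshot the pending messages, and drop each one only if the caller's loop body finished without raising. Both can be implemented with a generator and a `finally` block. The sketch below uses a hypothetical `SketchInbox` written for illustration, not Toil's MessageInbox. It assumes messages are only ever appended to the buffers and that no other iteration over the same type is in progress.

```python
from collections import defaultdict
from typing import Any, DefaultDict, Iterator, List, Set, Type, TypeVar

M = TypeVar("M")


class SketchInbox:
    """Per-type buffers: order is kept within a type, not across types."""

    def __init__(self) -> None:
        self._pending: DefaultDict[type, List[Any]] = defaultdict(list)

    def deliver(self, message: Any) -> None:
        self._pending[type(message)].append(message)

    def pending(self, message_type: type) -> List[Any]:
        return list(self._pending[message_type])

    def for_each(self, message_type: Type[M]) -> Iterator[M]:
        snapshot = list(self._pending[message_type])  # later arrivals are excluded
        handled: Set[int] = set()
        try:
            for index, message in enumerate(snapshot):
                yield message
                # Execution resumes here only when the caller asks for the next
                # message, i.e. its loop body finished without raising.
                handled.add(index)
        finally:
            live = self._pending[message_type]
            unhandled = [m for i, m in enumerate(snapshot) if i not in handled]
            # Messages appended during iteration sit after the snapshot prefix.
            self._pending[message_type] = unhandled + live[len(snapshot):]


inbox = SketchInbox()
for n in range(4):
    inbox.deliver(n)
inbox.deliver("a message of another type")

messages = inbox.for_each(int)
try:
    for n in messages:
        if n == 2:
            inbox.deliver(99)  # arrives mid-loop, so this call will not yield it
            raise RuntimeError("handler failed")
except RuntimeError:
    messages.close()  # run the cleanup now instead of waiting for garbage collection

print(inbox.pending(int))  # [2, 3, 99]
```

In this sketch, leaving the loop with `break` also leaves the current message pending. The generator is closed before it can mark that message as handled.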
- class toil.bus.MessageOutbox
Bases: MessageBusClient
A connection to a message bus that lets us publish messages.
- class toil.bus.MessageBusConnection
Bases: MessageInbox, MessageOutbox
A two-way connection to a message bus. Buffers incoming messages until you are ready for them, and lets you send messages.
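MessageBusConnection combines MessageInbox and MessageOutbox, which share the MessageBusClient base, so the hierarchy forms a diamond. Below is a hypothetical sketch of that shape. `FanOutBus`, `BusClient`, `Inbox`, `Outbox` and `Connection` are illustrative names, not Toil's classes. Python's method resolution order visits the shared base only once, so each connection holds a single bus reference.

```python
from typing import Any, List


class FanOutBus:
    """Delivers each published message to every registered inbox."""

    def __init__(self) -> None:
        self.inboxes: List["Inbox"] = []

    def publish(self, message: Any) -> None:
        for inbox in self.inboxes:
            inbox.deliver(message)


class BusClient:
    """Shared base: keeps a reference to the bus."""

    def __init__(self, bus: FanOutBus) -> None:
        self._bus = bus


class Inbox(BusClient):
    def __init__(self, bus: FanOutBus) -> None:
        super().__init__(bus)  # continues along the MRO to BusClient
        self.received: List[Any] = []
        bus.inboxes.append(self)  # register to receive messages

    def deliver(self, message: Any) -> None:
        self.received.append(message)


class Outbox(BusClient):
    def publish(self, message: Any) -> None:
        self._bus.publish(message)


class Connection(Inbox, Outbox):
    """Two-way client: one bus reference, shared through the common base."""


bus = FanOutBus()
a, b = Connection(bus), Connection(bus)
a.publish("hello")
print(b.received)  # ['hello']
```

Because `a` is also registered as an inbox, it receives its own message too. A real design would decide explicitly whether a connection should see what it publishes.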
- toil.bus.replay_message_bus(path)
Replay all the messages and work out what they mean for jobs.
Job state and name are tracked by job ID. A two-item list would suffice, but MyPy cannot type a list whose items have different types, so a dedicated class is used instead.
Returns a dictionary mapping each job ID to a JobStatus dataclass. A JobStatus holds the information gathered about a job from the message bus: the job store ID, the name of the job, the exit code, any associated annotations, the Toil batch ID, the external batch ID, and the batch system on which the job is running.
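Replaying works as a fold over the message log: start with no jobs, and let each message refine the status of the job it names. The sketch below assumes the messages have already been decoded into `(kind, fields)` records. `SketchJobStatus`, the record kinds and the field names are hypothetical stand-ins, not the real JobStatus or message classes. The sketch also shows why QueueSizeMessage is described as theoretically recoverable from other messages: counting jobs that were issued but have no completion yields the queue size.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Optional, Tuple


@dataclass
class SketchJobStatus:
    """Hypothetical stand-in for JobStatus; field names are assumptions."""
    job_store_id: str
    name: str = ""
    exit_code: Optional[int] = None  # None until a completion is replayed
    annotations: Dict[str, str] = field(default_factory=dict)
    toil_batch_id: Optional[int] = None
    external_batch_id: Optional[str] = None
    batch_system: str = ""

    def outstanding(self) -> bool:
        # Issued to the batch system but not yet completed.
        return self.toil_batch_id is not None and self.exit_code is None


Record = Tuple[str, Dict[str, Any]]  # (message kind, message fields)


def replay(records: Iterable[Record]) -> Dict[str, SketchJobStatus]:
    statuses: Dict[str, SketchJobStatus] = {}
    for kind, fields in records:
        job_id = fields["job_id"]
        # Create the entry on first sight; later messages refine it.
        status = statuses.setdefault(job_id, SketchJobStatus(job_store_id=job_id))
        if kind == "issued":
            status.name = fields["name"]
            status.toil_batch_id = fields["toil_batch_id"]
        elif kind == "external_id":
            status.external_batch_id = fields["external_batch_id"]
            status.batch_system = fields["batch_system"]
        elif kind == "annotation":
            status.annotations[fields["key"]] = fields["value"]
        elif kind == "completed":
            status.exit_code = fields["exit_code"]
    return statuses


log = [
    ("issued", {"job_id": "a", "name": "align", "toil_batch_id": 1}),
    ("issued", {"job_id": "b", "name": "sort", "toil_batch_id": 2}),
    ("external_id", {"job_id": "a", "external_batch_id": "12345", "batch_system": "slurm"}),
    ("completed", {"job_id": "a", "exit_code": 0}),
]
jobs = replay(log)
queue_size = sum(1 for status in jobs.values() if status.outstanding())
print(jobs["a"].exit_code, jobs["a"].external_batch_id, queue_size)  # 0 12345 1
```

Because the state is derived only from the log, any process holding the same messages reaches the same view. This is what makes the file-based extension point useful outside the leader.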