toil.bus¶
Message types and message bus for leader component coordination.
Historically, the Toil Leader has been organized around functions calling other functions to “handle” different things happening. Over time, it has become very brittle: exactly the right handling functions need to be called in exactly the right order, or it gets confused and does the wrong thing.
The MessageBus is meant to let the leader avoid this by more loosely coupling its components together, by having them communicate by sending messages instead of by calling functions.
When an event occurs (like a job coming back from the batch system with a failed exit status), it is translated into a message and sent to the bus. All the leader components that need to react to the event in some way (by, say, decrementing the retry count) listen for the relevant messages on the bus and react to them. If a new component needs to be added, it can be plugged into the message bus and receive and react to messages without interfering with existing components’ ability to react to the same messages.
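The loose coupling described above can be illustrated with a minimal sketch. This is not Toil's MessageBus: the TinyBus class, the JobFailedExit message, and both handlers are hypothetical names invented for the example. It only shows how two components can react to the same message without knowing about each other.

```python
from collections import defaultdict
from typing import Any, Callable, NamedTuple


class JobFailedExit(NamedTuple):
    """Hypothetical message: a job came back with a failed exit status."""
    job_id: str
    exit_code: int


class TinyBus:
    """Illustrative stand-in for a message bus; not Toil's MessageBus."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, message_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers[message_type].append(handler)

    def publish(self, message: Any) -> None:
        # Every subscriber for this message type reacts independently.
        for handler in self._handlers[type(message)]:
            handler(message)


bus = TinyBus()
retries = {"job-1": 2}
failure_log: list[str] = []


def decrement_retries(message: JobFailedExit) -> None:
    # Existing component: spend one retry for the failed job.
    retries[message.job_id] -= 1


def record_failure(message: JobFailedExit) -> None:
    # New component plugged in later; the existing one is unaffected.
    failure_log.append(f"{message.job_id} exited {message.exit_code}")


bus.subscribe(JobFailedExit, decrement_retries)
bus.subscribe(JobFailedExit, record_failure)
bus.publish(JobFailedExit("job-1", 1))
```

Neither handler calls the other, so adding or removing one does not change the order-of-calls contract that the function-based design depended on.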
Eventually, the different aspects of the Leader could become separate objects.
By default, messages stay entirely within the Toil leader process, and are not persisted anywhere, not even in the JobStore.
The Message Bus also provides an extension point: its messages can be serialized to a file by the leader (see the --writeMessages option), and they can then be decoded using MessageBus.scan_bus_messages() (as is done in the Toil WES server backend). By replaying the messages and tracking their effects on job state, you can get an up-to-date view of the state of the jobs in a workflow. This includes information, such as whether jobs are issued or running, or which jobs have completely finished, that is not persisted in the JobStore.
The MessageBus instance for the leader process is owned by the Toil leader, but the BatchSystem has an opportunity to connect to it, and can send (or listen for) messages. Right now the BatchSystem does not have to send or receive any messages; the Leader is responsible for polling it via the BatchSystem API and generating the events. But a BatchSystem implementation may send additional events (like JobAnnotationMessage).
Currently, the MessageBus is implemented using pypubsub, and so messages are always handled in a single Thread, the Toil leader’s main loop thread. If other components send events, they will be shipped over to that thread inside the MessageBus. Communication between processes is allowed using MessageBus.connect_output_file() and MessageBus.scan_bus_messages().
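The single-thread handling described above can be sketched with a queue that only the owning thread drains. This is an assumption-laden illustration, not the pypubsub-based implementation: ThreadBoundBus and its methods are hypothetical, and only the idea (publish from any thread, deliver on the owner's next check) is taken from the text.

```python
import queue
import threading
from typing import Any, Callable


class ThreadBoundBus:
    """Illustrative only: runs every handler on the thread that created the bus."""

    def __init__(self) -> None:
        self._owner = threading.get_ident()
        self._pending: "queue.Queue[Any]" = queue.Queue()
        self._handlers: list[Callable[[Any], None]] = []

    def subscribe(self, handler: Callable[[Any], None]) -> None:
        self._handlers.append(handler)

    def publish(self, message: Any) -> None:
        # Safe from any thread: enqueue, then deliver only if we are the owner.
        self._pending.put(message)
        self.check()

    def check(self) -> None:
        # Other threads return immediately; the owner drains the queue.
        if threading.get_ident() != self._owner:
            return
        while True:
            try:
                message = self._pending.get_nowait()
            except queue.Empty:
                return
            for handler in self._handlers:
                handler(message)


bus = ThreadBoundBus()
handled_on: list[int] = []
bus.subscribe(lambda message: handled_on.append(threading.get_ident()))

worker = threading.Thread(target=bus.publish, args=("from worker",))
worker.start()
worker.join()
delivered_before_check = len(handled_on)  # the worker thread only enqueued

bus.check()  # a main-loop tick on the owning thread delivers the message
```

Because handlers only ever run on one thread, they do not need their own locking, at the cost of requiring the owner to call check() regularly.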
Attributes¶
Classes¶
Names | Stores all the kinds of name a job can have.
JobIssuedMessage | Produced when a job is issued to run on the batch system.
JobUpdatedMessage | Produced when a job is "updated" and ready to have something happen to it.
JobCompletedMessage | Produced when a job is completed, whether successful or not.
JobFailedMessage | Produced when a job is completely failed, and will not be retried again.
JobMissingMessage | Produced when a job goes missing and should be in the batch system but isn't.
JobAnnotationMessage | Produced when extra information (such as an AWS Batch job ID from the AWSBatchBatchSystem) is available that goes with a job.
ExternalBatchIdMessage | Produced when using a batch system; links the Toil-assigned batch ID to the batch system's ID.
QueueSizeMessage | Produced to describe the size of the queue of jobs issued but not yet completed.
ClusterSizeMessage | Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type in a cluster.
ClusterDesiredSizeMessage | Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type that it thinks will be needed.
MessageBus | Holds messages that should cause jobs to change their scheduling states.
MessageBusClient | Base class for clients (inboxes and outboxes) of a message bus. Handles keeping a reference to the message bus.
MessageInbox | A buffered connection to a message bus that lets us receive messages.
MessageOutbox | A connection to a message bus that lets us publish messages.
MessageBusConnection | A two-way connection to a message bus. Buffers incoming messages until you are ready for them, and lets you send messages.
JobStatus | Records the status of a job.
Functions¶
get_job_kind(names) | Return an identifying string for the job.
message_to_bytes(message) | Convert a plain-old-data named tuple into a byte string.
bytes_to_message(message_type, data) | Convert bytes from message_to_bytes back to a message of the given type.
replay_message_bus(path) | Replay all the messages and work out what they mean for jobs.
gen_message_bus_path(tmpdir=None) | Return a file path in tmp to store the message bus at.
Module Contents¶
- toil.bus.logger¶
- toil.bus.get_job_kind(names)[source]¶
Return an identifying string for the job.
The result may contain spaces.
- Returns: Either the unit name, job name, or display name, which identifies the kind of job it is to Toil, or “Unknown Job” if no identifier is available.
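A fallback like the one described for get_job_kind could look like the sketch below. The JobNames tuple and the job_kind function are hypothetical stand-ins, and the precedence (unit name, then job name, then display name) is an assumption based only on the order in which the names are listed above.

```python
from typing import NamedTuple, Optional


class JobNames(NamedTuple):
    """Hypothetical stand-in for the names a job can carry."""
    unit_name: Optional[str] = None
    job_name: Optional[str] = None
    display_name: Optional[str] = None


def job_kind(names: JobNames) -> str:
    # Assumed precedence: unit name, then job name, then display name.
    for candidate in (names.unit_name, names.job_name, names.display_name):
        if candidate:
            return candidate
    # No usable identifier at all.
    return "Unknown Job"
```

For example, job_kind(JobNames(job_name="align reads")) returns a string containing a space, which is why callers should not assume the result is a single token.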
- class toil.bus.JobIssuedMessage[source]¶
Bases:
NamedTuple
Produced when a job is issued to run on the batch system.
- class toil.bus.JobUpdatedMessage[source]¶
Bases:
NamedTuple
Produced when a job is “updated” and ready to have something happen to it.
- class toil.bus.JobCompletedMessage[source]¶
Bases:
NamedTuple
Produced when a job is completed, whether successful or not.
- class toil.bus.JobFailedMessage[source]¶
Bases:
NamedTuple
Produced when a job is completely failed, and will not be retried again.
- class toil.bus.JobMissingMessage[source]¶
Bases:
NamedTuple
Produced when a job goes missing and should be in the batch system but isn’t.
- class toil.bus.JobAnnotationMessage[source]¶
Bases:
NamedTuple
Produced when extra information (such as an AWS Batch job ID from the AWSBatchBatchSystem) is available that goes with a job.
- class toil.bus.ExternalBatchIdMessage[source]¶
Bases:
NamedTuple
Produced when using a batch system. Links the Toil-assigned batch ID to the batch system’s own ID (whatever the local implementation returns: a PID, a batch ID, etc.).
- class toil.bus.QueueSizeMessage[source]¶
Bases:
NamedTuple
Produced to describe the size of the queue of jobs issued but not yet completed. Theoretically recoverable from other messages.
- class toil.bus.ClusterSizeMessage[source]¶
Bases:
NamedTuple
Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type in a cluster.
- class toil.bus.ClusterDesiredSizeMessage[source]¶
Bases:
NamedTuple
Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type that it thinks will be needed.
- toil.bus.message_to_bytes(message)[source]¶
Convert a plain-old-data named tuple into a byte string.
- Parameters:
message (NamedTuple)
- Return type:
bytes
- toil.bus.MessageType¶
- toil.bus.bytes_to_message(message_type, data)[source]¶
Convert bytes from message_to_bytes back to a message of the given type.
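A round trip between a plain-old-data named tuple and bytes can be sketched as follows. The tab-separated wire format, the ExampleMessage type, and the encode and decode helpers are all assumptions made for illustration; they are not taken from Toil's source. The point is that the field annotations on the message type carry enough information to rebuild the values.

```python
from typing import Any, NamedTuple, get_type_hints


class ExampleMessage(NamedTuple):
    """Hypothetical plain-old-data message."""
    job_id: str
    exit_code: int
    retried: bool


def encode(message: Any) -> bytes:
    # Assumed wire format: tab-separated field values, UTF-8 encoded.
    parts = []
    for value in message:
        text = str(value)
        if "\t" in text:
            raise ValueError("field values may not contain tabs")
        parts.append(text)
    return "\t".join(parts).encode("utf-8")


def decode(message_type: type, data: bytes) -> Any:
    # Field annotations say how to turn each text field back into a value.
    hints = get_type_hints(message_type)
    fields = data.decode("utf-8").split("\t")
    values = []
    for name, text in zip(message_type._fields, fields):
        kind = hints[name]
        # bool("False") would be True, so booleans are compared by text.
        values.append(text == "True" if kind is bool else kind(text))
    return message_type(*values)


original = ExampleMessage("job-7", 0, True)
wire = encode(original)
restored = decode(ExampleMessage, wire)
```

The decoder needs the message type as an argument because, in this sketch, the bytes themselves do not say which type they encode.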
- class toil.bus.MessageBus[source]¶
Holds messages that should cause jobs to change their scheduling states. Messages are put in and buffered, and can be taken out and handled as batches when convenient.
All messages are NamedTuple objects of various subtypes.
Message order is guaranteed to be preserved within a type.
- publish(message)[source]¶
Put a message onto the bus. Can be called from any thread.
- Parameters:
message (Any)
- Return type:
None
- check()[source]¶
If we are in the owning thread, deliver any messages that are in the queue for us. Must be called every once in a while in the main thread, possibly through inbox objects.
- Return type:
None
- MessageType¶
- subscribe(message_type, handler)[source]¶
Register the given callable to be called when messages of the given type are sent. It will be called with messages sent after the subscription is created. Returns a subscription object; when the subscription object is GC’d the subscription will end.
- Parameters:
message_type (type[MessageType])
handler (Callable[[MessageType], Any])
- Return type:
pubsub.core.listener.Listener
- connect(wanted_types)[source]¶
Get a connection object that serves as an inbox for messages of the given types. Messages of those types will accumulate in the inbox until it is destroyed. You can check for them at any time.
- Parameters:
- Return type:
- connect_output_file(file_path)[source]¶
Send copies of all messages to the given output file.
Returns connection data which must be kept alive for the connection to persist. That data is opaque: the user is not supposed to look at it or touch it or do anything with it other than store it somewhere or delete it.
- Parameters:
file_path (str)
- Return type:
Any
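Both subscribe() and connect_output_file() return an object that must be kept alive for the subscription to persist. One way such lifetime-bound subscriptions can work is with weak references, sketched below. WeakBus and Subscription are hypothetical classes written for this example and do not reflect how pypubsub or Toil implement it internally.

```python
import gc
import weakref
from typing import Any, Callable


class Subscription:
    """Token whose lifetime controls the subscription (illustrative)."""

    def __init__(self, handler: Callable[[Any], None]) -> None:
        self.handler = handler


class WeakBus:
    """Illustrative bus that holds its subscriptions only weakly."""

    def __init__(self) -> None:
        self._subscriptions: "weakref.WeakSet[Subscription]" = weakref.WeakSet()

    def subscribe(self, handler: Callable[[Any], None]) -> Subscription:
        subscription = Subscription(handler)
        self._subscriptions.add(subscription)
        # The caller now holds the only strong reference.
        return subscription

    def publish(self, message: Any) -> None:
        for subscription in list(self._subscriptions):
            subscription.handler(message)


bus = WeakBus()
seen: list[str] = []

token = bus.subscribe(seen.append)
bus.publish("first")   # delivered: the token is still alive

del token              # drop the only strong reference
gc.collect()           # make collection explicit on non-refcounting runtimes
bus.publish("second")  # not delivered: the subscription is gone
```

The practical consequence is the same as in the text: discarding the returned object, for example by not assigning it to anything, silently ends the subscription.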
- class toil.bus.MessageBusClient[source]¶
Base class for clients (inboxes and outboxes) of a message bus. Handles keeping a reference to the message bus.
- class toil.bus.MessageInbox[source]¶
Bases:
MessageBusClient
A buffered connection to a message bus that lets us receive messages. Buffers incoming messages until you are ready for them. Does not preserve ordering between messages of different types.
- MessageType¶
- for_each(message_type)[source]¶
Loop over all messages currently pending of the given type. Each that is handled without raising an exception will be removed.
Messages sent while this function is running will not be yielded by the current call.
- Parameters:
message_type (type[MessageType])
- Return type:
collections.abc.Iterator[MessageType]
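The for_each contract (remove a message only once it has been handled, and ignore messages that arrive during the loop) can be sketched with a generator. TinyInbox and its deliver method are hypothetical; this is one possible way to get the described behavior, not Toil's implementation.

```python
from collections import defaultdict
from typing import Any, Iterator


class TinyInbox:
    """Illustrative inbox that buffers messages by type."""

    def __init__(self) -> None:
        self._pending: dict[type, list[Any]] = defaultdict(list)

    def deliver(self, message: Any) -> None:
        self._pending[type(message)].append(message)

    def for_each(self, message_type: type) -> Iterator[Any]:
        # Snapshot the count so messages arriving mid-loop wait for the next call.
        remaining = len(self._pending[message_type])
        while remaining > 0:
            remaining -= 1
            yield self._pending[message_type][0]
            # Reached only if the caller came back for more, i.e. the
            # loop body finished without raising (or breaking out).
            self._pending[message_type].pop(0)


inbox = TinyInbox()
for text in ("a", "b", "c"):
    inbox.deliver(text)

handled: list[str] = []
try:
    for text in inbox.for_each(str):
        if text == "b":
            raise RuntimeError("handler failed")
        handled.append(text)
except RuntimeError:
    pass

# "b" was not handled, so it is still pending along with "c".
leftover = list(inbox.for_each(str))
```

In this sketch a break out of the loop also leaves the current message pending, which may or may not match the real behavior.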
- class toil.bus.MessageOutbox[source]¶
Bases:
MessageBusClient
A connection to a message bus that lets us publish messages.
- class toil.bus.MessageBusConnection[source]¶
Bases:
MessageInbox, MessageOutbox
A two-way connection to a message bus. Buffers incoming messages until you are ready for them, and lets you send messages.
- class toil.bus.JobStatus[source]¶
Records the status of a job.
When exit_code is -1, this means the job is either not observed or currently running.
- toil.bus.replay_message_bus(path)[source]¶
Replay all the messages and work out what they mean for jobs.
We track the state and name of jobs here, by ID. We would use a list of two items but MyPy can’t understand a list of items of multiple types, so we need to define a new class.
Returns a dictionary from the job_id to a dataclass, JobStatus. A JobStatus contains information about a job which we have gathered from the message bus, including the job store ID, the name of the job, the exit code, any associated annotations, the Toil batch ID, the external batch ID, and the batch system on which the job is running.
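Replaying amounts to folding a sequence of messages into a per-job status record. The sketch below uses hypothetical Issued and Completed messages and a reduced Status dataclass; the real message fields and the on-disk format are not shown in this page, so everything here other than the -1 exit code convention is an assumption.

```python
from dataclasses import dataclass
from typing import Iterable, NamedTuple, Union


class Issued(NamedTuple):
    """Hypothetical 'job issued' message."""
    job_id: str
    job_name: str


class Completed(NamedTuple):
    """Hypothetical 'job completed' message."""
    job_id: str
    exit_code: int


@dataclass
class Status:
    name: str = ""
    exit_code: int = -1  # -1: not observed, or still running


def replay(messages: Iterable[Union[Issued, Completed]]) -> dict[str, Status]:
    statuses: dict[str, Status] = {}
    for message in messages:
        # Create the record the first time a job ID is seen.
        status = statuses.setdefault(message.job_id, Status())
        if isinstance(message, Issued):
            status.name = message.job_name
        elif isinstance(message, Completed):
            status.exit_code = message.exit_code
    return statuses


log = [Issued("j1", "align"), Issued("j2", "sort"), Completed("j1", 0)]
statuses = replay(log)
```

After the replay, a job that was issued but never completed keeps the default exit code, which is how a reader of the result can tell it is still outstanding.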
- toil.bus.gen_message_bus_path(tmpdir=None)[source]¶
Return a file path in tmp to store the message bus at. The calling function is responsible for cleaning up the generated file.
The tmpdir argument will override the directory that the message bus will be made in. If not provided, the standard tempfile order will be used.
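A helper with this contract can be sketched with the standard tempfile module. The make_bus_path name and the file prefix are hypothetical, and whether Toil uses mkstemp internally is an assumption; the sketch only demonstrates the directory override and the caller's cleanup responsibility.

```python
import os
import tempfile
from typing import Optional


def make_bus_path(tmpdir: Optional[str] = None) -> str:
    # dir=None lets tempfile fall back to its usual search order
    # (TMPDIR, TEMP, TMP, then platform defaults).
    fd, path = tempfile.mkstemp(prefix="bus-", dir=tmpdir)
    os.close(fd)  # only the path is needed; the file stays on disk
    return path


with tempfile.TemporaryDirectory() as scratch:
    path = make_bus_path(scratch)
    in_scratch = os.path.samefile(os.path.dirname(path), scratch)
    existed = os.path.exists(path)
    os.remove(path)  # the caller, not the helper, cleans up
    removed = not os.path.exists(path)
```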