toil.bus

Message types and message bus for leader component coordination.

Historically, the Toil Leader has been organized around functions calling other functions to “handle” different things happening. Over time, it has become very brittle: exactly the right handling functions need to be called in exactly the right order, or it gets confused and does the wrong thing.

The MessageBus is meant to let the leader avoid this by coupling its components more loosely: they communicate by sending messages instead of by calling functions.

When an event occurs (like a job coming back from the batch system with a failed exit status), it is translated into a message that is sent to the bus. Then, all the leader components that need to react to this message in some way (by, say, decrementing the retry count) listen for the relevant messages on the bus and react to them. If a new component needs to be added, it can be plugged into the message bus to receive and react to messages without interfering with existing components’ ability to react to the same messages.
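To make the decoupling concrete, here is a minimal, self-contained publish/subscribe sketch. `TinyBus` and `JobFailed` are hypothetical names for illustration only; they are not Toil's MessageBus or its message classes, and the sketch skips the buffering and threading rules described below.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List, NamedTuple


class JobFailed(NamedTuple):
    """Hypothetical message: a job came back with a failing exit status."""
    job_id: str
    exit_code: int


class TinyBus:
    """Minimal in-process publish/subscribe bus, for illustration only."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[type, List[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, message_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers[message_type].append(handler)

    def publish(self, message: Any) -> None:
        # Every handler registered for the message's type sees it;
        # the publisher does not know which components are listening.
        for handler in self._handlers[type(message)]:
            handler(message)


# Two independent components that react to the same event.
retries: Dict[str, int] = {"job-1": 2}
failure_log: List[str] = []


def decrement_retries(message: JobFailed) -> None:
    retries[message.job_id] -= 1


def record_failure(message: JobFailed) -> None:
    failure_log.append(f"{message.job_id} exited with {message.exit_code}")


bus = TinyBus()
bus.subscribe(JobFailed, decrement_retries)
bus.subscribe(JobFailed, record_failure)
bus.publish(JobFailed("job-1", 1))
```

Adding a third component is one more `subscribe` call; neither existing handler has to change, which is the property the paragraph above describes.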

Eventually, the different aspects of the Leader could become separate objects.

By default, messages stay entirely within the Toil leader process, and are not persisted anywhere, not even in the JobStore.

The MessageBus also provides an extension point: the leader can serialize its messages to a file (see the --writeMessages option), and they can then be decoded using MessageBus.scan_bus_messages() (as is done in the Toil WES server backend). By replaying the messages and tracking their effects on job state, you can get an up-to-date view of the state of the jobs in a workflow. This includes information that is not persisted in the JobStore, such as whether jobs are issued or running, or which jobs have completely finished.

The MessageBus instance for the leader process is owned by the Toil leader, but the BatchSystem has an opportunity to connect to it and can send (or listen for) messages. Right now the BatchSystem does not have to send or receive any messages; the leader is responsible for polling it via the BatchSystem API and generating the events. However, a BatchSystem implementation may send additional events (like JobAnnotationMessage).

Currently, the MessageBus is implemented using pypubsub, so messages are always handled in a single thread: the Toil leader’s main loop thread. If other components send events, the MessageBus ships them over to that thread. Communication between processes is possible using MessageBus.connect_output_file() and MessageBus.scan_bus_messages().
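The single-thread rule can be illustrated with a thread-safe queue: any thread may enqueue, but only the owning thread drains the queue and runs handlers. This is a sketch of the pattern under that assumption, not Toil's implementation (which builds on pypubsub); `ThreadConfinedBus` is a hypothetical name, and its `check()` only loosely mirrors the documented MessageBus.check().

```python
import queue
import threading
from typing import Any, Callable, List


class ThreadConfinedBus:
    """Sketch: publish from any thread, deliver only on the owning thread."""

    def __init__(self) -> None:
        # The thread that creates the bus owns it.
        self._owner = threading.get_ident()
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self._handlers: List[Callable[[Any], None]] = []

    def subscribe(self, handler: Callable[[Any], None]) -> None:
        self._handlers.append(handler)

    def publish(self, message: Any) -> None:
        # Safe from any thread: only enqueues, never runs a handler.
        self._queue.put(message)

    def check(self) -> None:
        # Non-owner threads return immediately; the owner drains the queue.
        if threading.get_ident() != self._owner:
            return
        while True:
            try:
                message = self._queue.get_nowait()
            except queue.Empty:
                return
            for handler in self._handlers:
                handler(message)


bus = ThreadConfinedBus()
handled_on: List[int] = []
bus.subscribe(lambda message: handled_on.append(threading.get_ident()))

worker = threading.Thread(target=bus.publish, args=("job finished",))
worker.start()
worker.join()

bus.check()  # the handler runs here, on the owning (main) thread
```

The design choice is that handlers never need locks: all state they touch is only ever mutated from one thread.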

Module Contents

Classes

Names

Stores all the kinds of name a job can have.

JobIssuedMessage

Produced when a job is issued to run on the batch system.

JobUpdatedMessage

Produced when a job is "updated" and ready to have something happen to it.

JobCompletedMessage

Produced when a job is completed, whether successful or not.

JobFailedMessage

Produced when a job has failed completely and will not be retried.

JobMissingMessage

Produced when a job goes missing and should be in the batch system but isn't.

JobAnnotationMessage

Produced when extra information (such as an AWS Batch job ID from the AWSBatchBatchSystem) is available that goes with a job.

ExternalBatchIdMessage

Produced when using a batch system; links the Toil-assigned batch ID to the batch system’s own ID.

QueueSizeMessage

Produced to describe the size of the queue of jobs issued but not yet completed.

ClusterSizeMessage

Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type in a cluster.

ClusterDesiredSizeMessage

Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type that it thinks will be needed.

MessageBus

Holds messages that should cause jobs to change their scheduling states.

MessageBusClient

Base class for clients (inboxes and outboxes) of a message bus. Handles keeping a reference to the message bus.

MessageInbox

A buffered connection to a message bus that lets us receive messages.

MessageOutbox

A connection to a message bus that lets us publish messages.

MessageBusConnection

A two-way connection to a message bus. Buffers incoming messages until you are ready for them, and lets you send messages.

JobStatus

Records the status of a job.

Functions

get_job_kind(names)

Return an identifying string for the job.

message_to_bytes(message)

Convert a plain-old-data named tuple into a byte string.

bytes_to_message(message_type, data)

Convert bytes from message_to_bytes back to a message of the given type.

replay_message_bus(path)

Replay all the messages and work out what they mean for jobs.

gen_message_bus_path()

Return a file path in tmp to store the message bus at.

Attributes

logger

MessageType

toil.bus.logger
class toil.bus.Names

Bases: NamedTuple

Stores all the kinds of name a job can have.

job_name: str
unit_name: str
display_name: str
stats_name: str
job_store_id: str
toil.bus.get_job_kind(names)

Return an identifying string for the job.

The result may contain spaces.

Returns: Either the unit name, job name, or display name, which identifies the kind of job it is to Toil; otherwise “Unknown Job” if no identifier is available.

Parameters:

names (Names) –

Return type:

str
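The documented fallback order (unit name, then job name, then display name, then “Unknown Job”) can be sketched as below. This is a hypothetical reimplementation, not Toil's source: `NamesSketch` mirrors the fields of Names, and treating an empty string as "no identifier" is an assumption.

```python
from typing import NamedTuple


class NamesSketch(NamedTuple):
    """Hypothetical stand-in mirroring the fields of toil.bus.Names."""
    job_name: str
    unit_name: str
    display_name: str
    stats_name: str
    job_store_id: str


def get_job_kind_sketch(names: NamesSketch) -> str:
    """Return the first available identifier, in the documented order."""
    # Assumption: a missing identifier is represented by an empty string.
    for candidate in (names.unit_name, names.job_name, names.display_name):
        if candidate:
            return candidate
    return "Unknown Job"
```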

class toil.bus.JobIssuedMessage

Bases: NamedTuple

Produced when a job is issued to run on the batch system.

job_type: str
job_id: str
toil_batch_id: int
class toil.bus.JobUpdatedMessage

Bases: NamedTuple

Produced when a job is “updated” and ready to have something happen to it.

job_id: str
result_status: int
class toil.bus.JobCompletedMessage

Bases: NamedTuple

Produced when a job is completed, whether successful or not.

job_type: str
job_id: str
exit_code: int
class toil.bus.JobFailedMessage

Bases: NamedTuple

Produced when a job has failed completely and will not be retried.

job_type: str
job_id: str
class toil.bus.JobMissingMessage

Bases: NamedTuple

Produced when a job goes missing and should be in the batch system but isn’t.

job_id: str
class toil.bus.JobAnnotationMessage

Bases: NamedTuple

Produced when extra information (such as an AWS Batch job ID from the AWSBatchBatchSystem) is available that goes with a job.

job_id: str
annotation_name: str
annotation_value: str
class toil.bus.ExternalBatchIdMessage

Bases: NamedTuple

Produced when using a batch system; links the Toil-assigned batch ID to the batch system’s own ID (whatever the local implementation returns, such as a PID or a batch job ID).

toil_batch_id: int
external_batch_id: str
batch_system: str
class toil.bus.QueueSizeMessage

Bases: NamedTuple

Produced to describe the size of the queue of jobs issued but not yet completed. Theoretically recoverable from other messages.

queue_size: int
class toil.bus.ClusterSizeMessage

Bases: NamedTuple

Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type in a cluster.

instance_type: str
current_size: int
class toil.bus.ClusterDesiredSizeMessage

Bases: NamedTuple

Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type that it thinks will be needed.

instance_type: str
desired_size: int
toil.bus.message_to_bytes(message)

Convert a plain-old-data named tuple into a byte string.

Parameters:

message (NamedTuple) –

Return type:

bytes

toil.bus.MessageType
toil.bus.bytes_to_message(message_type, data)

Convert bytes from message_to_bytes back to a message of the given type.

Parameters:
  • message_type (Type[MessageType]) –

  • data (bytes) –

Return type:

MessageType
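message_to_bytes and bytes_to_message are documented here only as a round-trip pair. The sketch below shows one way such a pair can work for NamedTuple messages whose fields are str or int; the tab-separated, `unicode_escape`-based encoding is an assumption for illustration and is not necessarily Toil's on-disk format. JobIssuedMessage is redefined locally with the documented fields so the block runs on its own.

```python
from typing import List, NamedTuple, Type, TypeVar, get_type_hints

M = TypeVar("M", bound=tuple)


class JobIssuedMessage(NamedTuple):
    """Local copy of the documented fields, for a self-contained example."""
    job_type: str
    job_id: str
    toil_batch_id: int


def message_to_bytes_sketch(message: tuple) -> bytes:
    parts: List[bytes] = []
    for value in message:
        if isinstance(value, str):
            # Escapes tabs, newlines, backslashes and non-ASCII: output is ASCII.
            parts.append(value.encode("unicode_escape"))
        elif isinstance(value, int):
            parts.append(str(value).encode("ascii"))
        else:
            raise TypeError(f"unsupported field type: {type(value).__name__}")
    return b"\t".join(parts)


def bytes_to_message_sketch(message_type: Type[M], data: bytes) -> M:
    # Look up each field's declared type, in field order.
    hints = get_type_hints(message_type)
    field_types = [hints[name] for name in message_type._fields]
    raw_fields = data.split(b"\t")
    if len(raw_fields) != len(field_types):
        raise ValueError("field count does not match message type")
    values = []
    for field_type, raw in zip(field_types, raw_fields):
        if field_type is str:
            values.append(raw.decode("unicode_escape"))
        elif field_type is int:
            values.append(int(raw))
        else:
            raise TypeError(f"unsupported field type: {field_type!r}")
    return message_type(*values)
```

Because every string is escaped, a tab or newline inside a job name can never be mistaken for a field separator or a record boundary.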

class toil.bus.MessageBus

Holds messages that should cause jobs to change their scheduling states. Messages are put in and buffered, and can be taken out and handled as batches when convenient.

All messages are NamedTuple objects of various subtypes.

Message order is guaranteed to be preserved within a type.

MessageType
publish(message)

Put a message onto the bus. Can be called from any thread.

Parameters:

message (Any) –

Return type:

None

check()

If we are in the owning thread, deliver any messages that are in the queue for us. Must be called periodically in the main thread, possibly through inbox objects.

Return type:

None

subscribe(message_type, handler)

Register the given callable to be called when messages of the given type are sent. It will be called with messages sent after the subscription is created. Returns a subscription object; when the subscription object is garbage-collected, the subscription ends.

Parameters:
  • message_type (Type[MessageType]) –

  • handler (Callable[[MessageType], Any]) –

Return type:

pubsub.core.listener.Listener
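The lifetime rule above (the subscription ends when the returned object is garbage-collected) is the behavior you get when a bus holds its subscribers only through weak references, which is how pypubsub tracks listeners. A minimal sketch using the standard weakref module follows; `WeakBus` and `Subscription` are hypothetical names, and the sketch does not reproduce pypubsub's API.

```python
import gc
import weakref
from typing import Any, Callable, List


class Subscription:
    """Holds the handler; the bus keeps only a weak reference to this object."""

    def __init__(self, handler: Callable[[Any], None]) -> None:
        self.handler = handler


class WeakBus:
    def __init__(self) -> None:
        self._subscriptions: List["weakref.ref[Subscription]"] = []

    def subscribe(self, handler: Callable[[Any], None]) -> Subscription:
        subscription = Subscription(handler)
        self._subscriptions.append(weakref.ref(subscription))
        return subscription  # the caller must keep this alive

    def publish(self, message: Any) -> None:
        live = []
        for ref in self._subscriptions:
            subscription = ref()
            if subscription is not None:  # dead references are dropped
                subscription.handler(message)
                live.append(ref)
        self._subscriptions = live


received: List[str] = []
bus = WeakBus()
token = bus.subscribe(received.append)
bus.publish("first")
del token
gc.collect()  # CPython frees the object at `del`; collect() covers other runtimes
bus.publish("second")  # no longer delivered
```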

connect(wanted_types)

Get a connection object that serves as an inbox for messages of the given types. Messages of those types will accumulate in the inbox until it is destroyed. You can check for them at any time.

Parameters:

wanted_types (List[type]) –

Return type:

MessageBusConnection

outbox()

Get a connection object that only allows sending messages.

Return type:

MessageOutbox

connect_output_file(file_path)

Send copies of all messages to the given output file.

Returns connection data that must be kept alive for the connection to persist. That data is opaque: callers should only store it or delete it, never inspect or modify it.

Parameters:

file_path (str) –

Return type:

Any

classmethod scan_bus_messages(stream, message_types)

Get an iterator over all messages in the given log stream of the given types, in order. Discard any trailing partial messages.

Parameters:
  • stream (IO[bytes]) –

  • message_types (List[Type[NamedTuple]]) –

Return type:

Iterator[Any]
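scan_bus_messages reads a log of serialized messages, keeps only the requested types, and drops a trailing partial record (for example, one the leader was still writing). The sketch below illustrates that filtering and truncation logic over a hypothetical line-oriented record format, `TypeName<TAB>JSON-fields<NEWLINE>`; the format and the `write_record` and `scan_sketch` names are assumptions, not Toil's actual file format or API.

```python
import io
import json
from typing import IO, Iterator, List, NamedTuple, Type


class JobIssuedMessage(NamedTuple):
    job_type: str
    job_id: str
    toil_batch_id: int


class JobCompletedMessage(NamedTuple):
    job_type: str
    job_id: str
    exit_code: int


def write_record(stream: IO[bytes], message: tuple) -> None:
    # json.dumps escapes newlines inside strings, so each record is one line.
    line = type(message).__name__ + "\t" + json.dumps(list(message)) + "\n"
    stream.write(line.encode("utf-8"))


def scan_sketch(stream: IO[bytes], message_types: List[Type[tuple]]) -> Iterator[tuple]:
    by_name = {t.__name__: t for t in message_types}
    for line in stream:
        if not line.endswith(b"\n"):
            # Trailing partial record: the writer had not finished it. Discard.
            break
        type_name, _, payload = line.decode("utf-8").rstrip("\n").partition("\t")
        message_type = by_name.get(type_name)
        if message_type is None:
            continue  # not a type the caller asked for
        yield message_type(*json.loads(payload))


log = io.BytesIO()
write_record(log, JobIssuedMessage("align", "job-1", 1))
write_record(log, JobCompletedMessage("align", "job-1", 0))
log.write(b'JobIssuedMessage\t["sort", "jo')  # simulated crash mid-write
log.seek(0)

completed = list(scan_sketch(log, [JobCompletedMessage]))
```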

class toil.bus.MessageBusClient

Base class for clients (inboxes and outboxes) of a message bus. Handles keeping a reference to the message bus.

class toil.bus.MessageInbox

Bases: MessageBusClient

A buffered connection to a message bus that lets us receive messages. Buffers incoming messages until you are ready for them. Does not preserve ordering between messages of different types.

MessageType
count(message_type)

Get the number of pending messages of the given type.

Parameters:

message_type (type) –

Return type:

int

empty()

Return True if no messages are pending, and False otherwise.

Return type:

bool

for_each(message_type)

Loop over all messages currently pending of the given type. Each that is handled without raising an exception will be removed.

Messages sent while this function is running will not be yielded by the current call.

Parameters:

message_type (Type[MessageType]) –

Return type:

Iterator[MessageType]
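The inbox semantics described above (per-type buffers, a snapshot per for_each call, and removal only of messages handled without an exception) can be sketched with a Python generator. This is an illustrative reimplementation under those stated rules, not Toil's code; `InboxSketch` and its `deliver` method are hypothetical, since in Toil the bus itself fills the inbox.

```python
from collections import defaultdict
from typing import Any, DefaultDict, Iterator, List, NamedTuple, Type, TypeVar

M = TypeVar("M")


class JobMissingMessage(NamedTuple):
    job_id: str


class InboxSketch:
    """Per-type buffers: order is kept within a type, not across types."""

    def __init__(self) -> None:
        self._pending: DefaultDict[type, List[Any]] = defaultdict(list)

    def deliver(self, message: Any) -> None:
        # Hypothetical hook standing in for the bus delivering a message.
        self._pending[type(message)].append(message)

    def count(self, message_type: type) -> int:
        return len(self._pending[message_type])

    def empty(self) -> bool:
        return all(not buffered for buffered in self._pending.values())

    def for_each(self, message_type: Type[M]) -> Iterator[M]:
        # Snapshot: messages delivered during this loop wait for the next call.
        for message in list(self._pending[message_type]):
            yield message
            # Reached only when the caller asks for the next item, i.e. the
            # loop body for `message` finished without raising.
            self._pending[message_type].remove(message)


inbox = InboxSketch()
inbox.deliver(JobMissingMessage("a"))
seen = []
for missing in inbox.for_each(JobMissingMessage):
    seen.append(missing.job_id)
    inbox.deliver(JobMissingMessage("late"))  # not yielded by this call
```

In this sketch a message only counts as handled once the loop advances past it, so breaking out of the loop also leaves the current message pending; the text above does not say whether Toil's for_each behaves the same way on `break`.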

class toil.bus.MessageOutbox

Bases: MessageBusClient

A connection to a message bus that lets us publish messages.

publish(message)

Publish the given message to the connected message bus.

We have this so you don’t need to store both the bus and your connection.

Parameters:

message (Any) –

Return type:

None

class toil.bus.MessageBusConnection

Bases: MessageInbox, MessageOutbox

A two-way connection to a message bus. Buffers incoming messages until you are ready for them, and lets you send messages.

class toil.bus.JobStatus

Records the status of a job.

job_store_id: str
name: str
exit_code: int
annotations: Dict[str, str]
toil_batch_id: int
external_batch_id: str
batch_system: str
__repr__()

Return repr(self).

Return type:

str

toil.bus.replay_message_bus(path)

Replay all the messages and work out what they mean for jobs.

We track the state and name of jobs here, by ID. We would use a list of two items, but mypy can’t handle a list whose items have different types, so we define a new class.

Returns a dictionary from the job_id to a dataclass, JobStatus. A JobStatus contains information about a job that we have gathered from the message bus, including the job store ID, the name of the job, the exit code, any associated annotations, the Toil batch ID, the external batch ID, and the batch system on which the job is running.

Parameters:

path (str) –

Return type:

Dict[str, JobStatus]
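Replaying a message log amounts to folding each message into a per-job record. The sketch below does this over an in-memory list instead of a file, using local copies of the documented message fields. Several details are assumptions rather than documented Toil behavior: `JobStatusSketch` and `replay_sketch` are hypothetical names, job_id is treated as the job store ID, job_type is used as the job's name, and an exit code of -1 stands for "no completion seen yet". Note that ExternalBatchIdMessage carries a toil_batch_id but no job_id, so the replay has to remember which job each Toil batch ID was issued for.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, NamedTuple


class JobIssuedMessage(NamedTuple):
    job_type: str
    job_id: str
    toil_batch_id: int


class JobCompletedMessage(NamedTuple):
    job_type: str
    job_id: str
    exit_code: int


class JobAnnotationMessage(NamedTuple):
    job_id: str
    annotation_name: str
    annotation_value: str


class ExternalBatchIdMessage(NamedTuple):
    toil_batch_id: int
    external_batch_id: str
    batch_system: str


@dataclass
class JobStatusSketch:
    job_store_id: str
    name: str
    exit_code: int = -1  # assumption: -1 means "no completion seen yet"
    annotations: Dict[str, str] = field(default_factory=dict)
    toil_batch_id: int = -1
    external_batch_id: str = ""
    batch_system: str = ""


def replay_sketch(messages: Iterable[tuple]) -> Dict[str, JobStatusSketch]:
    jobs: Dict[str, JobStatusSketch] = {}
    batch_to_job: Dict[int, str] = {}  # Toil batch ID -> job ID
    for message in messages:
        if isinstance(message, JobIssuedMessage):
            status = jobs.setdefault(message.job_id, JobStatusSketch(message.job_id, message.job_type))
            status.toil_batch_id = message.toil_batch_id
            status.exit_code = -1  # a reissued job is running again
            batch_to_job[message.toil_batch_id] = message.job_id
        elif isinstance(message, JobCompletedMessage):
            status = jobs.setdefault(message.job_id, JobStatusSketch(message.job_id, message.job_type))
            status.exit_code = message.exit_code
        elif isinstance(message, JobAnnotationMessage):
            if message.job_id in jobs:
                jobs[message.job_id].annotations[message.annotation_name] = message.annotation_value
        elif isinstance(message, ExternalBatchIdMessage):
            job_id = batch_to_job.get(message.toil_batch_id)
            if job_id in jobs:
                jobs[job_id].external_batch_id = message.external_batch_id
                jobs[job_id].batch_system = message.batch_system
    return jobs


log = [
    JobIssuedMessage("align", "job-1", 1),
    ExternalBatchIdMessage(1, "12345", "slurm"),
    JobAnnotationMessage("job-1", "note", "first attempt"),
    JobCompletedMessage("align", "job-1", 0),
    JobIssuedMessage("sort", "job-2", 2),
]
state = replay_sketch(log)
```

Here job-1 ends with exit code 0 and its external batch ID attached, while job-2 is still only issued; that issued-versus-finished distinction is exactly the information the module overview notes is not persisted in the JobStore.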

toil.bus.gen_message_bus_path()

Return a path to a file in the temporary directory at which to store the message bus. The caller is responsible for deleting the generated file.

Return type:

str