toil.leader

The leader script (of the leader/worker pair) for running jobs.

Attributes

logger

Classes

Leader

Represents the Toil leader.

Module Contents

toil.leader.logger
class toil.leader.Leader(config, batchSystem, provisioner, jobStore, rootJob, jobCache=None)[source]

Represents the Toil leader.

Responsible for determining what jobs are ready to be scheduled, by consulting the job store, and issuing them in the batch system.

Parameters:
config
jobStore
jobStoreLocator
toilState
batchSystem
issued_jobs_by_batch_system_id: dict[int, str]
preemptibleJobsIssued = 0
serviceJobsIssued = 0
serviceJobsToBeIssued: list[str] = []
preemptibleServiceJobsIssued = 0
preemptibleServiceJobsToBeIssued: list[str] = []
timeSinceJobsLastRescued = None
reissueMissingJobs_missingHash: dict[int, int]
provisioner
clusterScaler = None
serviceManager
statsAndLogging
potentialDeadlockedJobs: set[str]
potentialDeadlockTime = 0
toilMetrics: toil.common.ToilMetrics | None = None
debugJobNames = ('CWLJob', 'CWLWorkflow', 'CWLScatter', 'CWLGather', 'ResolveIndirect')
deadlockThrottler
statusThrottler
kill_throttler
progress_overall = None
progress_failed = None
GOOD_COLOR = (0, 60, 108)
BAD_COLOR = (253, 199, 0)
PROGRESS_BAR_FORMAT = '{desc}{desc_pad}{percentage:3.0f}%|{bar}| {count:{len_total}d}/{total:d} ({count_1:d} failures)...
recommended_fail_exit_code = 1
run()[source]

Run the leader process to issue and manage jobs.

Raises:

toil.exceptions.FailedJobsException if failed jobs remain after running.

Returns:

The return value of the root job’s run function.

Return type:

Any

create_status_sentinel_file(fail)[source]

Create a file in the jobstore indicating failure or success.

Parameters:

fail (bool)

Return type:

None

innerLoop()[source]

Process jobs.

This is the leader’s main loop.

checkForDeadlocks()[source]

Check if the system is deadlocked running service jobs.

feed_deadlock_watchdog()[source]

Note that progress has been made and any pending deadlock checks should be reset.

Return type:

None
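
The interplay between checkForDeadlocks and feed_deadlock_watchdog can be pictured with a small timer sketch. This is not Toil's implementation: the class DeadlockWatchdog, its max_wait_seconds argument, and the only_services_running flag are hypothetical names chosen for illustration. The sketch only assumes that a suspected deadlock must persist for some time before it is reported.

```python
import time


class DeadlockWatchdog:
    """Hypothetical stand-in for the leader's deadlock bookkeeping."""

    def __init__(self, max_wait_seconds, clock=time.monotonic):
        self.max_wait_seconds = max_wait_seconds
        self.clock = clock
        self.suspected_since = None  # None means no deadlock is suspected

    def feed(self):
        # Progress was made, so forget any pending suspicion.
        self.suspected_since = None

    def check(self, only_services_running):
        """Return True once the suspicious state has lasted long enough."""
        if not only_services_running:
            # Ordinary jobs are running, so this counts as progress.
            self.feed()
            return False
        now = self.clock()
        if self.suspected_since is None:
            # First sighting of the suspicious state: start timing it.
            self.suspected_since = now
        return now - self.suspected_since >= self.max_wait_seconds
```

Feeding the watchdog restarts the timer, so a deadlock is only reported after an uninterrupted stretch without progress.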

issueJob(jobNode)[source]

Add a job to the queue of jobs currently trying to run.

Parameters:

jobNode (toil.job.JobDescription)

Return type:

None

issueJobs(jobs)[source]

Add a list of jobs, each represented as a jobNode object.

issueServiceJob(service_id)[source]

Issue a service job.

Put it on a queue if the maximum number of service jobs to be scheduled has been reached.

Parameters:

service_id (str)

Return type:

None

issueQueingServiceJobs()[source]

Issues any queuing service jobs up to the limit of the maximum allowed.
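
The queue-then-drain behaviour described for issueServiceJob and issueQueingServiceJobs can be sketched as follows. ServiceQueue, max_issued, and job_finished are hypothetical names, and issuing the oldest waiting job first is an assumption; the real Leader tracks this state in attributes such as serviceJobsIssued and serviceJobsToBeIssued.

```python
from collections import deque


class ServiceQueue:
    """Hypothetical model of bounded service-job issuing (not Toil's code)."""

    def __init__(self, max_issued):
        self.max_issued = max_issued
        self.issued = set()
        self.waiting = deque()

    def issue_service_job(self, service_id):
        # Every request joins the queue first; the drain step enforces the cap.
        self.waiting.append(service_id)
        self.issue_queued_service_jobs()

    def issue_queued_service_jobs(self):
        # Issue waiting jobs, oldest first (an assumption), until the cap is hit.
        while self.waiting and len(self.issued) < self.max_issued:
            self.issued.add(self.waiting.popleft())

    def job_finished(self, service_id):
        # A finished job frees a slot, so try to drain the queue again.
        self.issued.discard(service_id)
        self.issue_queued_service_jobs()
```

With max_issued=2, a third service job stays in the waiting queue until one of the first two finishes.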

getNumberOfJobsIssued(preemptible=None)[source]

Get number of jobs that have been added by issueJob(s) and not removed by removeJob.

Parameters:

preemptible (Optional[bool]) – If None, count all types of jobs. If True, count only preemptible jobs. If False, count only non-preemptible jobs.

Return type:

int
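
The three-way meaning of the preemptible argument can be illustrated with a standalone function. count_issued and its jobs mapping (batch system ID to a preemptible flag) are hypothetical; the sketch only mirrors the filtering rule stated above, not how Leader stores its counters.

```python
from typing import Optional


def count_issued(jobs: dict[int, bool], preemptible: Optional[bool] = None) -> int:
    """Count issued jobs, optionally filtered by their preemptible flag.

    `jobs` maps a batch system ID to True if that job is preemptible.
    """
    if preemptible is None:
        # No filter requested: count every issued job.
        return len(jobs)
    # Count only the jobs whose flag matches the requested value.
    return sum(1 for flag in jobs.values() if flag == preemptible)


issued = {1: True, 2: False, 3: False}
total = count_issued(issued)
preemptible_only = count_issued(issued, preemptible=True)
non_preemptible_only = count_issued(issued, preemptible=False)
```

Note that None and False are not interchangeable here: False selects non-preemptible jobs, while None disables the filter.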

removeJob(jobBatchSystemID)[source]

Remove a job from the system by batch system ID.

Returns:

Job description as it was issued.

Parameters:

jobBatchSystemID (int)

Return type:

toil.job.JobDescription

getJobs(preemptible=None)[source]

Get all issued jobs.

Parameters:

preemptible (Optional[bool]) – If specified, select only preemptible or only non-preemptible jobs.

Return type:

list[toil.job.JobDescription]

killJobs(jobsToKill, exit_reason=BatchJobExitReason.KILLED)[source]

Kills the given set of jobs and then sends them for processing.

Returns the jobs that, upon processing, were reissued.

Parameters:

exit_reason (toil.batchSystems.abstractBatchSystem.BatchJobExitReason)

reissueOverLongJobs()[source]

Check each issued job.

If a job has been running for longer than desirable, issue a kill instruction, wait for the job to die, and then pass the job to process_finished_job.

Return type:

None

reissueMissingJobs(killAfterNTimesMissing=3)[source]

Check that all the current job IDs are in the list of currently issued batch system jobs.

If a job is missing, it is marked as such. If it stays missing for a number of runs of this function (set by killAfterNTimesMissing, 3 by default), we try deleting the job (though it is probably lost), wait, and then pass the job to process_finished_job.
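
The miss-counting idea can be sketched independently of Toil. The function update_missing_counts and its arguments are hypothetical; missing_counts plays the role that the reissueMissingJobs_missingHash attribute (dict[int, int]) appears to play. The exact threshold comparison and cleanup rules are assumptions.

```python
def update_missing_counts(missing_counts, issued_ids, running_ids,
                          kill_after_n_times_missing=3):
    """Update per-job miss counters and return the IDs that are due to be killed.

    missing_counts: dict mapping batch system ID -> times seen missing (mutated).
    issued_ids: set of IDs the leader believes it has issued.
    running_ids: set of IDs the batch system currently reports.
    """
    # Forget jobs that reappeared or are no longer issued at all.
    for job_id in list(missing_counts):
        if job_id in running_ids or job_id not in issued_ids:
            del missing_counts[job_id]

    to_kill = []
    for job_id in issued_ids - running_ids:
        # One more run of the check in which this job was not found.
        missing_counts[job_id] = missing_counts.get(job_id, 0) + 1
        if missing_counts[job_id] >= kill_after_n_times_missing:
            to_kill.append(job_id)
    return sorted(to_kill)
```

A job that shows up again before reaching the threshold has its counter dropped, so a single slow status report does not get it killed.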

processRemovedJob(issuedJob, result_status)[source]
process_finished_job(batch_system_id, result_status, wall_time=None, exit_reason=None)[source]

Process finished jobs.

Called when an attempt to run a job finishes, either successfully or otherwise.

Takes the job out of the issued state, and then works out what to do about the fact that it succeeded or failed.

Returns:

True if the job is going to run again, and False if the job is fully done or completely failed.

Return type:

bool

process_finished_job_description(finished_job, result_status, wall_time=None, exit_reason=None, batch_system_id=None)[source]

Process a finished JobDescription based upon its success or failure.

If wall-clock time is available, informs the cluster scaler about the job finishing.

If the job failed and a batch system ID is available, checks for and reports batch system logs.

Checks if it succeeded and was removed, or if it failed and needs to be set up after failure, and dispatches to the appropriate function.

Returns:

True if the job is going to run again, and False if the job is fully done or completely failed.

Return type:

bool

getSuccessors(job_id, alreadySeenSuccessors)[source]

Get successors of the given job by walking the job graph recursively.

Parameters:
  • alreadySeenSuccessors (set[str]) – any successor seen here is ignored and not traversed.

  • job_id (str)

Returns:

The set of found successors. This set is added to alreadySeenSuccessors.

Return type:

set[str]
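
The traversal contract of getSuccessors (skip anything already seen, and add everything newly found to alreadySeenSuccessors) can be sketched on a plain adjacency mapping. The function get_successors and its graph argument are hypothetical; the real method reads successor relationships from the leader's own job state.

```python
def get_successors(job_id, already_seen, graph):
    """Collect successors reachable from job_id, skipping already-seen IDs.

    graph: dict mapping a job ID to an iterable of its direct successor IDs.
    already_seen: set of IDs to ignore; newly found IDs are added to it.
    """
    found = set()

    def walk(current):
        for successor in graph.get(current, ()):
            if successor in already_seen:
                # Ignored and not traversed, as the parameter description says.
                continue
            found.add(successor)
            already_seen.add(successor)
            walk(successor)

    walk(job_id)
    return found


graph = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
seen = {"c"}
result = get_successors("a", seen, graph)
```

Because already_seen is mutated, repeated calls that share the same set never report a job twice, which also keeps the recursion from looping on shared successors.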

processTotallyFailedJob(job_id)[source]

Process a totally failed job.

Parameters:

job_id (str)

Return type:

None