toil.lib.history

Contains tools for tracking history.

Attributes

logger

RT

Exceptions

HistoryDatabaseSchemaTooNewError

Raised when we would write to the history database, but its schema is too new for us to understand.

Classes

WorkflowSummary

Data class holding summary information for a workflow.

WorkflowAttemptSummary

Data class holding summary information for a workflow attempt.

JobAttemptSummary

Data class holding summary information for a job attempt within a known workflow attempt.

HistoryManager

Class responsible for managing the history of Toil runs.

Functions

db_retry(function)

Decorate a function with the appropriate retries for accessing the database.

Module Contents

toil.lib.history.logger
exception toil.lib.history.HistoryDatabaseSchemaTooNewError[source]

Bases: RuntimeError

Raised when we would write to the history database, but its schema is too new for us to understand.

class toil.lib.history.WorkflowSummary[source]

Data class holding summary information for a workflow.

Represents all the attempts to execute one run of a workflow.

id: str
name: str | None
job_store: str
total_attempts: int
total_job_attempts: int
succeeded: bool
start_time: float | None

Time when the first workflow attempt started, in seconds since epoch.

None if there are no attempts recorded.

runtime: float | None

Time from the first workflow attempt’s start to the last one’s end, in seconds.

None if there are no attempts recorded.

trs_spec: str | None
class toil.lib.history.WorkflowAttemptSummary[source]

Data class holding summary information for a workflow attempt.

Helpfully includes the workflow metadata for Dockstore.

workflow_id: str
attempt_number: int
succeeded: bool
start_time: float
runtime: float
submitted_to_dockstore: bool
batch_system: str | None
caching: bool | None
toil_version: str | None
python_version: str | None
platform_system: str | None
platform_machine: str | None
workflow_job_store: str
workflow_trs_spec: str | None
class toil.lib.history.JobAttemptSummary[source]

Data class holding summary information for a job attempt within a known workflow attempt.

id: str
job_name: str
succeeded: bool
start_time: float
runtime: float
submitted_to_dockstore: bool
cores: float | None
cpu_seconds: float | None
memory_bytes: int | None
disk_bytes: int | None
toil.lib.history.RT
toil.lib.history.db_retry(function)[source]

Decorate a function with the appropriate retries for accessing the database.

Parameters:

function (Callable[Ellipsis, RT])

Return type:

Callable[Ellipsis, RT]
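The retry policy itself is not documented here. As a rough illustration only (the error matching, attempt count, and backoff are assumptions, not Toil's actual behavior), a decorator of this shape could retry transient SQLite locking errors:

```python
import sqlite3
import time
from typing import Callable, TypeVar

RT = TypeVar("RT")

def db_retry(function: Callable[..., RT]) -> Callable[..., RT]:
    # Hypothetical sketch: retry a few times on transient "database is
    # locked" errors, with exponential backoff, re-raising anything else.
    def wrapper(*args, **kwargs):
        for attempt in range(5):
            try:
                return function(*args, **kwargs)
            except sqlite3.OperationalError as e:
                if "locked" not in str(e) or attempt == 4:
                    raise
                time.sleep(0.1 * (2 ** attempt))
    return wrapper

@db_retry
def read_count() -> int:
    # A stand-in for a database-touching operation.
    return 42
```

Decorated functions keep their call signature, so callers are unaware of the retries.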

class toil.lib.history.HistoryManager[source]

Class responsible for managing the history of Toil runs.

classmethod enabled()[source]

Return True if history should be read from and written to the database.

If False, no access at all should be made to the database.

Return type:

bool

classmethod enabled_job()[source]

Return True if job history should be read from and written to the database.

Always returns False if enabled() returns False.

Return type:

bool

database_path_override: str | None = None
classmethod database_path()[source]

Get the path at which the database we store history in lives.

Return type:

str

classmethod connection()[source]

Connect to the history database.

Caller must not actually use the connection without using ensure_tables() to protect reads and updates.

Must be called from inside a top-level method marked @db_retry.

The connection will be in DEFERRED isolation_level, with autocommit off on Python versions that support it. In order to run any commands outside of a transaction use the no_transaction context manager.

Return type:

sqlite3.Connection

classmethod no_transaction(con)[source]

Temporarily disable the constant active transaction on the database connection, on Python versions where it exists.

Commits the current transaction.

Parameters:

con (sqlite3.Connection)

Return type:

Iterator[None]

classmethod ensure_tables(con, cur)[source]

Ensure that tables exist in the database and the schema is migrated to the current version.

Leaves the cursor in a transaction where the schema version is known to be correct.

Must be called from inside a top-level method marked @db_retry.

Raises:

HistoryDatabaseSchemaTooNewError – If the schema is newer than the current version.

Parameters:
  • con (sqlite3.Connection)

  • cur (sqlite3.Cursor)

Return type:

None
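The connection/ensure_tables pattern above can be sketched in plain sqlite3. Everything here is illustrative: the schema version number, the use of the `user_version` pragma, and the `workflows` table are assumptions, not Toil's actual schema.

```python
import sqlite3

SCHEMA_VERSION = 2  # hypothetical current schema version

class SchemaTooNewError(RuntimeError):
    """Stand-in for HistoryDatabaseSchemaTooNewError."""

def connect(path: str) -> sqlite3.Connection:
    # DEFERRED isolation: a transaction is opened lazily, as described above.
    return sqlite3.connect(path, isolation_level="DEFERRED")

def ensure_tables(con: sqlite3.Connection, cur: sqlite3.Cursor) -> None:
    # Track the schema version with SQLite's user_version pragma.
    version = cur.execute("PRAGMA user_version").fetchone()[0]
    if version > SCHEMA_VERSION:
        # The database was written by a newer version; refuse to touch it.
        raise SchemaTooNewError(f"Schema version {version} is too new")
    if version < SCHEMA_VERSION:
        # Migrate (here: just create the table) and stamp the version.
        cur.execute(
            "CREATE TABLE IF NOT EXISTS workflows "
            "(id TEXT PRIMARY KEY, job_store TEXT)"
        )
        cur.execute(f"PRAGMA user_version = {SCHEMA_VERSION}")

con = connect(":memory:")
cur = con.cursor()
ensure_tables(con, cur)
```

Refusing to write when the stored version is newer than the code's version is what lets old and new Toil releases share a history database safely.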

classmethod record_workflow_creation(workflow_id, job_store_spec)[source]

Record that a workflow is being run.

Takes the Toil config’s workflow ID and the location of the job store.

Should only be called on the first attempt on a job store, not on a restart.

A workflow may have multiple attempts to run it, some of which succeed and others of which fail. Probably only the last one should succeed.

Parameters:
  • job_store_spec (str) – The job store specifier for the workflow. Should be canonical and always start with the type and a colon. If the job store is later moved by the user, the location will not be updated.

  • workflow_id (str)

Return type:

None

classmethod record_workflow_metadata(workflow_id, workflow_name, trs_spec=None)[source]

Associate a name and optionally a TRS ID and version with a workflow run.

Parameters:
  • workflow_id (str)

  • workflow_name (str)

  • trs_spec (Optional[str])

Return type:

None

classmethod record_job_attempt(workflow_id, workflow_attempt_number, job_name, succeeded, start_time, runtime, cores=None, cpu_seconds=None, memory_bytes=None, disk_bytes=None)[source]

Record that a job ran in a workflow.

Doesn’t expect the provided information to uniquely identify the job attempt; assigns the job attempt its own unique ID.

Thread safe.

Parameters:
  • job_name (str) – A human-readable name for the job. Not expected to be a job store ID or to necessarily uniquely identify the job within the workflow.

  • start_time (float) – Job execution start time in seconds since epoch.

  • runtime (float) – Job execution duration in seconds.

  • cores (Optional[float]) – Number of CPU cores the job was scheduled on.

  • cpu_seconds (Optional[float]) – CPU core-seconds actually consumed.

  • memory_bytes (Optional[int]) – Peak observed job memory usage.

  • disk_bytes (Optional[int]) – Observed job disk usage.

  • workflow_id (str)

  • workflow_attempt_number (int)

  • succeeded (bool)

Return type:

None

classmethod record_workflow_attempt(workflow_id, workflow_attempt_number, succeeded, start_time, runtime, batch_system=None, caching=None, toil_version=None, python_version=None, platform_system=None, platform_machine=None)[source]

Record a workflow attempt (start or restart) having finished or failed.

Parameters:
  • batch_system (Optional[str]) – The Python type name of the batch system implementation used.

  • caching (Optional[bool]) – Whether Toil filestore-level caching was used.

  • toil_version (Optional[str]) – Version of Toil used to run the workflow.

  • python_version (Optional[str]) – Version of Python used to run the workflow.

  • platform_system (Optional[str]) – OS (“Darwin”, “Linux”, etc.) used to run the workflow.

  • platform_machine (Optional[str]) – CPU type (“AMD64”, etc.) used to run the workflow leader.

  • workflow_id (str)

  • workflow_attempt_number (int)

  • succeeded (bool)

  • start_time (float)

  • runtime (float)

Return type:

None
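The recording methods above suggest a simple relational layout: one row per workflow, per workflow attempt, and per job attempt. The following sketch mimics that call order against hypothetical tables (the table and column names are illustrative, not Toil's actual schema):

```python
import sqlite3
import uuid

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE workflows (id TEXT PRIMARY KEY, name TEXT, job_store TEXT, trs_spec TEXT);
    CREATE TABLE workflow_attempts (
        workflow_id TEXT, attempt_number INTEGER, succeeded INTEGER,
        start_time REAL, runtime REAL,
        PRIMARY KEY (workflow_id, attempt_number)
    );
    CREATE TABLE job_attempts (
        id TEXT PRIMARY KEY, workflow_id TEXT, workflow_attempt_number INTEGER,
        job_name TEXT, succeeded INTEGER, start_time REAL, runtime REAL
    );
""")

# record_workflow_creation: first attempt only; the job store spec
# starts with its type and a colon.
con.execute("INSERT INTO workflows (id, job_store) VALUES (?, ?)",
            ("wf-1", "file:/tmp/store"))
# record_workflow_metadata: attach a name (and optionally a TRS spec).
con.execute("UPDATE workflows SET name = ?, trs_spec = ? WHERE id = ?",
            ("hello", None, "wf-1"))
# record_job_attempt: each job attempt gets its own unique ID.
con.execute("INSERT INTO job_attempts VALUES (?, ?, ?, ?, ?, ?, ?)",
            (str(uuid.uuid4()), "wf-1", 1, "hello_job", 1, 1700000000.0, 2.5))
# record_workflow_attempt: written once the attempt finishes or fails.
con.execute("INSERT INTO workflow_attempts VALUES (?, ?, ?, ?, ?)",
            ("wf-1", 1, 1, 1700000000.0, 10.0))
con.commit()
```

Note the ordering this implies: job attempts are recorded as jobs run, while the workflow attempt row only appears after the attempt has finished or failed.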

classmethod summarize_workflows()[source]

List all known workflows and their summary statistics.

Return type:

list[WorkflowSummary]

classmethod get_submittable_workflow_attempts(limit=sys.maxsize)[source]

List all workflow attempts not yet submitted to Dockstore.

Parameters:

limit (int) – Get no more than this many.

Return type:

list[WorkflowAttemptSummary]

classmethod get_workflow_attempts_with_submittable_job_attempts(limit=sys.maxsize)[source]

Get all workflow attempts that have job attempts not yet submitted to Dockstore.

The workflow attempts themselves will have finished and been recorded, and have TRS IDs.

Parameters:

limit (int) – Get no more than this many.

Return type:

list[WorkflowAttemptSummary]

classmethod get_workflow_attempt(workflow_id, attempt_number)[source]

Get a single workflow attempt summary, if present. The attempt need not be unsubmitted, and need not have a TRS ID.

Parameters:
  • workflow_id (str)

  • attempt_number (int)

Return type:

Optional[WorkflowAttemptSummary]

classmethod get_unsubmitted_job_attempts(workflow_id, attempt_number)[source]

List all job attempts in the given workflow attempt not yet submitted to Dockstore.

Doesn’t check to make sure the workflow has a TRS ID.

Parameters:
  • workflow_id (str)

  • attempt_number (int)

Return type:

list[JobAttemptSummary]

classmethod mark_workflow_attempt_submitted(workflow_id, attempt_number)[source]

Mark a workflow attempt as having been successfully submitted to Dockstore.

Does not mark the workflow attempt’s job attempts as submitted.

Parameters:
  • workflow_id (str)

  • attempt_number (int)

Return type:

None

classmethod mark_job_attempts_submitted(job_attempt_ids)[source]

Mark a collection of job attempts as submitted to Dockstore in a single transaction.

Parameters:

job_attempt_ids (list[str])

Return type:

None
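The submission-tracking methods above amount to a query-then-mark loop: select unsubmitted records, try to submit them, and flip a flag for the whole batch in one transaction. A minimal stand-in (the `job_attempts` table and `submitted` column are hypothetical):

```python
import sqlite3

con = sqlite3.connect(":memory:", isolation_level="DEFERRED")
con.execute("CREATE TABLE job_attempts (id TEXT PRIMARY KEY, submitted INTEGER DEFAULT 0)")
con.executemany("INSERT INTO job_attempts (id) VALUES (?)", [("a",), ("b",), ("c",)])
con.commit()

def mark_job_attempts_submitted(job_attempt_ids):
    # Mark the whole batch in a single transaction, as the method above
    # does: commit on success, roll back everything on error.
    with con:
        con.executemany(
            "UPDATE job_attempts SET submitted = 1 WHERE id = ?",
            [(i,) for i in job_attempt_ids],
        )

# Analogous to get_unsubmitted_job_attempts with a limit.
unsubmitted = [r[0] for r in
               con.execute("SELECT id FROM job_attempts WHERE submitted = 0 LIMIT 2")]
mark_job_attempts_submitted(unsubmitted)
```

The single-transaction batch update means a crash mid-submission never leaves some of a batch marked as submitted and the rest not.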

classmethod count_workflows()[source]

Count workflows in the database.

Return type:

int

classmethod count_workflow_attempts()[source]

Count workflow attempts in the database.

Return type:

int

classmethod count_job_attempts()[source]

Count job attempts in the database.

Return type:

int

classmethod get_fully_submitted_workflow_ids(limit=sys.maxsize)[source]

Get workflows that have a successful attempt and no unsubmitted attempts or job attempts.

Parameters:

limit (int)

Return type:

list[str]

classmethod get_oldest_workflow_ids(limit=sys.maxsize)[source]

Get the IDs of the oldest workflows in the database.

Parameters:

limit (int)

Return type:

list[str]

classmethod delete_workflow(workflow_id)[source]

Delete a workflow and all its attempts and job attempts.

Succeeds if the workflow does not exist.

Parameters:

workflow_id (str)

Return type:

None

classmethod get_database_byte_size()[source]

Get the total number of bytes used by the database.

Return type:

int

classmethod compact_database()[source]

Shrink the database to remove unused space.

Return type:

None

classmethod enforce_byte_size_limit(limit=100 * 1024 * 1024)[source]

Shrink the database until it is smaller than the given limit, or until it is empty, by throwing away workflows.

Throws data away in a sensible order, least important to most important.

Parameters:

limit (int)

Return type:

None

classmethod database_dump_lines()[source]

Yield lines from the database dump.

For debugging tests.

Return type:

Iterable[str]