Source code for toil.test.lib.test_history

# Copyright (C) 2015-2025 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import logging
import pytest
import time
from toil.test import ToilTest

from toil.lib.history import HistoryManager

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

[docs] class HistoryTest(ToilTest): """ Tests for Toil history tracking. Each test gets its own history database. """
[docs] def setUp(self) -> None: super().setUp() # Apply a temp dir override to history tracking temp_dir = self._createTempDir() HistoryManager.database_path_override = os.path.join(temp_dir, "test-db.sqlite") # Flag on job history tracking self.original_flag = HistoryManager.JOB_HISTORY_ENABLED HistoryManager.JOB_HISTORY_ENABLED = True
[docs] def tearDown(self) -> None: # Remove the temp dir override from history tracking HistoryManager.database_path_override = None # Restore job history tracking flag HistoryManager.JOB_HISTORY_ENABLED = self.original_flag super().tearDown()
[docs] def make_fake_workflow(self, workflow_id: str) -> None: # Make a fake workflow workflow_jobstore_spec = "file:/tmp/tree" HistoryManager.record_workflow_creation(workflow_id, workflow_jobstore_spec) workflow_name = "SuperCoolWF" workflow_trs_spec = "#wf:v1" HistoryManager.record_workflow_metadata(workflow_id, workflow_name, workflow_trs_spec) # Give it a job workflow_attempt_number = 1 job_name = "DoThing" succeeded = True start_time = time.time() runtime = 0.1 HistoryManager.record_job_attempt( workflow_id, workflow_attempt_number, job_name, succeeded, start_time, runtime, ) # Give it a workflow attempt with the same details. HistoryManager.record_workflow_attempt( workflow_id, workflow_attempt_number, succeeded, start_time, runtime, )
[docs] def test_history_submittable_detection(self) -> None: """ Make sure that a submittable workflow shows up as such before submission and doesn't afterward. """ workflow_id = "123" self.make_fake_workflow(workflow_id) workflow_attempt_number = 1 # Make sure we have data self.assertEqual(HistoryManager.count_workflows(), 1) self.assertEqual(HistoryManager.count_workflow_attempts(), 1) self.assertEqual(HistoryManager.count_job_attempts(), 1) # Make sure we see it as submittable submittable_workflow_attempts = HistoryManager.get_submittable_workflow_attempts() self.assertEqual(len(submittable_workflow_attempts), 1) # Make sure we see its jobs as submittable with_submittable_job_attempts = HistoryManager.get_workflow_attempts_with_submittable_job_attempts() self.assertEqual(len(with_submittable_job_attempts), 1) # Make sure we actually see the job submittable_job_attempts = HistoryManager.get_unsubmitted_job_attempts(workflow_id, workflow_attempt_number) self.assertEqual(len(submittable_job_attempts), 1) # Pretend we submitted them. HistoryManager.mark_job_attempts_submitted([j.id for j in submittable_job_attempts]) HistoryManager.mark_workflow_attempt_submitted(workflow_id, workflow_attempt_number) # Make sure they are no longer matching self.assertEqual(len(HistoryManager.get_submittable_workflow_attempts()), 0) self.assertEqual(len(HistoryManager.get_workflow_attempts_with_submittable_job_attempts()), 0) self.assertEqual(len(HistoryManager.get_unsubmitted_job_attempts(workflow_id, workflow_attempt_number)), 0) # Make sure we still have data self.assertEqual(HistoryManager.count_workflows(), 1) self.assertEqual(HistoryManager.count_workflow_attempts(), 1) self.assertEqual(HistoryManager.count_job_attempts(), 1)
[docs] def test_history_deletion(self) -> None: workflow_id = "123" self.make_fake_workflow(workflow_id) workflow_attempt_number = 1 # Make sure we can see the workflow for deletion by age but not by done-ness self.assertEqual(len(HistoryManager.get_oldest_workflow_ids()), 1) self.assertEqual(len(HistoryManager.get_fully_submitted_workflow_ids()), 0) # Pretend we submitted the workflow. HistoryManager.mark_job_attempts_submitted([j.id for j in HistoryManager.get_unsubmitted_job_attempts(workflow_id, workflow_attempt_number)]) HistoryManager.mark_workflow_attempt_submitted(workflow_id, workflow_attempt_number) # Make sure we can see the workflow for deletion by done-ness self.assertEqual(len(HistoryManager.get_fully_submitted_workflow_ids()), 1) # Add a new workflow other_workflow_id = "456" self.make_fake_workflow(other_workflow_id) # Make sure we can see the both for deletion by age but only one by done-ness self.assertEqual(len(HistoryManager.get_oldest_workflow_ids()), 2) self.assertEqual(len(HistoryManager.get_fully_submitted_workflow_ids()), 1) # Make sure the older workflow is first. self.assertEqual(HistoryManager.get_oldest_workflow_ids(), [workflow_id, other_workflow_id]) # Delete the new workflow HistoryManager.delete_workflow(other_workflow_id) # Make sure we can see the old one self.assertEqual(HistoryManager.get_oldest_workflow_ids(), [workflow_id]) self.assertEqual(HistoryManager.get_fully_submitted_workflow_ids(), [workflow_id]) # Delete the old workflow HistoryManager.delete_workflow(workflow_id) # Make sure we have no data self.assertEqual(HistoryManager.count_workflows(), 0) self.assertEqual(HistoryManager.count_workflow_attempts(), 0) self.assertEqual(HistoryManager.count_job_attempts(), 0)
[docs] def test_history_size_limit(self) -> None: """ Make sure the database size can be controlled. """ for workflow_id in ("WorkflowThatTakesUpSomeSpace,ActuallyMoreThanTheLaterOnesTake" + str(i) for i in range(10)): self.make_fake_workflow(workflow_id) # We should see the workflows. self.assertEqual(HistoryManager.count_workflows(), 10) # And they take up space. small_size = HistoryManager.get_database_byte_size() self.assertGreater(small_size, 0) # Add a bunch more for workflow_id in ("WorkflowThatTakesUpSpace" + str(i) for i in range(50)): self.make_fake_workflow(workflow_id) # We should see that this is now a much larger database large_size = HistoryManager.get_database_byte_size() logger.info("Increased database size from %s to %s", small_size, large_size) self.assertGreater(large_size, small_size) # We should be able to shrink it back down HistoryManager.enforce_byte_size_limit(small_size) reduced_size = HistoryManager.get_database_byte_size() logger.info("Decreased database size from %s to %s", large_size, reduced_size) # The database should be small enough self.assertLessEqual(reduced_size, small_size) # There should still be some workflow attempts left in the smaller database (though probably not the first ones) remaining_workflows = HistoryManager.count_workflows() logger.info("Still have %s workflows", remaining_workflows) self.assertGreater(remaining_workflows, 0)