Source code for toil.test.src.toilContextManagerTest
# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from toil.common import Toil, ToilContextManagerException
from toil.job import Job
from toil.test import ToilTest, get_temp_file, slow
[docs]
@slow
class ToilContextManagerTest(ToilTest):
[docs]
def setUp(self):
self.exportPath = get_temp_file()
[docs]
def tearDown(self):
os.remove(self.exportPath)
[docs]
def testContextManger(self):
options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
options.logLevel = 'INFO'
with Toil(options) as toil:
toil.start(HelloWorld())
[docs]
def testNoContextManger(self):
options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
options.logLevel = 'INFO'
toil = Toil(options)
self.assertRaises(ToilContextManagerException, toil.start, HelloWorld())
[docs]
def testExportAfterFailedExport(self):
options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
try:
with Toil(options) as toil:
_ = toil.start(HelloWorld())
# oh no, an error! :(
raise RuntimeError("we died after workflow completion but before our export finished")
except RuntimeError:
pass
options.restart = True
with Toil(options) as toil:
fileID = toil.restart()
print(fileID)
# Hopefully the error didn't cause us to lose all our work!
toil.exportFile(fileID, 'file://' + self.exportPath)
with open(self.exportPath) as f:
# The file should have all our content
self.assertEqual(f.read(), "Hello, World!")
[docs]
class HelloWorld(Job):
def __init__(self):
Job.__init__(self, memory=100000, disk='1M')
[docs]
def run(self, fileStore):
fileID = self.addChildJobFn(childFn, memory='1M', disk='1M').rv()
return self.addFollowOn(FollowOn(fileID)).rv()
[docs]
def childFn(job):
with job.fileStore.writeGlobalFileStream() as (fH, fileID):
fH.write(b"Hello, World!")
return fileID
[docs]
class FollowOn(Job):
def __init__(self, fileId):
Job.__init__(self)
self.fileId = fileId
[docs]
def run(self, fileStore):
tempDir = fileStore.getLocalTempDir()
tempFilePath = "/".join([tempDir, 'LocalCopy'])
with fileStore.readGlobalFileStream(self.fileId) as globalFile:
with open(tempFilePath, "wb") as localFile:
localFile.write(globalFile.read())
return self.fileId