Source code for toil.test.src.resourceTest

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import os
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from inspect import getsource
from io import BytesIO
from textwrap import dedent
from unittest.mock import MagicMock, patch
from zipfile import ZipFile

from toil import inVirtualEnv
from toil.resource import ModuleDescriptor, Resource, ResourceException
from toil.test import ToilTest
from toil.version import exactPython


[docs] @contextmanager def tempFileContaining(content, suffix=''): """ Write a file with the given contents, and keep it on disk as long as the context is active. :param str content: The contents of the file. :param str suffix: The extension to use for the temporary file. """ fd, path = tempfile.mkstemp(suffix=suffix) try: encoded = content.encode('utf-8') assert os.write(fd, encoded) == len(encoded) except: os.close(fd) raise else: os.close(fd) yield path finally: os.unlink(path)
[docs] class ResourceTest(ToilTest): """Test module descriptors and resources derived from them."""
[docs] def testStandAlone(self): self._testExternal(moduleName='userScript', pyFiles=('userScript.py', 'helper.py'))
[docs] def testPackage(self): self._testExternal(moduleName='foo.userScript', pyFiles=('foo/__init__.py', 'foo/userScript.py', 'foo/bar/__init__.py', 'foo/bar/helper.py'))
[docs] def testVirtualEnv(self): self._testExternal(moduleName='foo.userScript', virtualenv=True, pyFiles=('foo/__init__.py', 'foo/userScript.py', 'foo/bar/__init__.py', 'foo/bar/helper.py', 'de/pen/dency.py', 'de/__init__.py', 'de/pen/__init__.py'))
[docs] def testStandAloneInPackage(self): self.assertRaises(ResourceException, self._testExternal, moduleName='userScript', pyFiles=('__init__.py', 'userScript.py', 'helper.py'))
def _testExternal(self, moduleName, pyFiles, virtualenv=False): dirPath = self._createTempDir() if virtualenv: self.assertTrue(inVirtualEnv()) # --never-download prevents silent upgrades to pip, wheel and setuptools subprocess.check_call(['virtualenv', '--never-download', '--python', exactPython, dirPath]) sitePackages = os.path.join(dirPath, 'lib', exactPython, 'site-packages') # tuple assignment is necessary to make this line immediately precede the try: oldPrefix, sys.prefix, dirPath = sys.prefix, dirPath, sitePackages else: oldPrefix = None try: for relPath in pyFiles: path = os.path.join(dirPath, relPath) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, 'w') as f: f.write('pass\n') sys.path.append(dirPath) try: userScript = importlib.import_module(moduleName) try: self._test(userScript.__name__, expectedContents=pyFiles, allowExtraContents=True) finally: del userScript while moduleName: del sys.modules[moduleName] self.assertFalse(moduleName in sys.modules) moduleName = '.'.join(moduleName.split('.')[:-1]) finally: sys.path.remove(dirPath) finally: if oldPrefix: sys.prefix = oldPrefix
[docs] def testBuiltIn(self): # Create a ModuleDescriptor for the module containing ModuleDescriptor, i.e. toil.resource module_name = ModuleDescriptor.__module__ self.assertEqual(module_name, 'toil.resource') self._test(module_name, shouldBelongToToil=True)
def _test(self, module_name, shouldBelongToToil=False, expectedContents=None, allowExtraContents=True): module = ModuleDescriptor.forModule(module_name) # Assert basic attributes and properties self.assertEqual(module.belongsToToil, shouldBelongToToil) self.assertEqual(module.name, module_name) if shouldBelongToToil: self.assertTrue(module.dirPath.endswith('/src')) # Before the module is saved as a resource, localize() and globalize() are identity # methods. This should log.warnings. self.assertIs(module.localize(), module) self.assertIs(module.globalize(), module) # Create a mock job store ... jobStore = MagicMock() # ... to generate a fake URL for the resource ... url = 'file://foo.zip' jobStore.getSharedPublicUrl.return_value = url # ... and save the resource to it. resource = module.saveAsResourceTo(jobStore) # Ensure that the URL generation method is actually called, ... jobStore.getSharedPublicUrl.assert_called_once_with(sharedFileName=resource.pathHash) # ... and that ensure that write_shared_file_stream is called. jobStore.write_shared_file_stream.assert_called_once_with(shared_file_name=resource.pathHash, encrypted=False) # Now it gets a bit complicated: Ensure that the context manager returned by the # jobStore's write_shared_file_stream() method is entered and that the file handle yielded # by the context manager is written to once with the zipped source tree from which # 'toil.resource' was originally imported. Keep the zipped tree around such that we can # mock the download later. file_handle = jobStore.write_shared_file_stream.return_value.__enter__.return_value # The first 0 index selects the first call of write(), the second 0 selects positional # instead of keyword arguments, and the third 0 selects the first positional, i.e. the # contents. This is a bit brittle since it assumes that all the data is written in a # single call to write(). If more calls are made we can easily concatenate them. zipFile = file_handle.write.call_args_list[0][0][0] self.assertTrue(zipFile.startswith(b'PK')) # the magic header for ZIP files # Check contents if requested if expectedContents is not None: with ZipFile(BytesIO(zipFile)) as _zipFile: actualContents = set(_zipFile.namelist()) if allowExtraContents: self.assertTrue(actualContents.issuperset(expectedContents)) else: self.assertEqual(actualContents, expectedContents) self.assertEqual(resource.url, url) # Now we're on the worker. Prepare the storage for localized resources Resource.prepareSystem() try: # Register the resource for subsequent lookup. resource.register() # Lookup the resource and ensure that the result is equal to but not the same as the # original resource. Lookup will also be used when we localize the module that was # originally used to create the resource. localResource = Resource.lookup(module._resourcePath) self.assertEqual(resource, localResource) self.assertIsNot(resource, localResource) # Now show that we can localize the module using the registered resource. Set up a mock # urlopen() that yields the zipped tree ... mock_urlopen = MagicMock() mock_urlopen.return_value.read.return_value = zipFile with patch('toil.resource.urlopen', mock_urlopen): # ... and use it to download and unpack the resource localModule = module.localize() # The name should be equal between original and localized resource ... self.assertEqual(module.name, localModule.name) # ... but the directory should be different. self.assertNotEqual(module.dirPath, localModule.dirPath) # Show that we can 'undo' localization. This is necessary when the user script's jobs # are invoked on the worker where they generate more child jobs. self.assertEqual(localModule.globalize(), module) finally: Resource.cleanSystem()
[docs] def testNonPyStandAlone(self): """ Asserts that Toil enforces the user script to have a .py or .pyc extension because that's the only way auto-deployment can re-import the module on a worker. See https://github.com/BD2KGenomics/toil/issues/631 and https://github.com/BD2KGenomics/toil/issues/858 """ def script(): from configargparse import ArgumentParser from toil.common import Toil from toil.job import Job def fn(): pass if __name__ == '__main__': parser = ArgumentParser() Job.Runner.addToilOptions(parser) options = parser.parse_args() job = Job.wrapFn(fn, memory='10M', cores=0.1, disk='10M') with Toil(options) as toil: toil.start(job) scriptBody = dedent('\n'.join(getsource(script).split('\n')[1:])) shebang = '#! %s\n' % sys.executable with tempFileContaining(shebang + scriptBody) as scriptPath: self.assertFalse(scriptPath.endswith(('.py', '.pyc'))) os.chmod(scriptPath, 0o755) jobStorePath = scriptPath + '.jobStore' process = subprocess.Popen([scriptPath, jobStorePath], stderr=subprocess.PIPE) stdout, stderr = process.communicate() self.assertTrue('The name of a user script/module must end in .py or .pyc.' in stderr.decode('utf-8')) self.assertNotEqual(0, process.returncode) self.assertFalse(os.path.exists(jobStorePath))