import os
import shutil
import subprocess
import tempfile
from typing import List
import unittest
import uuid
import zipfile
from urllib.request import urlretrieve
from toil.test import ToilTest, needs_docker, needs_java, slow
from toil.version import exactPython
from toil.wdl.utils import get_analyzer
from toil.wdl.wdl_functions import (basename,
glob,
parse_cores,
parse_disk,
parse_memory,
process_infile,
read_csv,
read_tsv,
select_first,
size)
class BaseToilWdlTest(ToilTest):
    """Shared fixture for the WDL test classes.

    Provides ``base_command`` (set once per class) and a fresh
    ``output_dir`` for every individual test.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Runs once per class: store the command prefix used to launch toilwdl.

        ``base_command`` is ``exactPython`` followed by the absolute path of
        ``src/toil/wdl/toilwdl.py`` (resolved against the current directory).
        """
        super().setUpClass()
        toilwdl_script = os.path.abspath("src/toil/wdl/toilwdl.py")
        cls.base_command = [exactPython, toilwdl_script]

    def setUp(self) -> None:
        """Runs before each test: create a uniquely named output directory."""
        unique_name = 'toil-wdl-test-' + str(uuid.uuid4())
        self.output_dir = os.path.join('/tmp/', unique_name)
        os.makedirs(self.output_dir)

    def tearDown(self) -> None:
        """Runs after each test: remove the output directory if it is still there."""
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)
class ToilWdlTest(BaseToilWdlTest):
    """
    General tests for Toil WDL
    """

    @needs_docker
    def testMD5sum(self):
        """Run the md5sum workflow through toilwdl and check that it leaves
        an 'md5sum.txt' file in the output directory."""
        wdl = os.path.abspath('src/toil/test/wdl/md5sum/md5sum.wdl')
        json = os.path.abspath('src/toil/test/wdl/md5sum/md5sum.json')

        # check_call raises CalledProcessError if toilwdl exits non-zero.
        subprocess.check_call(self.base_command + [wdl, json, '-o', self.output_dir, '--logDebug'])

        md5sum_output = os.path.join(self.output_dir, 'md5sum.txt')
        assert os.path.exists(md5sum_output)
        os.unlink(md5sum_output)
class ToilWDLLibraryTest(BaseToilWdlTest):
    """
    Test class for WDL standard functions.
    """

    # estimated run time <1 sec
    def testFn_SelectFirst(self):
        """Test the wdl built-in functional equivalent of 'select_first()'.

        The cases below expect the first value in the list that is neither
        None nor an empty string.
        """
        assert select_first(['somestring', 'anotherstring', None, '', 1]) == 'somestring'
        assert select_first([None, '', 1, 'somestring']) == 1
        assert select_first([2, 1, '', 'somestring', None, '']) == 2
        assert select_first(['', 2, 1, 'somestring', None, '']) == 2

    # estimated run time <1 sec
    def testFn_Size(self) -> None:
        """Test the wdl built-in functional equivalent of 'size()',
        which returns a file's size based on the path."""
        # Imported here, as in the rest of this module's Toil-context tests.
        from toil.common import Toil
        from toil.job import Job
        from toil.wdl.wdl_types import WDLFile

        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.clean = 'always'
        with Toil(options) as toil:
            small = process_infile(WDLFile(file_path=os.path.abspath('src/toil/test/wdl/testfiles/vocab.wdl')), toil)
            small_file = size(small)
            assert small_file >= 1800, small_file

    # estimated run time <1 sec
    def testFn_Basename(self):
        """Test the wdl built-in functional equivalent of 'basename()',
        with and without a suffix to strip, on absolute and bare file names."""
        assert basename('/home/quokka/git/delete/toil/src/toil/wdl/toilwdl.py', '.py') == 'toilwdl'
        assert basename('/home/quokka/git/delete/toil/src/toil/wdl/toilwdl.py') == 'toilwdl.py'
        assert basename('toilwdl.py', '.py') == 'toilwdl'
        assert basename('toilwdl.py') == 'toilwdl.py'

    # estimated run time <1 sec
    def testFn_Glob(self):
        """Test the wdl built-in functional equivalent of 'glob()',
        which finds all files with a pattern in a directory."""
        vocab_location = glob('vocab.wdl', os.path.abspath('src/toil'))
        assert vocab_location == [os.path.abspath('src/toil/test/wdl/testfiles/vocab.wdl')], str(vocab_location)

        wdl_locations = glob('wdl_*.py', os.path.abspath('src/toil'))
        wdl_that_should_exist = [os.path.abspath('src/toil/wdl/wdl_analysis.py'),
                                 os.path.abspath('src/toil/wdl/wdl_synthesis.py'),
                                 os.path.abspath('src/toil/wdl/wdl_types.py'),
                                 os.path.abspath('src/toil/wdl/wdl_functions.py')]
        # make sure the files match the expected files
        for location in wdl_that_should_exist:
            assert location in wdl_locations, f'{str(location)} not in {str(wdl_locations)}!'
        # make sure the same number of files were found as expected
        assert len(wdl_that_should_exist) == len(wdl_locations), f'{str(len(wdl_locations))} != {str(len(wdl_that_should_exist))}'

    # estimated run time <1 sec
    def testFn_ParseMemory(self):
        """Test the wdl built-in functional equivalent of 'parse_memory()',
        which parses a specified memory input to an int output.

        The input can be a string or an int or a float and may include units
        such as 'Gb' or 'mib' as a separate argument."""
        assert parse_memory(2147483648) == 2147483648, str(parse_memory(2147483648))
        # The failure message reports the result for the same (string) input
        # that the assertion checks.
        assert parse_memory('2147483648') == 2147483648, str(parse_memory('2147483648'))
        assert parse_memory('2GB') == 2000000000, str(parse_memory('2GB'))
        assert parse_memory('2GiB') == 2147483648, str(parse_memory('2GiB'))
        assert parse_memory('1 GB') == 1000000000, str(parse_memory('1 GB'))
        assert parse_memory('1 GiB') == 1073741824, str(parse_memory('1 GiB'))

    # estimated run time <1 sec
    def testFn_ParseCores(self):
        """Test the wdl built-in functional equivalent of 'parse_cores()',
        which parses a specified cores input to an int output.

        The input can be a string or an int."""
        assert parse_cores(1) == 1
        assert parse_cores('1') == 1

    # estimated run time <1 sec
    def testFn_ParseDisk(self):
        """Test the wdl built-in functional equivalent of 'parse_disk()',
        which parses a specified disk input to an int output.

        The input can be a string or an int or a float and may include units
        such as 'Gb' or 'mib' as a separate argument.

        The minimum returned value is 2147483648 bytes."""
        # check minimum returned value
        assert parse_disk('1') == 2147483648, str(parse_disk('1'))
        assert parse_disk(1) == 2147483648, str(parse_disk(1))

        assert parse_disk(2200000001) == 2200000001, str(parse_disk(2200000001))
        assert parse_disk('2200000001') == 2200000001, str(parse_disk('2200000001'))
        assert parse_disk('/mnt/my_mnt 3 SSD, /mnt/my_mnt2 500 HDD') == 503000000000, str(parse_disk('/mnt/my_mnt 3 SSD, /mnt/my_mnt2 500 HDD'))
        assert parse_disk('local-disk 10 SSD') == 10000000000, str(parse_disk('local-disk 10 SSD'))
        assert parse_disk('/mnt/ 10 HDD') == 10000000000, str(parse_disk('/mnt/ 10 HDD'))
        assert parse_disk('/mnt/ 1000 HDD') == 1000000000000, str(parse_disk('/mnt/ 1000 HDD'))

    # estimated run time <1 sec
    def testPrimitives(self):
        """Test if toilwdl correctly interprets some basic declarations."""
        wdl = os.path.abspath('src/toil/test/wdl/testfiles/vocab.wdl')

        # TODO: test for all version.
        aWDL = get_analyzer(wdl)
        aWDL.analyze()

        # Declarations that must carry no expression at all.
        no_declaration = ['bool1', 'int1', 'float1', 'file1', 'string1']
        # Declarations with a value: name -> (expected expression, expected type).
        with_declaration = {
            'bool2': ('True', 'Boolean'),
            'int2': ('1', 'Int'),
            'float2': ('1.1', 'Float'),
            'file2': ("'src/toil/test/wdl/test.tsv'", 'File'),
            'string2': ("'x'", 'String'),
        }

        collection_counter = []
        for key, declaration in aWDL.workflows_dictionary['vocabulary'].items():
            if not key.startswith('declaration'):
                continue

            name, var_type, var_expr = declaration

            if name in no_declaration:
                collection_counter.append(name)
                assert not var_expr
            elif name in with_declaration:
                expected_expr, expected_type = with_declaration[name]
                collection_counter.append(name)
                assert var_expr == expected_expr, var_expr
                assert var_type == expected_type, var_type

        # Every expected declaration must have been seen, in this order.
        assert collection_counter == ['bool1', 'int1', 'float1', 'file1', 'string1',
                                      'bool2', 'int2', 'float2', 'file2', 'string2']

    # estimated run time <1 sec
    def testCSV(self):
        """Test that 'read_csv()' returns the rows of test.csv as lists of strings."""
        default_csv_output = [['1', '2', '3'],
                              ['4', '5', '6'],
                              ['7', '8', '9']]
        csv_array = read_csv(os.path.abspath('src/toil/test/wdl/test.csv'))
        assert csv_array == default_csv_output

    # estimated run time <1 sec
    def testTSV(self):
        """Test that 'read_tsv()' returns the rows of test.tsv as lists of strings."""
        default_tsv_output = [['1', '2', '3'],
                              ['4', '5', '6'],
                              ['7', '8', '9']]
        tsv_array = read_tsv(os.path.abspath('src/toil/test/wdl/test.tsv'))
        assert tsv_array == default_tsv_output
class ToilWdlIntegrationTest(BaseToilWdlTest):
    """Test class for WDL tests that need extra workflows and data downloaded"""

    # Set by setUpClass: the test directory, and for each data set the path
    # of its downloaded zip archive and of the directory it extracts to.
    test_directory: str
    gatk_data: str
    gatk_data_dir: str
    encode_data: str
    encode_data_dir: str
    wdl_data: str
    wdl_data_dir: str

    @classmethod
    def setUpClass(cls) -> None:
        """Runs once for all tests: download and unpack the three data sets."""
        super().setUpClass()
        cls.test_directory = os.path.abspath("src/toil/test/wdl/")

        cls.encode_data = os.path.join(cls.test_directory, "ENCODE_data.zip")
        cls.encode_data_dir = os.path.join(cls.test_directory, "ENCODE_data")

        cls.wdl_data = os.path.join(cls.test_directory, "wdl_templates.zip")
        cls.wdl_data_dir = os.path.join(cls.test_directory, "wdl_templates")

        cls.gatk_data = os.path.join(cls.test_directory, "GATK_data.zip")
        cls.gatk_data_dir = os.path.join(cls.test_directory, "GATK_data")

        cls.fetch_and_unzip_from_s3(filename='ENCODE_data.zip',
                                    data=cls.encode_data,
                                    data_dir=cls.encode_data_dir)

        cls.fetch_and_unzip_from_s3(filename='wdl_templates.zip',
                                    data=cls.wdl_data,
                                    data_dir=cls.wdl_data_dir)

        cls.fetch_and_unzip_from_s3(filename='GATK_data.zip',
                                    data=cls.gatk_data,
                                    data_dir=cls.gatk_data_dir)

    @classmethod
    def tearDownClass(cls) -> None:
        """We generate a lot of cruft."""
        data_dirs = [cls.gatk_data_dir, cls.wdl_data_dir, cls.encode_data_dir]
        data_zips = [cls.gatk_data, cls.wdl_data, cls.encode_data]
        # Bare file names: these are looked up relative to the current
        # working directory.
        encode_outputs = ['ENCFF000VOL_chr21.fq.gz',
                          'ENCFF000VOL_chr21.raw.srt.bam',
                          'ENCFF000VOL_chr21.raw.srt.bam.flagstat.qc',
                          'ENCFF000VOL_chr21.raw.srt.dup.qc',
                          'ENCFF000VOL_chr21.raw.srt.filt.nodup.srt.final.bam',
                          'ENCFF000VOL_chr21.raw.srt.filt.nodup.srt.final.bam.bai',
                          'ENCFF000VOL_chr21.raw.srt.filt.nodup.srt.final.filt.nodup.sample.15.SE.tagAlign.gz',
                          'ENCFF000VOL_chr21.raw.srt.filt.nodup.srt.final.filt.nodup.sample.15.SE.tagAlign.gz.cc.plot.pdf',
                          'ENCFF000VOL_chr21.raw.srt.filt.nodup.srt.final.filt.nodup.sample.15.SE.tagAlign.gz.cc.qc',
                          'ENCFF000VOL_chr21.raw.srt.filt.nodup.srt.final.flagstat.qc',
                          'ENCFF000VOL_chr21.raw.srt.filt.nodup.srt.final.pbc.qc',
                          'ENCFF000VOL_chr21.raw.srt.filt.nodup.srt.final.SE.tagAlign.gz',
                          'ENCFF000VOL_chr21.sai',
                          'test.txt',
                          'filter_qc.json',
                          'filter_qc.log',
                          'GRCh38_chr21_bwa.tar.gz',
                          'mapping.json',
                          'mapping.log',
                          'post_mapping.json',
                          'post_mapping.log',
                          'wdl-stats.log',
                          'xcor.json',
                          'xcor.log',
                          'toilwdl_compiled.pyc',
                          'toilwdl_compiled.py',
                          'post_processing.log',
                          'md5.log']
        for cleanup in data_dirs + data_zips + encode_outputs:
            if os.path.isdir(cleanup):
                shutil.rmtree(cleanup)
            elif os.path.exists(cleanup):
                os.remove(cleanup)
        super().tearDownClass()

    @classmethod
    def fetch_and_unzip_from_s3(cls, filename, data, data_dir):
        """Download and extract one zipped data set, skipping finished steps.

        :param filename: Name of the archive under the toil-datasets URL.
        :param data: Local path of the zip archive; downloaded only if this
               path does not exist yet.
        :param data_dir: Directory the archive is expected to produce; the
               archive is extracted into ``cls.test_directory`` only if this
               path does not exist yet.
        """
        if not os.path.exists(data):
            # A URL, not a filesystem path, so it is built by concatenation
            # rather than os.path.join.
            s3_loc = 'http://toil-datasets.s3.amazonaws.com/' + filename
            urlretrieve(s3_loc, data)
        # extract the compressed data if not already extracted
        if not os.path.exists(data_dir):
            with zipfile.ZipFile(data, 'r') as zip_ref:
                zip_ref.extractall(cls.test_directory)

    def _run_and_compare(self, wdl, json, ref_dir, extra_args=(), out_flag='-o'):
        """Run toilwdl on a workflow and compare its outputs to a reference.

        :param wdl: Path of the .wdl file (made absolute here).
        :param json: Path of the inputs .json file (made absolute here).
        :param ref_dir: Directory of known good outputs (made absolute here).
        :param extra_args: Options placed between the inputs and the output
               option on the command line.
        :param out_flag: Spelling of the output-directory option to pass.
        """
        command = self.base_command + [os.path.abspath(wdl), os.path.abspath(json)]
        command += list(extra_args) + [out_flag, self.output_dir]
        # check_call raises CalledProcessError if toilwdl exits non-zero.
        subprocess.check_call(command)
        compare_runs(self.output_dir, os.path.abspath(ref_dir))

    # estimated run time 27 sec
    @slow
    @needs_java
    def testTut01(self):
        """Test if toilwdl produces the same outputs as known good outputs for WDL's
        GATK tutorial #1."""
        self._run_and_compare(
            wdl="src/toil/test/wdl/wdl_templates/t01/helloHaplotypeCaller.wdl",
            json="src/toil/test/wdl/wdl_templates/t01/helloHaplotypeCaller_inputs.json",
            ref_dir="src/toil/test/wdl/wdl_templates/t01/output/")

    # estimated run time 28 sec
    @slow
    @needs_java
    def testTut02(self):
        """Test if toilwdl produces the same outputs as known good outputs for WDL's
        GATK tutorial #2."""
        self._run_and_compare(
            wdl="src/toil/test/wdl/wdl_templates/t02/simpleVariantSelection.wdl",
            json="src/toil/test/wdl/wdl_templates/t02/simpleVariantSelection_inputs.json",
            ref_dir="src/toil/test/wdl/wdl_templates/t02/output/")

    # estimated run time 60 sec
    @slow
    @needs_java
    def testTut03(self):
        """Test if toilwdl produces the same outputs as known good outputs for WDL's
        GATK tutorial #3."""
        self._run_and_compare(
            wdl="src/toil/test/wdl/wdl_templates/t03/simpleVariantDiscovery.wdl",
            json="src/toil/test/wdl/wdl_templates/t03/simpleVariantDiscovery_inputs.json",
            ref_dir="src/toil/test/wdl/wdl_templates/t03/output/")

    # estimated run time 175 sec
    @slow
    @needs_java
    @unittest.skip('broken; see: https://github.com/DataBiosphere/toil/issues/3339')
    def testTut04(self):
        """Test if toilwdl produces the same outputs as known good outputs for WDL's
        GATK tutorial #4."""
        self._run_and_compare(
            wdl="src/toil/test/wdl/wdl_templates/t04/jointCallingGenotypes.wdl",
            json="src/toil/test/wdl/wdl_templates/t04/jointCallingGenotypes_inputs.json",
            ref_dir="src/toil/test/wdl/wdl_templates/t04/output/")

    # estimated run time 80 sec
    @slow
    @needs_docker
    def testENCODE(self):
        """Test if toilwdl produces the same outputs as known good outputs for
        a short ENCODE run."""
        self._run_and_compare(
            wdl="src/toil/test/wdl/wdl_templates/testENCODE/encode_mapping_workflow.wdl",
            json="src/toil/test/wdl/wdl_templates/testENCODE/encode_mapping_workflow.wdl.json",
            ref_dir="src/toil/test/wdl/wdl_templates/testENCODE/output/",
            extra_args=['--docker_user=None'],
            out_flag='--out_dir')

    # estimated run time 2 sec
    def testPipe(self):
        """Test basic bash input functionality with a pipe."""
        self._run_and_compare(
            wdl="src/toil/test/wdl/wdl_templates/testPipe/call.wdl",
            json="src/toil/test/wdl/wdl_templates/testPipe/call.json",
            ref_dir="src/toil/test/wdl/wdl_templates/testPipe/output/",
            out_flag='--out_dir')

    # estimated run time <1 sec
    def testJSON(self):
        """Test that 'dict_from_JSON()' loads the tutorial #1 inputs file into
        the expected dict, with each value wrapped in double quotes."""
        default_json_dict_output = {
            'helloHaplotypeCaller.haplotypeCaller.RefIndex': '"src/toil/test/wdl/GATK_data/ref/human_g1k_b37_20.fasta.fai"',
            'helloHaplotypeCaller.haplotypeCaller.sampleName': '"WDL_tut1_output"',
            'helloHaplotypeCaller.haplotypeCaller.inputBAM': '"src/toil/test/wdl/GATK_data/inputs/NA12878_wgs_20.bam"',
            'helloHaplotypeCaller.haplotypeCaller.bamIndex': '"src/toil/test/wdl/GATK_data/inputs/NA12878_wgs_20.bai"',
            'helloHaplotypeCaller.haplotypeCaller.GATK': '"src/toil/test/wdl/GATK_data/gatk-package-4.1.9.0-local.jar"',
            'helloHaplotypeCaller.haplotypeCaller.RefDict': '"src/toil/test/wdl/GATK_data/ref/human_g1k_b37_20.dict"',
            'helloHaplotypeCaller.haplotypeCaller.RefFasta': '"src/toil/test/wdl/GATK_data/ref/human_g1k_b37_20.fasta"'}

        from toil.wdl.utils import dict_from_JSON
        json_dict = dict_from_JSON("src/toil/test/wdl/wdl_templates/t01/helloHaplotypeCaller_inputs.json")
        assert json_dict == default_json_dict_output, (
            str(json_dict) + '\nAssertionError: ' + str(default_json_dict_output))

    # estimated run time <1 sec
    def test_size_large(self) -> None:
        """Test the wdl built-in functional equivalent of 'size()',
        which returns a file's size based on the path, on a large file."""
        from toil.common import Toil
        from toil.job import Job
        from toil.wdl.wdl_types import WDLFile

        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.clean = 'always'
        with Toil(options) as toil:
            # The downloaded ENCODE zip archive serves as the large file.
            large = process_infile(WDLFile(file_path=self.encode_data), toil)
            larger_file = size(large)
            larger_file_in_mb = size(large, 'mb')
            assert larger_file >= 70000000, larger_file
            assert larger_file_in_mb >= 70, larger_file_in_mb
def _read_uncommented_lines(filepath):
    """Return the lines of a text file that do not start with '#'."""
    with open(filepath) as handle:
        return [line for line in handle if not line.startswith('#')]


def compare_runs(output_dir, ref_dir):
    """
    Takes two directories and compares the files they have in common,
    asserting that they match.

    - Ignores outputs.txt, which contains a list of the outputs in the folder.
    - Ignores __pycache__.
    - Only files present in ref_dir are considered; a reference file with no
      same-named file in output_dir is skipped rather than reported.
    - Compares line by line, unless the file is a .vcf file.
    - Ignores potentially date-stamped comments (lines starting with '#').
    - Ignores quality scores in .vcf files and only checks that they found
      the same variants.  This is due to assumed small observed rounding
      differences between systems.

    :param output_dir: The directory holding the outputs under test.
    :param ref_dir: The directory holding the known good outputs.
    :raises AssertionError: If a file present in both directories differs.
    """
    ignored = ('outputs.txt', '__pycache__')
    # The output listing does not change during the comparison, so it is
    # taken once and used for constant-time membership checks.
    produced = set(os.listdir(output_dir))

    for name in os.listdir(ref_dir):
        if name in ignored or name not in produced:
            continue

        ref_path = os.path.join(ref_dir, name)
        test_path = os.path.join(output_dir, name)
        if name.endswith(".vcf"):
            compare_vcf_files(filepath1=ref_path, filepath2=test_path)
        else:
            good_data = _read_uncommented_lines(ref_path)
            test_data = _read_uncommented_lines(test_path)
            assert good_data == test_data, "File does not match: %r" % name
def _read_vcf_records(filepath):
    """Return a VCF's data lines split on tabs, skipping '#' lines and blanks."""
    with open(filepath) as vcf:
        stripped = (line.strip() for line in vcf)
        return [line.split('\t') for line in stripped
                if line and not line.startswith('#')]


def compare_vcf_files(filepath1, filepath2):
    """
    Asserts that two .vcf files contain the same variant findings.

    - Ignores potentially date-stamped comments (lines starting with '#').
    - Ignores blank lines.
    - Ignores quality scores in .vcf files and only checks that they found
      the same variants.  This is due to assumed small observed rounding
      differences between systems.
    - Ignores a leading '*,' in the ALT column of either file.

    VCF File Column Contents:
    1: #CHROM
    2: POS
    3: ID
    4: REF
    5: ALT
    6: QUAL
    7: FILTER
    8: INFO

    :param filepath1: First .vcf file to compare (the known good file).
    :param filepath2: Second .vcf file to compare (the file under test).
    :raises AssertionError: If the files hold a different number of records
            or any record differs in one of the first five columns.
    """
    good_data = _read_vcf_records(filepath1)
    test_data = _read_vcf_records(filepath2)

    # Without this check a test file with fewer records than the reference
    # (even an empty one) would pass, and one with more would hit IndexError.
    assert len(test_data) == len(good_data), \
        f"\nInconsistent VCFs: {filepath1} != {filepath2}\n" \
        f" - {len(test_data)} records != {len(good_data)} records"

    for i, (test_row, good_row) in enumerate(zip(test_data, good_data)):
        if test_row == good_row:
            continue

        # Only compare chromosome, position, ID, reference, and alts.
        # Quality score may vary (<1%) between systems because of
        # (assumed) rounding differences.  Same for the "info" sect.
        test_fields = test_row[:5]
        good_fields = good_row[:5]
        assert len(test_fields) == len(good_fields), \
            f"\nInconsistent VCFs: {filepath1} != {filepath2}\n" \
            f" - {test_fields} != {good_fields}\n" \
            f" - Line: {i}"

        for j, (test_value, good_value) in enumerate(zip(test_fields, good_fields)):
            if j == 4:
                # ALT column: drop a leading '*,' before comparing.
                if test_value.startswith('*,'):
                    test_value = test_value[2:]
                if good_value.startswith('*,'):
                    good_value = good_value[2:]
            assert test_value == good_value, f"\nInconsistent VCFs: {filepath1} != {filepath2}\n" \
                                             f" - {test_value} != {good_value}\n" \
                                             f" - Line: {i} Column: {j}"
# Run all tests in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()