# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import codecs
import logging
import os
import random
import sys
import time
import traceback
from threading import Event, Thread
from unittest import skipIf
import pytest
from toil.batchSystems.singleMachine import SingleMachineBatchSystem
from toil.exceptions import FailedJobsException
from toil.job import Job
from toil.leader import DeadlockException
from toil.lib.retry import retry_flaky_test
from toil.test import ToilTest, get_temp_file, slow
logger = logging.getLogger(__name__)
[docs]
class JobServiceTest(ToilTest):
    """
    Tests testing the Job.Service class
    """

    @slow
    def testServiceSerialization(self):
        """
        Tests that a service can receive a promise without producing a serialization
        error.
        """
        job = Job()
        service = ToySerializableService("woot")
        startValue = job.addService(service)  # Add a first service to job
        # Now create a child of that service that takes the start value promise
        # from the parent service.
        subService = ToySerializableService(startValue)
        # This should work if serialization on services is working correctly.
        job.addService(subService, parentService=service)
        self.runToil(job)

    @slow
    def testService(self, checkpoint=False):
        """
        Tests the creation of a Job.Service with random failures of the worker.

        :param checkpoint: if True, make the root job a checkpoint job.
        """
        for test in range(2):
            outFile = get_temp_file(rootDir=self._createTempDir())  # Temporary file
            messageInt = random.randint(1, sys.maxsize)
            try:
                # Wire up the services/jobs
                t = Job.wrapJobFn(
                    serviceTest, outFile, messageInt, checkpoint=checkpoint
                )
                # Run the workflow repeatedly until success
                self.runToil(t)
                # Check output. Use a context manager so the file handle is
                # closed deterministically instead of leaking until GC.
                with open(outFile) as f:
                    self.assertEqual(int(f.readline()), messageInt)
            finally:
                os.remove(outFile)

    @slow
    @skipIf(
        SingleMachineBatchSystem.numCores < 4,
        "Need at least four cores to run this test",
    )
    def testServiceDeadlock(self):
        """
        Creates a job with more services than maxServices, checks that deadlock is detected.
        """
        outFile = get_temp_file(rootDir=self._createTempDir())
        try:

            def makeWorkflow():
                # Build a root job with three services and one child that
                # consumes all three service start values.
                job = Job()
                r1 = job.addService(ToySerializableService("woot1"))
                r2 = job.addService(ToySerializableService("woot2"))
                r3 = job.addService(ToySerializableService("woot3"))
                job.addChildFn(fnTest, [r1, r2, r3], outFile)
                return job

            # This should fail as too few services available
            try:
                self.runToil(
                    makeWorkflow(), badWorker=0.0, maxServiceJobs=2, deadlockWait=5
                )
            except DeadlockException:
                print("Got expected deadlock exception")
            else:
                # Explicit failure message instead of a bare `assert 0`, which
                # would be stripped when Python runs with -O.
                self.fail("Expected a DeadlockException")
            # This should pass, as adequate services available
            self.runToil(makeWorkflow(), maxServiceJobs=3)
            # Check we get expected output; assertEqual (unlike a bare assert)
            # still runs under -O, and the handle is closed deterministically.
            with open(outFile) as f:
                self.assertEqual(f.read(), "woot1 woot2 woot3")
        finally:
            os.remove(outFile)

    def testServiceWithCheckpoints(self):
        """
        Tests the creation of a Job.Service with random failures of the worker, making the root job use checkpointing to
        restart the subtree.
        """
        self.testService(checkpoint=True)

    @slow
    @skipIf(
        SingleMachineBatchSystem.numCores < 4,
        "Need at least four cores to run this test",
    )
    def testServiceRecursive(self, checkpoint=True):
        """
        Tests the creation of a Job.Service, creating a chain of services and accessing jobs.
        Randomly fails the worker.
        """
        for test in range(1):
            # Temporary file
            outFile = get_temp_file(rootDir=self._createTempDir())
            messages = [random.randint(1, sys.maxsize) for i in range(3)]
            try:
                # Wire up the services/jobs
                t = Job.wrapJobFn(
                    serviceTestRecursive, outFile, messages, checkpoint=checkpoint
                )
                # Run the workflow repeatedly until success
                self.runToil(t)
                # Check output; context manager closes the handle promptly.
                with open(outFile) as f:
                    self.assertEqual(list(map(int, f.readlines())), messages)
            finally:
                os.remove(outFile)

    @slow
    @skipIf(
        SingleMachineBatchSystem.numCores < 4,
        "Need at least four cores to run this test",
    )
    @pytest.mark.timeout(1200)
    def testServiceParallelRecursive(self, checkpoint=True):
        """
        Tests the creation of a Job.Service, creating parallel chains of services and accessing jobs.
        Randomly fails the worker.
        """
        # This test needs to have something like 10 jobs succeed
        BUNDLE_SIZE = 3
        BUNDLE_COUNT = 2
        RETRY_COUNT = 4
        FAIL_FRACTION = 0.5
        MAX_ATTEMPTS = 10
        total_jobs = BUNDLE_SIZE * BUNDLE_COUNT * 2 + 1
        p_complete_job_failure = FAIL_FRACTION ** (RETRY_COUNT + 1)
        p_workflow_success = (1 - p_complete_job_failure) ** total_jobs
        logger.info("Going to run %s total jobs, each of which completely fails %s of the time, so the workflow will succeed with probability %s", total_jobs, p_complete_job_failure, p_workflow_success)
        p_test_failure = (1 - p_workflow_success) ** MAX_ATTEMPTS
        logger.info("This test will fail spuriously with probability %s", p_test_failure)
        # We want to run the workflow through several times to test restarting, so we need it to often fail but reliably sometimes succeed, and almost always succeed when repeated.
        self.assertGreater(0.8, p_workflow_success)
        self.assertGreater(p_workflow_success, 0.2)
        self.assertGreater(0.001, p_test_failure)
        for test in range(1):
            # Temporary file
            outFiles = [get_temp_file(rootDir=self._createTempDir()) for j in range(BUNDLE_COUNT)]
            # We send 3 messages each in 2 sets, each of which needs a service and a client
            messageBundles = [
                [random.randint(1, sys.maxsize) for i in range(BUNDLE_SIZE)] for j in range(BUNDLE_COUNT)
            ]
            try:
                # Wire up the services/jobs. Pass the method's checkpoint
                # argument through rather than hard-coding True, so that
                # checkpoint=False is actually honored.
                t = Job.wrapJobFn(
                    serviceTestParallelRecursive,
                    outFiles,
                    messageBundles,
                    checkpoint=checkpoint,
                )
                # Run the workflow repeatedly until success
                self.runToil(t, retryCount=RETRY_COUNT, badWorker=FAIL_FRACTION, max_attempts=MAX_ATTEMPTS)
                # Check output
                for messages, outFile in zip(messageBundles, outFiles):
                    with open(outFile) as f:
                        self.assertEqual(list(map(int, f.readlines())), messages)
            finally:
                # Plain loop for side effects rather than list(map(...)).
                for outFile in outFiles:
                    os.remove(outFile)

    def runToil(
        self,
        rootJob,
        retryCount=1,
        badWorker=0.5,
        badWorkedFailInterval=0.1,  # NOTE(review): historical misspelling of "badWorker..." kept for caller compatibility
        maxServiceJobs=sys.maxsize,
        deadlockWait=60,
        max_attempts=50
    ):
        """
        Run the given root job to completion, restarting the workflow on
        FailedJobsException until it succeeds or max_attempts is exhausted.

        :param rootJob: the root Job of the workflow to run.
        :param retryCount: per-job retry count passed to Toil.
        :param badWorker: fraction of workers that fail randomly.
        :param badWorkedFailInterval: mean interval between worker failures.
        :param maxServiceJobs: cap on concurrently scheduled service jobs.
        :param deadlockWait: seconds to wait before declaring a deadlock.
        :param max_attempts: maximum number of whole-workflow (re)starts.
        """
        # Create the runner for the workflow.
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.logLevel = "DEBUG"
        options.retryCount = retryCount
        options.badWorker = badWorker
        options.badWorkerFailInterval = badWorkedFailInterval
        options.servicePollingInterval = 1
        options.maxServiceJobs = maxServiceJobs
        options.deadlockWait = deadlockWait
        # Run the workflow
        total_tries = 1
        while True:
            try:
                Job.Runner.startToil(rootJob, options)
                break
            except FailedJobsException as e:
                i = e.numberOfFailedJobs
                logger.info("Workflow attempt %s/%s failed with %s failed jobs", total_tries, max_attempts, i)
                if total_tries == max_attempts:
                    # Give a diagnostic message instead of a bare fail().
                    self.fail("Exceeded a reasonable number of workflow restarts")
                total_tries += 1
                # Subsequent startToil calls must resume the existing job store.
                options.restart = True
        logger.info("Succeeded after %s/%s attempts", total_tries, max_attempts)
[docs]
class PerfectServiceTest(JobServiceTest):

    def runToil(self, *args, **kwargs):
        """
        Let us run all the tests in the other service test class, but without worker failures.
        """
        # Force a failure-free worker pool, then delegate to the parent.
        kwargs["badWorker"] = 0
        super().runToil(*args, **kwargs)
[docs]
def serviceTest(job, outFile, messageInt):
    """
    Creates one service and one accessing job, which communicate with two files to establish
    that both run concurrently.
    """
    # Truncate the output file.
    open(outFile, "w").close()
    # A random offset is added to messageInt here and subtracted again by the
    # serviceAccessor. This proves that, across checkpoint restarts, an
    # earlier service is never paired with a later serviceAccessor (or vice
    # versa), since their offsets would not match.
    offset = random.randint(1, sys.maxsize)
    serviceOutput = job.addService(ToyService(messageInt + offset))
    job.addChildJobFn(serviceAccessor, serviceOutput, outFile, offset)
[docs]
def serviceTestRecursive(job, outFile, messages):
    """
    Creates a chain of services and accessing jobs, each paired together.
    """
    if len(messages) == 0:
        return
    # Truncate the output file.
    open(outFile, "w").close()
    # First service/accessor pair hangs directly off the root job.
    offset = random.randint(1, sys.maxsize)
    currentService = ToyService(messages[0] + offset)
    currentJob = job.addChildJobFn(
        serviceAccessor, job.addService(currentService), outFile, offset
    )
    # Each further service is a child of the previous service, and each
    # accessor a child of the previous accessor, forming two parallel chains.
    for message in messages[1:]:
        offset = random.randint(1, sys.maxsize)
        nextService = ToyService(message + offset, cores=0.1)
        currentJob = currentJob.addChildJobFn(
            serviceAccessor,
            job.addService(nextService, parentService=currentService),
            outFile,
            offset,
            cores=0.1,
        )
        currentService = nextService
[docs]
def serviceTestParallelRecursive(job, outFiles, messageBundles):
    """
    Creates multiple chains of services and accessing jobs.
    """
    for messages, outFile in zip(messageBundles, outFiles):
        # Truncate the output file.
        open(outFile, "w").close()
        if len(messages) == 0:
            continue
        # Head of this bundle's chain hangs directly off the root job.
        offset = random.randint(1, sys.maxsize)
        currentService = ToyService(messages[0] + offset)
        currentJob = job.addChildJobFn(
            serviceAccessor, job.addService(currentService), outFile, offset
        )
        # Extend the chain: each service is a child of the previous service,
        # each accessor a child of the previous accessor.
        for message in messages[1:]:
            offset = random.randint(1, sys.maxsize)
            nextService = ToyService(message + offset, cores=0.1)
            currentJob = currentJob.addChildJobFn(
                serviceAccessor,
                job.addService(nextService, parentService=currentService),
                outFile,
                offset,
                cores=0.1,
            )
            currentService = nextService
[docs]
class ToyService(Job.Service):
    def __init__(self, messageInt, *args, **kwargs):
        """
        While established the service repeatedly:
        - reads an integer i from the inJobStoreFileID file
        - writes i and messageInt to the outJobStoreFileID file

        :param messageInt: integer the worker echoes back alongside each key.
        """
        Job.Service.__init__(self, *args, **kwargs)
        self.messageInt = messageInt

    def start(self, job):
        """
        Launch the background worker thread and return the pair of job store
        file IDs (input, output) the worker communicates through.
        """
        assert self.disk is not None
        assert self.memory is not None
        assert self.cores is not None
        self.terminate = Event()  # set by stop() to shut the worker down
        self.error = Event()  # set by the worker if it raises
        # Note that service jobs are special and do not necessarily have job.jobStoreID.
        # So we don't associate these files with this job.
        inJobStoreID = job.fileStore.jobStore.get_empty_file_store_id()
        outJobStoreID = job.fileStore.jobStore.get_empty_file_store_id()
        self.serviceThread = Thread(
            target=self.serviceWorker,
            args=(
                job.fileStore.jobStore,
                self.terminate,
                self.error,
                inJobStoreID,
                outJobStoreID,
                self.messageInt,
            ),
        )
        self.serviceThread.start()
        return (inJobStoreID, outJobStoreID)

    def stop(self, job):
        """Signal the worker thread to quit and wait for it to finish."""
        self.terminate.set()
        self.serviceThread.join()

    def check(self):
        """
        Raise RuntimeError if the worker thread has failed; return True otherwise.
        """
        # is_set() replaces the camelCase isSet() alias, deprecated since
        # Python 3.10.
        if self.error.is_set():
            raise RuntimeError("Service worker failed")
        return True

    @staticmethod
    def serviceWorker(
        jobStore, terminate, error, inJobStoreID, outJobStoreID, messageInt
    ):
        """
        Poll inJobStoreID for an integer and echo "<int> <messageInt>" lines
        to outJobStoreID until the terminate event is set.
        """
        try:
            while True:
                if terminate.is_set():  # Quit if we've got the terminate signal
                    logger.debug("Service worker being told to quit")
                    return
                time.sleep(0.2)  # Sleep to avoid thrashing
                # Try reading a line from the input file
                try:
                    with jobStore.read_file_stream(inJobStoreID) as f:
                        f = codecs.getreader("utf-8")(f)
                        line = f.readline()
                except:  # noqa: E722 -- log everything, then re-raise to the outer handler
                    logger.debug(
                        "Something went wrong reading a line: %s",
                        traceback.format_exc(),
                    )
                    raise
                if len(line.strip()) == 0:
                    # Don't try and make an integer out of nothing
                    continue
                # Try converting the input line into an integer
                try:
                    inputInt = int(line)
                except ValueError:
                    logger.debug(
                        "Tried casting input line '%s' to integer but got error: %s",
                        line,
                        traceback.format_exc(),
                    )
                    continue
                # Write out the resulting read integer and the message
                with jobStore.update_file_stream(outJobStoreID) as f:
                    f.write((f"{inputInt} {messageInt}\n").encode())
        except:  # noqa: E722 -- flag the failure for check(), then re-raise
            logger.debug("Error in service worker: %s", traceback.format_exc())
            error.set()
            raise
[docs]
def serviceAccessor(job, communicationFiles, outFile, to_subtract):
    """
    Writes a random integer i into the inJobStoreFileID file, then tries 10 times reading
    from outJobStoreFileID to get a pair of integers, the first equal to i the second written into the outputFile.

    :param communicationFiles: (inJobStoreFileID, outJobStoreFileID) pair from ToyService.start.
    :param to_subtract: offset the paired service added; removed before writing out.
    :raises RuntimeError: if no matching reply arrives within 10 attempts.
    """
    inJobStoreFileID, outJobStoreFileID = communicationFiles
    # Get a random integer to advertise ourselves
    key = random.randint(1, sys.maxsize)
    # Write the integer into the file
    logger.debug("Writing key to inJobStoreFileID")
    with job.fileStore.jobStore.update_file_stream(inJobStoreFileID) as fH:
        fH.write(("%s\n" % key).encode("utf-8"))
    logger.debug("Trying to read key and message from outJobStoreFileID")
    for _ in range(10):  # Try 10 times over
        time.sleep(0.2)  # Avoid thrashing
        # Try reading an integer from the input file and writing out the message
        with job.fileStore.jobStore.read_file_stream(outJobStoreFileID) as fH:
            fH = codecs.getreader("utf-8")(fH)
            line = fH.readline()
        tokens = line.split()
        if len(tokens) != 2:
            continue
        key2, message = tokens
        if int(key2) == key:
            logger.debug(
                f"Matched key's: {key}, writing message: {int(message) - to_subtract} with to_subtract: {to_subtract}"
            )
            with open(outFile, "a") as fH:
                fH.write("%s\n" % (int(message) - to_subtract))
            return
    # Raise explicitly: a bare `assert 0` would be stripped under -O and the
    # failure silently ignored.
    raise RuntimeError("Job failed to get info from the service")
[docs]
class ToySerializableService(Job.Service):
    def __init__(self, messageInt, *args, **kwargs):
        """
        Trivial service for testing serialization.
        """
        Job.Service.__init__(self, *args, **kwargs)
        self.messageInt = messageInt

    def start(self, job):
        # The service's "output" is just the message it was constructed with.
        return self.messageInt

    def stop(self, job):
        # Nothing to tear down.
        pass

    def check(self):
        # Always reports healthy.
        return True
[docs]
def fnTest(strings, outputFile):
    """
    Function concatenates the strings together and writes them to the output file
    """
    # Join with single spaces, then write the result in one call.
    joined = " ".join(strings)
    with open(outputFile, "w") as handle:
        handle.write(joined)