Source code for toil.test.src.threadingTest

import logging
import multiprocessing
import os
import random
import time
import traceback
from functools import partial

from toil.lib.threading import (LastProcessStandingArena,
                                cpu_count,
                                global_mutex)
from toil.test import ToilTest

log = logging.getLogger(__name__)

[docs] class ThreadingTest(ToilTest): """Test Toil threading/synchronization tools."""
[docs] def testGlobalMutexOrdering(self): for it in range(10): log.info('Iteration %d', it) scope = self._createTempDir() mutex = 'mutex' # Use processes (as opposed to threads) to prevent GIL from ordering things artificially pool = multiprocessing.Pool(processes=cpu_count()) try: numTasks = 100 results = pool.map_async( func=partial(_testGlobalMutexOrderingTask, scope, mutex), iterable=list(range(numTasks))) results = results.get() finally: pool.close() pool.join() self.assertEqual(len(results), numTasks) for item in results: # Make sure all workers say they succeeded self.assertEqual(item, True)
[docs] def testLastProcessStanding(self): for it in range(10): log.info('Iteration %d', it) scope = self._createTempDir() arena_name = 'thunderdome' # Use processes (as opposed to threads) to prevent GIL from ordering things artificially pool = multiprocessing.Pool(processes=cpu_count()) try: numTasks = 100 results = pool.map_async( func=partial(_testLastProcessStandingTask, scope, arena_name), iterable=list(range(numTasks))) results = results.get() finally: pool.close() pool.join() self.assertEqual(len(results), numTasks) for item in results: # Make sure all workers say they succeeded self.assertEqual(item, True) for filename in os.listdir(scope): assert not filename.startswith('precious'), f"File {filename} still exists"
def _testGlobalMutexOrderingTask(scope, mutex, number): try: # We will all fight over the potato potato = os.path.join(scope, 'potato') with global_mutex(scope, mutex): log.info('PID %d = num %d running', os.getpid(), number) assert not os.path.exists(potato), "We see someone else holding the potato file" # Put our name there with open(potato, 'w') as out_stream: out_stream.write(str(number)) # Wait time.sleep(random.random() * 0.01) # Make sure our name is still there with open(potato) as in_stream: seen = in_stream.read().rstrip() assert seen == str(number), f"We are {number} but {seen} stole our potato!" os.unlink(potato) assert not os.path.exists(potato), "We left the potato behind" log.info('PID %d = num %d dropped potato', os.getpid(), number) return True except: traceback.print_exc() return False def _testLastProcessStandingTask(scope, arena_name, number): try: arena = LastProcessStandingArena(scope, arena_name) arena.enter() log.info('PID %d = num %d entered arena', os.getpid(), number) try: # We all make files my_precious = os.path.join(scope, 'precious' + str(number)) # Put our name there with open(my_precious, 'w') as out_stream: out_stream.write(str(number)) # Wait time.sleep(random.random() * 0.01) # Make sure our file is still there unmodified assert os.path.exists(my_precious), f"Precious file {my_precious} has been stolen!" with open(my_precious) as in_stream: seen = in_stream.read().rstrip() assert seen == str(number), f"We are {number} but saw {seen} in our precious file!" finally: was_last = False for _ in arena.leave(): was_last = True log.info('PID %d = num %d is last standing', os.getpid(), number) # Clean up all the files for filename in os.listdir(scope): if filename.startswith('precious'): log.info('PID %d = num %d cleaning up %s', os.getpid(), number, filename) os.unlink(os.path.join(scope, filename)) log.info('PID %d = num %d left arena', os.getpid(), number) return True except: traceback.print_exc() return False