toil.lib.threading¶
Attributes¶
- logger
- current_process_name_lock

Classes¶
- ExceptionalThread: A thread whose join() method re-raises exceptions raised during run().
- LastProcessStandingArena: Class that lets a bunch of processes detect and elect a last process standing.

Functions¶
- ensure_filesystem_lockable: Make sure that the filesystem used at the given path is one where locks are safe to use.
- safe_lock: Get an fcntl lock, while retrying on IO errors.
- safe_unlock_and_close: Release an fcntl lock and close the file descriptor, while handling fcntl IO errors.
- cpu_count: Get the rounded-up integer number of whole CPUs available.
- collect_process_name_garbage: Delete all the process names that point to files that don't exist anymore.
- destroy_all_process_names: Delete all our process name files because our process is going away.
- get_process_name: Return the name of the current process. Like a PID but visible between containers.
- process_name_exists: Return true if the process named by the given name (from process_name) exists, and false otherwise.
- global_mutex: Context manager that locks a mutex.
Module Contents¶
- toil.lib.threading.logger¶
- toil.lib.threading.ensure_filesystem_lockable(path, timeout=30, hint=None)[source]¶
Make sure that the filesystem used at the given path is one where locks are safe to use.
File locks are not safe to use on Ceph. See <https://github.com/DataBiosphere/toil/issues/4972>.
Raises an exception if the filesystem is detected as one where using locks is known to trigger bugs in the filesystem implementation. Also raises an exception if the given path does not exist, or if attempting to determine the filesystem type takes more than the timeout in seconds.
If the filesystem type cannot be determined, does nothing.
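A minimal usage sketch, assuming a hypothetical work directory path and hint text; it simply calls the function before any lock files are created:

from toil.lib.threading import ensure_filesystem_lockable

work_dir = "/tmp/toil-workdir"  # hypothetical work directory
# Raises if the directory is missing, on a known-bad filesystem (e.g. Ceph),
# or if detection takes longer than the timeout.
ensure_filesystem_lockable(work_dir, timeout=30, hint="Pick a different work directory.")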
- toil.lib.threading.safe_lock(fd, block=True, shared=False)[source]¶
Get an fcntl lock, while retrying on IO errors.
Raises OSError with EACCES or EAGAIN when a nonblocking lock is not immediately available.
- toil.lib.threading.safe_unlock_and_close(fd)[source]¶
Release an fcntl lock and close the file descriptor, while handling fcntl IO errors.
- Parameters:
fd (int)
- Return type:
None
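A minimal sketch pairing safe_lock with safe_unlock_and_close around a lock file; the file path is a placeholder:

import os
from toil.lib.threading import safe_lock, safe_unlock_and_close

fd = os.open("/tmp/example.lock", os.O_CREAT | os.O_RDWR)  # hypothetical lock file
try:
    # Block until an exclusive lock is held, retrying on transient IO errors.
    safe_lock(fd, block=True, shared=False)
    ...  # critical section
finally:
    # Drop the lock and close the descriptor, tolerating fcntl IO errors.
    safe_unlock_and_close(fd)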
- class toil.lib.threading.ExceptionalThread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)[source]¶
Bases:
threading.Thread
A thread whose join() method re-raises exceptions raised during run(). While join() is idempotent, the exception is only re-raised during the first invocation of join() that successfully joined the thread. If join() times out, no exception will be re-raised even though an exception might already have occurred in run().
When subclassing this thread, override tryRun() instead of run().
>>> def f():
...     assert 0
>>> t = ExceptionalThread(target=f)
>>> t.start()
>>> t.join()
Traceback (most recent call last):
...
AssertionError
>>> class MyThread(ExceptionalThread):
...     def tryRun(self):
...         assert 0
>>> t = MyThread()
>>> t.start()
>>> t.join()
Traceback (most recent call last):
...
AssertionError
- exc_info = None¶
- run()[source]¶
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- Return type:
None
- join(*args, **kwargs)[source]¶
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
- toil.lib.threading.cpu_count()[source]¶
Get the rounded-up integer number of whole CPUs available.
Counts hyperthreads as CPUs.
Uses the system’s actual CPU count, or the current v1 cgroup’s quota per period, if the quota is set.
Ignores the cgroup’s cpu shares value, because it’s extremely difficult to interpret. See https://github.com/kubernetes/kubernetes/issues/81021.
Caches result for efficiency.
- Returns:
Integer count of available CPUs, minimum 1.
- Return type:
int
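A minimal sketch using the cgroup-aware count to size a thread pool:

from concurrent.futures import ThreadPoolExecutor
from toil.lib.threading import cpu_count

# cpu_count() is at least 1, so this always yields a valid pool size.
pool = ThreadPoolExecutor(max_workers=cpu_count())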
- toil.lib.threading.current_process_name_lock¶
- toil.lib.threading.collect_process_name_garbage()[source]¶
Delete all the process names that point to files that don’t exist anymore (because the work directory was temporary and got cleaned up). This is known to happen during the tests, which get their own temp directories.
Caller must hold current_process_name_lock.
- Return type:
None
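A minimal sketch honoring the locking contract, holding the module's current_process_name_lock while collecting garbage:

from toil.lib import threading as toil_threading

with toil_threading.current_process_name_lock:
    toil_threading.collect_process_name_garbage()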
- toil.lib.threading.destroy_all_process_names()[source]¶
Delete all our process name files because our process is going away.
We let all our FDs get closed by the process death.
We assume there is nobody else using the system during exit to race with.
- Return type:
None
- toil.lib.threading.get_process_name(base_dir)[source]¶
Return the name of the current process. Like a PID but visible between containers on what appears to Toil to be a node.
- toil.lib.threading.process_name_exists(base_dir, name)[source]¶
Return true if the process named by the given name (from process_name) exists, and false otherwise.
Can see across container boundaries using the given node workflow directory.
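A minimal sketch, assuming base_dir is a node workflow directory shared between the containers that should see each other:

from toil.lib.threading import get_process_name, process_name_exists

base_dir = "/var/lib/toil/node"  # hypothetical shared workflow directory
name = get_process_name(base_dir)  # name for the current process
# Another process (possibly in another container) can check liveness by name.
assert process_name_exists(base_dir, name)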
- toil.lib.threading.global_mutex(base_dir, mutex)[source]¶
Context manager that locks a mutex. The mutex is identified by the given name, and scoped to the given directory. Works across all containers that have access to the given directory. Mutexes held by dead processes are automatically released.
Only works between processes, NOT between threads.
- Parameters:
base_dir (str)
mutex (str)
- Return type:
collections.abc.Iterator[None]
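A minimal sketch, assuming base_dir is a directory visible to every process that needs to coordinate; "cache-setup" is a placeholder mutex name:

from toil.lib.threading import global_mutex

base_dir = "/var/lib/toil/node"  # hypothetical shared directory
with global_mutex(base_dir, "cache-setup"):
    ...  # only one process on the node runs this block at a time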
- class toil.lib.threading.LastProcessStandingArena(base_dir, name)[source]¶
Class that lets a bunch of processes detect and elect a last process standing.
Processes enter and leave (sometimes due to sudden existence failure). We guarantee that the last process to leave, if it leaves properly, will get a chance to do some cleanup. If new processes try to enter during the cleanup, they will be delayed until after the cleanup has happened and the previous “last” process has finished leaving.
The user is responsible for making sure they always leave the arena if they enter it! Consider using a try/finally; this class is not a context manager.
- base_dir¶
- mutex¶
- lockfileDir¶
- lockfileFD = None¶
- lockfileName = None¶
- enter()[source]¶
This process is entering the arena. If cleanup is in progress, blocks until it is finished.
You may not enter the arena again before leaving it.
- Return type:
None
- leave()[source]¶
This process is leaving the arena. If this process happens to be the last process standing, yields something, with other processes blocked from joining the arena until the loop body completes and the process has finished leaving. Otherwise, does not yield anything.
Should be used in a loop:
for _ in arena.leave():
    # If we get here, we were the last process. Do the cleanup.
    ...
- Return type:
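A minimal end-to-end sketch of the enter/leave protocol, assuming base_dir is a directory shared by all participating processes and "shared-cache" is a placeholder arena name:

from toil.lib.threading import LastProcessStandingArena

arena = LastProcessStandingArena("/var/lib/toil/node", "shared-cache")
arena.enter()
try:
    ...  # use the shared resource
finally:
    for _ in arena.leave():
        # Only the last process standing gets here; clean up the shared resource.
        pass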