toil.lib.threading

Attributes

logger

current_process_name_lock

current_process_name_for

Classes

ExceptionalThread

A thread whose join() method re-raises exceptions raised during run().

LastProcessStandingArena

Class that lets a bunch of processes detect and elect a last process standing.

Functions

ensure_filesystem_lockable(path[, timeout, hint])

Make sure that the filesystem used at the given path is one where locks are safe to use.

safe_lock(fd[, block, shared])

Get an fcntl lock, while retrying on IO errors.

safe_unlock_and_close(fd)

Release an fcntl lock and close the file descriptor, while handling fcntl IO errors.

cpu_count()

Get the rounded-up integer number of whole CPUs available.

collect_process_name_garbage()

Delete all the process names that point to files that don't exist anymore.

destroy_all_process_names()

Delete all our process name files because our process is going away.

get_process_name(base_dir)

Return the name of the current process.

process_name_exists(base_dir, name)

Return true if the process named by the given name (from get_process_name) exists, and false otherwise.

global_mutex(base_dir, mutex)

Context manager that locks a mutex.

Module Contents

toil.lib.threading.logger
toil.lib.threading.ensure_filesystem_lockable(path, timeout=30, hint=None)[source]

Make sure that the filesystem used at the given path is one where locks are safe to use.

File locks are not safe to use on Ceph. See <https://github.com/DataBiosphere/toil/issues/4972>.

Raises an exception if the filesystem is detected as one where using locks is known to trigger bugs in the filesystem implementation. Also raises an exception if the given path does not exist, or if attempting to determine the filesystem type takes more than the timeout in seconds.

If the filesystem type cannot be determined, does nothing.

Parameters:
  • hint (Optional[str]) – Extra text to include in an error, if raised, telling the user how to change the offending path.

  • path (str)

  • timeout (float)

Return type:

None

toil.lib.threading.safe_lock(fd, block=True, shared=False)[source]

Get an fcntl lock, while retrying on IO errors.

Raises OSError with EACCES or EAGAIN when a nonblocking lock is not immediately available.

Parameters:
  • fd (int)

  • block (bool)

  • shared (bool)

Return type:

None
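The nonblocking failure mode described above can be sketched with the standard library alone. This example does not call safe_lock; it uses fcntl.flock directly as a stand-in, and the helper name try_lock is hypothetical. It assumes a POSIX system where two separate opens of the same file contend for an flock lock.

```python
import errno
import fcntl
import os
import tempfile


def try_lock(fd: int) -> bool:
    """Hypothetical helper: try to take an exclusive lock without blocking."""
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return True
    except OSError as e:
        # A lock held elsewhere is reported as EACCES or EAGAIN.
        if e.errno in (errno.EACCES, errno.EAGAIN):
            return False
        raise


# Open the same file twice so the two descriptors contend for the lock.
first_fd, path = tempfile.mkstemp()
second_fd = os.open(path, os.O_RDWR)

held = try_lock(first_fd)            # first descriptor takes the lock
contended = try_lock(second_fd)      # second descriptor is refused
fcntl.flock(first_fd, fcntl.LOCK_UN)
after_release = try_lock(second_fd)  # succeeds once the lock is free

fcntl.flock(second_fd, fcntl.LOCK_UN)
os.close(first_fd)
os.close(second_fd)
os.unlink(path)
```

A caller that passes block=False would need the same kind of errno check to tell "lock is busy" apart from a genuine IO failure.
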

toil.lib.threading.safe_unlock_and_close(fd)[source]

Release an fcntl lock and close the file descriptor, while handling fcntl IO errors.

Parameters:

fd (int)

Return type:

None

class toil.lib.threading.ExceptionalThread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)[source]

Bases: threading.Thread

A thread whose join() method re-raises exceptions raised during run(). While join() is idempotent, the exception is only raised during the first invocation of join() that successfully joined the thread. If join() times out, no exception will be re-raised even though an exception might already have occurred in run().

When subclassing this thread, override tryRun() instead of run().

>>> def f():
...     assert 0
>>> t = ExceptionalThread(target=f)
>>> t.start()
>>> t.join()
Traceback (most recent call last):
...
AssertionError
>>> class MyThread(ExceptionalThread):
...     def tryRun( self ):
...         assert 0
>>> t = MyThread()
>>> t.start()
>>> t.join()
Traceback (most recent call last):
...
AssertionError
exc_info = None
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Return type:

None

tryRun()[source]
Return type:

None

join(*args, **kwargs)[source]

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread, as that would cause a deadlock. It is also an error to join() a thread before it has been started, and attempting to do so raises the same exception.

Parameters:
Return type:

None
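The re-raise-on-join behaviour can be illustrated with a small stand-in class. This is a minimal sketch of the general pattern, not Toil's implementation: the class name ReraisingThread and its _error attribute are hypothetical, and it stores the exception object rather than using exc_info or tryRun().

```python
import threading


class ReraisingThread(threading.Thread):
    """Hypothetical stand-in showing the pattern; not ExceptionalThread itself."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._error = None

    def run(self):
        try:
            super().run()
        except BaseException as e:
            # Remember the failure so join() can surface it in the caller.
            self._error = e

    def join(self, timeout=None):
        super().join(timeout)
        # Only re-raise once, and only if the thread has actually finished.
        if not self.is_alive() and self._error is not None:
            error, self._error = self._error, None
            raise error


def fail():
    raise ValueError("boom")


t = ReraisingThread(target=fail)
t.start()
try:
    t.join()
    first_join = "no exception"
except ValueError as e:
    first_join = f"raised {e}"

t.join()  # a second join() returns normally
second_join = "no exception"
```

Clearing the stored error after the first successful join is what keeps later join() calls from raising again.
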

toil.lib.threading.cpu_count()[source]

Get the rounded-up integer number of whole CPUs available.

Counts hyperthreads as CPUs.

Uses the system’s actual CPU count, or the current v1 cgroup’s quota per period, if the quota is set.

Ignores the cgroup’s cpu shares value, because it’s extremely difficult to interpret. See https://github.com/kubernetes/kubernetes/issues/81021.

Caches result for efficiency.

Returns:

Integer count of available CPUs, minimum 1.

Return type:

int
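The "quota per period" arithmetic can be sketched as below. The function name cpus_from_quota is hypothetical, and capping the result at the system CPU count is an assumption of this sketch rather than something this page states. Under cgroup v1 the two values are normally read from cpu.cfs_quota_us and cpu.cfs_period_us, where a quota of -1 means no limit is set.

```python
import math


def cpus_from_quota(quota_us: int, period_us: int, system_cpus: int) -> int:
    """Hypothetical helper: whole CPUs implied by a cgroup v1 CPU quota."""
    if quota_us <= 0 or period_us <= 0:
        # No quota set: fall back to the system's CPU count.
        return max(1, system_cpus)
    # Round a fractional allowance up to a whole CPU.
    limit = math.ceil(quota_us / period_us)
    # Assumption: never report more CPUs than the machine has, and at least 1.
    return max(1, min(limit, system_cpus))


half_again = cpus_from_quota(150_000, 100_000, 8)  # 1.5 CPUs rounds up to 2
unlimited = cpus_from_quota(-1, 100_000, 8)        # no quota, so 8
tiny = cpus_from_quota(10_000, 100_000, 8)         # 0.1 CPUs still counts as 1
```
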

toil.lib.threading.current_process_name_lock
toil.lib.threading.current_process_name_for: dict[str, str]
toil.lib.threading.collect_process_name_garbage()[source]

Delete all the process names that point to files that don’t exist anymore (because the work directory was temporary and got cleaned up). This is known to happen during the tests, which get their own temp directories.

Caller must hold current_process_name_lock.

Return type:

None

toil.lib.threading.destroy_all_process_names()[source]

Delete all our process name files because our process is going away.

We let all our FDs get closed by the process death.

We assume there is nobody else using the system during exit to race with.

Return type:

None

toil.lib.threading.get_process_name(base_dir)[source]

Return the name of the current process. Like a PID, but visible between containers on what appears to Toil to be a single node.

Parameters:

base_dir (str) – Base directory to work in. Defines the shared namespace.

Returns:

Process’s assigned name

Return type:

str

toil.lib.threading.process_name_exists(base_dir, name)[source]

Return true if the process named by the given name (from get_process_name) exists, and false otherwise.

Can see across container boundaries using the given node workflow directory.

Parameters:
  • base_dir (str) – Base directory to work in. Defines the shared namespace.

  • name (str) – Process’s name to poll

Returns:

True if the named process is still alive, and False otherwise.

Return type:

bool

toil.lib.threading.global_mutex(base_dir, mutex)[source]

Context manager that locks a mutex. The mutex is identified by the given name, and scoped to the given directory. Works across all containers that have access to the given directory. Mutexes held by dead processes are automatically released.

Only works between processes, NOT between threads.

Parameters:
  • base_dir (str) – Base directory to work in. Defines the shared namespace.

  • mutex (str) – Mutex to lock. Must be a permissible path component.

Return type:

collections.abc.Iterator[None]
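A directory-scoped, file-backed mutex of this general kind can be sketched with fcntl.flock. This is an illustration under stated assumptions, not Toil's implementation: the name file_mutex and the ".lock" file naming are hypothetical, and the sketch makes no claim about how global_mutex behaves between threads. Because the kernel drops the lock when the holding descriptor is closed, a process that dies releases it automatically.

```python
import fcntl
import os
import tempfile
from collections.abc import Iterator
from contextlib import contextmanager


@contextmanager
def file_mutex(base_dir: str, name: str) -> Iterator[None]:
    """Hypothetical sketch: hold an exclusive lock on a file under base_dir."""
    path = os.path.join(base_dir, f"{name}.lock")
    fd = os.open(path, os.O_CREAT | os.O_RDWR, 0o600)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)  # blocks until the lock is free
        yield
    finally:
        os.close(fd)  # closing the descriptor releases the lock


events = []
with tempfile.TemporaryDirectory() as base_dir:
    with file_mutex(base_dir, "demo"):
        events.append("locked")
        # A second, independent open of the lock file cannot take the lock.
        probe = os.open(os.path.join(base_dir, "demo.lock"), os.O_RDWR)
        try:
            fcntl.flock(probe, fcntl.LOCK_EX | fcntl.LOCK_NB)
            events.append("probe acquired")
        except OSError:
            events.append("probe refused")
        finally:
            os.close(probe)
    events.append("released")
```
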

class toil.lib.threading.LastProcessStandingArena(base_dir, name)[source]

Class that lets a bunch of processes detect and elect a last process standing.

Processes enter and leave (sometimes due to sudden existence failure). We guarantee that the last process to leave, if it leaves properly, will get a chance to do some cleanup. If new processes try to enter during the cleanup, they will be delayed until after the cleanup has happened and the previous “last” process has finished leaving.

You are responsible for making sure you always leave if you enter! Consider using a try/finally; this class is not a context manager.

Parameters:
  • base_dir (str)

  • name (str)

base_dir
mutex
lockfileDir
lockfileFD = None
lockfileName = None
enter()[source]

This process is entering the arena. If cleanup is in progress, blocks until it is finished.

You may not enter the arena again before leaving it.

Return type:

None

leave()[source]

This process is leaving the arena. If this process happens to be the last process standing, yields something, with other processes blocked from joining the arena until the loop body completes and the process has finished leaving. Otherwise, does not yield anything.

Should be used in a loop:

    for _ in arena.leave():
        # If we get here, we were the last process. Do the cleanup
        pass

Return type:

collections.abc.Iterator[bool]
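The enter/leave calling pattern, including the recommended try/finally, can be sketched with a toy stand-in. ToyArena is hypothetical and only counts members inside a single process; it uses no lock files and gives none of the cross-process guarantees of LastProcessStandingArena. It exists only to show why leave() is consumed with a for loop: the generator yields once for the last leaver and not at all for anyone else.

```python
from collections.abc import Iterator


class ToyArena:
    """Hypothetical in-process stand-in that counts members."""

    def __init__(self) -> None:
        self.members = 0

    def enter(self) -> None:
        self.members += 1

    def leave(self) -> Iterator[bool]:
        self.members -= 1
        if self.members == 0:
            # Only the last member to leave runs the loop body.
            yield True


def do_work(arena: ToyArena, log: list, label: str) -> None:
    arena.enter()
    try:
        log.append(f"{label}: working")
    finally:
        # Always leave, even if the work raised.
        for _ in arena.leave():
            log.append(f"{label}: cleaning up")


arena = ToyArena()
log = []
arena.enter()            # participant B is already inside
do_work(arena, log, "A")  # A leaves first, so it does no cleanup
for _ in arena.leave():   # B is the last one out
    log.append("B: cleaning up")
```
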