Source code for toil.lib.io

import hashlib
import logging
import os
import shutil
import stat
import sys
import tempfile
import uuid
from collections.abc import Iterator
from contextlib import contextmanager
from io import BytesIO
from typing import IO, Any, Callable, Optional, Protocol, Union

logger = logging.getLogger(__name__)


def mkdtemp(
    suffix: Optional[str] = None,
    prefix: Optional[str] = None,
    dir: Optional[str] = None,
) -> str:
    """
    Make a temporary directory like tempfile.mkdtemp, but with relaxed permissions.

    The permissions on the directory will be 711 instead of 700, allowing the
    group and all other users to traverse the directory.

    This is necessary if the directory is on NFS and the Docker daemon would
    like to mount it or a file inside it into a container, because on NFS even
    the Docker daemon appears bound by the file permissions. See
    <https://github.com/DataBiosphere/toil/issues/4644>, and
    <https://stackoverflow.com/a/67928880> which talks about a similar problem
    but in the context of user namespaces.
    """
    # Make the directory
    result = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
    # Grant all the permissions: full control for user, and execute for group and other
    os.chmod(
        result, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    )
    # Return the path created
    return result

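# Illustrative usage sketch (not part of toil.lib.io): mkdtemp() is a drop-in
# replacement for tempfile.mkdtemp whose result is traversable (mode 711) by
# other users such as the Docker daemon. The paths shown are hypothetical.
#
#     work_dir = mkdtemp(prefix="toil-", dir="/mnt/nfs/scratch")
#     print(oct(os.stat(work_dir).st_mode & 0o777))  # 0o711 rather than 0o700
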
def robust_rmtree(path: Union[str, bytes]) -> None:
    """
    Robustly tries to delete paths.

    Continues silently if the path to be removed is already gone, or if it
    goes away while this function is executing.

    May raise an error if a path changes between file and directory while the
    function is executing, or if a permission error is encountered.
    """
    # TODO: only allow str or bytes as an input
    if not isinstance(path, bytes):
        # Internally we must work in bytes, in case we find an undecodeable
        # filename.
        path = path.encode("utf-8")

    if not os.path.exists(path):
        # Nothing to do!
        return

    if not os.path.islink(path) and os.path.isdir(path):
        # It is or has been a directory
        try:
            children = os.listdir(path)
        except FileNotFoundError:
            # Directory went away
            return
        except OSError as exc:
            if exc.errno == 16:
                # 'Device or resource busy'
                return
            raise

        # We assume the directory going away while we have it open won't upset
        # the listdir iterator.
        for child in children:
            # Get the path for each child item in the directory
            child_path = os.path.join(path, child)
            # Remove it if still present
            robust_rmtree(child_path)

        try:
            # Actually remove the directory once the children are gone
            shutil.rmtree(path)
        except FileNotFoundError:
            # Directory went away
            return
        except OSError as exc:
            if exc.errno == 16:
                # 'Device or resource busy'
                return
            raise
    else:
        # It is not or was not a directory.
        try:
            # Unlink it as a normal file
            os.unlink(path)
        except FileNotFoundError:
            # File went away
            return
        except OSError as exc:
            if exc.errno == 16:
                # 'Device or resource busy'
                return
            raise

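# Illustrative usage sketch (not part of toil.lib.io): robust_rmtree() is a
# tolerant "rm -rf" that ignores paths vanishing underneath it, for example
# when several workers clean up the same scratch tree. The path is hypothetical.
#
#     robust_rmtree("/mnt/nfs/scratch/toil-abc123")  # no error if already gone
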
def atomic_tmp_file(final_path: str) -> str:
    """
    Return a temporary file name to use with atomic_install.

    The temporary file will be in the same directory as final_path and will
    have the same extension. If the final path is in /dev (/dev/null,
    /dev/stdout), it is returned unchanged and atomic_install will do nothing.
    """
    final_dir = os.path.dirname(os.path.normpath(final_path))  # can be empty
    if final_dir == "/dev":
        return final_path
    final_basename = os.path.basename(final_path)
    final_ext = os.path.splitext(final_path)[1]
    base_name = f"{final_basename}.{uuid.uuid4()}.tmp{final_ext}"
    return os.path.join(final_dir, base_name)

def atomic_install(tmp_path: str, final_path: str) -> None:
    """Atomically install tmp_path as final_path. Does nothing if final_path is in /dev."""
    if os.path.dirname(os.path.normpath(final_path)) != "/dev":
        os.rename(tmp_path, final_path)

@contextmanager
def AtomicFileCreate(final_path: str, keep: bool = False) -> Iterator[str]:
    """
    Context manager to create a temporary file.

    Entering returns the path to a temporary file in the same directory as
    final_path. If the code in the context succeeds, the file is renamed to
    its final name. If an error occurs, the file is not installed and is
    removed unless keep is specified.
    """
    tmp_path = atomic_tmp_file(final_path)
    try:
        yield tmp_path
        atomic_install(tmp_path, final_path)
    except Exception:
        if not keep:
            try:
                os.unlink(tmp_path)
            except Exception:
                pass
        raise

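# Illustrative usage sketch (not part of toil.lib.io): AtomicFileCreate wraps
# the atomic_tmp_file()/atomic_install() pair so readers never observe a
# partially written file, and the temporary file is cleaned up if the body
# raises. The file name is hypothetical.
#
#     with AtomicFileCreate("results/summary.tsv") as tmp:
#         with open(tmp, "w") as out:
#             out.write("sample\tcount\n")
#     # "results/summary.tsv" appears only if the block completed successfully.
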
def atomic_copy(
    src_path: str, dest_path: str, executable: Optional[bool] = None
) -> None:
    """Copy a file using POSIX atomic creation semantics."""
    if executable is None:
        executable = os.stat(src_path).st_mode & stat.S_IXUSR != 0
    with AtomicFileCreate(dest_path) as dest_path_tmp:
        shutil.copyfile(src_path, dest_path_tmp)
        if executable:
            os.chmod(dest_path_tmp, os.stat(dest_path_tmp).st_mode | stat.S_IXUSR)

def atomic_copyobj(
    src_fh: BytesIO, dest_path: str, length: int = 16384, executable: bool = False
) -> None:
    """Copy an open file using POSIX atomic creation semantics."""
    with AtomicFileCreate(dest_path) as dest_path_tmp:
        with open(dest_path_tmp, "wb") as dest_path_fh:
            shutil.copyfileobj(src_fh, dest_path_fh, length=length)
        if executable:
            os.chmod(dest_path_tmp, os.stat(dest_path_tmp).st_mode | stat.S_IXUSR)

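# Illustrative usage sketch (not part of toil.lib.io): atomic_copy() copies a
# file on disk (preserving the user-executable bit by default), while
# atomic_copyobj() does the same for an already-open binary stream; both
# install the result atomically via AtomicFileCreate. Paths are hypothetical.
#
#     atomic_copy("build/tool", "bin/tool")  # executable bit carried over
#     with open("build/data.bin", "rb") as src:
#         atomic_copyobj(src, "share/data.bin")
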
def make_public_dir(in_directory: str, suggested_name: Optional[str] = None) -> str:
    """
    Make a publicly-accessible directory in the given directory.

    :param suggested_name: Use this directory name first if possible.

    Try to make a random directory name of length 4 that doesn't exist within
    in_directory. Otherwise, try length 5, length 6, and so on, up to a maximum
    of 32 (the length of a uuid4 with the dashes removed). This function's
    purpose is mostly to avoid having long file names when generating
    directories. If somehow this fails, which should be incredibly unlikely,
    default to a full uuid4, which was our old default.
    """
    if suggested_name is not None:
        generated_dir_path: str = os.path.join(in_directory, suggested_name)
        try:
            os.mkdir(generated_dir_path)
            os.chmod(generated_dir_path, 0o777)
            return generated_dir_path
        except FileExistsError:
            pass
    for i in range(
        4, 32 + 1
    ):  # make random uuids and truncate to lengths starting at 4 and working up to max 32
        for _ in range(10):  # make 10 attempts for each length
            truncated_uuid = str(uuid.uuid4()).replace("-", "")[:i]
            generated_dir_path = os.path.join(in_directory, truncated_uuid)
            try:
                os.mkdir(generated_dir_path)
                os.chmod(generated_dir_path, 0o777)
                return generated_dir_path
            except FileExistsError:
                pass
    this_should_never_happen: str = os.path.join(in_directory, str(uuid.uuid4()))
    os.mkdir(this_should_never_happen)
    os.chmod(this_should_never_happen, 0o777)
    return this_should_never_happen

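# Illustrative usage sketch (not part of toil.lib.io): make_public_dir() keeps
# generated directory names short, falling back to longer random names only on
# collision. The parent directory and name are hypothetical.
#
#     job_dir = make_public_dir("/mnt/nfs/jobstore", suggested_name="job42")
#     # "/mnt/nfs/jobstore/job42" if free, otherwise a short random name,
#     # always created world-accessible (mode 777).
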
def try_path(path: str, min_size: int = 100 * 1024 * 1024) -> Optional[str]:
    """
    Try to use the given path.

    Return it if it exists or can be made, and we can make things within it,
    or None otherwise.

    :param min_size: Reject paths on filesystems smaller than this many bytes.
    """
    try:
        os.makedirs(path, exist_ok=True)
    except OSError:
        # Maybe we lack permissions
        return None
    if not os.path.exists(path):
        # We didn't manage to make it
        return None
    if not os.access(path, os.W_OK):
        # It doesn't look writable
        return None
    try:
        stats = os.statvfs(path)
    except OSError:
        # Maybe we lack permissions
        return None
    # Is the filesystem big enough?
    # We need to look at the FS size and not the free space so we don't change
    # over to a different filesystem when this one fills up.
    fs_size = stats.f_frsize * stats.f_blocks
    if fs_size < min_size:
        # Too small
        return None
    return path

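# Illustrative usage sketch (not part of toil.lib.io): try_path() can be used
# to pick the first workable scratch location from a list of candidates. The
# candidate paths are hypothetical.
#
#     scratch = None
#     for candidate in ["/run/user/1000/toil", "/tmp/toil", "/var/tmp/toil"]:
#         scratch = try_path(candidate)
#         if scratch is not None:
#             break
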
class WriteWatchingStream:
    """
    A stream wrapping class that calls any functions passed to onWrite() with
    the number of bytes written for every write.

    Not seekable.
    """

    def __init__(self, backingStream: IO[Any]) -> None:
        """
        Wrap the given backing stream.
        """
        self.backingStream = backingStream
        # We have no write listeners yet
        self.writeListeners = []

    def onWrite(self, listener: Callable[[int], None]) -> None:
        """
        Call the given listener with the number of bytes written on every write.
        """
        self.writeListeners.append(listener)

    # Implement the file API from https://docs.python.org/2.4/lib/bltin-file-objects.html

    def write(self, data):
        """
        Write the given data to the file.
        """
        # Do the write
        self.backingStream.write(data)

        for listener in self.writeListeners:
            # Send out notifications
            listener(len(data))

    def writelines(self, datas):
        """
        Write each string from the given iterable, without adding newlines.
        """
        for data in datas:
            self.write(data)

    def flush(self):
        """
        Flush the backing stream.
        """
        self.backingStream.flush()

    def close(self):
        """
        Close the backing stream.
        """
        self.backingStream.close()

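# Illustrative usage sketch (not part of toil.lib.io): WriteWatchingStream can
# report progress by counting bytes as they pass through to the backing
# stream. The destination file name is hypothetical.
#
#     byte_counts: list[int] = []
#     with open("upload.bin", "wb") as backing:
#         stream = WriteWatchingStream(backing)
#         stream.onWrite(byte_counts.append)
#         stream.write(b"some payload")
#     total_written = sum(byte_counts)
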
class ReadableFileObj(Protocol):
    """
    Protocol that is more specific than what file_digest takes as an argument.
    Also guarantees a read() method.

    Would extend the protocol from Typeshed for hashlib but those are only
    declared for 3.11+.
    """

    def readinto(self, buf: bytearray, /) -> int: ...

    def readable(self) -> bool: ...

    def read(self, number: int) -> bytes: ...

# hashlib._Hash seems to not appear at runtime
def file_digest(f: ReadableFileObj, alg_name: str) -> "hashlib._Hash":
    """
    Polyfilled hashlib.file_digest that works on Python <3.11.
    """
    if sys.version_info >= (3, 11):
        return hashlib.file_digest(f, alg_name)
    BUFFER_SIZE = 1024 * 1024
    hasher = hashlib.new(alg_name)
    buffer = f.read(BUFFER_SIZE)
    while buffer:
        hasher.update(buffer)
        buffer = f.read(BUFFER_SIZE)
    return hasher

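# Illustrative usage sketch (not part of toil.lib.io): file_digest() hashes an
# open binary file with any hashlib algorithm name, using the stdlib fast path
# on Python 3.11+ and a chunked read loop otherwise. The file name is
# hypothetical.
#
#     with open("share/data.bin", "rb") as fh:
#         print(file_digest(fh, "sha256").hexdigest())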