# Source code for toil.lib.url

# Copyright (C) 2015-2025 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from abc import ABC, ABCMeta, abstractmethod
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Callable,
    ContextManager,
    Literal,
    Optional,
    Union,
    cast,
    overload,
    Type,
)
from urllib.parse import ParseResult, urlparse

from toil.lib.exceptions import UnimplementedURLException
from toil.lib.memoize import memoize
from toil.lib.plugins import register_plugin, get_plugin

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# botocore may not be importable. When it is not, define a stand-in exception
# class under the same name so that code in this module can still name
# ProxyConnectionError in an ``except`` clause.
try:
    from botocore.exceptions import ProxyConnectionError
except ImportError:

    class ProxyConnectionError(BaseException):  # type: ignore
        """Placeholder used when botocore's ProxyConnectionError cannot be imported."""
class URLAccess:
    """
    Widget for accessing external storage (URLs).

    Each public classmethod parses the URL it is given, looks up the
    implementation registered for the URL's scheme with
    :meth:`_find_url_implementation`, and delegates to that implementation's
    underscore-prefixed hook.

    Implementations override the underscore-prefixed hooks. This class itself
    does not use an ABC metaclass, so the ``@abstractmethod`` markers are not
    enforced here; a hook that is not overridden raises
    :class:`NotImplementedError` when called.
    """

    @classmethod
    def url_exists(cls, src_uri: str) -> bool:
        """
        Return True if the file at the given URI exists, and False otherwise.

        May raise an error if file existence cannot be determined.

        :param src_uri: URL that points to a file or object in the storage
               mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket.
        :raises UnimplementedURLException: if no implementation supports the URL.
        """
        parsed = urlparse(src_uri)
        return cls._find_url_implementation(parsed)._url_exists(parsed)

    @classmethod
    def get_size(cls, src_uri: str) -> Optional[int]:
        """
        Get the size in bytes of the file at the given URL, or None if it cannot be obtained.

        :param src_uri: URL that points to a file or object in the storage
               mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket.
        :raises UnimplementedURLException: if no implementation supports the URL.
        """
        parsed = urlparse(src_uri)
        return cls._find_url_implementation(parsed)._get_size(parsed)

    @classmethod
    def get_is_directory(cls, src_uri: str) -> bool:
        """
        Return True if the thing at the given URL is a directory, and False if
        it is a file. The URL may or may not end in '/'.

        :param src_uri: URL to inspect.
        :raises UnimplementedURLException: if no implementation supports the URL.
        """
        parsed = urlparse(src_uri)
        return cls._find_url_implementation(parsed)._get_is_directory(parsed)

    @classmethod
    def list_url(cls, src_uri: str) -> list[str]:
        """
        List the directory at the given URL. Returned path components can be
        joined with '/' onto the passed URL to form new URLs. Those that end in
        '/' correspond to directories. The provided URL may or may not end with
        '/'.

        Currently supported schemes are:

            - 's3' for objects in Amazon S3
                e.g. s3://bucket/prefix/

            - 'file' for local files
                e.g. file:///local/dir/path/

        :param str src_uri: URL that points to a directory or prefix in the storage mechanism of a
                supported URL scheme e.g. a prefix in an AWS s3 bucket.

        :return: A list of URL components in the given directory, already URL-encoded.
        :raises UnimplementedURLException: if no implementation supports the URL.
        """
        parsed = urlparse(src_uri)
        return cls._find_url_implementation(parsed)._list_url(parsed)

    @classmethod
    def read_from_url(cls, src_uri: str, writable: IO[bytes]) -> tuple[int, bool]:
        """
        Read the given URL and write its content into the given writable stream.

        Raises FileNotFoundError if the URL doesn't exist.

        :param src_uri: URL to read from.
        :param writable: a writable stream that receives the content.
        :return: The size of the file in bytes and whether the executable permission bit is set
        :raises UnimplementedURLException: if no implementation supports the URL.
        """
        parsed = urlparse(src_uri)
        return cls._find_url_implementation(parsed)._read_from_url(parsed, writable)

    @classmethod
    def open_url(cls, src_uri: str) -> IO[bytes]:
        """
        Read from the given URI.

        Raises FileNotFoundError if the URL doesn't exist.

        Has a readable stream interface, unlike :meth:`read_from_url` which
        takes a writable stream.

        :param src_uri: URL to open.
        :raises UnimplementedURLException: if no implementation supports the URL.
        """
        parsed = urlparse(src_uri)
        return cls._find_url_implementation(parsed)._open_url(parsed)

    @classmethod
    @abstractmethod
    def _url_exists(cls, url: ParseResult) -> bool:
        """
        Return True if the item at the given URL exists, and False otherwise.

        May raise an error if file existence cannot be determined.
        """
        raise NotImplementedError(f"No implementation for {url}")

    @classmethod
    @abstractmethod
    def _get_size(cls, url: ParseResult) -> Optional[int]:
        """
        Get the size of the object at the given URL, or None if it cannot be obtained.
        """
        raise NotImplementedError(f"No implementation for {url}")

    @classmethod
    @abstractmethod
    def _get_is_directory(cls, url: ParseResult) -> bool:
        """
        Return True if the thing at the given URL is a directory, and False if
        it is a file or it is known not to exist. The URL may or may not end in
        '/'.

        :param url: URL that points to a file or object, or directory or prefix,
               in the storage mechanism of a supported URL scheme e.g. a blob
               in an AWS s3 bucket.
        """
        raise NotImplementedError(f"No implementation for {url}")

    @classmethod
    @abstractmethod
    def _read_from_url(cls, url: ParseResult, writable: IO[bytes]) -> tuple[int, bool]:
        """
        Reads the contents of the object at the specified location and writes it to the given
        writable stream.

        Raises FileNotFoundError if the thing at the URL is not found.

        :param ParseResult url: URL that points to a file or object in the storage
               mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket.

        :param IO[bytes] writable: a writable stream

        :return: The size of the file in bytes and whether the executable permission bit is set
        """
        raise NotImplementedError(f"No implementation for {url}")

    @classmethod
    @abstractmethod
    def _list_url(cls, url: ParseResult) -> list[str]:
        """
        List the contents of the given URL, which may or may not end in '/'

        Returns a list of URL components. Those that end in '/' are meant to be
        directories, while those that do not are meant to be files.

        :param ParseResult url: URL that points to a directory or prefix in the
               storage mechanism of a supported URL scheme e.g. a prefix in an AWS s3
               bucket.

        :return: The children of the given URL, already URL-encoded if
                 appropriate. (If the URL is a bare path, no encoding is done.)
        """
        raise NotImplementedError(f"No implementation for {url}")

    @classmethod
    @abstractmethod
    def _open_url(cls, url: ParseResult) -> IO[bytes]:
        """
        Get a stream of the object at the specified location.

        Raises FileNotFoundError if the thing at the URL is not found.
        """
        raise NotImplementedError(f"No implementation for {url}")

    @classmethod
    @abstractmethod
    def _write_to_url(
        cls,
        readable: Union[IO[bytes], IO[str]],
        url: ParseResult,
        executable: bool = False,
    ) -> None:
        """
        Reads the contents of the given readable stream and writes it to the object at the
        specified location. Raises FileNotFoundError if the URL doesn't exist.

        :param Union[IO[bytes], IO[str]] readable: a readable stream

        :param ParseResult url: URL that points to a file or object in the storage
               mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket.

        :param bool executable: determines if the file has executable permissions
        """
        raise NotImplementedError(f"No implementation for {url}")

    @classmethod
    @abstractmethod
    def _supports_url(cls, url: ParseResult, export: bool = False) -> bool:
        """
        Returns True if the url access implementation supports the URL's scheme.

        :param ParseResult url: a parsed URL that may be supported

        :param bool export: Determines if the url is supported for export

        :return bool: returns true if the cls supports the URL
        """
        raise NotImplementedError(f"No implementation for {url}")

    @classmethod
    def _find_url_implementation(
        cls, url: ParseResult, export: bool = False
    ) -> type["URLAccess"]:
        """
        Returns the URLAccess subclass that supports the given URL.

        The implementation is looked up in the ``"url_access"`` plugin registry
        under the lower-cased URL scheme. The registered factory is then
        called, and the resulting class is asked whether it supports the URL.

        :param ParseResult url: The given URL

        :param bool export: Determines if the url is supported for exporting

        :raises UnimplementedURLException: if no plugin is registered for the
                scheme, if the plugin's factory fails with ImportError or
                ProxyConnectionError, or if the implementation reports that it
                does not support the URL.
        """
        scheme = url.scheme.lower()
        direction = "export" if export else "import"

        try:
            implementation_factory = get_plugin("url_access", scheme)
        except KeyError as err:
            # Chain explicitly so the traceback shows the failed registry
            # lookup as the direct cause.
            raise UnimplementedURLException(url, direction) from err

        try:
            implementation = cast(type[URLAccess], implementation_factory())
        except (ImportError, ProxyConnectionError) as err:
            logger.debug(
                "Unable to import implementation for scheme '%s', as is expected if the corresponding extra was "
                "omitted at installation time.",
                scheme,
            )
            # Keep the underlying import/connection failure attached as the cause.
            raise UnimplementedURLException(url, direction) from err

        if implementation._supports_url(url, export):
            return implementation
        raise UnimplementedURLException(url, direction)
#####
# Built-in URL access
#####
def file_job_store_factory() -> type[URLAccess]:
    """
    Plugin factory returning the ``FileJobStore`` class.

    The job store module is imported only when the factory is called, not
    when this module is imported.

    :return: the ``FileJobStore`` class.
    """
    from toil.jobStores.fileJobStore import FileJobStore as implementation

    return implementation
def google_job_store_factory() -> type[URLAccess]:
    """
    Plugin factory returning the ``GoogleJobStore`` class.

    The job store module is imported only when the factory is called, so an
    ImportError from it surfaces at call time rather than when this module is
    imported.

    :return: the ``GoogleJobStore`` class.
    """
    from toil.jobStores.googleJobStore import GoogleJobStore as implementation

    return implementation
def aws_job_store_factory() -> type[URLAccess]:
    """
    Plugin factory returning the ``AWSJobStore`` class.

    The job store module is imported only when the factory is called, so an
    ImportError from it surfaces at call time rather than when this module is
    imported.

    :return: the ``AWSJobStore`` class.
    """
    from toil.jobStores.aws.jobStore import AWSJobStore as implementation

    return implementation
def job_store_support_factory() -> type[URLAccess]:
    """
    Plugin factory returning the ``JobStoreSupport`` class.

    The job store module is imported only when the factory is called, not
    when this module is imported.

    :return: the ``JobStoreSupport`` class.
    """
    from toil.jobStores.abstractJobStore import JobStoreSupport as implementation

    return implementation
# TODO: make sure existing callers and the tests still work with the plugin
# registry; after that, the `_url_access_classes` method can be removed.

#####
# Registers all built-in urls
#####
# One (scheme, factory) pair per built-in URL scheme, registered in this order.
for _scheme, _factory in (
    ("file", file_job_store_factory),
    ("gs", google_job_store_factory),
    ("s3", aws_job_store_factory),
    ("http", job_store_support_factory),
    ("https", job_store_support_factory),
    ("ftp", job_store_support_factory),
):
    register_plugin("url_access", _scheme, _factory)
# Do not leave the loop temporaries behind as module globals.
del _scheme, _factory