Source code for toil.lib.throttle
# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 5.14.2018: copied into Toil from https://github.com/BD2KGenomics/bd2k-python-lib
import threading
import time
from typing import Union
[docs]
class LocalThrottle:
"""
A thread-safe rate limiter that throttles each thread independently. Can be used as a
function or method decorator or as a simple object, via its .throttle() method.
The use as a decorator is deprecated in favor of throttle().
"""
def __init__(self, min_interval: int) -> None:
"""
Initialize this local throttle.
:param min_interval: The minimum interval in seconds between invocations of the throttle
method or, if this throttle is used as a decorator, invocations of the decorated method.
"""
self.min_interval = min_interval
self.per_thread = threading.local()
self.per_thread.last_invocation = None
[docs]
def throttle(self, wait: bool = True) -> bool:
"""
If the wait parameter is True, this method returns True after suspending the current
thread as necessary to ensure that no less than the configured minimum interval has
passed since the last invocation of this method in the current thread returned True.
If the wait parameter is False, this method immediatley returns True (if at least the
configured minimum interval has passed since the last time this method returned True in
the current thread) or False otherwise.
"""
now = time.time()
last_invocation = self.per_thread.last_invocation
if last_invocation is not None:
interval = now - last_invocation
if interval < self.min_interval:
if wait:
remainder = self.min_interval - interval
time.sleep(remainder)
else:
return False
self.per_thread.last_invocation = now
return True
[docs]
def __call__(self, function):
def wrapper(*args, **kwargs):
self.throttle()
return function(*args, **kwargs)
return wrapper
[docs]
class throttle:
"""
A context manager for ensuring that the execution of its body takes at least a given amount
of time, sleeping if necessary. It is a simpler version of LocalThrottle if used as a
decorator.
Ensures that body takes at least the given amount of time.
>>> start = time.time()
>>> with throttle(1):
... pass
>>> 1 <= time.time() - start <= 1.1
True
Ditto when used as a decorator.
>>> @throttle(1)
... def f():
... pass
>>> start = time.time()
>>> f()
>>> 1 <= time.time() - start <= 1.1
True
If the body takes longer by itself, don't throttle.
>>> start = time.time()
>>> with throttle(1):
... time.sleep(2)
>>> 2 <= time.time() - start <= 2.1
True
Ditto when used as a decorator.
>>> @throttle(1)
... def f():
... time.sleep(2)
>>> start = time.time()
>>> f()
>>> 2 <= time.time() - start <= 2.1
True
If an exception occurs, don't throttle.
>>> start = time.time()
>>> try:
... with throttle(1):
... raise ValueError('foo')
... except ValueError:
... end = time.time()
... raise
Traceback (most recent call last):
...
ValueError: foo
>>> 0 <= end - start <= 0.1
True
Ditto when used as a decorator.
>>> @throttle(1)
... def f():
... raise ValueError('foo')
>>> start = time.time()
>>> try:
... f()
... except ValueError:
... end = time.time()
... raise
Traceback (most recent call last):
...
ValueError: foo
>>> 0 <= end - start <= 0.1
True
"""
def __init__(self, min_interval: Union[int, float]) -> None:
self.min_interval = min_interval
[docs]
def __enter__(self):
self.start = time.time()
[docs]
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
duration = time.time() - self.start
remainder = self.min_interval - duration
if remainder > 0:
time.sleep(remainder)
[docs]
def __call__(self, function):
def wrapper(*args, **kwargs):
with self:
return function(*args, **kwargs)
return wrapper