CCR/.venv/lib/python3.12/site-packages/python_utils/time.py

409 lines
13 KiB
Python

"""
This module provides utility functions for handling time-related operations.
Functions:
- timedelta_to_seconds: Convert a timedelta to seconds with microseconds as
fraction.
- delta_to_seconds: Convert a timedelta or numeric interval to seconds.
- delta_to_seconds_or_none: Convert a timedelta to seconds or return None.
- format_time: Format a timestamp (timedelta, datetime, or seconds) to a
string.
- timeout_generator: Generate items from an iterable until a timeout is
reached.
- aio_timeout_generator: Asynchronously generate items from an iterable until a
timeout is reached.
- aio_generator_timeout_detector: Detect if an async generator has not yielded
an element for a set amount of time.
- aio_generator_timeout_detector_decorator: Decorator for
aio_generator_timeout_detector.
"""
# pyright: reportUnnecessaryIsInstance=false
import asyncio
import datetime
import functools
import itertools
import time
import python_utils
from python_utils import aio, exceptions, types
_T = types.TypeVar('_T')
_P = types.ParamSpec('_P')
# There might be a better way to get the epoch with tzinfo, please create
# a pull request if you know a better way that functions for Python 2 and 3
epoch = datetime.datetime(year=1970, month=1, day=1)
def timedelta_to_seconds(delta: datetime.timedelta) -> types.Number:
"""Convert a timedelta to seconds with the microseconds as fraction.
Note that this method has become largely obsolete with the
`timedelta.total_seconds()` method introduced in Python 2.7.
>>> from datetime import timedelta
>>> '%d' % timedelta_to_seconds(timedelta(days=1))
'86400'
>>> '%d' % timedelta_to_seconds(timedelta(seconds=1))
'1'
>>> '%.6f' % timedelta_to_seconds(timedelta(seconds=1, microseconds=1))
'1.000001'
>>> '%.6f' % timedelta_to_seconds(timedelta(microseconds=1))
'0.000001'
"""
# Only convert to float if needed
if delta.microseconds:
total = delta.microseconds * 1e-6
else:
total = 0
total += delta.seconds
total += delta.days * 60 * 60 * 24
return total
def delta_to_seconds(interval: types.delta_type) -> types.Number:
"""
Convert a timedelta to seconds.
>>> delta_to_seconds(datetime.timedelta(seconds=1))
1
>>> delta_to_seconds(datetime.timedelta(seconds=1, microseconds=1))
1.000001
>>> delta_to_seconds(1)
1
>>> delta_to_seconds('whatever') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
TypeError: Unknown type ...
"""
if isinstance(interval, datetime.timedelta):
return timedelta_to_seconds(interval)
elif isinstance(interval, (int, float)):
return interval
else:
raise TypeError(f'Unknown type {type(interval)}: {interval!r}')
def delta_to_seconds_or_none(
interval: types.Optional[types.delta_type],
) -> types.Optional[types.Number]:
"""Convert a timedelta to seconds or return None."""
if interval is None:
return None
else:
return delta_to_seconds(interval)
def format_time(
timestamp: types.timestamp_type,
precision: datetime.timedelta = datetime.timedelta(seconds=1),
) -> str:
"""Formats timedelta/datetime/seconds.
>>> format_time('1')
'0:00:01'
>>> format_time(1.234)
'0:00:01'
>>> format_time(1)
'0:00:01'
>>> format_time(datetime.datetime(2000, 1, 2, 3, 4, 5, 6))
'2000-01-02 03:04:05'
>>> format_time(datetime.date(2000, 1, 2))
'2000-01-02'
>>> format_time(datetime.timedelta(seconds=3661))
'1:01:01'
>>> format_time(None)
'--:--:--'
>>> format_time(format_time) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
TypeError: Unknown type ...
"""
precision_seconds = precision.total_seconds()
if isinstance(timestamp, str):
timestamp = float(timestamp)
if isinstance(timestamp, (int, float)):
try:
timestamp = datetime.timedelta(seconds=timestamp)
except OverflowError: # pragma: no cover
timestamp = None
if isinstance(timestamp, datetime.timedelta):
seconds = timestamp.total_seconds()
# Truncate the number to the given precision
seconds = seconds - (seconds % precision_seconds)
return str(datetime.timedelta(seconds=seconds))
elif isinstance(timestamp, datetime.datetime): # pragma: no cover
# Python 2 doesn't have the timestamp method
if hasattr(timestamp, 'timestamp'):
seconds = timestamp.timestamp()
else:
seconds = timedelta_to_seconds(timestamp - epoch)
# Truncate the number to the given precision
seconds = seconds - (seconds % precision_seconds)
try: # pragma: no cover
dt = datetime.datetime.fromtimestamp(seconds)
except (ValueError, OSError): # pragma: no cover
dt = datetime.datetime.max
return str(dt)
elif isinstance(timestamp, datetime.date):
return str(timestamp)
elif timestamp is None:
return '--:--:--'
else:
raise TypeError(f'Unknown type {type(timestamp)}: {timestamp!r}')
@types.overload
def _to_iterable(
iterable: types.Union[
types.Callable[[], types.AsyncIterable[_T]],
types.AsyncIterable[_T],
],
) -> types.AsyncIterable[_T]: ...
@types.overload
def _to_iterable(
iterable: types.Union[
types.Callable[[], types.Iterable[_T]], types.Iterable[_T]
],
) -> types.Iterable[_T]: ...
def _to_iterable(
iterable: types.Union[
types.Iterable[_T],
types.Callable[[], types.Iterable[_T]],
types.AsyncIterable[_T],
types.Callable[[], types.AsyncIterable[_T]],
],
) -> types.Union[types.Iterable[_T], types.AsyncIterable[_T]]:
if callable(iterable):
return iterable()
else:
return iterable
def timeout_generator(
timeout: types.delta_type,
interval: types.delta_type = datetime.timedelta(seconds=1),
iterable: types.Union[
types.Iterable[_T], types.Callable[[], types.Iterable[_T]]
] = itertools.count, # type: ignore[assignment]
interval_multiplier: float = 1.0,
maximum_interval: types.Optional[types.delta_type] = None,
) -> types.Iterable[_T]:
"""
Generator that walks through the given iterable (a counter by default)
until the float_timeout is reached with a configurable float_interval
between items.
This can be used to limit the time spent on a slow operation. This can be
useful for testing slow APIs so you get a small sample of the data in a
reasonable amount of time.
>>> for i in timeout_generator(0.1, 0.06):
... # Put your slow code here
... print(i)
0
1
2
>>> timeout = datetime.timedelta(seconds=0.1)
>>> interval = datetime.timedelta(seconds=0.06)
>>> for i in timeout_generator(timeout, interval, itertools.count()):
... print(i)
0
1
2
>>> for i in timeout_generator(1, interval=0.1, iterable='ab'):
... print(i)
a
b
>>> timeout = datetime.timedelta(seconds=0.1)
>>> interval = datetime.timedelta(seconds=0.06)
>>> for i in timeout_generator(timeout, interval, interval_multiplier=2):
... print(i)
0
1
2
"""
float_interval: float = delta_to_seconds(interval)
float_maximum_interval: types.Optional[float] = delta_to_seconds_or_none(
maximum_interval
)
iterable_ = _to_iterable(iterable)
end = delta_to_seconds(timeout) + time.perf_counter()
for item in iterable_:
yield item
if time.perf_counter() >= end:
break
time.sleep(float_interval)
float_interval *= interval_multiplier
if float_maximum_interval:
float_interval = min(float_interval, float_maximum_interval)
async def aio_timeout_generator(
timeout: types.delta_type, # noqa: ASYNC109
interval: types.delta_type = datetime.timedelta(seconds=1),
iterable: types.Union[
types.AsyncIterable[_T], types.Callable[..., types.AsyncIterable[_T]]
] = aio.acount,
interval_multiplier: float = 1.0,
maximum_interval: types.Optional[types.delta_type] = None,
) -> types.AsyncGenerator[_T, None]:
"""
Async generator that walks through the given async iterable (a counter by
default) until the float_timeout is reached with a configurable
float_interval between items.
The interval_exponent automatically increases the float_timeout with each
run. Note that if the float_interval is less than 1, 1/interval_exponent
will be used so the float_interval is always growing. To double the
float_interval with each run, specify 2.
Doctests and asyncio are not friends, so no examples. But this function is
effectively the same as the `timeout_generator` but it uses `async for`
instead.
"""
float_interval: float = delta_to_seconds(interval)
float_maximum_interval: types.Optional[float] = delta_to_seconds_or_none(
maximum_interval
)
iterable_ = _to_iterable(iterable)
end = delta_to_seconds(timeout) + time.perf_counter()
async for item in iterable_: # pragma: no branch
yield item
if time.perf_counter() >= end:
break
await asyncio.sleep(float_interval)
float_interval *= interval_multiplier
if float_maximum_interval: # pragma: no branch
float_interval = min(float_interval, float_maximum_interval)
async def aio_generator_timeout_detector(
generator: types.AsyncGenerator[_T, None],
timeout: types.Optional[types.delta_type] = None, # noqa: ASYNC109
total_timeout: types.Optional[types.delta_type] = None,
on_timeout: types.Optional[
types.Callable[
[
types.AsyncGenerator[_T, None],
types.Optional[types.delta_type],
types.Optional[types.delta_type],
BaseException,
],
types.Any,
]
] = exceptions.reraise,
**on_timeout_kwargs: types.Mapping[types.Text, types.Any],
) -> types.AsyncGenerator[_T, None]:
"""
This function is used to detect if an asyncio generator has not yielded
an element for a set amount of time.
The `on_timeout` argument is called with the `generator`, `timeout`,
`total_timeout`, `exception` and the extra `**kwargs` to this function as
arguments.
If `on_timeout` is not specified, the exception is reraised.
If `on_timeout` is `None`, the exception is silently ignored and the
generator will finish as normal.
"""
if total_timeout is None:
total_timeout_end = None
else:
total_timeout_end = time.perf_counter() + delta_to_seconds(
total_timeout
)
timeout_s = python_utils.delta_to_seconds_or_none(timeout)
while True:
try:
if total_timeout_end and time.perf_counter() >= total_timeout_end:
raise asyncio.TimeoutError( # noqa: TRY301
'Total timeout reached'
)
if timeout_s:
yield await asyncio.wait_for(generator.__anext__(), timeout_s)
else:
yield await generator.__anext__()
except asyncio.TimeoutError as exception: # noqa: PERF203
if on_timeout is not None:
await on_timeout(
generator,
timeout,
total_timeout,
exception,
**on_timeout_kwargs,
)
break
except StopAsyncIteration:
break
def aio_generator_timeout_detector_decorator(
timeout: types.Optional[types.delta_type] = None,
total_timeout: types.Optional[types.delta_type] = None,
on_timeout: types.Optional[
types.Callable[
[
types.AsyncGenerator[types.Any, None],
types.Optional[types.delta_type],
types.Optional[types.delta_type],
BaseException,
],
types.Any,
]
] = exceptions.reraise,
**on_timeout_kwargs: types.Mapping[types.Text, types.Any],
) -> types.Callable[
[types.Callable[_P, types.AsyncGenerator[_T, None]]],
types.Callable[_P, types.AsyncGenerator[_T, None]],
]:
"""A decorator wrapper for aio_generator_timeout_detector."""
def _timeout_detector_decorator(
generator: types.Callable[_P, types.AsyncGenerator[_T, None]],
) -> types.Callable[_P, types.AsyncGenerator[_T, None]]:
"""The decorator itself."""
@functools.wraps(generator)
def wrapper(
*args: _P.args,
**kwargs: _P.kwargs,
) -> types.AsyncGenerator[_T, None]:
return aio_generator_timeout_detector(
generator(*args, **kwargs),
timeout,
total_timeout,
on_timeout,
**on_timeout_kwargs,
)
return wrapper
return _timeout_detector_decorator