705 lines
23 KiB
Python
705 lines
23 KiB
Python
import warnings
|
|
|
|
from collections import namedtuple
|
|
from .const import Bound, inf
|
|
|
|
|
|
Atomic = namedtuple("Atomic", ["left", "lower", "upper", "right"])
|
|
|
|
|
|
def mergeable(a, b):
|
|
"""
|
|
Tester whether two atomic intervals can be merged (i.e. they overlap or
|
|
are adjacent).
|
|
|
|
:param a: an atomic interval.
|
|
:param b: an atomic interval.
|
|
:return: True if mergeable, False otherwise.
|
|
"""
|
|
warnings.warn(
|
|
"This function is deprecated, use Interval._mergeable instead.",
|
|
DeprecationWarning,
|
|
)
|
|
return Interval._mergeable(a, b)
|
|
|
|
|
|
class Interval:
|
|
"""
|
|
This class represents an interval.
|
|
|
|
An interval is an (automatically simplified) union of atomic intervals.
|
|
It can be created with Interval.from_atomic(...) or by passing Interval
|
|
instances to __init__.
|
|
"""
|
|
|
|
__slots__ = ("_intervals",)
|
|
__match_args__ = ("left", "lower", "upper", "right")
|
|
|
|
def __init__(self, *intervals):
|
|
"""
|
|
Create a disjunction of zero, one or more intervals.
|
|
|
|
:param intervals: zero, one or more intervals.
|
|
"""
|
|
self._intervals = list()
|
|
|
|
for interval in intervals:
|
|
if isinstance(interval, Interval):
|
|
if not interval.empty:
|
|
self._intervals.extend(interval._intervals)
|
|
else:
|
|
raise TypeError("Parameters must be Interval instances")
|
|
|
|
if len(self._intervals) > 0:
|
|
# Sort intervals by lower bound, closed first.
|
|
self._intervals.sort(key=lambda i: (i.lower, i.left is Bound.OPEN))
|
|
|
|
i = 0
|
|
# Try to merge consecutive intervals
|
|
while i < len(self._intervals) - 1:
|
|
current = self._intervals[i]
|
|
successor = self._intervals[i + 1]
|
|
|
|
if self.__class__._mergeable(current, successor):
|
|
if current.lower == successor.lower:
|
|
lower = current.lower
|
|
left = (
|
|
current.left
|
|
if current.left == Bound.CLOSED
|
|
else successor.left
|
|
)
|
|
|
|
else:
|
|
lower = min(current.lower, successor.lower)
|
|
left = (
|
|
current.left if lower == current.lower else successor.left
|
|
)
|
|
|
|
if current.upper == successor.upper:
|
|
upper = current.upper
|
|
right = (
|
|
current.right
|
|
if current.right == Bound.CLOSED
|
|
else successor.right
|
|
)
|
|
else:
|
|
upper = max(current.upper, successor.upper)
|
|
right = (
|
|
current.right if upper == current.upper else successor.right
|
|
)
|
|
|
|
union = Atomic(left, lower, upper, right)
|
|
self._intervals.pop(i) # pop current
|
|
self._intervals.pop(i) # pop successor
|
|
self._intervals.insert(i, union)
|
|
else:
|
|
i = i + 1
|
|
|
|
@classmethod
|
|
def from_atomic(cls, left, lower, upper, right):
|
|
"""
|
|
Create an Interval instance containing a single atomic interval.
|
|
|
|
:param left: either CLOSED or OPEN.
|
|
:param lower: value of the lower bound.
|
|
:param upper: value of the upper bound.
|
|
:param right: either CLOSED or OPEN.
|
|
"""
|
|
left = left if lower not in [inf, -inf] else Bound.OPEN
|
|
right = right if upper not in [inf, -inf] else Bound.OPEN
|
|
|
|
instance = cls()
|
|
# Check for non-emptiness (otherwise keep instance._intervals = [])
|
|
if lower < upper or (
|
|
lower == upper and left == Bound.CLOSED and right == Bound.CLOSED
|
|
):
|
|
instance._intervals = [Atomic(left, lower, upper, right)]
|
|
return instance
|
|
|
|
@classmethod
|
|
def _mergeable(cls, a, b):
|
|
"""
|
|
Tester whether two atomic intervals can be merged (i.e. they overlap or
|
|
are adjacent).
|
|
|
|
:param a: an atomic interval.
|
|
:param b: an atomic interval.
|
|
:return: True if mergeable, False otherwise.
|
|
"""
|
|
if a.lower < b.lower or (a.lower == b.lower and a.left == Bound.CLOSED):
|
|
first, second = a, b
|
|
else:
|
|
first, second = b, a
|
|
|
|
if first.upper == second.lower:
|
|
return first.right == Bound.CLOSED or second.left == Bound.CLOSED
|
|
|
|
return first.upper > second.lower
|
|
|
|
@property
|
|
def left(self):
|
|
"""
|
|
Lowest left boundary is either CLOSED or OPEN.
|
|
"""
|
|
if self.empty:
|
|
return Bound.OPEN
|
|
return self._intervals[0].left
|
|
|
|
@property
|
|
def lower(self):
|
|
"""
|
|
Lowest lower bound value.
|
|
"""
|
|
if self.empty:
|
|
return inf
|
|
return self._intervals[0].lower
|
|
|
|
@property
|
|
def upper(self):
|
|
"""
|
|
Highest upper bound value.
|
|
"""
|
|
if self.empty:
|
|
return -inf
|
|
return self._intervals[-1].upper
|
|
|
|
@property
|
|
def right(self):
|
|
"""
|
|
Highest right boundary is either CLOSED or OPEN.
|
|
"""
|
|
if self.empty:
|
|
return Bound.OPEN
|
|
return self._intervals[-1].right
|
|
|
|
@property
|
|
def empty(self):
|
|
"""
|
|
True if interval is empty, False otherwise.
|
|
"""
|
|
return len(self._intervals) == 0
|
|
|
|
@property
|
|
def atomic(self):
|
|
"""
|
|
True if this interval is atomic, False otherwise.
|
|
An interval is atomic if it is empty or composed of a single interval.
|
|
"""
|
|
return len(self._intervals) <= 1
|
|
|
|
@property
|
|
def enclosure(self):
|
|
"""
|
|
Return the smallest interval composed of a single atomic interval that encloses
|
|
the current interval.
|
|
|
|
:return: an Interval instance.
|
|
"""
|
|
return self.__class__.from_atomic(self.left, self.lower, self.upper, self.right)
|
|
|
|
def replace(
|
|
self, left=None, lower=None, upper=None, right=None, *, ignore_inf=True
|
|
):
|
|
"""
|
|
Create a new interval based on the current one and the provided values.
|
|
|
|
If current interval is not atomic, it is extended or restricted such that
|
|
its enclosure satisfies the new bounds. In other words, its new enclosure
|
|
will be equal to self.enclosure.replace(left, lower, upper, right).
|
|
|
|
Callable can be passed instead of values. In that case, it is called with the
|
|
current corresponding value except if ignore_inf if set (default) and the
|
|
corresponding bound is an infinity.
|
|
|
|
:param left: (a function of) left boundary.
|
|
:param lower: (a function of) value of the lower bound.
|
|
:param upper: (a function of) value of the upper bound.
|
|
:param right: (a function of) right boundary.
|
|
:param ignore_inf: ignore infinities if functions are provided (default
|
|
is True).
|
|
:return: an Interval instance
|
|
"""
|
|
enclosure = self.enclosure
|
|
|
|
if callable(left):
|
|
left = left(enclosure.left)
|
|
else:
|
|
left = enclosure.left if left is None else left
|
|
|
|
if callable(lower):
|
|
if ignore_inf and enclosure.lower in [-inf, inf]:
|
|
lower = enclosure.lower
|
|
else:
|
|
lower = lower(enclosure.lower)
|
|
else:
|
|
lower = enclosure.lower if lower is None else lower
|
|
|
|
if callable(upper):
|
|
if ignore_inf and enclosure.upper in [-inf, inf]:
|
|
upper = enclosure.upper
|
|
else:
|
|
upper = upper(enclosure.upper)
|
|
else:
|
|
upper = enclosure.upper if upper is None else upper
|
|
|
|
if callable(right):
|
|
right = right(enclosure.right)
|
|
else:
|
|
right = enclosure.right if right is None else right
|
|
|
|
if self.atomic:
|
|
return self.__class__.from_atomic(left, lower, upper, right)
|
|
|
|
n_interval = self & self.__class__.from_atomic(left, lower, upper, right)
|
|
|
|
if n_interval.atomic:
|
|
return n_interval.replace(left, lower, upper, right)
|
|
else:
|
|
lowest = n_interval[0].replace(left=left, lower=lower)
|
|
highest = n_interval[-1].replace(upper=upper, right=right)
|
|
return self.__class__(lowest, *n_interval[1:-1], highest)
|
|
|
|
def apply(self, func):
|
|
"""
|
|
Apply a function on each of the underlying atomic intervals and return their
|
|
union as a new interval instance
|
|
|
|
Given function is expected to return an interval (possibly empty or not
|
|
atomic) or a 4-uple (left, lower, upper, right) whose values correspond to
|
|
the parameters of Interval.from_atomic(left, lower, upper, right).
|
|
|
|
This method is merely a shortcut for Interval(*list(map(func, self))).
|
|
|
|
:param func: function to apply on each underlying atomic interval.
|
|
:return: an Interval instance.
|
|
"""
|
|
intervals = []
|
|
|
|
for i in self:
|
|
value = func(i)
|
|
|
|
if isinstance(value, Interval):
|
|
intervals.append(value)
|
|
elif isinstance(value, tuple):
|
|
intervals.append(self.__class__.from_atomic(*value))
|
|
else:
|
|
raise TypeError(
|
|
"Unsupported return type {} for {}".format(type(value), value)
|
|
)
|
|
|
|
return self.__class__(*intervals)
|
|
|
|
def adjacent(self, other):
|
|
"""
|
|
Test if two intervals are adjacent.
|
|
|
|
Two intervals are adjacent if they do not overlap and their union form a
|
|
single atomic interval.
|
|
|
|
While this definition corresponds to the usual notion of adjacency for atomic
|
|
intervals, it has stronger requirements for non-atomic ones since it requires
|
|
all underlying atomic intervals to be adjacent (i.e. that one
|
|
interval fills the gaps between the atomic intervals of the other one).
|
|
|
|
:param other: an interval.
|
|
:return: True if intervals are adjacent, False otherwise.
|
|
"""
|
|
return (self & other).empty and (self | other).atomic
|
|
|
|
def overlaps(self, other):
|
|
"""
|
|
Test if two intervals overlap (i.e. if their intersection is non-empty).
|
|
|
|
:param other: an interval.
|
|
:return: True if intervals overlap, False otherwise.
|
|
"""
|
|
if isinstance(other, Interval):
|
|
if self.upper < other.lower or self.lower > other.upper:
|
|
# Early out for clearly non-overlapping intervals
|
|
return False
|
|
|
|
i_iter = iter(self)
|
|
o_iter = iter(other)
|
|
i_current = next(i_iter, None)
|
|
o_current = next(o_iter, None)
|
|
|
|
while i_current is not None and o_current is not None:
|
|
if i_current < o_current:
|
|
i_current = next(i_iter, None)
|
|
elif o_current < i_current:
|
|
o_current = next(o_iter, None)
|
|
else:
|
|
return True
|
|
return False
|
|
else:
|
|
raise TypeError("Unsupported type {} for {}".format(type(other), other))
|
|
|
|
def intersection(self, other):
|
|
"""
|
|
Return the intersection of two intervals.
|
|
|
|
:param other: an interval.
|
|
:return: the intersection of the intervals.
|
|
"""
|
|
return self & other
|
|
|
|
def union(self, other):
|
|
"""
|
|
Return the union of two intervals.
|
|
|
|
:param other: an interval.
|
|
:return: the union of the intervals.
|
|
"""
|
|
return self | other
|
|
|
|
def contains(self, item):
|
|
"""
|
|
Test if given item is contained in this interval.
|
|
This method accepts intervals and arbitrary comparable values.
|
|
|
|
:param item: an interval or any arbitrary comparable value.
|
|
:return: True if given item is contained, False otherwise.
|
|
"""
|
|
return item in self
|
|
|
|
def complement(self):
|
|
"""
|
|
Return the complement of this interval.
|
|
|
|
:return: the complement of this interval.
|
|
"""
|
|
return ~self
|
|
|
|
def difference(self, other):
|
|
"""
|
|
Return the difference of two intervals.
|
|
|
|
:param other: an interval.
|
|
:return: the difference of the intervals.
|
|
"""
|
|
return self - other
|
|
|
|
def __getattr__(self, name):
|
|
if name == "__next__":
|
|
# Hack for a better representation of intervals within pandas
|
|
# See https://github.com/AlexandreDecan/portion/pull/54
|
|
try:
|
|
import inspect
|
|
|
|
if inspect.stack()[1].function == "pprint_thing":
|
|
return
|
|
except Exception:
|
|
pass
|
|
raise AttributeError
|
|
|
|
def __len__(self):
|
|
return len(self._intervals)
|
|
|
|
def __iter__(self):
|
|
yield from (self.__class__.from_atomic(*i) for i in self._intervals)
|
|
|
|
def __getitem__(self, item):
|
|
if isinstance(item, slice):
|
|
return self.__class__(
|
|
*[self.__class__.from_atomic(*i) for i in self._intervals[item]]
|
|
)
|
|
else:
|
|
return self.__class__.from_atomic(*self._intervals[item])
|
|
|
|
def __and__(self, other):
|
|
if not isinstance(other, Interval):
|
|
return NotImplemented
|
|
|
|
if self.upper < other.lower or self.lower > other.upper:
|
|
# Early out for non-overlapping intervals
|
|
return self.__class__()
|
|
elif self.atomic and other.atomic:
|
|
if self.lower == other.lower:
|
|
lower = self.lower
|
|
left = self.left if self.left == Bound.OPEN else other.left
|
|
else:
|
|
lower = max(self.lower, other.lower)
|
|
left = self.left if lower == self.lower else other.left
|
|
|
|
if self.upper == other.upper:
|
|
upper = self.upper
|
|
right = self.right if self.right == Bound.OPEN else other.right
|
|
else:
|
|
upper = min(self.upper, other.upper)
|
|
right = self.right if upper == self.upper else other.right
|
|
|
|
return self.__class__.from_atomic(left, lower, upper, right)
|
|
else:
|
|
intersections = []
|
|
|
|
i_iter = iter(self)
|
|
o_iter = iter(other)
|
|
i_current = next(i_iter, None)
|
|
o_current = next(o_iter, None)
|
|
|
|
while i_current is not None and o_current is not None:
|
|
if i_current < o_current:
|
|
i_current = next(i_iter, None)
|
|
elif o_current < i_current:
|
|
o_current = next(o_iter, None)
|
|
else:
|
|
# i_current and o_current have an overlap
|
|
intersections.append(i_current & o_current)
|
|
|
|
if i_current <= o_current:
|
|
# o_current can still intersect next i
|
|
i_current = next(i_iter, None)
|
|
elif o_current <= i_current:
|
|
# i_current can still intersect next o
|
|
o_current = next(o_iter, None)
|
|
else:
|
|
assert False
|
|
|
|
return self.__class__(*intersections)
|
|
|
|
def __or__(self, other):
|
|
if isinstance(other, Interval):
|
|
return self.__class__(self, other)
|
|
else:
|
|
return NotImplemented
|
|
|
|
def __contains__(self, item):
|
|
if isinstance(item, Interval):
|
|
if item.empty:
|
|
return True
|
|
elif self.upper < item.lower or self.lower > item.upper:
|
|
# Early out for non-overlapping intervals
|
|
return False
|
|
elif self.atomic:
|
|
left = item.lower > self.lower or (
|
|
item.lower == self.lower
|
|
and (item.left == self.left or self.left == Bound.CLOSED)
|
|
)
|
|
right = item.upper < self.upper or (
|
|
item.upper == self.upper
|
|
and (item.right == self.right or self.right == Bound.CLOSED)
|
|
)
|
|
return left and right
|
|
else:
|
|
selfiter = iter(self)
|
|
current = next(selfiter)
|
|
|
|
for other in item:
|
|
while current < other:
|
|
try:
|
|
current = next(selfiter)
|
|
except StopIteration:
|
|
return False
|
|
|
|
# here current and other could have an overlap
|
|
if other not in current:
|
|
return False
|
|
return True
|
|
else:
|
|
# Item is a value
|
|
if self.upper < item or self.lower > item:
|
|
return False
|
|
|
|
for i in self._intervals:
|
|
left = (item >= i.lower) if i.left == Bound.CLOSED else (item > i.lower)
|
|
right = (
|
|
(item <= i.upper) if i.right == Bound.CLOSED else (item < i.upper)
|
|
)
|
|
if left and right:
|
|
return True
|
|
return False
|
|
|
|
def __invert__(self):
|
|
complements = [
|
|
self.__class__.from_atomic(Bound.OPEN, -inf, self.lower, ~self.left),
|
|
self.__class__.from_atomic(~self.right, self.upper, inf, Bound.OPEN),
|
|
]
|
|
|
|
for i, j in zip(self._intervals[:-1], self._intervals[1:]):
|
|
complements.append(
|
|
self.__class__.from_atomic(~i.right, i.upper, j.lower, ~j.left)
|
|
)
|
|
|
|
return self.__class__(*complements)
|
|
|
|
def __sub__(self, other):
|
|
if isinstance(other, Interval):
|
|
return self & ~other
|
|
else:
|
|
return NotImplemented
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, Interval):
|
|
if len(other._intervals) != len(self._intervals):
|
|
return False
|
|
|
|
for a, b in zip(self._intervals, other._intervals):
|
|
eq = (
|
|
a.left == b.left
|
|
and a.lower == b.lower
|
|
and a.upper == b.upper
|
|
and a.right == b.right
|
|
)
|
|
if not eq:
|
|
return False
|
|
return True
|
|
else:
|
|
return NotImplemented
|
|
|
|
def __lt__(self, other):
|
|
if isinstance(other, Interval):
|
|
if self.empty or other.empty:
|
|
return False
|
|
|
|
if self.right == Bound.OPEN or other.left == Bound.OPEN:
|
|
return self.upper <= other.lower
|
|
else:
|
|
return self.upper < other.lower
|
|
else:
|
|
warnings.warn(
|
|
"Comparing an interval and a value is deprecated. Convert value to singleton first.",
|
|
DeprecationWarning,
|
|
)
|
|
return not self.empty and (
|
|
self.upper < other or (self.right == Bound.OPEN and self.upper == other)
|
|
)
|
|
|
|
def __gt__(self, other):
|
|
if isinstance(other, Interval):
|
|
if self.empty or other.empty:
|
|
return False
|
|
|
|
if self.left == Bound.OPEN or other.right == Bound.OPEN:
|
|
return self.lower >= other.upper
|
|
else:
|
|
return self.lower > other.upper
|
|
else:
|
|
warnings.warn(
|
|
"Comparing an interval and a value is deprecated. Convert value to singleton first.",
|
|
DeprecationWarning,
|
|
)
|
|
return not self.empty and (
|
|
self.lower > other or (self.left == Bound.OPEN and self.lower == other)
|
|
)
|
|
|
|
def __le__(self, other):
|
|
if isinstance(other, Interval):
|
|
if self.empty or other.empty:
|
|
return False
|
|
|
|
if self.right == Bound.OPEN or other.right == Bound.CLOSED:
|
|
return self.upper <= other.upper
|
|
else:
|
|
return self.upper < other.upper
|
|
else:
|
|
warnings.warn(
|
|
"Comparing an interval and a value is deprecated. Convert value to singleton first.",
|
|
DeprecationWarning,
|
|
)
|
|
return not self.empty and self.upper <= other
|
|
|
|
def __ge__(self, other):
|
|
if isinstance(other, Interval):
|
|
if self.empty or other.empty:
|
|
return False
|
|
|
|
if self.left == Bound.OPEN or other.left == Bound.CLOSED:
|
|
return self.lower >= other.lower
|
|
else:
|
|
return self.lower > other.lower
|
|
else:
|
|
warnings.warn(
|
|
"Comparing an interval and a value is deprecated. Convert value to singleton first.",
|
|
DeprecationWarning,
|
|
)
|
|
return not self.empty and self.lower >= other
|
|
|
|
def __hash__(self):
|
|
return hash(tuple([self.lower, self.upper]))
|
|
|
|
def __repr__(self):
|
|
if self.empty:
|
|
return "()"
|
|
|
|
string = []
|
|
for interval in self._intervals:
|
|
if interval.lower == interval.upper:
|
|
string.append("[" + repr(interval.lower) + "]")
|
|
else:
|
|
string.append(
|
|
("[" if interval.left == Bound.CLOSED else "(")
|
|
+ repr(interval.lower)
|
|
+ ","
|
|
+ repr(interval.upper)
|
|
+ ("]" if interval.right == Bound.CLOSED else ")")
|
|
)
|
|
return " | ".join(string)
|
|
|
|
|
|
class AbstractDiscreteInterval(Interval):
|
|
"""
|
|
An abstract class for discrete interval.
|
|
|
|
This class is not expected to be used as-is, and should be subclassed
|
|
first. The only attribute/method that should be overriden is the `_step`
|
|
class variable. This variable defines the step between two consecutive
|
|
values of the discrete domain (e.g., 1 for integers).
|
|
If a meaningfull step cannot be provided (e.g., for characters), the
|
|
_incr and _decr class methods can be overriden. They respectively return
|
|
the next and previous value given the current one.
|
|
|
|
This class is still experimental and backward incompatible changes may
|
|
occur even in minor or patch updates of portion.
|
|
"""
|
|
|
|
_step = None
|
|
|
|
@classmethod
|
|
def _incr(cls, value):
|
|
"""
|
|
Increment given value.
|
|
|
|
:param value: value to increment.
|
|
:return: incremented value.
|
|
"""
|
|
return value + cls._step
|
|
|
|
@classmethod
|
|
def _decr(cls, value):
|
|
"""
|
|
Decrement given value.
|
|
|
|
:param value: value to decrement.
|
|
:return: decremented value.
|
|
"""
|
|
return value - cls._step
|
|
|
|
@classmethod
|
|
def from_atomic(cls, left, lower, upper, right):
|
|
if left == Bound.OPEN and lower not in [-inf, inf]:
|
|
left = Bound.CLOSED
|
|
lower = cls._incr(lower)
|
|
|
|
if right == Bound.OPEN and upper not in [-inf, inf]:
|
|
right = Bound.CLOSED
|
|
upper = cls._decr(upper)
|
|
|
|
return super().from_atomic(left, lower, upper, right)
|
|
|
|
@classmethod
|
|
def _mergeable(cls, a, b):
|
|
if a.upper <= b.upper:
|
|
first, second = a, b
|
|
else:
|
|
first, second = b, a
|
|
|
|
if first.right == Bound.CLOSED and first.upper < second.lower:
|
|
first = Atomic(
|
|
first.left,
|
|
first.lower,
|
|
cls._incr(first.upper),
|
|
Bound.OPEN,
|
|
)
|
|
|
|
return super()._mergeable(first, second)
|