CCR/.venv/lib/python3.12/site-packages/pysd/py_backend/components.py

289 lines
9.2 KiB
Python

"""
Model components and time managing classes.
"""
from warnings import warn
import os
import random
import inspect
import importlib.util
from copy import deepcopy
import numpy as np
from pysd._version import __version__
class Component(object):
def __init__(self):
self.namespace = {}
self.dependencies = {}
def add(self, name, units=None, limits=(np.nan, np.nan),
subscripts=None, comp_type=None, comp_subtype=None,
depends_on={}, other_deps={}):
"""
This decorators allows assigning metadata to a function.
"""
def decorator(function):
function.name = name
function.units = units
function.limits = limits
function.subscripts = subscripts
function.type = comp_type
function.subtype = comp_subtype
function.args = inspect.getfullargspec(function)[0]
# include component in namespace and dependencies
self.namespace[name] = function.__name__
if function.__name__ != "time":
self.dependencies[function.__name__] = depends_on
self.dependencies.update(other_deps)
return function
return decorator
class Components(object):
"""
Workaround class to let the user do:
model.components.var = value
"""
def __init__(self, py_model_file, set_components):
object.__setattr__(self, "_components", self._load(py_model_file))
object.__setattr__(self, "_set_components", set_components)
def _load(self, py_model_file):
"""
Load model components.
Parameters
----------
py_model_file: str
Model file to be loaded.
Returns
-------
components: module
The imported file content.
"""
# need a unique identifier for the imported module.
module_name = os.path.splitext(py_model_file)[0]\
+ str(random.randint(0, 1000000))
try:
spec = importlib.util.spec_from_file_location(
module_name, py_model_file)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
except TypeError:
raise ImportError(
"\n\nNot able to import the model. "
+ "This may be because the model was compiled with an "
+ "earlier version of PySD, you can check on the top of "
+ "the model file you are trying to load."
+ "\nThe current version of PySd is :"
+ "\n\tPySD " + __version__ + "\n\n"
+ "Please translate again the model with the function"
+ " read_vensim or read_xmile.")
def __getattribute__(self, name):
"""
Get attribute from the class. Try Except vlock is used to load directly
model components in order to avoid making the model slower during the
integration.
"""
try:
return getattr(object.__getattribute__(self, "_components"), name)
except AttributeError:
if name in ["_components", "_set_components",
"_set_component", "_load"]:
# The attribute is from the class Components
return object.__getattribute__(self, name)
else:
raise NameError(f"Component '{name}' not found in the model.")
def __setattr__(self, name, value):
"""
Workaround calling the Macro._set_components method
"""
self._set_components({name: value})
def _set_component(self, name, value):
"""
Replaces the previous setter.
"""
setattr(
object.__getattribute__(self, "_components"),
name,
value
)
class Time(object):
rprec = 1e-5 # relative precision for final time and saving time
def __init__(self):
self._time = None
self.stage = None
self.return_timestamps = None
self._next_return = None
self._control_vars_tracker = {}
def __call__(self):
return self._time
def export(self):
"""Exports time values to a dictionary."""
return {
"control_vars": self._get_control_vars(),
"stage": self.stage,
"_time": self._time,
"return_timestamps": self.return_timestamps,
"_next_return": self._next_return
}
def _get_control_vars(self):
"""
Make control vars changes exportable.
"""
out = {}
for cvar, value in self._control_vars_tracker.items():
if callable(value):
out[cvar] = value()
else:
out[cvar] = value
return out
def _set_time(self, time_dict):
"""Copy values from other Time object, used by Model.copy"""
self.set_control_vars(**time_dict['control_vars'])
for key, value in time_dict.items():
if key == 'control_vars':
continue
setattr(self, key, value)
def set_control_vars(self, **kwargs):
"""
Set the control variables values
Parameters
----------
**kwards:
initial_time: float, callable or None
Initial time.
final_time: float, callable or None
Final time.
time_step: float, callable or None
Time step.
saveper: float, callable or None
Saveper.
"""
# filter None values
kwargs = {
key: value for key, value in kwargs.items()
if value is not None
}
# track changes
self._control_vars_tracker.update(kwargs)
self._set_control_vars(**kwargs)
def _set_control_vars(self, **kwargs):
"""
Set the control variables values. Private version to be used
to avoid tracking changes.
"""
def _convert_value(value):
# this function is necessary to avoid copying the pointer in the
# lambda function.
if callable(value):
return value
else:
return lambda: value
for key, value in kwargs.items():
setattr(self, key, _convert_value(value))
if "initial_time" in kwargs:
self._initial_time = self.initial_time()
self._time = self.initial_time()
def in_bounds(self):
"""
Check if time is smaller than current final time value.
Returns
-------
bool:
True if time is smaller than final time. Otherwise, returns Fase.
"""
return self._time + self.time_step()*self.rprec < self.final_time()
def in_return(self):
""" Check if current time should be returned """
prec = self.time_step() * self.rprec
if self.return_timestamps is not None:
# this allows managing float precision error
if self._next_return is None:
return False
if np.isclose(self._time, self._next_return, prec):
self._update_next_return()
return True
else:
while self._next_return is not None\
and self._time > self._next_return:
warn(
f"The returning time stamp '{self._next_return}' "
"seems to not be a multiple of the time step. "
"This value will not be saved in the output. "
"Please, modify the returning timestamps or the "
"integration time step to avoid this."
)
self._update_next_return()
return False
time_delay = self._time - self._initial_time
save_per = self.saveper()
return time_delay % save_per < prec or -time_delay % save_per < prec
def round(self):
""" Return rounded time to outputs to avoid float precision error"""
return np.round(
self._time,
-int(np.log10(self.time_step()*self.rprec)))
def add_return_timestamps(self, return_timestamps):
""" Add return timestamps """
if hasattr(return_timestamps, '__len__')\
and len(return_timestamps) > 0:
self.return_timestamps = list(return_timestamps)
self.return_timestamps.sort(reverse=True)
self._next_return = self.return_timestamps.pop()
elif isinstance(return_timestamps, (float, int)):
self._next_return = return_timestamps
self.return_timestamps = []
else:
self._next_return = None
self.return_timestamps = None
def update(self, value):
""" Update current time value """
self._time = value
def _update_next_return(self):
""" Update the next_return value """
if self.return_timestamps:
self._next_return = self.return_timestamps.pop()
else:
self._next_return = None
def reset(self):
""" Reset time value to the initial """
self._time = self._initial_time