"""
|
|
These are general utilities used by the builder.py, functions.py or the
|
|
model file. Vensim's function equivalents should not go here but in
|
|
functions.py
|
|
"""
|
|
|
|
import json
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from chardet.universaldetector import UniversalDetector
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Set
|
|
|
|
import progressbar
|
|
import numpy as np
|
|
import xarray as xr
|
|
import pandas as pd
|
|
|
|
|
|
def xrsplit(array):
    """
    Split an array into a list of all its components.

    Parameters
    ----------
    array: xarray.DataArray
        Array to split.

    Returns
    -------
    sp_list: list of xarray.DataArrays
        List of shape 0 xarray.DataArrays with coordinates.

    """
    sp_list = [sa for sa in array]
    if sp_list[0].shape:
        sp_list = [ssa for sa in sp_list for ssa in xrsplit(sa)]
    return sp_list


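# Example (illustrative, not part of the original module): splitting a 2x2
# DataArray yields four 0-d DataArrays, each keeping its coordinate labels.
#
#     >>> da = xr.DataArray([[1, 2], [3, 4]],
#     ...                   {"d1": ["a", "b"], "d2": ["x", "y"]},
#     ...                   ["d1", "d2"])
#     >>> len(xrsplit(da))
#     4

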
def get_current_computer_time():
    """
    Returns the current machine time. Needed to mock the machine time in
    the tests.

    Returns
    -------
    datetime.datetime
        Current machine time.

    """
    return datetime.now()


def get_return_elements(return_columns, namespace):
    """
    Takes a list of return elements formatted in Vensim's format,
    e.g. Varname[Sub1, Sub2], and returns first the model elements
    (in Python-safe language) that need to be computed and collected,
    and secondly the addresses that each element in the return columns
    list translates to.

    Parameters
    ----------
    return_columns: list of strings
        Column names to capture, in Vensim format.

    namespace: dict
        Dictionary mapping the original names to the Python-safe names.

    Returns
    -------
    capture_elements: list
        Python-safe names of the model elements to compute and collect.

    return_addresses: dict
        Mapping from each return column to its (py_name, address) pair.

    """
    capture_elements = []
    return_addresses = dict()
    for col in return_columns:
        if col[0] == col[-1] and col[0] == '"':
            name = col
            address = None
        elif "[" in col:
            name, location = col.strip("]").split("[")
            address = tuple([loc.strip() for loc in location.split(",")])
        else:
            name = col
            address = None

        if name in namespace:
            py_name = namespace[name]
        elif name in namespace.values():
            py_name = name
        else:
            raise KeyError(name + " not found as model element")

        if py_name not in capture_elements:
            capture_elements += [py_name]

        return_addresses[col] = (py_name, address)

    return list(capture_elements), return_addresses


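# Example (illustrative, not part of the original module): subscripted
# columns are split into a Python-safe name plus an address tuple.
#
#     >>> get_return_elements(["Var[A, B]"], {"Var": "var"})
#     (['var'], {'Var[A, B]': ('var', ('A', 'B'))})

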
def compute_shape(coords, reshape_len=None, py_name=""):
    """
    Computes the 'shape' of a coords dictionary.
    Function used to rearrange data in xarrays and
    to compute the number of rows/columns to be read in a file.

    Parameters
    ----------
    coords: dict
        Ordered dictionary of the dimension names as keys with their values.

    reshape_len: int (optional)
        Number of dimensions of the output shape.
        The shape will only include the table dimensions
        to read from Excel; dimensions with length one
        are ignored at first.
        Later, the shape is padded with 1s on the left
        if the reshape_len value is bigger than the length of the shape.
        A ValueError is raised when trying to reshape to a reshape_len
        smaller than the initial shape.

    py_name: str
        Name to print if an error is raised.

    Returns
    -------
    shape: list
        Shape of the ordered dictionary or of the desired table or vector.

    """
    if not reshape_len:
        return [len(coord) for coord in coords.values()]

    # get the shape of the coordinates bigger than 1
    shape = [len(coord) for coord in coords.values() if len(coord) > 1]

    shape_len = len(shape)

    # raise an error when the current shape is bigger than the requested one
    if shape_len > reshape_len:
        raise ValueError(
            py_name + "\n"
            + "The shape of the coords to read in an external file "
            + "must be at most {} dimensional".format(reshape_len)
        )

    # complete with 1s on the left
    return [1] * (reshape_len - shape_len) + shape


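# Example (illustrative, not part of the original module): length-1
# dimensions are dropped and the shape is padded with 1s on the left.
#
#     >>> compute_shape({"d1": ["a", "b"], "d2": ["x", "y", "z"]})
#     [2, 3]
#     >>> compute_shape({"d1": ["a"], "d2": ["x", "y"]}, reshape_len=2)
#     [1, 2]

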
def get_key_and_value_by_insensitive_key_or_value(key, dict):
    """
    Given a key or value, search for the real key and value in the
    dictionary, ignoring case.

    Parameters
    ----------
    key: str
        Key or value to look for in the dictionary.
    dict: dict
        Dictionary to search in.

    Returns
    -------
    real key, real value: (str, str) or (None, None)
        The real key and value that appear in the dictionary, or a tuple
        of Nones if the input key is not in the dictionary.

    """
    lower_key = key.lower()
    for real_key, real_value in dict.items():
        if real_key.lower() == lower_key or real_value.lower() == lower_key:
            return real_key, real_value

    return None, None


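# Example (illustrative, not part of the original module):
#
#     >>> get_key_and_value_by_insensitive_key_or_value(
#     ...     "TIME", {"Time": "time"})
#     ('Time', 'time')

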
def rearrange(data, dims, coords):
    """
    Returns an xarray.DataArray object with the given coords and dims.

    Parameters
    ----------
    data: float or xarray.DataArray
        The input data to rearrange.

    dims: list
        Ordered list of the dimensions.

    coords: dict
        Dictionary of the dimension names as keys with their values.

    Returns
    -------
    xarray.DataArray

    """
    # subset the used coords; in general, coords will be the subscript_dict
    coords = {dim: coords[dim] for dim in dims}
    if isinstance(data, xr.DataArray):
        shape = tuple(compute_shape(coords))
        if data.shape == shape:
            # allows switching dimension names and transpositions
            return xr.DataArray(data=data.values, coords=coords, dims=dims)
        elif np.prod(shape) < np.prod(data.shape):
            # allows subscripting a subrange
            return data.rename({
                dim: new_dim for dim, new_dim in zip(data.dims, dims)
                if dim != new_dim
            }).loc[coords]

        # the coordinates are expanded or transposed
        return xr.DataArray(0, coords, dims) + data

    elif data is not None:
        return xr.DataArray(data, coords, dims)

    return None


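# Example (illustrative, not part of the original module): a scalar is
# broadcast over the given dimensions.
#
#     >>> da = rearrange(5, ["d1"], {"d1": ["a", "b"]})
#     >>> da.values.tolist()
#     [5, 5]

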
def load_model_data(root, model_name):
    """
    Used for models split in several files.
    Loads subscripts and modules dictionaries.

    Parameters
    ----------
    root: pathlib.Path
        Path to the directory where the model file is located.

    model_name: str
        Name of the model without file type extension (e.g. "my_model").

    Returns
    -------
    subscripts: dict
        Dictionary describing the possible dimensions of the stock's
        subscripts.

    modules: dict
        Dictionary containing view (module) names as keys and a list of
        the corresponding variables as values.

    """
    with open(root.joinpath("_subscripts_" + model_name + ".json")) as subs:
        subscripts = json.load(subs)

    # the _modules.json file inside the modules_<model_name> folder shows
    # to which module each variable belongs
    with open(root.joinpath("modules_" + model_name, "_modules.json")) as mods:
        modules = json.load(mods)

    return subscripts, modules


def load_modules(module_name, module_content, work_dir, submodules):
    """
    Used to load model modules from the main model file, when
    split_views=True in the read_vensim function. This function is used
    to iterate over the different layers of the nested dictionary that
    describes which model variables belong to each module/submodule.

    Parameters
    ----------
    module_name: str
        Name of the module to load.

    module_content: dict or list
        Content of the module. If it is a dictionary, the module has
        submodules, whereas if it is a list, that particular
        module/submodule is a final one.

    work_dir: pathlib.Path
        Path to the module file.

    submodules: list
        This list gets updated at every recursive iteration, and each
        element corresponds to the string representation of each
        module/submodule that is read.

    Returns
    -------
    str:
        String representation of the modules/submodules to execute in the
        main model file.

    """
    if isinstance(module_content, list):
        with open(work_dir.joinpath(module_name + ".py"), "r",
                  encoding="UTF-8") as mod:
            submodules.append(mod.read())
    else:
        for submod_name, submod_content in module_content.items():
            load_modules(
                submod_name, submod_content,
                work_dir.joinpath(module_name),
                submodules)

    return "\n\n".join(submodules)


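# Example (illustrative sketch with a hypothetical module file): a leaf
# module (a list of variables) is read directly from work_dir.
#
#     >>> import tempfile
#     >>> tmp = Path(tempfile.mkdtemp())
#     >>> _ = tmp.joinpath("view_1.py").write_text("x = 1", encoding="UTF-8")
#     >>> load_modules("view_1", ["var_x"], tmp, [])
#     'x = 1'

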
def load_outputs(file_name, transpose=False, columns=None, encoding=None):
    """
    Load outputs file.

    Parameters
    ----------
    file_name: str or pathlib.Path
        Output file to read. Must be csv or tab.

    transpose: bool (optional)
        If True, reads a transposed outputs file, i.e. one variable per
        row. Default is False.

    columns: list or None (optional)
        List of the column names to load. If None, loads all the columns.
        Default is None.
        NOTE: if transpose=False, the loading will be faster as only the
        selected columns will be loaded. If transpose=True, the whole
        file must be read and it will be subselected later.

    encoding: str or None (optional)
        Encoding type to read the output file. Needed if the file has
        special characters. Default is None.

    Returns
    -------
    pandas.DataFrame
        A pandas.DataFrame with the output values.

    """
    read_func = {'.csv': pd.read_csv, '.tab': pd.read_table}

    file_name = Path(file_name)

    if columns:
        columns = set(columns)
        if not transpose:
            columns.add("Time")

    for end, func in read_func.items():
        if file_name.suffix.lower() == end:
            if transpose:
                out = func(file_name,
                           encoding=encoding,
                           index_col=0).T
                if columns:
                    out = out[list(columns)]
            else:
                out = func(file_name,
                           encoding=encoding,
                           usecols=columns,
                           index_col="Time")

            out.index = out.index.astype(float)
            # return the dataframe removing nan index values
            return out[~np.isnan(out.index)]

    raise ValueError(
        f"\nNot able to read '{file_name}'. "
        + f"Only {', '.join(list(read_func))} files are accepted.")


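# Example (illustrative, not part of the original module; "run.csv" is a
# hypothetical file): write a small outputs file and load it back, indexed
# by the Time column.
#
#     >>> pd.DataFrame({"Time": [0.0, 1.0], "x": [10, 20]}).to_csv(
#     ...     "run.csv", index=False)
#     >>> load_outputs("run.csv")["x"].tolist()
#     [10, 20]

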
def detect_encoding(filename):
    """
    Detects the encoding of a file.

    Parameters
    ----------
    filename: str
        Name of the file to detect the encoding of.

    Returns
    -------
    encoding: str
        The encoding of the file.

    """
    detector = UniversalDetector()
    with open(filename, 'rb') as file:
        for line in file:
            detector.feed(line)
            if detector.done:
                # stop early once the detector is confident
                break
    detector.close()
    return detector.result['encoding']


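# Example (illustrative; "sample.txt" is a hypothetical file; chardet's
# guess on very short inputs may vary):
#
#     >>> _ = Path("sample.txt").write_bytes("héllo".encode("utf-8"))
#     >>> detect_encoding("sample.txt")  # e.g. 'utf-8'

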
def print_objects_format(object_set, text):
    """
    Return a printable version of the variables in object_set with the
    header given by text.
    """
    text += " (total %(n_obj)s):\n\t%(objs)s\n" % {
        "n_obj": len(object_set),
        "objs": ", ".join(object_set)
    }
    return text


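# Example (illustrative, not part of the original module):
#
#     >>> print_objects_format({"teacup_temperature"}, "Selected variables")
#     'Selected variables (total 1):\n\tteacup_temperature\n'

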
@dataclass
class Dependencies:
    """
    Representation of variables' dependencies.

    Parameters
    ----------
    c_vars: set
        Set of all selected model variables.
    d_deps: dict
        Dictionary of dependencies needed to run variables and modules.
    s_deps: set
        Set of stateful objects to update when integrating selected
        model variables.

    """
    c_vars: Set[str]
    d_deps: Dict[str, set]
    s_deps: Set[str]

    def __str__(self):
        text = print_objects_format(self.c_vars, "Selected variables")

        if self.d_deps["initial"]:
            text += print_objects_format(
                self.d_deps["initial"],
                "\nDependencies for initialization only")
        if self.d_deps["step"]:
            text += print_objects_format(
                self.d_deps["step"],
                "\nDependencies that may change over time")
        if self.d_deps["lookup"]:
            text += print_objects_format(
                self.d_deps["lookup"],
                "\nLookup table dependencies")

        text += print_objects_format(
            self.s_deps,
            "\nStateful objects integrated with the selected variables")

        return text


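# Example (illustrative, not part of the original module; the variable
# names are hypothetical): empty dependency groups are skipped when
# printing.
#
#     >>> deps = Dependencies(
#     ...     c_vars={"teacup_temperature"},
#     ...     d_deps={"initial": set(), "step": set(), "lookup": set()},
#     ...     s_deps={"_integ_teacup_temperature"})
#     >>> print(deps)
#     Selected variables (total 1):
#         teacup_temperature
#
#     Stateful objects integrated with the selected variables (total 1):
#         _integ_teacup_temperature

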
class ProgressBar:
    """
    Progress bar for integration.
    """

    def __init__(self, max_value=None):

        self.max_value = max_value
        if self.max_value is None:
            return

        self.counter = 0

        self.bar = progressbar.ProgressBar(
            max_value=self.max_value,
            widgets=[
                progressbar.ETA(),
                " ",
                progressbar.Bar("#", "[", "]", "-"),
                progressbar.Percentage(),
            ],
        )

        self.bar.start()

    def update(self):
        """Update progress bar"""
        try:
            self.counter += 1
            self.bar.update(self.counter)
        except AttributeError:
            # no bar was created (max_value is None)
            pass

    def finish(self):
        """Finish progress bar"""
        try:
            self.bar.finish()
        except AttributeError:
            # no bar was created (max_value is None)
            pass


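# Example (illustrative, not part of the original module): with
# max_value=None every method is a no-op, which is how optional progress
# reporting is handled.
#
#     >>> bar = ProgressBar(max_value=100)
#     >>> for _ in range(100):
#     ...     bar.update()
#     >>> bar.finish()

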
class UniqueDims:
    """
    Helper class to create unique dimension names for data_vars with the
    same dimension name but different coords in xarray Datasets.
    """
    def __init__(self, original_dim_name):
        self.dim_name = original_dim_name
        self.dim_prefix = self.dim_name + "_#"
        self.unique_dims = []
        self.num = 1

    def name_new_dim(self, dim_name, coords):
        """
        Returns either a new name (original_dim_name + _# + num) if the
        coords list is not in unique_dims, or the preexisting dimension
        name if it is.

        Parameters
        ----------
        dim_name: str
            This argument is used to verify that we are passing the right
            dimension name to the class.
        coords: list
            List of coordinates of a dimension.

        Returns
        -------
        str
            Updated name of the original dimension.

        """
        if dim_name != self.dim_name:
            raise ValueError(
                "This object is configured to process dimension "
                f"{self.dim_name} and it's being passed a dimension "
                f"named {dim_name}"
            )
        if self.is_new(coords):
            new_dim_name = self.dim_prefix + str(self.num)
            self.unique_dims.append((new_dim_name, coords))
            self.num += 1
            return new_dim_name

        for name, unique_coords in self.unique_dims:
            if np.array_equal(coords, unique_coords):
                return name

    def is_new(self, coords):
        """
        Checks whether coords is already in the unique_dims list.

        Parameters
        ----------
        coords: list
            List of coordinates of a dimension.

        Returns
        -------
        bool

        """
        return not any(
            np.array_equal(unique_coords, coords)
            for _, unique_coords in self.unique_dims)
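

# Example (illustrative, not part of the original module): repeated coords
# reuse the same generated name, while new coords get a new suffix.
#
#     >>> ud = UniqueDims("dim")
#     >>> ud.name_new_dim("dim", ["a", "b"])
#     'dim_#1'
#     >>> ud.name_new_dim("dim", ["a", "b"])
#     'dim_#1'
#     >>> ud.name_new_dim("dim", ["c"])
#     'dim_#2'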