"""
|
|
These classes are a collection of the needed tools to read external data.
|
|
The External type objects created by these classes are initialized before
|
|
the Stateful objects by functions.Model.initialize.
|
|
"""
|
|
|
|
import re
import warnings

from openpyxl import load_workbook
from openpyxl.utils.exceptions import InvalidFileException

import numpy as np
import xarray as xr
import pandas as pd

from . import utils
from .data import Data
from .lookups import Lookups


_SPREADSHEET_EXTS = {'.xls', '.xlsx', '.xlsm', '.xlsb', '.odf', '.ods', '.odt'}


class Excels():
    """
    Class to save the read Excel files and thus avoid double reading
    """
    _Excels, _Excels_opyxl = {}, {}

    @classmethod
    def read(cls, file_name, tab):
        """
        Read the Excel file or return the previously read one
        """
        if file_name.joinpath(tab) in cls._Excels:
            return cls._Excels[file_name.joinpath(tab)]
        else:
            # get the function to read the data based on its extension
            read_kwargs = {}
            ext = file_name.suffix.lower()
            if ext in _SPREADSHEET_EXTS:
                read_func = pd.read_excel
                read_kwargs['sheet_name'] = tab
            elif ext == '.csv':
                read_func = pd.read_csv
                if tab and not tab[0].isalnum():
                    read_kwargs['sep'] = tab
            else:
                read_func = pd.read_table
                if tab and not tab[0].isalnum():
                    read_kwargs['sep'] = tab

            # read the data
            excel = np.array([
                pd.to_numeric(ex, errors='coerce')
                for ex in
                read_func(file_name, header=None, **read_kwargs).values
            ])

            # save data for future retrievals
            cls._Excels[file_name.joinpath(tab)] = excel
            return excel

    @classmethod
    def read_opyxl(cls, file_name):
        """
        Read the Excel file using OpenPyXL or return the previously read one
        """
        if file_name in cls._Excels_opyxl:
            return cls._Excels_opyxl[file_name]
        else:
            excel = load_workbook(file_name, read_only=True, data_only=True)
            cls._Excels_opyxl[file_name] = excel
            return excel

    @classmethod
    def clean(cls):
        """
        Clean the dictionary of read files
        """
        for file in cls._Excels_opyxl.values():
            # close the files opened directly with openpyxl
            file.close()
        # files opened with pandas are closed automatically
        cls._Excels, cls._Excels_opyxl = {}, {}
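
# A minimal usage sketch of the cache above (the file name and sheet are
# hypothetical): repeated calls with the same pathlib.Path and tab return the
# already read numpy array until Excels.clean() is called.
#
#     from pathlib import Path
#     values = Excels.read(Path("inputs.xlsx"), "data")        # reads the file
#     values_again = Excels.read(Path("inputs.xlsx"), "data")  # from the cache
#     Excels.clean()  # drop the cache and close openpyxl workbooks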


class External(object):
    """
    Main class of external objects

    Attributes
    ----------
    py_name: str
        The Python name of the object
    missing: str ("warning", "raise", "ignore", "keep")
        What to do with missing values. If "warning" (default)
        shows a warning message and interpolates the values.
        If "raise" raises an error. If "ignore" interpolates
        the values without showing anything. If "keep" the missing
        values are kept; this option may cause the integration to
        fail, but it can be used to check the quality of the data.
    file: str
        File name from which the data is read.
    tab: str
        Tab name from which the data is read. If the file type is not a
        spreadsheet, this value is used as a separator.

    """
    missing = "warning"

    def __init__(self, py_name):
        self.py_name = py_name
        self.file = None
        self.tab = None

    def __str__(self):
        return self.py_name

    def _get_data_from_file(self, rows, cols):
        """
        Function to read data from the excel file using rows and columns

        Parameters
        ----------
        rows: list of len 2
            first row and last row+1 to be read, starting from 0
        cols: list of len 2
            first col and last col+1 to be read, starting from 0

        Returns
        -------
        data: numpy.ndarray or float
            depending on the shape of the requested data

        """
        # read data
        data = Excels.read(
            self.file,
            self.tab)[rows[0]:rows[1], cols[0]:cols[1]].copy()

        shape = data.shape

        # empty cells
        if shape[0] == 0 or shape[1] == 0:
            raise ValueError(
                self.py_name + "\n"
                "The cells are empty.\n"
                + self._file_sheet
            )

        # if it is a single row or column, remove the extra dimension
        if shape[1] == 1:
            data = data[:, 0]
        if shape[0] == 1:
            data = data[0]

        return data
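
    # For example, rows=[0, 3] with cols=[1, 2] returns the first three values
    # of the second column as a 1D array, while rows=[0, 1] with cols=[1, 2]
    # collapses to a single float value.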

    def _get_data_from_file_opyxl(self, cellname):
        """
        Function to read data from the excel file using a cell range name

        Parameters
        ----------
        cellname: str
            the cell range name

        Returns
        -------
        data: numpy.ndarray or float
            depending on the shape of the requested data
        shape: list
            The shape of the data in 2D.

        """
        # read data
        try:
            excel = Excels.read_opyxl(self.file)
        except InvalidFileException:
            raise ValueError(
                self.py_name + "\n"
                f"Cannot read the file '{self.file}'...\n"
                f"It could happen that cell='{cellname}' was "
                "read as a cell range name due to a wrong "
                "definition of cell value"
            )

        # Get global and local cellrange names
        global_cellranges = excel.defined_names
        local_cellranges = None
        # lowercase the sheet names as Vensim is case-insensitive
        for sheet in excel.sheetnames:
            if sheet.lower() == self.tab.lower():
                local_cellranges = excel[sheet].defined_names
                break

        if local_cellranges is None:
            # Error if it is not able to get the localSheetId
            raise ValueError(
                self.py_name + "\n"
                "The sheet doesn't exist...\n"
                + self._file_sheet
            )
        try:
            # Search for local and global names
            cellrange = local_cellranges.get(cellname)\
                or global_cellranges.get(cellname)
            sheet, cells = next(cellrange.destinations)

            assert sheet.lower() == self.tab.lower()
            self.tab = sheet  # case insensitivity in sheet name

            # Get the cells where the cellrange is defined
            cells = re.split(r":|\$", cells)
            cols = [self._col_to_num(cells[1]), None]
            rows = [int(cells[2])-1, None]
            if len(cells) == 3:
                # 0 dim cell range
                cols[1] = cols[0]+1
                rows[1] = rows[0]+1
            else:
                # array or table
                cols[1] = self._col_to_num(cells[4])+1
                rows[1] = int(cells[5])
            # Use pandas to read the data and return its original shape
            return self._get_data_from_file(rows, cols), \
                [rows[1]-rows[0], cols[1]-cols[0]]

        except (AttributeError, AssertionError):
            # raised if the cellrange doesn't exist in the file or sheet
            raise AttributeError(
                self.py_name + "\n"
                f"The cellrange name '{cellname}'\n"
                "doesn't exist in:\n" + self._file_sheet
            )

    def _get_series_data(self, series_across, series_row_or_col, cell, size):
        """
        Function that reads series and data from the excel file for
        DATA and LOOKUPS.

        Parameters
        ----------
        series_across: "row", "column" or "name"
            The way to read the series.
        series_row_or_col: int or str
            If series_across is "row", the row number where the series data
            is.
            If series_across is "column", the column name where the series
            data is.
            If series_across is "name", the cell range name where the series
            data is.
        cell:
            If series_across is not "name", the top left cell where the
            data table starts.
            Else the name of the cell range where the data is.
        size:
            The size of the 2nd dimension of the data.

        Returns
        -------
        series, data: ndarray (1D), ndarray (1D/2D)
            The values of the series and data.

        """
        if series_across == "row":
            # Horizontal data (dimension values in a row)

            # get the dimension values
            first_row, first_col = self._split_excel_cell(cell)
            series = self._get_data_from_file(
                rows=[int(series_row_or_col)-1, int(series_row_or_col)],
                cols=[first_col, None])

            # read data
            data = self._get_data_from_file(
                rows=[first_row, first_row + size],
                cols=[first_col, None]).transpose()

        elif series_across == "column":
            # Vertical data (dimension values in a column)

            # get the dimension values
            first_row, first_col = self._split_excel_cell(cell)
            series_col = self._col_to_num(series_row_or_col)
            series = self._get_data_from_file(
                rows=[first_row, None],
                cols=[series_col, series_col+1])

            # read data
            data = self._get_data_from_file(
                rows=[first_row, None],
                cols=[first_col, first_col + size])

        else:
            # get series data
            series, s_shape = self._get_data_from_file_opyxl(series_row_or_col)

            if isinstance(series, float):
                series = np.array([series])

            if s_shape[0] > 1 and s_shape[1] > 1:
                # Error if the lookup/time dimension is 2D
                raise ValueError(
                    self.py_name + "\n"
                    + "Dimension given in:\n"
                    + self._file_sheet
                    + "\tDimension name:"
                    + "\t'{}'\n".format(series_row_or_col)
                    + " is a table and not a vector"
                )
            elif s_shape[1] != 1:
                transpose = True
            else:
                transpose = False

            # get data
            data, d_shape = self._get_data_from_file_opyxl(cell)

            if isinstance(data, float):
                data = np.array([data])

            if transpose:
                # transpose for horizontal definition of dimension
                data = data.transpose()
                d_shape = d_shape[1], d_shape[0]

            if d_shape[0] != len(series):
                raise ValueError(
                    self.py_name + "\n"
                    + "Dimension and data given in:\n"
                    + self._file_sheet
                    + "\tDimension name:\t'{}'\n".format(series_row_or_col)
                    + "\tData name:\t'{}'\n".format(cell)
                    + " don't have the same length in the 1st dimension"
                )

            if d_shape[1] != size:
                # The given coordinates length differs from the
                # length of the 2nd dimension
                raise ValueError(
                    self.py_name + "\n"
                    + "Data given in:\n"
                    + self._file_sheet
                    + "\tData name:\t'{}'\n".format(cell)
                    + " doesn't have the same size as the given coordinates"
                )

        return series, data
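
    # For instance, with series_across="row", series_row_or_col="4" and
    # cell="B5" (hypothetical values), the series is read from row 4 starting
    # at column B, and the data block from the rows starting at cell B5 (one
    # row per element of the 2nd dimension), transposed so the series runs
    # along the first axis.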

    def _resolve_file(self, root):
        """
        Resolve the input file path, joining it with the root and
        checking that it exists.

        Parameters
        ----------
        root: pathlib.Path
            The root path to the model file.

        Returns
        -------
        None

        """
        if str(self.file)[0] == '?':
            # TODO add an option to include indirect references
            raise ValueError(
                self.py_name + "\n"
                + f"Indirect reference to file: '{self.file}'")

        # Join the path and resolve it to print clearer error messages
        self.file = root.joinpath(self.file).resolve()

        if not self.file.is_file():
            raise FileNotFoundError(
                self.py_name + "\n"
                + "File '%s' not found." % self.file)

    def _initialize_data(self, element_type):
        """
        Initialize one element of DATA or LOOKUPS

        Parameters
        ----------
        element_type: str
            "lookup" for LOOKUPS, "data" for DATA.

        Returns
        -------
        data: xarray.DataArray
            DataArray with the time or interpolation dimension
            as the first dimension.

        """
        self._resolve_file(root=self.root)
        series_across = self._series_selector(self.x_row_or_col, self.cell)
        size = utils.compute_shape(self.coords, reshape_len=1,
                                   py_name=self.py_name)[0]

        series, data = self._get_series_data(
            series_across=series_across,
            series_row_or_col=self.x_row_or_col,
            cell=self.cell, size=size
        )

        # remove nan or missing values from dimension
        if series_across != "name":
            # Remove trailing nans only if the method is to read by row or col
            i = 0
            try:
                while np.isnan(series[i-1]):
                    i -= 1
            except IndexError:
                # series has len 0
                raise ValueError(
                    self.py_name + "\n"
                    + "Dimension given in:\n"
                    + self._file_sheet
                    + "\t{}:\t'{}'\n".format(series_across, self.x_row_or_col)
                    + " has length 0"
                )

            if i != 0:
                series = series[:i]
                data = data[:i]

        # warning/error if missing data in the series
        if any(np.isnan(series)) and self.missing != "keep":
            valid_values = ~np.isnan(series)
            series = series[valid_values]
            data = data[valid_values]
            if all(np.isnan(series)):
                raise ValueError(
                    self.py_name + "\n"
                    + "Dimension given in:\n"
                    + self._file_sheet
                    + "\t{}:\t'{}'\n".format(series_across, self.x_row_or_col)
                    + " has length 0"
                )
            if self.missing == "warning":
                warnings.warn(
                    self.py_name + "\n"
                    + "Dimension value missing or non-valid in:\n"
                    + self._file_sheet
                    + "\t{}:\t'{}'\n".format(series_across, self.x_row_or_col)
                    + " the data value(s) corresponding to the "
                    + "missing/non-valid value(s) will be ignored\n\n"
                )
            elif self.missing == "raise":
                raise ValueError(
                    self.py_name + "\n"
                    + "Dimension value missing or non-valid in:\n"
                    + self._file_sheet
                    + "\t{}:\t'{}'\n".format(series_across, self.x_row_or_col)
                )

        # reorder data with increasing series
        if not np.all(np.diff(series) > 0) and self.missing != "keep":
            order = np.argsort(series)
            series = series[order]
            data = data[order]
            # Check if the lookup/time dimension is well defined
            if np.any(np.diff(series) == 0):
                raise ValueError(self.py_name + "\n"
                                 + "Dimension given in:\n"
                                 + self._file_sheet
                                 + "\t{}:\t'{}'\n".format(
                                     series_across, self.x_row_or_col)
                                 + " has repeated values")

        # Check for missing values in data
        if np.any(np.isnan(data)) and self.missing != "keep":
            if series_across == "name":
                cell_type = "Cellrange"
            else:
                cell_type = "Reference cell"

            if self.missing == "warning":
                # Fill missing values with the chosen interpolation method,
                # which is what Vensim does at runtime for DATA
                if self.interp != "raw":
                    interpolate_message =\
                        " the corresponding value will be filled "\
                        + "with the interpolation method of the object."
                else:
                    interpolate_message = ""

                warnings.warn(
                    self.py_name + "\n"
                    + "Data value missing or non-valid in:\n"
                    + self._file_sheet
                    + "\t{}:\t'{}'\n".format(cell_type, self.cell)
                    + interpolate_message + "\n\n"
                )
            elif self.missing == "raise":
                raise ValueError(
                    self.py_name + "\n"
                    + "Data value missing or non-valid in:\n"
                    + self._file_sheet
                    + "\t{}:\t'{}'\n".format(cell_type, self.cell)
                )
            # fill values
            if self.interp != "raw":
                self._fill_missing(series, data)

        # reshape the data to fit in the xarray.DataArray
        reshape_dims = tuple([len(series)] + utils.compute_shape(self.coords))
        data = self._reshape(data, reshape_dims)

        if element_type == "lookup":
            dim_name = "lookup_dim"
        else:
            dim_name = "time"

        data = xr.DataArray(
            data=data,
            coords={dim_name: series, **self.coords},
            dims=[dim_name] + list(self.coords)
        )

        return data

    def _fill_missing(self, series, data):
        """
        Fills missing values in excel read data. Mutates the values in data.

        Parameters
        ----------
        series:
            the time series without missing values
        data:
            the data with missing values

        Returns
        -------
        None

        """
        # if data is 2D we need to interpolate each column separately
        datanan = np.isnan(data)
        keeping_nan = False
        if len(data.shape) == 1:
            if not np.all(datanan):
                data[datanan] = self._interpolate_missing(
                    series[datanan],
                    series[~datanan],
                    data[~datanan])
            else:
                keeping_nan = True
        else:
            for i, nanlist in enumerate(list(datanan.transpose())):
                if not np.all(nanlist):
                    data[nanlist, i] = self._interpolate_missing(
                        series[nanlist],
                        series[~nanlist],
                        data[~nanlist][:, i])
                else:
                    keeping_nan = True

        if keeping_nan:
            warnings.warn(
                "Not able to interpolate some values..."
                " keeping them as missing.\n")

    def _interpolate_missing(self, x, xr, yr):
        """
        Interpolates a list of missing values from _fill_missing

        Parameters
        ----------
        x:
            list of missing values to interpolate
        xr:
            non-missing x values
        yr:
            non-missing y values

        Returns
        -------
        y:
            Result after interpolating x with the self.interp method

        """
        y = np.empty_like(x, dtype=float)
        for i, value in enumerate(x):
            if value >= xr[-1]:
                y[i] = yr[-1]
            elif value <= xr[0]:
                y[i] = yr[0]
            elif self.interp == 'look_forward':
                y[i] = yr[xr >= value][0]
            elif self.interp == 'hold_backward':
                y[i] = yr[xr <= value][-1]
            else:
                y[i] = np.interp(value, xr, yr)
        return y
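
    # Sketch of the fill rules above for xr=[0, 1, 2] and yr=[10, 20, 30]:
    # values outside the range are clipped (x=-1 -> 10, x=5 -> 30);
    # 'look_forward' takes the next known value (x=0.5 -> 20),
    # 'hold_backward' keeps the previous one (x=0.5 -> 10), and any other
    # method interpolates linearly (x=0.5 -> 15).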

    @property
    def _file_sheet(self):
        """
        Returns file and sheet name in a string
        """
        return "\tFile name:\t'{}'\n".format(self.file)\
               + "\tSheet name:\t'{}'\n".format(self.tab)

    @staticmethod
    def _col_to_num(col):
        """
        Transforms the column name to int

        Parameters
        ----------
        col: str
            Column name

        Returns
        -------
        int
            Column number
        """
        if len(col) == 1:
            return ord(col.upper()) - ord('A')
        elif len(col) == 2:
            left = ord(col[0].upper()) - ord('A') + 1
            right = ord(col[1].upper()) - ord('A')
            return left * (ord('Z')-ord('A')+1) + right
        else:
            left = ord(col[0].upper()) - ord('A') + 1
            center = ord(col[1].upper()) - ord('A') + 1
            right = ord(col[2].upper()) - ord('A')
            return left * ((ord('Z')-ord('A')+1)**2)\
                + center * (ord('Z')-ord('A')+1)\
                + right
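
    # Worked examples of the base-26 conversion above:
    # _col_to_num("A") -> 0, _col_to_num("Z") -> 25, _col_to_num("AA") -> 26,
    # _col_to_num("AB") -> 27 and _col_to_num("AAA") -> 702.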

    def _split_excel_cell(self, cell):
        """
        Splits a cell reference given as a string.
        Returns None for non-valid cell formats.

        Parameters
        ----------
        cell: str
            Cell-like string, such as "A1", "b16" or "AC19"...
            If it is not a cell-like string, None will be returned.

        Returns
        -------
        row number, column number: int, int
            If the cell input is valid. Both numbers are given in Python
            enumeration, i.e., first row and first column are 0.

        """
        split = re.findall(r'\d+|\D+', cell)
        try:
            # check that we only have two values [column, row]
            assert len(split) == 2
            # check that the column name has no special characters
            assert not re.compile('[^a-zA-Z]+').search(split[0])
            # check that the row number is not 0
            assert int(split[1]) != 0
            # the column name has at most 3 letters
            assert len(split[0]) <= 3
            return int(split[1])-1, self._col_to_num(split[0])
        except AssertionError:
            return
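
    # Example behaviour of the parser above: _split_excel_cell("A1") -> (0, 0),
    # _split_excel_cell("b16") -> (15, 1), _split_excel_cell("AC19") -> (18, 28),
    # while a cellrange name such as "my_data" returns None.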

    @staticmethod
    def _reshape(data, dims):
        """
        Reshapes a pandas.DataFrame, pandas.Series, xarray.DataArray
        or numpy.ndarray in the given dimensions.

        Parameters
        ----------
        data: xarray.DataArray/numpy.ndarray
            Data to be reshaped
        dims: tuple
            The dimensions to reshape.

        Returns
        -------
        numpy.ndarray
            reshaped array
        """
        if isinstance(data, (float, int)):
            data = np.array(data)
        elif isinstance(data, xr.DataArray):
            data = data.values

        return data.reshape(dims)
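
    # For instance, a 6-element series read from a single column can be
    # reshaped with dims=(3, 2), and a scalar float with dims=(1, 1);
    # xarray inputs are first converted to plain numpy arrays.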

    def _series_selector(self, x_row_or_col, cell):
        """
        Selects whether series data (DATA/LOOKUPS) should be read by columns,
        rows or cellrange name.
        Based on the input format of x_row_or_col and cell.
        The format of the 2 variables must be consistent.

        Parameters
        ----------
        x_row_or_col: str
            String of a number if the series is given in a row, a letter if
            the series is given in a column, or a name if the series is given
            by cellrange name.
        cell: str
            Cell identifier, such as "A1", or a name if the data is given
            by cellrange name.

        Returns
        -------
        series_across: str
            "row" if series is given in a row
            "column" if series is given in a column
            "name" if series and data are given by range name

        """
        try:
            # if x_row_or_col is numeric the series must be a row
            int(x_row_or_col)
            return "row"

        except ValueError:
            if self._split_excel_cell(cell):
                # if the cell can be split, the format is "A1"-like,
                # so the series must be a column
                return "column"
            else:
                return "name"


class ExtData(External, Data):
    """
    Class for Vensim GET XLS DATA/GET DIRECT DATA
    """

    def __init__(self, file_name, tab, time_row_or_col, cell,
                 interp, coords, root, final_coords, py_name):
        super().__init__(py_name)
        self.files = [file_name]
        self.tabs = [tab]
        self.time_row_or_cols = [time_row_or_col]
        self.cells = [cell]
        self.coordss = [coords]
        self.root = root
        self.final_coords = final_coords
        self.interp = interp or "interpolate"
        self.is_float = not bool(coords)

        # check if the interpolation method is valid
        if self.interp not in ["interpolate", "raw",
                               "look_forward", "hold_backward"]:
            raise ValueError(self.py_name + "\n"
                             + " The interpolation method (interp) must be "
                             + "'raw', 'interpolate', "
                             + "'look_forward' or 'hold_backward'")

    def add(self, file_name, tab, time_row_or_col, cell, interp, coords):
        """
        Add information to retrieve a new dimension in an already
        declared object
        """
        self.files.append(file_name)
        self.tabs.append(tab)
        self.time_row_or_cols.append(time_row_or_col)
        self.cells.append(cell)
        self.coordss.append(coords)

        interp = interp or "interpolate"

        if interp.replace(" ", "_") != self.interp:
            raise ValueError(self.py_name + "\n"
                             + "Error matching interpolation method with "
                             + "previously defined one")

        if list(coords) != list(self.coordss[0]):
            raise ValueError(self.py_name + "\n"
                             + "Error matching dimensions with previous data")

    def initialize(self):
        """
        Initialize all elements and create the self.data xarray.DataArray
        """
        if not self.coordss[0]:
            # Just load one value (no add)
            for self.file, self.tab, self.x_row_or_col, \
                self.cell, self.coords\
                    in zip(self.files, self.tabs, self.time_row_or_cols,
                           self.cells, self.coordss):
                self.data = self._initialize_data("data")
        else:
            # Load in several lines (add)
            self.data = xr.DataArray(
                np.nan, self.final_coords, list(self.final_coords))

            for self.file, self.tab, self.x_row_or_col, \
                self.cell, self.coords\
                    in zip(self.files, self.tabs, self.time_row_or_cols,
                           self.cells, self.coordss):
                values = self._initialize_data("data")

                coords = {"time": values.coords["time"].values, **self.coords}
                if "time" not in self.data.dims:
                    self.data = self.data.expand_dims(
                        {"time": coords["time"]}, axis=0).copy()

                self.data.loc[coords] = values.values

        # set what to return when raw
        if self.final_coords:
            self.nan = xr.DataArray(
                np.nan, self.final_coords, list(self.final_coords))
        else:
            self.nan = np.nan
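
# A minimal instantiation sketch for ExtData (the file, sheet, cell and root
# values here are hypothetical); the data is only read when initialize() is
# called:
#
#     ext = ExtData(file_name="inputs.xlsx", tab="data", time_row_or_col="4",
#                   cell="B5", interp="interpolate", coords={}, root=model_dir,
#                   final_coords={}, py_name="_ext_data_variable")
#     ext.initialize()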


class ExtLookup(External, Lookups):
    """
    Class for Vensim GET XLS LOOKUPS/GET DIRECT LOOKUPS
    """

    def __init__(self, file_name, tab, x_row_or_col, cell, coords,
                 root, final_coords, py_name):
        super().__init__(py_name)
        self.files = [file_name]
        self.tabs = [tab]
        self.x_row_or_cols = [x_row_or_col]
        self.cells = [cell]
        self.coordss = [coords]
        self.root = root
        self.final_coords = final_coords
        self.interp = "interpolate"
        self.is_float = not bool(coords)

    def add(self, file_name, tab, x_row_or_col, cell, coords):
        """
        Add information to retrieve a new dimension in an already
        declared object
        """
        self.files.append(file_name)
        self.tabs.append(tab)
        self.x_row_or_cols.append(x_row_or_col)
        self.cells.append(cell)
        self.coordss.append(coords)

        if list(coords) != list(self.coordss[0]):
            raise ValueError(self.py_name + "\n"
                             + "Error matching dimensions with previous data")

    def initialize(self):
        """
        Initialize all elements and create the self.data xarray.DataArray
        """
        if not self.coordss[0]:
            # Just load one value (no add)
            for self.file, self.tab, self.x_row_or_col, \
                self.cell, self.coords\
                    in zip(self.files, self.tabs, self.x_row_or_cols,
                           self.cells, self.coordss):
                self.data = self._initialize_data("lookup")
        else:
            # Load in several lines (add)
            self.data = xr.DataArray(
                np.nan, self.final_coords, list(self.final_coords))

            for self.file, self.tab, self.x_row_or_col, \
                self.cell, self.coords\
                    in zip(self.files, self.tabs, self.x_row_or_cols,
                           self.cells, self.coordss):
                values = self._initialize_data("lookup")

                coords = {
                    "lookup_dim": values.coords["lookup_dim"].values,
                    **self.coords
                }
                if "lookup_dim" not in self.data.dims:
                    self.data = self.data.expand_dims(
                        {"lookup_dim": coords["lookup_dim"]}, axis=0).copy()

                self.data.loc[coords] = values.values


class ExtConstant(External):
    """
    Class for Vensim GET XLS CONSTANTS/GET DIRECT CONSTANTS
    """

    def __init__(self, file_name, tab, cell, coords,
                 root, final_coords, py_name):
        super().__init__(py_name)
        self.files = [file_name]
        self.tabs = [tab]
        self.transposes = [
            cell[-1] == '*' and np.prod(utils.compute_shape(coords)) > 1]
        self.cells = [cell.strip('*')]
        self.coordss = [coords]
        self.root = root
        self.final_coords = final_coords

    def add(self, file_name, tab, cell, coords):
        """
        Add information to retrieve a new dimension in an already
        declared object
        """
        self.files.append(file_name)
        self.tabs.append(tab)
        self.transposes.append(
            cell[-1] == '*' and np.prod(utils.compute_shape(coords)) > 1)
        self.cells.append(cell.strip('*'))
        self.coordss.append(coords)

        if list(coords) != list(self.coordss[0]):
            raise ValueError(self.py_name + "\n"
                             + "Error matching dimensions with previous data")

    def initialize(self):
        """
        Initialize all elements and create the self.data xarray.DataArray
        """
        if not self.coordss[0]:
            # Just load one value (no add)
            for self.file, self.tab, self.transpose, self.cell, self.coords\
                    in zip(self.files, self.tabs, self.transposes,
                           self.cells, self.coordss):
                self.data = self._initialize()
        else:
            # Load in several lines (add)
            self.data = xr.DataArray(
                np.nan, self.final_coords, list(self.final_coords))

            for self.file, self.tab, self.transpose, self.cell, self.coords\
                    in zip(self.files, self.tabs, self.transposes,
                           self.cells, self.coordss):
                self.data.loc[self.coords] = self._initialize().values

    def _initialize(self):
        """
        Initialize one element
        """
        self._resolve_file(root=self.root)
        split = self._split_excel_cell(self.cell)
        if split:
            data_across = "cell"
            cell = split
        else:
            data_across = "name"
            cell = self.cell

        shape = utils.compute_shape(self.coords, reshape_len=2,
                                    py_name=self.py_name)

        if self.transpose:
            shape.reverse()

        data = self._get_constant_data(data_across, cell, shape)

        if self.transpose:
            data = data.transpose()

        if np.any(np.isnan(data)):
            # nan values in data
            if data_across == "name":
                cell_type = "Cellrange"
            else:
                cell_type = "Reference cell"

            if self.missing == "warning":
                warnings.warn(
                    self.py_name + "\n"
                    + "Constant value missing or non-valid in:\n"
                    + self._file_sheet
                    + "\t{}:\t'{}'\n".format(cell_type, self.cell)
                )
            elif self.missing == "raise":
                raise ValueError(
                    self.py_name + "\n"
                    + "Constant value missing or non-valid in:\n"
                    + self._file_sheet
                    + "\t{}:\t'{}'\n".format(cell_type, self.cell)
                )

        # Create only an xarray if the data is not 0 dimensional
        if len(self.coords) > 0:
            reshape_dims = tuple(utils.compute_shape(self.coords))
            data = self._reshape(data, reshape_dims)
            return xr.DataArray(
                data=data, coords=self.coords, dims=list(self.coords)
            )
        else:
            # need to ensure float is returned and not numpy.float
            return float(data)

    def _get_constant_data(self, data_across, cell, shape):
        """
        Function that reads data from the excel file for CONSTANT

        Parameters
        ----------
        data_across: "cell" or "name"
            The way to read the data.
        cell: tuple or str
            If data_across is "cell", the split top-left cell value where
            the data is.
            If data_across is "name", the cell range name where the data is.
        shape: list
            The shape of the data in 2D.

        Returns
        -------
        data: float/ndarray(1D/2D)
            The values of the data.

        """
        if data_across == "cell":
            # read data from the top-left cell using pandas
            start_row, start_col = cell

            return self._get_data_from_file(
                rows=[start_row, start_row + shape[0]],
                cols=[start_col, start_col + shape[1]])

        else:
            # read data from the cell range name using OpenPyXL
            data, xl_shape = self._get_data_from_file_opyxl(cell)
            if shape != xl_shape:
                raise ValueError(self.py_name + "\n"
                                 + "Data given in:\n"
                                 + self._file_sheet
                                 + "\tData name:\t{}\n".format(cell)
                                 + " doesn't have the same shape as the"
                                 + " given coordinates")

            return data

    def __call__(self):
        return self.data
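
# A minimal instantiation sketch for ExtConstant (file, sheet, cell and root
# values are hypothetical); after initialize() the object is callable and
# returns the constant as a float or an xarray.DataArray:
#
#     const = ExtConstant(file_name="inputs.xlsx", tab="data", cell="B5",
#                         coords={}, root=model_dir, final_coords={},
#                         py_name="_ext_constant_variable")
#     const.initialize()
#     value = const()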


class ExtSubscript(External):
    """
    Class for Vensim GET XLS SUBSCRIPT/GET DIRECT SUBSCRIPT
    """

    def __init__(self, file_name, tab, firstcell, lastcell, prefix, root):
        super().__init__("Hardcoded external subscript")
        self.file = file_name
        self.tab = tab
        self.prefix = prefix
        self._resolve_file(root=root)
        split = self._split_excel_cell(firstcell)
        if split:
            subs = self.get_subscripts_cell(*split, lastcell)
        else:
            subs = self.get_subscripts_name(firstcell)

        self.subscript = [
            self.prefix + str(d) for d in subs.flatten()
            if self._not_nan(d)
        ]

    def get_subscripts_cell(self, row_first, col_first, lastcell):
        """Get subscripts from a common cell definition"""
        if not lastcell:
            row_last, col_last = None, None
        else:
            split = self._split_excel_cell(lastcell)
            if split:
                # last cell is col and row
                row_last, col_last = split
            elif lastcell.isdigit():
                # last cell is a row number only
                row_last = int(lastcell)-1
                col_last = None
            else:
                # last cell is a column value only
                row_last = None
                col_last = self._col_to_num(lastcell)

        # update the read kwargs with the rows and columns to read
        read_kwargs = {}
        if row_last is not None:
            read_kwargs['nrows'] = row_last-row_first+1
        if col_last is not None:
            read_kwargs['usecols'] = np.arange(col_first, col_last+1)

        # get the function to read the data based on its extension
        ext = self.file.suffix.lower()
        if ext in _SPREADSHEET_EXTS:
            read_func = pd.read_excel
            read_kwargs['sheet_name'] = self.tab
        elif ext == '.csv':
            read_func = pd.read_csv
            if self.tab and not self.tab[0].isalnum():
                read_kwargs['sep'] = self.tab
        else:
            read_func = pd.read_table
            if self.tab and not self.tab[0].isalnum():
                read_kwargs['sep'] = self.tab

        # read the data
        data = read_func(
            self.file,
            skiprows=row_first,
            dtype=object,
            header=None,
            **read_kwargs
        ).values

        # skip columns if usecols couldn't be used
        if col_last is None:
            data = data[:, col_first:]

        return data

    def get_subscripts_name(self, cellname):
        """Get subscripts from a cell range name definition"""
        try:
            excel = load_workbook(self.file, read_only=True, data_only=True)
        except InvalidFileException:
            raise ValueError(
                self.py_name + "\n"
                f"Cannot read the file '{self.file}'...\n"
                f"It could happen that firstcell='{cellname}' was "
                "read as a cell range name due to a wrong definition "
                "of cell value"
            )
        global_cellranges = excel.defined_names
        local_cellranges = None
        # lowercase the sheet names as Vensim is case-insensitive
        for sheet in excel.sheetnames:
            if sheet.lower() == self.tab.lower():
                local_cellranges = excel[sheet].defined_names
                break

        if local_cellranges is None:
            # Error if it is not able to get the localSheetId
            raise ValueError(
                self.py_name + "\n"
                "The sheet doesn't exist...\n"
                + self._file_sheet
            )
        try:
            # Search for local and global names
            cellrange = local_cellranges.get(cellname)\
                or global_cellranges.get(cellname)
            sheet, cells = next(cellrange.destinations)

            assert sheet.lower() == self.tab.lower()
            self.tab = sheet  # case insensitivity in sheet name

            # Get the cells where the cellrange is defined
            first_cell, last_cell = cells.replace("$", '').split(":")
        except (AttributeError, AssertionError):
            # raised if the cellrange doesn't exist in the file or sheet
            raise AttributeError(
                self.py_name + "\n"
                f"The cellrange name '{cellname}'\n"
                "doesn't exist in:\n" + self._file_sheet
            )
        else:
            return self.get_subscripts_cell(
                *self._split_excel_cell(first_cell), last_cell)

    @staticmethod
    def _not_nan(value):
        """Check if a value is not nan"""
        if isinstance(value, str):
            return True
        return not np.isnan(value)
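
# A minimal instantiation sketch for ExtSubscript (file, sheet and cell values
# are hypothetical); the subscript list is built directly in the constructor:
#
#     sub = ExtSubscript(file_name="inputs.xlsx", tab="data", firstcell="A2",
#                        lastcell="A10", prefix="", root=model_dir)
#     elements = sub.subscript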