214 lines
8.3 KiB
Python
214 lines
8.3 KiB
Python
import warnings
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
import xarray as xr
|
|
|
|
from . import utils
|
|
|
|
|
|
class Lookups(object):
|
|
def set_values(self, values):
|
|
"""Set new values from user input"""
|
|
self.data = xr.DataArray(
|
|
np.nan, self.final_coords, list(self.final_coords))
|
|
|
|
if isinstance(values, pd.Series):
|
|
index = list(values.index)
|
|
index.sort()
|
|
self.data = self.data.expand_dims(
|
|
{'lookup_dim': index}, axis=0).copy()
|
|
|
|
for index, value in values.items():
|
|
if isinstance(values.values[0], xr.DataArray):
|
|
self.data.loc[index].loc[value.coords] = value
|
|
else:
|
|
self.data.loc[index] = value
|
|
else:
|
|
if isinstance(values, xr.DataArray):
|
|
self.data.loc[values.coords] = values.values
|
|
else:
|
|
if self.final_coords:
|
|
self.data.loc[:] = values
|
|
else:
|
|
self.data = values
|
|
|
|
def __call__(self, x, final_subs=None):
|
|
try:
|
|
return self._call(self.data, x, final_subs)
|
|
except (TypeError, KeyError):
|
|
# this except catch the errors when a lookups has been
|
|
# changed to a constant value by the user
|
|
if final_subs and isinstance(self.data, xr.DataArray):
|
|
# self.data is an array, reshape it
|
|
outdata = xr.DataArray(np.nan, final_subs, list(final_subs))
|
|
return xr.broadcast(outdata, self.data)[1]
|
|
elif final_subs:
|
|
# self.data is a float, create an array
|
|
return xr.DataArray(self.data, final_subs, list(final_subs))
|
|
else:
|
|
return self.data
|
|
|
|
def _call(self, data, x, final_subs=None):
|
|
if isinstance(x, xr.DataArray):
|
|
if not x.dims:
|
|
# shape 0 xarrays
|
|
return self._call(data, float(x))
|
|
|
|
outdata = xr.DataArray(np.nan, final_subs, list(final_subs))
|
|
|
|
if self.interp != "extrapolate" and\
|
|
np.all(x > data['lookup_dim'].values[-1]):
|
|
outdata_ext = data[-1]
|
|
warnings.warn(
|
|
self.py_name + "\n"
|
|
+ "extrapolating data above the maximum value of the series")
|
|
elif self.interp != "extrapolate" and\
|
|
np.all(x < data['lookup_dim'].values[0]):
|
|
outdata_ext = data[0]
|
|
warnings.warn(
|
|
self.py_name + "\n"
|
|
+ "extrapolating data below the minimum value of the series")
|
|
else:
|
|
data = xr.broadcast(data, x)[0]
|
|
for a in utils.xrsplit(x):
|
|
outdata.loc[a.coords] = self._call(
|
|
data.loc[a.coords],
|
|
float(a))
|
|
return outdata
|
|
|
|
# return the final array in the specified dimensions order
|
|
return xr.broadcast(
|
|
outdata, outdata_ext.reset_coords('lookup_dim', drop=True))[1]
|
|
|
|
else:
|
|
if x in data['lookup_dim'].values:
|
|
outdata = data.sel(lookup_dim=x)
|
|
elif x > data['lookup_dim'].values[-1]:
|
|
if self.interp == "extrapolate":
|
|
# extrapolate method for xmile models
|
|
k = (data[-1]-data[-2])\
|
|
/ (data['lookup_dim'].values[-1]
|
|
- data['lookup_dim'].values[-2])
|
|
outdata = data[-1] + k*(x - data['lookup_dim'].values[-1])
|
|
else:
|
|
outdata = data[-1]
|
|
warnings.warn(
|
|
self.py_name + "\n"
|
|
+ "extrapolating data above the maximum value of the series")
|
|
elif x < data['lookup_dim'].values[0]:
|
|
if self.interp == "extrapolate":
|
|
# extrapolate method for xmile models
|
|
k = (data[1]-data[0])\
|
|
/ (data['lookup_dim'].values[1]
|
|
- data['lookup_dim'].values[0])
|
|
outdata = data[0] + k*(x - data['lookup_dim'].values[0])
|
|
else:
|
|
outdata = data[0]
|
|
warnings.warn(
|
|
self.py_name + "\n"
|
|
+ "extrapolating data below the minimum value of the series")
|
|
elif self.interp == 'hold_backward':
|
|
outdata = data.sel(lookup_dim=x, method="pad")
|
|
else:
|
|
outdata = data.interp(lookup_dim=x)
|
|
|
|
# the output could be a float or an xarray
|
|
if self.is_float:
|
|
# if lookup has no-coords return a float
|
|
return float(outdata)
|
|
else:
|
|
# Remove lookup dimension coord from the DataArray
|
|
return outdata.reset_coords('lookup_dim', drop=True)
|
|
|
|
|
|
class HardcodedLookups(Lookups):
|
|
"""Class for lookups defined in the file"""
|
|
|
|
def __init__(self, x, y, coords, interp, final_coords, py_name):
|
|
# TODO: avoid add and merge all declarations in one definition
|
|
self.is_float = not bool(coords)
|
|
self.py_name = py_name
|
|
self.final_coords = final_coords
|
|
self.values = [(x, y, coords)]
|
|
self.interp = interp
|
|
|
|
def add(self, x, y, coords):
|
|
self.values.append((x, y, coords))
|
|
|
|
def initialize(self):
|
|
"""
|
|
Initialize all elements and create the self.data xarray.DataArray
|
|
"""
|
|
if len(self.values) == 1:
|
|
# Just loag one value (no add)
|
|
for x, y, coords in self.values:
|
|
if len(x) != len(set(x)):
|
|
raise ValueError(
|
|
self.py_name + "\n"
|
|
"x dimension has repeated values..."
|
|
)
|
|
try:
|
|
y = np.array(y).reshape((len(x),) + (1,)*len(coords))
|
|
except ValueError:
|
|
raise ValueError(
|
|
self.py_name + "\n"
|
|
"x and y dimensions have different length..."
|
|
)
|
|
self.data = xr.DataArray(
|
|
np.tile(y, [1] + utils.compute_shape(coords)),
|
|
{"lookup_dim": x, **coords},
|
|
["lookup_dim"] + list(coords)
|
|
)
|
|
else:
|
|
# Load in several lines (add)
|
|
self.data = xr.DataArray(
|
|
np.nan, self.final_coords, list(self.final_coords))
|
|
|
|
for x, y, coords in self.values:
|
|
if "lookup_dim" not in self.data.dims:
|
|
# include lookup_dim dimension in the final array
|
|
self.data = self.data.expand_dims(
|
|
{"lookup_dim": x}, axis=0).copy()
|
|
else:
|
|
# add new coordinates (if needed) to lookup_dim
|
|
x_old = list(self.data.lookup_dim.values)
|
|
x_new = list(set(x).difference(x_old))
|
|
self.data = self.data.reindex(lookup_dim=x_old+x_new)
|
|
|
|
# reshape y value and assign it to self.data
|
|
y = np.array(y).reshape((len(x),) + (1,)*len(coords))
|
|
self.data.loc[{"lookup_dim": x, **coords}] =\
|
|
np.tile(y, [1] + utils.compute_shape(coords))
|
|
|
|
# sort data
|
|
self.data = self.data.sortby("lookup_dim")
|
|
|
|
if np.any(np.isnan(self.data)):
|
|
# fill missing values of different input lookup_dim values
|
|
values = self.data.values
|
|
self._fill_missing(self.data.lookup_dim.values, values)
|
|
self.data = xr.DataArray(values, self.data.coords, self.data.dims)
|
|
|
|
def _fill_missing(self, series, data):
|
|
"""
|
|
Fills missing values in lookups to have a common series.
|
|
Mutates the values in data.
|
|
|
|
Returns
|
|
-------
|
|
None
|
|
|
|
"""
|
|
if len(data.shape) > 1:
|
|
# break the data array until arrive to a vector
|
|
for i in range(data.shape[1]):
|
|
if np.any(np.isnan(data[:, i])):
|
|
self._fill_missing(series, data[:, i])
|
|
elif not np.all(np.isnan(data)):
|
|
# interpolate missing values
|
|
data[np.isnan(data)] = np.interp(
|
|
series[np.isnan(data)],
|
|
series[~np.isnan(data)],
|
|
data[~np.isnan(data)])
|