# Source: CCR/.venv/lib/python3.12/site-packages/pysd/py_backend/lookups.py
# File-viewer metadata: 214 lines, 8.3 KiB, Python

import warnings
import pandas as pd
import numpy as np
import xarray as xr
from . import utils
class Lookups(object):
    """
    Shared behaviour of lookup objects: overriding the stored values and
    evaluating the lookup at a given input.

    The methods below read ``self.data``, ``self.final_coords``,
    ``self.interp``, ``self.is_float`` and ``self.py_name``. None of them
    is assigned in this class, so they must be provided elsewhere
    (e.g. by a subclass constructor).
    """

    def set_values(self, values):
        """
        Replace the lookup data with values given by the user.

        ``self.data`` is first reset to a NaN-filled array built from
        ``self.final_coords`` and then filled according to the type of
        `values`.

        Parameters
        ----------
        values: pandas.Series, xarray.DataArray or other
            - pandas.Series: its index (sorted) becomes the 'lookup_dim'
              coordinate and every item is written at its index label.
              When the first item is a DataArray, each item is written
              at its own coordinates.
            - xarray.DataArray: written at its own coordinates.
            - anything else: written to every position when
              ``self.final_coords`` is not empty, otherwise stored as is
              in ``self.data``.

        Returns
        -------
        None

        """
        target_coords = self.final_coords
        self.data = xr.DataArray(np.nan, target_coords, list(target_coords))

        if isinstance(values, pd.Series):
            lookup_points = sorted(values.index)
            # put the lookup dimension first and copy to get a writable
            # array
            expanded = self.data.expand_dims(
                {'lookup_dim': lookup_points}, axis=0)
            self.data = expanded.copy()

            # the type of the first item decides how all items are written
            holds_arrays = len(values) > 0\
                and isinstance(values.values[0], xr.DataArray)
            for point, entry in values.items():
                if holds_arrays:
                    self.data.loc[point].loc[entry.coords] = entry
                else:
                    self.data.loc[point] = entry
            return

        if isinstance(values, xr.DataArray):
            self.data.loc[values.coords] = values.values
        elif self.final_coords:
            self.data.loc[:] = values
        else:
            self.data = values

    def __call__(self, x, final_subs=None):
        """
        Evaluate the lookup at `x`.

        Parameters
        ----------
        x: float or xarray.DataArray
            Input value(s).
        final_subs: dict or None (optional)
            Coordinates of the expected output. Default is None.

        Returns
        -------
        The result of ``self._call``. If that call raises TypeError or
        KeyError, ``self.data`` is returned instead, expanded to
        `final_subs` when they are given.

        """
        try:
            return self._call(self.data, x, final_subs)
        except (TypeError, KeyError):
            # these errors show up when the user has changed the lookup
            # to a constant value
            if not final_subs:
                return self.data
            if isinstance(self.data, xr.DataArray):
                # constant array: broadcast it against the output shape
                template = xr.DataArray(
                    np.nan, final_subs, list(final_subs))
                return xr.broadcast(template, self.data)[1]
            # constant scalar: build an array filled with it
            return xr.DataArray(self.data, final_subs, list(final_subs))

    def _call(self, data, x, final_subs=None):
        """
        Evaluate `data` at `x`.

        Parameters
        ----------
        data: xarray.DataArray
            Lookup values whose first axis is 'lookup_dim'. The first and
            last 'lookup_dim' values are treated as the minimum and the
            maximum of the series.
        x: float or xarray.DataArray
            Input value(s). A DataArray without dimensions is converted
            to float.
        final_subs: dict or None (optional)
            Coordinates of the output; only used when `x` is a DataArray
            with dimensions. Default is None.

        Returns
        -------
        float or xarray.DataArray
            A float when ``self.is_float`` is true and `x` is a single
            value, otherwise a DataArray without the 'lookup_dim'
            coordinate.

        """
        if isinstance(x, xr.DataArray):
            if not x.dims:
                # 0-dimensional arrays are handled as plain numbers
                return self._call(data, float(x))

            outdata = xr.DataArray(np.nan, final_subs, list(final_subs))
            hold_edges = self.interp != "extrapolate"

            if hold_edges and np.all(x > data['lookup_dim'].values[-1]):
                # every input is above the series: use the last value
                edge = data[-1]
                warnings.warn(
                    self.py_name + "\n"
                    + "extrapolating data above the maximum value of the series")
            elif hold_edges and np.all(x < data['lookup_dim'].values[0]):
                # every input is below the series: use the first value
                edge = data[0]
                warnings.warn(
                    self.py_name + "\n"
                    + "extrapolating data below the minimum value of the series")
            else:
                # evaluate element by element
                data = xr.broadcast(data, x)[0]
                for element in utils.xrsplit(x):
                    position = element.coords
                    outdata.loc[position] = self._call(
                        data.loc[position], float(element))
                return outdata

            # return the final array in the specified dimensions order
            edge = edge.reset_coords('lookup_dim', drop=True)
            return xr.broadcast(outdata, edge)[1]

        series = data['lookup_dim'].values

        if x in series:
            # exact match with one of the series values
            outdata = data.sel(lookup_dim=x)
        elif x > series[-1]:
            if self.interp == "extrapolate":
                # extrapolate method for xmile models: continue the line
                # through the last two points
                slope = (data[-1] - data[-2]) / (series[-1] - series[-2])
                outdata = data[-1] + slope*(x - series[-1])
            else:
                outdata = data[-1]
                warnings.warn(
                    self.py_name + "\n"
                    + "extrapolating data above the maximum value of the series")
        elif x < series[0]:
            if self.interp == "extrapolate":
                # extrapolate method for xmile models: continue the line
                # through the first two points
                slope = (data[1] - data[0]) / (series[1] - series[0])
                outdata = data[0] + slope*(x - series[0])
            else:
                outdata = data[0]
                warnings.warn(
                    self.py_name + "\n"
                    + "extrapolating data below the minimum value of the series")
        elif self.interp == 'hold_backward':
            # take the value of the closest series point at or before x
            outdata = data.sel(lookup_dim=x, method="pad")
        else:
            outdata = data.interp(lookup_dim=x)

        # the output could be a float or an xarray
        if self.is_float:
            # a lookup without coords returns a float
            return float(outdata)
        # drop the lookup dimension coord from the DataArray
        return outdata.reset_coords('lookup_dim', drop=True)
class HardcodedLookups(Lookups):
    """Class for lookups defined in the file"""

    def __init__(self, x, y, coords, interp, final_coords, py_name):
        """
        Store the first declaration of the lookup.

        Parameters
        ----------
        x: sequence
            Values of the lookup dimension.
        y: sequence
            Lookup values, one per element of `x`.
        coords: dict
            Coordinates covered by this declaration. When empty the
            lookup is flagged as a float lookup (``self.is_float``).
        interp: str
            Interpolation method; the evaluation code checks for
            "extrapolate" and 'hold_backward'.
        final_coords: dict
            Coordinates of the complete lookup.
        py_name: str
            Name used as prefix of error and warning messages.

        """
        # TODO: avoid add and merge all declarations in one definition
        self.is_float = not bool(coords)
        self.py_name = py_name
        self.final_coords = final_coords
        self.values = [(x, y, coords)]
        self.interp = interp

    def add(self, x, y, coords):
        """Queue another (x, y, coords) declaration for ``initialize``."""
        self.values.append((x, y, coords))

    def _tiled_values(self, x, y, coords):
        """
        Validate one declaration and expand its values over `coords`.

        Returns
        -------
        numpy.ndarray
            `y` reshaped to ``(len(x), 1, ..., 1)`` and tiled with
            ``utils.compute_shape(coords)`` (presumably the length of
            each coordinate -- confirm against utils).

        Raises
        ------
        ValueError
            If `x` has repeated values or `y` cannot be reshaped to one
            value per element of `x`.

        """
        if len(x) != len(set(x)):
            raise ValueError(
                self.py_name + "\n"
                "x dimension has repeated values..."
            )
        try:
            column = np.array(y).reshape((len(x),) + (1,)*len(coords))
        except ValueError as err:
            # keep the numpy error as the explicit cause
            raise ValueError(
                self.py_name + "\n"
                "x and y dimensions have different length..."
            ) from err
        return np.tile(column, [1] + utils.compute_shape(coords))

    def initialize(self):
        """
        Initialize all elements and create the self.data xarray.DataArray

        Raises
        ------
        ValueError
            If any declaration has repeated x values or x and y of
            different length.

        """
        if len(self.values) == 1:
            # Just load one value (no add)
            x, y, coords = self.values[0]
            self.data = xr.DataArray(
                self._tiled_values(x, y, coords),
                {"lookup_dim": x, **coords},
                ["lookup_dim"] + list(coords)
            )
        else:
            # Load in several lines (add)
            self.data = xr.DataArray(
                np.nan, self.final_coords, list(self.final_coords))

            for x, y, coords in self.values:
                # same validation as the single-declaration case, done
                # before self.data is touched
                tiled = self._tiled_values(x, y, coords)

                if "lookup_dim" not in self.data.dims:
                    # include lookup_dim dimension in the final array
                    self.data = self.data.expand_dims(
                        {"lookup_dim": x}, axis=0).copy()
                else:
                    # add new coordinates (if needed) to lookup_dim
                    x_old = list(self.data.lookup_dim.values)
                    x_new = list(set(x).difference(x_old))
                    self.data = self.data.reindex(lookup_dim=x_old+x_new)

                # assign the expanded values to self.data
                self.data.loc[{"lookup_dim": x, **coords}] = tiled

        # sort data
        self.data = self.data.sortby("lookup_dim")

        if np.any(np.isnan(self.data)):
            # fill missing values of different input lookup_dim values
            values = self.data.values
            self._fill_missing(self.data.lookup_dim.values, values)
            self.data = xr.DataArray(values, self.data.coords, self.data.dims)

    def _fill_missing(self, series, data):
        """
        Fills missing values in lookups to have a common series.
        Mutates the values in data.

        Parameters
        ----------
        series: numpy.ndarray
            Values of the lookup dimension, one per element of the
            first axis of `data`.
        data: numpy.ndarray
            Lookup values with NaN in the missing positions. A vector
            made only of NaN is left untouched.

        Returns
        -------
        None

        """
        if len(data.shape) > 1:
            # break the data array until arrive to a vector; data[:, i]
            # is a view, so the recursive call writes into data itself
            for i in range(data.shape[1]):
                if np.any(np.isnan(data[:, i])):
                    self._fill_missing(series, data[:, i])
            return

        missing = np.isnan(data)
        if missing.any() and not missing.all():
            # interpolate missing values from the known ones; np.interp
            # repeats the first/last known value outside their range
            known = ~missing
            data[missing] = np.interp(
                series[missing], series[known], data[known])