CCR/.venv/lib/python3.12/site-packages/xarray/tests/test_interp.py

1169 lines
36 KiB
Python

from __future__ import annotations
from itertools import combinations, permutations, product
from typing import cast, get_args
import numpy as np
import pandas as pd
import pytest
import xarray as xr
from xarray.coding.cftimeindex import _parse_array_of_cftime_strings
from xarray.core.types import (
Interp1dOptions,
InterpnOptions,
InterpolantOptions,
InterpOptions,
)
from xarray.tests import (
assert_allclose,
assert_equal,
assert_identical,
has_dask,
has_scipy,
has_scipy_ge_1_13,
raise_if_dask_computes,
requires_cftime,
requires_dask,
requires_scipy,
)
from xarray.tests.test_dataset import create_test_data
try:
import scipy
except ImportError:
pass
ALL_1D = get_args(Interp1dOptions) + get_args(InterpolantOptions)
def get_example_data(case: int) -> xr.DataArray:
if case == 0:
# 2D
x = np.linspace(0, 1, 100)
y = np.linspace(0, 0.1, 30)
return xr.DataArray(
np.sin(x[:, np.newaxis]) * np.cos(y),
dims=["x", "y"],
coords={"x": x, "y": y, "x2": ("x", x**2)},
)
elif case == 1:
# 2D chunked single dim
return get_example_data(0).chunk({"y": 3})
elif case == 2:
# 2D chunked both dims
return get_example_data(0).chunk({"x": 25, "y": 3})
elif case == 3:
# 3D
x = np.linspace(0, 1, 100)
y = np.linspace(0, 0.1, 30)
z = np.linspace(0.1, 0.2, 10)
return xr.DataArray(
np.sin(x[:, np.newaxis, np.newaxis]) * np.cos(y[:, np.newaxis]) * z,
dims=["x", "y", "z"],
coords={"x": x, "y": y, "x2": ("x", x**2), "z": z},
)
elif case == 4:
# 3D chunked single dim
# chunksize=5 lets us check whether we rechunk to 1 with quintic
return get_example_data(3).chunk({"z": 5})
else:
raise ValueError("case must be 1-4")
@pytest.fixture
def nd_interp_coords():
# interpolation indices for nd interpolation of da from case 3 of get_example_data
da = get_example_data(case=3)
coords = {}
# grid -> grid
coords["xdestnp"] = np.linspace(0.1, 1.0, 11)
coords["ydestnp"] = np.linspace(0.0, 0.2, 10)
coords["zdestnp"] = da.z.data
# list of the points defined by the above mesh in C order
mesh_x, mesh_y, mesh_z = np.meshgrid(
coords["xdestnp"], coords["ydestnp"], coords["zdestnp"], indexing="ij"
)
coords["grid_grid_points"] = np.column_stack(
[mesh_x.ravel(), mesh_y.ravel(), mesh_z.ravel()]
)
# grid -> oned
coords["xdest"] = xr.DataArray(np.linspace(0.1, 1.0, 11), dims="y") # type: ignore[assignment]
coords["ydest"] = xr.DataArray(np.linspace(0.0, 0.2, 11), dims="y") # type: ignore[assignment]
coords["zdest"] = da.z
# grid of the points defined by the oned gridded with zdest in C order
coords["grid_oned_points"] = np.array(
[
(a, b, c)
for (a, b), c in product(
zip(coords["xdest"].data, coords["ydest"].data, strict=False),
coords["zdest"].data,
)
]
)
return coords
def test_keywargs():
if not has_scipy:
pytest.skip("scipy is not installed.")
da = get_example_data(0)
assert_equal(da.interp(x=[0.5, 0.8]), da.interp({"x": [0.5, 0.8]}))
@pytest.mark.parametrize("method", ["linear", "cubic"])
@pytest.mark.parametrize("dim", ["x", "y"])
@pytest.mark.parametrize(
"case", [pytest.param(0, id="no_chunk"), pytest.param(1, id="chunk_y")]
)
def test_interpolate_1d(method: InterpOptions, dim: str, case: int) -> None:
if not has_scipy:
pytest.skip("scipy is not installed.")
if not has_dask and case in [1]:
pytest.skip("dask is not installed in the environment.")
da = get_example_data(case)
xdest = np.linspace(0.0, 0.9, 80)
actual = da.interp(method=method, coords={dim: xdest})
# scipy interpolation for the reference
def func(obj, new_x):
return scipy.interpolate.interp1d(
da[dim],
obj.data,
axis=obj.get_axis_num(dim),
bounds_error=False,
fill_value=np.nan,
kind=method,
)(new_x)
if dim == "x":
coords = {"x": xdest, "y": da["y"], "x2": ("x", func(da["x2"], xdest))}
else: # y
coords = {"x": da["x"], "y": xdest, "x2": da["x2"]}
expected = xr.DataArray(func(da, xdest), dims=["x", "y"], coords=coords)
assert_allclose(actual, expected)
@pytest.mark.parametrize("method", ["cubic", "zero"])
def test_interpolate_1d_methods(method: InterpOptions) -> None:
if not has_scipy:
pytest.skip("scipy is not installed.")
da = get_example_data(0)
dim = "x"
xdest = np.linspace(0.0, 0.9, 80)
actual = da.interp(method=method, coords={dim: xdest})
# scipy interpolation for the reference
def func(obj, new_x):
return scipy.interpolate.interp1d(
da[dim],
obj.data,
axis=obj.get_axis_num(dim),
bounds_error=False,
fill_value=np.nan,
kind=method,
)(new_x)
coords = {"x": xdest, "y": da["y"], "x2": ("x", func(da["x2"], xdest))}
expected = xr.DataArray(func(da, xdest), dims=["x", "y"], coords=coords)
assert_allclose(actual, expected)
@requires_scipy
@pytest.mark.parametrize(
"use_dask, method",
(
(False, "linear"),
(False, "akima"),
pytest.param(
False,
"makima",
marks=pytest.mark.skipif(not has_scipy_ge_1_13, reason="scipy too old"),
),
pytest.param(
True,
"linear",
marks=pytest.mark.skipif(not has_dask, reason="dask not available"),
),
pytest.param(
True,
"akima",
marks=pytest.mark.skipif(not has_dask, reason="dask not available"),
),
),
)
def test_interpolate_vectorize(use_dask: bool, method: InterpOptions) -> None:
# scipy interpolation for the reference
def func(obj, dim, new_x, method):
scipy_kwargs = {}
interpolant_options = {
"barycentric": scipy.interpolate.BarycentricInterpolator,
"krogh": scipy.interpolate.KroghInterpolator,
"pchip": scipy.interpolate.PchipInterpolator,
"akima": scipy.interpolate.Akima1DInterpolator,
"makima": scipy.interpolate.Akima1DInterpolator,
}
shape = [s for i, s in enumerate(obj.shape) if i != obj.get_axis_num(dim)]
for s in new_x.shape[::-1]:
shape.insert(obj.get_axis_num(dim), s)
if method in interpolant_options:
interpolant = interpolant_options[method]
if method == "makima":
scipy_kwargs["method"] = method
return interpolant(
da[dim], obj.data, axis=obj.get_axis_num(dim), **scipy_kwargs
)(new_x).reshape(shape)
else:
return scipy.interpolate.interp1d(
da[dim],
obj.data,
axis=obj.get_axis_num(dim),
kind=method,
bounds_error=False,
fill_value=np.nan,
**scipy_kwargs,
)(new_x).reshape(shape)
da = get_example_data(0)
if use_dask:
da = da.chunk({"y": 5})
# xdest is 1d but has different dimension
xdest = xr.DataArray(
np.linspace(0.1, 0.9, 30),
dims="z",
coords={"z": np.random.randn(30), "z2": ("z", np.random.randn(30))},
)
actual = da.interp(x=xdest, method=method)
expected = xr.DataArray(
func(da, "x", xdest, method),
dims=["z", "y"],
coords={
"z": xdest["z"],
"z2": xdest["z2"],
"y": da["y"],
"x": ("z", xdest.values),
"x2": ("z", func(da["x2"], "x", xdest, method)),
},
)
assert_allclose(actual, expected.transpose("z", "y", transpose_coords=True))
# xdest is 2d
xdest = xr.DataArray(
np.linspace(0.1, 0.9, 30).reshape(6, 5),
dims=["z", "w"],
coords={
"z": np.random.randn(6),
"w": np.random.randn(5),
"z2": ("z", np.random.randn(6)),
},
)
actual = da.interp(x=xdest, method=method)
expected = xr.DataArray(
func(da, "x", xdest, method),
dims=["z", "w", "y"],
coords={
"z": xdest["z"],
"w": xdest["w"],
"z2": xdest["z2"],
"y": da["y"],
"x": (("z", "w"), xdest.data),
"x2": (("z", "w"), func(da["x2"], "x", xdest, method)),
},
)
assert_allclose(actual, expected.transpose("z", "w", "y", transpose_coords=True))
@requires_scipy
@pytest.mark.parametrize("method", get_args(InterpnOptions))
@pytest.mark.parametrize(
"case",
[
pytest.param(3, id="no_chunk"),
pytest.param(
4, id="chunked", marks=pytest.mark.skipif(not has_dask, reason="no dask")
),
],
)
def test_interpolate_nd(case: int, method: InterpnOptions, nd_interp_coords) -> None:
da = get_example_data(case)
# grid -> grid
xdestnp = nd_interp_coords["xdestnp"]
ydestnp = nd_interp_coords["ydestnp"]
zdestnp = nd_interp_coords["zdestnp"]
grid_grid_points = nd_interp_coords["grid_grid_points"]
# the presence/absence of z coordinate may affect nd interpolants, even when the
# coordinate is unchanged
# TODO: test this?
actual = da.interp(x=xdestnp, y=ydestnp, z=zdestnp, method=method)
expected_data = scipy.interpolate.interpn(
points=(da.x, da.y, da.z),
values=da.load().data,
xi=grid_grid_points,
method=method,
bounds_error=False,
).reshape((len(xdestnp), len(ydestnp), len(zdestnp)))
expected = xr.DataArray(
expected_data,
dims=["x", "y", "z"],
coords={
"x": xdestnp,
"y": ydestnp,
"z": zdestnp,
"x2": da["x2"].interp(x=xdestnp, method=method),
},
)
assert_allclose(actual.transpose("x", "y", "z"), expected.transpose("x", "y", "z"))
# grid -> 1d-sample
xdest = nd_interp_coords["xdest"]
ydest = nd_interp_coords["ydest"]
zdest = nd_interp_coords["zdest"]
grid_oned_points = nd_interp_coords["grid_oned_points"]
actual = da.interp(x=xdest, y=ydest, z=zdest, method=method)
expected_data = scipy.interpolate.interpn(
points=(da.x, da.y, da.z),
values=da.data,
xi=grid_oned_points,
method=method,
bounds_error=False,
).reshape([len(xdest), len(zdest)])
expected = xr.DataArray(
expected_data,
dims=["y", "z"],
coords={
"y": ydest,
"z": zdest,
"x": ("y", xdest.values),
"x2": da["x2"].interp(x=xdest, method=method),
},
)
assert_allclose(actual.transpose("y", "z"), expected)
# reversed order
actual = da.interp(y=ydest, x=xdest, z=zdest, method=method)
assert_allclose(actual.transpose("y", "z"), expected)
@requires_scipy
# omit cubic, pchip, quintic because not enough points
@pytest.mark.parametrize("method", ("linear", "nearest", "slinear"))
def test_interpolate_nd_nd(method: InterpnOptions) -> None:
"""Interpolate nd array with an nd indexer sharing coordinates."""
# Create original array
a = [0, 2]
x = [0, 1, 2]
values = np.arange(6).reshape(2, 3)
da = xr.DataArray(values, dims=("a", "x"), coords={"a": a, "x": x})
# Create indexer into `a` with dimensions (y, x)
y = [10]
a_targets = [1, 2, 2]
c = {"x": x, "y": y}
ia = xr.DataArray([a_targets], dims=("y", "x"), coords=c)
out = da.interp(a=ia, method=method)
expected_xi = list(zip(a_targets, x, strict=False))
expected_vec = scipy.interpolate.interpn(
points=(a, x), values=values, xi=expected_xi, method=method
)
expected = xr.DataArray([expected_vec], dims=("y", "x"), coords=c)
xr.testing.assert_allclose(out.drop_vars("a"), expected)
# If the *shared* indexing coordinates do not match, interp should fail.
with pytest.raises(ValueError):
c = {"x": [1], "y": y}
ia = xr.DataArray([[1]], dims=("y", "x"), coords=c)
da.interp(a=ia)
with pytest.raises(ValueError):
c = {"x": [5, 6, 7], "y": y}
ia = xr.DataArray([[1]], dims=("y", "x"), coords=c)
da.interp(a=ia)
@requires_scipy
@pytest.mark.filterwarnings("ignore:All-NaN slice")
def test_interpolate_nd_with_nan() -> None:
"""Interpolate an array with an nd indexer and `NaN` values."""
# Create indexer into `a` with dimensions (y, x)
x = [0, 1, 2]
y = [10, 20]
c = {"x": x, "y": y}
a = np.arange(6, dtype=float).reshape(2, 3)
a[0, 1] = np.nan
ia = xr.DataArray(a, dims=("y", "x"), coords=c)
da = xr.DataArray([1, 2, 2], dims=("a"), coords={"a": [0, 2, 4]})
out = da.interp(a=ia)
expected = xr.DataArray(
[[1.0, np.nan, 2.0], [2.0, 2.0, np.nan]], dims=("y", "x"), coords=c
)
xr.testing.assert_allclose(out.drop_vars("a"), expected)
db = 2 * da
ds = xr.Dataset({"da": da, "db": db})
out2 = ds.interp(a=ia)
expected_ds = xr.Dataset({"da": expected, "db": 2 * expected})
xr.testing.assert_allclose(out2.drop_vars("a"), expected_ds)
@requires_scipy
@pytest.mark.parametrize("method", ("linear",))
@pytest.mark.parametrize(
"case", [pytest.param(0, id="no_chunk"), pytest.param(1, id="chunk_y")]
)
def test_interpolate_scalar(method: InterpOptions, case: int) -> None:
if not has_dask and case in [1]:
pytest.skip("dask is not installed in the environment.")
da = get_example_data(case)
xdest = 0.4
actual = da.interp(x=xdest, method=method)
# scipy interpolation for the reference
def func(obj, new_x):
return scipy.interpolate.interp1d(
da["x"],
obj.data,
axis=obj.get_axis_num("x"),
bounds_error=False,
fill_value=np.nan,
kind=method,
)(new_x)
coords = {"x": xdest, "y": da["y"], "x2": func(da["x2"], xdest)}
expected = xr.DataArray(func(da, xdest), dims=["y"], coords=coords)
assert_allclose(actual, expected)
@requires_scipy
@pytest.mark.parametrize("method", ("linear",))
@pytest.mark.parametrize(
"case", [pytest.param(3, id="no_chunk"), pytest.param(4, id="chunked")]
)
def test_interpolate_nd_scalar(method: InterpOptions, case: int) -> None:
if not has_dask and case in [4]:
pytest.skip("dask is not installed in the environment.")
da = get_example_data(case)
xdest = 0.4
ydest = 0.05
zdest = da.get_index("z")
actual = da.interp(x=xdest, y=ydest, z=zdest, method=method)
# scipy interpolation for the reference
expected_data = scipy.interpolate.RegularGridInterpolator(
(da["x"], da["y"], da["z"]),
da.transpose("x", "y", "z").values,
method=method,
bounds_error=False,
fill_value=np.nan,
)(np.asarray([(xdest, ydest, z_val) for z_val in zdest]))
coords = {
"x": xdest,
"y": ydest,
"x2": da["x2"].interp(x=xdest, method=method),
"z": da["z"],
}
expected = xr.DataArray(expected_data, dims=["z"], coords=coords)
assert_allclose(actual, expected)
@pytest.mark.parametrize("use_dask", [True, False])
def test_nans(use_dask: bool) -> None:
if not has_scipy:
pytest.skip("scipy is not installed.")
da = xr.DataArray([0, 1, np.nan, 2], dims="x", coords={"x": range(4)})
if not has_dask and use_dask:
pytest.skip("dask is not installed in the environment.")
da = da.chunk()
actual = da.interp(x=[0.5, 1.5])
# not all values are nan
assert actual.count() > 0
@requires_scipy
@pytest.mark.parametrize("use_dask", [True, False])
def test_errors(use_dask: bool) -> None:
# spline is unavailable
da = xr.DataArray([0, 1, np.nan, 2], dims="x", coords={"x": range(4)})
if not has_dask and use_dask:
pytest.skip("dask is not installed in the environment.")
da = da.chunk()
for method in ["spline"]:
with pytest.raises(ValueError), pytest.warns(PendingDeprecationWarning):
da.interp(x=[0.5, 1.5], method=method) # type: ignore[arg-type]
# not sorted
if use_dask:
da = get_example_data(3)
else:
da = get_example_data(0)
result = da.interp(x=[-1, 1, 3], kwargs={"fill_value": 0.0})
assert not np.isnan(result.values).any()
result = da.interp(x=[-1, 1, 3])
assert np.isnan(result.values).any()
# invalid method
with pytest.raises(ValueError):
da.interp(x=[2, 0], method="boo") # type: ignore[arg-type]
with pytest.raises(ValueError):
da.interp(y=[2, 0], method="boo") # type: ignore[arg-type]
# object-type DataArray cannot be interpolated
da = xr.DataArray(["a", "b", "c"], dims="x", coords={"x": [0, 1, 2]})
with pytest.raises(TypeError):
da.interp(x=0)
@requires_scipy
def test_dtype() -> None:
data_vars = dict(
a=("time", np.array([1, 1.25, 2])),
b=("time", np.array([True, True, False], dtype=bool)),
c=("time", np.array(["start", "start", "end"], dtype=str)),
)
time = np.array([0, 0.25, 1], dtype=float)
expected = xr.Dataset(data_vars, coords=dict(time=time))
actual = xr.Dataset(
{k: (dim, arr[[0, -1]]) for k, (dim, arr) in data_vars.items()},
coords=dict(time=time[[0, -1]]),
)
actual = actual.interp(time=time, method="linear")
assert_identical(expected, actual)
@requires_scipy
def test_sorted() -> None:
# unsorted non-uniform gridded data
x = np.random.randn(100)
y = np.random.randn(30)
z = np.linspace(0.1, 0.2, 10) * 3.0
da = xr.DataArray(
np.cos(x[:, np.newaxis, np.newaxis]) * np.cos(y[:, np.newaxis]) * z,
dims=["x", "y", "z"],
coords={"x": x, "y": y, "x2": ("x", x**2), "z": z},
)
x_new = np.linspace(0, 1, 30)
y_new = np.linspace(0, 1, 20)
da_sorted = da.sortby("x")
assert_allclose(da.interp(x=x_new), da_sorted.interp(x=x_new, assume_sorted=True))
da_sorted = da.sortby(["x", "y"])
assert_allclose(
da.interp(x=x_new, y=y_new),
da_sorted.interp(x=x_new, y=y_new, assume_sorted=True),
)
with pytest.raises(ValueError):
da.interp(x=[0, 1, 2], assume_sorted=True)
@requires_scipy
def test_dimension_wo_coords() -> None:
da = xr.DataArray(
np.arange(12).reshape(3, 4), dims=["x", "y"], coords={"y": [0, 1, 2, 3]}
)
da_w_coord = da.copy()
da_w_coord["x"] = np.arange(3)
assert_equal(da.interp(x=[0.1, 0.2, 0.3]), da_w_coord.interp(x=[0.1, 0.2, 0.3]))
assert_equal(
da.interp(x=[0.1, 0.2, 0.3], y=[0.5]),
da_w_coord.interp(x=[0.1, 0.2, 0.3], y=[0.5]),
)
@requires_scipy
def test_dataset() -> None:
ds = create_test_data()
ds.attrs["foo"] = "var"
ds["var1"].attrs["buz"] = "var2"
new_dim2 = xr.DataArray([0.11, 0.21, 0.31], dims="z")
interpolated = ds.interp(dim2=new_dim2)
assert_allclose(interpolated["var1"], ds["var1"].interp(dim2=new_dim2))
assert interpolated["var3"].equals(ds["var3"])
# make sure modifying interpolated does not affect the original dataset
interpolated["var1"][:, 1] = 1.0
interpolated["var2"][:, 1] = 1.0
interpolated["var3"][:, 1] = 1.0
assert not interpolated["var1"].equals(ds["var1"])
assert not interpolated["var2"].equals(ds["var2"])
assert not interpolated["var3"].equals(ds["var3"])
# attrs should be kept
assert interpolated.attrs["foo"] == "var"
assert interpolated["var1"].attrs["buz"] == "var2"
@pytest.mark.parametrize("case", [pytest.param(0, id="2D"), pytest.param(3, id="3D")])
def test_interpolate_dimorder(case: int) -> None:
"""Make sure the resultant dimension order is consistent with .sel()"""
if not has_scipy:
pytest.skip("scipy is not installed.")
da = get_example_data(case)
new_x = xr.DataArray([0, 1, 2], dims="x")
assert da.interp(x=new_x).dims == da.sel(x=new_x, method="nearest").dims
new_y = xr.DataArray([0, 1, 2], dims="y")
actual = da.interp(x=new_x, y=new_y).dims
expected = da.sel(x=new_x, y=new_y, method="nearest").dims
assert actual == expected
# reversed order
actual = da.interp(y=new_y, x=new_x).dims
expected = da.sel(y=new_y, x=new_x, method="nearest").dims
assert actual == expected
new_x = xr.DataArray([0, 1, 2], dims="a")
assert da.interp(x=new_x).dims == da.sel(x=new_x, method="nearest").dims
assert da.interp(y=new_x).dims == da.sel(y=new_x, method="nearest").dims
new_y = xr.DataArray([0, 1, 2], dims="a")
actual = da.interp(x=new_x, y=new_y).dims
expected = da.sel(x=new_x, y=new_y, method="nearest").dims
assert actual == expected
new_x = xr.DataArray([[0], [1], [2]], dims=["a", "b"])
assert da.interp(x=new_x).dims == da.sel(x=new_x, method="nearest").dims
assert da.interp(y=new_x).dims == da.sel(y=new_x, method="nearest").dims
if case == 3:
new_x = xr.DataArray([[0], [1], [2]], dims=["a", "b"])
new_z = xr.DataArray([[0], [1], [2]], dims=["a", "b"])
actual = da.interp(x=new_x, z=new_z).dims
expected = da.sel(x=new_x, z=new_z, method="nearest").dims
assert actual == expected
actual = da.interp(z=new_z, x=new_x).dims
expected = da.sel(z=new_z, x=new_x, method="nearest").dims
assert actual == expected
actual = da.interp(x=0.5, z=new_z).dims
expected = da.sel(x=0.5, z=new_z, method="nearest").dims
assert actual == expected
@requires_scipy
def test_interp_like() -> None:
ds = create_test_data()
ds.attrs["foo"] = "var"
ds["var1"].attrs["buz"] = "var2"
other = xr.DataArray(np.random.randn(3), dims=["dim2"], coords={"dim2": [0, 1, 2]})
interpolated = ds.interp_like(other)
assert_allclose(interpolated["var1"], ds["var1"].interp(dim2=other["dim2"]))
assert_allclose(interpolated["var1"], ds["var1"].interp_like(other))
assert interpolated["var3"].equals(ds["var3"])
# attrs should be kept
assert interpolated.attrs["foo"] == "var"
assert interpolated["var1"].attrs["buz"] == "var2"
other = xr.DataArray(
np.random.randn(3), dims=["dim3"], coords={"dim3": ["a", "b", "c"]}
)
actual = ds.interp_like(other)
expected = ds.reindex_like(other)
assert_allclose(actual, expected)
@requires_scipy
@pytest.mark.parametrize(
"x_new, expected",
[
(pd.date_range("2000-01-02", periods=3), [1, 2, 3]),
(
np.array(
[np.datetime64("2000-01-01T12:00"), np.datetime64("2000-01-02T12:00")]
),
[0.5, 1.5],
),
(["2000-01-01T12:00", "2000-01-02T12:00"], [0.5, 1.5]),
(["2000-01-01T12:00", "2000-01-02T12:00", "NaT"], [0.5, 1.5, np.nan]),
(["2000-01-01T12:00"], 0.5),
pytest.param("2000-01-01T12:00", 0.5, marks=pytest.mark.xfail),
],
)
def test_datetime(x_new, expected) -> None:
da = xr.DataArray(
np.arange(24),
dims="time",
coords={"time": pd.date_range("2000-01-01", periods=24)},
)
actual = da.interp(time=x_new)
expected_da = xr.DataArray(
np.atleast_1d(expected),
dims=["time"],
coords={"time": (np.atleast_1d(x_new).astype("datetime64[ns]"))},
)
assert_allclose(actual, expected_da)
@requires_scipy
def test_datetime_single_string() -> None:
da = xr.DataArray(
np.arange(24),
dims="time",
coords={"time": pd.date_range("2000-01-01", periods=24)},
)
actual = da.interp(time="2000-01-01T12:00")
expected = xr.DataArray(0.5)
assert_allclose(actual.drop_vars("time"), expected)
@requires_cftime
@requires_scipy
def test_cftime() -> None:
times = xr.cftime_range("2000", periods=24, freq="D")
da = xr.DataArray(np.arange(24), coords=[times], dims="time")
times_new = xr.cftime_range("2000-01-01T12:00:00", periods=3, freq="D")
actual = da.interp(time=times_new)
expected = xr.DataArray([0.5, 1.5, 2.5], coords=[times_new], dims=["time"])
assert_allclose(actual, expected)
@requires_cftime
@requires_scipy
def test_cftime_type_error() -> None:
times = xr.cftime_range("2000", periods=24, freq="D")
da = xr.DataArray(np.arange(24), coords=[times], dims="time")
times_new = xr.cftime_range(
"2000-01-01T12:00:00", periods=3, freq="D", calendar="noleap"
)
with pytest.raises(TypeError):
da.interp(time=times_new)
@requires_cftime
@requires_scipy
def test_cftime_list_of_strings() -> None:
from cftime import DatetimeProlepticGregorian
times = xr.cftime_range(
"2000", periods=24, freq="D", calendar="proleptic_gregorian"
)
da = xr.DataArray(np.arange(24), coords=[times], dims="time")
times_new = ["2000-01-01T12:00", "2000-01-02T12:00", "2000-01-03T12:00"]
actual = da.interp(time=times_new)
times_new_array = _parse_array_of_cftime_strings(
np.array(times_new), DatetimeProlepticGregorian
)
expected = xr.DataArray([0.5, 1.5, 2.5], coords=[times_new_array], dims=["time"])
assert_allclose(actual, expected)
@requires_cftime
@requires_scipy
def test_cftime_single_string() -> None:
from cftime import DatetimeProlepticGregorian
times = xr.cftime_range(
"2000", periods=24, freq="D", calendar="proleptic_gregorian"
)
da = xr.DataArray(np.arange(24), coords=[times], dims="time")
times_new = "2000-01-01T12:00"
actual = da.interp(time=times_new)
times_new_array = _parse_array_of_cftime_strings(
np.array(times_new), DatetimeProlepticGregorian
)
expected = xr.DataArray(0.5, coords={"time": times_new_array})
assert_allclose(actual, expected)
@requires_scipy
def test_datetime_to_non_datetime_error() -> None:
da = xr.DataArray(
np.arange(24),
dims="time",
coords={"time": pd.date_range("2000-01-01", periods=24)},
)
with pytest.raises(TypeError):
da.interp(time=0.5)
@requires_cftime
@requires_scipy
def test_cftime_to_non_cftime_error() -> None:
times = xr.cftime_range("2000", periods=24, freq="D")
da = xr.DataArray(np.arange(24), coords=[times], dims="time")
with pytest.raises(TypeError):
da.interp(time=0.5)
@requires_scipy
def test_datetime_interp_noerror() -> None:
# GH:2667
a = xr.DataArray(
np.arange(21).reshape(3, 7),
dims=["x", "time"],
coords={
"x": [1, 2, 3],
"time": pd.date_range("01-01-2001", periods=7, freq="D"),
},
)
xi = xr.DataArray(
np.linspace(1, 3, 50),
dims=["time"],
coords={"time": pd.date_range("01-01-2001", periods=50, freq="h")},
)
a.interp(x=xi, time=xi.time) # should not raise an error
@requires_cftime
@requires_scipy
def test_3641() -> None:
times = xr.cftime_range("0001", periods=3, freq="500YE")
da = xr.DataArray(range(3), dims=["time"], coords=[times])
da.interp(time=["0002-05-01"])
@requires_scipy
# cubic, quintic, pchip omitted because not enough points
@pytest.mark.parametrize("method", ("linear", "nearest", "slinear"))
def test_decompose(method: InterpOptions) -> None:
da = xr.DataArray(
np.arange(6).reshape(3, 2),
dims=["x", "y"],
coords={"x": [0, 1, 2], "y": [-0.1, -0.3]},
)
x_new = xr.DataArray([0.5, 1.5, 2.5], dims=["x1"])
y_new = xr.DataArray([-0.15, -0.25], dims=["y1"])
x_broadcast, y_broadcast = xr.broadcast(x_new, y_new)
assert x_broadcast.ndim == 2
actual = da.interp(x=x_new, y=y_new, method=method).drop_vars(("x", "y"))
expected = da.interp(x=x_broadcast, y=y_broadcast, method=method).drop_vars(
("x", "y")
)
assert_allclose(actual, expected)
@requires_scipy
@requires_dask
@pytest.mark.parametrize("method", ("linear", "nearest", "cubic", "pchip", "quintic"))
@pytest.mark.parametrize("chunked", [True, False])
@pytest.mark.parametrize(
"data_ndim,interp_ndim,nscalar",
[
(data_ndim, interp_ndim, nscalar)
for data_ndim in range(1, 4)
for interp_ndim in range(1, data_ndim + 1)
for nscalar in range(interp_ndim + 1)
],
)
@pytest.mark.filterwarnings("ignore:Increasing number of chunks")
def test_interpolate_chunk_1d(
method: InterpOptions, data_ndim, interp_ndim, nscalar, chunked: bool
) -> None:
"""Interpolate nd array with multiple independent indexers
It should do a series of 1d interpolation
"""
if method in ["cubic", "pchip", "quintic"] and interp_ndim == 3:
pytest.skip("Too slow.")
# 3d non chunked data
x = np.linspace(0, 1, 6)
y = np.linspace(2, 4, 7)
z = np.linspace(-0.5, 0.5, 8)
da = xr.DataArray(
data=np.sin(x[:, np.newaxis, np.newaxis])
* np.cos(y[:, np.newaxis])
* np.exp(z),
coords=[("x", x), ("y", y), ("z", z)],
)
# choose the data dimensions
for data_dims in permutations(da.dims, data_ndim):
# select only data_ndim dim
da = da.isel( # take the middle line
{dim: len(da.coords[dim]) // 2 for dim in da.dims if dim not in data_dims}
)
# chunk data
da = da.chunk(chunks={dim: i + 1 for i, dim in enumerate(da.dims)})
# choose the interpolation dimensions
for interp_dims in permutations(da.dims, interp_ndim):
# choose the scalar interpolation dimensions
for scalar_dims in combinations(interp_dims, nscalar):
dest = {}
for dim in interp_dims:
if dim in scalar_dims:
# take the middle point
dest[dim] = 0.5 * (da.coords[dim][0] + da.coords[dim][-1])
else:
# pick some points, including outside the domain
before = 2 * da.coords[dim][0] - da.coords[dim][1]
after = 2 * da.coords[dim][-1] - da.coords[dim][-2]
dest[dim] = cast(
xr.DataArray,
np.linspace(
before.item(), after.item(), len(da.coords[dim]) * 13
),
)
if chunked:
dest[dim] = xr.DataArray(data=dest[dim], dims=[dim])
dest[dim] = dest[dim].chunk(2)
actual = da.interp(method=method, **dest)
expected = da.compute().interp(method=method, **dest)
assert_identical(actual, expected)
# all the combinations are usually not necessary
break
break
break
@requires_scipy
@requires_dask
# quintic omitted because not enough points
@pytest.mark.parametrize("method", ("linear", "nearest", "slinear", "cubic", "pchip"))
@pytest.mark.filterwarnings("ignore:Increasing number of chunks")
def test_interpolate_chunk_advanced(method: InterpOptions) -> None:
"""Interpolate nd array with an nd indexer sharing coordinates."""
# Create original array
x = np.linspace(-1, 1, 5)
y = np.linspace(-1, 1, 7)
z = np.linspace(-1, 1, 11)
t = np.linspace(0, 1, 13)
q = np.linspace(0, 1, 17)
da = xr.DataArray(
data=np.sin(x[:, np.newaxis, np.newaxis, np.newaxis, np.newaxis])
* np.cos(y[:, np.newaxis, np.newaxis, np.newaxis])
* np.exp(z[:, np.newaxis, np.newaxis])
* t[:, np.newaxis]
+ q,
dims=("x", "y", "z", "t", "q"),
coords={"x": x, "y": y, "z": z, "t": t, "q": q, "label": "dummy_attr"},
)
# Create indexer into `da` with shared coordinate ("full-twist" Möbius strip)
theta = np.linspace(0, 2 * np.pi, 5)
w = np.linspace(-0.25, 0.25, 7)
r = xr.DataArray(
data=1 + w[:, np.newaxis] * np.cos(theta),
coords=[("w", w), ("theta", theta)],
)
xda = r * np.cos(theta)
yda = r * np.sin(theta)
zda = xr.DataArray(
data=w[:, np.newaxis] * np.sin(theta),
coords=[("w", w), ("theta", theta)],
)
kwargs = {"fill_value": None}
expected = da.interp(t=0.5, x=xda, y=yda, z=zda, kwargs=kwargs, method=method)
da = da.chunk(2)
xda = xda.chunk(1)
zda = zda.chunk(3)
actual = da.interp(t=0.5, x=xda, y=yda, z=zda, kwargs=kwargs, method=method)
assert_identical(actual, expected)
@requires_scipy
def test_interp1d_bounds_error() -> None:
"""Ensure exception on bounds error is raised if requested"""
da = xr.DataArray(
np.sin(0.3 * np.arange(4)),
[("time", np.arange(4))],
)
with pytest.raises(ValueError):
da.interp(time=3.5, kwargs=dict(bounds_error=True))
# default is to fill with nans, so this should pass
da.interp(time=3.5)
@requires_scipy
@pytest.mark.parametrize(
"x, expect_same_attrs",
[
(2.5, True),
(np.array([2.5, 5]), True),
(("x", np.array([0, 0.5, 1, 2]), dict(unit="s")), False),
],
)
def test_coord_attrs(
x,
expect_same_attrs: bool,
) -> None:
base_attrs = dict(foo="bar")
ds = xr.Dataset(
data_vars=dict(a=2 * np.arange(5)),
coords={"x": ("x", np.arange(5), base_attrs)},
)
has_same_attrs = ds.interp(x=x).x.attrs == base_attrs
assert expect_same_attrs == has_same_attrs
@requires_scipy
def test_interp1d_complex_out_of_bounds() -> None:
"""Ensure complex nans are used by default"""
da = xr.DataArray(
np.exp(0.3j * np.arange(4)),
[("time", np.arange(4))],
)
expected = da.interp(time=3.5, kwargs=dict(fill_value=np.nan + np.nan * 1j))
actual = da.interp(time=3.5)
assert_identical(actual, expected)
@requires_scipy
def test_interp_non_numeric_1d() -> None:
ds = xr.Dataset(
{
"numeric": ("time", 1 + np.arange(0, 4, 1)),
"non_numeric": ("time", np.array(["a", "b", "c", "d"])),
},
coords={"time": (np.arange(0, 4, 1))},
)
actual = ds.interp(time=np.linspace(0, 3, 7))
expected = xr.Dataset(
{
"numeric": ("time", 1 + np.linspace(0, 3, 7)),
"non_numeric": ("time", np.array(["a", "b", "b", "c", "c", "d", "d"])),
},
coords={"time": np.linspace(0, 3, 7)},
)
xr.testing.assert_identical(actual, expected)
@requires_scipy
def test_interp_non_numeric_nd() -> None:
# regression test for GH8099, GH9839
ds = xr.Dataset({"x": ("a", np.arange(4))}, coords={"a": (np.arange(4) - 1.5)})
t = xr.DataArray(
np.random.randn(6).reshape((2, 3)) * 0.5,
dims=["r", "s"],
coords={"r": np.arange(2) - 0.5, "s": np.arange(3) - 1},
)
ds["m"] = ds.x > 1
actual = ds.interp(a=t, method="linear")
# with numeric only
expected = ds[["x"]].interp(a=t, method="linear")
assert_identical(actual[["x"]], expected)
@requires_dask
@requires_scipy
def test_interp_vectorized_dask() -> None:
# Synthetic dataset chunked in the two interpolation dimensions
import dask.array as da
nt = 10
nlat = 20
nlon = 10
nq = 21
ds = xr.Dataset(
data_vars={
"foo": (
("lat", "lon", "dayofyear", "q"),
da.random.random((nlat, nlon, nt, nq), chunks=(10, 10, 10, -1)),
),
"bar": (("lat", "lon"), da.random.random((nlat, nlon), chunks=(10, 10))),
},
coords={
"lat": np.linspace(-89.5, 89.6, nlat),
"lon": np.linspace(-179.5, 179.6, nlon),
"dayofyear": np.arange(0, nt),
"q": np.linspace(0, 1, nq),
},
)
# Interpolate along non-chunked dimension
with raise_if_dask_computes():
actual = ds.interp(q=ds["bar"], kwargs={"fill_value": None})
expected = ds.compute().interp(q=ds["bar"], kwargs={"fill_value": None})
assert_identical(actual, expected)
@requires_scipy
@pytest.mark.parametrize(
"chunk",
[
pytest.param(
True, marks=pytest.mark.skipif(not has_dask, reason="requires_dask")
),
False,
],
)
def test_interp_vectorized_shared_dims(chunk: bool) -> None:
# GH4463
da = xr.DataArray(
[[[1, 2, 3], [2, 3, 4]], [[1, 2, 3], [2, 3, 4]]],
dims=("t", "x", "y"),
coords={"x": [1, 2], "y": [1, 2, 3], "t": [10, 12]},
)
dy = xr.DataArray([1.5, 2.5], dims=("u",), coords={"u": [45, 55]})
dx = xr.DataArray(
[[1.5, 1.5], [1.5, 1.5]], dims=("t", "u"), coords={"u": [45, 55], "t": [10, 12]}
)
if chunk:
da = da.chunk(t=1)
with raise_if_dask_computes():
actual = da.interp(y=dy, x=dx, method="linear")
expected = xr.DataArray(
[[2, 3], [2, 3]],
dims=("t", "u"),
coords={"u": [45, 55], "t": [10, 12], "x": dx, "y": dy},
)
assert_identical(actual, expected)