# (file-viewer metadata removed: "1169 lines / 36 KiB / Python" — not part of the module)
from __future__ import annotations

from itertools import combinations, permutations, product
from typing import cast, get_args

import numpy as np
import pandas as pd
import pytest

import xarray as xr
from xarray.coding.cftimeindex import _parse_array_of_cftime_strings
from xarray.core.types import (
    Interp1dOptions,
    InterpnOptions,
    InterpolantOptions,
    InterpOptions,
)
from xarray.tests import (
    assert_allclose,
    assert_equal,
    assert_identical,
    has_dask,
    has_scipy,
    has_scipy_ge_1_13,
    raise_if_dask_computes,
    requires_cftime,
    requires_dask,
    requires_scipy,
)
from xarray.tests.test_dataset import create_test_data

try:
    import scipy
except ImportError:
    pass

ALL_1D = get_args(Interp1dOptions) + get_args(InterpolantOptions)
def get_example_data(case: int) -> xr.DataArray:
    """Return an example DataArray for interpolation tests.

    Cases:
        0: 2D in-memory array with a non-dimension coordinate ``x2``
        1: case 0 chunked along ``y``
        2: case 0 chunked along both dims
        3: 3D in-memory array
        4: case 3 chunked along ``z``

    Raises:
        ValueError: if ``case`` is not one of 0-4.
    """
    if case == 0:
        # 2D
        x = np.linspace(0, 1, 100)
        y = np.linspace(0, 0.1, 30)
        return xr.DataArray(
            np.sin(x[:, np.newaxis]) * np.cos(y),
            dims=["x", "y"],
            coords={"x": x, "y": y, "x2": ("x", x**2)},
        )
    elif case == 1:
        # 2D chunked single dim
        return get_example_data(0).chunk({"y": 3})
    elif case == 2:
        # 2D chunked both dims
        return get_example_data(0).chunk({"x": 25, "y": 3})
    elif case == 3:
        # 3D
        x = np.linspace(0, 1, 100)
        y = np.linspace(0, 0.1, 30)
        z = np.linspace(0.1, 0.2, 10)
        return xr.DataArray(
            np.sin(x[:, np.newaxis, np.newaxis]) * np.cos(y[:, np.newaxis]) * z,
            dims=["x", "y", "z"],
            coords={"x": x, "y": y, "x2": ("x", x**2), "z": z},
        )
    elif case == 4:
        # 3D chunked single dim
        # chunksize=5 lets us check whether we rechunk to 1 with quintic
        return get_example_data(3).chunk({"z": 5})
    else:
        # bug fix: the message previously said "1-4" although case 0 is valid
        raise ValueError("case must be 0-4")
@pytest.fixture
def nd_interp_coords():
    """Destination coordinates for nd interpolation of the case-3 example array.

    Returns a dict containing both grid->grid and grid->1d destinations, plus
    the equivalent flat point lists used to build scipy reference results.
    """
    # interpolation indices for nd interpolation of da from case 3 of get_example_data

    da = get_example_data(case=3)

    coords = {}
    # grid -> grid
    coords["xdestnp"] = np.linspace(0.1, 1.0, 11)
    coords["ydestnp"] = np.linspace(0.0, 0.2, 10)
    coords["zdestnp"] = da.z.data
    # list of the points defined by the above mesh in C order
    mesh_x, mesh_y, mesh_z = np.meshgrid(
        coords["xdestnp"], coords["ydestnp"], coords["zdestnp"], indexing="ij"
    )
    coords["grid_grid_points"] = np.column_stack(
        [mesh_x.ravel(), mesh_y.ravel(), mesh_z.ravel()]
    )

    # grid -> oned
    coords["xdest"] = xr.DataArray(np.linspace(0.1, 1.0, 11), dims="y")  # type: ignore[assignment]
    coords["ydest"] = xr.DataArray(np.linspace(0.0, 0.2, 11), dims="y")  # type: ignore[assignment]
    coords["zdest"] = da.z
    # grid of the points defined by the oned gridded with zdest in C order
    coords["grid_oned_points"] = np.array(
        [
            (a, b, c)
            for (a, b), c in product(
                zip(coords["xdest"].data, coords["ydest"].data, strict=False),
                coords["zdest"].data,
            )
        ]
    )

    return coords
def test_keywargs():
    """interp accepts the destination both as keyword args and as a dict."""
    if not has_scipy:
        pytest.skip("scipy is not installed.")

    da = get_example_data(0)
    via_kwargs = da.interp(x=[0.5, 0.8])
    via_dict = da.interp({"x": [0.5, 0.8]})
    assert_equal(via_kwargs, via_dict)
@pytest.mark.parametrize("method", ["linear", "cubic"])
@pytest.mark.parametrize("dim", ["x", "y"])
@pytest.mark.parametrize(
    "case", [pytest.param(0, id="no_chunk"), pytest.param(1, id="chunk_y")]
)
def test_interpolate_1d(method: InterpOptions, dim: str, case: int) -> None:
    """1D interpolation along one dim agrees with scipy.interpolate.interp1d."""
    if not has_scipy:
        pytest.skip("scipy is not installed.")

    if not has_dask and case in [1]:
        pytest.skip("dask is not installed in the environment.")

    da = get_example_data(case)
    xdest = np.linspace(0.0, 0.9, 80)
    actual = da.interp(method=method, coords={dim: xdest})

    # scipy interpolation for the reference
    def func(obj, new_x):
        return scipy.interpolate.interp1d(
            da[dim],
            obj.data,
            axis=obj.get_axis_num(dim),
            bounds_error=False,
            fill_value=np.nan,
            kind=method,
        )(new_x)

    if dim == "x":
        coords = {"x": xdest, "y": da["y"], "x2": ("x", func(da["x2"], xdest))}
    else:  # y
        coords = {"x": da["x"], "y": xdest, "x2": da["x2"]}

    expected = xr.DataArray(func(da, xdest), dims=["x", "y"], coords=coords)
    assert_allclose(actual, expected)
@pytest.mark.parametrize("method", ["cubic", "zero"])
def test_interpolate_1d_methods(method: InterpOptions) -> None:
    """Additional interp1d kinds ("cubic", "zero") match the scipy reference."""
    if not has_scipy:
        pytest.skip("scipy is not installed.")

    da = get_example_data(0)
    dim = "x"
    xdest = np.linspace(0.0, 0.9, 80)

    actual = da.interp(method=method, coords={dim: xdest})

    # scipy interpolation for the reference
    def func(obj, new_x):
        return scipy.interpolate.interp1d(
            da[dim],
            obj.data,
            axis=obj.get_axis_num(dim),
            bounds_error=False,
            fill_value=np.nan,
            kind=method,
        )(new_x)

    coords = {"x": xdest, "y": da["y"], "x2": ("x", func(da["x2"], xdest))}
    expected = xr.DataArray(func(da, xdest), dims=["x", "y"], coords=coords)
    assert_allclose(actual, expected)
@requires_scipy
@pytest.mark.parametrize(
    "use_dask, method",
    (
        (False, "linear"),
        (False, "akima"),
        pytest.param(
            False,
            "makima",
            marks=pytest.mark.skipif(not has_scipy_ge_1_13, reason="scipy too old"),
        ),
        pytest.param(
            True,
            "linear",
            marks=pytest.mark.skipif(not has_dask, reason="dask not available"),
        ),
        pytest.param(
            True,
            "akima",
            marks=pytest.mark.skipif(not has_dask, reason="dask not available"),
        ),
    ),
)
def test_interpolate_vectorize(use_dask: bool, method: InterpOptions) -> None:
    """Vectorized interpolation with 1d and 2d indexers matches scipy references."""

    # scipy interpolation for the reference
    def func(obj, dim, new_x, method):
        scipy_kwargs = {}
        interpolant_options = {
            "barycentric": scipy.interpolate.BarycentricInterpolator,
            "krogh": scipy.interpolate.KroghInterpolator,
            "pchip": scipy.interpolate.PchipInterpolator,
            "akima": scipy.interpolate.Akima1DInterpolator,
            "makima": scipy.interpolate.Akima1DInterpolator,
        }

        # target shape: the interpolated axis is replaced by new_x's dims
        shape = [s for i, s in enumerate(obj.shape) if i != obj.get_axis_num(dim)]
        for s in new_x.shape[::-1]:
            shape.insert(obj.get_axis_num(dim), s)

        if method in interpolant_options:
            interpolant = interpolant_options[method]
            if method == "makima":
                scipy_kwargs["method"] = method
            return interpolant(
                da[dim], obj.data, axis=obj.get_axis_num(dim), **scipy_kwargs
            )(new_x).reshape(shape)
        else:
            return scipy.interpolate.interp1d(
                da[dim],
                obj.data,
                axis=obj.get_axis_num(dim),
                kind=method,
                bounds_error=False,
                fill_value=np.nan,
                **scipy_kwargs,
            )(new_x).reshape(shape)

    da = get_example_data(0)

    if use_dask:
        da = da.chunk({"y": 5})

    # xdest is 1d but has different dimension
    xdest = xr.DataArray(
        np.linspace(0.1, 0.9, 30),
        dims="z",
        coords={"z": np.random.randn(30), "z2": ("z", np.random.randn(30))},
    )

    actual = da.interp(x=xdest, method=method)

    expected = xr.DataArray(
        func(da, "x", xdest, method),
        dims=["z", "y"],
        coords={
            "z": xdest["z"],
            "z2": xdest["z2"],
            "y": da["y"],
            "x": ("z", xdest.values),
            "x2": ("z", func(da["x2"], "x", xdest, method)),
        },
    )
    assert_allclose(actual, expected.transpose("z", "y", transpose_coords=True))

    # xdest is 2d
    xdest = xr.DataArray(
        np.linspace(0.1, 0.9, 30).reshape(6, 5),
        dims=["z", "w"],
        coords={
            "z": np.random.randn(6),
            "w": np.random.randn(5),
            "z2": ("z", np.random.randn(6)),
        },
    )

    actual = da.interp(x=xdest, method=method)

    expected = xr.DataArray(
        func(da, "x", xdest, method),
        dims=["z", "w", "y"],
        coords={
            "z": xdest["z"],
            "w": xdest["w"],
            "z2": xdest["z2"],
            "y": da["y"],
            "x": (("z", "w"), xdest.data),
            "x2": (("z", "w"), func(da["x2"], "x", xdest, method)),
        },
    )
    assert_allclose(actual, expected.transpose("z", "w", "y", transpose_coords=True))
@requires_scipy
@pytest.mark.parametrize("method", get_args(InterpnOptions))
@pytest.mark.parametrize(
    "case",
    [
        pytest.param(3, id="no_chunk"),
        pytest.param(
            4, id="chunked", marks=pytest.mark.skipif(not has_dask, reason="no dask")
        ),
    ],
)
def test_interpolate_nd(case: int, method: InterpnOptions, nd_interp_coords) -> None:
    """N-d grid->grid and grid->1d interpolation agrees with scipy.interpolate.interpn."""
    da = get_example_data(case)

    # grid -> grid
    xdestnp = nd_interp_coords["xdestnp"]
    ydestnp = nd_interp_coords["ydestnp"]
    zdestnp = nd_interp_coords["zdestnp"]
    grid_grid_points = nd_interp_coords["grid_grid_points"]
    # the presence/absence of z coordinate may affect nd interpolants, even when the
    # coordinate is unchanged
    # TODO: test this?
    actual = da.interp(x=xdestnp, y=ydestnp, z=zdestnp, method=method)
    expected_data = scipy.interpolate.interpn(
        points=(da.x, da.y, da.z),
        values=da.load().data,
        xi=grid_grid_points,
        method=method,
        bounds_error=False,
    ).reshape((len(xdestnp), len(ydestnp), len(zdestnp)))
    expected = xr.DataArray(
        expected_data,
        dims=["x", "y", "z"],
        coords={
            "x": xdestnp,
            "y": ydestnp,
            "z": zdestnp,
            "x2": da["x2"].interp(x=xdestnp, method=method),
        },
    )
    assert_allclose(actual.transpose("x", "y", "z"), expected.transpose("x", "y", "z"))

    # grid -> 1d-sample
    xdest = nd_interp_coords["xdest"]
    ydest = nd_interp_coords["ydest"]
    zdest = nd_interp_coords["zdest"]
    grid_oned_points = nd_interp_coords["grid_oned_points"]
    actual = da.interp(x=xdest, y=ydest, z=zdest, method=method)
    expected_data = scipy.interpolate.interpn(
        points=(da.x, da.y, da.z),
        values=da.data,
        xi=grid_oned_points,
        method=method,
        bounds_error=False,
    ).reshape([len(xdest), len(zdest)])
    expected = xr.DataArray(
        expected_data,
        dims=["y", "z"],
        coords={
            "y": ydest,
            "z": zdest,
            "x": ("y", xdest.values),
            "x2": da["x2"].interp(x=xdest, method=method),
        },
    )

    assert_allclose(actual.transpose("y", "z"), expected)

    # reversed order
    actual = da.interp(y=ydest, x=xdest, z=zdest, method=method)
    assert_allclose(actual.transpose("y", "z"), expected)
@requires_scipy
# omit cubic, pchip, quintic because not enough points
@pytest.mark.parametrize("method", ("linear", "nearest", "slinear"))
def test_interpolate_nd_nd(method: InterpnOptions) -> None:
    """Interpolate nd array with an nd indexer sharing coordinates."""
    # Create original array
    a = [0, 2]
    x = [0, 1, 2]
    values = np.arange(6).reshape(2, 3)
    da = xr.DataArray(values, dims=("a", "x"), coords={"a": a, "x": x})

    # Create indexer into `a` with dimensions (y, x)
    y = [10]
    a_targets = [1, 2, 2]
    c = {"x": x, "y": y}
    ia = xr.DataArray([a_targets], dims=("y", "x"), coords=c)
    out = da.interp(a=ia, method=method)

    # scipy reference: one (a, x) sample per x position
    expected_xi = list(zip(a_targets, x, strict=False))
    expected_vec = scipy.interpolate.interpn(
        points=(a, x), values=values, xi=expected_xi, method=method
    )
    expected = xr.DataArray([expected_vec], dims=("y", "x"), coords=c)
    xr.testing.assert_allclose(out.drop_vars("a"), expected)

    # If the *shared* indexing coordinates do not match, interp should fail.
    with pytest.raises(ValueError):
        c = {"x": [1], "y": y}
        ia = xr.DataArray([[1]], dims=("y", "x"), coords=c)
        da.interp(a=ia)

    with pytest.raises(ValueError):
        c = {"x": [5, 6, 7], "y": y}
        ia = xr.DataArray([[1]], dims=("y", "x"), coords=c)
        da.interp(a=ia)
@requires_scipy
@pytest.mark.filterwarnings("ignore:All-NaN slice")
def test_interpolate_nd_with_nan() -> None:
    """Interpolate an array with an nd indexer and `NaN` values."""

    # Create indexer into `a` with dimensions (y, x)
    x = [0, 1, 2]
    y = [10, 20]
    c = {"x": x, "y": y}
    a = np.arange(6, dtype=float).reshape(2, 3)
    a[0, 1] = np.nan  # NaN in the indexer must yield NaN in the output
    ia = xr.DataArray(a, dims=("y", "x"), coords=c)

    da = xr.DataArray([1, 2, 2], dims=("a"), coords={"a": [0, 2, 4]})
    out = da.interp(a=ia)
    expected = xr.DataArray(
        [[1.0, np.nan, 2.0], [2.0, 2.0, np.nan]], dims=("y", "x"), coords=c
    )
    xr.testing.assert_allclose(out.drop_vars("a"), expected)

    # same indexer applied through a Dataset
    db = 2 * da
    ds = xr.Dataset({"da": da, "db": db})
    out2 = ds.interp(a=ia)
    expected_ds = xr.Dataset({"da": expected, "db": 2 * expected})
    xr.testing.assert_allclose(out2.drop_vars("a"), expected_ds)
@requires_scipy
@pytest.mark.parametrize("method", ("linear",))
@pytest.mark.parametrize(
    "case", [pytest.param(0, id="no_chunk"), pytest.param(1, id="chunk_y")]
)
def test_interpolate_scalar(method: InterpOptions, case: int) -> None:
    """A scalar destination drops the interpolated dim and matches scipy."""
    if not has_dask and case in [1]:
        pytest.skip("dask is not installed in the environment.")

    da = get_example_data(case)
    xdest = 0.4

    actual = da.interp(x=xdest, method=method)

    # scipy interpolation for the reference
    def func(obj, new_x):
        return scipy.interpolate.interp1d(
            da["x"],
            obj.data,
            axis=obj.get_axis_num("x"),
            bounds_error=False,
            fill_value=np.nan,
            kind=method,
        )(new_x)

    coords = {"x": xdest, "y": da["y"], "x2": func(da["x2"], xdest)}
    expected = xr.DataArray(func(da, xdest), dims=["y"], coords=coords)
    assert_allclose(actual, expected)
@requires_scipy
@pytest.mark.parametrize("method", ("linear",))
@pytest.mark.parametrize(
    "case", [pytest.param(3, id="no_chunk"), pytest.param(4, id="chunked")]
)
def test_interpolate_nd_scalar(method: InterpOptions, case: int) -> None:
    """Mixed scalar + index destinations on a 3D array match RegularGridInterpolator."""
    if not has_dask and case in [4]:
        pytest.skip("dask is not installed in the environment.")

    da = get_example_data(case)
    xdest = 0.4
    ydest = 0.05
    zdest = da.get_index("z")

    actual = da.interp(x=xdest, y=ydest, z=zdest, method=method)
    # scipy interpolation for the reference
    expected_data = scipy.interpolate.RegularGridInterpolator(
        (da["x"], da["y"], da["z"]),
        da.transpose("x", "y", "z").values,
        method=method,
        bounds_error=False,
        fill_value=np.nan,
    )(np.asarray([(xdest, ydest, z_val) for z_val in zdest]))

    coords = {
        "x": xdest,
        "y": ydest,
        "x2": da["x2"].interp(x=xdest, method=method),
        "z": da["z"],
    }
    expected = xr.DataArray(expected_data, dims=["z"], coords=coords)
    assert_allclose(actual, expected)
@pytest.mark.parametrize("use_dask", [True, False])
def test_nans(use_dask: bool) -> None:
    """NaNs in the data must not turn every interpolated value into NaN."""
    if not has_scipy:
        pytest.skip("scipy is not installed.")

    da = xr.DataArray([0, 1, np.nan, 2], dims="x", coords={"x": range(4)})

    if use_dask:
        if not has_dask:
            pytest.skip("dask is not installed in the environment.")
        # bug fix: chunking previously sat in the skip branch after pytest.skip
        # and was therefore unreachable, so use_dask=True never exercised dask
        da = da.chunk()

    actual = da.interp(x=[0.5, 1.5])
    # not all values are nan
    assert actual.count() > 0
@requires_scipy
@pytest.mark.parametrize("use_dask", [True, False])
def test_errors(use_dask: bool) -> None:
    """Invalid methods, out-of-bounds handling and unsupported dtypes raise."""
    # spline is unavailable
    da = xr.DataArray([0, 1, np.nan, 2], dims="x", coords={"x": range(4)})
    if use_dask:
        if not has_dask:
            pytest.skip("dask is not installed in the environment.")
        # bug fix: chunking previously sat in the skip branch after pytest.skip
        # and was therefore unreachable, so use_dask=True never exercised dask
        da = da.chunk()

    for method in ["spline"]:
        with pytest.raises(ValueError), pytest.warns(PendingDeprecationWarning):
            da.interp(x=[0.5, 1.5], method=method)  # type: ignore[arg-type]

    # not sorted
    if use_dask:
        da = get_example_data(3)
    else:
        da = get_example_data(0)

    # explicit fill_value fills out-of-bounds points; default leaves NaN
    result = da.interp(x=[-1, 1, 3], kwargs={"fill_value": 0.0})
    assert not np.isnan(result.values).any()
    result = da.interp(x=[-1, 1, 3])
    assert np.isnan(result.values).any()

    # invalid method
    with pytest.raises(ValueError):
        da.interp(x=[2, 0], method="boo")  # type: ignore[arg-type]
    with pytest.raises(ValueError):
        da.interp(y=[2, 0], method="boo")  # type: ignore[arg-type]

    # object-type DataArray cannot be interpolated
    da = xr.DataArray(["a", "b", "c"], dims="x", coords={"x": [0, 1, 2]})
    with pytest.raises(TypeError):
        da.interp(x=0)
@requires_scipy
def test_dtype() -> None:
    """Interpolating a mixed-dtype Dataset reproduces the expected values per dtype."""
    data_vars = dict(
        a=("time", np.array([1, 1.25, 2])),
        b=("time", np.array([True, True, False], dtype=bool)),
        c=("time", np.array(["start", "start", "end"], dtype=str)),
    )
    time = np.array([0, 0.25, 1], dtype=float)
    expected = xr.Dataset(data_vars, coords=dict(time=time))
    # start from only the endpoints and interpolate the middle back in
    actual = xr.Dataset(
        {k: (dim, arr[[0, -1]]) for k, (dim, arr) in data_vars.items()},
        coords=dict(time=time[[0, -1]]),
    )
    actual = actual.interp(time=time, method="linear")
    assert_identical(expected, actual)
@requires_scipy
def test_sorted() -> None:
    """assume_sorted=True on pre-sorted data matches the default sorting path."""
    # unsorted non-uniform gridded data
    x = np.random.randn(100)
    y = np.random.randn(30)
    z = np.linspace(0.1, 0.2, 10) * 3.0
    da = xr.DataArray(
        np.cos(x[:, np.newaxis, np.newaxis]) * np.cos(y[:, np.newaxis]) * z,
        dims=["x", "y", "z"],
        coords={"x": x, "y": y, "x2": ("x", x**2), "z": z},
    )

    x_new = np.linspace(0, 1, 30)
    y_new = np.linspace(0, 1, 20)

    da_sorted = da.sortby("x")
    assert_allclose(da.interp(x=x_new), da_sorted.interp(x=x_new, assume_sorted=True))
    da_sorted = da.sortby(["x", "y"])
    assert_allclose(
        da.interp(x=x_new, y=y_new),
        da_sorted.interp(x=x_new, y=y_new, assume_sorted=True),
    )

    # assume_sorted=True on actually-unsorted coordinates must raise
    with pytest.raises(ValueError):
        da.interp(x=[0, 1, 2], assume_sorted=True)
@requires_scipy
def test_dimension_wo_coords() -> None:
    """interp on a coordinate-less dim behaves as if it had a default range coord."""
    values = np.arange(12).reshape(3, 4)
    da = xr.DataArray(values, dims=["x", "y"], coords={"y": [0, 1, 2, 3]})
    da_w_coord = da.copy()
    da_w_coord["x"] = np.arange(3)

    xdest = [0.1, 0.2, 0.3]
    assert_equal(da.interp(x=xdest), da_w_coord.interp(x=xdest))
    assert_equal(
        da.interp(x=xdest, y=[0.5]),
        da_w_coord.interp(x=xdest, y=[0.5]),
    )
@requires_scipy
def test_dataset() -> None:
    """Dataset.interp interpolates each variable, copies data, and keeps attrs."""
    ds = create_test_data()
    ds.attrs["foo"] = "var"
    ds["var1"].attrs["buz"] = "var2"
    new_dim2 = xr.DataArray([0.11, 0.21, 0.31], dims="z")
    interpolated = ds.interp(dim2=new_dim2)

    assert_allclose(interpolated["var1"], ds["var1"].interp(dim2=new_dim2))
    assert interpolated["var3"].equals(ds["var3"])

    # make sure modifying interpolated does not affect the original dataset
    interpolated["var1"][:, 1] = 1.0
    interpolated["var2"][:, 1] = 1.0
    interpolated["var3"][:, 1] = 1.0

    assert not interpolated["var1"].equals(ds["var1"])
    assert not interpolated["var2"].equals(ds["var2"])
    assert not interpolated["var3"].equals(ds["var3"])
    # attrs should be kept
    assert interpolated.attrs["foo"] == "var"
    assert interpolated["var1"].attrs["buz"] == "var2"
@pytest.mark.parametrize("case", [pytest.param(0, id="2D"), pytest.param(3, id="3D")])
def test_interpolate_dimorder(case: int) -> None:
    """Make sure the resultant dimension order is consistent with .sel()"""
    if not has_scipy:
        pytest.skip("scipy is not installed.")

    da = get_example_data(case)

    # indexer along the dim itself
    new_x = xr.DataArray([0, 1, 2], dims="x")
    assert da.interp(x=new_x).dims == da.sel(x=new_x, method="nearest").dims

    new_y = xr.DataArray([0, 1, 2], dims="y")
    actual = da.interp(x=new_x, y=new_y).dims
    expected = da.sel(x=new_x, y=new_y, method="nearest").dims
    assert actual == expected
    # reversed order
    actual = da.interp(y=new_y, x=new_x).dims
    expected = da.sel(y=new_y, x=new_x, method="nearest").dims
    assert actual == expected

    # indexer along a new dim "a"
    new_x = xr.DataArray([0, 1, 2], dims="a")
    assert da.interp(x=new_x).dims == da.sel(x=new_x, method="nearest").dims
    assert da.interp(y=new_x).dims == da.sel(y=new_x, method="nearest").dims
    new_y = xr.DataArray([0, 1, 2], dims="a")
    actual = da.interp(x=new_x, y=new_y).dims
    expected = da.sel(x=new_x, y=new_y, method="nearest").dims
    assert actual == expected

    # 2d indexer
    new_x = xr.DataArray([[0], [1], [2]], dims=["a", "b"])
    assert da.interp(x=new_x).dims == da.sel(x=new_x, method="nearest").dims
    assert da.interp(y=new_x).dims == da.sel(y=new_x, method="nearest").dims

    if case == 3:
        new_x = xr.DataArray([[0], [1], [2]], dims=["a", "b"])
        new_z = xr.DataArray([[0], [1], [2]], dims=["a", "b"])
        actual = da.interp(x=new_x, z=new_z).dims
        expected = da.sel(x=new_x, z=new_z, method="nearest").dims
        assert actual == expected

        actual = da.interp(z=new_z, x=new_x).dims
        expected = da.sel(z=new_z, x=new_x, method="nearest").dims
        assert actual == expected

        actual = da.interp(x=0.5, z=new_z).dims
        expected = da.sel(x=0.5, z=new_z, method="nearest").dims
        assert actual == expected
@requires_scipy
def test_interp_like() -> None:
    """interp_like matches explicit interp; non-numeric coords fall back to reindex."""
    ds = create_test_data()
    ds.attrs["foo"] = "var"
    ds["var1"].attrs["buz"] = "var2"

    other = xr.DataArray(np.random.randn(3), dims=["dim2"], coords={"dim2": [0, 1, 2]})
    interpolated = ds.interp_like(other)

    assert_allclose(interpolated["var1"], ds["var1"].interp(dim2=other["dim2"]))
    assert_allclose(interpolated["var1"], ds["var1"].interp_like(other))
    assert interpolated["var3"].equals(ds["var3"])

    # attrs should be kept
    assert interpolated.attrs["foo"] == "var"
    assert interpolated["var1"].attrs["buz"] == "var2"

    # non-numeric target coordinate: interp_like should behave like reindex_like
    other = xr.DataArray(
        np.random.randn(3), dims=["dim3"], coords={"dim3": ["a", "b", "c"]}
    )

    actual = ds.interp_like(other)
    expected = ds.reindex_like(other)
    assert_allclose(actual, expected)
@requires_scipy
@pytest.mark.parametrize(
    "x_new, expected",
    [
        (pd.date_range("2000-01-02", periods=3), [1, 2, 3]),
        (
            np.array(
                [np.datetime64("2000-01-01T12:00"), np.datetime64("2000-01-02T12:00")]
            ),
            [0.5, 1.5],
        ),
        (["2000-01-01T12:00", "2000-01-02T12:00"], [0.5, 1.5]),
        (["2000-01-01T12:00", "2000-01-02T12:00", "NaT"], [0.5, 1.5, np.nan]),
        (["2000-01-01T12:00"], 0.5),
        pytest.param("2000-01-01T12:00", 0.5, marks=pytest.mark.xfail),
    ],
)
def test_datetime(x_new, expected) -> None:
    """Datetime indexers (DatetimeIndex, datetime64 arrays, strings, NaT) work."""
    da = xr.DataArray(
        np.arange(24),
        dims="time",
        coords={"time": pd.date_range("2000-01-01", periods=24)},
    )

    actual = da.interp(time=x_new)
    expected_da = xr.DataArray(
        np.atleast_1d(expected),
        dims=["time"],
        coords={"time": (np.atleast_1d(x_new).astype("datetime64[ns]"))},
    )

    assert_allclose(actual, expected_da)
@requires_scipy
def test_datetime_single_string() -> None:
    """A single datetime-string indexer yields a 0-d interpolated result."""
    times = pd.date_range("2000-01-01", periods=24)
    da = xr.DataArray(np.arange(24), dims="time", coords={"time": times})

    actual = da.interp(time="2000-01-01T12:00")
    expected = xr.DataArray(0.5)

    assert_allclose(actual.drop_vars("time"), expected)
@requires_cftime
@requires_scipy
def test_cftime() -> None:
    """Interpolation along a cftime coordinate with cftime targets."""
    times = xr.cftime_range("2000", periods=24, freq="D")
    da = xr.DataArray(np.arange(24), coords=[times], dims="time")

    times_new = xr.cftime_range("2000-01-01T12:00:00", periods=3, freq="D")
    expected = xr.DataArray([0.5, 1.5, 2.5], coords=[times_new], dims=["time"])
    actual = da.interp(time=times_new)

    assert_allclose(actual, expected)
@requires_cftime
@requires_scipy
def test_cftime_type_error() -> None:
    """Targets from a different cftime calendar must raise TypeError."""
    times = xr.cftime_range("2000", periods=24, freq="D")
    da = xr.DataArray(np.arange(24), coords=[times], dims="time")

    # noleap calendar does not match the default calendar of `times`
    times_new = xr.cftime_range(
        "2000-01-01T12:00:00", periods=3, freq="D", calendar="noleap"
    )
    with pytest.raises(TypeError):
        da.interp(time=times_new)
@requires_cftime
@requires_scipy
def test_cftime_list_of_strings() -> None:
    """A list of date strings is parsed into the array's cftime calendar."""
    from cftime import DatetimeProlepticGregorian

    times = xr.cftime_range(
        "2000", periods=24, freq="D", calendar="proleptic_gregorian"
    )
    da = xr.DataArray(np.arange(24), coords=[times], dims="time")

    times_new = ["2000-01-01T12:00", "2000-01-02T12:00", "2000-01-03T12:00"]
    actual = da.interp(time=times_new)

    times_new_array = _parse_array_of_cftime_strings(
        np.array(times_new), DatetimeProlepticGregorian
    )
    expected = xr.DataArray([0.5, 1.5, 2.5], coords=[times_new_array], dims=["time"])

    assert_allclose(actual, expected)
@requires_cftime
@requires_scipy
def test_cftime_single_string() -> None:
    """A single date string target yields a 0-d result with a cftime coord."""
    from cftime import DatetimeProlepticGregorian

    times = xr.cftime_range(
        "2000", periods=24, freq="D", calendar="proleptic_gregorian"
    )
    da = xr.DataArray(np.arange(24), coords=[times], dims="time")

    times_new = "2000-01-01T12:00"
    actual = da.interp(time=times_new)

    times_new_array = _parse_array_of_cftime_strings(
        np.array(times_new), DatetimeProlepticGregorian
    )
    expected = xr.DataArray(0.5, coords={"time": times_new_array})

    assert_allclose(actual, expected)
@requires_scipy
def test_datetime_to_non_datetime_error() -> None:
    """A numeric target on a datetime coordinate must raise TypeError."""
    times = pd.date_range("2000-01-01", periods=24)
    da = xr.DataArray(np.arange(24), dims="time", coords={"time": times})
    with pytest.raises(TypeError):
        da.interp(time=0.5)
@requires_cftime
@requires_scipy
def test_cftime_to_non_cftime_error() -> None:
    """A numeric target on a cftime coordinate must raise TypeError."""
    times = xr.cftime_range("2000", periods=24, freq="D")
    da = xr.DataArray(np.arange(24), coords=[times], dims="time")

    with pytest.raises(TypeError):
        da.interp(time=0.5)
@requires_scipy
def test_datetime_interp_noerror() -> None:
    # GH:2667 — interpolating with an indexer that carries a datetime coord
    # (but targets a numeric dim) should not raise
    a = xr.DataArray(
        np.arange(21).reshape(3, 7),
        dims=["x", "time"],
        coords={
            "x": [1, 2, 3],
            "time": pd.date_range("01-01-2001", periods=7, freq="D"),
        },
    )
    xi = xr.DataArray(
        np.linspace(1, 3, 50),
        dims=["time"],
        coords={"time": pd.date_range("01-01-2001", periods=50, freq="h")},
    )
    a.interp(x=xi, time=xi.time)  # should not raise an error
@requires_cftime
@requires_scipy
def test_3641() -> None:
    """Regression test for GH3641: interp over a long cftime range must not raise."""
    times = xr.cftime_range("0001", periods=3, freq="500YE")
    da = xr.DataArray(range(3), dims=["time"], coords={"time": times})
    da.interp(time=["0002-05-01"])
@requires_scipy
# cubic, quintic, pchip omitted because not enough points
@pytest.mark.parametrize("method", ("linear", "nearest", "slinear"))
def test_decompose(method: InterpOptions) -> None:
    """Orthogonal 1d indexers give the same result as their broadcast nd product."""
    da = xr.DataArray(
        np.arange(6).reshape(3, 2),
        dims=["x", "y"],
        coords={"x": [0, 1, 2], "y": [-0.1, -0.3]},
    )
    x_new = xr.DataArray([0.5, 1.5, 2.5], dims=["x1"])
    y_new = xr.DataArray([-0.15, -0.25], dims=["y1"])
    x_broadcast, y_broadcast = xr.broadcast(x_new, y_new)
    assert x_broadcast.ndim == 2

    actual = da.interp(x=x_new, y=y_new, method=method).drop_vars(("x", "y"))
    expected = da.interp(x=x_broadcast, y=y_broadcast, method=method).drop_vars(
        ("x", "y")
    )
    assert_allclose(actual, expected)
@requires_scipy
@requires_dask
@pytest.mark.parametrize("method", ("linear", "nearest", "cubic", "pchip", "quintic"))
@pytest.mark.parametrize("chunked", [True, False])
@pytest.mark.parametrize(
    "data_ndim,interp_ndim,nscalar",
    [
        (data_ndim, interp_ndim, nscalar)
        for data_ndim in range(1, 4)
        for interp_ndim in range(1, data_ndim + 1)
        for nscalar in range(interp_ndim + 1)
    ],
)
@pytest.mark.filterwarnings("ignore:Increasing number of chunks")
def test_interpolate_chunk_1d(
    method: InterpOptions, data_ndim, interp_ndim, nscalar, chunked: bool
) -> None:
    """Interpolate nd array with multiple independent indexers

    It should do a series of 1d interpolation
    """

    if method in ["cubic", "pchip", "quintic"] and interp_ndim == 3:
        pytest.skip("Too slow.")

    # 3d non chunked data
    x = np.linspace(0, 1, 6)
    y = np.linspace(2, 4, 7)
    z = np.linspace(-0.5, 0.5, 8)
    da = xr.DataArray(
        data=np.sin(x[:, np.newaxis, np.newaxis])
        * np.cos(y[:, np.newaxis])
        * np.exp(z),
        coords=[("x", x), ("y", y), ("z", z)],
    )

    # choose the data dimensions
    for data_dims in permutations(da.dims, data_ndim):
        # select only data_ndim dim
        da = da.isel(  # take the middle line
            {dim: len(da.coords[dim]) // 2 for dim in da.dims if dim not in data_dims}
        )

        # chunk data
        da = da.chunk(chunks={dim: i + 1 for i, dim in enumerate(da.dims)})

        # choose the interpolation dimensions
        for interp_dims in permutations(da.dims, interp_ndim):
            # choose the scalar interpolation dimensions
            for scalar_dims in combinations(interp_dims, nscalar):
                dest = {}
                for dim in interp_dims:
                    if dim in scalar_dims:
                        # take the middle point
                        dest[dim] = 0.5 * (da.coords[dim][0] + da.coords[dim][-1])
                    else:
                        # pick some points, including outside the domain
                        before = 2 * da.coords[dim][0] - da.coords[dim][1]
                        after = 2 * da.coords[dim][-1] - da.coords[dim][-2]

                        dest[dim] = cast(
                            xr.DataArray,
                            np.linspace(
                                before.item(), after.item(), len(da.coords[dim]) * 13
                            ),
                        )
                        if chunked:
                            dest[dim] = xr.DataArray(data=dest[dim], dims=[dim])
                            dest[dim] = dest[dim].chunk(2)
                # chunked result must match the in-memory reference
                actual = da.interp(method=method, **dest)
                expected = da.compute().interp(method=method, **dest)

                assert_identical(actual, expected)

                # all the combinations are usually not necessary
                break
            break
        break
@requires_scipy
@requires_dask
# quintic omitted because not enough points
@pytest.mark.parametrize("method", ("linear", "nearest", "slinear", "cubic", "pchip"))
@pytest.mark.filterwarnings("ignore:Increasing number of chunks")
def test_interpolate_chunk_advanced(method: InterpOptions) -> None:
    """Interpolate nd array with an nd indexer sharing coordinates."""
    # Create original array
    x = np.linspace(-1, 1, 5)
    y = np.linspace(-1, 1, 7)
    z = np.linspace(-1, 1, 11)
    t = np.linspace(0, 1, 13)
    q = np.linspace(0, 1, 17)
    da = xr.DataArray(
        data=np.sin(x[:, np.newaxis, np.newaxis, np.newaxis, np.newaxis])
        * np.cos(y[:, np.newaxis, np.newaxis, np.newaxis])
        * np.exp(z[:, np.newaxis, np.newaxis])
        * t[:, np.newaxis]
        + q,
        dims=("x", "y", "z", "t", "q"),
        coords={"x": x, "y": y, "z": z, "t": t, "q": q, "label": "dummy_attr"},
    )

    # Create indexer into `da` with shared coordinate ("full-twist" Möbius strip)
    theta = np.linspace(0, 2 * np.pi, 5)
    w = np.linspace(-0.25, 0.25, 7)
    r = xr.DataArray(
        data=1 + w[:, np.newaxis] * np.cos(theta),
        coords=[("w", w), ("theta", theta)],
    )

    xda = r * np.cos(theta)
    yda = r * np.sin(theta)
    zda = xr.DataArray(
        data=w[:, np.newaxis] * np.sin(theta),
        coords=[("w", w), ("theta", theta)],
    )

    kwargs = {"fill_value": None}
    # in-memory reference, then the same interpolation on chunked inputs
    expected = da.interp(t=0.5, x=xda, y=yda, z=zda, kwargs=kwargs, method=method)

    da = da.chunk(2)
    xda = xda.chunk(1)
    zda = zda.chunk(3)
    actual = da.interp(t=0.5, x=xda, y=yda, z=zda, kwargs=kwargs, method=method)
    assert_identical(actual, expected)
@requires_scipy
def test_interp1d_bounds_error() -> None:
    """Ensure exception on bounds error is raised if requested"""
    time = np.arange(4)
    da = xr.DataArray(np.sin(0.3 * time), [("time", time)])

    with pytest.raises(ValueError):
        da.interp(time=3.5, kwargs=dict(bounds_error=True))

    # default is to fill with nans, so this should pass
    da.interp(time=3.5)
@requires_scipy
@pytest.mark.parametrize(
    "x, expect_same_attrs",
    [
        (2.5, True),
        (np.array([2.5, 5]), True),
        (("x", np.array([0, 0.5, 1, 2]), dict(unit="s")), False),
    ],
)
def test_coord_attrs(x, expect_same_attrs: bool) -> None:
    """Coordinate attrs survive interp with plain targets, but a full
    coordinate tuple (with its own attrs) replaces them."""
    base_attrs = dict(foo="bar")
    ds = xr.Dataset(
        data_vars=dict(a=2 * np.arange(5)),
        coords={"x": ("x", np.arange(5), base_attrs)},
    )

    result_attrs = ds.interp(x=x).x.attrs
    assert expect_same_attrs == (result_attrs == base_attrs)
@requires_scipy
def test_interp1d_complex_out_of_bounds() -> None:
    """Ensure complex nans are used by default"""
    times = np.arange(4)
    da = xr.DataArray(np.exp(0.3j * times), [("time", times)])

    # Explicitly requesting a complex-nan fill must match the default.
    complex_nan = np.nan + np.nan * 1j
    expected = da.interp(time=3.5, kwargs=dict(fill_value=complex_nan))
    actual = da.interp(time=3.5)
    assert_identical(actual, expected)
@requires_scipy
def test_interp_non_numeric_1d() -> None:
    """Non-numeric variables are filled by nearest-neighbor selection while
    numeric variables are interpolated."""
    time_in = np.arange(0, 4, 1)
    time_out = np.linspace(0, 3, 7)
    ds = xr.Dataset(
        {
            "numeric": ("time", 1 + time_in),
            "non_numeric": ("time", np.array(["a", "b", "c", "d"])),
        },
        coords={"time": time_in},
    )

    actual = ds.interp(time=time_out)

    expected = xr.Dataset(
        {
            "numeric": ("time", 1 + time_out),
            "non_numeric": ("time", np.array(["a", "b", "b", "c", "c", "d", "d"])),
        },
        coords={"time": time_out},
    )
    xr.testing.assert_identical(actual, expected)
@requires_scipy
def test_interp_non_numeric_nd() -> None:
    # regression test for GH8099, GH9839
    ds = xr.Dataset({"x": ("a", np.arange(4))}, coords={"a": np.arange(4) - 1.5})
    indexer = xr.DataArray(
        np.random.randn(6).reshape((2, 3)) * 0.5,
        dims=["r", "s"],
        coords={"r": np.arange(2) - 0.5, "s": np.arange(3) - 1},
    )
    # Add a boolean (non-numeric) variable to the dataset.
    ds["m"] = ds.x > 1

    actual = ds.interp(a=indexer, method="linear")
    # The numeric variable must match interpolation of the numeric-only subset.
    expected = ds[["x"]].interp(a=indexer, method="linear")
    assert_identical(actual[["x"]], expected)
@requires_dask
@requires_scipy
def test_interp_vectorized_dask() -> None:
    # Synthetic dataset chunked in the two interpolation dimensions
    import dask.array as da

    nt, nlat, nlon, nq = 10, 20, 10, 21
    ds = xr.Dataset(
        data_vars={
            "foo": (
                ("lat", "lon", "dayofyear", "q"),
                da.random.random((nlat, nlon, nt, nq), chunks=(10, 10, 10, -1)),
            ),
            "bar": (("lat", "lon"), da.random.random((nlat, nlon), chunks=(10, 10))),
        },
        coords={
            "lat": np.linspace(-89.5, 89.6, nlat),
            "lon": np.linspace(-179.5, 179.6, nlon),
            "dayofyear": np.arange(0, nt),
            "q": np.linspace(0, 1, nq),
        },
    )

    # Interpolate along non-chunked dimension; this must stay fully lazy.
    with raise_if_dask_computes():
        actual = ds.interp(q=ds["bar"], kwargs={"fill_value": None})
    expected = ds.compute().interp(q=ds["bar"], kwargs={"fill_value": None})
    assert_identical(actual, expected)
@requires_scipy
@pytest.mark.parametrize(
    "chunk",
    [
        pytest.param(
            True, marks=pytest.mark.skipif(not has_dask, reason="requires_dask")
        ),
        False,
    ],
)
def test_interp_vectorized_shared_dims(chunk: bool) -> None:
    # GH4463
    da = xr.DataArray(
        [[[1, 2, 3], [2, 3, 4]], [[1, 2, 3], [2, 3, 4]]],
        dims=("t", "x", "y"),
        coords={"x": [1, 2], "y": [1, 2, 3], "t": [10, 12]},
    )
    # Two indexers sharing the new "u" dimension; x_idx additionally shares
    # the existing "t" dimension with `da`.
    y_idx = xr.DataArray([1.5, 2.5], dims=("u",), coords={"u": [45, 55]})
    x_idx = xr.DataArray(
        [[1.5, 1.5], [1.5, 1.5]], dims=("t", "u"), coords={"u": [45, 55], "t": [10, 12]}
    )
    if chunk:
        da = da.chunk(t=1)
    with raise_if_dask_computes():
        actual = da.interp(y=y_idx, x=x_idx, method="linear")
    expected = xr.DataArray(
        [[2, 3], [2, 3]],
        dims=("t", "u"),
        coords={"u": [45, 55], "t": [10, 12], "x": x_idx, "y": y_idx},
    )
    assert_identical(actual, expected)