Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add axis to apply_ragged and unpack_ragged #281

Merged
merged 7 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions clouddrift/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@

def apply_ragged(
func: callable,
arrays: list[np.ndarray],
rowsize: list[int],
arrays: Union[list[Union[np.ndarray, xr.DataArray]], np.ndarray, xr.DataArray],
rowsize: Union[list[int], np.ndarray[int], xr.DataArray],
*args: tuple,
rows: Union[int, Iterable[int]] = None,
axis: int = 0,
executor: futures.Executor = futures.ThreadPoolExecutor(max_workers=None),
**kwargs: dict,
) -> Union[tuple[np.ndarray], np.ndarray]:
Expand All @@ -28,6 +29,13 @@ def apply_ragged(
indicated by row sizes ``rowsize``. The output of ``func`` will be
concatenated into a single ragged array.

You can pass ``arrays`` as NumPy arrays or xarray DataArrays; however,
the result will always be a NumPy array. Passing ``rows`` as an integer or
a sequence of integers will make ``apply_ragged`` process and return only
those specific rows; otherwise, all rows in the input ragged array will
be processed. Further, you can use the ``axis`` parameter to specify the
ragged axis of the input array(s) (default is 0).

milancurcic marked this conversation as resolved.
Show resolved Hide resolved
By default this function uses ``concurrent.futures.ThreadPoolExecutor`` to
run ``func`` in multiple threads. The number of threads can be controlled by
passing the ``max_workers`` argument to the executor instance passed to
Expand All @@ -40,15 +48,17 @@ def apply_ragged(
----------
func : callable
Function to apply to each row of each ragged array in ``arrays``.
arrays : list[np.ndarray] or np.ndarray
arrays : list[np.ndarray] or np.ndarray or xr.DataArray
An array or a list of arrays to apply ``func`` to.
rowsize : list
rowsize : list[int] or np.ndarray[int] or xr.DataArray[int]
List of integers specifying the number of data points in each row.
*args : tuple
Additional arguments to pass to ``func``.
rows : int or Iterable[int], optional
The row(s) of the ragged array to apply ``func`` to. If ``rows`` is
milancurcic marked this conversation as resolved.
Show resolved Hide resolved
``None`` (default), then ``func`` will be applied to all rows.
axis : int, optional
The ragged axis of the input arrays. Default is 0.
executor : concurrent.futures.Executor, optional
Executor to use for concurrent execution. Default is ``ThreadPoolExecutor``
with the default number of ``max_workers``.
Expand Down Expand Up @@ -97,27 +107,51 @@ def apply_ragged(
arrays = [arrays]
# validate rowsize
for arr in arrays:
if not sum(rowsize) == len(arr):
if not sum(rowsize) == arr.shape[axis]:
raise ValueError("The sum of rowsize must equal the length of arr.")

# split the array(s) into trajectories
arrays = [unpack_ragged(arr, rowsize, rows) for arr in arrays]
arrays = [unpack_ragged(np.array(arr), rowsize, rows, axis) for arr in arrays]
iter = [[arrays[i][j] for i in range(len(arrays))] for j in range(len(arrays[0]))]

# parallel execution
res = [executor.submit(func, *x, *args, **kwargs) for x in iter]
res = [r.result() for r in res]

# concatenate the outputs
# Concatenate the outputs.

# The following wraps items in a list if they are not already iterable.
res = [item if isinstance(item, Iterable) else [item] for item in res]

# np.concatenate can concatenate along non-zero axis iff the length of
# arrays to be concatenated is > 1. If the length is 1, for example in the
# case of func that reduces over the non-ragged axis, we can only
# concatenate along axis 0.
if isinstance(res[0], tuple): # more than 1 parameter
outputs = []
for i in range(len(res[0])):
outputs.append(np.concatenate([r[i] for r in res]))
for i in range(len(res[0])): # iterate over each result variable
# If we have multiple outputs and func is a reduction function,
# we now have a list of scalars. We need to wrap them in a
# list to concatenate them.
result = [r[i] if isinstance(r[i], Iterable) else [r[i]] for r in res]
if len(result[0]) > 1:
# Arrays to concatenate are longer than 1 element, so we can
# concatenate along the non-zero axis.
outputs.append(np.concatenate(result, axis=axis))
else:
# Arrays to concatenate are 1 element long, so we can only
# concatenate along axis 0.
outputs.append(np.concatenate(result))
return tuple(outputs)
else:
return np.concatenate(res)
if len(res[0]) > 1:
# Arrays to concatenate are longer than 1 element, so we can
# concatenate along the non-zero axis.
return np.concatenate(res, axis=axis)
else:
# Arrays to concatenate are 1 element long, so we can only
# concatenate along axis 0.
return np.concatenate(res)


def chunk(
Expand Down Expand Up @@ -1104,6 +1138,7 @@ def unpack_ragged(
ragged_array: np.ndarray,
rowsize: np.ndarray[int],
rows: Union[int, Iterable[int]] = None,
axis: int = 0,
) -> list[np.ndarray]:
"""Unpack a ragged array into a list of regular arrays.

Expand All @@ -1120,6 +1155,8 @@ def unpack_ragged(
array
rows : int or Iterable[int], optional
A row or list of rows to unpack. Default is None, which unpacks all rows.
axis : int, optional
The axis along which to unpack the ragged array. Default is 0.

Returns
-------
Expand Down Expand Up @@ -1158,4 +1195,6 @@ def unpack_ragged(
if isinstance(rows, int):
rows = [rows]

return [ragged_array[indices[n] : indices[n + 1]] for n in rows]
unpacked = np.split(ragged_array, indices[1:-1], axis=axis)

return [unpacked[i] for i in rows]
5 changes: 0 additions & 5 deletions clouddrift/sphere.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,11 +709,6 @@ def coriolis_frequency(
f : float or np.ndarray
Signed Coriolis frequency in radians per second.

Raises
------
Warning
Raised if the input latitudes are not in the expected range [-90, 90].

Examples
--------
>>> f = coriolis_frequency(np.array([0, 45, 90]))
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "clouddrift"
version = "0.22.0"
version = "0.23.0"
authors = [
{ name="Shane Elipot", email="selipot@miami.edu" },
{ name="Philippe Miron", email="philippemiron@gmail.com" },
Expand Down
69 changes: 52 additions & 17 deletions tests/analysis_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,58 @@ def test_simple_with_kwargs(self):
)
self.assertTrue(np.all(y == np.array([1, 4, 9, 16])))

def test_with_rows(self):
y = apply_ragged(
lambda x: x**2,
np.array([1, 2, 3, 4]),
[2, 2],
rows=0,
)
self.assertTrue(np.all(y == np.array([1, 4])))

y = apply_ragged(
lambda x: x**2,
np.array([1, 2, 3, 4]),
[2, 2],
rows=[0, 1],
)
self.assertTrue(np.all(y == np.array([1, 4, 9, 16])))

def test_with_axis(self):
# Test that axis=0 is the default.
x = np.arange((6)).reshape((3, 2))
func = lambda x: x**2
rowsize = [2, 1]
y = apply_ragged(func, x, rowsize, axis=0)
self.assertTrue(np.all(y == apply_ragged(func, x, rowsize)))

# Test that the rowsize is checked against the correct axis.
with self.assertRaises(ValueError):
y = apply_ragged(func, x.T, rowsize, axis=0)

# Test that applying an element-wise function on a 2-d array over
# ragged axis 1 is the same as applying it to the transpose over ragged
# axis 0.
rowsize = [1, 1]
y0 = apply_ragged(func, x.T, rowsize, axis=0)
y1 = apply_ragged(func, x, rowsize, axis=1)
self.assertTrue(np.all(y0 == y1.T))

# Test that axis=1 works with reduction over the non-ragged axis.
y = apply_ragged(np.sum, x, rowsize, axis=1)
self.assertTrue(np.all(y == np.sum(x, axis=0)))

# Test that the same works with xr.DataArray as input
# (this did not work before the axis feature was added).
y = apply_ragged(np.sum, xr.DataArray(data=x), rowsize, axis=1)
self.assertTrue(np.all(y == np.sum(x, axis=0)))

# Test that axis works for multiple outputs:
func = lambda x: (np.mean(x), np.std(x))
y = apply_ragged(func, x, rowsize, axis=1)
self.assertTrue(np.all(y[0] == np.mean(x, axis=0)))
self.assertTrue(np.all(y[1] == np.std(x, axis=0)))

def test_velocity_ndarray(self):
for executor in [futures.ThreadPoolExecutor(), futures.ProcessPoolExecutor()]:
u, v = apply_ragged(
Expand All @@ -739,23 +791,6 @@ def test_velocity_ndarray(self):
)
)

def test_with_rows(self):
y = apply_ragged(
lambda x: x**2,
np.array([1, 2, 3, 4]),
[2, 2],
rows=0,
)
self.assertTrue(np.all(y == np.array([1, 4])))

y = apply_ragged(
lambda x: x**2,
np.array([1, 2, 3, 4]),
[2, 2],
rows=[0, 1],
)
self.assertTrue(np.all(y == np.array([1, 4, 9, 16])))

def test_velocity_dataarray(self):
for executor in [futures.ThreadPoolExecutor(), futures.ProcessPoolExecutor()]:
u, v = apply_ragged(
Expand Down
6 changes: 0 additions & 6 deletions tests/sphere_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,3 @@ def test_coriolis_frequency_values(self):
]
)
self.assertTrue(np.allclose(f, f_expected))

def test_coriolis_frequency_warning(self):
with self.assertWarns(Warning):
coriolis_frequency(91)
with self.assertWarns(Warning):
coriolis_frequency(-91)