Commit 82efa8b (branch: dev)

davidhassell committed Sep 6, 2023
1 parent 229e383

Showing 21 changed files with 804 additions and 428 deletions.
1 change: 1 addition & 0 deletions cf/__init__.py
@@ -264,6 +264,7 @@
FullArray,
GatheredArray,
NetCDFArray,
PointTopologyArray,
RaggedContiguousArray,
RaggedIndexedArray,
RaggedIndexedContiguousArray,
3 changes: 3 additions & 0 deletions cf/cfimplementation.py
@@ -32,6 +32,7 @@
CFANetCDFArray,
GatheredArray,
NetCDFArray,
PointTopologyArray,
RaggedContiguousArray,
RaggedIndexedArray,
RaggedIndexedContiguousArray,
@@ -175,6 +176,7 @@ def initialise_CFANetCDFArray(
CellConnectivityArray=CellConnectivityArray,
GatheredArray=GatheredArray,
NetCDFArray=NetCDFArray,
PointTopologyArray=PointTopologyArray,
RaggedContiguousArray=RaggedContiguousArray,
RaggedIndexedArray=RaggedIndexedArray,
RaggedIndexedContiguousArray=RaggedIndexedContiguousArray,
@@ -229,6 +231,7 @@ def implementation():
'Data': cf.data.data.Data,
'GatheredArray': cf.data.array.gatheredarray.GatheredArray,
'NetCDFArray': cf.data.array.netcdfarray.NetCDFArray,
'PointTopologyArray': cf.data.array.pointtopologyarray.PointTopologyArray,
'RaggedContiguousArray': cf.data.array.raggedcontiguousarray.RaggedContiguousArray,
'RaggedIndexedArray': cf.data.array.raggedindexedarray.RaggedIndexedArray,
'RaggedIndexedContiguousArray': cf.data.array.raggedindexedcontiguousarray.RaggedIndexedContiguousArray,
1 change: 1 addition & 0 deletions cf/data/array/__init__.py
@@ -4,6 +4,7 @@
from .fullarray import FullArray
from .gatheredarray import GatheredArray
from .netcdfarray import NetCDFArray
from .pointtopologyarray import PointTopologyArray
from .raggedcontiguousarray import RaggedContiguousArray
from .raggedindexedarray import RaggedIndexedArray
from .raggedindexedcontiguousarray import RaggedIndexedContiguousArray
105 changes: 8 additions & 97 deletions cf/data/array/boundsfromnodesarray.py
@@ -11,20 +11,15 @@ class BoundsFromNodesArray(
Container,
cfdm.BoundsFromNodesArray,
):
"""An underlying gathered array.
"""An array of cell bounds defined by UGRID node coordinates.
TODOUGRID
The UGRID node coordinates contain the locations of the nodes of
the domain topology. In UGRID, the bounds of edge, face and volume
cells may be defined by these locations in conjunction with a
mapping from each cell boundary vertex to its corresponding
coordinate value.
Compression by gathering combines axes of a multidimensional array
into a new, discrete axis whilst omitting the missing values and
thus reducing the number of values that need to be stored.
The information needed to uncompress the data is stored in a "list
variable" that gives the indices of the required points.
See CF section 8.2. "Lossless Compression by Gathering".
.. versionadded:: TODOUGRIDVER
.. versionadded:: UGRIDVER
"""

@@ -33,91 +28,7 @@ def __repr__(self):
x.__repr__() <==> repr(x)
.. versionadded:: TODOUGRIDVER
.. versionadded:: UGRIDVER
"""
return super().__repr__().replace("<", "<CF ", 1)


# def to_dask_array(self, chunks="auto"):
# """Convert the data to a `dask` array.
#
# .. versionadded:: TODOUGRIDVER
#
# :Parameters:
#
# chunks: `int`, `tuple`, `dict` or `str`, optional
# Specify the chunking of the returned dask array.
#
# Any value accepted by the *chunks* parameter of the
# `dask.array.from_array` function is allowed.
#
# The chunk sizes implied by *chunks* for a dimension that
# has been fragmented are ignored and replaced with values
# that are implied by that dimension's fragment sizes.
#
# :Returns:
#
# `dask.array.Array`
# The `dask` array representation.
#
# """
# from functools import partial
#
# import dask.array as da
# from dask import config
# from dask.array.core import getter, normalize_chunks
# from dask.base import tokenize
#
# name = (f"{self.__class__.__name__}-{tokenize(self)}",)
#
# dtype = self.dtype
#
# context = partial(config.set, scheduler="synchronous")
#
# compressed_dimensions = self.compressed_dimensions()
# conformed_data = self.conformed_data()
#
# # If possible, convert the compressed data to a dask array
# # that doesn't support concurrent reads. This prevents
# # "compute called by compute" failures problems at compute
# # time.
# #
# # TODO: This won't be necessary if this is refactored so that
# # the compressed data is part of the same dask graph as
# # the compressed subarrays.
# conformed_data = {
# k: self._lock_file_read(v) for k, v in conformed_data.items()
# }
#
# # Get the (cfdm) subarray class
# Subarray = self.get_Subarray()
# subarray_name = Subarray().__class__.__name__
#
# # Set the chunk sizes for the dask array
# chunks = self.subarray_shapes(chunks)
# chunks = normalize_chunks(
# self.subarray_shapes(chunks),
# shape=self.shape,
# dtype=dtype,
# )
#
# dsk = {}
# for u_indices, u_shape, c_indices, chunk_location in zip(
# *self.subarrays(chunks)
# ):
# subarray = Subarray(
# indices=c_indices,
# shape=u_shape,
# compressed_dimensions=compressed_dimensions,
# context_manager=context,
# **conformed_data,
# )
#
# key = f"{subarray_name}-{tokenize(subarray)}"
# dsk[key] = subarray
#
# dsk[name + chunk_location] = (getter, key, Ellipsis, False, False)
#
# # Return the dask array
# return da.Array(dsk, name[0], chunks=chunks, dtype=dtype)
21 changes: 9 additions & 12 deletions cf/data/array/cellconnectivityarray.py
@@ -11,20 +11,17 @@ class CellConnectivityArray(
Container,
cfdm.CellConnectivityArray,
):
"""An underlying gathered array.
"""A connectivity array derived from a UGRID connectivity variable.
TODOUGRID
A UGRID connectivity variable contains indices that map each cell
to its neighbours, as found in a UGRID "edge_edge_connectivity",
"face_face_connectivity", or "volume_volume_connectivity" variable.
Compression by gathering combines axes of a multidimensional array
into a new, discrete axis whilst omitting the missing values and
thus reducing the number of values that need to be stored.
The connectivity array has one more column than the corresponding
UGRID variable. The extra column, in the first position, contains
the identifier for each cell.
The information needed to uncompress the data is stored in a "list
variable" that gives the indices of the required points.
See CF section 8.2. "Lossless Compression by Gathering".
.. versionadded:: TODOUGRIDVER
.. versionadded:: UGRIDVER
"""

@@ -33,7 +30,7 @@ def __repr__(self):
x.__repr__() <==> repr(x)
.. versionadded:: TODOUGRIDVER
.. versionadded:: UGRIDVER
"""
return super().__repr__().replace("<", "<CF ", 1)
171 changes: 88 additions & 83 deletions cf/data/array/gatheredarray.py
@@ -32,86 +32,91 @@ def __repr__(self):
"""
return super().__repr__().replace("<", "<CF ", 1)

def to_dask_array(self, chunks="auto"):
"""Convert the data to a `dask` array.
.. versionadded:: 3.14.0
:Parameters:
chunks: `int`, `tuple`, `dict` or `str`, optional
Specify the chunking of the returned dask array.
Any value accepted by the *chunks* parameter of the
`dask.array.from_array` function is allowed.
The chunk sizes implied by *chunks* for a dimension that
has been fragmented are ignored and replaced with values
that are implied by that dimension's fragment sizes.
:Returns:
`dask.array.Array`
The `dask` array representation.
"""
from functools import partial

import dask.array as da
from dask import config
from dask.array.core import getter, normalize_chunks
from dask.base import tokenize

name = (f"{self.__class__.__name__}-{tokenize(self)}",)

dtype = self.dtype

context = partial(config.set, scheduler="synchronous")

compressed_dimensions = self.compressed_dimensions()
conformed_data = self.conformed_data()
compressed_data = conformed_data["data"]
uncompressed_indices = conformed_data["uncompressed_indices"]

# If possible, convert the compressed data to a dask array
# that doesn't support concurrent reads. This prevents
# "compute called by compute" failures problems at compute
# time.
#
# TODO: This won't be necessary if this is refactored so that
# the compressed data is part of the same dask graph as
# the compressed subarrays.
compressed_data = self._lock_file_read(compressed_data)

# Get the (cfdm) subarray class
Subarray = self.get_Subarray()
subarray_name = Subarray().__class__.__name__

# Set the chunk sizes for the dask array
chunks = self.subarray_shapes(chunks)
chunks = normalize_chunks(
self.subarray_shapes(chunks),
shape=self.shape,
dtype=dtype,
)

dsk = {}
for u_indices, u_shape, c_indices, chunk_location in zip(
*self.subarrays(chunks)
):
subarray = Subarray(
data=compressed_data,
indices=c_indices,
shape=u_shape,
compressed_dimensions=compressed_dimensions,
uncompressed_indices=uncompressed_indices,
context_manager=context,
)

key = f"{subarray_name}-{tokenize(subarray)}"
dsk[key] = subarray

dsk[name + chunk_location] = (getter, key, Ellipsis, False, False)

# Return the dask array
return da.Array(dsk, name[0], chunks=chunks, dtype=dtype)
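
For context, a hedged usage sketch of the method removed by this commit (ga stands in for an existing GatheredArray instance; its construction is omitted). The returned dask array is lazy, with one graph key per uncompressed subarray, so no values are unpacked until compute time.

import dask.array as da

dx = ga.to_dask_array(chunks="auto")
assert isinstance(dx, da.Array)

# Values are unpacked subarray-by-subarray only when the graph
# is computed.
result = dx.compute()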
def subarray_parameters(self):
"""TODOUGRID"""
return {}


# def to_dask_array(self, chunks="auto"):
# """Convert the data to a `dask` array.
#
# .. versionadded:: 3.14.0
#
# :Parameters:
#
# chunks: `int`, `tuple`, `dict` or `str`, optional
# Specify the chunking of the returned dask array.
#
# Any value accepted by the *chunks* parameter of the
# `dask.array.from_array` function is allowed.
#
# The chunk sizes implied by *chunks* for a dimension that
# has been fragmented are ignored and replaced with values
# that are implied by that dimension's fragment sizes.
#
# :Returns:
#
# `dask.array.Array`
# The `dask` array representation.
#
# """
# from functools import partial
#
# import dask.array as da
# from dask import config
# from dask.array.core import getter, normalize_chunks
# from dask.base import tokenize
#
# name = (f"{self.__class__.__name__}-{tokenize(self)}",)
#
# dtype = self.dtype
#
# context = partial(config.set, scheduler="synchronous")
#
# compressed_dimensions = self.compressed_dimensions()
# conformed_data = self.conformed_data()
# compressed_data = conformed_data["data"]
# uncompressed_indices = conformed_data["uncompressed_indices"]
#
# # If possible, convert the compressed data to a dask array
# # that doesn't support concurrent reads. This prevents
# # "compute called by compute" failures problems at compute
# # time.
# #
# # TODO: This won't be necessary if this is refactored so that
# # the compressed data is part of the same dask graph as
# # the compressed subarrays.
# compressed_data = self._lock_file_read(compressed_data)
#
# # Get the (cfdm) subarray class
# Subarray = self.get_Subarray()
# subarray_name = Subarray().__class__.__name__
#
# # Set the chunk sizes for the dask array
# # chunks = self.subarray_shapes(chunks)
# chunks = normalize_chunks(
# self.subarray_shapes(chunks),
# shape=self.shape,
# dtype=dtype,
# )
#
# dsk = {}
# for u_indices, u_shape, c_indices, chunk_location in zip(
# *self.subarrays(chunks)
# ):
# subarray = Subarray(
# data=compressed_data,
# indices=c_indices,
# shape=u_shape,
# compressed_dimensions=compressed_dimensions,
# uncompressed_indices=uncompressed_indices,
# context_manager=context,
# )
#
# key = f"{subarray_name}-{tokenize(subarray)}"
# dsk[key] = subarray
#
# dsk[name + chunk_location] = (getter, key, Ellipsis, False, False)
#
# # Return the dask array
# return da.Array(dsk, name[0], chunks=chunks, dtype=dtype)
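
Since this file implements CF gathering, a self-contained numpy sketch of the uncompression that the subarrays perform (hypothetical values; the real class does this lazily, one subarray at a time). The CF "list variable" gives, for each stored value, its index into the flattened uncompressed array; see CF section 8.2.

import numpy as np

# Gathered (compressed) values and their list-variable indices.
compressed = np.array([10., 20., 30., 40.])
list_indices = np.array([0, 2, 3, 5])

# Scatter the values back into a masked 2x3 uncompressed array;
# unindexed points stay missing.
uncompressed = np.ma.masked_all((2, 3))
uncompressed[np.unravel_index(list_indices, (2, 3))] = compressed
# [[10.0 -- 20.0]
#  [30.0 -- 40.0]]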
3 changes: 2 additions & 1 deletion cf/data/array/mixin/__init__.py
@@ -2,4 +2,5 @@
from .compressedarraymixin import CompressedArrayMixin
from .filearraymixin import FileArrayMixin
from .mesharraymixin import MeshArrayMixin
from .raggedarraymixin import RaggedArrayMixin

# from .raggedarraymixin import RaggedArrayMixin