Add lazy_load parameter to open() #573

Merged · 8 commits · Oct 24, 2018
7 changes: 7 additions & 0 deletions CHANGES.rst
@@ -6,6 +6,13 @@
constructor. It can also be controlled with the existing
``set_array_storage`` method of ``AsdfFile`` and the ``all_array_storage``
argument to ``AsdfFile.write_to``. [#557]
- Add new parameter ``lazy_load`` to ``AsdfFile.open``. It defaults to
``True``, which preserves the existing behavior. When ``False``, the
loaded tree is detached from the underlying file: all blocks are fully
read and the numpy arrays are materialized, so it becomes safe to close
the file and continue using ``AsdfFile.tree``. Note that the
``copy_arrays`` parameter still applies: when ``copy_arrays`` is
``False``, active memory maps may still require the file to stay
open. [#573]

2.1.1 (unreleased)
------------------
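To make the changelog entry concrete, here is a minimal usage sketch (the file name and tree contents are illustrative, not part of the PR):

```python
import asdf
import numpy as np

# Write a small file to read back (illustrative data).
asdf.AsdfFile({'data': np.arange(1000)}).write_to('example.asdf')

# lazy_load=False reads every block up front; copy_arrays=True also
# disables memory mapping, so the tree is fully detached from the file.
af = asdf.AsdfFile.open('example.asdf', lazy_load=False, copy_arrays=True)
tree = af.tree
af.close()

# Safe even though the file is now closed.
print(tree['data'].sum())
```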
31 changes: 27 additions & 4 deletions asdf/asdf.py
@@ -51,7 +51,7 @@ class AsdfFile(versioning.VersionedMixin):
def __init__(self, tree=None, uri=None, extensions=None, version=None,
ignore_version_mismatch=True, ignore_unrecognized_tag=False,
ignore_implicit_conversion=False, copy_arrays=False,
custom_schema=None, inline_threshold=None):
lazy_load=True, custom_schema=None, inline_threshold=None):
"""
Parameters
----------
@@ -92,6 +92,16 @@ def __init__(self, tree=None, uri=None, extensions=None, version=None,
When `False`, when reading files, attempt to memmap underlying data
arrays when possible.

lazy_load : bool, optional
When `True` and the underlying file handle is seekable, data
arrays will only be loaded lazily: i.e., when they are accessed
for the first time. In this case the underlying file must stay
open during the lifetime of the tree. Setting to `False` causes
all data arrays to be loaded up front, which means that they
can be accessed even after the underlying file is closed.
Note: even if `lazy_load` is `False`, `copy_arrays` is still taken
into account.

custom_schema : str, optional
Path to a custom schema file that will be used for a secondary
validation pass. This can be used to ensure that particular ASDF
@@ -122,8 +132,9 @@ def __init__(self, tree=None, uri=None, extensions=None, version=None,
self._fd = None
self._closed = False
self._external_asdf_by_uri = {}
self._blocks = block.BlockManager(self, copy_arrays=copy_arrays,
inline_threshold=inline_threshold)
self._blocks = block.BlockManager(
self, copy_arrays=copy_arrays, inline_threshold=inline_threshold,
lazy_load=lazy_load)
self._uri = None
if tree is None:
self.tree = {}
@@ -706,6 +717,7 @@ def open(cls, fd, uri=None, mode='r',
ignore_unrecognized_tag=False,
_force_raw_types=False,
copy_arrays=False,
lazy_load=True,
custom_schema=None,
strict_extension_check=False,
ignore_missing_extensions=False):
@@ -749,6 +761,16 @@
When `False`, when reading files, attempt to memmap underlying data
arrays when possible.

lazy_load : bool, optional
When `True` and the underlying file handle is seekable, data
arrays will only be loaded lazily: i.e., when they are accessed
for the first time. In this case the underlying file must stay
open during the lifetime of the tree. Setting to `False` causes
all data arrays to be loaded up front, which means that they
can be accessed even after the underlying file is closed.
Note: even if `lazy_load` is `False`, `copy_arrays` is still taken
into account.

custom_schema : str, optional
Path to a custom schema file that will be used for a secondary
validation pass. This can be used to ensure that particular ASDF
@@ -775,7 +797,8 @@ def open(cls, fd, uri=None, mode='r',
self = cls(extensions=extensions,
ignore_version_mismatch=ignore_version_mismatch,
ignore_unrecognized_tag=ignore_unrecognized_tag,
copy_arrays=copy_arrays, custom_schema=custom_schema)
copy_arrays=copy_arrays, lazy_load=lazy_load,
custom_schema=custom_schema)

return cls._open_impl(
self, fd, uri=uri, mode=mode,
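The two keyword arguments documented above are independent switches. A sketch of how they combine, as described by the docstrings (the file name is illustrative and assumes a file written as in the earlier example):

```python
import asdf

# lazy_load=True,  copy_arrays=False (defaults): arrays resolve on first
#   access, typically as memory maps; the file must stay open.
# lazy_load=True,  copy_arrays=True : arrays are copied into memory on
#   first access; the file must stay open until that access happens.
# lazy_load=False, copy_arrays=False: arrays are resolved immediately,
#   but as memory maps, so they may still depend on the open file.
# lazy_load=False, copy_arrays=True : arrays are read and copied up
#   front; the tree is fully independent of the file handle.
af = asdf.AsdfFile.open('example.asdf', lazy_load=False, copy_arrays=False)
arr = af.tree['data']  # already materialized (possibly as a memmap)
af.close()
```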
129 changes: 90 additions & 39 deletions asdf/block.py
@@ -33,7 +33,8 @@ class BlockManager(object):
"""
Manages the `Block`s associated with an ASDF file.
"""
def __init__(self, asdffile, copy_arrays=False, inline_threshold=None):
def __init__(self, asdffile, copy_arrays=False, inline_threshold=None,
lazy_load=True):
self._asdffile = weakref.ref(asdffile)

self._internal_blocks = []
@@ -56,6 +57,7 @@ def __init__(self, asdffile, copy_arrays=False, inline_threshold=None):
self._data_to_block_mapping = {}
self._validate_checksums = False
self._memmap = not copy_arrays
self._lazy_load = lazy_load

def __len__(self):
"""
@@ -191,6 +193,21 @@ def inline_blocks(self):
for block in self._inline_blocks:
yield block

@property
def memmap(self):
"""
The flag which indicates whether the arrays are memory mapped
to the underlying file.
"""
return self._memmap

@property
def lazy_load(self):
"""
The flag which indicates whether the blocks are lazily read.
"""
return self._lazy_load

def has_blocks_with_offset(self):
"""
Returns `True` if any of the internal blocks currently have an
@@ -201,6 +218,9 @@ def has_blocks_with_offset(self):
return True
return False

def _new_block(self):
return Block(memmap=self.memmap, lazy_load=self.lazy_load)

def _sort_blocks_by_offset(self):
def sorter(x):
if x.offset is None:
@@ -212,7 +232,7 @@ def sorter(x):
def _read_next_internal_block(self, fd, past_magic=False):
# This assumes the file pointer is at the beginning of the
# block, (or beginning + 4 if past_magic is True)
block = Block(memmap=self._memmap).read(
block = self._new_block().read(
fd, past_magic=past_magic,
validate_checksum=self._validate_checksums)
if block is not None:
@@ -262,22 +282,23 @@ def finish_reading_internal_blocks(self):
This is called before updating a file, since updating requires
knowledge of all internal blocks in the file.
"""
if len(self._internal_blocks):
for i, block in enumerate(self._internal_blocks):
if isinstance(block, UnloadedBlock):
block.load()

last_block = self._internal_blocks[-1]

# Read all of the remaining blocks in the file, if any
if (last_block._fd is not None and
last_block._fd.seekable()):
last_block._fd.seek(last_block.end_offset)
while True:
last_block = self._read_next_internal_block(
last_block._fd, False)
if last_block is None:
break
if not self._internal_blocks:
return
for i, block in enumerate(self._internal_blocks):
if isinstance(block, UnloadedBlock):
block.load()

last_block = self._internal_blocks[-1]

# Read all of the remaining blocks in the file, if any
if (last_block._fd is not None and
last_block._fd.seekable()):
last_block._fd.seek(last_block.end_offset)
while True:
last_block = self._read_next_internal_block(
last_block._fd, False)
if last_block is None:
break

def write_internal_blocks_serial(self, fd, pad_blocks=False):
"""
@@ -499,7 +520,7 @@ def read_block_index(self, fd, ctx):
# make sure it makes sense.
fd.seek(offsets[-1], generic_io.SEEK_SET)
try:
block = Block(memmap=self._memmap).read(fd)
block = self._new_block().read(fd)
except (ValueError, IOError):
return

@@ -511,11 +532,16 @@
# objects
for offset in offsets[1:-1]:
self._internal_blocks.append(
UnloadedBlock(fd, offset, memmap=self._memmap))
UnloadedBlock(fd, offset,
memmap=self.memmap, lazy_load=self.lazy_load))

# We already read the last block in the file -- no need to read it again
self._internal_blocks.append(block)

# Materialize the internal blocks if we are not lazy
if not self.lazy_load:
self.finish_reading_internal_blocks()

def get_external_filename(self, filename, index):
"""
Given a main filename and an index number, return a new file
@@ -788,7 +814,8 @@ class Block(object):
('checksum', '16s')
])

def __init__(self, data=None, uri=None, array_storage='internal', memmap=True):
def __init__(self, data=None, uri=None, array_storage='internal',
memmap=True, lazy_load=True):
if isinstance(data, np.ndarray) and not data.flags.c_contiguous:
if data.flags.f_contiguous:
self._data = np.asfortranarray(data)
@@ -806,6 +833,7 @@ def __init__(self, data=None, uri=None, array_storage='internal', memmap=True):
self._checksum = None
self._should_memmap = memmap
self._memmapped = False
self._lazy_load = lazy_load

self.update_size()
self._allocated = self._size
@@ -943,10 +971,10 @@ def read(self, fd, past_magic=False, validate_checksum=False):
"""
Read a Block from the given Python file-like object.

If the file is seekable, the reading or memmapping of the
actual data is postponed until an array requests it. If the
file is a stream, the data will be read into memory
immediately.
If the file is seekable and lazy_load is True, the reading
or memmapping of the actual data is postponed until an array
requests it. If the file is a stream or lazy_load is False,
the data will be read into memory immediately.

Parameters
----------
@@ -1012,19 +1040,33 @@ def read(self, fd, past_magic=False, validate_checksum=False):
# If the file is seekable, we can delay reading the actual
# data until later.
self._fd = fd
self._header_size = header_size
self._offset = offset
self._header_size = header_size
if header['flags'] & constants.BLOCK_FLAG_STREAMED:
# Support streaming blocks
fd.fast_forward(-1)
self._array_storage = 'streamed'
self._data_size = self._size = self._allocated = \
(fd.tell() - self.data_offset) + 1
if self._lazy_load:
fd.fast_forward(-1)
self._data_size = self._size = self._allocated = \
(fd.tell() - self.data_offset) + 1
else:
self._data = fd.read_into_array(-1)
self._data_size = self._size = self._allocated = len(self._data)
else:
fd.fast_forward(header['allocated_size'])
self._allocated = header['allocated_size']
self._size = header['used_size']
self._data_size = header['data_size']
if self._lazy_load:
fd.fast_forward(self._allocated)
else:
curpos = fd.tell()
self._memmap_data()
fd.seek(curpos)
if not self._memmapped:
self._data = self._read_data(fd, self._size, self._data_size)
fd.fast_forward(self._allocated - self._size)
else:
fd.fast_forward(self._allocated)
else:
# If the file is a stream, we need to get the data now.
if header['flags'] & constants.BLOCK_FLAG_STREAMED:
@@ -1033,9 +1075,9 @@ def read(self, fd, past_magic=False, validate_checksum=False):
self._data = fd.read_into_array(-1)
self._data_size = self._size = self._allocated = len(self._data)
else:
self._data_size = header['data_size']
self._size = header['used_size']
self._allocated = header['allocated_size']
self._size = header['used_size']
self._data_size = header['data_size']
self._data = self._read_data(fd, self._size, self._data_size)
fd.fast_forward(self._allocated - self._size)
fd.close()
@@ -1048,12 +1090,24 @@ def read(self, fd, past_magic=False, validate_checksum=False):
return self

def _read_data(self, fd, used_size, data_size):
"""
Read the block data from a file.
"""
if not self.input_compression:
return fd.read_into_array(used_size)
else:
return mcompression.decompress(
fd, used_size, data_size, self.input_compression)

def _memmap_data(self):
"""
Memory map the block data from the file.
"""
memmap = self._fd.can_memmap() and not self.input_compression
if self._should_memmap and memmap:
self._data = self._fd.memmap_array(self.data_offset, self._size)
self._memmapped = True

def write(self, fd):
"""
Write an internal block to the given Python file-like object.
@@ -1130,12 +1184,8 @@ def data(self):
# Be nice and reset the file position after we're done
curpos = self._fd.tell()
try:
memmap = self._fd.can_memmap() and not self.input_compression
if self._should_memmap and memmap:
self._data = self._fd.memmap_array(
self.data_offset, self._size)
self._memmapped = True
else:
self._memmap_data()
if not self._memmapped:
self._fd.seek(self.data_offset)
self._data = self._read_data(
self._fd, self._size, self._data_size)
@@ -1165,7 +1215,7 @@ class UnloadedBlock(object):
full-fledged block whenever the underlying data or more detail is
requested.
"""
def __init__(self, fd, offset, memmap=True):
def __init__(self, fd, offset, memmap=True, lazy_load=True):
self._fd = fd
self._offset = offset
self._data = None
@@ -1176,6 +1226,7 @@ def __init__(self, fd, offset, memmap=True):
self._checksum = None
self._should_memmap = memmap
self._memmapped = False
self._lazy_load = lazy_load

def __len__(self):
self.load()
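A condensed, self-contained sketch of the read-path decision that `Block.read` implements above: defer the payload only when the file is seekable and `lazy_load` is on. This is illustrative code, not the library's own:

```python
import io

def read_payload(fd, size, seekable, lazy_load):
    """Defer the read only when the file is seekable AND lazy_load is set."""
    if seekable and lazy_load:
        fd.seek(size, io.SEEK_CUR)  # skip past the data; load on first access
        return None                 # placeholder for a later lazy load
    return fd.read(size)            # stream or eager mode: read now

buf = io.BytesIO(b'\x01' * 8)
assert read_payload(buf, 8, seekable=True, lazy_load=True) is None
buf.seek(0)
assert read_payload(buf, 8, seekable=True, lazy_load=False) == b'\x01' * 8
```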
2 changes: 2 additions & 0 deletions asdf/tags/core/ndarray.py
@@ -240,6 +240,8 @@ def __init__(self, source, shape, dtype, offset, strides,
self._offset = offset
self._strides = strides
self._order = order
if not asdffile.blocks.lazy_load:
self._make_array()

def _make_array(self):
if self._array is None:
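`_make_array` guards on `self._array is None`, so forcing it in the constructor just moves the one-time materialization forward. A minimal sketch of that memoization pattern (the `LazyValue` class and its loader are hypothetical):

```python
class LazyValue:
    """Compute a value on first access; optionally force it eagerly."""

    def __init__(self, loader, lazy_load=True):
        self._value = None
        self._loader = loader
        if not lazy_load:        # mirrors NDArrayType: materialize now
            self._make_value()

    def _make_value(self):
        if self._value is None:  # memoized: the loader runs at most once
            self._value = self._loader()
        return self._value

eager = LazyValue(lambda: list(range(3)), lazy_load=False)
print(eager._make_value())  # already computed at construction time
```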
12 changes: 7 additions & 5 deletions asdf/tags/core/tests/test_ndarray.py
@@ -96,14 +96,16 @@ def test_byteorder(tmpdir):
}

def check_asdf(asdf):
tree = asdf.tree
my_tree = asdf.tree
for endian in ('bigendian', 'little'):
assert my_tree[endian].dtype == tree[endian].dtype

if sys.byteorder == 'little':
assert tree['bigendian'].dtype.byteorder == '>'
assert tree['little'].dtype.byteorder == '='
assert my_tree['bigendian'].dtype.byteorder == '>'
assert my_tree['little'].dtype.byteorder == '='
else:
assert tree['bigendian'].dtype.byteorder == '='
assert tree['little'].dtype.byteorder == '<'
assert my_tree['bigendian'].dtype.byteorder == '='
assert my_tree['little'].dtype.byteorder == '<'

def check_raw_yaml(content):
assert b'byteorder: little' in content