Make all maps exclusive

Track dependencies in build results and don't try to complete a model if its dependencies are not already completed (i.e. multiple passes are needed to complete everything, which is the correct behavior)
Use base models in dynamic tables, we should fix the dynamictable base class rather than just having everything be an anonymous BaseModel in order to satisfy type validation
Clear the completed queue when advancing stages - we should make sure to move everything to completed explicitly with maps rather than implicitly keeping things in completed when they aren't; otherwise we might get old versions of results that aren't actually fully completed and still have some reference stubs in them.
This commit is contained in:
sneakers-the-rat 2023-09-27 20:07:39 -07:00
parent 9fcc1458fb
commit 6bbf56d1a0

View file

@ -98,6 +98,10 @@ class H5ReadResult(BaseModel):
""" """
Problems that occurred during resolution Problems that occurred during resolution
""" """
depends: List[HDF5_Path] = Field(default_factory=list)
"""
Other items that the final resolution of this item depends on
"""
FlatH5 = Dict[str, H5SourceItem] FlatH5 = Dict[str, H5SourceItem]
@ -105,7 +109,6 @@ FlatH5 = Dict[str, H5SourceItem]
class HDF5Map(ABC): class HDF5Map(ABC):
phase: ReadPhases phase: ReadPhases
exclusive: bool = False
""" """
If ``True``, if the check is fulfilled, no other maps can be applied this phase If ``True``, if the check is fulfilled, no other maps can be applied this phase
""" """
@ -145,23 +148,17 @@ class PruneEmpty(HDF5Map):
) )
# class ResolveVectorData(HDF5Map):
# """
# We will load vanilla VectorData as part of :class:`.ResolveDynamicTable`
# """
# phase = ReadPhases.read
#
# @classmethod
# def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
# if src.h5_type == 'group':
# return False
# if src.neurodata_type == 'VectorData':
#
class ResolveDynamicTable(HDF5Map): class ResolveDynamicTable(HDF5Map):
"""Handle loading a dynamic table!""" """
Handle loading a dynamic table!
Dynamic tables are sort of odd in that their models don't include their fields (except as a list of
strings in ``colnames`` ), so we need to create a new model that includes fields for each column,
and then we include the datasets as :class:`~.nwb_linkml.types.ndarray.NDArrayProxy` objects which
lazy load the arrays in a thread/process safe way.
This map also resolves
"""
phase = ReadPhases.read phase = ReadPhases.read
priority = 1 priority = 1
exclusive = True exclusive = True
@ -191,13 +188,7 @@ class ResolveDynamicTable(HDF5Map):
obj = h5f.get(src.path) obj = h5f.get(src.path)
# make a populated model :) # make a populated model :)
# TODO: use a tableproxy like NDArrayProxy to not load all these into memory base_model = provider.get_class(src.namespace, src.neurodata_type)
if src.neurodata_type != 'DynamicTable':
#base_model = provider.get_class(src.namespace, src.neurodata_type)
base_model = None
else:
base_model = None
model = dynamictable_to_model(obj, base=base_model) model = dynamictable_to_model(obj, base=base_model)
completes = ['/'.join([src.path, child]) for child in obj.keys()] completes = ['/'.join([src.path, child]) for child in obj.keys()]
@ -213,9 +204,47 @@ class ResolveDynamicTable(HDF5Map):
class ResolveModelGroup(HDF5Map): class ResolveModelGroup(HDF5Map):
"""
HDF5 Groups that have a model, as indicated by ``neurodata_type`` in their attrs.
We use the model to determine what fields we should get, and then stash references to the children to
process later as :class:`.HDF5_Path`
**Special Case:** Some groups like ``ProcessingModule`` and others that have an arbitrary
number of named children have a special ``children`` field that is a dictionary mapping
names to the objects themselves.
So for example, this:
/processing/
eye_tracking/
cr_ellipse_fits/
center_x
center_y
...
eye_ellipse_fits/
...
pupil_ellipse_fits/
...
eye_tracking_rig_metadata/
...
would pack the ``eye_tracking`` group (a ``ProcessingModule`` ) as:
{
"name": "eye_tracking",
"children": {
"cr_ellipse_fits": HDF5_Path('/processing/eye_tracking/cr_ellipse_fits'),
"eye_ellipse_fits" : HDF5_Path('/processing/eye_tracking/eye_ellipse_fits'),
...
}
}
We will do some nice things in the model metaclass to make it possible to access the children like
``nwbfile.processing.cr_ellipse_fits.center_x`` rather than having to switch between indexing and
attribute access :)
"""
phase = ReadPhases.read phase = ReadPhases.read
priority = 10 # do this generally last priority = 10 # do this generally last
exclusive = True
@classmethod @classmethod
def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
@ -226,24 +255,23 @@ class ResolveModelGroup(HDF5Map):
@classmethod @classmethod
def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
model = provider.get_class(src.namespace, src.neurodata_type) model = provider.get_class(src.namespace, src.neurodata_type)
res = {} res = {}
depends = []
with h5py.File(src.h5f_path, 'r') as h5f: with h5py.File(src.h5f_path, 'r') as h5f:
obj = h5f.get(src.path) obj = h5f.get(src.path)
for key, type in model.model_fields.items(): for key, type in model.model_fields.items():
if key in obj.attrs: if key == 'children':
res[key] = {name: HDF5_Path(child.name) for name, child in obj.items()}
depends.extend([HDF5_Path(child.name) for child in obj.values()])
elif key in obj.attrs:
res[key] = obj.attrs[key] res[key] = obj.attrs[key]
continue continue
if key in obj.keys(): elif key in obj.keys():
# stash a reference to this, we'll compile it at the end # stash a reference to this, we'll compile it at the end
if src.path == '/': depends.append(HDF5_Path(obj[key].name))
target_path = '/' + key res[key] = HDF5_Path(obj[key].name)
else:
target_path = '/'.join([src.path, key])
res[key] = HDF5_Path(target_path)
res['hdf5_path'] = src.path res['hdf5_path'] = src.path
res['name'] = src.parts[-1] res['name'] = src.parts[-1]
@ -255,14 +283,21 @@ class ResolveModelGroup(HDF5Map):
model = model, model = model,
namespace=src.namespace, namespace=src.namespace,
neurodata_type=src.neurodata_type, neurodata_type=src.neurodata_type,
applied=['ResolveModelGroup'] applied=['ResolveModelGroup'],
depends=depends
) )
class ResolveDatasetAsDict(HDF5Map): class ResolveDatasetAsDict(HDF5Map):
"""Mutually exclusive with :class:`.ResolveScalars`""" """
Resolve datasets that do not have a ``neurodata_type`` of their own as a dictionary
that will be packaged into a model in the next step. Grabs the array in an :class:`~nwb_linkml.types.ndarray.NDArrayProxy`
under an ``array`` key, and then grabs any additional ``attrs`` as well.
Mutually exclusive with :class:`.ResolveScalars` - this only applies to datasets that are larger
than a single entry.
"""
phase = ReadPhases.read phase = ReadPhases.read
priority = 11 priority = 11
exclusive = True
@classmethod @classmethod
def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
@ -295,7 +330,6 @@ class ResolveDatasetAsDict(HDF5Map):
class ResolveScalars(HDF5Map): class ResolveScalars(HDF5Map):
phase = ReadPhases.read phase = ReadPhases.read
priority = 11 #catchall priority = 11 #catchall
exclusive = True
@classmethod @classmethod
def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
@ -322,6 +356,11 @@ class ResolveScalars(HDF5Map):
) )
class ResolveContainerGroups(HDF5Map): class ResolveContainerGroups(HDF5Map):
"""
Groups like ``/acquisition`` and others that have no ``neurodata_type``
(and thus no model) are returned as a dictionary with :class:`.HDF5_Path` references to
the children they contain
"""
phase = ReadPhases.read phase = ReadPhases.read
priority = 9 priority = 9
@ -340,11 +379,13 @@ class ResolveContainerGroups(HDF5Map):
@classmethod @classmethod
def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
"""Simple, just return a dict with references to its children""" """Simple, just return a dict with references to its children"""
depends = []
with h5py.File(src.h5f_path, 'r') as h5f: with h5py.File(src.h5f_path, 'r') as h5f:
obj = h5f.get(src.path) obj = h5f.get(src.path)
children = {} children = {}
for k, v in obj.items(): for k, v in obj.items():
children[k] = HDF5_Path(v.name) children[k] = HDF5_Path(v.name)
depends.append(HDF5_Path(v.name))
res = { res = {
'name': src.parts[-1], 'name': src.parts[-1],
@ -368,7 +409,6 @@ class CompleteDynamicTables(HDF5Map):
"""Nothing to do! already done!""" """Nothing to do! already done!"""
phase = ReadPhases.construct phase = ReadPhases.construct
priority = 1 priority = 1
exclusive = True
@classmethod @classmethod
def check(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: def check(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
if 'ResolveDynamicTable' in src.applied: if 'ResolveDynamicTable' in src.applied:
@ -386,7 +426,9 @@ class CompleteModelGroups(HDF5Map):
@classmethod @classmethod
def check(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: def check(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
if src.model is not None: if src.model is not None and \
src.source.h5_type == 'group' and \
all([depend in completed.keys() for depend in src.depends]):
return True return True
else: else:
return False return False
@ -479,7 +521,6 @@ class ReadQueue(BaseModel):
phase_maps = [m for m in HDF5Map.__subclasses__() if m.phase == phase] phase_maps = [m for m in HDF5Map.__subclasses__() if m.phase == phase]
phase_maps = sorted(phase_maps, key=lambda x: x.priority) phase_maps = sorted(phase_maps, key=lambda x: x.priority)
# if we've moved to the
results = [] results = []
@ -487,8 +528,12 @@ class ReadQueue(BaseModel):
for name, item in self.queue.items(): for name, item in self.queue.items():
for op in phase_maps: for op in phase_maps:
if op.check(item, self.provider, self.completed): if op.check(item, self.provider, self.completed):
# Formerly there was an "exclusive" property in the maps which let potentially multiple
# operations be applied per stage, except if an operation was `exclusive` which would break
# iteration over the operations. This was removed because it was badly implemented, but
# if there is ever a need to do that, then we would need to decide what to do with the
# multiple results.
results.append(op.apply(item, self.provider, self.completed)) results.append(op.apply(item, self.provider, self.completed))
if op.exclusive:
break # out of inner iteration break # out of inner iteration
# remake the source queue and save results # remake the source queue and save results
@ -505,12 +550,7 @@ class ReadQueue(BaseModel):
# if we have completed other things, delete them from the queue # if we have completed other things, delete them from the queue
completes.extend(res.completes) completes.extend(res.completes)
# for also_completed in res.completes:
# try:
# del self.queue[also_completed]
# except KeyError:
# # normal, we might have already deleted this in a previous step
# pass
else: else:
# if we didn't complete the item (eg. we found we needed more dependencies), # if we didn't complete the item (eg. we found we needed more dependencies),
# add the updated source to the queue again # add the updated source to the queue again
@ -537,7 +577,9 @@ class ReadQueue(BaseModel):
self.phases_completed.append(phase) self.phases_completed.append(phase)
if phase != ReadPhases.construct: if phase != ReadPhases.construct:
# if we're not in the last phase, move our completed to our queue # if we're not in the last phase, move our completed to our queue
self.queue = self.completed.copy() self.queue = self.completed
self.completed = {}