From 6bbf56d1a069815a5379072ace9170f2fabc6241 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Wed, 27 Sep 2023 20:07:39 -0700 Subject: [PATCH] Make all maps exclusive Track dependencies in build results and don't try to complete model if dependencies not already completed (i.e. need multiple passes to complete, which is the correct behavior) Use base models in dynamic tables, we should fix the dynamictable base class rather than just having everything be an anonymous BaseModel in order to satisfy type validation Clear completed queue when advancing stages - we should make sure to move everything to completed with maps rather than implicitly keeping things in completed when they aren't - so we might be getting old versions of results that aren't actually fully completed but have some reference stubs still in them. --- nwb_linkml/src/nwb_linkml/maps/hdf5.py | 142 ++++++++++++++++--- 1 file changed, 92 insertions(+), 50 deletions(-) diff --git a/nwb_linkml/src/nwb_linkml/maps/hdf5.py b/nwb_linkml/src/nwb_linkml/maps/hdf5.py index 313d7a5..d2b72a2 100644 --- a/nwb_linkml/src/nwb_linkml/maps/hdf5.py +++ b/nwb_linkml/src/nwb_linkml/maps/hdf5.py @@ -98,6 +98,10 @@ class H5ReadResult(BaseModel): """ Problems that occurred during resolution """ + depends: List[HDF5_Path] = Field(default_factory=list) + """ + Other items that the final resolution of this item depends on + """ FlatH5 = Dict[str, H5SourceItem] @@ -105,7 +109,6 @@ FlatH5 = Dict[str, H5SourceItem] class HDF5Map(ABC): phase: ReadPhases - exclusive: bool = False """ If ``True``, if the check is fulfilled, no other maps can be applied this phase """ @@ -145,23 +148,17 @@ class PruneEmpty(HDF5Map): ) - -# class ResolveVectorData(HDF5Map): -# """ -# We will load vanilla VectorData as part of :class:`.ResolveDynamicTable` -# """ -# phase = ReadPhases.read -# -# @classmethod -# def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: -# if src.h5_type == 'group': 
-# return False -# if src.neurodata_type == 'VectorData': -# - - class ResolveDynamicTable(HDF5Map): - """Handle loading a dynamic table!""" + """ + Handle loading a dynamic table! + + Dynamic tables are sort of odd in that their models don't include their fields (except as a list of + strings in ``colnames`` ), so we need to create a new model that includes fields for each column, + and then we include the datasets as :class:`~.nwb_linkml.types.ndarray.NDArrayProxy` objects which + lazy load the arrays in a thread/process safe way. + + This map also resolves + """ phase = ReadPhases.read priority = 1 exclusive = True @@ -191,13 +188,7 @@ class ResolveDynamicTable(HDF5Map): obj = h5f.get(src.path) # make a populated model :) - # TODO: use a tableproxy like NDArrayProxy to not load all these into memory - if src.neurodata_type != 'DynamicTable': - #base_model = provider.get_class(src.namespace, src.neurodata_type) - base_model = None - else: - base_model = None - + base_model = provider.get_class(src.namespace, src.neurodata_type) model = dynamictable_to_model(obj, base=base_model) completes = ['/'.join([src.path, child]) for child in obj.keys()] @@ -213,9 +204,47 @@ class ResolveDynamicTable(HDF5Map): class ResolveModelGroup(HDF5Map): + """ + HDF5 Groups that have a model, as indicated by ``neurodata_type`` in their attrs. + We use the model to determine what fields we should get, and then stash references to the children to + process later as :class:`.HDF5_Path` + + **Special Case:** Some groups like ``ProcessingGroup`` and others that have an arbitrary + number of named children have a special ``children`` field that is a dictionary mapping + names to the objects themselves. + + So for example, this: + + /processing/ + eye_tracking/ + cr_ellipse_fits/ + center_x + center_y + ... + eye_ellipse_fits/ + ... + pupil_ellipse_fits/ + ... + eye_tracking_rig_metadata/ + ... 
+ + would pack the ``eye_tracking`` group (a ``ProcessingModule`` ) as: + + { + "name": "eye_tracking", + "children": { + "cr_ellipse_fits": HDF5_Path('/processing/eye_tracking/cr_ellipse_fits'), + "eye_ellipse_fits" : HDF5_Path('/processing/eye_tracking/eye_ellipse_fits'), + ... + } + } + + We will do some nice things in the model metaclass to make it possible to access the children like + ``nwbfile.processing.cr_ellipse_fits.center_x`` rather than having to switch between indexing and + attribute access :) + """ phase = ReadPhases.read priority = 10 # do this generally last - exclusive = True @classmethod def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: @@ -226,24 +255,23 @@ class ResolveModelGroup(HDF5Map): @classmethod def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: - - model = provider.get_class(src.namespace, src.neurodata_type) res = {} + depends = [] with h5py.File(src.h5f_path, 'r') as h5f: obj = h5f.get(src.path) for key, type in model.model_fields.items(): - if key in obj.attrs: + if key == 'children': + res[key] = {name: HDF5_Path(child.name) for name, child in obj.items()} + depends.extend([HDF5_Path(child.name) for child in obj.values()]) + elif key in obj.attrs: res[key] = obj.attrs[key] continue - if key in obj.keys(): + elif key in obj.keys(): # stash a reference to this, we'll compile it at the end - if src.path == '/': - target_path = '/' + key - else: - target_path = '/'.join([src.path, key]) + depends.append(HDF5_Path(obj[key].name)) + res[key] = HDF5_Path(obj[key].name) - res[key] = HDF5_Path(target_path) res['hdf5_path'] = src.path res['name'] = src.parts[-1] @@ -255,14 +283,21 @@ class ResolveModelGroup(HDF5Map): model = model, namespace=src.namespace, neurodata_type=src.neurodata_type, - applied=['ResolveModelGroup'] + applied=['ResolveModelGroup'], + depends=depends ) class ResolveDatasetAsDict(HDF5Map): - """Mutually 
exclusive with :class:`.ResolveScalars`""" + """ + Resolve datasets that do not have a ``neurodata_type`` of their own as a dictionary + that will be packaged into a model in the next step. Grabs the array in an :class:`~nwb_linkml.types.ndarray.NDArrayProxy` + under an ``array`` key, and then grabs any additional ``attrs`` as well. + + Mutually exclusive with :class:`.ResolveScalars` - this only applies to datasets that are larger + than a single entry. + """ phase = ReadPhases.read priority = 11 - exclusive = True @classmethod def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: @@ -295,7 +330,6 @@ class ResolveDatasetAsDict(HDF5Map): class ResolveScalars(HDF5Map): phase = ReadPhases.read priority = 11 #catchall - exclusive = True @classmethod def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: @@ -322,6 +356,11 @@ class ResolveScalars(HDF5Map): ) class ResolveContainerGroups(HDF5Map): + """ + Groups like ``/acquisition``` and others that have no ``neurodata_type`` + (and thus no model) are returned as a dictionary with :class:`.HDF5_Path` references to + the children they contain + """ phase = ReadPhases.read priority = 9 @@ -340,11 +379,13 @@ class ResolveContainerGroups(HDF5Map): @classmethod def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: """Simple, just return a dict with references to its children""" + depends = [] with h5py.File(src.h5f_path, 'r') as h5f: obj = h5f.get(src.path) children = {} for k, v in obj.items(): children[k] = HDF5_Path(v.name) + depends.append(HDF5_Path(v.name)) res = { 'name': src.parts[-1], @@ -368,7 +409,6 @@ class CompleteDynamicTables(HDF5Map): """Nothing to do! 
already done!""" phase = ReadPhases.construct priority = 1 - exclusive = True @classmethod def check(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: if 'ResolveDynamicTable' in src.applied: @@ -386,7 +426,9 @@ class CompleteModelGroups(HDF5Map): @classmethod def check(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: - if src.model is not None: + if src.model is not None and \ + src.source.h5_type == 'group' and \ + all([depend in completed.keys() for depend in src.depends]): return True else: return False @@ -479,7 +521,6 @@ class ReadQueue(BaseModel): phase_maps = [m for m in HDF5Map.__subclasses__() if m.phase == phase] phase_maps = sorted(phase_maps, key=lambda x: x.priority) - # if we've moved to the results = [] @@ -487,9 +528,13 @@ class ReadQueue(BaseModel): for name, item in self.queue.items(): for op in phase_maps: if op.check(item, self.provider, self.completed): + # Formerly there was an "exclusive" property in the maps which let potentially multiple + # operations be applied per stage, except if an operation was `exclusive` which would break + # iteration over the operations. This was removed because it was badly implemented, but + # if there is ever a need to do that, then we would need to decide what to do with the + # multiple results. results.append(op.apply(item, self.provider, self.completed)) - if op.exclusive: - break # out of inner iteration + break # out of inner iteration # remake the source queue and save results completes = [] @@ -505,12 +550,7 @@ class ReadQueue(BaseModel): # if we have completed other things, delete them from the queue completes.extend(res.completes) - # for also_completed in res.completes: - # try: - # del self.queue[also_completed] - # except KeyError: - # # normal, we might have already deleted this in a previous step - # pass + else: # if we didn't complete the item (eg. 
we found we needed more dependencies), # add the updated source to the queue again @@ -537,7 +577,9 @@ class ReadQueue(BaseModel): self.phases_completed.append(phase) if phase != ReadPhases.construct: # if we're not in the last phase, move our completed to our queue - self.queue = self.completed.copy() + self.queue = self.completed + self.completed = {} +