diff --git a/nwb_linkml/src/nwb_linkml/maps/hdf5.py b/nwb_linkml/src/nwb_linkml/maps/hdf5.py
index 313d7a5..d2b72a2 100644
--- a/nwb_linkml/src/nwb_linkml/maps/hdf5.py
+++ b/nwb_linkml/src/nwb_linkml/maps/hdf5.py
@@ -98,6 +98,10 @@ class H5ReadResult(BaseModel):
     """
     Problems that occurred during resolution
     """
+    depends: List[HDF5_Path] = Field(default_factory=list)
+    """
+    Other items that the final resolution of this item depends on
+    """
 
 FlatH5 = Dict[str, H5SourceItem]
 
@@ -105,7 +109,3 @@ FlatH5 = Dict[str, H5SourceItem]
 
 class HDF5Map(ABC):
     phase: ReadPhases
-    exclusive: bool = False
-    """
-    If ``True``, if the check is fulfilled, no other maps can be applied this phase
-    """
@@ -145,23 +148,16 @@ class PruneEmpty(HDF5Map):
         )
 
 
-
-# class ResolveVectorData(HDF5Map):
-#     """
-#     We will load vanilla VectorData as part of :class:`.ResolveDynamicTable`
-#     """
-#     phase = ReadPhases.read
-#
-#     @classmethod
-#     def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
-#         if src.h5_type == 'group':
-#             return False
-#         if src.neurodata_type == 'VectorData':
-#
-
-
 class ResolveDynamicTable(HDF5Map):
-    """Handle loading a dynamic table!"""
+    """
+    Handle loading a dynamic table!
+
+    Dynamic tables are sort of odd in that their models don't include their fields (except as a list of
+    strings in ``colnames``), so we need to create a new model that includes fields for each column,
+    and then we include the datasets as :class:`~nwb_linkml.types.ndarray.NDArrayProxy` objects which
+    lazily load the arrays in a thread- and process-safe way.
+
+    This map also resolves the table's child ``VectorData`` columns, so they don't need to be resolved separately.
+    """
     phase = ReadPhases.read
     priority = 1
-    exclusive = True
@@ -191,13 +188,7 @@ class ResolveDynamicTable(HDF5Map):
             obj = h5f.get(src.path)
 
             # make a populated model :)
-            # TODO: use a tableproxy like NDArrayProxy to not load all these into memory
-            if src.neurodata_type != 'DynamicTable':
-                #base_model = provider.get_class(src.namespace, src.neurodata_type)
-                base_model = None
-            else:
-                base_model = None
-
+            base_model = provider.get_class(src.namespace, src.neurodata_type)
             model = dynamictable_to_model(obj, base=base_model)
 
             completes = ['/'.join([src.path, child]) for child in obj.keys()]
@@ -213,9 +204,47 @@
 
 class ResolveModelGroup(HDF5Map):
+    """
+    HDF5 Groups that have a model, as indicated by ``neurodata_type`` in their attrs.
+    We use the model to determine which fields we should get, and then stash references to the
+    children to process later as :class:`.HDF5_Path`.
+
+    **Special Case:** Some groups, like ``ProcessingModule``, have an arbitrary number of
+    named children; these get a special ``children`` field that is a dictionary mapping
+    names to the objects themselves.
+
+    So for example, this:
+
+        /processing/
+            eye_tracking/
+                cr_ellipse_fits/
+                    center_x
+                    center_y
+                    ...
+                eye_ellipse_fits/
+                    ...
+                pupil_ellipse_fits/
+                    ...
+            eye_tracking_rig_metadata/
+                ...
+
+    would pack the ``eye_tracking`` group (a ``ProcessingModule``) as:
+
+        {
+            "name": "eye_tracking",
+            "children": {
+                "cr_ellipse_fits": HDF5_Path('/processing/eye_tracking/cr_ellipse_fits'),
+                "eye_ellipse_fits": HDF5_Path('/processing/eye_tracking/eye_ellipse_fits'),
+                ...
+            }
+        }
+
+    We will do some nice things in the model metaclass to make it possible to access the children
+    like ``nwbfile.processing.eye_tracking.cr_ellipse_fits.center_x`` rather than having to switch
+    between indexing and attribute access :)
+    """
     phase = ReadPhases.read
     priority = 10 # do this generally last
-    exclusive = True
 
     @classmethod
     def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
@@ -226,24 +255,23 @@ class ResolveModelGroup(HDF5Map):
 
     @classmethod
     def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
-
-
         model = provider.get_class(src.namespace, src.neurodata_type)
         res = {}
+        depends = []
         with h5py.File(src.h5f_path, 'r') as h5f:
             obj = h5f.get(src.path)
             for key, type in model.model_fields.items():
-                if key in obj.attrs:
+                if key == 'children':
+                    res[key] = {name: HDF5_Path(child.name) for name, child in obj.items()}
+                    depends.extend([HDF5_Path(child.name) for child in obj.values()])
+                elif key in obj.attrs:
                     res[key] = obj.attrs[key]
                     continue
-                if key in obj.keys():
+                elif key in obj.keys():
                     # stash a reference to this, we'll compile it at the end
-                    if src.path == '/':
-                        target_path = '/' + key
-                    else:
-                        target_path = '/'.join([src.path, key])
+                    depends.append(HDF5_Path(obj[key].name))
+                    res[key] = HDF5_Path(obj[key].name)
 
-                    res[key] = HDF5_Path(target_path)
         res['hdf5_path'] = src.path
         res['name'] = src.parts[-1]
@@ -255,14 +283,21 @@ class ResolveModelGroup(HDF5Map):
             model = model,
             namespace=src.namespace,
             neurodata_type=src.neurodata_type,
-            applied=['ResolveModelGroup']
+            applied=['ResolveModelGroup'],
+            depends=depends
         )
 
 
 class ResolveDatasetAsDict(HDF5Map):
-    """Mutually exclusive with :class:`.ResolveScalars`"""
+    """
+    Resolve datasets that do not have a ``neurodata_type`` of their own into a dictionary that
+    will be packaged into a model in the next step. Stashes the array as an
+    :class:`~nwb_linkml.types.ndarray.NDArrayProxy` under an ``array`` key, along with any additional ``attrs``.
+
+    Mutually exclusive with :class:`.ResolveScalars`: this map only applies to datasets with more
+    than a single entry.
+    """
     phase = ReadPhases.read
     priority = 11
-    exclusive = True
 
     @classmethod
     def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
@@ -295,7 +330,6 @@ class ResolveDatasetAsDict(HDF5Map):
 
 class ResolveScalars(HDF5Map):
     phase = ReadPhases.read
     priority = 11 # catchall
-    exclusive = True
 
     @classmethod
     def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
@@ -322,6 +356,11 @@ class ResolveScalars(HDF5Map):
         )
 
 class ResolveContainerGroups(HDF5Map):
+    """
+    Groups like ``/acquisition`` and others that have no ``neurodata_type``
+    (and thus no model) are returned as a dictionary with :class:`.HDF5_Path` references to
+    the children they contain.
+    """
     phase = ReadPhases.read
     priority = 9
 
@@ -340,11 +379,13 @@ class ResolveContainerGroups(HDF5Map):
     @classmethod
     def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
         """Simple, just return a dict with references to its children"""
+        depends = []
         with h5py.File(src.h5f_path, 'r') as h5f:
             obj = h5f.get(src.path)
             children = {}
             for k, v in obj.items():
                 children[k] = HDF5_Path(v.name)
+                depends.append(HDF5_Path(v.name))
 
         res = {
             'name': src.parts[-1],
@@ -368,7 +409,6 @@ class CompleteDynamicTables(HDF5Map):
     """Nothing to do!
     already done!"""
     phase = ReadPhases.construct
     priority = 1
-    exclusive = True
 
     @classmethod
     def check(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
         if 'ResolveDynamicTable' in src.applied:
@@ -386,7 +426,9 @@ class CompleteModelGroups(HDF5Map):
 
     @classmethod
     def check(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
-        if src.model is not None:
+        if src.model is not None and \
+           src.source.h5_type == 'group' and \
+           all([depend in completed.keys() for depend in src.depends]):
             return True
         else:
             return False
@@ -479,7 +521,6 @@ class ReadQueue(BaseModel):
         phase_maps = [m for m in HDF5Map.__subclasses__() if m.phase == phase]
         phase_maps = sorted(phase_maps, key=lambda x: x.priority)
 
-        # if we've moved to the
         results = []
 
@@ -487,9 +528,13 @@ class ReadQueue(BaseModel):
         for name, item in self.queue.items():
             for op in phase_maps:
                 if op.check(item, self.provider, self.completed):
+                    # Formerly the maps had an ``exclusive`` property: several operations could be
+                    # applied to an item per stage unless one marked ``exclusive`` matched, which
+                    # broke iteration. That was badly implemented and has been removed, so only the
+                    # first matching operation is applied. If multiple maps per item are ever needed
+                    # again, we would have to decide what to do with the multiple results.
                     results.append(op.apply(item, self.provider, self.completed))
-                    if op.exclusive:
-                        break # out of inner iteration
+                    break # out of inner iteration
 
         # remake the source queue and save results
         completes = []
@@ -505,12 +550,7 @@ class ReadQueue(BaseModel):
                 # if we have completed other things, delete them from the queue
                 completes.extend(res.completes)
-                # for also_completed in res.completes:
-                #     try:
-                #         del self.queue[also_completed]
-                #     except KeyError:
-                #         # normal, we might have already deleted this in a previous step
-                #         pass
+
             else:
                 # if we didn't complete the item (e.g. we found we needed more dependencies),
                 # add the updated source to the queue again
@@ -537,7 +577,9 @@ class ReadQueue(BaseModel):
             self.phases_completed.append(phase)
            if phase != ReadPhases.construct:
                 # if we're not in the last phase, move our completed to our queue
-                self.queue = self.completed.copy()
+                self.queue = self.completed
+                self.completed = {}
+
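
A few illustrative sketches of the behavior this patch introduces (not part of the patch itself). First, the new ``depends`` field: ``CompleteModelGroups.check`` now only fires once everything an item references has itself been resolved. A minimal sketch of that gating, using a hypothetical ``Result`` dataclass and plain string paths in place of ``H5ReadResult`` and ``HDF5_Path``:

    from dataclasses import dataclass, field
    from typing import Dict, List

    @dataclass
    class Result:
        """Hypothetical stand-in for H5ReadResult: a path plus the paths it depends on."""
        path: str
        depends: List[str] = field(default_factory=list)

    def ready_to_construct(res: Result, completed: Dict[str, Result]) -> bool:
        # mirrors the new CompleteModelGroups.check: construct only once every
        # dependency has already landed in `completed`
        return all(dep in completed for dep in res.depends)

    module = Result(
        path="/processing/eye_tracking",
        depends=[
            "/processing/eye_tracking/cr_ellipse_fits",
            "/processing/eye_tracking/eye_ellipse_fits",
        ],
    )

    completed: Dict[str, Result] = {}
    assert not ready_to_construct(module, completed)  # children not resolved yet

    for p in module.depends:
        completed[p] = Result(path=p)
    assert ready_to_construct(module, completed)  # now eligible for construction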
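
Second, the ``children`` special case in ``ResolveModelGroup.apply``: named children are packed as path references rather than loaded. A rough equivalent with bare h5py, assuming a file at the hypothetical path ``example.nwb`` laid out like the ``ResolveModelGroup`` docstring above:

    import h5py

    # assumes a file at the hypothetical path "example.nwb" with the layout
    # from the ResolveModelGroup docstring
    with h5py.File("example.nwb", "r") as h5f:
        obj = h5f["/processing/eye_tracking"]
        # pack each named child as a reference to its absolute path instead of
        # loading it; h5py exposes that path as the child's `.name`
        children = {name: child.name for name, child in obj.items()}
        # children == {"cr_ellipse_fits": "/processing/eye_tracking/cr_ellipse_fits", ...}
        # the real map wraps these in HDF5_Path and appends them to `depends`,
        # deferring the actual reads to the construct phase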
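
Third, the dispatch that replaced ``exclusive``: maps are tried in priority order and the first one whose check passes is applied, full stop. A standalone sketch with hypothetical stand-ins for the map classes:

    from typing import Callable, List, Tuple

    # hypothetical stand-ins for HDF5Map subclasses: (priority, check, apply)
    Map = Tuple[int, Callable[[str], bool], Callable[[str], str]]

    maps: List[Map] = [
        (9, lambda item: item.startswith("/acquisition"), lambda item: f"container:{item}"),
        (10, lambda item: True, lambda item: f"model:{item}"),  # general case, tried last
    ]

    def apply_phase(items: List[str]) -> List[str]:
        results = []
        for item in items:
            # try maps in priority order so specific ones get first crack...
            for _, check, apply_fn in sorted(maps, key=lambda m: m[0]):
                if check(item):
                    results.append(apply_fn(item))
                    break  # ...and the first match wins outright
        return results

    print(apply_phase(["/acquisition/raw", "/processing/eye_tracking"]))
    # prints ['container:/acquisition/raw', 'model:/processing/eye_tracking']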