From 50e816bad4b3677c0d2ae660698e0412e9e85aa0 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 25 Sep 2023 22:03:53 -0700 Subject: [PATCH] Working on finalizing the mapping operation... doing it single threaded for now and it's very slow but it completes up until the stage where we need to zip up the orphaned objects and other things that can be inferred from the model. Need to make a proxytable model like proxyarray because reading all these tables takes way too fuckin long and it's not what we want to do anyway. --- nwb_linkml/src/nwb_linkml/annotations.py | 19 + nwb_linkml/src/nwb_linkml/io/hdf5.py | 164 ++----- nwb_linkml/src/nwb_linkml/io/hdf5_scratch.py | 5 +- nwb_linkml/src/nwb_linkml/map.py | 1 + nwb_linkml/src/nwb_linkml/maps/hdf5.py | 423 +++++++++++++++++++ nwb_linkml/src/nwb_linkml/maps/hdmf.py | 101 ++++- nwb_linkml/src/nwb_linkml/types/df.py | 23 + nwb_linkml/src/nwb_linkml/types/hdf5.py | 3 + nwb_linkml/src/nwb_linkml/types/ndarray.py | 106 +++-- nwb_linkml/src/nwb_linkml/types/ndarray.pyi | 3 +- nwb_linkml/tests/fixtures.py | 5 + nwb_linkml/tests/test_io/test_io_hdf5.py | 9 +- nwb_linkml/tests/test_types/test_ndarray.py | 4 + 13 files changed, 677 insertions(+), 189 deletions(-) create mode 100644 nwb_linkml/src/nwb_linkml/annotations.py create mode 100644 nwb_linkml/src/nwb_linkml/maps/hdf5.py create mode 100644 nwb_linkml/src/nwb_linkml/types/hdf5.py diff --git a/nwb_linkml/src/nwb_linkml/annotations.py b/nwb_linkml/src/nwb_linkml/annotations.py new file mode 100644 index 0000000..b068936 --- /dev/null +++ b/nwb_linkml/src/nwb_linkml/annotations.py @@ -0,0 +1,19 @@ +""" +Utility functions for introspection on python annotations +""" +import typing + + +def unwrap_optional(annotation): + if typing.get_origin(annotation) == typing.Union: + args = typing.get_args(annotation) + + if len(args) == 2 and args[1].__name__ == 'NoneType': + annotation = args[0] + return annotation + + +def take_outer_type(annotation): + if typing.get_origin(annotation) is list: + return list + return annotation diff --git a/nwb_linkml/src/nwb_linkml/io/hdf5.py b/nwb_linkml/src/nwb_linkml/io/hdf5.py index 180e273..3aa261d 100644 --- a/nwb_linkml/src/nwb_linkml/io/hdf5.py +++ b/nwb_linkml/src/nwb_linkml/io/hdf5.py @@ -12,8 +12,6 @@ Mapping operations (mostly TODO atm) * Create new models from DynamicTables * Handle softlinks as object references and vice versa by adding a ``path`` attr - - Other TODO: * Read metadata only, don't read all arrays @@ -21,21 +19,19 @@ Other TODO: """ import pdb -import typing import warnings -from typing import Optional, List, Dict, overload, Literal, Type, Any +from typing import Optional, Dict, overload, Type from pathlib import Path from types import ModuleType -from typing import TypeVar, TYPE_CHECKING, NamedTuple -from abc import abstractmethod +from typing import TYPE_CHECKING, NamedTuple import json import subprocess import shutil import h5py -from pydantic import BaseModel, Field, ConfigDict -from dataclasses import dataclass, field +from pydantic import BaseModel +from nwb_linkml.maps.hdf5 import H5SourceItem, flatten_hdf, ReadPhases, ReadQueue from nwb_linkml.translate import generate_from_nwbfile #from nwb_linkml.models.core_nwb_file import NWBFile if TYPE_CHECKING: @@ -43,44 +39,6 @@ if TYPE_CHECKING: from nwb_linkml.providers.schema import SchemaProvider - -class H5SourceItem(BaseModel): - """Tuple of items for each element when flattening an hdf5 file""" - path: str - """Absolute hdf5 path of element""" - leaf: bool - """If ``True``, this item has 
no children (and thus we should start instantiating it before ascending to parent classes)""" - h5_type: Literal['group', 'dataset'] - """What kind of hdf5 element this is""" - depends: List[str] = Field(default_factory=list) - """Paths of other source items that this item depends on before it can be instantiated. eg. from softlinks""" - attrs: dict = Field(default_factory=dict) - """Any static attrs that can be had from the element""" - - model_config = ConfigDict(arbitrary_types_allowed=True) - @property - def parts(self) -> List[str]: - """path split by /""" - return self.path.split('/') - -FlatH5 = Dict[str, H5SourceItem] - -class ReadQueue(BaseModel): - """Container model to store items as they are built """ - h5f: h5py.File = Field( - description="Open hdf5 file used when resolving the queue!" - ) - queue: Dict[str,H5SourceItem] = Field( - default_factory=dict, - description="Items left to be instantiated, keyed by hdf5 path", - ) - completed: Dict[str, Any] = Field( - default_factory=dict, - description="Items that have already been instantiated, keyed by hdf5 path" - ) - model_config = ConfigDict(arbitrary_types_allowed=True) - - class HDF5IO(): def __init__(self, path:Path): @@ -110,27 +68,39 @@ class HDF5IO(): # get all children of selected item if isinstance(src, (h5py.File, h5py.Group)): - children = self._flatten_hdf(src) + children = flatten_hdf(src) else: raise NotImplementedError('directly read individual datasets') + queue = ReadQueue( + h5f=self.path, + queue=children, + provider=provider + ) + #pdb.set_trace() + # Apply initial planning phase of reading + queue.apply_phase(ReadPhases.plan) + # Now do read operations until we're finished + queue.apply_phase(ReadPhases.read) - - data = {} - for k, v in src.items(): - if isinstance(v, h5py.Group): - data[k] = H5Group(cls=v, parent=parent, root_model=parent).read() - elif isinstance(v, h5py.Dataset): - data[k] = H5Dataset(cls=v, parent=parent).read() - - if path is None: - return parent(**data) - if 'neurodata_type' in src.attrs: - raise NotImplementedError('Making a submodel not supported yet') - else: - return data + pdb.set_trace() + # + # + # data = {} + # for k, v in src.items(): + # if isinstance(v, h5py.Group): + # data[k] = H5Group(cls=v, parent=parent, root_model=parent).read() + # elif isinstance(v, h5py.Dataset): + # data[k] = H5Dataset(cls=v, parent=parent).read() + # + # if path is None: + # return parent(**data) + # if 'neurodata_type' in src.attrs: + # raise NotImplementedError('Making a submodel not supported yet') + # else: + # return data @@ -314,20 +284,6 @@ def truncate_file(source: Path, target: Optional[Path] = None, n:int=10) -> Path return target -def unwrap_optional(annotation): - if typing.get_origin(annotation) == typing.Union: - args = typing.get_args(annotation) - - if len(args) == 2 and args[1].__name__ == 'NoneType': - annotation = args[0] - return annotation - -def take_outer_type(annotation): - if typing.get_origin(annotation) is list: - return list - return annotation - - def submodel_by_path(model: BaseModel, path:str) -> Type[BaseModel | dict | list]: """ @@ -338,61 +294,3 @@ def submodel_by_path(model: BaseModel, path:str) -> Type[BaseModel | dict | list ann = model.model_fields[part].annotation -def flatten_hdf(h5f:h5py.File | h5py.Group, skip='specifications') -> Dict[str, H5SourceItem]: - """ - Flatten all child elements of hdf element into a dict of :class:`.H5SourceItem` s keyed by their path - - Args: - h5f (:class:`h5py.File` | :class:`h5py.Group`): HDF file or group to flatten! 
- """ - items = {} - def _itemize(name: str, obj: h5py.Dataset | h5py.Group): - if skip in name: - return - leaf = isinstance(obj, h5py.Dataset) or len(obj.keys()) == 0 - # get references in attrs and datasets - refs = [ref for ref in obj.attrs.values() if isinstance(ref, h5py.h5r.Reference)] - if isinstance(obj, h5py.Dataset): - h5_type = 'dataset' - if obj.shape == (): - if isinstance(obj[()], h5py.h5r.Reference): - refs.append(obj[()]) - elif isinstance(obj[0], h5py.h5r.Reference): - refs.extend(obj[:].tolist()) - else: - h5_type = 'group' - # dereference and get name of reference - depends = list(set([h5f[i].name for i in refs])) - if not name.startswith('/'): - name = '/' + name - items[name] = H5SourceItem.model_construct( - path = name, - leaf = leaf, - depends = depends, - h5_type=h5_type, - attrs = dict(obj.attrs.items()) - ) - - h5f.visititems(_itemize) - return items - -def sort_flat_hdf(flat: Dict[str, H5SourceItem]) -> Dict[str, H5SourceItem]: - """ - Sort flat hdf5 file in a rough order of solvability - - * First process any leaf items - - * Put any items with dependencies at the end - - Args: - flat: - - Returns: - - """ - class Rank(NamedTuple): - has_depends: bool - not_leaf: bool - - - diff --git a/nwb_linkml/src/nwb_linkml/io/hdf5_scratch.py b/nwb_linkml/src/nwb_linkml/io/hdf5_scratch.py index 5580661..afbff5b 100644 --- a/nwb_linkml/src/nwb_linkml/io/hdf5_scratch.py +++ b/nwb_linkml/src/nwb_linkml/io/hdf5_scratch.py @@ -17,10 +17,11 @@ class MyDf(DataFrame): a = MyDf(ints=[1,2,3]) -from nwb_linkml.io.hdf5 import HDF5IO, flatten_hdf +from nwb_linkml.io.hdf5 import HDF5IO import h5py from typing import NamedTuple, Tuple, Optional -from nwb_linkml.io.hdf5 import H5SourceItem, FlatH5, ReadQueue, HDF5IO +from nwb_linkml.io.hdf5 import HDF5IO +from nwb_linkml.maps.hdf5 import H5SourceItem, FlatH5, ReadQueue, flatten_hdf from nwb_linkml.providers.schema import SchemaProvider from rich import print from pydantic import BaseModel diff --git a/nwb_linkml/src/nwb_linkml/map.py b/nwb_linkml/src/nwb_linkml/map.py index 5f671c4..b125126 100644 --- a/nwb_linkml/src/nwb_linkml/map.py +++ b/nwb_linkml/src/nwb_linkml/map.py @@ -16,6 +16,7 @@ class PHASES(StrEnum): """After the YAML for a model has been loaded""" + @dataclass class Map: scope: str diff --git a/nwb_linkml/src/nwb_linkml/maps/hdf5.py b/nwb_linkml/src/nwb_linkml/maps/hdf5.py new file mode 100644 index 0000000..8a8a7f0 --- /dev/null +++ b/nwb_linkml/src/nwb_linkml/maps/hdf5.py @@ -0,0 +1,423 @@ +""" +Maps for reading and writing from HDF5 + +We have sort of diverged from the initial idea of a generalized map as in :class:`linkml.map.Map` , +so we will make our own mapping class here and re-evaluate whether they should be unified later +""" +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Literal, List, Dict, Optional + +import h5py +from enum import StrEnum + +from pydantic import BaseModel, Field, ConfigDict + +from nwb_linkml.providers.schema import SchemaProvider +from nwb_linkml.maps.hdmf import dynamictable_to_df +from nwb_linkml.types.hdf5 import HDF5_Path + + +class ReadPhases(StrEnum): + plan = 'plan' + """Before reading starts, building an index of objects to read""" + read = 'read' + """Main reading operation""" + construct = 'construct' + """After reading, casting the results of the read into their models""" + +class H5SourceItem(BaseModel): + """Tuple of items for each element when flattening an hdf5 file""" + path: str + """Absolute hdf5 path of element""" + h5f_path: str + 
"""Path to the source hdf5 file""" + leaf: bool + """If ``True``, this item has no children (and thus we should start instantiating it before ascending to parent classes)""" + h5_type: Literal['group', 'dataset'] + """What kind of hdf5 element this is""" + depends: List[str] = Field(default_factory=list) + """Paths of other source items that this item depends on before it can be instantiated. eg. from softlinks""" + attrs: dict = Field(default_factory=dict) + """Any static attrs that can be had from the element""" + namespace: Optional[str] = None + """Optional: The namespace that the neurodata type belongs to""" + neurodata_type: Optional[str] = None + """Optional: the neurodata type for this dataset or group""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + @property + def parts(self) -> List[str]: + """path split by /""" + return self.path.split('/') + +class H5ReadResult(BaseModel): + """Result returned by each of our mapping operations""" + path: str + """absolute hdf5 path of element""" + source: H5SourceItem + """ + Source that this result is based on. + The map can modify this item, so the container should update the source + queue on each pass + """ + completed: bool = False + """ + Was this item completed by this map step? False for cases where eg. + we still have dependencies that need to be completed before this one + """ + result: Optional[BaseModel | dict | str | int | float] = None + """ + If completed, built result. A dict that can be instantiated into the model. + If completed is True and result is None, then remove this object + """ + completes: List[str] = Field(default_factory=list) + """ + If this result completes any other fields, we remove them from the build queue + """ + namespace: Optional[str] = None + """ + Optional: the namespace of the neurodata type for this object + """ + neurodata_type: Optional[str] = None + """ + Optional: The neurodata type to use for this object + """ + + +FlatH5 = Dict[str, H5SourceItem] + + +class HDF5Map(ABC): + phase: ReadPhases + exclusive: bool = False + """ + If ``True``, if the check is fulfilled, no other maps can be applied this phase + """ + priority: int = 0 + + @classmethod + @abstractmethod + def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: + """Check if this map applies to the given item to read""" + + @classmethod + @abstractmethod + def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: + """Actually apply the map!""" + + +# -------------------------------------------------- +# Planning maps +# -------------------------------------------------- +class PruneEmpty(HDF5Map): + """Remove groups with no attrs """ + phase = ReadPhases.plan + @classmethod + def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: + if src.leaf and src.h5_type == 'group': + with h5py.File(src.h5f_path, 'r') as h5f: + obj = h5f.get(src.path) + if len(obj.attrs) == 0: + return True + + @classmethod + def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: + return H5ReadResult.model_construct( + path = src.path, + source=src, + completed=True + ) + +# class ResolveVectorData(HDF5Map): +# """ +# We will load vanilla VectorData as part of :class:`.ResolveDynamicTable` +# """ +# phase = ReadPhases.read +# +# @classmethod +# def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> 
bool: +# if src.h5_type == 'group': +# return False +# if src.neurodata_type == 'VectorData': +# + + +class ResolveDynamicTable(HDF5Map): + """Handle loading a dynamic table!""" + phase = ReadPhases.read + priority = 1 + exclusive = True + @classmethod + def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: + if src.h5_type == 'dataset': + return False + if 'neurodata_type' in src.attrs: + if src.attrs['neurodata_type'] == 'DynamicTable': + return True + # otherwise, see if it's a subclass + model = provider.get_class(src.attrs['namespace'], src.attrs['neurodata_type']) + # just inspect the MRO as strings rather than trying to check subclasses because + # we might replace DynamicTable in the future, and there isn't a stable DynamicTable + # class to inherit from anyway because of the whole multiple versions thing + parents = [parent.__name__ for parent in model.__mro__] + if 'DynamicTable' in parents: + return True + else: + return False + else: + return False + + @classmethod + def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: + with h5py.File(src.h5f_path, 'r') as h5f: + obj = h5f.get(src.path) + + # make a populated model :) + # TODO: use a tableproxy like NDArrayProxy to not load all these into memory + if src.neurodata_type != 'DynamicTable': + #base_model = provider.get_class(src.namespace, src.neurodata_type) + base_model = None + else: + base_model = None + + model = dynamictable_to_df(obj, base=base_model) + + completes = ['/'.join([src.path, child]) for child in obj.keys()] + + return H5ReadResult( + path=src.path, + source=src, + result=model, + completes=completes, + completed = True + ) + + +class ResolveModelGroup(HDF5Map): + phase = ReadPhases.read + priority = 10 # do this generally last + exclusive = True + + @classmethod + def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: + if 'neurodata_type' in src.attrs and src.h5_type == 'group': + return True + else: + return False + + @classmethod + def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: + model = provider.get_class(src.namespace, src.neurodata_type) + res = {} + with h5py.File(src.h5f_path, 'r') as h5f: + obj = h5f.get(src.path) + for key, type in model.model_fields.items(): + if key in obj.attrs: + res[key] = obj.attrs[key] + continue + if key in obj.keys(): + # stash a reference to this, we'll compile it at the end + res[key] = HDF5_Path('/'.join([src.path, key])) + + res['hdf5_path'] = src.path + res['name'] = src.parts[-1] + return H5ReadResult( + path=src.path, + source=src, + completed=True, + result = res, + namespace=src.namespace, + neurodata_type=src.neurodata_type + ) +# +# class ResolveModelDataset(HDF5Map): +# phase = ReadPhases.read +# priority = 10 +# exclusive = True +# +# @classmethod +# def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: +# if 'neurodata_type' in src.attrs and src.h5_type == 'dataset': +# return True +# else: +# return False +# +# def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: +# +class ResolveScalars(HDF5Map): + phase = ReadPhases.read + priority = 11 #catchall + exclusive = True + + @classmethod + def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: + if src.h5_type == 'dataset' and 
'neurodata_type' not in src.attrs: + with h5py.File(src.h5f_path, 'r') as h5f: + obj = h5f.get(src.path) + if obj.shape == (): + return True + else: + return False + @classmethod + def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: + with h5py.File(src.h5f_path, 'r') as h5f: + obj = h5f.get(src.path) + res = obj[()] + return H5ReadResult( + path=src.path, + source = src, + completed=True, + result = res + ) + + + + +class ReadQueue(BaseModel): + """Container model to store items as they are built """ + h5f: Path = Field( + description=("Path to the source hdf5 file used when resolving the queue! " + "Each translation step should handle opening and closing the file, " + "rather than passing a handle around") + ) + provider: SchemaProvider = Field( + description="SchemaProvider used by each of the items in the read queue" + ) + queue: Dict[str,H5SourceItem] = Field( + default_factory=dict, + description="Items left to be instantiated, keyed by hdf5 path", + ) + completed: Dict[str, H5ReadResult] = Field( + default_factory=dict, + description="Items that have already been instantiated, keyed by hdf5 path" + ) + model_config = ConfigDict(arbitrary_types_allowed=True) + + def apply_phase(self, phase:ReadPhases): + phase_maps = [m for m in HDF5Map.__subclasses__() if m.phase == phase] + phase_maps = sorted(phase_maps, key=lambda x: x.priority) + + results = [] + + # TODO: Thread/multiprocess this + for name, item in self.queue.items(): + for op in phase_maps: + if op.check(item, self.provider, self.completed): + results.append(op.apply(item, self.provider, self.completed)) + if op.exclusive: + break # out of inner iteration + + # remake the source queue and save results + for res in results: + # remove the original item + del self.queue[res.path] + if res.completed: + # if the item has been finished and there is some result, add it to the results + if res.result is not None: + self.completed[res.path] = res + # otherwise if the item has been completed and there was no result, + # just drop it. + + # if we have completed other things, delete them from the queue + for also_completed in res.completes: + try: + del self.queue[also_completed] + except KeyError: + # normal, we might have already deleted this in a previous step + pass + else: + # if we didn't complete the item (eg. we found we needed more dependencies), + # add the updated source to the queue again + self.queue[res.path] = res.source + + + + + + + +def flatten_hdf(h5f:h5py.File | h5py.Group, skip='specifications') -> Dict[str, H5SourceItem]: + """ + Flatten all child elements of hdf element into a dict of :class:`.H5SourceItem` s keyed by their path + + Args: + h5f (:class:`h5py.File` | :class:`h5py.Group`): HDF file or group to flatten! + """ + items = {} + def _itemize(name: str, obj: h5py.Dataset | h5py.Group): + if skip in name: + return + leaf = isinstance(obj, h5py.Dataset) or len(obj.keys()) == 0 + + if isinstance(obj, h5py.Dataset): + h5_type = 'dataset' + elif isinstance(obj, h5py.Group): + h5_type = 'group' + else: + raise ValueError(f'Object must be a dataset or group! 
{obj}') + + # get references in attrs and datasets to populate dependencies + #depends = get_references(obj) + + #if not name.startswith('/'): + # name = '/' + name + + attrs = dict(obj.attrs.items()) + + items[name] = H5SourceItem.model_construct( + path = name, + h5f_path=h5f.file.filename, + leaf = leaf, + #depends = depends, + h5_type=h5_type, + attrs = attrs, + namespace = attrs.get('namespace', None), + neurodata_type= attrs.get('neurodata_type', None) + ) + + h5f.visititems(_itemize) + return items + + +def get_references(obj: h5py.Dataset | h5py.Group) -> List[str]: + """ + Find all hdf5 object references in a dataset or group + + Locate references in + + * Attrs + * Scalar datasets + * Single-column datasets + * Multi-column datasets + + Args: + obj (:class:`h5py.Dataset` | :class:`h5py.Group`): Object to evaluate + + Returns: + List[str]: List of paths that are referenced within this object + """ + # Find references in attrs + refs = [ref for ref in obj.attrs.values() if isinstance(ref, h5py.h5r.Reference)] + + # For datasets, apply checks depending on shape of data. + if isinstance(obj, h5py.Dataset): + if obj.shape == (): + # scalar + if isinstance(obj[()], h5py.h5r.Reference): + refs.append(obj[()]) + elif isinstance(obj[0], h5py.h5r.Reference): + # single-column + refs.extend(obj[:].tolist()) + elif len(obj.dtype) > 1: + # "compound" datasets + for name in obj.dtype.names: + if isinstance(obj[name][0], h5py.h5r.Reference): + refs.extend(obj[name].tolist()) + + # dereference and get name of reference + if isinstance(obj, h5py.Dataset): + depends = list(set([obj.parent.get(i).name for i in refs])) + else: + depends = list(set([obj.get(i).name for i in refs])) + return depends diff --git a/nwb_linkml/src/nwb_linkml/maps/hdmf.py b/nwb_linkml/src/nwb_linkml/maps/hdmf.py index fb93b49..df1e724 100644 --- a/nwb_linkml/src/nwb_linkml/maps/hdmf.py +++ b/nwb_linkml/src/nwb_linkml/maps/hdmf.py @@ -2,55 +2,89 @@ Mapping functions for handling HDMF classes like DynamicTables """ import pdb -from typing import List, Type, Optional +from typing import List, Type, Optional, Any import ast from nwb_linkml.types import DataFrame import h5py -from pydantic import create_model +from pydantic import create_model, BaseModel from nwb_linkml.maps import dtype import numpy as np +from nwb_linkml.types.hdf5 import HDF5_Path +from nwb_linkml.types.ndarray import NDArray -def model_from_dynamictable(group:h5py.Group) -> Type[DataFrame]: +def model_from_dynamictable(group:h5py.Group, base:Optional[BaseModel] = None) -> Type[DataFrame]: colnames = group.attrs['colnames'] types = {} for col in colnames: + idxname = col + '_index' + if idxname in group.keys(): + idx = group.get(idxname)[0] + dset = group.get(col) + item = dset[idx] + else: + dset = group.get(col) + item = dset[0] # read the first entry to see what we got - dset = group.get(col) - item = dset[0] + if isinstance(item, bytes): item = item.decode('utf-8') if isinstance(item, str): # try to see if this is actually a list or smth encoded as a string try: item = ast.literal_eval(item) - except ValueError: + except (ValueError, SyntaxError): pass type_ = type(item) type_ = dtype.np_to_python.get(type_, type_) + if type_ is h5py.h5r.Reference: + type_ = HDF5_Path + elif type_ is np.ndarray: + type_ = NDArray + if type_ is not np.void: # FIXME: handling nested column types that appear only in some versions? types[col] = (List[type_ | None], ...) 
- model = create_model(group.name.split('/')[-1], **types, __base__=DataFrame) + if base is None: + base = DataFrame + else: + base = (DataFrame, base) + + + model = create_model(group.name.split('/')[-1], **types, __base__=base) return model -def dynamictable_to_df(group:h5py.Group, model:Optional[Type[DataFrame]]=None) -> DataFrame: +def dynamictable_to_df(group:h5py.Group, + model:Optional[Type[DataFrame]]=None, + base:Optional[BaseModel] = None) -> DataFrame: if model is None: - model = model_from_dynamictable(group) + model = model_from_dynamictable(group, base) items = {} - for col in model.model_fields.keys(): - data = group.get(col)[:] + for col, col_type in model.model_fields.items(): + if col not in group.keys(): + continue + idxname = col + '_index' + if idxname in group.keys(): + idx = group.get(idxname)[:] + data = group.get(col)[idx-1] + else: + data = group.get(col)[:] + + # Handle typing inside of list if isinstance(data[0], bytes): data = data.astype('unicode') if isinstance(data[0], str): + # lists and other compound data types can get flattened out to strings when stored + # so we try and literal eval and recover them try: eval_type = type(ast.literal_eval(data[0])) - except ValueError: + except (ValueError, SyntaxError): eval_type = str + # if we've found one of those, get the data type within it. if eval_type is not str: eval_list = [] for item in data.tolist(): @@ -58,12 +92,45 @@ def dynamictable_to_df(group:h5py.Group, model:Optional[Type[DataFrame]]=None) - eval_list.append(ast.literal_eval(item)) except ValueError: eval_list.append(None) - items[col] = eval_list - continue + data = eval_list + elif isinstance(data[0], h5py.h5r.Reference): + data = [HDF5_Path(group[d].name) for d in data] + elif isinstance(data[0], tuple) and any([isinstance(d, h5py.h5r.Reference) for d in data[0]]): + # references stored inside a tuple, reference + location. + # dereference them!? 
+ dset = group.get(col) + names = dset.dtype.names + if names is not None and names[0] == 'idx_start' and names[1] == 'count': + data = dereference_reference_vector(dset, data) - items[col] = data.tolist() + else: + data = data.tolist() - pdb.set_trace() - return model(**items) + # After list, check if we need to put this thing inside of + # another class, as indicated by the enclosing model + + items[col] = data + + return model(hdf5_path = group.name, + name = group.name.split('/')[-1], + **items) + + +def dereference_reference_vector(dset: h5py.Dataset, data:Optional[List[Any]]) -> List: + """ + Given a compound dataset with indices, counts, and object references, dereference to values + + Data is of the form + (idx_start, count, target) + """ + # assume all these references are to the same target + # and the index is in the 3rd position + if data is None: + data = dset[:] + + target = dset.parent.get(data[0][-1]) + res = [target[d[0]:d[0]+d[1]] for d in data] + return res + diff --git a/nwb_linkml/src/nwb_linkml/types/df.py b/nwb_linkml/src/nwb_linkml/types/df.py index 54d4869..b89530e 100644 --- a/nwb_linkml/src/nwb_linkml/types/df.py +++ b/nwb_linkml/src/nwb_linkml/types/df.py @@ -5,6 +5,7 @@ import pdb from typing import List, Any, get_origin, get_args, Union, Optional, Dict from types import NoneType +import h5py import numpy as np import pandas as pd from pydantic import ( @@ -115,3 +116,25 @@ class DataFrame(BaseModel, pd.DataFrame): for k, v in out.items() } return nxt(self.__class__(**out)) + + +class DataFrameProxy(DataFrame): + def __init__(self, hdf5_file: str, hdf5_path:str, **kwargs): + # pdb.set_trace() + super().__init__(**kwargs) + + self.hdf5_file = hdf5_file + self.hdf5_path = hdf5_path + + + def _load(self): + cols = {} + with h5py.File(self.hdf5_file, 'r') as h5f: + obj = h5f.get(self.hdf5_path) + for col in self.model_fields.keys(): + if col in obj.keys(): + setattr(self, col, obj[col][:]) + + def __getitem__(self, item:str): + pass + diff --git a/nwb_linkml/src/nwb_linkml/types/hdf5.py b/nwb_linkml/src/nwb_linkml/types/hdf5.py new file mode 100644 index 0000000..1aeb4b6 --- /dev/null +++ b/nwb_linkml/src/nwb_linkml/types/hdf5.py @@ -0,0 +1,3 @@ +from typing import Annotated + +HDF5_Path = Annotated[str, """Trivial subclass of string to indicate that it is a reference to a location within an HDF5 file"""] \ No newline at end of file diff --git a/nwb_linkml/src/nwb_linkml/types/ndarray.py b/nwb_linkml/src/nwb_linkml/types/ndarray.py index 63ae36d..93525ad 100644 --- a/nwb_linkml/src/nwb_linkml/types/ndarray.py +++ b/nwb_linkml/src/nwb_linkml/types/ndarray.py @@ -4,6 +4,7 @@ Extension of nptyping NDArray for pydantic that allows for JSON-Schema serializa * Order to store data in (row first) """ import pdb +from pathlib import Path from typing import ( Any, Callable, @@ -22,6 +23,7 @@ from pydantic import ( from pydantic.json_schema import JsonSchemaValue import numpy as np +import h5py from nptyping import NDArray as _NDArray from nptyping.ndarray import NDArrayMeta @@ -49,55 +51,66 @@ class NDArray(_NDArray): np_to_python[dtype]) def validate_dtype(value: np.ndarray) -> np.ndarray: + if dtype is Any: + return value assert value.dtype == dtype, f"Invalid dtype! expected {dtype}, got {value.dtype}" return value def validate_array(value: Any) -> np.ndarray: assert cls.__instancecheck__(value), f'Invalid shape! 
expected shape {shape.prepared_args}, got shape {value.shape}' return value + def coerce_list(value: Any) -> np.ndarray: + if isinstance(value, list): + value = np.array(value) + return value + # get the names of the shape constraints, if any - shape_parts = shape.__args__[0].split(',') - split_parts = [p.split(' ')[1] if len(p.split(' ')) == 2 else None for p in shape_parts] + if shape is Any: + list_schema = core_schema.list_schema(core_schema.any_schema()) + else: + shape_parts = shape.__args__[0].split(',') + split_parts = [p.split(' ')[1] if len(p.split(' ')) == 2 else None for p in shape_parts] - # Construct a list of list schema - # go in reverse order - construct list schemas such that - # the final schema is the one that checks the first dimension - shape_labels = reversed(split_parts) - shape_args = reversed(shape.prepared_args) - list_schema = None - for arg, label in zip(shape_args, shape_labels): - # which handler to use? for the first we use the actual type - # handler, everywhere else we use the prior list handler - if list_schema is None: - inner_schema = array_type_handler - else: - inner_schema = list_schema + # Construct a list of list schema + # go in reverse order - construct list schemas such that + # the final schema is the one that checks the first dimension + shape_labels = reversed(split_parts) + shape_args = reversed(shape.prepared_args) + list_schema = None + for arg, label in zip(shape_args, shape_labels): + # which handler to use? for the first we use the actual type + # handler, everywhere else we use the prior list handler + if list_schema is None: + inner_schema = array_type_handler + else: + inner_schema = list_schema - # make a label annotation, if we have one - if label is not None: - metadata = {'name': label} - else: - metadata = None + # make a label annotation, if we have one + if label is not None: + metadata = {'name': label} + else: + metadata = None - # make the current level list schema, accounting for shape - if arg == '*': - list_schema = core_schema.list_schema(inner_schema, - metadata=metadata) - else: - arg = int(arg) - list_schema = core_schema.list_schema( - inner_schema, - min_length=arg, - max_length=arg, - metadata=metadata - ) + # make the current level list schema, accounting for shape + if arg == '*': + list_schema = core_schema.list_schema(inner_schema, + metadata=metadata) + else: + arg = int(arg) + list_schema = core_schema.list_schema( + inner_schema, + min_length=arg, + max_length=arg, + metadata=metadata + ) return core_schema.json_or_python_schema( json_schema=list_schema, python_schema=core_schema.chain_schema( [ + core_schema.no_info_plain_validator_function(coerce_list), core_schema.is_instance_schema(np.ndarray), core_schema.no_info_plain_validator_function(validate_dtype), core_schema.no_info_plain_validator_function(validate_array) @@ -107,4 +120,31 @@ class NDArray(_NDArray): lambda instance: instance.tolist(), when_used='json' ) - ) \ No newline at end of file + ) + + +class NDArrayProxy(): + """ + Thin proxy to numpy arrays stored within hdf5 files, + only read into memory when accessed, but otherwise + passthrough all attempts to access attributes. 
+ """ + def __init__(self, h5f_file: Path, path: str): + """ + Args: + h5f_file (:class:`pathlib.Path`): Path to source HDF5 file + path (str): Location within HDF5 file where this array is located + """ + self.h5f_file = h5f_file + self.path = path + + def __getattr__(self, item): + with h5py.File(self.h5f_file, 'r') as h5f: + obj = h5f.get(self.path) + return getattr(obj, item) + def __getitem__(self, slice) -> np.ndarray: + with h5py.File(self.h5f_file, 'r') as h5f: + obj = h5f.get(self.path) + return obj[slice] + def __setitem__(self, slice, value): + raise NotImplementedError(f"Cant write into an arrayproxy yet!") diff --git a/nwb_linkml/src/nwb_linkml/types/ndarray.pyi b/nwb_linkml/src/nwb_linkml/types/ndarray.pyi index 1aa5dd7..a90b8d3 100644 --- a/nwb_linkml/src/nwb_linkml/types/ndarray.pyi +++ b/nwb_linkml/src/nwb_linkml/types/ndarray.pyi @@ -1,3 +1,4 @@ import numpy as np -NDArray = np.ndarray \ No newline at end of file +NDArray = np.ndarray +NDArrayProxy = np.ndarray \ No newline at end of file diff --git a/nwb_linkml/tests/fixtures.py b/nwb_linkml/tests/fixtures.py index bb33a90..ca5e605 100644 --- a/nwb_linkml/tests/fixtures.py +++ b/nwb_linkml/tests/fixtures.py @@ -27,3 +27,8 @@ def nwb_core_fixture() -> NamespacesAdapter: nwb_core.populate_imports() return nwb_core + +@pytest.fixture(scope="session") +def data_dir() -> Path: + path = Path(__file__).parent.resolve() / 'data' + return path diff --git a/nwb_linkml/tests/test_io/test_io_hdf5.py b/nwb_linkml/tests/test_io/test_io_hdf5.py index 6cd54f9..f8fa306 100644 --- a/nwb_linkml/tests/test_io/test_io_hdf5.py +++ b/nwb_linkml/tests/test_io/test_io_hdf5.py @@ -9,13 +9,14 @@ from ..fixtures import tmp_output_dir, set_config_vars from nwb_linkml.io.hdf5 import HDF5IO from nwb_linkml.io.hdf5 import truncate_file + @pytest.mark.skip() def test_hdf_read(): NWBFILE = Path('/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773.nwb') if not NWBFILE.exists(): return io = HDF5IO(path=NWBFILE) - model = io.read('acquisition') + model = io.read() pdb.set_trace() @@ -76,10 +77,12 @@ def test_truncate_file(tmp_output_dir): assert target_h5f[target_h5f['link']['child'].attrs['reference_contig']].name == target_h5f['data']['dataset_contig'].name assert target_h5f[target_h5f['link']['child'].attrs['reference_chunked']].name == target_h5f['data']['dataset_chunked'].name assert target_h5f['data']['dataset_contig'].attrs['anattr'] == 1 + @pytest.mark.skip() def test_flatten_hdf(): - from nwb_linkml.io.hdf5 import HDF5IO, flatten_hdf - path = '/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773_probe-769322820_ecephys.nwb' + from nwb_linkml.io.hdf5 import HDF5IO + from nwb_linkml.maps.hdf5 import flatten_hdf + path = '/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773.nwb' import h5py h5f = h5py.File(path) flat = flatten_hdf(h5f) diff --git a/nwb_linkml/tests/test_types/test_ndarray.py b/nwb_linkml/tests/test_types/test_ndarray.py index 14f26b8..d7d0506 100644 --- a/nwb_linkml/tests/test_types/test_ndarray.py +++ b/nwb_linkml/tests/test_types/test_ndarray.py @@ -9,6 +9,7 @@ from pydantic import BaseModel, ValidationError, Field from nwb_linkml.types.ndarray import NDArray from nptyping import Shape, Number +from ..fixtures import data_dir def test_ndarray_type(): class Model(BaseModel): @@ -54,3 +55,6 @@ def test_ndarray_union(): with pytest.raises(ValidationError): instance = Model(array=np.random.random((5,10,4,6))) +@pytest.mark.skip() +def test_ndarray_proxy(data_dir): + h5f_source = data_dir 
/ 'aibs_ecephys.nwb'
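
Editorial note: a minimal usage sketch of the NDArrayProxy introduced in this patch, roughly what the skipped test_ndarray_proxy stub above might grow into. The dataset path below is hypothetical (the real location depends on what aibs_ecephys.nwb actually contains), and the assertions are illustrative assumptions; only the NDArrayProxy constructor, attribute forwarding, and slicing behavior come from the patch itself.

    import numpy as np
    from nwb_linkml.types.ndarray import NDArrayProxy

    def example_ndarray_proxy(h5f_source):
        # hypothetical dataset path -- substitute a dataset that exists in the file
        proxy = NDArrayProxy(h5f_file=h5f_source, path='/acquisition/running_speed/data')

        # attribute access is forwarded to the underlying h5py.Dataset,
        # opening and closing the file on each access
        assert len(proxy.shape) >= 1

        # slicing reads only the requested region and returns a plain numpy array
        chunk = proxy[0:10]
        assert isinstance(chunk, np.ndarray)

        # writing through the proxy is not supported yet
        try:
            proxy[0:10] = chunk
        except NotImplementedError:
            pass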