diff --git a/nwb_linkml/src/nwb_linkml/annotations.py b/nwb_linkml/src/nwb_linkml/annotations.py new file mode 100644 index 0000000..b068936 --- /dev/null +++ b/nwb_linkml/src/nwb_linkml/annotations.py @@ -0,0 +1,19 @@ +""" +Utility functions for introspection on python annotations +""" +import typing + + +def unwrap_optional(annotation): + if typing.get_origin(annotation) == typing.Union: + args = typing.get_args(annotation) + + if len(args) == 2 and args[1].__name__ == 'NoneType': + annotation = args[0] + return annotation + + +def take_outer_type(annotation): + if typing.get_origin(annotation) is list: + return list + return annotation diff --git a/nwb_linkml/src/nwb_linkml/io/hdf5.py b/nwb_linkml/src/nwb_linkml/io/hdf5.py index 180e273..3aa261d 100644 --- a/nwb_linkml/src/nwb_linkml/io/hdf5.py +++ b/nwb_linkml/src/nwb_linkml/io/hdf5.py @@ -12,8 +12,6 @@ Mapping operations (mostly TODO atm) * Create new models from DynamicTables * Handle softlinks as object references and vice versa by adding a ``path`` attr - - Other TODO: * Read metadata only, don't read all arrays @@ -21,21 +19,19 @@ Other TODO: """ import pdb -import typing import warnings -from typing import Optional, List, Dict, overload, Literal, Type, Any +from typing import Optional, Dict, overload, Type from pathlib import Path from types import ModuleType -from typing import TypeVar, TYPE_CHECKING, NamedTuple -from abc import abstractmethod +from typing import TYPE_CHECKING, NamedTuple import json import subprocess import shutil import h5py -from pydantic import BaseModel, Field, ConfigDict -from dataclasses import dataclass, field +from pydantic import BaseModel +from nwb_linkml.maps.hdf5 import H5SourceItem, flatten_hdf, ReadPhases, ReadQueue from nwb_linkml.translate import generate_from_nwbfile #from nwb_linkml.models.core_nwb_file import NWBFile if TYPE_CHECKING: @@ -43,44 +39,6 @@ if TYPE_CHECKING: from nwb_linkml.providers.schema import SchemaProvider - -class H5SourceItem(BaseModel): - """Tuple of items for each element when flattening an hdf5 file""" - path: str - """Absolute hdf5 path of element""" - leaf: bool - """If ``True``, this item has no children (and thus we should start instantiating it before ascending to parent classes)""" - h5_type: Literal['group', 'dataset'] - """What kind of hdf5 element this is""" - depends: List[str] = Field(default_factory=list) - """Paths of other source items that this item depends on before it can be instantiated. eg. from softlinks""" - attrs: dict = Field(default_factory=dict) - """Any static attrs that can be had from the element""" - - model_config = ConfigDict(arbitrary_types_allowed=True) - @property - def parts(self) -> List[str]: - """path split by /""" - return self.path.split('/') - -FlatH5 = Dict[str, H5SourceItem] - -class ReadQueue(BaseModel): - """Container model to store items as they are built """ - h5f: h5py.File = Field( - description="Open hdf5 file used when resolving the queue!" 
- ) - queue: Dict[str,H5SourceItem] = Field( - default_factory=dict, - description="Items left to be instantiated, keyed by hdf5 path", - ) - completed: Dict[str, Any] = Field( - default_factory=dict, - description="Items that have already been instantiated, keyed by hdf5 path" - ) - model_config = ConfigDict(arbitrary_types_allowed=True) - - class HDF5IO(): def __init__(self, path:Path): @@ -110,27 +68,39 @@ class HDF5IO(): # get all children of selected item if isinstance(src, (h5py.File, h5py.Group)): - children = self._flatten_hdf(src) + children = flatten_hdf(src) else: raise NotImplementedError('directly read individual datasets') + queue = ReadQueue( + h5f=self.path, + queue=children, + provider=provider + ) + #pdb.set_trace() + # Apply initial planning phase of reading + queue.apply_phase(ReadPhases.plan) + # Now do read operations until we're finished + queue.apply_phase(ReadPhases.read) - - data = {} - for k, v in src.items(): - if isinstance(v, h5py.Group): - data[k] = H5Group(cls=v, parent=parent, root_model=parent).read() - elif isinstance(v, h5py.Dataset): - data[k] = H5Dataset(cls=v, parent=parent).read() - - if path is None: - return parent(**data) - if 'neurodata_type' in src.attrs: - raise NotImplementedError('Making a submodel not supported yet') - else: - return data + pdb.set_trace() + # + # + # data = {} + # for k, v in src.items(): + # if isinstance(v, h5py.Group): + # data[k] = H5Group(cls=v, parent=parent, root_model=parent).read() + # elif isinstance(v, h5py.Dataset): + # data[k] = H5Dataset(cls=v, parent=parent).read() + # + # if path is None: + # return parent(**data) + # if 'neurodata_type' in src.attrs: + # raise NotImplementedError('Making a submodel not supported yet') + # else: + # return data @@ -314,20 +284,6 @@ def truncate_file(source: Path, target: Optional[Path] = None, n:int=10) -> Path return target -def unwrap_optional(annotation): - if typing.get_origin(annotation) == typing.Union: - args = typing.get_args(annotation) - - if len(args) == 2 and args[1].__name__ == 'NoneType': - annotation = args[0] - return annotation - -def take_outer_type(annotation): - if typing.get_origin(annotation) is list: - return list - return annotation - - def submodel_by_path(model: BaseModel, path:str) -> Type[BaseModel | dict | list]: """ @@ -338,61 +294,3 @@ def submodel_by_path(model: BaseModel, path:str) -> Type[BaseModel | dict | list ann = model.model_fields[part].annotation -def flatten_hdf(h5f:h5py.File | h5py.Group, skip='specifications') -> Dict[str, H5SourceItem]: - """ - Flatten all child elements of hdf element into a dict of :class:`.H5SourceItem` s keyed by their path - - Args: - h5f (:class:`h5py.File` | :class:`h5py.Group`): HDF file or group to flatten! 
- """ - items = {} - def _itemize(name: str, obj: h5py.Dataset | h5py.Group): - if skip in name: - return - leaf = isinstance(obj, h5py.Dataset) or len(obj.keys()) == 0 - # get references in attrs and datasets - refs = [ref for ref in obj.attrs.values() if isinstance(ref, h5py.h5r.Reference)] - if isinstance(obj, h5py.Dataset): - h5_type = 'dataset' - if obj.shape == (): - if isinstance(obj[()], h5py.h5r.Reference): - refs.append(obj[()]) - elif isinstance(obj[0], h5py.h5r.Reference): - refs.extend(obj[:].tolist()) - else: - h5_type = 'group' - # dereference and get name of reference - depends = list(set([h5f[i].name for i in refs])) - if not name.startswith('/'): - name = '/' + name - items[name] = H5SourceItem.model_construct( - path = name, - leaf = leaf, - depends = depends, - h5_type=h5_type, - attrs = dict(obj.attrs.items()) - ) - - h5f.visititems(_itemize) - return items - -def sort_flat_hdf(flat: Dict[str, H5SourceItem]) -> Dict[str, H5SourceItem]: - """ - Sort flat hdf5 file in a rough order of solvability - - * First process any leaf items - - * Put any items with dependencies at the end - - Args: - flat: - - Returns: - - """ - class Rank(NamedTuple): - has_depends: bool - not_leaf: bool - - - diff --git a/nwb_linkml/src/nwb_linkml/io/hdf5_scratch.py b/nwb_linkml/src/nwb_linkml/io/hdf5_scratch.py index 5580661..afbff5b 100644 --- a/nwb_linkml/src/nwb_linkml/io/hdf5_scratch.py +++ b/nwb_linkml/src/nwb_linkml/io/hdf5_scratch.py @@ -17,10 +17,11 @@ class MyDf(DataFrame): a = MyDf(ints=[1,2,3]) -from nwb_linkml.io.hdf5 import HDF5IO, flatten_hdf +from nwb_linkml.io.hdf5 import HDF5IO import h5py from typing import NamedTuple, Tuple, Optional -from nwb_linkml.io.hdf5 import H5SourceItem, FlatH5, ReadQueue, HDF5IO +from nwb_linkml.io.hdf5 import HDF5IO +from nwb_linkml.maps.hdf5 import H5SourceItem, FlatH5, ReadQueue, flatten_hdf from nwb_linkml.providers.schema import SchemaProvider from rich import print from pydantic import BaseModel diff --git a/nwb_linkml/src/nwb_linkml/map.py b/nwb_linkml/src/nwb_linkml/map.py index 5f671c4..b125126 100644 --- a/nwb_linkml/src/nwb_linkml/map.py +++ b/nwb_linkml/src/nwb_linkml/map.py @@ -16,6 +16,7 @@ class PHASES(StrEnum): """After the YAML for a model has been loaded""" + @dataclass class Map: scope: str diff --git a/nwb_linkml/src/nwb_linkml/maps/hdf5.py b/nwb_linkml/src/nwb_linkml/maps/hdf5.py new file mode 100644 index 0000000..8a8a7f0 --- /dev/null +++ b/nwb_linkml/src/nwb_linkml/maps/hdf5.py @@ -0,0 +1,423 @@ +""" +Maps for reading and writing from HDF5 + +We have sort of diverged from the initial idea of a generalized map as in :class:`linkml.map.Map` , +so we will make our own mapping class here and re-evaluate whether they should be unified later +""" +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Literal, List, Dict, Optional + +import h5py +from enum import StrEnum + +from pydantic import BaseModel, Field, ConfigDict + +from nwb_linkml.providers.schema import SchemaProvider +from nwb_linkml.maps.hdmf import dynamictable_to_df +from nwb_linkml.types.hdf5 import HDF5_Path + + +class ReadPhases(StrEnum): + plan = 'plan' + """Before reading starts, building an index of objects to read""" + read = 'read' + """Main reading operation""" + construct = 'construct' + """After reading, casting the results of the read into their models""" + +class H5SourceItem(BaseModel): + """Tuple of items for each element when flattening an hdf5 file""" + path: str + """Absolute hdf5 path of element""" + h5f_path: str + 
"""Path to the source hdf5 file""" + leaf: bool + """If ``True``, this item has no children (and thus we should start instantiating it before ascending to parent classes)""" + h5_type: Literal['group', 'dataset'] + """What kind of hdf5 element this is""" + depends: List[str] = Field(default_factory=list) + """Paths of other source items that this item depends on before it can be instantiated. eg. from softlinks""" + attrs: dict = Field(default_factory=dict) + """Any static attrs that can be had from the element""" + namespace: Optional[str] = None + """Optional: The namespace that the neurodata type belongs to""" + neurodata_type: Optional[str] = None + """Optional: the neurodata type for this dataset or group""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + @property + def parts(self) -> List[str]: + """path split by /""" + return self.path.split('/') + +class H5ReadResult(BaseModel): + """Result returned by each of our mapping operations""" + path: str + """absolute hdf5 path of element""" + source: H5SourceItem + """ + Source that this result is based on. + The map can modify this item, so the container should update the source + queue on each pass + """ + completed: bool = False + """ + Was this item completed by this map step? False for cases where eg. + we still have dependencies that need to be completed before this one + """ + result: Optional[BaseModel | dict | str | int | float] = None + """ + If completed, built result. A dict that can be instantiated into the model. + If completed is True and result is None, then remove this object + """ + completes: List[str] = Field(default_factory=list) + """ + If this result completes any other fields, we remove them from the build queue + """ + namespace: Optional[str] = None + """ + Optional: the namespace of the neurodata type for this object + """ + neurodata_type: Optional[str] = None + """ + Optional: The neurodata type to use for this object + """ + + +FlatH5 = Dict[str, H5SourceItem] + + +class HDF5Map(ABC): + phase: ReadPhases + exclusive: bool = False + """ + If ``True``, if the check is fulfilled, no other maps can be applied this phase + """ + priority: int = 0 + + @classmethod + @abstractmethod + def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: + """Check if this map applies to the given item to read""" + + @classmethod + @abstractmethod + def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: + """Actually apply the map!""" + + +# -------------------------------------------------- +# Planning maps +# -------------------------------------------------- +class PruneEmpty(HDF5Map): + """Remove groups with no attrs """ + phase = ReadPhases.plan + @classmethod + def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: + if src.leaf and src.h5_type == 'group': + with h5py.File(src.h5f_path, 'r') as h5f: + obj = h5f.get(src.path) + if len(obj.attrs) == 0: + return True + + @classmethod + def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: + return H5ReadResult.model_construct( + path = src.path, + source=src, + completed=True + ) + +# class ResolveVectorData(HDF5Map): +# """ +# We will load vanilla VectorData as part of :class:`.ResolveDynamicTable` +# """ +# phase = ReadPhases.read +# +# @classmethod +# def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> 
bool: +# if src.h5_type == 'group': +# return False +# if src.neurodata_type == 'VectorData': +# + + +class ResolveDynamicTable(HDF5Map): + """Handle loading a dynamic table!""" + phase = ReadPhases.read + priority = 1 + exclusive = True + @classmethod + def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: + if src.h5_type == 'dataset': + return False + if 'neurodata_type' in src.attrs: + if src.attrs['neurodata_type'] == 'DynamicTable': + return True + # otherwise, see if it's a subclass + model = provider.get_class(src.attrs['namespace'], src.attrs['neurodata_type']) + # just inspect the MRO as strings rather than trying to check subclasses because + # we might replace DynamicTable in the future, and there isn't a stable DynamicTable + # class to inherit from anyway because of the whole multiple versions thing + parents = [parent.__name__ for parent in model.__mro__] + if 'DynamicTable' in parents: + return True + else: + return False + else: + return False + + @classmethod + def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: + with h5py.File(src.h5f_path, 'r') as h5f: + obj = h5f.get(src.path) + + # make a populated model :) + # TODO: use a tableproxy like NDArrayProxy to not load all these into memory + if src.neurodata_type != 'DynamicTable': + #base_model = provider.get_class(src.namespace, src.neurodata_type) + base_model = None + else: + base_model = None + + model = dynamictable_to_df(obj, base=base_model) + + completes = ['/'.join([src.path, child]) for child in obj.keys()] + + return H5ReadResult( + path=src.path, + source=src, + result=model, + completes=completes, + completed = True + ) + + +class ResolveModelGroup(HDF5Map): + phase = ReadPhases.read + priority = 10 # do this generally last + exclusive = True + + @classmethod + def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: + if 'neurodata_type' in src.attrs and src.h5_type == 'group': + return True + else: + return False + + @classmethod + def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: + model = provider.get_class(src.namespace, src.neurodata_type) + res = {} + with h5py.File(src.h5f_path, 'r') as h5f: + obj = h5f.get(src.path) + for key, type in model.model_fields.items(): + if key in obj.attrs: + res[key] = obj.attrs[key] + continue + if key in obj.keys(): + # stash a reference to this, we'll compile it at the end + res[key] = HDF5_Path('/'.join([src.path, key])) + + res['hdf5_path'] = src.path + res['name'] = src.parts[-1] + return H5ReadResult( + path=src.path, + source=src, + completed=True, + result = res, + namespace=src.namespace, + neurodata_type=src.neurodata_type + ) +# +# class ResolveModelDataset(HDF5Map): +# phase = ReadPhases.read +# priority = 10 +# exclusive = True +# +# @classmethod +# def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: +# if 'neurodata_type' in src.attrs and src.h5_type == 'dataset': +# return True +# else: +# return False +# +# def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: +# +class ResolveScalars(HDF5Map): + phase = ReadPhases.read + priority = 11 #catchall + exclusive = True + + @classmethod + def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool: + if src.h5_type == 'dataset' and 
'neurodata_type' not in src.attrs: + with h5py.File(src.h5f_path, 'r') as h5f: + obj = h5f.get(src.path) + if obj.shape == (): + return True + else: + return False + @classmethod + def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult: + with h5py.File(src.h5f_path, 'r') as h5f: + obj = h5f.get(src.path) + res = obj[()] + return H5ReadResult( + path=src.path, + source = src, + completed=True, + result = res + ) + + + + +class ReadQueue(BaseModel): + """Container model to store items as they are built """ + h5f: Path = Field( + description=("Path to the source hdf5 file used when resolving the queue! " + "Each translation step should handle opening and closing the file, " + "rather than passing a handle around") + ) + provider: SchemaProvider = Field( + description="SchemaProvider used by each of the items in the read queue" + ) + queue: Dict[str,H5SourceItem] = Field( + default_factory=dict, + description="Items left to be instantiated, keyed by hdf5 path", + ) + completed: Dict[str, H5ReadResult] = Field( + default_factory=dict, + description="Items that have already been instantiated, keyed by hdf5 path" + ) + model_config = ConfigDict(arbitrary_types_allowed=True) + + def apply_phase(self, phase:ReadPhases): + phase_maps = [m for m in HDF5Map.__subclasses__() if m.phase == phase] + phase_maps = sorted(phase_maps, key=lambda x: x.priority) + + results = [] + + # TODO: Thread/multiprocess this + for name, item in self.queue.items(): + for op in phase_maps: + if op.check(item, self.provider, self.completed): + results.append(op.apply(item, self.provider, self.completed)) + if op.exclusive: + break # out of inner iteration + + # remake the source queue and save results + for res in results: + # remove the original item + del self.queue[res.path] + if res.completed: + # if the item has been finished and there is some result, add it to the results + if res.result is not None: + self.completed[res.path] = res + # otherwise if the item has been completed and there was no result, + # just drop it. + + # if we have completed other things, delete them from the queue + for also_completed in res.completes: + try: + del self.queue[also_completed] + except KeyError: + # normal, we might have already deleted this in a previous step + pass + else: + # if we didn't complete the item (eg. we found we needed more dependencies), + # add the updated source to the queue again + self.queue[res.path] = res.source + + + + + + + +def flatten_hdf(h5f:h5py.File | h5py.Group, skip='specifications') -> Dict[str, H5SourceItem]: + """ + Flatten all child elements of hdf element into a dict of :class:`.H5SourceItem` s keyed by their path + + Args: + h5f (:class:`h5py.File` | :class:`h5py.Group`): HDF file or group to flatten! + """ + items = {} + def _itemize(name: str, obj: h5py.Dataset | h5py.Group): + if skip in name: + return + leaf = isinstance(obj, h5py.Dataset) or len(obj.keys()) == 0 + + if isinstance(obj, h5py.Dataset): + h5_type = 'dataset' + elif isinstance(obj, h5py.Group): + h5_type = 'group' + else: + raise ValueError(f'Object must be a dataset or group! 
{obj}') + + # get references in attrs and datasets to populate dependencies + #depends = get_references(obj) + + #if not name.startswith('/'): + # name = '/' + name + + attrs = dict(obj.attrs.items()) + + items[name] = H5SourceItem.model_construct( + path = name, + h5f_path=h5f.file.filename, + leaf = leaf, + #depends = depends, + h5_type=h5_type, + attrs = attrs, + namespace = attrs.get('namespace', None), + neurodata_type= attrs.get('neurodata_type', None) + ) + + h5f.visititems(_itemize) + return items + + +def get_references(obj: h5py.Dataset | h5py.Group) -> List[str]: + """ + Find all hdf5 object references in a dataset or group + + Locate references in + + * Attrs + * Scalar datasets + * Single-column datasets + * Multi-column datasets + + Args: + obj (:class:`h5py.Dataset` | :class:`h5py.Group`): Object to evaluate + + Returns: + List[str]: List of paths that are referenced within this object + """ + # Find references in attrs + refs = [ref for ref in obj.attrs.values() if isinstance(ref, h5py.h5r.Reference)] + + # For datasets, apply checks depending on shape of data. + if isinstance(obj, h5py.Dataset): + if obj.shape == (): + # scalar + if isinstance(obj[()], h5py.h5r.Reference): + refs.append(obj[()]) + elif isinstance(obj[0], h5py.h5r.Reference): + # single-column + refs.extend(obj[:].tolist()) + elif len(obj.dtype) > 1: + # "compound" datasets + for name in obj.dtype.names: + if isinstance(obj[name][0], h5py.h5r.Reference): + refs.extend(obj[name].tolist()) + + # dereference and get name of reference + if isinstance(obj, h5py.Dataset): + depends = list(set([obj.parent.get(i).name for i in refs])) + else: + depends = list(set([obj.get(i).name for i in refs])) + return depends diff --git a/nwb_linkml/src/nwb_linkml/maps/hdmf.py b/nwb_linkml/src/nwb_linkml/maps/hdmf.py index fb93b49..df1e724 100644 --- a/nwb_linkml/src/nwb_linkml/maps/hdmf.py +++ b/nwb_linkml/src/nwb_linkml/maps/hdmf.py @@ -2,55 +2,89 @@ Mapping functions for handling HDMF classes like DynamicTables """ import pdb -from typing import List, Type, Optional +from typing import List, Type, Optional, Any import ast from nwb_linkml.types import DataFrame import h5py -from pydantic import create_model +from pydantic import create_model, BaseModel from nwb_linkml.maps import dtype import numpy as np +from nwb_linkml.types.hdf5 import HDF5_Path +from nwb_linkml.types.ndarray import NDArray -def model_from_dynamictable(group:h5py.Group) -> Type[DataFrame]: +def model_from_dynamictable(group:h5py.Group, base:Optional[BaseModel] = None) -> Type[DataFrame]: colnames = group.attrs['colnames'] types = {} for col in colnames: + idxname = col + '_index' + if idxname in group.keys(): + idx = group.get(idxname)[0] + dset = group.get(col) + item = dset[idx] + else: + dset = group.get(col) + item = dset[0] # read the first entry to see what we got - dset = group.get(col) - item = dset[0] + if isinstance(item, bytes): item = item.decode('utf-8') if isinstance(item, str): # try to see if this is actually a list or smth encoded as a string try: item = ast.literal_eval(item) - except ValueError: + except (ValueError, SyntaxError): pass type_ = type(item) type_ = dtype.np_to_python.get(type_, type_) + if type_ is h5py.h5r.Reference: + type_ = HDF5_Path + elif type_ is np.ndarray: + type_ = NDArray + if type_ is not np.void: # FIXME: handling nested column types that appear only in some versions? types[col] = (List[type_ | None], ...) 
- model = create_model(group.name.split('/')[-1], **types, __base__=DataFrame) + if base is None: + base = DataFrame + else: + base = (DataFrame, base) + + + model = create_model(group.name.split('/')[-1], **types, __base__=base) return model -def dynamictable_to_df(group:h5py.Group, model:Optional[Type[DataFrame]]=None) -> DataFrame: +def dynamictable_to_df(group:h5py.Group, + model:Optional[Type[DataFrame]]=None, + base:Optional[BaseModel] = None) -> DataFrame: if model is None: - model = model_from_dynamictable(group) + model = model_from_dynamictable(group, base) items = {} - for col in model.model_fields.keys(): - data = group.get(col)[:] + for col, col_type in model.model_fields.items(): + if col not in group.keys(): + continue + idxname = col + '_index' + if idxname in group.keys(): + idx = group.get(idxname)[:] + data = group.get(col)[idx-1] + else: + data = group.get(col)[:] + + # Handle typing inside of list if isinstance(data[0], bytes): data = data.astype('unicode') if isinstance(data[0], str): + # lists and other compound data types can get flattened out to strings when stored + # so we try and literal eval and recover them try: eval_type = type(ast.literal_eval(data[0])) - except ValueError: + except (ValueError, SyntaxError): eval_type = str + # if we've found one of those, get the data type within it. if eval_type is not str: eval_list = [] for item in data.tolist(): @@ -58,12 +92,45 @@ def dynamictable_to_df(group:h5py.Group, model:Optional[Type[DataFrame]]=None) - eval_list.append(ast.literal_eval(item)) except ValueError: eval_list.append(None) - items[col] = eval_list - continue + data = eval_list + elif isinstance(data[0], h5py.h5r.Reference): + data = [HDF5_Path(group[d].name) for d in data] + elif isinstance(data[0], tuple) and any([isinstance(d, h5py.h5r.Reference) for d in data[0]]): + # references stored inside a tuple, reference + location. + # dereference them!? 
+ dset = group.get(col) + names = dset.dtype.names + if names is not None and names[0] == 'idx_start' and names[1] == 'count': + data = dereference_reference_vector(dset, data) - items[col] = data.tolist() + else: + data = data.tolist() - pdb.set_trace() - return model(**items) + # After list, check if we need to put this thing inside of + # another class, as indicated by the enclosing model + + items[col] = data + + return model(hdf5_path = group.name, + name = group.name.split('/')[-1], + **items) + + +def dereference_reference_vector(dset: h5py.Dataset, data:Optional[List[Any]]) -> List: + """ + Given a compound dataset with indices, counts, and object references, dereference to values + + Data is of the form + (idx_start, count, target) + """ + # assume all these references are to the same target + # and the index is in the 3rd position + if data is None: + data = dset[:] + + target = dset.parent.get(data[0][-1]) + res = [target[d[0]:d[0]+d[1]] for d in data] + return res + diff --git a/nwb_linkml/src/nwb_linkml/types/df.py b/nwb_linkml/src/nwb_linkml/types/df.py index 54d4869..b89530e 100644 --- a/nwb_linkml/src/nwb_linkml/types/df.py +++ b/nwb_linkml/src/nwb_linkml/types/df.py @@ -5,6 +5,7 @@ import pdb from typing import List, Any, get_origin, get_args, Union, Optional, Dict from types import NoneType +import h5py import numpy as np import pandas as pd from pydantic import ( @@ -115,3 +116,25 @@ class DataFrame(BaseModel, pd.DataFrame): for k, v in out.items() } return nxt(self.__class__(**out)) + + +class DataFrameProxy(DataFrame): + def __init__(self, hdf5_file: str, hdf5_path:str, **kwargs): + # pdb.set_trace() + super().__init__(**kwargs) + + self.hdf5_file = hdf5_file + self.hdf5_path = hdf5_path + + + def _load(self): + cols = {} + with h5py.File(self.hdf5_file, 'r') as h5f: + obj = h5f.get(self.hdf5_path) + for col in self.model_fields.keys(): + if col in obj.keys(): + setattr(self, col, obj[col][:]) + + def __getitem__(self, item:str): + pass + diff --git a/nwb_linkml/src/nwb_linkml/types/hdf5.py b/nwb_linkml/src/nwb_linkml/types/hdf5.py new file mode 100644 index 0000000..1aeb4b6 --- /dev/null +++ b/nwb_linkml/src/nwb_linkml/types/hdf5.py @@ -0,0 +1,3 @@ +from typing import Annotated + +HDF5_Path = Annotated[str, """Trivial subclass of string to indicate that it is a reference to a location within an HDF5 file"""] \ No newline at end of file diff --git a/nwb_linkml/src/nwb_linkml/types/ndarray.py b/nwb_linkml/src/nwb_linkml/types/ndarray.py index 63ae36d..93525ad 100644 --- a/nwb_linkml/src/nwb_linkml/types/ndarray.py +++ b/nwb_linkml/src/nwb_linkml/types/ndarray.py @@ -4,6 +4,7 @@ Extension of nptyping NDArray for pydantic that allows for JSON-Schema serializa * Order to store data in (row first) """ import pdb +from pathlib import Path from typing import ( Any, Callable, @@ -22,6 +23,7 @@ from pydantic import ( from pydantic.json_schema import JsonSchemaValue import numpy as np +import h5py from nptyping import NDArray as _NDArray from nptyping.ndarray import NDArrayMeta @@ -49,55 +51,66 @@ class NDArray(_NDArray): np_to_python[dtype]) def validate_dtype(value: np.ndarray) -> np.ndarray: + if dtype is Any: + return value assert value.dtype == dtype, f"Invalid dtype! expected {dtype}, got {value.dtype}" return value def validate_array(value: Any) -> np.ndarray: assert cls.__instancecheck__(value), f'Invalid shape! 
expected shape {shape.prepared_args}, got shape {value.shape}' return value + def coerce_list(value: Any) -> np.ndarray: + if isinstance(value, list): + value = np.array(value) + return value + # get the names of the shape constraints, if any - shape_parts = shape.__args__[0].split(',') - split_parts = [p.split(' ')[1] if len(p.split(' ')) == 2 else None for p in shape_parts] + if shape is Any: + list_schema = core_schema.list_schema(core_schema.any_schema()) + else: + shape_parts = shape.__args__[0].split(',') + split_parts = [p.split(' ')[1] if len(p.split(' ')) == 2 else None for p in shape_parts] - # Construct a list of list schema - # go in reverse order - construct list schemas such that - # the final schema is the one that checks the first dimension - shape_labels = reversed(split_parts) - shape_args = reversed(shape.prepared_args) - list_schema = None - for arg, label in zip(shape_args, shape_labels): - # which handler to use? for the first we use the actual type - # handler, everywhere else we use the prior list handler - if list_schema is None: - inner_schema = array_type_handler - else: - inner_schema = list_schema + # Construct a list of list schema + # go in reverse order - construct list schemas such that + # the final schema is the one that checks the first dimension + shape_labels = reversed(split_parts) + shape_args = reversed(shape.prepared_args) + list_schema = None + for arg, label in zip(shape_args, shape_labels): + # which handler to use? for the first we use the actual type + # handler, everywhere else we use the prior list handler + if list_schema is None: + inner_schema = array_type_handler + else: + inner_schema = list_schema - # make a label annotation, if we have one - if label is not None: - metadata = {'name': label} - else: - metadata = None + # make a label annotation, if we have one + if label is not None: + metadata = {'name': label} + else: + metadata = None - # make the current level list schema, accounting for shape - if arg == '*': - list_schema = core_schema.list_schema(inner_schema, - metadata=metadata) - else: - arg = int(arg) - list_schema = core_schema.list_schema( - inner_schema, - min_length=arg, - max_length=arg, - metadata=metadata - ) + # make the current level list schema, accounting for shape + if arg == '*': + list_schema = core_schema.list_schema(inner_schema, + metadata=metadata) + else: + arg = int(arg) + list_schema = core_schema.list_schema( + inner_schema, + min_length=arg, + max_length=arg, + metadata=metadata + ) return core_schema.json_or_python_schema( json_schema=list_schema, python_schema=core_schema.chain_schema( [ + core_schema.no_info_plain_validator_function(coerce_list), core_schema.is_instance_schema(np.ndarray), core_schema.no_info_plain_validator_function(validate_dtype), core_schema.no_info_plain_validator_function(validate_array) @@ -107,4 +120,31 @@ class NDArray(_NDArray): lambda instance: instance.tolist(), when_used='json' ) - ) \ No newline at end of file + ) + + +class NDArrayProxy(): + """ + Thin proxy to numpy arrays stored within hdf5 files, + only read into memory when accessed, but otherwise + passthrough all attempts to access attributes. 
+ """ + def __init__(self, h5f_file: Path, path: str): + """ + Args: + h5f_file (:class:`pathlib.Path`): Path to source HDF5 file + path (str): Location within HDF5 file where this array is located + """ + self.h5f_file = h5f_file + self.path = path + + def __getattr__(self, item): + with h5py.File(self.h5f_file, 'r') as h5f: + obj = h5f.get(self.path) + return getattr(obj, item) + def __getitem__(self, slice) -> np.ndarray: + with h5py.File(self.h5f_file, 'r') as h5f: + obj = h5f.get(self.path) + return obj[slice] + def __setitem__(self, slice, value): + raise NotImplementedError(f"Cant write into an arrayproxy yet!") diff --git a/nwb_linkml/src/nwb_linkml/types/ndarray.pyi b/nwb_linkml/src/nwb_linkml/types/ndarray.pyi index 1aa5dd7..a90b8d3 100644 --- a/nwb_linkml/src/nwb_linkml/types/ndarray.pyi +++ b/nwb_linkml/src/nwb_linkml/types/ndarray.pyi @@ -1,3 +1,4 @@ import numpy as np -NDArray = np.ndarray \ No newline at end of file +NDArray = np.ndarray +NDArrayProxy = np.ndarray \ No newline at end of file diff --git a/nwb_linkml/tests/fixtures.py b/nwb_linkml/tests/fixtures.py index bb33a90..ca5e605 100644 --- a/nwb_linkml/tests/fixtures.py +++ b/nwb_linkml/tests/fixtures.py @@ -27,3 +27,8 @@ def nwb_core_fixture() -> NamespacesAdapter: nwb_core.populate_imports() return nwb_core + +@pytest.fixture(scope="session") +def data_dir() -> Path: + path = Path(__file__).parent.resolve() / 'data' + return path diff --git a/nwb_linkml/tests/test_io/test_io_hdf5.py b/nwb_linkml/tests/test_io/test_io_hdf5.py index 6cd54f9..f8fa306 100644 --- a/nwb_linkml/tests/test_io/test_io_hdf5.py +++ b/nwb_linkml/tests/test_io/test_io_hdf5.py @@ -9,13 +9,14 @@ from ..fixtures import tmp_output_dir, set_config_vars from nwb_linkml.io.hdf5 import HDF5IO from nwb_linkml.io.hdf5 import truncate_file + @pytest.mark.skip() def test_hdf_read(): NWBFILE = Path('/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773.nwb') if not NWBFILE.exists(): return io = HDF5IO(path=NWBFILE) - model = io.read('acquisition') + model = io.read() pdb.set_trace() @@ -76,10 +77,12 @@ def test_truncate_file(tmp_output_dir): assert target_h5f[target_h5f['link']['child'].attrs['reference_contig']].name == target_h5f['data']['dataset_contig'].name assert target_h5f[target_h5f['link']['child'].attrs['reference_chunked']].name == target_h5f['data']['dataset_chunked'].name assert target_h5f['data']['dataset_contig'].attrs['anattr'] == 1 + @pytest.mark.skip() def test_flatten_hdf(): - from nwb_linkml.io.hdf5 import HDF5IO, flatten_hdf - path = '/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773_probe-769322820_ecephys.nwb' + from nwb_linkml.io.hdf5 import HDF5IO + from nwb_linkml.maps.hdf5 import flatten_hdf + path = '/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773.nwb' import h5py h5f = h5py.File(path) flat = flatten_hdf(h5f) diff --git a/nwb_linkml/tests/test_types/test_ndarray.py b/nwb_linkml/tests/test_types/test_ndarray.py index 14f26b8..d7d0506 100644 --- a/nwb_linkml/tests/test_types/test_ndarray.py +++ b/nwb_linkml/tests/test_types/test_ndarray.py @@ -9,6 +9,7 @@ from pydantic import BaseModel, ValidationError, Field from nwb_linkml.types.ndarray import NDArray from nptyping import Shape, Number +from ..fixtures import data_dir def test_ndarray_type(): class Model(BaseModel): @@ -54,3 +55,6 @@ def test_ndarray_union(): with pytest.raises(ValidationError): instance = Model(array=np.random.random((5,10,4,6))) +@pytest.mark.skip() +def test_ndarray_proxy(data_dir): + h5f_source = data_dir 
/ 'aibs_ecephys.nwb'
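The sketches below illustrate the main additions in this patch; none of them are part of the diff itself. First, the annotation helpers moved out of io/hdf5.py into nwb_linkml.annotations — nothing is assumed here beyond the two functions in the new module:

from typing import List, Optional, Union

from nwb_linkml.annotations import take_outer_type, unwrap_optional

assert unwrap_optional(Optional[int]) is int                 # Optional[...] is stripped
assert unwrap_optional(Union[int, str]) == Union[int, str]   # other unions pass through
assert take_outer_type(List[str]) is list                    # generic lists collapse to list
assert take_outer_type(dict) is dict                         # non-lists are returned as-is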
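Next, a sketch of the two-phase read flow that HDF5IO.read now delegates to nwb_linkml.maps.hdf5. The file path is a placeholder, and the default-constructed SchemaProvider() only stands in for however HDF5IO actually obtains its provider:

from pathlib import Path

import h5py

from nwb_linkml.maps.hdf5 import ReadPhases, ReadQueue, flatten_hdf
from nwb_linkml.providers.schema import SchemaProvider

nwb_path = Path("example.nwb")  # placeholder source file

with h5py.File(nwb_path, "r") as h5f:
    # index every group and dataset (skipping /specifications) as H5SourceItems
    sources = flatten_hdf(h5f)

# only the Path is stored; each map re-opens and closes the file itself
queue = ReadQueue(h5f=nwb_path, queue=sources, provider=SchemaProvider())

queue.apply_phase(ReadPhases.plan)  # e.g. PruneEmpty drops attr-less leaf groups
queue.apply_phase(ReadPhases.read)  # one pass of the Resolve* maps, in priority order

# finished items are H5ReadResults keyed by their hdf5 path
for path, res in queue.completed.items():
    print(path, res.neurodata_type, type(res.result).__name__)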
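For the hdmf mapping changes, a sketch of generating a column model from a DynamicTable group; "example.nwb" and "/units" are placeholders for any group that carries a colnames attribute:

import h5py

from nwb_linkml.maps.hdmf import model_from_dynamictable

with h5py.File("example.nwb", "r") as h5f:   # placeholder file
    units = h5f["/units"]                    # placeholder table path
    UnitsModel = model_from_dynamictable(units)

# each column becomes a List[...] field (items may be None); ragged columns are
# sampled through their *_index datasets, object references map to HDF5_Path,
# and array-valued cells map to NDArray
print(list(UnitsModel.model_fields))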
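Finally, the deferred-read behaviour of NDArrayProxy; file and dataset paths are again placeholders:

from pathlib import Path

from nwb_linkml.types.ndarray import NDArrayProxy

proxy = NDArrayProxy(h5f_file=Path("example.nwb"), path="/acquisition/raw/data")

print(proxy.shape, proxy.dtype)  # forwarded to the h5py.Dataset via __getattr__

chunk = proxy[0:10]              # __getitem__ reads only the requested slice

try:
    proxy[0] = 0                 # writing through the proxy is not supported yet
except NotImplementedError:
    pass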