Working on finalizing the mapping operation... doing it single threaded for now and it's very slow but it completes up until the stage where we need to zip up the orphaned objects and other things that can be inferred from the model.

Need to make a proxytable model like proxyarray because reading all these tables takes way too fuckin long and it's not what we want to do anyway.
This commit is contained in:
sneakers-the-rat 2023-09-25 22:03:53 -07:00
parent ce75dacf93
commit 50e816bad4
13 changed files with 677 additions and 189 deletions

View file

@ -0,0 +1,19 @@
"""
Utility functions for introspection on python annotations
"""
import typing
def unwrap_optional(annotation):
if typing.get_origin(annotation) == typing.Union:
args = typing.get_args(annotation)
if len(args) == 2 and args[1].__name__ == 'NoneType':
annotation = args[0]
return annotation
def take_outer_type(annotation):
if typing.get_origin(annotation) is list:
return list
return annotation

View file

@ -12,8 +12,6 @@ Mapping operations (mostly TODO atm)
* Create new models from DynamicTables * Create new models from DynamicTables
* Handle softlinks as object references and vice versa by adding a ``path`` attr * Handle softlinks as object references and vice versa by adding a ``path`` attr
Other TODO: Other TODO:
* Read metadata only, don't read all arrays * Read metadata only, don't read all arrays
@ -21,21 +19,19 @@ Other TODO:
""" """
import pdb import pdb
import typing
import warnings import warnings
from typing import Optional, List, Dict, overload, Literal, Type, Any from typing import Optional, Dict, overload, Type
from pathlib import Path from pathlib import Path
from types import ModuleType from types import ModuleType
from typing import TypeVar, TYPE_CHECKING, NamedTuple from typing import TYPE_CHECKING, NamedTuple
from abc import abstractmethod
import json import json
import subprocess import subprocess
import shutil import shutil
import h5py import h5py
from pydantic import BaseModel, Field, ConfigDict from pydantic import BaseModel
from dataclasses import dataclass, field
from nwb_linkml.maps.hdf5 import H5SourceItem, flatten_hdf, ReadPhases, ReadQueue
from nwb_linkml.translate import generate_from_nwbfile from nwb_linkml.translate import generate_from_nwbfile
#from nwb_linkml.models.core_nwb_file import NWBFile #from nwb_linkml.models.core_nwb_file import NWBFile
if TYPE_CHECKING: if TYPE_CHECKING:
@ -43,44 +39,6 @@ if TYPE_CHECKING:
from nwb_linkml.providers.schema import SchemaProvider from nwb_linkml.providers.schema import SchemaProvider
class H5SourceItem(BaseModel):
"""Tuple of items for each element when flattening an hdf5 file"""
path: str
"""Absolute hdf5 path of element"""
leaf: bool
"""If ``True``, this item has no children (and thus we should start instantiating it before ascending to parent classes)"""
h5_type: Literal['group', 'dataset']
"""What kind of hdf5 element this is"""
depends: List[str] = Field(default_factory=list)
"""Paths of other source items that this item depends on before it can be instantiated. eg. from softlinks"""
attrs: dict = Field(default_factory=dict)
"""Any static attrs that can be had from the element"""
model_config = ConfigDict(arbitrary_types_allowed=True)
@property
def parts(self) -> List[str]:
"""path split by /"""
return self.path.split('/')
FlatH5 = Dict[str, H5SourceItem]
class ReadQueue(BaseModel):
"""Container model to store items as they are built """
h5f: h5py.File = Field(
description="Open hdf5 file used when resolving the queue!"
)
queue: Dict[str,H5SourceItem] = Field(
default_factory=dict,
description="Items left to be instantiated, keyed by hdf5 path",
)
completed: Dict[str, Any] = Field(
default_factory=dict,
description="Items that have already been instantiated, keyed by hdf5 path"
)
model_config = ConfigDict(arbitrary_types_allowed=True)
class HDF5IO(): class HDF5IO():
def __init__(self, path:Path): def __init__(self, path:Path):
@ -110,27 +68,39 @@ class HDF5IO():
# get all children of selected item # get all children of selected item
if isinstance(src, (h5py.File, h5py.Group)): if isinstance(src, (h5py.File, h5py.Group)):
children = self._flatten_hdf(src) children = flatten_hdf(src)
else: else:
raise NotImplementedError('directly read individual datasets') raise NotImplementedError('directly read individual datasets')
queue = ReadQueue(
h5f=self.path,
queue=children,
provider=provider
)
#pdb.set_trace()
# Apply initial planning phase of reading
queue.apply_phase(ReadPhases.plan)
# Now do read operations until we're finished
queue.apply_phase(ReadPhases.read)
pdb.set_trace()
data = {} #
for k, v in src.items(): #
if isinstance(v, h5py.Group): # data = {}
data[k] = H5Group(cls=v, parent=parent, root_model=parent).read() # for k, v in src.items():
elif isinstance(v, h5py.Dataset): # if isinstance(v, h5py.Group):
data[k] = H5Dataset(cls=v, parent=parent).read() # data[k] = H5Group(cls=v, parent=parent, root_model=parent).read()
# elif isinstance(v, h5py.Dataset):
if path is None: # data[k] = H5Dataset(cls=v, parent=parent).read()
return parent(**data) #
if 'neurodata_type' in src.attrs: # if path is None:
raise NotImplementedError('Making a submodel not supported yet') # return parent(**data)
else: # if 'neurodata_type' in src.attrs:
return data # raise NotImplementedError('Making a submodel not supported yet')
# else:
# return data
@ -314,20 +284,6 @@ def truncate_file(source: Path, target: Optional[Path] = None, n:int=10) -> Path
return target return target
def unwrap_optional(annotation):
if typing.get_origin(annotation) == typing.Union:
args = typing.get_args(annotation)
if len(args) == 2 and args[1].__name__ == 'NoneType':
annotation = args[0]
return annotation
def take_outer_type(annotation):
if typing.get_origin(annotation) is list:
return list
return annotation
def submodel_by_path(model: BaseModel, path:str) -> Type[BaseModel | dict | list]: def submodel_by_path(model: BaseModel, path:str) -> Type[BaseModel | dict | list]:
""" """
@ -338,61 +294,3 @@ def submodel_by_path(model: BaseModel, path:str) -> Type[BaseModel | dict | list
ann = model.model_fields[part].annotation ann = model.model_fields[part].annotation
def flatten_hdf(h5f:h5py.File | h5py.Group, skip='specifications') -> Dict[str, H5SourceItem]:
"""
Flatten all child elements of hdf element into a dict of :class:`.H5SourceItem` s keyed by their path
Args:
h5f (:class:`h5py.File` | :class:`h5py.Group`): HDF file or group to flatten!
"""
items = {}
def _itemize(name: str, obj: h5py.Dataset | h5py.Group):
if skip in name:
return
leaf = isinstance(obj, h5py.Dataset) or len(obj.keys()) == 0
# get references in attrs and datasets
refs = [ref for ref in obj.attrs.values() if isinstance(ref, h5py.h5r.Reference)]
if isinstance(obj, h5py.Dataset):
h5_type = 'dataset'
if obj.shape == ():
if isinstance(obj[()], h5py.h5r.Reference):
refs.append(obj[()])
elif isinstance(obj[0], h5py.h5r.Reference):
refs.extend(obj[:].tolist())
else:
h5_type = 'group'
# dereference and get name of reference
depends = list(set([h5f[i].name for i in refs]))
if not name.startswith('/'):
name = '/' + name
items[name] = H5SourceItem.model_construct(
path = name,
leaf = leaf,
depends = depends,
h5_type=h5_type,
attrs = dict(obj.attrs.items())
)
h5f.visititems(_itemize)
return items
def sort_flat_hdf(flat: Dict[str, H5SourceItem]) -> Dict[str, H5SourceItem]:
"""
Sort flat hdf5 file in a rough order of solvability
* First process any leaf items
* Put any items with dependencies at the end
Args:
flat:
Returns:
"""
class Rank(NamedTuple):
has_depends: bool
not_leaf: bool

View file

@ -17,10 +17,11 @@ class MyDf(DataFrame):
a = MyDf(ints=[1,2,3]) a = MyDf(ints=[1,2,3])
from nwb_linkml.io.hdf5 import HDF5IO, flatten_hdf from nwb_linkml.io.hdf5 import HDF5IO
import h5py import h5py
from typing import NamedTuple, Tuple, Optional from typing import NamedTuple, Tuple, Optional
from nwb_linkml.io.hdf5 import H5SourceItem, FlatH5, ReadQueue, HDF5IO from nwb_linkml.io.hdf5 import HDF5IO
from nwb_linkml.maps.hdf5 import H5SourceItem, FlatH5, ReadQueue, flatten_hdf
from nwb_linkml.providers.schema import SchemaProvider from nwb_linkml.providers.schema import SchemaProvider
from rich import print from rich import print
from pydantic import BaseModel from pydantic import BaseModel

View file

@ -16,6 +16,7 @@ class PHASES(StrEnum):
"""After the YAML for a model has been loaded""" """After the YAML for a model has been loaded"""
@dataclass @dataclass
class Map: class Map:
scope: str scope: str

View file

@ -0,0 +1,423 @@
"""
Maps for reading and writing from HDF5
We have sort of diverged from the initial idea of a generalized map as in :class:`linkml.map.Map` ,
so we will make our own mapping class here and re-evaluate whether they should be unified later
"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Literal, List, Dict, Optional
import h5py
from enum import StrEnum
from pydantic import BaseModel, Field, ConfigDict
from nwb_linkml.providers.schema import SchemaProvider
from nwb_linkml.maps.hdmf import dynamictable_to_df
from nwb_linkml.types.hdf5 import HDF5_Path
class ReadPhases(StrEnum):
plan = 'plan'
"""Before reading starts, building an index of objects to read"""
read = 'read'
"""Main reading operation"""
construct = 'construct'
"""After reading, casting the results of the read into their models"""
class H5SourceItem(BaseModel):
"""Tuple of items for each element when flattening an hdf5 file"""
path: str
"""Absolute hdf5 path of element"""
h5f_path: str
"""Path to the source hdf5 file"""
leaf: bool
"""If ``True``, this item has no children (and thus we should start instantiating it before ascending to parent classes)"""
h5_type: Literal['group', 'dataset']
"""What kind of hdf5 element this is"""
depends: List[str] = Field(default_factory=list)
"""Paths of other source items that this item depends on before it can be instantiated. eg. from softlinks"""
attrs: dict = Field(default_factory=dict)
"""Any static attrs that can be had from the element"""
namespace: Optional[str] = None
"""Optional: The namespace that the neurodata type belongs to"""
neurodata_type: Optional[str] = None
"""Optional: the neurodata type for this dataset or group"""
model_config = ConfigDict(arbitrary_types_allowed=True)
@property
def parts(self) -> List[str]:
"""path split by /"""
return self.path.split('/')
class H5ReadResult(BaseModel):
"""Result returned by each of our mapping operations"""
path: str
"""absolute hdf5 path of element"""
source: H5SourceItem
"""
Source that this result is based on.
The map can modify this item, so the container should update the source
queue on each pass
"""
completed: bool = False
"""
Was this item completed by this map step? False for cases where eg.
we still have dependencies that need to be completed before this one
"""
result: Optional[BaseModel | dict | str | int | float] = None
"""
If completed, built result. A dict that can be instantiated into the model.
If completed is True and result is None, then remove this object
"""
completes: List[str] = Field(default_factory=list)
"""
If this result completes any other fields, we remove them from the build queue
"""
namespace: Optional[str] = None
"""
Optional: the namespace of the neurodata type for this object
"""
neurodata_type: Optional[str] = None
"""
Optional: The neurodata type to use for this object
"""
FlatH5 = Dict[str, H5SourceItem]
class HDF5Map(ABC):
phase: ReadPhases
exclusive: bool = False
"""
If ``True``, if the check is fulfilled, no other maps can be applied this phase
"""
priority: int = 0
@classmethod
@abstractmethod
def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
"""Check if this map applies to the given item to read"""
@classmethod
@abstractmethod
def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
"""Actually apply the map!"""
# --------------------------------------------------
# Planning maps
# --------------------------------------------------
class PruneEmpty(HDF5Map):
"""Remove groups with no attrs """
phase = ReadPhases.plan
@classmethod
def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
if src.leaf and src.h5_type == 'group':
with h5py.File(src.h5f_path, 'r') as h5f:
obj = h5f.get(src.path)
if len(obj.attrs) == 0:
return True
@classmethod
def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
return H5ReadResult.model_construct(
path = src.path,
source=src,
completed=True
)
# class ResolveVectorData(HDF5Map):
# """
# We will load vanilla VectorData as part of :class:`.ResolveDynamicTable`
# """
# phase = ReadPhases.read
#
# @classmethod
# def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
# if src.h5_type == 'group':
# return False
# if src.neurodata_type == 'VectorData':
#
class ResolveDynamicTable(HDF5Map):
"""Handle loading a dynamic table!"""
phase = ReadPhases.read
priority = 1
exclusive = True
@classmethod
def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
if src.h5_type == 'dataset':
return False
if 'neurodata_type' in src.attrs:
if src.attrs['neurodata_type'] == 'DynamicTable':
return True
# otherwise, see if it's a subclass
model = provider.get_class(src.attrs['namespace'], src.attrs['neurodata_type'])
# just inspect the MRO as strings rather than trying to check subclasses because
# we might replace DynamicTable in the future, and there isn't a stable DynamicTable
# class to inherit from anyway because of the whole multiple versions thing
parents = [parent.__name__ for parent in model.__mro__]
if 'DynamicTable' in parents:
return True
else:
return False
else:
return False
@classmethod
def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
with h5py.File(src.h5f_path, 'r') as h5f:
obj = h5f.get(src.path)
# make a populated model :)
# TODO: use a tableproxy like NDArrayProxy to not load all these into memory
if src.neurodata_type != 'DynamicTable':
#base_model = provider.get_class(src.namespace, src.neurodata_type)
base_model = None
else:
base_model = None
model = dynamictable_to_df(obj, base=base_model)
completes = ['/'.join([src.path, child]) for child in obj.keys()]
return H5ReadResult(
path=src.path,
source=src,
result=model,
completes=completes,
completed = True
)
class ResolveModelGroup(HDF5Map):
phase = ReadPhases.read
priority = 10 # do this generally last
exclusive = True
@classmethod
def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
if 'neurodata_type' in src.attrs and src.h5_type == 'group':
return True
else:
return False
@classmethod
def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
model = provider.get_class(src.namespace, src.neurodata_type)
res = {}
with h5py.File(src.h5f_path, 'r') as h5f:
obj = h5f.get(src.path)
for key, type in model.model_fields.items():
if key in obj.attrs:
res[key] = obj.attrs[key]
continue
if key in obj.keys():
# stash a reference to this, we'll compile it at the end
res[key] = HDF5_Path('/'.join([src.path, key]))
res['hdf5_path'] = src.path
res['name'] = src.parts[-1]
return H5ReadResult(
path=src.path,
source=src,
completed=True,
result = res,
namespace=src.namespace,
neurodata_type=src.neurodata_type
)
#
# class ResolveModelDataset(HDF5Map):
# phase = ReadPhases.read
# priority = 10
# exclusive = True
#
# @classmethod
# def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
# if 'neurodata_type' in src.attrs and src.h5_type == 'dataset':
# return True
# else:
# return False
#
# def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
#
class ResolveScalars(HDF5Map):
phase = ReadPhases.read
priority = 11 #catchall
exclusive = True
@classmethod
def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
if src.h5_type == 'dataset' and 'neurodata_type' not in src.attrs:
with h5py.File(src.h5f_path, 'r') as h5f:
obj = h5f.get(src.path)
if obj.shape == ():
return True
else:
return False
@classmethod
def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
with h5py.File(src.h5f_path, 'r') as h5f:
obj = h5f.get(src.path)
res = obj[()]
return H5ReadResult(
path=src.path,
source = src,
completed=True,
result = res
)
class ReadQueue(BaseModel):
"""Container model to store items as they are built """
h5f: Path = Field(
description=("Path to the source hdf5 file used when resolving the queue! "
"Each translation step should handle opening and closing the file, "
"rather than passing a handle around")
)
provider: SchemaProvider = Field(
description="SchemaProvider used by each of the items in the read queue"
)
queue: Dict[str,H5SourceItem] = Field(
default_factory=dict,
description="Items left to be instantiated, keyed by hdf5 path",
)
completed: Dict[str, H5ReadResult] = Field(
default_factory=dict,
description="Items that have already been instantiated, keyed by hdf5 path"
)
model_config = ConfigDict(arbitrary_types_allowed=True)
def apply_phase(self, phase:ReadPhases):
phase_maps = [m for m in HDF5Map.__subclasses__() if m.phase == phase]
phase_maps = sorted(phase_maps, key=lambda x: x.priority)
results = []
# TODO: Thread/multiprocess this
for name, item in self.queue.items():
for op in phase_maps:
if op.check(item, self.provider, self.completed):
results.append(op.apply(item, self.provider, self.completed))
if op.exclusive:
break # out of inner iteration
# remake the source queue and save results
for res in results:
# remove the original item
del self.queue[res.path]
if res.completed:
# if the item has been finished and there is some result, add it to the results
if res.result is not None:
self.completed[res.path] = res
# otherwise if the item has been completed and there was no result,
# just drop it.
# if we have completed other things, delete them from the queue
for also_completed in res.completes:
try:
del self.queue[also_completed]
except KeyError:
# normal, we might have already deleted this in a previous step
pass
else:
# if we didn't complete the item (eg. we found we needed more dependencies),
# add the updated source to the queue again
self.queue[res.path] = res.source
def flatten_hdf(h5f:h5py.File | h5py.Group, skip='specifications') -> Dict[str, H5SourceItem]:
"""
Flatten all child elements of hdf element into a dict of :class:`.H5SourceItem` s keyed by their path
Args:
h5f (:class:`h5py.File` | :class:`h5py.Group`): HDF file or group to flatten!
"""
items = {}
def _itemize(name: str, obj: h5py.Dataset | h5py.Group):
if skip in name:
return
leaf = isinstance(obj, h5py.Dataset) or len(obj.keys()) == 0
if isinstance(obj, h5py.Dataset):
h5_type = 'dataset'
elif isinstance(obj, h5py.Group):
h5_type = 'group'
else:
raise ValueError(f'Object must be a dataset or group! {obj}')
# get references in attrs and datasets to populate dependencies
#depends = get_references(obj)
#if not name.startswith('/'):
# name = '/' + name
attrs = dict(obj.attrs.items())
items[name] = H5SourceItem.model_construct(
path = name,
h5f_path=h5f.file.filename,
leaf = leaf,
#depends = depends,
h5_type=h5_type,
attrs = attrs,
namespace = attrs.get('namespace', None),
neurodata_type= attrs.get('neurodata_type', None)
)
h5f.visititems(_itemize)
return items
def get_references(obj: h5py.Dataset | h5py.Group) -> List[str]:
"""
Find all hdf5 object references in a dataset or group
Locate references in
* Attrs
* Scalar datasets
* Single-column datasets
* Multi-column datasets
Args:
obj (:class:`h5py.Dataset` | :class:`h5py.Group`): Object to evaluate
Returns:
List[str]: List of paths that are referenced within this object
"""
# Find references in attrs
refs = [ref for ref in obj.attrs.values() if isinstance(ref, h5py.h5r.Reference)]
# For datasets, apply checks depending on shape of data.
if isinstance(obj, h5py.Dataset):
if obj.shape == ():
# scalar
if isinstance(obj[()], h5py.h5r.Reference):
refs.append(obj[()])
elif isinstance(obj[0], h5py.h5r.Reference):
# single-column
refs.extend(obj[:].tolist())
elif len(obj.dtype) > 1:
# "compound" datasets
for name in obj.dtype.names:
if isinstance(obj[name][0], h5py.h5r.Reference):
refs.extend(obj[name].tolist())
# dereference and get name of reference
if isinstance(obj, h5py.Dataset):
depends = list(set([obj.parent.get(i).name for i in refs]))
else:
depends = list(set([obj.get(i).name for i in refs]))
return depends

View file

@ -2,55 +2,89 @@
Mapping functions for handling HDMF classes like DynamicTables Mapping functions for handling HDMF classes like DynamicTables
""" """
import pdb import pdb
from typing import List, Type, Optional from typing import List, Type, Optional, Any
import ast import ast
from nwb_linkml.types import DataFrame from nwb_linkml.types import DataFrame
import h5py import h5py
from pydantic import create_model from pydantic import create_model, BaseModel
from nwb_linkml.maps import dtype from nwb_linkml.maps import dtype
import numpy as np import numpy as np
from nwb_linkml.types.hdf5 import HDF5_Path
from nwb_linkml.types.ndarray import NDArray
def model_from_dynamictable(group:h5py.Group) -> Type[DataFrame]: def model_from_dynamictable(group:h5py.Group, base:Optional[BaseModel] = None) -> Type[DataFrame]:
colnames = group.attrs['colnames'] colnames = group.attrs['colnames']
types = {} types = {}
for col in colnames: for col in colnames:
idxname = col + '_index'
if idxname in group.keys():
idx = group.get(idxname)[0]
dset = group.get(col)
item = dset[idx]
else:
dset = group.get(col)
item = dset[0]
# read the first entry to see what we got # read the first entry to see what we got
dset = group.get(col)
item = dset[0]
if isinstance(item, bytes): if isinstance(item, bytes):
item = item.decode('utf-8') item = item.decode('utf-8')
if isinstance(item, str): if isinstance(item, str):
# try to see if this is actually a list or smth encoded as a string # try to see if this is actually a list or smth encoded as a string
try: try:
item = ast.literal_eval(item) item = ast.literal_eval(item)
except ValueError: except (ValueError, SyntaxError):
pass pass
type_ = type(item) type_ = type(item)
type_ = dtype.np_to_python.get(type_, type_) type_ = dtype.np_to_python.get(type_, type_)
if type_ is h5py.h5r.Reference:
type_ = HDF5_Path
elif type_ is np.ndarray:
type_ = NDArray
if type_ is not np.void: if type_ is not np.void:
# FIXME: handling nested column types that appear only in some versions? # FIXME: handling nested column types that appear only in some versions?
types[col] = (List[type_ | None], ...) types[col] = (List[type_ | None], ...)
model = create_model(group.name.split('/')[-1], **types, __base__=DataFrame) if base is None:
base = DataFrame
else:
base = (DataFrame, base)
model = create_model(group.name.split('/')[-1], **types, __base__=base)
return model return model
def dynamictable_to_df(group:h5py.Group, model:Optional[Type[DataFrame]]=None) -> DataFrame: def dynamictable_to_df(group:h5py.Group,
model:Optional[Type[DataFrame]]=None,
base:Optional[BaseModel] = None) -> DataFrame:
if model is None: if model is None:
model = model_from_dynamictable(group) model = model_from_dynamictable(group, base)
items = {} items = {}
for col in model.model_fields.keys(): for col, col_type in model.model_fields.items():
data = group.get(col)[:] if col not in group.keys():
continue
idxname = col + '_index'
if idxname in group.keys():
idx = group.get(idxname)[:]
data = group.get(col)[idx-1]
else:
data = group.get(col)[:]
# Handle typing inside of list
if isinstance(data[0], bytes): if isinstance(data[0], bytes):
data = data.astype('unicode') data = data.astype('unicode')
if isinstance(data[0], str): if isinstance(data[0], str):
# lists and other compound data types can get flattened out to strings when stored
# so we try and literal eval and recover them
try: try:
eval_type = type(ast.literal_eval(data[0])) eval_type = type(ast.literal_eval(data[0]))
except ValueError: except (ValueError, SyntaxError):
eval_type = str eval_type = str
# if we've found one of those, get the data type within it.
if eval_type is not str: if eval_type is not str:
eval_list = [] eval_list = []
for item in data.tolist(): for item in data.tolist():
@ -58,12 +92,45 @@ def dynamictable_to_df(group:h5py.Group, model:Optional[Type[DataFrame]]=None) -
eval_list.append(ast.literal_eval(item)) eval_list.append(ast.literal_eval(item))
except ValueError: except ValueError:
eval_list.append(None) eval_list.append(None)
items[col] = eval_list data = eval_list
continue elif isinstance(data[0], h5py.h5r.Reference):
data = [HDF5_Path(group[d].name) for d in data]
elif isinstance(data[0], tuple) and any([isinstance(d, h5py.h5r.Reference) for d in data[0]]):
# references stored inside a tuple, reference + location.
# dereference them!?
dset = group.get(col)
names = dset.dtype.names
if names is not None and names[0] == 'idx_start' and names[1] == 'count':
data = dereference_reference_vector(dset, data)
items[col] = data.tolist() else:
data = data.tolist()
pdb.set_trace() # After list, check if we need to put this thing inside of
return model(**items) # another class, as indicated by the enclosing model
items[col] = data
return model(hdf5_path = group.name,
name = group.name.split('/')[-1],
**items)
def dereference_reference_vector(dset: h5py.Dataset, data:Optional[List[Any]]) -> List:
"""
Given a compound dataset with indices, counts, and object references, dereference to values
Data is of the form
(idx_start, count, target)
"""
# assume all these references are to the same target
# and the index is in the 3rd position
if data is None:
data = dset[:]
target = dset.parent.get(data[0][-1])
res = [target[d[0]:d[0]+d[1]] for d in data]
return res

View file

@ -5,6 +5,7 @@ import pdb
from typing import List, Any, get_origin, get_args, Union, Optional, Dict from typing import List, Any, get_origin, get_args, Union, Optional, Dict
from types import NoneType from types import NoneType
import h5py
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from pydantic import ( from pydantic import (
@ -115,3 +116,25 @@ class DataFrame(BaseModel, pd.DataFrame):
for k, v in out.items() for k, v in out.items()
} }
return nxt(self.__class__(**out)) return nxt(self.__class__(**out))
class DataFrameProxy(DataFrame):
def __init__(self, hdf5_file: str, hdf5_path:str, **kwargs):
# pdb.set_trace()
super().__init__(**kwargs)
self.hdf5_file = hdf5_file
self.hdf5_path = hdf5_path
def _load(self):
cols = {}
with h5py.File(self.hdf5_file, 'r') as h5f:
obj = h5f.get(self.hdf5_path)
for col in self.model_fields.keys():
if col in obj.keys():
setattr(self, col, obj[col][:])
def __getitem__(self, item:str):
pass

View file

@ -0,0 +1,3 @@
from typing import Annotated
HDF5_Path = Annotated[str, """Trivial subclass of string to indicate that it is a reference to a location within an HDF5 file"""]

View file

@ -4,6 +4,7 @@ Extension of nptyping NDArray for pydantic that allows for JSON-Schema serializa
* Order to store data in (row first) * Order to store data in (row first)
""" """
import pdb import pdb
from pathlib import Path
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -22,6 +23,7 @@ from pydantic import (
from pydantic.json_schema import JsonSchemaValue from pydantic.json_schema import JsonSchemaValue
import numpy as np import numpy as np
import h5py
from nptyping import NDArray as _NDArray from nptyping import NDArray as _NDArray
from nptyping.ndarray import NDArrayMeta from nptyping.ndarray import NDArrayMeta
@ -49,55 +51,66 @@ class NDArray(_NDArray):
np_to_python[dtype]) np_to_python[dtype])
def validate_dtype(value: np.ndarray) -> np.ndarray: def validate_dtype(value: np.ndarray) -> np.ndarray:
if dtype is Any:
return value
assert value.dtype == dtype, f"Invalid dtype! expected {dtype}, got {value.dtype}" assert value.dtype == dtype, f"Invalid dtype! expected {dtype}, got {value.dtype}"
return value return value
def validate_array(value: Any) -> np.ndarray: def validate_array(value: Any) -> np.ndarray:
assert cls.__instancecheck__(value), f'Invalid shape! expected shape {shape.prepared_args}, got shape {value.shape}' assert cls.__instancecheck__(value), f'Invalid shape! expected shape {shape.prepared_args}, got shape {value.shape}'
return value return value
def coerce_list(value: Any) -> np.ndarray:
if isinstance(value, list):
value = np.array(value)
return value
# get the names of the shape constraints, if any # get the names of the shape constraints, if any
shape_parts = shape.__args__[0].split(',') if shape is Any:
split_parts = [p.split(' ')[1] if len(p.split(' ')) == 2 else None for p in shape_parts] list_schema = core_schema.list_schema(core_schema.any_schema())
else:
shape_parts = shape.__args__[0].split(',')
split_parts = [p.split(' ')[1] if len(p.split(' ')) == 2 else None for p in shape_parts]
# Construct a list of list schema # Construct a list of list schema
# go in reverse order - construct list schemas such that # go in reverse order - construct list schemas such that
# the final schema is the one that checks the first dimension # the final schema is the one that checks the first dimension
shape_labels = reversed(split_parts) shape_labels = reversed(split_parts)
shape_args = reversed(shape.prepared_args) shape_args = reversed(shape.prepared_args)
list_schema = None list_schema = None
for arg, label in zip(shape_args, shape_labels): for arg, label in zip(shape_args, shape_labels):
# which handler to use? for the first we use the actual type # which handler to use? for the first we use the actual type
# handler, everywhere else we use the prior list handler # handler, everywhere else we use the prior list handler
if list_schema is None: if list_schema is None:
inner_schema = array_type_handler inner_schema = array_type_handler
else: else:
inner_schema = list_schema inner_schema = list_schema
# make a label annotation, if we have one # make a label annotation, if we have one
if label is not None: if label is not None:
metadata = {'name': label} metadata = {'name': label}
else: else:
metadata = None metadata = None
# make the current level list schema, accounting for shape # make the current level list schema, accounting for shape
if arg == '*': if arg == '*':
list_schema = core_schema.list_schema(inner_schema, list_schema = core_schema.list_schema(inner_schema,
metadata=metadata) metadata=metadata)
else: else:
arg = int(arg) arg = int(arg)
list_schema = core_schema.list_schema( list_schema = core_schema.list_schema(
inner_schema, inner_schema,
min_length=arg, min_length=arg,
max_length=arg, max_length=arg,
metadata=metadata metadata=metadata
) )
return core_schema.json_or_python_schema( return core_schema.json_or_python_schema(
json_schema=list_schema, json_schema=list_schema,
python_schema=core_schema.chain_schema( python_schema=core_schema.chain_schema(
[ [
core_schema.no_info_plain_validator_function(coerce_list),
core_schema.is_instance_schema(np.ndarray), core_schema.is_instance_schema(np.ndarray),
core_schema.no_info_plain_validator_function(validate_dtype), core_schema.no_info_plain_validator_function(validate_dtype),
core_schema.no_info_plain_validator_function(validate_array) core_schema.no_info_plain_validator_function(validate_array)
@ -108,3 +121,30 @@ class NDArray(_NDArray):
when_used='json' when_used='json'
) )
) )
class NDArrayProxy():
"""
Thin proxy to numpy arrays stored within hdf5 files,
only read into memory when accessed, but otherwise
passthrough all attempts to access attributes.
"""
def __init__(self, h5f_file: Path, path: str):
"""
Args:
h5f_file (:class:`pathlib.Path`): Path to source HDF5 file
path (str): Location within HDF5 file where this array is located
"""
self.h5f_file = h5f_file
self.path = path
def __getattr__(self, item):
with h5py.File(self.h5f_file, 'r') as h5f:
obj = h5f.get(self.path)
return getattr(obj, item)
def __getitem__(self, slice) -> np.ndarray:
with h5py.File(self.h5f_file, 'r') as h5f:
obj = h5f.get(self.path)
return obj[slice]
def __setitem__(self, slice, value):
raise NotImplementedError(f"Cant write into an arrayproxy yet!")

View file

@ -1,3 +1,4 @@
import numpy as np import numpy as np
NDArray = np.ndarray NDArray = np.ndarray
NDArrayProxy = np.ndarray

View file

@ -27,3 +27,8 @@ def nwb_core_fixture() -> NamespacesAdapter:
nwb_core.populate_imports() nwb_core.populate_imports()
return nwb_core return nwb_core
@pytest.fixture(scope="session")
def data_dir() -> Path:
path = Path(__file__).parent.resolve() / 'data'
return path

View file

@ -9,13 +9,14 @@ from ..fixtures import tmp_output_dir, set_config_vars
from nwb_linkml.io.hdf5 import HDF5IO from nwb_linkml.io.hdf5 import HDF5IO
from nwb_linkml.io.hdf5 import truncate_file from nwb_linkml.io.hdf5 import truncate_file
@pytest.mark.skip() @pytest.mark.skip()
def test_hdf_read(): def test_hdf_read():
NWBFILE = Path('/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773.nwb') NWBFILE = Path('/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773.nwb')
if not NWBFILE.exists(): if not NWBFILE.exists():
return return
io = HDF5IO(path=NWBFILE) io = HDF5IO(path=NWBFILE)
model = io.read('acquisition') model = io.read()
pdb.set_trace() pdb.set_trace()
@ -76,10 +77,12 @@ def test_truncate_file(tmp_output_dir):
assert target_h5f[target_h5f['link']['child'].attrs['reference_contig']].name == target_h5f['data']['dataset_contig'].name assert target_h5f[target_h5f['link']['child'].attrs['reference_contig']].name == target_h5f['data']['dataset_contig'].name
assert target_h5f[target_h5f['link']['child'].attrs['reference_chunked']].name == target_h5f['data']['dataset_chunked'].name assert target_h5f[target_h5f['link']['child'].attrs['reference_chunked']].name == target_h5f['data']['dataset_chunked'].name
assert target_h5f['data']['dataset_contig'].attrs['anattr'] == 1 assert target_h5f['data']['dataset_contig'].attrs['anattr'] == 1
@pytest.mark.skip() @pytest.mark.skip()
def test_flatten_hdf(): def test_flatten_hdf():
from nwb_linkml.io.hdf5 import HDF5IO, flatten_hdf from nwb_linkml.io.hdf5 import HDF5IO
path = '/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773_probe-769322820_ecephys.nwb' from nwb_linkml.maps.hdf5 import flatten_hdf
path = '/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773.nwb'
import h5py import h5py
h5f = h5py.File(path) h5f = h5py.File(path)
flat = flatten_hdf(h5f) flat = flatten_hdf(h5f)

View file

@ -9,6 +9,7 @@ from pydantic import BaseModel, ValidationError, Field
from nwb_linkml.types.ndarray import NDArray from nwb_linkml.types.ndarray import NDArray
from nptyping import Shape, Number from nptyping import Shape, Number
from ..fixtures import data_dir
def test_ndarray_type(): def test_ndarray_type():
class Model(BaseModel): class Model(BaseModel):
@ -54,3 +55,6 @@ def test_ndarray_union():
with pytest.raises(ValidationError): with pytest.raises(ValidationError):
instance = Model(array=np.random.random((5,10,4,6))) instance = Model(array=np.random.random((5,10,4,6)))
@pytest.mark.skip()
def test_ndarray_proxy(data_dir):
h5f_source = data_dir / 'aibs_ecephys.nwb'