it working

had to very hackily handle special cases on the NWBFile class, but it's working. Now badly in need of tidying, docs, and tests.
sneakers-the-rat 2023-10-02 23:26:43 -07:00
parent 34f8969fa9
commit f682105c1a
5 changed files with 89 additions and 9 deletions


@@ -21,6 +21,13 @@ class GroupAdapter(ClassAdapter):
             all([self._check_if_container(g) for g in self.cls.groups]): # and \
             # self.parent is not None:
             return self.handle_container_group(self.cls)

+        # Or you can have groups like /intervals where there are some named groups, and some unnamed
+        # but they all have the same type
+        elif len(self.cls.groups) > 0 and \
+            all([g.neurodata_type_inc == self.cls.groups[0].neurodata_type_inc for g in self.cls.groups]) and \
+            self.cls.groups[0].neurodata_type_inc is not None and \
+            all([g.quantity in ('?', '*') for g in self.cls.groups]):
+            return self.handle_container_group(self.cls)

         # handle if we are a terminal container group without making a new class
         if len(self.cls.groups) == 0 and \
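
The new branch covers schema groups like /intervals, where children may be named or anonymous but all share one included type and are each optional or repeatable. A minimal sketch of that heuristic, using hypothetical Group records with only the neurodata_type_inc and quantity attributes the adapter reads:

    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass
    class Group:
        # hypothetical stand-in for the schema group objects the adapter inspects
        neurodata_type_inc: Optional[str]
        quantity: str
        name: Optional[str] = None

    def is_homogeneous_container(groups: List[Group]) -> bool:
        """Mirrors the new elif: non-empty, one shared included type, all optional/many."""
        return (
            len(groups) > 0
            and all(g.neurodata_type_inc == groups[0].neurodata_type_inc for g in groups)
            and groups[0].neurodata_type_inc is not None
            and all(g.quantity in ('?', '*') for g in groups)
        )

    # e.g. /intervals: a named child and an unnamed one, but both are TimeIntervals
    intervals = [Group('TimeIntervals', '?', 'epochs'), Group('TimeIntervals', '*')]
    assert is_homogeneous_container(intervals)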


@@ -93,7 +93,7 @@ class WeakRefShimBaseModel(BaseModel):
     __slots__ = '__weakref__'

 class ConfiguredBaseModel(WeakRefShimBaseModel,
-                validate_assignment = True,
+                validate_assignment = False,
                 validate_all = True,
                 underscore_attrs_are_private = True,
                 extra = {% if allow_extra %}'allow'{% else %}'forbid'{% endif %},
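
Flipping validate_assignment to False in the generated base model means pydantic (v1 API here) only validates at construction time, so the read machinery can overwrite attributes on partially built models without tripping validators. A minimal sketch of the difference, not the generated template itself:

    from pydantic import BaseModel  # pydantic v1-style config, as in the template

    class Sketch(BaseModel):
        class Config:
            validate_assignment = False  # assignments after __init__ are not re-validated
            validate_all = True

        n: int = 0

    m = Sketch(n=1)        # construction is still validated
    m.n = 'not an int'     # no ValidationError now that validate_assignment is False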
@@ -273,7 +273,8 @@ class NWBPydanticGenerator(PydanticGenerator):
         all_classes = sv.all_classes(imports=True)
         needed_classes = []
         for clsname, cls in all_classes.items():
-            if cls.tree_root:
+            #if cls.tree_root:
+            if cls.is_a != 'Arraylike':
                 needed_classes.append(clsname)

         imports = self._locate_imports(needed_classes, sv)
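
Where the generator previously located imports only for tree_root classes, it now keeps every class that isn't an Arraylike subclass, since nested (non-root) groups also need generated models at read time. A toy illustration of the changed filter, with hypothetical class records standing in for linkml ClassDefinitions:

    # hypothetical stand-ins for the (clsname, cls) pairs from sv.all_classes()
    all_classes = {
        'NWBFile': {'is_a': None, 'tree_root': True},
        'TimeIntervals': {'is_a': 'DynamicTable', 'tree_root': False},
        'TimeSeriesData': {'is_a': 'Arraylike', 'tree_root': False},
    }

    needed_classes = [
        clsname for clsname, cls in all_classes.items()
        if cls['is_a'] != 'Arraylike'   # was: if cls['tree_root']
    ]
    assert needed_classes == ['NWBFile', 'TimeIntervals']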


@@ -59,9 +59,9 @@ class HDF5IO():
     def read(self, path:str) -> BaseModel | Dict[str, BaseModel]: ...

     def read(self, path:Optional[str] = None) -> Union['NWBFile', BaseModel, Dict[str, BaseModel]]:
+        print('starting read')
         provider = self.make_provider()
+        print('provider made')
         h5f = h5py.File(str(self.path))
         if path:
             src = h5f.get(path)
@@ -73,7 +73,7 @@ class HDF5IO():
             children = flatten_hdf(src)
         else:
             raise NotImplementedError('directly read individual datasets')
+        print('hdf flattened')
         queue = ReadQueue(
             h5f=self.path,
             queue=children,
@@ -82,11 +82,11 @@ class HDF5IO():
         # Apply initial planning phase of reading
         queue.apply_phase(ReadPhases.plan)
+        print('phase - plan completed')

         # Now do read operations until we're finished
         queue.apply_phase(ReadPhases.read)
+        print('phase - read completed')

         # pdb.set_trace()
         # if len(queue.queue)> 0:
@@ -94,7 +94,10 @@ class HDF5IO():
         queue.apply_phase(ReadPhases.construct)

-        pdb.set_trace()
+        if path is None:
+            return queue.completed['/'].result
+        else:
+            return queue.completed[path].result
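
With the construct phase now returning results instead of dropping into pdb, read() resolves to the completed root model, or to the object at a requested sub-path. Hypothetical usage, with the constructor signature assumed from the self.path attribute above:

    # sketch only: assumes HDF5IO is built around a file path
    io = HDF5IO(path='example.nwb')

    nwbfile = io.read()                    # path=None -> queue.completed['/'].result
    trials = io.read('/intervals/trials')  # sub-path  -> queue.completed[path].result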


@@ -4,6 +4,7 @@ Maps for reading and writing from HDF5
 We have sort of diverged from the initial idea of a generalized map as in :class:`linkml.map.Map` ,
 so we will make our own mapping class here and re-evaluate whether they should be unified later
 """
+import datetime
 import pdb
 from abc import ABC, abstractmethod
 from pathlib import Path
@@ -514,6 +515,7 @@ class CompleteModelGroups(HDF5Map):
     def check(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
         if src.model is not None and \
             src.source.h5_type == 'group' and \
+            src.neurodata_type != 'NWBFile' and \
             all([depend in completed.keys() for depend in src.depends]):
             return True
         else:
@@ -560,6 +562,73 @@ class CompleteModelGroups(HDF5Map):
         #     applied=src.applied + ['CompleteModelGroups']
         # )
+class CompleteNWBFile(HDF5Map):
+    """
+    The top-level NWBFile class is so special-cased that we just give it its own completion map!
+
+    .. todo::
+
+        This is truly hideous, just meant as a way to get to the finish line on a late night, will be cleaned up later
+    """
+    phase = ReadPhases.construct
+    priority = 11
+
+    @classmethod
+    def check(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
+        if src.neurodata_type == 'NWBFile' and \
+            all([depend in completed.keys() for depend in src.depends]):
+            return True
+        else:
+            return False
+
+    @classmethod
+    def apply(cls, src: H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
+        res = {k:v for k,v in src.result.items() if not isinstance(v, HDF5_Path)}
+        unpacked_results, errors, completes = resolve_references(src.result, completed)
+        res.update(unpacked_results)
+        res['name'] = 'root'
+        res['file_create_date'] = [datetime.datetime.fromisoformat(ts.decode('utf-8')) for ts in res['file_create_date']['array'][:]]
+        if 'stimulus' not in res.keys():
+            res['stimulus'] = provider.get_class('core', 'NWBFileStimulus')()
+        electrode_groups = []
+        egroup_keys = list(res['general'].get('extracellular_ephys', {}).keys())
+        egroup_dict = {}
+        for k in egroup_keys:
+            if k != 'electrodes':
+                egroup = res['general']['extracellular_ephys'][k]
+                electrode_groups.append(egroup)
+                egroup_dict[egroup.hdf5_path] = egroup
+                del res['general']['extracellular_ephys'][k]
+        if len(electrode_groups) > 0:
+            res['general']['extracellular_ephys']['electrode_group'] = electrode_groups
+        trode_type = provider.get_class('core', 'NWBFileGeneralExtracellularEphysElectrodes')
+        #anmro = list(type(res['general']['extracellular_ephys']['electrodes']).__mro__)
+        #anmro.insert(1, trode_type)
+        trodes_original = res['general']['extracellular_ephys']['electrodes']
+        trodes = trode_type.model_construct(trodes_original.model_dump())
+        res['general']['extracellular_ephys']['electrodes'] = trodes
+        #type(res['general']['extracellular_ephys']['electrodes']).__mro__ = tuple(anmro)
+        # electrodes_dict = res['general']['extracellular_ephys']['electrodes'].model_dump()
+        # with h5py.File(src.source.h5f_path, 'r') as h5f:
+        #     electrodes_dict['group'] = [egroup_dict[h5f[e].name] for e in electrodes_dict['group'][:]]
+        # res['general']['extracellular_ephys']['electrodes'] = electrodes_dict
+
+        instance = src.model(**res)
+        return H5ReadResult(
+            path=src.path,
+            source=src,
+            result=instance,
+            model=src.model,
+            completed=True,
+            completes=completes,
+            neurodata_type=src.neurodata_type,
+            namespace=src.namespace,
+            applied=src.applied + ['CompleteNWBFile'],
+            errors=errors
+        )
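
This map, together with the != 'NWBFile' guard added to CompleteModelGroups.check above, routes the root group away from the generic group completion and into this special case. One concrete detail worth calling out is the file_create_date handling: NWB stores those timestamps as ISO-8601 byte strings, which have to be decoded before datetime can parse them. A standalone sketch with a made-up timestamp:

    import datetime

    # NWB file_create_date arrays hold ISO-8601 byte strings, e.g.:
    raw = [b'2023-10-02T23:26:43.000000-07:00']  # made-up value for illustration

    # decode to str first; fromisoformat() does not accept bytes
    dates = [datetime.datetime.fromisoformat(ts.decode('utf-8')) for ts in raw]
    print(dates[0].year)  # 2023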


@@ -60,7 +60,7 @@ def model_from_dynamictable(group:h5py.Group, base:Optional[BaseModel] = None) -
         #nptype = nptyping.typing_.name_per_dtype[group[col].dtype.type]
         nptype = group[col].dtype.type
         if nptype == np.void:
-            warnings.warn(f"Cant handle numpy void type for column {col} in {group.name}")
+            # warnings.warn(f"Cant handle numpy void type for column {col} in {group.name}")
             continue
         type_ = Optional[NDArray[Any, nptype]]
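
Commenting out the warning keeps reads quiet while compound (np.void) columns remain unsupported: the loop now skips them silently instead of warning on every file. A small sketch of the dtype check, with hypothetical column dtypes standing in for what group[col].dtype.type would report:

    import numpy as np

    # hypothetical DynamicTable column dtypes
    column_dtypes = {'start_time': np.float64, 'tags_index': np.uint8, 'compound': np.void}

    supported = {}
    for col, nptype in column_dtypes.items():
        if nptype == np.void:
            # compound dtypes have no scalar NDArray mapping yet; skip silently
            continue
        supported[col] = nptype

    assert 'compound' not in supported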