Correct schema for hdf5 path type

don't double MRO for basemodels in generated dynamictable
sneakers-the-rat 2023-09-28 17:58:45 -07:00
parent 6bbf56d1a0
commit eca7a5ec2e
4 changed files with 44 additions and 30 deletions


@@ -20,7 +20,7 @@ Other TODO:
 """
 import pdb
 import warnings
-from typing import Optional, Dict, overload, Type
+from typing import Optional, Dict, overload, Type, Union
 from pathlib import Path
 from types import ModuleType
 from typing import TYPE_CHECKING, NamedTuple
@@ -58,7 +58,7 @@ class HDF5IO():
     @overload
     def read(self, path:str) -> BaseModel | Dict[str, BaseModel]: ...
-    def read(self, path:Optional[str] = None):
+    def read(self, path:Optional[str] = None) -> Union['NWBFile', BaseModel, Dict[str, BaseModel]]:
         print('starting read')
         provider = self.make_provider()
         print('provider made')
@@ -95,6 +95,8 @@ class HDF5IO():
         queue.apply_phase(ReadPhases.construct)
         pdb.set_trace()
+        # --------------------------------------------------
+        # FIXME: Hardcoding top-level file reading just for the win
+        # --------------------------------------------------
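
A minimal sketch of the new ``read()`` contract (not part of this diff; the module path, constructor argument, and filename are assumptions): with no path it returns the top-level NWBFile model, with a path it returns the model, or dict of models, at that location.

    from nwb_linkml.io.hdf5 import HDF5IO

    io = HDF5IO('data.nwb')            # hypothetical file
    nwbfile = io.read()                # no path: the top-level NWBFile model
    acq = io.read('/acquisition')      # a path: that element's model(s)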


@@ -4,7 +4,6 @@ Maps for reading and writing from HDF5
 We have sort of diverged from the initial idea of a generalized map as in :class:`linkml.map.Map` ,
 so we will make our own mapping class here and re-evaluate whether they should be unified later
 """
-import pdb
 from abc import ABC, abstractmethod
 from pathlib import Path
 from typing import Literal, List, Dict, Optional, Type, Union
@@ -12,8 +11,7 @@ from typing import Literal, List, Dict, Optional, Type, Union
 import h5py
 from enum import StrEnum
-from pydantic import BaseModel, Field, ConfigDict, ValidationError
-import dask.array as da
+from pydantic import BaseModel, Field, ConfigDict
 from nwb_linkml.providers.schema import SchemaProvider
 from nwb_linkml.maps.hdmf import dynamictable_to_model
@@ -30,7 +28,11 @@ class ReadPhases(StrEnum):
     """After reading, casting the results of the read into their models"""

 class H5SourceItem(BaseModel):
-    """Tuple of items for each element when flattening an hdf5 file"""
+    """
+    Descriptor of items for each element when :func:`.flatten_hdf` flattens an hdf5 file.
+    Consumed by :class:`.HDF5Map` classes, orchestrated by :class:`.ReadQueue`
+    """
     path: str
     """Absolute hdf5 path of element"""
     h5f_path: str
@@ -55,7 +57,11 @@ class H5SourceItem(BaseModel):
         return self.path.split('/')

 class H5ReadResult(BaseModel):
-    """Result returned by each of our mapping operations"""
+    """
+    Result returned by each of our mapping operations.
+    Also used as the source for operations in the ``construct`` :class:`.ReadPhases`
+    """
     path: str
     """absolute hdf5 path of element"""
     source: Union[H5SourceItem, 'H5ReadResult']
@@ -78,9 +84,9 @@ class H5ReadResult(BaseModel):
     """
     The model that this item should be cast into
     """
-    completes: List[str] = Field(default_factory=list)
+    completes: List[HDF5_Path] = Field(default_factory=list)
     """
-    If this result completes any other fields, we remove them from the build queue
+    If this result completes any other fields, we remove them from the build queue.
     """
     namespace: Optional[str] = None
     """
@@ -88,11 +94,11 @@ class H5ReadResult(BaseModel):
     """
     neurodata_type: Optional[str] = None
     """
-    Optional: The neurodata type to use for this object
+    Optional: The neurodata type to use for this object
     """
     applied: List[str] = Field(default_factory=list)
     """
-    Which stages were applied to this item
+    Which map operations were applied to this item
     """
     errors: List[str] = Field(default_factory=list)
     """
@@ -109,19 +115,20 @@ FlatH5 = Dict[str, H5SourceItem]

 class HDF5Map(ABC):
     phase: ReadPhases
     exclusive: bool = False
     """
     If ``True`` and the check is fulfilled, no other maps can be applied this phase
     """
     priority: int = 0
     """
     Within a phase, sort mapping operations from low to high priority
     (maybe this should be renamed, because "highest priority last" doesn't make a lot of sense)
     """
     @classmethod
     @abstractmethod
-    def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
+    def check(cls, src: H5SourceItem|H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
         """Check if this map applies to the given item to read"""
     @classmethod
     @abstractmethod
-    def apply(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
+    def apply(cls, src: H5SourceItem|H5ReadResult, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> H5ReadResult:
         """Actually apply the map!"""
@@ -157,11 +164,10 @@ class ResolveDynamicTable(HDF5Map):
     and then we include the datasets as :class:`~.nwb_linkml.types.ndarray.NDArrayProxy` objects which
     lazy load the arrays in a thread/process safe way.
-    This map also resolves
+    This map also resolves the child elements, indicating so by the ``completes`` field in the :class:`.ReadResult`
     """
     phase = ReadPhases.read
     priority = 1
     exclusive = True
     @classmethod
     def check(cls, src: H5SourceItem, provider:SchemaProvider, completed: Dict[str, H5ReadResult]) -> bool:
         if src.h5_type == 'dataset':
@@ -191,7 +197,7 @@ class ResolveDynamicTable(HDF5Map):
             base_model = provider.get_class(src.namespace, src.neurodata_type)
             model = dynamictable_to_model(obj, base=base_model)
-            completes = ['/'.join([src.path, child]) for child in obj.keys()]
+            completes = [HDF5_Path(child.name) for child in obj.values()]
             return H5ReadResult(
                 path=src.path,
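
The ``completes`` rewrite leans on h5py: every child object already carries its absolute path in ``.name``, so there is no need to join path strings by hand, and the result is typed as ``HDF5_Path``. A standalone sketch (the file, group, and import path are assumptions for illustration):

    import h5py
    from nwb_linkml.types.hdf5 import HDF5_Path

    with h5py.File('example.h5', 'w') as h5f:
        trials = h5f.create_group('/intervals/trials')
        trials.create_dataset('start_time', data=[0.0, 1.0])
        completes = [HDF5_Path(child.name) for child in trials.values()]
        # completes == [HDF5_Path('/intervals/trials/start_time')]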


@@ -69,12 +69,12 @@ def model_from_dynamictable(group:h5py.Group, base:Optional[BaseModel] = None) -
         #types[col] = (List[type_ | None], ...)
         types[col] = (type_, None)
-    if base is None:
-        #base = DataFrame
-        base = BaseModel
-    else:
-        base = (BaseModel, base)
-        #base = (DataFrame, base)
+    # if base is None:
+    #     #base = DataFrame
+    #     base = BaseModel
+    # else:
+    #     base = (BaseModel, base)
+    #     #base = (DataFrame, base)
     model = create_model(group.name.split('/')[-1], **types, __base__=base)
@@ -83,12 +83,12 @@ def model_from_dynamictable(group:h5py.Group, base:Optional[BaseModel] = None) -
 def dynamictable_to_model(
         group:h5py.Group,
-        model:Optional[Type[DataFrame]]=None,
-        base:Optional[BaseModel] = None) -> BaseModel:
+        model:Optional[Type[BaseModel]]=None,
+        base:Optional[Type[BaseModel]] = None) -> BaseModel:
     """
     Instantiate a dynamictable model
-    Calls :func:`.model_from_dynamictable` if model is not provided.
+    Calls :func:`.model_from_dynamictable` if ``model`` is not provided.
     """
     if model is None:
         model = model_from_dynamictable(group, base)
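
The commented-out block is what the commit message calls a double MRO: any ``base`` passed in already descends from ``BaseModel``, so prepending ``BaseModel`` to ``__base__`` repeats it in the bases, and Python cannot linearize that inheritance at all. A standalone pydantic sketch (class names are illustrative):

    from pydantic import BaseModel, create_model

    class TableBase(BaseModel):   # stand-in for a provider-generated base class
        name: str = 'table'

    # What the removed branch did: BaseModel first, then a BaseModel subclass.
    # C3 linearization fails, so the class cannot even be created:
    try:
        create_model('Trials', __base__=(BaseModel, TableBase))
    except TypeError as e:
        print(e)   # Cannot create a consistent method resolution order (MRO) ...

    # Passing the caller's base straight through, as the code now does, is fine:
    Trials = create_model('Trials', start_time=(float, None), __base__=TableBase)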


@@ -1,5 +1,11 @@
-from typing import Annotated
+from typing import Any
+from pydantic_core import CoreSchema, core_schema
+from pydantic import GetCoreSchemaHandler

 class HDF5_Path(str):
     """Trivial subclass of string to indicate that it is a reference to a location within an HDF5 file"""
-    pass
+    @classmethod
+    def __get_pydantic_core_schema__(
+        cls, source_type: Any, handler: GetCoreSchemaHandler
+    ) -> CoreSchema:
+        return core_schema.no_info_after_validator_function(cls, handler(str))
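
This hook is the "correct schema" of the commit title: pydantic v2 will not generate a validation schema for a bare ``str`` subclass on its own, so ``__get_pydantic_core_schema__`` tells it to validate values as plain strings and then re-wrap them in ``HDF5_Path``. A quick check, with the class above in scope (the model and path are made up):

    from pydantic import BaseModel

    class Ref(BaseModel):
        target: HDF5_Path

    ref = Ref(target='/acquisition/test_timeseries')   # a plain str validates...
    assert isinstance(ref.target, HDF5_Path)           # ...and is cast to HDF5_Path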