Refactor dataset adapter to use check/apply style maps!

General cleanup around the maps module, getting ready to remove the translate module
This commit is contained in:
sneakers-the-rat 2023-10-09 23:06:24 -07:00
parent adaf939497
commit 42e64dce75
11 changed files with 414 additions and 377 deletions
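In broad strokes, the refactor replaces the chain of handle_* methods on DatasetAdapter with a set of self-registering DatasetMap subclasses, each declaring check() (does this special case apply to a dataset?) and apply() (transform the BuildResult). A rough sketch of the pattern follows, using simplified stand-ins for the real Dataset and BuildResult classes from nwb_schema_language and nwb_linkml:

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Dataset:                       # stand-in for nwb_schema_language.Dataset
    name: Optional[str] = None
    dims: Optional[list] = None

@dataclass
class BuildResult:                   # stand-in for nwb_linkml's BuildResult
    slots: List[str] = field(default_factory=list)

class DatasetMap(ABC):
    """One special case: check() decides whether it applies, apply() transforms the result."""
    @classmethod
    @abstractmethod
    def check(cls, dataset: Dataset) -> bool: ...

    @classmethod
    @abstractmethod
    def apply(cls, res: BuildResult, dataset: Dataset) -> BuildResult: ...

class MapScalar(DatasetMap):
    """A named dataset with no dims becomes a plain scalar slot."""
    @classmethod
    def check(cls, dataset: Dataset) -> bool:
        return dataset.name is not None and not dataset.dims

    @classmethod
    def apply(cls, res: BuildResult, dataset: Dataset) -> BuildResult:
        return BuildResult(slots=[dataset.name])

def build(dataset: Dataset) -> BuildResult:
    res = BuildResult()
    # subclassing DatasetMap is all it takes to register a new special case
    matches = [m for m in DatasetMap.__subclasses__() if m.check(dataset)]
    if len(matches) > 1:
        raise RuntimeError(f"Only one map should apply, got {matches}")
    for m in matches:
        res = m.apply(res, dataset)
    return res

print(build(Dataset(name="file_create_date")).slots)   # ['file_create_date']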

View file

@ -152,7 +152,8 @@ class ClassAdapter(Adapter):
return name
- def handle_dtype(self, dtype: DTypeType | None) -> str:
+ @classmethod
+ def handle_dtype(cls, dtype: DTypeType | None) -> str:
if isinstance(dtype, ReferenceDtype):
return dtype.target_type
elif dtype is None or dtype == []:

View file

@ -4,6 +4,7 @@ Adapter for NWB datasets to linkml Classes
import pdb
from typing import Optional, List
import warnings
+ from abc import abstractmethod
from linkml_runtime.linkml_model import ClassDefinition, SlotDefinition
from pydantic import PrivateAttr
@ -11,321 +12,355 @@ from pydantic import PrivateAttr
from nwb_schema_language import Dataset, ReferenceDtype, CompoundDtype, DTypeType
from nwb_linkml.adapters.classes import ClassAdapter
from nwb_linkml.maps.naming import camel_to_snake
+ from nwb_linkml.maps.dtype import flat_to_linkml
from nwb_linkml.adapters.adapter import BuildResult
- from nwb_linkml.maps import QUANTITY_MAP
+ from nwb_linkml.maps import QUANTITY_MAP, Map
class DatasetMap(Map):
@classmethod
@abstractmethod
def check(c, cls:Dataset) -> bool:
pass
@classmethod
@abstractmethod
def apply(c, res: BuildResult, cls:Dataset, name:Optional[str] = None) -> BuildResult:
pass
class MapScalar(DatasetMap):
"""
Datasets that are just a single value should be a scalar value, not an array with size 1.
Replace the built class with a single scalar slot.
"""
@classmethod
def check(c, cls:Dataset) -> bool:
if cls.neurodata_type_inc != 'VectorData' and \
not cls.neurodata_type_inc and \
not cls.attributes and \
not cls.dims and \
not cls.shape and \
cls.name:
return True
else:
return False
@classmethod
def apply(c, res: BuildResult, cls:Dataset, name:Optional[str] = None) -> BuildResult:
this_slot = SlotDefinition(
name=cls.name,
description=cls.doc,
range=ClassAdapter.handle_dtype(cls.dtype),
**QUANTITY_MAP[cls.quantity]
)
res = BuildResult(slots=[this_slot])
return res
class MapScalarAttributes(DatasetMap):
"""
A scalar with attributes gets an additional slot "value" that contains the actual scalar
value of this field
"""
@classmethod
def check(c, cls:Dataset) -> bool:
if cls.neurodata_type_inc != 'VectorData' and \
not cls.neurodata_type_inc and \
cls.attributes and \
not cls.dims and \
not cls.shape and \
cls.name:
return True
else:
return False
@classmethod
def apply(c, res: BuildResult, cls:Dataset, name:Optional[str] = None) -> BuildResult:
value_slot = SlotDefinition(
name='value',
range=ClassAdapter.handle_dtype(cls.dtype),
required=True
)
res.classes[0].attributes['value'] = value_slot
return res
class MapListlike(DatasetMap):
"""
Datasets that refer to other datasets (that handle their own arrays)
"""
@classmethod
def check(c, cls:Dataset) -> bool:
dtype = ClassAdapter.handle_dtype(cls.dtype)
if is_1d(cls) and dtype != 'AnyType' and dtype not in flat_to_linkml.keys():
return True
else:
return False
@classmethod
def apply(c, res: BuildResult, cls:Dataset, name:Optional[str] = None) -> BuildResult:
dtype = camel_to_snake(ClassAdapter.handle_dtype(cls.dtype))
slot = SlotDefinition(
name=dtype,
multivalued=True,
range=ClassAdapter.handle_dtype(cls.dtype),
description=cls.doc,
required=False if cls.quantity in ('*', '?') else True
)
res.classes[0].attributes[dtype] = slot
return res
class MapArraylike(DatasetMap):
"""
Datasets without any additional attributes don't create their own subclass,
they're just an array :).
Replace the base class with the array class, and make a slot that refers to it.
"""
@classmethod
def check(c, cls:Dataset) -> bool:
if cls.name and all([cls.dims, cls.shape]) and not has_attrs(cls):
return True
else:
return False
@classmethod
def apply(c, res: BuildResult, cls:Dataset, name:Optional[str] = None) -> BuildResult:
array_class = make_arraylike(cls, name)
name = camel_to_snake(cls.name)
res = BuildResult(
slots=[
SlotDefinition(
name=name,
multivalued=False,
range=array_class.name,
description=cls.doc,
required=False if cls.quantity in ('*', '?') else True
)
],
classes=[array_class]
)
return res
class MapArrayLikeAttributes(DatasetMap):
"""
The most general case - treat everything that isn't handled by one of the special cases
as an array!
Specifically, we make an ``Arraylike`` class such that:
- Each slot within a subclass indicates a possible dimension.
- Only dimensions that are present in all the dimension specifiers in the
original schema are required.
- Shape requirements are indicated using max/min cardinalities on the slot.
- The arraylike object should be stored in the `array` slot on the containing class
(since there are already properties named `data`)
"""
NEEDS_NAME = True
@classmethod
def check(c, cls:Dataset) -> bool:
dtype = ClassAdapter.handle_dtype(cls.dtype)
if all([cls.dims, cls.shape]) and \
cls.neurodata_type_inc != 'VectorData' and \
has_attrs(cls) and \
(dtype == 'AnyType' or dtype in flat_to_linkml):
return True
else:
return False
@classmethod
def apply(c, res: BuildResult, cls:Dataset, name:Optional[str] = None) -> BuildResult:
array_class = make_arraylike(cls, name)
# make a slot for the arraylike class
array_slot = SlotDefinition(
name='array',
range=array_class.name
)
res.classes.append(array_class)
res.classes[0].attributes.update({'array': array_slot})
return res
def make_arraylike(cls:Dataset, name:Optional[str] = None) -> ClassDefinition:
# The schema language doesn't have a way of specifying a dataset/group is "abstract"
# and yet hdmf-common says you don't need a dtype if the dataset is "abstract"
# so....
dtype = ClassAdapter.handle_dtype(cls.dtype)
# dims and shape are lists of lists. First we couple them
# (so each dim has its corresponding shape)..
# and then we take unique
# (dicts are ordered by default in recent pythons,
# while set() doesn't preserve order)
dims_shape = []
for inner_dim, inner_shape in zip(cls.dims, cls.shape):
if isinstance(inner_dim, list):
# list of lists
dims_shape.extend([(dim, shape) for dim, shape in zip(inner_dim, inner_shape)])
elif isinstance(inner_shape, list):
# Some badly formatted schema will have the shape be a LoL but the dims won't be...
dims_shape.extend([(inner_dim, shape) for shape in inner_shape])
else:
# single-layer list
dims_shape.append((inner_dim, inner_shape))
dims_shape = tuple(dict.fromkeys(dims_shape).keys())
# --------------------------------------------------
# SPECIAL CASE - allen institute's ndx-aibs-ecephys.extension
# confuses "dims" with "shape" , eg shape = [None], dims = [3].
# So we hardcode that here...
# --------------------------------------------------
if len(dims_shape) == 1 and isinstance(dims_shape[0][0], int) and dims_shape[0][1] is None:
dims_shape = (('dim', dims_shape[0][0]),)
# now make slots for each of them
slots = []
for dims, shape in dims_shape:
# if there is just a single list of possible dimensions, it's required
if not any([isinstance(inner_dim, list) for inner_dim in cls.dims]):
required = True
# if a dim is present in all possible combinations of dims, make it required
elif all([dims in inner_dim for inner_dim in cls.dims]):
required = True
else:
required = False
# use cardinality to do shape
if shape == 'null':
cardinality = None
else:
cardinality = shape
slots.append(SlotDefinition(
name=dims,
required=required,
maximum_cardinality=cardinality,
minimum_cardinality=cardinality,
range=dtype
))
# and then the class is just a subclass of `Arraylike` (which is imported by default from `nwb.language.yaml`)
if name:
pass
elif cls.neurodata_type_def:
name = cls.neurodata_type_def
elif cls.name:
name = cls.name
else:
raise ValueError(f"Dataset has no name or type definition, what do call it?")
name = '__'.join([name, 'Array'])
array_class = ClassDefinition(
name=name,
is_a="Arraylike",
attributes=slots
)
return array_class
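# Worked example (hypothetical dims/shape values, not taken from the NWB schema):
# a dataset named "data" with
#     dims:  [["num_times"], ["num_times", "num_channels"]]
#     shape: [[null],        [null,        null]]
# would build a class "data__Array" (is_a: Arraylike) with a required "num_times" slot
# (it appears in every dims variant) and an optional "num_channels" slot, neither with
# cardinality bounds since every shape entry is null.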
def is_1d(cls:Dataset) -> bool:
if (
not any([isinstance(dim, list) for dim in cls.dims]) and
len(cls.dims) == 1
) or ( # nested list
all([isinstance(dim, list) for dim in cls.dims]) and
len(cls.dims) == 1 and
len(cls.dims[0]) == 1
):
return True
else:
return False
def has_attrs(cls:Dataset) -> bool:
if len(cls.attributes) > 0 and \
all([not a.value for a in cls.attributes]):
return True
else:
return False
# --------------------------------------------------
# DynamicTable special cases
# --------------------------------------------------
class Map1DVector(DatasetMap):
"""
``VectorData`` is subclassed with a name but without dims or attributes, treat this as a normal 1D array
slot that replaces any class that would be built for this
"""
@classmethod
def check(c, cls:Dataset) -> bool:
if cls.neurodata_type_inc == 'VectorData' and \
not cls.dims and \
not cls.shape and \
not cls.attributes \
and cls.name:
return True
else:
return False
@classmethod
def apply(c, res: BuildResult, cls:Dataset, name:Optional[str] = None) -> BuildResult:
this_slot = SlotDefinition(
name=cls.name,
description=cls.doc,
range=ClassAdapter.handle_dtype(cls.dtype),
multivalued=True
)
# No need to make a class for us, so we replace the existing build results
res = BuildResult(slots=[this_slot])
return res
class MapNVectors(DatasetMap):
"""
An unnamed container that indicates an arbitrary quantity of some other neurodata type.
Most commonly: ``VectorData`` is subclassed without a name and with a '*' quantity to indicate
arbitrary columns.
"""
@classmethod
def check(c, cls:Dataset) -> bool:
if cls.name is None and \
cls.neurodata_type_def is None and \
cls.neurodata_type_inc and \
cls.quantity in ('*', '+'):
#cls.neurodata_type_inc in ('VectorIndex', 'VectorData') and \
return True
else:
return False
@classmethod
def apply(c, res: BuildResult, cls:Dataset, name:Optional[str] = None) -> BuildResult:
this_slot = SlotDefinition(
name=camel_to_snake(cls.neurodata_type_inc),
description=cls.doc,
range=cls.neurodata_type_inc,
**QUANTITY_MAP[cls.quantity]
)
# No need to make a class for us, so we replace the existing build results
res = BuildResult(slots=[this_slot])
return res
class DatasetAdapter(ClassAdapter):
cls: Dataset
- _handlers: List[str] = PrivateAttr(default_factory=list)
- """Keep track of which handlers have been called"""
def build(self) -> BuildResult:
res = self.build_base()
- res = self.drop_dynamic_table(res)
- res = self.handle_arraylike(res, self.cls, self._get_full_name())
- res = self.handle_1d_vector(res)
- res = self.handle_listlike(res)
- res = self.handle_scalar(res)
- if len(self._handlers) > 1:
- raise RuntimeError(f"Only one handler should have been triggered, instead triggered {self._handlers}")
+ # find a map to use
+ matches = [m for m in DatasetMap.__subclasses__() if m.check(self.cls)]
+ if len(matches) > 1:
+ raise RuntimeError(f"Only one map should apply to a dataset, you need to refactor the maps! Got maps: {matches}")
+ # apply matching maps
+ for m in matches:
+ res = m.apply(res, self.cls, self._get_full_name())
return res
def handle_scalar(self, res:BuildResult) -> BuildResult:
# Simplify datasets that are just a single value
if self.cls.neurodata_type_inc != 'VectorData' and \
not self.cls.neurodata_type_inc and \
not self.cls.attributes and \
not self.cls.dims and \
not self.cls.shape and \
self.cls.name:
self._handlers.append('scalar')
# throw out the class that would have been made for us
# we just need a slot
this_slot = SlotDefinition(
name=self.cls.name,
description=self.cls.doc,
range=self.handle_dtype(self.cls.dtype),
**QUANTITY_MAP[self.cls.quantity]
)
res = BuildResult(slots = [this_slot])
# if the scalar-valued class has attributes, append a
# 'value' slot that holds the (scalar) value of the dataset
elif self.cls.neurodata_type_inc != 'VectorData' and \
not self.cls.neurodata_type_inc and \
self.cls.attributes and \
not self.cls.dims and \
not self.cls.shape and \
self.cls.name:
self._handlers.append('scalar_class')
# quantity (including requirement) is handled by the
# parent slot - the value is required if the value class is
# supplied.
# ie.
# Optional[ScalarClass] = None
# class ScalarClass:
# value: dtype
value_slot = SlotDefinition(
name='value',
range=self.handle_dtype(self.cls.dtype),
required=True
)
res.classes[0].attributes['value'] = value_slot
return res
def handle_1d_vector(self, res: BuildResult) -> BuildResult:
# handle the special case where `VectorData` is subclassed without any dims or attributes
# which just gets instantiated as a 1-d array in HDF5
if self.cls.neurodata_type_inc == 'VectorData' and \
not self.cls.dims and \
not self.cls.shape and \
not self.cls.attributes \
and self.cls.name:
self._handlers.append('1d_vector')
this_slot = SlotDefinition(
name=self.cls.name,
description=self.cls.doc,
range=self.handle_dtype(self.cls.dtype),
multivalued=True
)
# No need to make a class for us, so we replace the existing build results
res = BuildResult(slots=[this_slot])
return res
def handle_listlike(self, res:BuildResult) -> BuildResult:
"""
Handle cases where the dataset is just a list of a specific type.
Examples:
datasets:
- name: file_create_date
dtype: isodatetime
dims:
- num_modifications
shape:
- null
"""
if self.cls.name and len(self.cls.attributes) == 0 and ((
# single-layer list
not any([isinstance(dim, list) for dim in self.cls.dims]) and
len(self.cls.dims) == 1
) or (
# nested list
all([isinstance(dim, list) for dim in self.cls.dims]) and
len(self.cls.dims) == 1 and
len(self.cls.dims[0]) == 1
)):
res = BuildResult(
slots = [
SlotDefinition(
name = self.cls.name,
multivalued=True,
range=self.handle_dtype(self.cls.dtype),
description=self.cls.doc,
required=False if self.cls.quantity in ('*', '?') else True
)
]
)
return res
else:
return res
def handle_arraylike(self, res: BuildResult, dataset: Dataset, name: Optional[str] = None) -> BuildResult:
"""
Handling the
- dims
- shape
- dtype
fields as they are used in datasets. We'll use the :class:`.Arraylike` class to imitate them.
Specifically:
- Each slot within a subclass indicates a possible dimension.
- Only dimensions that are present in all the dimension specifiers in the
original schema are required.
- Shape requirements are indicated using max/min cardinalities on the slot.
- The arraylike object should be stored in the `array` slot on the containing class
(since there are already properties named `data`)
If any of `dims`, `shape`, or `dtype` are undefined, return `None`
Args:
dataset (:class:`nwb_schema_language.Dataset`): The dataset defining the arraylike
name (str): If present, override the name of the class before appending _Array
(we don't use _get_full_name here because we want to eventually decouple these functions from this adapter
class, which is sort of a development crutch. Ideally all these methods would just work on base nwb schema language types)
"""
if not any((dataset.dims, dataset.shape)):
# none of the required properties are defined, that's fine.
return res
elif not all((dataset.dims, dataset.shape)):
# need to have both if one is present!
warnings.warn(f"A dataset needs both dims and shape to define an arraylike object. This is allowed for compatibility with some badly formatted NWB files, but should in general be avoided. Treating like we dont have an array")
return res
# Special cases
if dataset.neurodata_type_inc == 'VectorData':
# Handle this in `handle_vectorlike` instead
return res
# The schema language doesn't have a way of specifying a dataset/group is "abstract"
# and yet hdmf-common says you don't need a dtype if the dataset is "abstract"
# so....
dtype = self.handle_dtype(dataset.dtype)
# dims and shape are lists of lists. First we couple them
# (so each dim has its corresponding shape)..
# and then we take unique
# (dicts are ordered by default in recent pythons,
# while set() doesn't preserve order)
dims_shape = []
for inner_dim, inner_shape in zip(dataset.dims, dataset.shape):
if isinstance(inner_dim, list):
# list of lists
dims_shape.extend([(dim, shape) for dim, shape in zip(inner_dim, inner_shape)])
elif isinstance(inner_shape, list):
# Some badly formatted schema will have the shape be a LoL but the dims won't be...
dims_shape.extend([(inner_dim, shape) for shape in inner_shape])
else:
# single-layer list
dims_shape.append((inner_dim, inner_shape))
dims_shape = tuple(dict.fromkeys(dims_shape).keys())
# if we only have one possible dimension, it's equivalent to a list, so we just return the slot
# if len(dims_shape) == 1 and self.parent:
# quantity = QUANTITY_MAP[dataset.quantity]
# slot = SlotDefinition(
# name=dataset.name,
# range=dtype,
# description=dataset.doc,
# required=quantity['required'],
# multivalued=True
# )
# res.classes[0].attributes.update({dataset.name: slot})
# self._handlers.append('arraylike-1d')
# return res
# --------------------------------------------------
# SPECIAL CASE - allen institute's ndx-aibs-ecephys.extension
# confuses "dims" with "shape" , eg shape = [None], dims = [3].
# So we hardcode that here...
# --------------------------------------------------
if len(dims_shape) == 1 and isinstance(dims_shape[0][0], int) and dims_shape[0][1] is None:
dims_shape = (('dim', dims_shape[0][0]),)
# now make slots for each of them
slots = []
for dims, shape in dims_shape:
# if there is just a single list of possible dimensions, it's required
if not any([isinstance(inner_dim, list) for inner_dim in dataset.dims]):
required = True
# if a dim is present in all possible combinations of dims, make it required
elif all([dims in inner_dim for inner_dim in dataset.dims]):
required = True
else:
required = False
# use cardinality to do shape
if shape == 'null':
cardinality = None
else:
cardinality = shape
slots.append(SlotDefinition(
name=dims,
required=required,
maximum_cardinality=cardinality,
minimum_cardinality=cardinality,
range=dtype
))
# and then the class is just a subclass of `Arraylist` (which is imported by default from `nwb.language.yaml`)
if name:
pass
elif dataset.neurodata_type_def:
name = dataset.neurodata_type_def
elif dataset.name:
name = dataset.name
else:
raise ValueError(f"Dataset has no name or type definition, what do call it?")
name = '__'.join([name, 'Array'])
array_class = ClassDefinition(
name=name,
is_a="Arraylike",
attributes=slots
)
# make a slot for the arraylike class
array_slot = SlotDefinition(
name='array',
range=array_class.name
)
res.classes.append(array_class)
res.classes[0].attributes.update({'array': array_slot})
#res.slots.append(array_slot)
self._handlers.append('arraylike')
return res
def drop_dynamic_table(self, res:BuildResult) -> BuildResult:
"""
DynamicTables in hdmf are so special-cased that we have to just special-case them ourselves.
Typically they include an unnamed VectorData object with a '*' quantity to contain arbitrary columns;
this would normally get converted to its own container class, but since they're unnamed they conflict with
names in the containing scope.
We just convert them into multivalued slots and don't use them
"""
if self.cls.name is None and \
self.cls.neurodata_type_def is None and \
self.cls.neurodata_type_inc in ('VectorIndex', 'VectorData') and \
self.cls.quantity == '*':
self._handlers.append('dynamic_table')
this_slot = SlotDefinition(
name=camel_to_snake(self.cls.neurodata_type_inc),
description=self.cls.doc,
range=self.cls.neurodata_type_inc,
required=False,
multivalued=True
)
# No need to make a class for us, so we replace the existing build results
res = BuildResult(slots=[this_slot])
return res
elif self.cls.name is None and \
self.cls.neurodata_type_def is None and \
self.cls.neurodata_type_inc and \
self.cls.quantity in ('*', '+'):
self._handlers.append('generic_container')
this_slot = SlotDefinition(
name=camel_to_snake(self.cls.neurodata_type_inc),
description=self.cls.doc,
range=self.cls.neurodata_type_inc,
**QUANTITY_MAP[self.cls.quantity]
)
# No need to make a class for us, so we replace the existing build results
res = BuildResult(slots=[this_slot])
return res
else:
return res

View file

@ -35,7 +35,6 @@ from tqdm import tqdm
import numpy as np
from nwb_linkml.maps.hdf5 import H5SourceItem, flatten_hdf, ReadPhases, ReadQueue
- from nwb_linkml.translate import generate_from_nwbfile
#from nwb_linkml.models.core_nwb_file import NWBFile
if TYPE_CHECKING:
from nwb_linkml.models import NWBFile
@ -50,12 +49,6 @@ class HDF5IO():
self.path = Path(path)
self._modules: Dict[str, ModuleType] = {}
- @property
- def modules(self) -> Dict[str, ModuleType]:
- if len(self._modules) == 0:
- self._modules = generate_from_nwbfile(self.path)
- return self._modules
@overload
def read(self, path:None) -> 'NWBFile': ...

View file

@ -10,7 +10,7 @@ import yaml
from nwb_schema_language import Namespaces, Group, Dataset
from nwb_linkml.providers.git import NamespaceRepo, NWB_CORE_REPO, HDMF_COMMON_REPO
- from nwb_linkml.map import PHASES, Map
+ from nwb_linkml.maps.postload import PHASES, KeyMap, apply_postload
from nwb_linkml.adapters.namespaces import NamespacesAdapter
from nwb_linkml.adapters.schema import SchemaAdapter
@ -18,11 +18,7 @@ from nwb_linkml.adapters.schema import SchemaAdapter
def load_yaml(path:Path) -> dict:
with open(path, 'r') as file:
ns_dict = yaml.safe_load(file)
- # apply maps
- maps = [m for m in Map.instances if m.phase == PHASES.postload]
- for amap in maps:
- ns_dict = amap.apply(ns_dict)
+ ns_dict = apply_postload(ns_dict)
return ns_dict
def _load_namespaces(path:Path|NamespaceRepo) -> Namespaces:
@ -38,10 +34,7 @@ def _load_namespaces(path:Path|NamespaceRepo) -> Namespaces:
def load_schema_file(path:Path, yaml:Optional[dict] = None) -> SchemaAdapter:
if yaml is not None:
source = yaml
- # apply maps
- maps = [m for m in Map.instances if m.phase == PHASES.postload]
- for amap in maps:
- source = amap.apply(source)
+ source = apply_postload(source)
else:
source = load_yaml(path)

View file

@ -1,4 +1,5 @@
# Import everything so it's defined, but shouldn't necessarily be used from here
- from nwb_linkml.maps.preload import MAP_HDMF_DATATYPE_DEF, MAP_HDMF_DATATYPE_INC
+ from nwb_linkml.maps.map import Map
+ from nwb_linkml.maps.postload import MAP_HDMF_DATATYPE_DEF, MAP_HDMF_DATATYPE_INC
from nwb_linkml.maps.quantity import QUANTITY_MAP
from nwb_linkml.maps.dtype import flat_to_linkml, flat_to_npytyping

View file

@ -6,7 +6,7 @@ so we will make our own mapping class here and re-evaluate whether they should b
""" """
import datetime import datetime
import pdb import pdb
from abc import ABC, abstractmethod from abc import abstractmethod
from pathlib import Path from pathlib import Path
from typing import Literal, List, Dict, Optional, Type, Union, Tuple from typing import Literal, List, Dict, Optional, Type, Union, Tuple
@ -16,6 +16,7 @@ from enum import StrEnum
from pydantic import BaseModel, Field, ConfigDict from pydantic import BaseModel, Field, ConfigDict
from nwb_linkml.providers.schema import SchemaProvider from nwb_linkml.providers.schema import SchemaProvider
from nwb_linkml.maps import Map
from nwb_linkml.maps.hdmf import dynamictable_to_model from nwb_linkml.maps.hdmf import dynamictable_to_model
from nwb_linkml.types.hdf5 import HDF5_Path from nwb_linkml.types.hdf5 import HDF5_Path
from nwb_linkml.types.ndarray import NDArrayProxy from nwb_linkml.types.ndarray import NDArrayProxy
@ -115,7 +116,7 @@ class H5ReadResult(BaseModel):
FlatH5 = Dict[str, H5SourceItem] FlatH5 = Dict[str, H5SourceItem]
class HDF5Map(ABC): class HDF5Map(Map):
phase: ReadPhases phase: ReadPhases
priority: int = 0 priority: int = 0
""" """

View file

@ -0,0 +1,21 @@
from typing import Any
from abc import ABC, abstractmethod
class Map(ABC):
"""
The generic top-level mapping class is just a classmethod for checking whether the map applies and a
method for applying the map if it does
"""
@classmethod
@abstractmethod
def check(cls, *args, **kwargs) -> bool:
"""Check if this map applies to the given item to read"""
@classmethod
@abstractmethod
def apply(cls, *args, **kwargs) -> Any:
"""Actually apply the map!"""

View file

@ -1,24 +1,26 @@
"""
Maps to change the loaded .yaml from nwb schema before it's given to the nwb_schema_language models
"""
from dataclasses import dataclass
- from typing import ClassVar, List, Optional
from enum import StrEnum
- import ast
+ from typing import Optional, ClassVar, List
import re
import ast
from nwb_linkml.maps import Map
class MAP_TYPES(StrEnum):
key = 'key'
"""Mapping the name of one key to another key"""
class SCOPE_TYPES(StrEnum):
namespace = 'namespace'
class PHASES(StrEnum):
postload = "postload"
"""After the YAML for a model has been loaded"""
@dataclass
- class Map:
+ class KeyMap():
scope: str
"""The namespace that the map is relevant to"""
scope_type: SCOPE_TYPES
@ -36,24 +38,11 @@ class Map:
phase: Optional[PHASES] = None
- instances: ClassVar[List['Map']] = []
+ instances: ClassVar[List['KeyMap']] = []
"""
Maps that get defined!!!
"""
def apply(self):
raise NotImplementedError('do this in a subclass')
def __post_init__(self):
self.instances.append(self)
# def replace_keys(input: dict, source: str, target: str) -> dict:
# """Recursively change keys in a dictionary"""
class KeyMap(Map):
def apply(self, input: dict) -> dict:
"""
Change all keys from source to target in a super naive way.
@ -65,9 +54,34 @@ class KeyMap(Map):
out = ast.literal_eval(input_str)
return out
def __post_init__(self):
self.instances.append(self)
- def apply_preload(ns_dict) -> dict:
- maps = [m for m in Map.instances if m.phase == PHASES.postload]
MAP_HDMF_DATATYPE_DEF = KeyMap(
source="\'data_type_def\'",
target="\'neurodata_type_def\'",
scope='hdmf-common',
scope_type=SCOPE_TYPES.namespace,
phase=PHASES.postload
)
MAP_HDMF_DATATYPE_INC = KeyMap(
source="\'data_type_inc\'",
target="\'neurodata_type_inc\'",
scope='hdmf-common',
scope_type=SCOPE_TYPES.namespace,
phase=PHASES.postload
)
class MAP_TYPES(StrEnum):
key = 'key'
"""Mapping the name of one key to another key"""
def apply_postload(ns_dict) -> dict:
maps = [m for m in KeyMap.instances if m.phase == PHASES.postload]
for amap in maps:
ns_dict = amap.apply(ns_dict)
return ns_dict

View file

@ -1,22 +0,0 @@
"""
Maps to change the loaded .yaml from nwb schema before it's given to the nwb_schema_language models
"""
from nwb_linkml.map import KeyMap, SCOPE_TYPES, PHASES
MAP_HDMF_DATATYPE_DEF = KeyMap(
source="\'data_type_def\'",
target="\'neurodata_type_def\'",
scope='hdmf-common',
scope_type=SCOPE_TYPES.namespace,
phase=PHASES.postload
)
MAP_HDMF_DATATYPE_INC = KeyMap(
source="\'data_type_inc\'",
target="\'neurodata_type_inc\'",
scope='hdmf-common',
scope_type=SCOPE_TYPES.namespace,
phase=PHASES.postload
)

View file

@ -16,7 +16,7 @@ from linkml_runtime.dumpers import yaml_dumper
from nwb_schema_language import Namespaces
from nwb_linkml.io.schema import load_schema_file
from nwb_linkml.generators.pydantic import NWBPydanticGenerator
- from nwb_linkml.map import apply_preload
+ from nwb_linkml.maps.postload import apply_preload
from nwb_linkml.adapters import SchemaAdapter, NamespacesAdapter
#from nwb_linkml.models import core, hdmf_common

View file

@ -2,11 +2,11 @@ import pdb
import pytest
from pathlib import Path
- from nwb_linkml.translate import generate_from_nwbfile
- def test_generate_pydantic():
- # pass until we rig up smaller test data
- pass
+ #from nwb_linkml.translate import generate_from_nwbfile
+ # def test_generate_pydantic():
+ # # pass until we rig up smaller test data
+ # pass
#NWBFILE = Path('/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773.nwb')