continuing manual ruff fixes

sneakers-the-rat 2024-07-01 22:58:55 -07:00
parent 3768e3ce0d
commit 084bceaa2e
Signed by untrusted user who does not match committer: jonny
GPG key ID: 6DCB96EF1E4D232D
15 changed files with 272 additions and 124 deletions

View file

@@ -135,12 +135,15 @@ class Adapter(BaseModel):
             # do nothing, is a string or whatever
             pass

-    def walk_fields(self, input: BaseModel | list | dict, field: str | Tuple[str, ...]):
+    def walk_fields(
+        self, input: BaseModel | list | dict, field: str | Tuple[str, ...]
+    ) -> Generator[Any, None, None]:
         """
         Recursively walk input for fields that match ``field``

         Args:
-            input (:class:`pydantic.BaseModel`) : Model to walk (or a list or dictionary to walk too)
+            input (:class:`pydantic.BaseModel`) : Model to walk (or a list or dictionary
+                to walk too)
             field (str, Tuple[str, ...]):

         Returns:
@@ -156,19 +159,20 @@ class Adapter(BaseModel):
         self, input: BaseModel | list | dict, field: str, value: Optional[Any] = None
     ) -> Generator[BaseModel, None, None]:
         """
-        Recursively walk input for **models** that contain a ``field`` as a direct child with a value matching ``value``
+        Recursively walk input for **models** that contain a ``field`` as a direct child
+        with a value matching ``value``

         Args:
             input (:class:`pydantic.BaseModel`): Model to walk
             field (str): Name of field - unlike :meth:`.walk_fields`, only one field can be given
-            value (Any): Value to match for given field. If ``None`` , return models that have the field
+            value (Any): Value to match for given field. If ``None`` ,
+                return models that have the field

         Returns:
             :class:`pydantic.BaseModel` the matching model
         """
         for item in self.walk(input):
-            if isinstance(item, BaseModel):
-                if field in item.model_fields:
-                    if value is None:
-                        yield item
-                    field_value = item.model_dump().get(field, None)
+            if isinstance(item, BaseModel) and field in item.model_fields:
+                if value is None:
+                    yield item
+                field_value = item.model_dump().get(field, None)
@@ -178,6 +182,13 @@ class Adapter(BaseModel):
     def walk_types(
         self, input: BaseModel | list | dict, get_type: Type[T] | Tuple[Type[T], Type[Unpack[Ts]]]
     ) -> Generator[T | Ts, None, None]:
+        """
+        Walk a model, yielding items that are the same type as the given type
+
+        Args:
+            input (:class:`pydantic.BaseModel`, list, dict): Object to yield from
+            get_type (:class:`~typing.Type`, tuple[:class:`~typing.Type`]): Type to match
+        """
         if not isinstance(get_type, (list, tuple)):
             get_type = [get_type]
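The walking API touched above is easiest to see with a toy model tree. The sketch below is a standalone re-implementation of the same recursive pattern, for illustration only; the Inner/Outer models and the local walk_fields helper are invented here and are not part of nwb_linkml:

    from typing import Any, Generator, Tuple, Union

    from pydantic import BaseModel


    class Inner(BaseModel):
        doc: str = "inner docstring"


    class Outer(BaseModel):
        name: str = "outer"
        child: Inner = Inner()


    def walk_fields(
        obj: Union[BaseModel, list, dict], field: Union[str, Tuple[str, ...]]
    ) -> Generator[Any, None, None]:
        """Yield values of any field named ``field``, recursing into nested models."""
        fields = (field,) if isinstance(field, str) else field
        if isinstance(obj, BaseModel):
            for name in obj.model_fields:
                value = getattr(obj, name)
                if name in fields:
                    yield value
                yield from walk_fields(value, field)
        elif isinstance(obj, (list, dict)):
            items = obj.values() if isinstance(obj, dict) else obj
            for item in items:
                yield from walk_fields(item, field)


    print(list(walk_fields(Outer(), "doc")))  # ['inner docstring']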

View file

@@ -40,15 +40,19 @@ class ClassAdapter(Adapter):

         If the class has no parent, then...

-        * Its name is inferred from its `neurodata_type_def`, fixed name, or `neurodata_type_inc` in that order
+        * Its name is inferred from its `neurodata_type_def`, fixed name, or
+          `neurodata_type_inc` in that order
         * It is just built as normal class!
-        * It will be indicated as a ``tree_root`` (which will primarily be used to invert the translation for write operations)
+        * It will be indicated as a ``tree_root`` (which will primarily be used to invert the
+          translation for write operations)

         If the class has a parent, then...

-        * If it has a `neurodata_type_def` or `inc`, that will be used as its name, otherwise concatenate `parent__child`,
+        * If it has a `neurodata_type_def` or `inc`, that will be used as its name,
+          otherwise concatenate `parent__child`,
           eg. ``TimeSeries__TimeSeriesData``
-        * A slot will also be made and returned with the BuildResult, which the parent will then have as one of its attributes.
+        * A slot will also be made and returned with the BuildResult,
+          which the parent will then have as one of its attributes.
         """

         # Build this class
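The naming rules in the docstring above amount to a small decision function. A rough standalone sketch of that rule (illustrative only; infer_class_name and its arguments are hypothetical, not the adapter's actual code):

    from typing import Optional


    def infer_class_name(
        type_def: Optional[str],
        fixed_name: Optional[str],
        type_inc: Optional[str],
        parent: Optional[str] = None,
    ) -> str:
        """Illustrative sketch of the naming rules described above, not the adapter's code."""
        if parent is None:
            # no parent: neurodata_type_def, fixed name, or neurodata_type_inc, in that order
            return type_def or fixed_name or type_inc
        if type_def or type_inc:
            # a child that carries its own type identity keeps that name
            return type_def or type_inc
        # otherwise the child is namespaced under its parent
        return f"{parent}__{fixed_name}"


    print(infer_class_name(None, "TimeSeriesData", None, parent="TimeSeries"))
    # -> TimeSeries__TimeSeriesData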
@@ -83,6 +87,15 @@ class ClassAdapter(Adapter):
         return res

     def build_attrs(self, cls: Dataset | Group) -> List[SlotDefinition]:
+        """
+        Pack the class attributes into a list of SlotDefinitions
+
+        Args:
+            cls: (:class:`.Dataset` | :class:`.Group`): Class to pack
+
+        Returns:
+            list[:class:`.SlotDefinition`]
+        """
         attrs = [
             SlotDefinition(
                 name=attr.name,
@@ -153,6 +166,15 @@ class ClassAdapter(Adapter):

     @classmethod
     def handle_dtype(cls, dtype: DTypeType | None) -> str:
+        """
+        Get the string form of a dtype
+
+        Args:
+            dtype (:class:`.DTypeType`): Dtype to stringify
+
+        Returns:
+            str
+        """
         if isinstance(dtype, ReferenceDtype):
             return dtype.target_type
         elif dtype is None or dtype == []:

View file

@@ -16,10 +16,16 @@ from nwb_schema_language import Dataset


 class DatasetMap(Map):
+    """
+    Abstract builder class for dataset elements
+    """
+
     @classmethod
     @abstractmethod
     def check(c, cls: Dataset) -> bool:
+        """
+        Check if this map applies
+        """
         pass  # pragma: no cover

     @classmethod
@@ -27,6 +33,9 @@ class DatasetMap(Map):
     def apply(
         c, cls: Dataset, res: Optional[BuildResult] = None, name: Optional[str] = None
     ) -> BuildResult:
+        """
+        Apply this mapping
+        """
         pass  # pragma: no cover
@@ -94,22 +103,22 @@ class MapScalar(DatasetMap):
             - ``str``

         """
-        if (
+        return (
             cls.neurodata_type_inc != "VectorData"
             and not cls.neurodata_type_inc
             and not cls.attributes
             and not cls.dims
             and not cls.shape
             and cls.name
-        ):
-            return True
-        else:
-            return False
+        )

     @classmethod
     def apply(
         c, cls: Dataset, res: Optional[BuildResult] = None, name: Optional[str] = None
     ) -> BuildResult:
+        """
+        Map to a scalar value
+        """
         this_slot = SlotDefinition(
             name=cls.name,
             description=cls.doc,
@@ -147,22 +156,22 @@ class MapScalarAttributes(DatasetMap):
             - ``str``

         """
-        if (
+        return (
             cls.neurodata_type_inc != "VectorData"
             and not cls.neurodata_type_inc
             and cls.attributes
             and not cls.dims
             and not cls.shape
             and cls.name
-        ):
-            return True
-        else:
-            return False
+        )

     @classmethod
     def apply(
         c, cls: Dataset, res: Optional[BuildResult] = None, name: Optional[str] = None
     ) -> BuildResult:
+        """
+        Map to a scalar attribute with an adjoining "value" slot
+        """
         value_slot = SlotDefinition(
             name="value", range=ClassAdapter.handle_dtype(cls.dtype), required=True
         )
@@ -177,23 +186,26 @@ class MapListlike(DatasetMap):

     @classmethod
     def check(c, cls: Dataset) -> bool:
+        """
+        Check if we are a 1D dataset that isn't a normal datatype
+        """
         dtype = ClassAdapter.handle_dtype(cls.dtype)
-        if is_1d(cls) and dtype != "AnyType" and dtype not in flat_to_linkml.keys():
-            return True
-        else:
-            return False
+        return is_1d(cls) and dtype != "AnyType" and dtype not in flat_to_linkml

     @classmethod
     def apply(
         c, cls: Dataset, res: Optional[BuildResult] = None, name: Optional[str] = None
     ) -> BuildResult:
+        """
+        Map to a list of the given class
+        """
         dtype = camel_to_snake(ClassAdapter.handle_dtype(cls.dtype))
         slot = SlotDefinition(
             name=dtype,
             multivalued=True,
             range=ClassAdapter.handle_dtype(cls.dtype),
             description=cls.doc,
-            required=False if cls.quantity in ("*", "?") else True,
+            required=cls.quantity not in ("*", "?"),
         )
         res.classes[0].attributes[dtype] = slot
         return res
@@ -209,15 +221,18 @@ class MapArraylike(DatasetMap):

     @classmethod
     def check(c, cls: Dataset) -> bool:
-        if cls.name and all([cls.dims, cls.shape]) and not has_attrs(cls):
-            return True
-        else:
-            return False
+        """
+        Check if we're a plain array
+        """
+        return cls.name and all([cls.dims, cls.shape]) and not has_attrs(cls)

     @classmethod
     def apply(
         c, cls: Dataset, res: Optional[BuildResult] = None, name: Optional[str] = None
     ) -> BuildResult:
+        """
+        Map to an array class and the adjoining slot
+        """
         array_class = make_arraylike(cls, name)
         name = camel_to_snake(cls.name)
         res = BuildResult(
@@ -227,7 +242,7 @@ class MapArraylike(DatasetMap):
                     multivalued=False,
                     range=array_class.name,
                     description=cls.doc,
-                    required=False if cls.quantity in ("*", "?") else True,
+                    required=cls.quantity not in ("*", "?"),
                 )
             ],
             classes=[array_class],
@@ -254,22 +269,24 @@ class MapArrayLikeAttributes(DatasetMap):

     @classmethod
     def check(c, cls: Dataset) -> bool:
+        """
+        Check that we're an array with some additional metadata
+        """
         dtype = ClassAdapter.handle_dtype(cls.dtype)
-        if (
+        return (
             all([cls.dims, cls.shape])
             and cls.neurodata_type_inc != "VectorData"
             and has_attrs(cls)
             and (dtype == "AnyType" or dtype in flat_to_linkml)
-        ):
-            return True
-        else:
-            return False
+        )

     @classmethod
     def apply(
         c, cls: Dataset, res: Optional[BuildResult] = None, name: Optional[str] = None
     ) -> BuildResult:
+        """
+        Map to an arraylike class
+        """
         array_class = make_arraylike(cls, name)
         # make a slot for the arraylike class
         array_slot = SlotDefinition(name="array", range=array_class.name)
@@ -286,27 +303,30 @@ class MapArrayLikeAttributes(DatasetMap):

 class Map1DVector(DatasetMap):
     """
-    ``VectorData`` is subclassed with a name but without dims or attributes, treat this as a normal 1D array
-    slot that replaces any class that would be built for this
+    ``VectorData`` is subclassed with a name but without dims or attributes,
+    treat this as a normal 1D array slot that replaces any class that would be built for this
     """

     @classmethod
     def check(c, cls: Dataset) -> bool:
-        if (
+        """
+        Check that we're a 1d VectorData class
+        """
+        return (
             cls.neurodata_type_inc == "VectorData"
             and not cls.dims
             and not cls.shape
             and not cls.attributes
             and cls.name
-        ):
-            return True
-        else:
-            return False
+        )

     @classmethod
     def apply(
         c, cls: Dataset, res: Optional[BuildResult] = None, name: Optional[str] = None
     ) -> BuildResult:
+        """
+        Return a simple multivalued slot
+        """
         this_slot = SlotDefinition(
             name=cls.name,
             description=cls.doc,
@@ -328,21 +348,23 @@ class MapNVectors(DatasetMap):

     @classmethod
     def check(c, cls: Dataset) -> bool:
-        if (
+        """
+        Check for being an unnamed multivalued vector class
+        """
+        return (
             cls.name is None
             and cls.neurodata_type_def is None
             and cls.neurodata_type_inc
             and cls.quantity in ("*", "+")
-        ):
-            # cls.neurodata_type_inc in ('VectorIndex', 'VectorData') and \
-            return True
-        else:
-            return False
+        )

     @classmethod
     def apply(
         c, cls: Dataset, res: Optional[BuildResult] = None, name: Optional[str] = None
     ) -> BuildResult:
+        """
+        Return a slot mapping to multiple values of the type
+        """
         this_slot = SlotDefinition(
             name=camel_to_snake(cls.neurodata_type_inc),
             description=cls.doc,
@@ -355,9 +377,15 @@ class MapNVectors(DatasetMap):


 class DatasetAdapter(ClassAdapter):
+    """
+    Orchestrator class for datasets - calls the set of applicable mapping classes
+    """
+
     cls: Dataset

     def build(self) -> BuildResult:
+        """
+        Build the base result, and then apply the applicable mappings.
+        """
         res = self.build_base()

         # find a map to use
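The check/apply split above is a "find the one matching strategy, then apply it" dispatch. A self-contained toy version of that pattern for illustration (FakeDataset, ToyMap, MapScalarToy, and MapArrayToy are invented names and do not exist in the codebase):

    from abc import ABC, abstractmethod
    from dataclasses import dataclass
    from typing import Optional, Tuple


    @dataclass
    class FakeDataset:
        name: str
        shape: Optional[Tuple[Optional[int], ...]] = None


    class ToyMap(ABC):
        """Toy stand-in for DatasetMap: a check() predicate plus an apply() transform."""

        @classmethod
        @abstractmethod
        def check(cls, dataset: FakeDataset) -> bool: ...

        @classmethod
        @abstractmethod
        def apply(cls, dataset: FakeDataset) -> str: ...


    class MapScalarToy(ToyMap):
        @classmethod
        def check(cls, dataset: FakeDataset) -> bool:
            return dataset.shape is None

        @classmethod
        def apply(cls, dataset: FakeDataset) -> str:
            return f"{dataset.name}: scalar slot"


    class MapArrayToy(ToyMap):
        @classmethod
        def check(cls, dataset: FakeDataset) -> bool:
            return dataset.shape is not None

        @classmethod
        def apply(cls, dataset: FakeDataset) -> str:
            return f"{dataset.name}: array class with shape {dataset.shape}"


    def build(dataset: FakeDataset) -> str:
        # pick the single map whose check() matches, then apply it
        matches = [m for m in ToyMap.__subclasses__() if m.check(dataset)]
        assert len(matches) == 1, f"expected exactly one matching map, got {matches}"
        return matches[0].apply(dataset)


    print(build(FakeDataset("timestamps", (None,))))
    # timestamps: array class with shape (None,)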
@@ -377,6 +405,11 @@ class DatasetAdapter(ClassAdapter):


 def make_arraylike(cls: Dataset, name: Optional[str] = None) -> ClassDefinition:
+    """
+    Create a containing arraylike class
+
+    This is likely deprecated so this docstring is a placeholder to satisfy the linter...
+    """
     # The schema language doesn't have a way of specifying a dataset/group is "abstract"
     # and yet hdmf-common says you don't need a dtype if the dataset is "abstract"
     # so....
@@ -421,10 +454,7 @@ def make_arraylike(cls: Dataset, name: Optional[str] = None) -> ClassDefinition:
             required = False

         # use cardinality to do shape
-        if shape == "null":
-            cardinality = None
-        else:
-            cardinality = shape
+        cardinality = None if shape == "null" else shape

         slots.append(
             SlotDefinition(
@@ -436,7 +466,8 @@ def make_arraylike(cls: Dataset, name: Optional[str] = None) -> ClassDefinition:
             )
         )

-    # and then the class is just a subclass of `Arraylist` (which is imported by default from `nwb.language.yaml`)
+    # and then the class is just a subclass of `Arraylist`
+    # (which is imported by default from `nwb.language.yaml`)
     if name:
         pass
     elif cls.neurodata_type_def:
@@ -453,20 +484,20 @@ def make_arraylike(cls: Dataset, name: Optional[str] = None) -> ClassDefinition:

 def is_1d(cls: Dataset) -> bool:
-    if (
+    """
+    Check if the values of a dataset are 1-dimensional
+    """
+    return (
         not any([isinstance(dim, list) for dim in cls.dims]) and len(cls.dims) == 1
     ) or (  # nested list
         all([isinstance(dim, list) for dim in cls.dims])
         and len(cls.dims) == 1
         and len(cls.dims[0]) == 1
-    ):
-        return True
-    else:
-        return False
+    )


 def has_attrs(cls: Dataset) -> bool:
-    if len(cls.attributes) > 0 and all([not a.value for a in cls.attributes]):
-        return True
-    else:
-        return False
+    """
+    Check if a dataset has any attributes at all without defaults
+    """
+    return len(cls.attributes) > 0 and all([not a.value for a in cls.attributes])

View file

@@ -13,9 +13,15 @@ from nwb_schema_language import Group

 class GroupAdapter(ClassAdapter):
+    """
+    Adapt NWB Groups to LinkML Classes
+    """
+
     cls: Group

     def build(self) -> BuildResult:
+        """
+        Do the translation, yielding the BuildResult
+        """
         # Handle container groups with only * quantity unnamed groups
         if len(self.cls.groups) > 0 and all(
             [self._check_if_container(g) for g in self.cls.groups]
@@ -80,12 +86,7 @@ class GroupAdapter(ClassAdapter):
         # don't build subgroups as their own classes, just make a slot
         # that can contain them
-        if self.cls.name:
-            name = cls.name
-        # elif len(cls.groups) == 1:
-        #     name = camel_to_snake(cls.groups[0].neurodata_type_inc)
-        else:
-            name = "children"
+        name = cls.name if self.cls.name else "children"

         slot = SlotDefinition(
             name=name,
@@ -126,10 +127,7 @@ class GroupAdapter(ClassAdapter):
             doc: Optional additional table(s) for describing other experimental time intervals.
             quantity: '*'
         """
-        if not self.cls.name:
-            name = camel_to_snake(self.cls.neurodata_type_inc)
-        else:
-            name = cls.name
+        name = camel_to_snake(self.cls.neurodata_type_inc) if not self.cls.name else cls.name

         return BuildResult(
             slots=[
@@ -163,7 +161,8 @@ class GroupAdapter(ClassAdapter):

         # Groups are a bit more complicated because they can also behave like
         # range declarations:
-        # eg. a group can have multiple groups with `neurodata_type_inc`, no name, and quantity of *,
+        # eg. a group can have multiple groups with `neurodata_type_inc`, no name,
+        # and quantity of *,
         # the group can then contain any number of groups of those included types as direct children

         group_res = BuildResult()
@@ -191,7 +190,4 @@ class GroupAdapter(ClassAdapter):
             doc: Images objects containing images of presented stimuli.
             quantity: '*'
         """
-        if not group.name and group.quantity in ("*", "+") and group.neurodata_type_inc:
-            return True
-        else:
-            return False
+        return not group.name and group.quantity in ("*", "+") and group.neurodata_type_inc

View file

@@ -4,7 +4,7 @@ Namespaces adapter
 Wraps the :class:`nwb_schema_language.Namespaces` and other objects with convenience methods
 for extracting information and generating translated schema
 """
+import contextlib
 from copy import copy
 from pathlib import Path
 from pprint import pformat
@@ -22,14 +22,17 @@ from nwb_schema_language import Namespaces

 class NamespacesAdapter(Adapter):
+    """
+    Translate a NWB Namespace to a LinkML Schema
+    """
     namespaces: Namespaces
     schemas: List[SchemaAdapter]
     imported: List["NamespacesAdapter"] = Field(default_factory=list)

     _imports_populated: bool = PrivateAttr(False)

-    def __init__(self, **kwargs):
-        super(NamespacesAdapter, self).__init__(**kwargs)
+    def __init__(self, **kwargs: dict):
+        super().__init__(**kwargs)
         self._populate_schema_namespaces()

     @classmethod
@@ -37,8 +40,8 @@ class NamespacesAdapter(Adapter):
         """
         Create a NamespacesAdapter from a nwb schema language namespaces yaml file.

-        Also attempts to provide imported implicitly imported schema (using the namespace key, rather than source, eg.
-        with hdmf-common)
+        Also attempts to provide imported implicitly imported schema (using the namespace key,
+        rather than source, eg. with hdmf-common)
         """
         from nwb_linkml.io import schema as schema_io
         from nwb_linkml.providers.git import DEFAULT_REPOS
@@ -49,10 +52,10 @@ class NamespacesAdapter(Adapter):

         need_imports = []
         for needed in ns_adapter.needed_imports.values():
-            need_imports.extend([n for n in needed if n not in ns_adapter.needed_imports.keys()])
+            need_imports.extend([n for n in needed if n not in ns_adapter.needed_imports])

         for needed in need_imports:
-            if needed in DEFAULT_REPOS.keys():
+            if needed in DEFAULT_REPOS:
                 needed_source_ns = DEFAULT_REPOS[needed].provide_from_git()
                 needed_adapter = NamespacesAdapter.from_yaml(needed_source_ns)
                 ns_adapter.imported.append(needed_adapter)
@@ -62,24 +65,23 @@ class NamespacesAdapter(Adapter):
     def build(
         self, skip_imports: bool = False, progress: Optional[AdapterProgress] = None
     ) -> BuildResult:
+        """
+        Build the NWB namespace to the LinkML Schema
+        """
         if not self._imports_populated and not skip_imports:
             self.populate_imports()

         sch_result = BuildResult()
         for sch in self.schemas:
             if progress is not None:
-                try:
-                    progress.update(sch.namespace, action=sch.name)
-                except KeyError:  # pragma: no cover
+                with contextlib.suppress(KeyError):
                     # happens when we skip builds due to caching
-                    pass
+                    progress.update(sch.namespace, action=sch.name)
             sch_result += sch.build()
             if progress is not None:
-                try:
-                    progress.update(sch.namespace, advance=1)
-                except KeyError:  # pragma: no cover
+                with contextlib.suppress(KeyError):
                     # happens when we skip builds due to caching
-                    pass
+                    progress.update(sch.namespace, advance=1)

         # recursive step
         if not skip_imports:
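The progress-update refactor above swaps try/except/pass for contextlib.suppress, its standard-library equivalent for the single-statement case. A quick standalone demonstration of the equivalence:

    import contextlib

    counts = {"core": 1}

    # try/except/pass version
    try:
        counts["hdmf-common"] += 1
    except KeyError:
        pass

    # contextlib.suppress version: the with-block is abandoned at the first KeyError
    with contextlib.suppress(KeyError):
        counts["hdmf-common"] += 1

    print(counts)  # {'core': 1}  (the missing key is skipped silently in both versions)

One thing to keep in mind: suppress abandons the entire with-block at the exception, so wrapping more than one statement behaves differently from a narrowly scoped try/except around a single statement.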
@@ -125,8 +127,10 @@ class NamespacesAdapter(Adapter):

         return sch_result

-    def _populate_schema_namespaces(self):
-        # annotate for each schema which namespace imports it
+    def _populate_schema_namespaces(self) -> None:
+        """
+        annotate for each schema which namespace imports it
+        """
         for sch in self.schemas:
             # imports seem to always be from same folder, so we can just use name part
             sch_name = sch.path.name
@@ -154,7 +158,7 @@ class NamespacesAdapter(Adapter):
         if len(internal_matches) > 1:
             raise KeyError(
                 f"Found multiple schemas in namespace that define {name}:\ninternal:"
-                f" {pformat(internal_matches)}\nimported:{pformat(import_matches)}"
+                f" {pformat(internal_matches)}\nimported:{pformat(internal_matches)}"
             )
         elif len(internal_matches) == 1:
             return internal_matches[0]
@@ -176,7 +180,7 @@ class NamespacesAdapter(Adapter):
         else:
             raise KeyError(f"No schema found that define {name}")

-    def populate_imports(self):
+    def populate_imports(self) -> None:
         """
         Populate the imports that are needed for each schema file

@@ -199,7 +203,14 @@ class NamespacesAdapter(Adapter):

         self._imports_populated = True

-    def to_yaml(self, base_dir: Path):
+    def to_yaml(self, base_dir: Path) -> None:
+        """
+        Build the schemas, saving them to ``yaml`` files according to
+        their ``name``
+
+        Args:
+            base_dir (:class:`.Path`): Directory to save ``yaml`` files
+        """
         schemas = self.build().schemas
         base_dir = Path(base_dir)

View file

@ -4,7 +4,7 @@ to call them "schema" objects
""" """
from pathlib import Path from pathlib import Path
from typing import List, NamedTuple, Optional, Type from typing import List, Optional, Type
from linkml_runtime.linkml_model import SchemaDefinition from linkml_runtime.linkml_model import SchemaDefinition
from pydantic import Field, PrivateAttr from pydantic import Field, PrivateAttr
@ -15,11 +15,6 @@ from nwb_linkml.adapters.group import GroupAdapter
from nwb_schema_language import Dataset, Group from nwb_schema_language import Dataset, Group
class SplitSchema(NamedTuple):
main: BuildResult
split: Optional[BuildResult]
class SchemaAdapter(Adapter): class SchemaAdapter(Adapter):
""" """
An individual schema file in nwb_schema_language An individual schema file in nwb_schema_language
@ -43,6 +38,9 @@ class SchemaAdapter(Adapter):
@property @property
def name(self) -> str: def name(self) -> str:
"""
The namespace.schema name for a single schema
"""
return ".".join([self.namespace, self.path.with_suffix("").name]) return ".".join([self.namespace, self.path.with_suffix("").name])
def __repr__(self): def __repr__(self):
@ -82,7 +80,7 @@ class SchemaAdapter(Adapter):
if ( if (
len(res.slots) > 0 len(res.slots) > 0
): # pragma: no cover - hard to induce this error because the child classes don't fuck up like this ): # pragma: no cover - hard to induce this because child classes don't fuck up like this
raise RuntimeError( raise RuntimeError(
"Generated schema in this translation can only have classes, all slots should be" "Generated schema in this translation can only have classes, all slots should be"
" attributes within a class" " attributes within a class"
@ -107,6 +105,9 @@ class SchemaAdapter(Adapter):
@property @property
def created_classes(self) -> List[Type[Group | Dataset]]: def created_classes(self) -> List[Type[Group | Dataset]]:
"""
All the group and datasets created in this schema
"""
if len(self._created_classes) == 0: if len(self._created_classes) == 0:
self._created_classes = [ self._created_classes = [
t t

View file

@@ -3,10 +3,15 @@ Utility functions for introspection on python annotations
 """

 import typing
-from typing import Any, List
+from typing import Any, List, Optional, Type, TypeVar
+
+T = TypeVar('T')


-def unwrap_optional(annotation):
+def unwrap_optional(annotation: Type[Optional[T]]) -> Type[T]:
+    """
+    Get the inner type of an `Optional[T]` type
+    """
     if typing.get_origin(annotation) == typing.Union:
         args = typing.get_args(annotation)
@@ -15,7 +20,10 @@ def unwrap_optional(annotation):
     return annotation


-def get_inner_types(annotation) -> List[Any]:
+def get_inner_types(annotation: Type) -> List[Any]:
+    """
+    Get the inner types in some nested type, recursively
+    """
     types = []
     args = typing.get_args(annotation)
     for arg in args:
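The two helpers above lean on typing.get_origin and typing.get_args. Outside the diff, the standard-library behavior they rely on looks like this (standalone illustration, not the module's own code):

    import typing
    from typing import List, Optional, Union

    # Optional[int] is really Union[int, None]
    ann = Optional[int]
    print(typing.get_origin(ann) is Union)       # True
    print(typing.get_args(ann))                  # (<class 'int'>, <class 'NoneType'>)

    # the non-None member is the "unwrapped" optional type
    inner = [a for a in typing.get_args(ann) if a is not type(None)][0]
    print(inner)                                 # <class 'int'>

    # nested annotations expose their arguments one level at a time,
    # which is why collecting all inner types takes a recursive walk
    print(typing.get_args(List[Optional[str]]))  # (typing.Optional[str],)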

View file

@@ -54,6 +54,9 @@ class Config(BaseSettings):
     @field_validator("cache_dir", mode="before")
     @classmethod
     def folder_exists(cls, v: Path, info: FieldValidationInfo) -> Path:
+        """
+        The base cache dir should exist before validating other paths
+        """
         v = Path(v)
         v.mkdir(exist_ok=True)
         assert v.exists()
@@ -61,7 +64,10 @@ class Config(BaseSettings):

     @model_validator(mode="after")
     def folders_exist(self) -> "Config":
-        for field, path in self.model_dump().items():
+        """
+        All folders, including computed folders, should exist.
+        """
+        for path in self.model_dump().values():
             if isinstance(path, Path):
                 path.mkdir(exist_ok=True, parents=True)
                 assert path.exists()
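The two validators above follow the usual pydantic v2 pattern: a mode="before" field validator normalizes the raw input value, and a mode="after" model validator sees every resolved field. A minimal standalone sketch, assuming pydantic-settings is installed (DemoConfig and its fields are invented for illustration):

    from pathlib import Path

    from pydantic import field_validator, model_validator
    from pydantic_settings import BaseSettings


    class DemoConfig(BaseSettings):
        cache_dir: Path = Path("/tmp/demo_cache")
        linkml_dir: Path = Path("/tmp/demo_cache/linkml")

        @field_validator("cache_dir", mode="before")
        @classmethod
        def base_dir_exists(cls, v: Path) -> Path:
            # runs on the raw input value, before the model is assembled
            v = Path(v)
            v.mkdir(parents=True, exist_ok=True)
            return v

        @model_validator(mode="after")
        def all_dirs_exist(self) -> "DemoConfig":
            # runs on the fully constructed model, so derived fields are visible too
            for value in self.model_dump().values():
                if isinstance(value, Path):
                    value.mkdir(parents=True, exist_ok=True)
            return self


    print(DemoConfig().cache_dir)  # /tmp/demo_cache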

View file

@@ -1 +1,12 @@
+"""
+Generate downstream output from LinkML schema
+
+Mostly for monkeypatching the pydantic generator from linkml with
+changes that are unlikely to be useful upstream
+"""
+
 from nwb_linkml.generators.pydantic import PydanticGenerator
+
+__all__ = [
+    'PydanticGenerator'
+]

View file

@@ -25,6 +25,9 @@ The `serialize` method:

 """

+# FIXME: Remove this after we refactor this generator
+# ruff: noqa
+
 import inspect
 import sys
 import warnings

View file

@@ -1,2 +1,11 @@
+"""
+Loading and dumping data from and to files
+"""
+
 from nwb_linkml.io import schema
 from nwb_linkml.io.hdf5 import HDF5IO
+
+__all__ = [
+    "HDF5IO",
+    "schema"
+]

View file

@@ -1,5 +1,6 @@
 """
-This is a sandbox file that should be split out to its own pydantic-hdf5 package, but just experimenting here to get our bearings
+This is a sandbox file that should be split out to its own pydantic-hdf5 package,
+but just experimenting here to get our bearings

 Notes:
@@ -42,6 +43,9 @@ from nwb_linkml.providers.schema import SchemaProvider

 class HDF5IO:
+    """
+    Read (and eventually write) from an NWB HDF5 file.
+    """

     def __init__(self, path: Path):
         self.path = Path(path)
@@ -59,7 +63,8 @@ class HDF5IO:

         The read process is in several stages:

-        * Use :meth:`.make_provider` to generate any needed LinkML Schema or Pydantic Classes using a :class:`.SchemaProvider`
+        * Use :meth:`.make_provider` to generate any needed LinkML Schema or Pydantic Classes
+          using a :class:`.SchemaProvider`
         * :func:`flatten_hdf` file into a :class:`.ReadQueue` of nodes.
         * Apply the queue's :class:`ReadPhases` :
@@ -67,24 +72,29 @@ class HDF5IO:
             * ``read`` - load the actual data into temporary holding objects
             * ``construct`` - cast the read data into models.

-        Read is split into stages like this to handle references between objects, where the read result of one node
-        might depend on another having already been completed. It also allows us to parallelize the operations
+        Read is split into stages like this to handle references between objects,
+        where the read result of one node
+        might depend on another having already been completed.
+        It also allows us to parallelize the operations
         since each mapping operation is independent of the results of all the others in that pass.

         .. todo::

             Implement reading, skipping arrays - they are fast to read with the ArrayProxy class
             and dask, but there are times when we might want to leave them out of the read entirely.
-            This might be better implemented as a filter on ``model_dump`` , but to investigate further
-            how best to support reading just metadata, or even some specific field value, or if
+            This might be better implemented as a filter on ``model_dump`` ,
+            but to investigate further how best to support reading just metadata,
+            or even some specific field value, or if
             we should leave that to other implementations like eg. after we do SQL export then
             not rig up a whole query system ourselves.

         Args:
-            path (Optional[str]): If ``None`` (default), read whole file. Otherwise, read from specific (hdf5) path and its children
+            path (Optional[str]): If ``None`` (default), read whole file.
+                Otherwise, read from specific (hdf5) path and its children

         Returns:
-            ``NWBFile`` if ``path`` is ``None``, otherwise whatever Model or dictionary of models applies to the requested ``path``
+            ``NWBFile`` if ``path`` is ``None``,
+            otherwise whatever Model or dictionary of models applies to the requested ``path``
         """
         provider = self.make_provider()
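Going by the docstring above, a typical call sequence would look roughly like this ("session.nwb" and the "/acquisition" path are placeholders, and an actual NWB HDF5 file is needed for it to run):

    from nwb_linkml.io.hdf5 import HDF5IO

    io = HDF5IO("session.nwb")

    # read the entire file into an NWBFile model
    nwbfile = io.read()

    # or read a single hdf5 path, getting back whichever model (or dict of models) lives there
    acquisition = io.read(path="/acquisition")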

View file

@@ -1,5 +1,18 @@
-# Import everything so it's defined, but shoudlnt' necessarily be used from here
+"""
+Mapping from one domain to another
+"""
+
 from nwb_linkml.maps.dtype import flat_to_linkml, flat_to_nptyping
 from nwb_linkml.maps.map import Map
 from nwb_linkml.maps.postload import MAP_HDMF_DATATYPE_DEF, MAP_HDMF_DATATYPE_INC
 from nwb_linkml.maps.quantity import QUANTITY_MAP
+
+__all__ = [
+    "MAP_HDMF_DATATYPE_DEF",
+    "MAP_HDMF_DATATYPE_INC",
+    "QUANTITY_MAP",
+    "Map",
+    "flat_to_linkml",
+    "flat_to_nptyping"
+]

View file

@@ -16,8 +16,24 @@ try:
     )

     DTypeType = Union[List[CompoundDtype], FlatDtype, ReferenceDtype]

 except (NameError, RecursionError):
     warnings.warn(
         "Error importing pydantic classes, passing because we might be in the process of patching"
         " them, but it is likely they are broken and you will be unable to use them!"
     )
+
+__all__ = [
+    "Attribute",
+    "CompoundDtype",
+    "Dataset",
+    "DTypeType",
+    "FlatDtype",
+    "Group",
+    "Link",
+    "Namespace",
+    "Namespaces",
+    "ReferenceDtype",
+    "Schema",
+]

View file

@@ -70,7 +70,7 @@ ignore = [
 fixable = ["ALL"]

 [tool.ruff.lint.per-file-ignores]
-"**/tests/**.py" = ["D", "ANN"]
+"**/tests/**.py" = ["D", "ANN", "E501", "F841", "F722"]

 [tool.mypy]
 plugins = [