All ruff and black fixes done

This commit is contained in:
sneakers-the-rat 2024-07-01 23:52:32 -07:00
parent 7c6e69c87e
commit f50275473c
Signed by untrusted user who does not match committer: jonny
GPG key ID: 6DCB96EF1E4D232D
30 changed files with 339 additions and 163 deletions

View file

@ -380,6 +380,7 @@ class DatasetAdapter(ClassAdapter):
"""
Orchestrator class for datasets - calls the set of applicable mapping classes
"""
cls: Dataset
def build(self) -> BuildResult:

View file

@ -16,6 +16,7 @@ class GroupAdapter(ClassAdapter):
"""
Adapt NWB Groups to LinkML Classes
"""
cls: Group
def build(self) -> BuildResult:

View file

@ -4,6 +4,7 @@ Namespaces adapter
Wraps the :class:`nwb_schema_language.Namespaces` and other objects with convenience methods
for extracting information and generating translated schema
"""
import contextlib
from copy import copy
from pathlib import Path
@ -25,6 +26,7 @@ class NamespacesAdapter(Adapter):
"""
Translate a NWB Namespace to a LinkML Schema
"""
namespaces: Namespaces
schemas: List[SchemaAdapter]
imported: List["NamespacesAdapter"] = Field(default_factory=list)

View file

@ -5,7 +5,7 @@ Utility functions for introspection on python annotations
import typing
from typing import Any, List, Optional, Type, TypeVar
T = TypeVar('T')
T = TypeVar("T")
def unwrap_optional(annotation: Type[Optional[T]]) -> Type[T]:

View file

@ -7,6 +7,4 @@ changes that are unlikely to be useful upstream
from nwb_linkml.generators.pydantic import PydanticGenerator
__all__ = [
'PydanticGenerator'
]
__all__ = ["PydanticGenerator"]

View file

@ -5,7 +5,4 @@ Loading and dumping data from and to files
from nwb_linkml.io import schema
from nwb_linkml.io.hdf5 import HDF5IO
__all__ = [
"HDF5IO",
"schema"
]
__all__ = ["HDF5IO", "schema"]

View file

@ -135,7 +135,8 @@ class HDF5IO:
Need to create inverse mappings that can take pydantic models to
hdf5 groups and datasets. If more metadata about the generation process
needs to be preserved (eg. explicitly notating that something is an attribute,
dataset, group, then we can make use of the :class:`~nwb_linkml.generators.pydantic.LinkML_Meta`
dataset, or group), then we can make use of the
:class:`~nwb_linkml.generators.pydantic.LinkML_Meta`
model. If the model to edit has been loaded from an HDF5 file (rather than
freshly created), then the ``hdf5_path`` should be populated making
mapping straightforward, but we probably want to generalize that to deterministically
@ -165,7 +166,8 @@ class HDF5IO:
# get versions for each namespace
versions = {}
for ns_schema in schema.values():
# each "namespace" can actually contain multiple namespaces which actually contain the version info
# each "namespace" can actually contain multiple namespaces
# which actually contain the version info
for inner_ns in ns_schema["namespace"]["namespaces"]:
versions[inner_ns["name"]] = inner_ns["version"]
@ -190,10 +192,10 @@ def read_specs_as_dicts(group: h5py.Group) -> dict:
"""
spec_dict = {}
def _read_spec(name, node) -> None:
def _read_spec(name: str, node: h5py.Dataset) -> None:
if isinstance(node, h5py.Dataset):
# make containing dict if they dont exist
# make containing dict if they don't exist
pieces = node.name.split("/")
if pieces[-3] not in spec_dict:
spec_dict[pieces[-3]] = {}
@ -230,7 +232,7 @@ def find_references(h5f: h5py.File, path: str) -> List[str]:
"""
references = []
def _find_references(name, obj: h5py.Group | h5py.Dataset) -> None:
def _find_references(name: str, obj: h5py.Group | h5py.Dataset) -> None:
pbar.update()
refs = []
for attr in obj.attrs.values():
@ -271,8 +273,10 @@ def truncate_file(source: Path, target: Optional[Path] = None, n: int = 10) -> P
Args:
source (:class:`pathlib.Path`): Source hdf5 file
target (:class:`pathlib.Path`): Optional - target hdf5 file to write to. If ``None``, use ``{source}_truncated.hdf5``
n (int): The number of items from datasets (samples along the 0th dimension of a dataset) to include
target (:class:`pathlib.Path`): Optional - target hdf5 file to write to.
If ``None``, use ``{source}_truncated.hdf5``
n (int): The number of items from datasets
(samples along the 0th dimension of a dataset) to include
Returns:
:class:`pathlib.Path` path of the truncated file
@ -308,7 +312,8 @@ def truncate_file(source: Path, target: Optional[Path] = None, n: int = 10) -> P
try:
obj.resize(n, axis=0)
except TypeError:
# contiguous arrays can't be trivially resized, so we have to copy and create a new dataset
# contiguous arrays can't be trivially resized,
# so we have to copy and create a new dataset
tmp_name = obj.name + "__tmp"
original_name = obj.name
obj.parent.move(obj.name, tmp_name)
@ -324,7 +329,8 @@ def truncate_file(source: Path, target: Optional[Path] = None, n: int = 10) -> P
# use h5repack to actually remove the items from the dataset
if shutil.which("h5repack") is None:
warnings.warn(
"Truncated file made, but since h5repack not found in path, file won't be any smaller"
"Truncated file made, but since h5repack not found in path, file won't be any smaller",
stacklevel=2,
)
return target
@ -333,7 +339,7 @@ def truncate_file(source: Path, target: Optional[Path] = None, n: int = 10) -> P
["h5repack", "-f", "GZIP=9", str(target), str(target_tmp)], capture_output=True
)
if res.returncode != 0:
warnings.warn(f"h5repack did not return 0: {res.stderr} {res.stdout}")
warnings.warn(f"h5repack did not return 0: {res.stderr} {res.stdout}", stacklevel=2)
# remove the attempt at the repack
target_tmp.unlink()
return target

View file

@ -17,6 +17,9 @@ from nwb_schema_language import Dataset, Group, Namespaces
def load_yaml(path: Path | str) -> dict:
"""
Load yaml file from file, applying postload modifications
"""
if isinstance(path, str) and not Path(path).exists():
ns_dict = yaml.safe_load(path)
else:
@ -86,8 +89,10 @@ def load_namespace_adapter(
Args:
namespace (:class:`.Namespace`):
path (:class:`pathlib.Path`): Optional: Location of the namespace file - all relative paths are interpreted relative to this
version (str): Optional: tag or commit to check out namespace is a :class:`.NamespaceRepo`. If ``None``, use ``HEAD`` if not already checked out,
path (:class:`pathlib.Path`): Optional: Location of the namespace file -
all relative paths are interpreted relative to this
version (str): Optional: tag or commit to check out if namespace is a
:class:`.NamespaceRepo`. If ``None``, use ``HEAD`` if not already checked out,
or otherwise use whatever version is already checked out.
Returns:
@ -128,16 +133,18 @@ def load_namespace_adapter(
return adapter
def load_nwb_core(core_version="2.6.0", hdmf_version="1.5.0") -> NamespacesAdapter:
def load_nwb_core(core_version: str = "2.6.0", hdmf_version: str = "1.5.0") -> NamespacesAdapter:
"""
Convenience function for loading the NWB core schema + hdmf-common as a namespace adapter.
.. note::
NWB Core schema are implicitly linked to a specific version of HDMF common by virtue of which version
NWB Core schema are implicitly linked to a specific version of HDMF common by
virtue of which version
of `hdmf-common-schema` is checked out as a submodule in the repository. We don't
attempt to resolve that linkage here because it's not in the schema, but the defaults
are for the latest nwb core ( ``'2.6.0'`` ) and its linked hdmf-common version ( ``'1.5.0'`` )
are for the latest nwb core ( ``'2.6.0'`` ) and its linked hdmf-common version
( ``'1.5.0'`` )
Args:
core_version (str): an entry in :attr:`.NWB_CORE_REPO.versions`

View file

@ -13,6 +13,5 @@ __all__ = [
"QUANTITY_MAP",
"Map",
"flat_to_linkml",
"flat_to_nptyping"
"flat_to_nptyping",
]

View file

@ -1,3 +1,7 @@
"""
Dtype mappings
"""
from datetime import datetime
from typing import Any, Type

View file

@ -5,6 +5,10 @@ We have sort of diverged from the initial idea of a generalized map as in :class
so we will make our own mapping class here and re-evaluate whether they should be unified later
"""
# FIXME: return and document whatever is left of this godforsaken module after refactoring
# ruff: noqa: D102
# ruff: noqa: D101
import contextlib
import datetime
import inspect
@ -45,11 +49,17 @@ class H5SourceItem(BaseModel):
h5f_path: str
"""Path to the source hdf5 file"""
leaf: bool
"""If ``True``, this item has no children (and thus we should start instantiating it before ascending to parent classes)"""
"""
If ``True``, this item has no children
(and thus we should start instantiating it before ascending to parent classes)
"""
h5_type: Literal["group", "dataset"]
"""What kind of hdf5 element this is"""
depends: List[str] = Field(default_factory=list)
"""Paths of other source items that this item depends on before it can be instantiated. eg. from softlinks"""
"""
Paths of other source items that this item depends on before it can be instantiated.
eg. from softlinks
"""
attrs: dict = Field(default_factory=dict)
"""Any static attrs that can be had from the element"""
namespace: Optional[str] = None
@ -159,7 +169,8 @@ class HDF5Map(Map):
def check_empty(obj: h5py.Group) -> bool:
"""
Check if a group has no attrs or children OR has no attrs and all its children also have no attrs and no children
Check if a group has no attrs or children OR has no attrs and all its children
also have no attrs and no children
Returns:
bool
@ -216,12 +227,14 @@ class ResolveDynamicTable(HDF5Map):
"""
Handle loading a dynamic table!
Dynamic tables are sort of odd in that their models don't include their fields (except as a list of
strings in ``colnames`` ), so we need to create a new model that includes fields for each column,
and then we include the datasets as :class:`~.nwb_linkml.types.ndarray.NDArrayProxy` objects which
lazy load the arrays in a thread/process safe way.
Dynamic tables are sort of odd in that their models don't include their fields
(except as a list of strings in ``colnames`` ),
so we need to create a new model that includes fields for each column,
and then we include the datasets as :class:`~.nwb_linkml.types.ndarray.NDArrayProxy`
objects which lazy load the arrays in a thread/process safe way.
This map also resolves the child elements, indicating so by the ``completes`` field in the :class:`.ReadResult`
This map also resolves the child elements,
indicating so by the ``completes`` field in the :class:`.ReadResult`
"""
phase = ReadPhases.read
@ -272,8 +285,8 @@ class ResolveDynamicTable(HDF5Map):
class ResolveModelGroup(HDF5Map):
"""
HDF5 Groups that have a model, as indicated by ``neurodata_type`` in their attrs.
We use the model to determine what fields we should get, and then stash references to the children to
process later as :class:`.HDF5_Path`
We use the model to determine what fields we should get, and then stash references
to the children to process later as :class:`.HDF5_Path`
**Special Case:** Some groups like ``ProcessingGroup`` and others that have an arbitrary
number of named children have a special ``children`` field that is a dictionary mapping
@ -305,9 +318,9 @@ class ResolveModelGroup(HDF5Map):
}
}
We will do some nice things in the model metaclass to make it possible to access the children like
``nwbfile.processing.cr_ellipse_fits.center_x`` rather than having to switch between indexing and
attribute access :)
We will do some nice things in the model metaclass to make it possible to access the children
like ``nwbfile.processing.cr_ellipse_fits.center_x``
rather than having to switch between indexing and attribute access :)
"""
phase = ReadPhases.read
@ -328,7 +341,7 @@ class ResolveModelGroup(HDF5Map):
depends = []
with h5py.File(src.h5f_path, "r") as h5f:
obj = h5f.get(src.path)
for key in model.model_fields.keys():
for key in model.model_fields:
if key == "children":
res[key] = {name: resolve_hardlink(child) for name, child in obj.items()}
depends.extend([resolve_hardlink(child) for child in obj.values()])
@ -361,7 +374,8 @@ class ResolveModelGroup(HDF5Map):
class ResolveDatasetAsDict(HDF5Map):
"""
Resolve datasets that do not have a ``neurodata_type`` of their own as a dictionary
that will be packaged into a model in the next step. Grabs the array in an :class:`~nwb_linkml.types.ndarray.NDArrayProxy`
that will be packaged into a model in the next step. Grabs the array in an
:class:`~nwb_linkml.types.ndarray.NDArrayProxy`
under an ``array`` key, and then grabs any additional ``attrs`` as well.
Mutually exclusive with :class:`.ResolveScalars` - this only applies to datasets that are larger
@ -522,7 +536,12 @@ class CompleteContainerGroups(HDF5Map):
def check(
cls, src: H5ReadResult, provider: SchemaProvider, completed: Dict[str, H5ReadResult]
) -> bool:
return (src.model is None and src.neurodata_type is None and src.source.h5_type == "group" and all([depend in completed for depend in src.depends]))
return (
src.model is None
and src.neurodata_type is None
and src.source.h5_type == "group"
and all([depend in completed for depend in src.depends])
)
@classmethod
def apply(
@ -546,7 +565,12 @@ class CompleteModelGroups(HDF5Map):
def check(
cls, src: H5ReadResult, provider: SchemaProvider, completed: Dict[str, H5ReadResult]
) -> bool:
return (src.model is not None and src.source.h5_type == "group" and src.neurodata_type != "NWBFile" and all([depend in completed for depend in src.depends]))
return (
src.model is not None
and src.source.h5_type == "group"
and src.neurodata_type != "NWBFile"
and all([depend in completed for depend in src.depends])
)
@classmethod
def apply(
@ -562,9 +586,10 @@ class CompleteModelGroups(HDF5Map):
# but whose attributes are fixed (and thus should just be an array, rather than a subclass)
for k, v in src.model.model_fields.items():
annotation = unwrap_optional(v.annotation)
if inspect.isclass(annotation) and not issubclass(annotation, BaseModel):
if (
isinstance(res, dict)
inspect.isclass(annotation)
and not issubclass(annotation, BaseModel)
and isinstance(res, dict)
and k in res
and isinstance(res[k], dict)
and "array" in res[k]
@ -592,7 +617,8 @@ class CompleteNWBFile(HDF5Map):
.. todo::
This is truly hideous, just meant as a way to get to the finish line on a late night, will be cleaned up later
This is truly hideous, just meant as a way to get to the finish line on a late night,
will be cleaned up later
"""
@ -603,7 +629,9 @@ class CompleteNWBFile(HDF5Map):
def check(
cls, src: H5ReadResult, provider: SchemaProvider, completed: Dict[str, H5ReadResult]
) -> bool:
return (src.neurodata_type == "NWBFile" and all([depend in completed for depend in src.depends]))
return src.neurodata_type == "NWBFile" and all(
[depend in completed for depend in src.depends]
)
@classmethod
def apply(
@ -638,12 +666,6 @@ class CompleteNWBFile(HDF5Map):
trodes = trode_type.model_construct(trodes_original.model_dump())
res["general"]["extracellular_ephys"]["electrodes"] = trodes
# type(res['general']['extracellular_ephys']['electrodes']).__mro__ = tuple(anmro)
# electrodes_dict = res['general']['extracellular_ephys']['electrodes'].model_dump()
# with h5py.File(src.source.h5f_path, 'r') as h5f:
# electrodes_dict['group'] = [egroup_dict[h5f[e].name] for e in electrodes_dict['group'][:]]
# res['general']['extracellular_ephys']['electrodes'] = electrodes_dict
instance = src.model(**res)
return H5ReadResult(
path=src.path,
@ -685,7 +707,7 @@ class ReadQueue(BaseModel):
default_factory=list, description="Phases that have already been completed"
)
def apply_phase(self, phase: ReadPhases, max_passes=5) -> None:
def apply_phase(self, phase: ReadPhases, max_passes: int = 5) -> None:
phase_maps = [m for m in HDF5Map.__subclasses__() if m.phase == phase]
phase_maps = sorted(phase_maps, key=lambda x: x.priority)
@ -695,11 +717,13 @@ class ReadQueue(BaseModel):
for item in self.queue.values():
for op in phase_maps:
if op.check(item, self.provider, self.completed):
# Formerly there was an "exclusive" property in the maps which let potentially multiple
# operations be applied per stage, except if an operation was `exclusive` which would break
# iteration over the operations. This was removed because it was badly implemented, but
# if there is ever a need to do that, then we would need to decide what to do with the
# multiple results.
# Formerly there was an "exclusive" property in the maps which let
# potentially multiple operations be applied per stage,
# except if an operation was `exclusive` which would break
# iteration over the operations.
# This was removed because it was badly implemented,
# but if there is ever a need to do that,
# then we would need to decide what to do with the multiple results.
results.append(op.apply(item, self.provider, self.completed))
break # out of inner iteration
@ -748,9 +772,12 @@ class ReadQueue(BaseModel):
self.apply_phase(phase, max_passes=max_passes - 1)
def flatten_hdf(h5f: h5py.File | h5py.Group, skip="specifications") -> Dict[str, H5SourceItem]:
def flatten_hdf(
h5f: h5py.File | h5py.Group, skip: str = "specifications"
) -> Dict[str, H5SourceItem]:
"""
Flatten all child elements of hdf element into a dict of :class:`.H5SourceItem` s keyed by their path
Flatten all child elements of hdf element into a dict of :class:`.H5SourceItem` s
keyed by their path
Args:
h5f (:class:`h5py.File` | :class:`h5py.Group`): HDF file or group to flatten!

View file

@ -61,14 +61,7 @@ def dynamictable_to_model(
try:
items[col] = da.from_array(group[col])
except NotImplementedError:
# if str in get_inner_types(col_type.annotation):
# # dask can't handle this, we just arrayproxy it
items[col] = NDArrayProxy(h5f_file=group.file.filename, path=group[col].name)
# else:
# warnings.warn(f"Dask can't handle object type arrays like {col} in {group.name}. Skipping")
# pdb.set_trace()
# # can't auto-chunk with "object" type
# items[col] = da.from_array(group[col], chunks=-1)
return model.model_construct(hdf5_path=group.name, name=group.name.split("/")[-1], **items)

View file

@ -1,5 +1,12 @@
"""
Abstract base classes for Map types
.. todo::
Make this consistent or don't call them all maps lmao
"""
from abc import ABC, abstractmethod
from typing import Any
from typing import Any, Mapping, Sequence
class Map(ABC):
@ -10,10 +17,10 @@ class Map(ABC):
@classmethod
@abstractmethod
def check(cls, *args, **kwargs) -> bool:
def check(cls, *args: Sequence, **kwargs: Mapping) -> bool:
"""Check if this map applies to the given item to read"""
@classmethod
@abstractmethod
def apply(cls, *args, **kwargs) -> Any:
def apply(cls, *args: Sequence, **kwargs: Mapping) -> Any:
"""Actually apply the map!"""

View file

@ -1,3 +1,7 @@
"""
String manipulation methods for names
"""
import re
from pathlib import Path
@ -40,7 +44,7 @@ def version_module_case(name: str) -> str:
return name
def relative_path(target: Path, origin: Path):
def relative_path(target: Path, origin: Path) -> Path:
"""
return path of target relative to origin, even if they're
not in the same subpath
@ -49,7 +53,7 @@ def relative_path(target: Path, origin: Path):
- https://stackoverflow.com/a/71874881
"""
def _relative_path(target: Path, origin: Path):
def _relative_path(target: Path, origin: Path) -> Path:
try:
return Path(target).resolve().relative_to(Path(origin).resolve())
except ValueError: # target does not start with origin

View file

@ -10,16 +10,39 @@ from typing import ClassVar, List, Optional
class SCOPE_TYPES(StrEnum):
"""When a mapping should be applied
.. todo::
This is likely deprecated, check usage.
"""
namespace = "namespace"
class PHASES(StrEnum):
"""The times that a mapping can happen
.. todo::
This is likely deprecated, check usage.
"""
postload = "postload"
"""After the YAML for a model has been loaded"""
@dataclass
class KeyMap:
"""
Map for renaming keys used in schemas according to some rule
.. todo::
This is likely deprecated, check usage.
"""
scope: str
"""The namespace that the map is relevant to"""
scope_type: SCOPE_TYPES
@ -74,11 +97,20 @@ MAP_HDMF_DATATYPE_INC = KeyMap(
class MAP_TYPES(StrEnum):
"""
Types of mapping that can exist
.. todo::
This is likely deprecated, check usage.
"""
key = "key"
"""Mapping the name of one key to another key"""
def apply_postload(ns_dict) -> dict:
def apply_postload(ns_dict: dict) -> dict:
"""Apply all post-load maps to a YAML schema"""
maps = [m for m in KeyMap.instances if m.phase == PHASES.postload]
for amap in maps:
ns_dict = amap.apply(ns_dict)

View file

@ -2,6 +2,8 @@
Monkeypatches to external modules
"""
# ruff: noqa: ANN001 - not well defined types for this module
def patch_npytyping_perf() -> None:
"""
@ -115,6 +117,7 @@ def patch_schemaview() -> None:
def apply_patches() -> None:
"""Apply all monkeypatches"""
patch_npytyping_perf()
patch_nptyping_warnings()
patch_schemaview()

View file

@ -9,15 +9,17 @@ import dash_cytoscape as cyto
from dash import Dash, html
from rich import print
cyto.load_extra_layouts()
from nwb_linkml.io import load_nwb_core
from nwb_linkml.io.schema import load_nwb_core
from nwb_schema_language import Dataset, Group, Namespace
if TYPE_CHECKING:
from nwb_linkml.adapters import NamespacesAdapter
# from nwb_schema_language.datamodel import Namespaces
# ruff: noqa: D101
# ruff: noqa: D102
# ruff: noqa: D103
cyto.load_extra_layouts()
class _CytoNode(TypedDict):
@ -52,7 +54,9 @@ class Node:
return node
def make_node(element: Group | Dataset, parent=None, recurse: bool = True) -> List[Node]:
def make_node(
element: Group | Dataset, parent: Optional[str] = None, recurse: bool = True
) -> List[Node]:
if element.neurodata_type_def is None:
if element.name is None:
name = "anonymous" if element.neurodata_type_inc is None else element.neurodata_type_inc

View file

@ -1 +1,7 @@
"""
Classes used for acquiring things from elsewhere, managing build processes, and caching results.
"""
from nwb_linkml.providers.schema import LinkMLProvider, PydanticProvider, SchemaProvider
__all__ = ["LinkMLProvider", "PydanticProvider", "SchemaProvider"]

View file

@ -38,6 +38,7 @@ class NamespaceRepo(BaseModel):
)
def provide_from_git(self, commit: str | None = None) -> Path:
"""Provide a namespace file from a git repo"""
git = GitRepo(self, commit)
git.clone()
return git.namespace_file
@ -78,7 +79,7 @@ DEFAULT_REPOS = {
class GitError(OSError):
pass
"""Exceptions caused by git!"""
class GitRepo:
@ -93,13 +94,14 @@ class GitRepo:
Args:
namespace (:class:`.NamespaceRepo`): The namespace repository to clone!
commit (str): A specific commit or tag to check out
path (:class:`pathlib.Path`): A directory to clone to - if ``None``, use :attr:`~.Config.git_dir` / :attr:`.NamespaceRepo.name`
path (:class:`pathlib.Path`): A directory to clone to -
if ``None``, use :attr:`~.Config.git_dir` / :attr:`.NamespaceRepo.name`
"""
self._temp_directory = path
self.namespace = namespace
self._commit = commit
def _git_call(self, *args) -> subprocess.CompletedProcess:
def _git_call(self, *args: List[str]) -> subprocess.CompletedProcess:
res = subprocess.run(["git", "-C", self.temp_directory, *args], capture_output=True)
if res.returncode != 0:
raise GitError(
@ -152,7 +154,8 @@ class GitRepo:
If ``None``: if :attr:`NamespaceRepo.versions`, use the last version. Otherwise use ``HEAD``
Should match :attr:`.active_commit`, differs semantically in that it is used to
set the active_commit, while :attr:`.active_commit` reads what commit is actually checked out
set the active_commit, while :attr:`.active_commit`
reads what commit is actually checked out
"""
return self._commit
@ -252,7 +255,8 @@ class GitRepo:
if self.remote != str(self.namespace.repository):
warnings.warn(
"Repository exists, but has the wrong remote URL.\nExpected:"
f" {self.namespace.repository}\nGot:{self.remote.strip('.git')}"
f" {self.namespace.repository}\nGot:{self.remote.strip('.git')}",
stacklevel=2,
)
return False
@ -274,7 +278,8 @@ class GitRepo:
):
warnings.warn(
"Temp directory is outside of the system temp dir or git directory set by"
" environmental variables, not deleting in case this has been changed by mistake"
" environmental variables, not deleting in case this has been changed by mistake",
stacklevel=2,
)
self._temp_directory = None
return
@ -299,7 +304,8 @@ class GitRepo:
if not self.check():
warnings.warn(
"Destination directory is not empty and does not pass checks for"
" correctness! cleaning up"
" correctness! cleaning up",
stacklevel=2,
)
self.cleanup()
else:

View file

@ -9,7 +9,8 @@ pydantic models on the fly.
Relationship to other modules:
* :mod:`.adapters` manage the conversion from NWB schema language to linkML.
* :mod:`.generators` create models like pydantic models from the linkML schema
* :mod:`.providers` then use ``adapters`` and ``generators`` to provide models from generated schema!
* :mod:`.providers` then use ``adapters`` and ``generators``
to provide models from generated schema!
Providers create a set of directories with namespaces and versions,
so eg. for the linkML and pydantic providers:
@ -47,6 +48,7 @@ import shutil
import sys
from abc import ABC, abstractmethod
from importlib.abc import MetaPathFinder
from importlib.machinery import ModuleSpec
from pathlib import Path
from types import ModuleType
from typing import Any, Dict, List, Optional, Type, TypedDict, TypeVar
@ -108,7 +110,7 @@ class Provider(ABC):
"""
@abstractmethod
def build(self, *args: Any):
def build(self, *args: Any) -> P:
"""
Whatever needs to be done to build this thing, if applicable
"""
@ -247,7 +249,8 @@ class LinkMLProvider(Provider):
All of which feed into...
* :class:`~.adapters.NamespacesAdapter` used throughout the rest of ``nwb_linkml`` - :meth:`.build`
* :class:`~.adapters.NamespacesAdapter` used throughout the rest of ``nwb_linkml`` -
:meth:`.build`
After a namespace is built, it can be accessed using :meth:`.LinkMLProvider.get`, which
can also be consumed by other providers, so a given namespace and version should only need
@ -277,9 +280,12 @@ class LinkMLProvider(Provider):
@property
def path(self) -> Path:
"""``linkml_dir`` provided by :class:`.Config`"""
return self.config.linkml_dir
def build_from_yaml(self, path: Path, **kwargs):
def build_from_yaml(
self, path: Path, **kwargs: dict
) -> Dict[str | SchemaDefinitionName, LinkMLSchemaBuild]:
"""
Build a namespace's schema
@ -291,7 +297,7 @@ class LinkMLProvider(Provider):
return self.build(ns_adapter, **kwargs)
def build_from_dicts(
self, schemas: Dict[str, dict], **kwargs
self, schemas: Dict[str, dict], **kwargs: dict
) -> Dict[str | SchemaDefinitionName, LinkMLSchemaBuild]:
"""
Build from schema dictionaries, eg. as come from nwb files
@ -314,14 +320,14 @@ class LinkMLProvider(Provider):
ns_adapters[ns_name] = ns_adapter
# get the correct imports
for ns_name, adapter in ns_adapters.items():
for adapter in ns_adapters.values():
for schema_needs in adapter.needed_imports.values():
for needed in schema_needs:
adapter.imported.append(ns_adapters[needed])
# then do the build
res = {}
for ns_name, adapter in ns_adapters.items():
for adapter in ns_adapters.values():
res.update(self.build(adapter, **kwargs))
return res
@ -335,17 +341,19 @@ class LinkMLProvider(Provider):
) -> Dict[str | SchemaDefinitionName, LinkMLSchemaBuild]:
"""
Arguments:
namespaces (:class:`.NamespacesAdapter`): Adapter (populated with any necessary imported namespaces)
to build
namespaces (:class:`.NamespacesAdapter`): Adapter
(populated with any necessary imported namespaces) to build
versions (dict): Dict of specific versions to use
for cross-namespace imports. as ``{'namespace': 'version'}``
If none is provided, use the most recent version
available.
dump (bool): If ``True`` (default), dump generated schema to YAML, otherwise just return
force (bool): If ``False`` (default), don't build schema that already exist. If ``True`` , clear directory and rebuild
force (bool): If ``False`` (default), don't build schema that already exist.
If ``True`` , clear directory and rebuild
Returns:
Dict[str, LinkMLSchemaBuild]. For normal builds, :attr:`.LinkMLSchemaBuild.result` will be populated with results
Dict[str, LinkMLSchemaBuild]. For normal builds,
:attr:`.LinkMLSchemaBuild.result` will be populated with results
of the build. If ``force == False`` and the schema already exist, it will be ``None``
"""
@ -473,9 +481,8 @@ class LinkMLProvider(Provider):
class PydanticProvider(Provider):
"""
Provider for pydantic models built from linkml-style nwb schema (ie. as provided by :class:`.LinkMLProvider`)
Provider for pydantic models built from linkml-style nwb schema
(ie. as provided by :class:`.LinkMLProvider`)
"""
PROVIDES = "pydantic"
@ -488,6 +495,7 @@ class PydanticProvider(Provider):
@property
def path(self) -> Path:
"""``pydantic_dir`` provided by :class:`.Config`"""
return self.config.pydantic_dir
def build(
@ -499,7 +507,7 @@ class PydanticProvider(Provider):
split: bool = True,
dump: bool = True,
force: bool = False,
**kwargs,
**kwargs: dict,
) -> str | List[str]:
"""
@ -515,18 +523,25 @@ class PydanticProvider(Provider):
:class:`.LinkMLProvider` to get the converted schema. If a path,
assume we have been given an explicit ``namespace.yaml`` from a converted
NWB -> LinkML schema to load from.
out_file (Optional[Path]): Optionally override the output file. If ``None``, generate from namespace and version
out_file (Optional[Path]): Optionally override the output file. If ``None``,
generate from namespace and version
version (Optional[str]): The version of the schema to build, if present.
Works similarly to ``version`` in :class:`.LinkMLProvider`. Ignored if ``namespace`` is a Path.
Works similarly to ``version`` in :class:`.LinkMLProvider`.
Ignored if ``namespace`` is a Path.
versions (Optional[dict]): An explicit mapping of namespaces and versions to use when
building the combined pydantic `namespace.py` file. Since NWB doesn't have an explicit
version dependency system between schema, there is intrinsic ambiguity between which version
of which schema should be used when imported from another. This mapping allows those ambiguities to be resolved.
building the combined pydantic `namespace.py` file.
Since NWB doesn't have an explicit version dependency system between schema,
there is intrinsic ambiguity between which version
of which schema should be used when imported from another.
This mapping allows those ambiguities to be resolved.
See :class:`.NWBPydanticGenerator` 's ``versions`` argument for more information.
split (bool): If ``False`` (default), generate a single ``namespace.py`` file, otherwise generate a python file for each schema in the namespace
split (bool): If ``False``, generate a single ``namespace.py`` file,
otherwise (``True``, the default) generate a python file for each schema in the namespace
in addition to a ``namespace.py`` that imports from them
dump (bool): If ``True`` (default), dump the model to the cache, otherwise just return the serialized string of built pydantic model
force (bool): If ``False`` (default), don't build the model if it already exists, if ``True`` , delete and rebuild any model
dump (bool): If ``True`` (default), dump the model to the cache,
otherwise just return the serialized string of built pydantic model
force (bool): If ``False`` (default), don't build the model if it already exists,
if ``True`` , delete and rebuild any model
**kwargs: Passed to :class:`.NWBPydanticGenerator`
Returns:
@ -549,7 +564,8 @@ class PydanticProvider(Provider):
else:
# given a path to a namespace linkml yaml file
path = Path(namespace)
# FIXME: this is extremely fragile, but get the details from the path. this is faster than reading yaml for now
# FIXME: this is extremely fragile, but get the details from the path.
# this is faster than reading yaml for now
name = path.parts[-3]
version = path.parts[-2]
fn = path.parts[-1]
@ -578,7 +594,15 @@ class PydanticProvider(Provider):
else:
return self._build_unsplit(path, versions, default_kwargs, dump, out_file, force)
def _build_unsplit(self, path, versions, default_kwargs, dump, out_file, force):
def _build_unsplit(
self,
path: Path,
versions: dict,
default_kwargs: dict,
dump: bool,
out_file: Path,
force: bool,
) -> Optional[str]:
if out_file.exists() and not force:
with open(out_file) as ofile:
serialized = ofile.read()
@ -602,7 +626,13 @@ class PydanticProvider(Provider):
return serialized
def _build_split(
self, path: Path, versions, default_kwargs, dump, out_file, force
self,
path: Path,
versions: dict,
default_kwargs: dict,
dump: bool,
out_file: Path,
force: bool,
) -> List[str]:
serialized = []
for schema_file in path.parent.glob("*.yaml"):
@ -654,6 +684,12 @@ class PydanticProvider(Provider):
@classmethod
def module_name(self, namespace: str, version: str) -> str:
"""Module name for the built module
e.g.::
nwb_linkml.models.pydantic.{namespace}.{version}
"""
name_pieces = [
"nwb_linkml",
"models",
@ -674,7 +710,8 @@ class PydanticProvider(Provider):
Args:
namespace (str): Name of namespace
version (Optional[str]): Version to import, if None, try and get the most recently built version.
version (Optional[str]): Version to import, if None,
try and get the most recently built version.
Returns:
:class:`types.ModuleType`
@ -712,7 +749,8 @@ class PydanticProvider(Provider):
"""
Get the imported module for a given namespace and version.
A given namespace will be stored in :data:`sys.modules` as ``nwb_linkml.models.{namespace}``,
A given namespace will be stored in :data:`sys.modules` as
``nwb_linkml.models.{namespace}``,
so first check if there is any already-imported module, and return that if so.
Then we check in the temporary directory for an already-built ``namespace.py`` file
@ -722,8 +760,10 @@ class PydanticProvider(Provider):
Notes:
The imported modules shadow the "actual"
``nwb_linkml.models`` module as would be imported from the usual location within the package directory.
This is intentional, as models can then be used as if they were integrated parts of the package,
``nwb_linkml.models`` module as would be imported from the usual location
within the package directory.
This is intentional, as models can then be used as if they were
integrated parts of the package,
and also so the active version of a namespace can be cleanly accessed
(ie. without ``from nwb_linkml.models.core import v2_2_0 as core`` ).
Accordingly, we assume that people will only be using a single version of NWB in a given
@ -731,13 +771,18 @@ class PydanticProvider(Provider):
Args:
namespace (str): Name of namespace to import. Must have either been previously built with :meth:`.PydanticProvider.build` or
a matching namespace/version combo must be available to the :class:`.LinkMLProvider`
version (Optional[str]): Version to import. If ``None``, get the most recently build module
allow_repo (bool): Allow getting modules provided within :mod:`nwb_linkml.models.pydantic`
namespace (str): Name of namespace to import. Must have either been previously built
with :meth:`.PydanticProvider.build` or
a matching namespace/version combo must be available to the
:class:`.LinkMLProvider`
version (Optional[str]): Version to import. If ``None``,
get the most recently built module
allow_repo (bool): Allow getting modules provided within
:mod:`nwb_linkml.models.pydantic`
Returns:
The imported :class:`types.ModuleType` object that has all the built classes at the root level.
The imported :class:`types.ModuleType` object that has all the built
classes at the root level.
"""
if allow_repo is None:
@ -795,8 +840,9 @@ class PydanticProvider(Provider):
Get a class from a given namespace and version!
Args:
namespace (str): Name of a namespace that has been previously built and cached, otherwise
we will attempt to build it from the :data:`.providers.git.DEFAULT_REPOS`
namespace (str): Name of a namespace that has been previously built and cached,
otherwise we will attempt to build it from the
:data:`.providers.git.DEFAULT_REPOS`
class_ (str): Name of class to retrieve
version (Optional[str]): Optional version of the schema to retrieve from
@ -825,7 +871,10 @@ class EctopicModelFinder(MetaPathFinder):
super().__init__(*args, **kwargs)
self.path = path
def find_spec(self, fullname, path, target=None):
def find_spec(
self, fullname: str, path: Optional[str], target: Optional[ModuleType] = None
) -> Optional[ModuleSpec]:
"""If we are loading a generated pydantic module, return an importlib spec"""
if not fullname.startswith(self.MODEL_STEM):
return None
else:
@ -873,9 +922,10 @@ class SchemaProvider(Provider):
def __init__(self, versions: Optional[Dict[str, str]] = None, **kwargs):
"""
Args:
versions (dict): Dictionary like ``{'namespace': 'v1.0.0'}`` used to specify that this provider should always
return models from a specific version of a namespace (unless explicitly requested otherwise
in a call to :meth:`.get` ).
versions (dict): Dictionary like ``{'namespace': 'v1.0.0'}``
used to specify that this provider should always
return models from a specific version of a namespace
(unless explicitly requested otherwise in a call to :meth:`.get` ).
**kwargs: passed to superclass __init__ (see :class:`.Provider` )
"""
self.versions = versions
@ -883,6 +933,7 @@ class SchemaProvider(Provider):
@property
def path(self) -> Path:
"""``cache_dir`` provided by :class:`.Config`"""
return self.config.cache_dir
def build(
@ -899,8 +950,10 @@ class SchemaProvider(Provider):
Args:
ns_adapter:
verbose (bool): If ``True`` (default), show progress bars
linkml_kwargs (Optional[dict]): Dictionary of kwargs optionally passed to :meth:`.LinkMLProvider.build`
pydantic_kwargs (Optional[dict]): Dictionary of kwargs optionally passed to :meth:`.PydanticProvider.build`
linkml_kwargs (Optional[dict]): Dictionary of kwargs optionally passed to
:meth:`.LinkMLProvider.build`
pydantic_kwargs (Optional[dict]): Dictionary of kwargs optionally passed to
:meth:`.PydanticProvider.build`
**kwargs: Common options added to both ``linkml_kwargs`` and ``pydantic_kwargs``
Returns:

View file

@ -1 +1,7 @@
"""
Custom types (likely deprecated)
"""
from nwb_linkml.types.ndarray import NDArray
__all__ = ["NDArray"]

View file

@ -3,8 +3,10 @@ Pydantic models that behave like pandas dataframes
.. note::
This is currently unused but kept in place as a stub in case it is worth revisiting in the future.
It turned out to be too momentarily difficult to make lazy-loading work with dask arrays per column
This is currently unused but kept in place as a stub in case it is worth
revisiting in the future.
It turned out to be too difficult, for the moment, to make lazy-loading work with
dask arrays per column
while still keeping pandas-like API intact. In the future we should investigate modifying the
:func:`dask.dataframe.read_hdf` function to treat individual hdf5 datasets like columns
@ -133,6 +135,7 @@ class DataFrame(BaseModel, pd.DataFrame):
def dynamictable_to_df(
group: h5py.Group, model: Optional[Type[DataFrame]] = None, base: Optional[BaseModel] = None
) -> DataFrame:
"""Generate a dataframe from an NWB DynamicTable"""
if model is None:
model = model_from_dynamictable(group, base)

View file

@ -1,3 +1,7 @@
"""
Types used with hdf5 io
"""
from typing import Any
from pydantic import GetCoreSchemaHandler
@ -5,7 +9,9 @@ from pydantic_core import CoreSchema, core_schema
class HDF5_Path(str):
"""Trivial subclass of string to indicate that it is a reference to a location within an HDF5 file"""
"""
Trivial subclass of string to indicate that it is a reference to a location within an HDF5 file
"""
@classmethod
def __get_pydantic_core_schema__(

View file

@ -4,6 +4,10 @@ Extension of nptyping NDArray for pydantic that allows for JSON-Schema serializa
* Order to store data in (row first)
"""
# ruff: noqa: ANN001
# ruff: noqa: ANN202
# FIXME: this has been moved to numpydantic, remove.
import base64
import sys
from copy import copy
@ -191,11 +195,5 @@ class NDArrayProxy:
_source_type: _NDArray,
_handler: Callable[[Any], core_schema.CoreSchema],
) -> core_schema.CoreSchema:
# return core_schema.no_info_after_validator_function(
# serialization=core_schema.plain_serializer_function_ser_schema(
# lambda instance: instance.tolist(),
# when_used='json'
# )
# )
return NDArray_.__get_pydantic_core_schema__(cls, _source_type, _handler)
return NDArray.__get_pydantic_core_schema__(cls, _source_type, _handler)

View file

@ -13,6 +13,8 @@ if TYPE_CHECKING:
class AdapterProgress:
"""Progress bar built with rich"""
def __init__(self, ns: "NamespacesAdapter"):
self.ns = ns
self.task_ids = {}
@ -46,12 +48,15 @@ class AdapterProgress:
)
def update(self, namespace: str, **kwargs) -> None:
"""Update the progressbar with a given namespace"""
self.progress.update(self.task_ids[namespace], **kwargs)
def start(self) -> None:
"""Start displaying progress"""
self.progress.start()
def stop(self) -> None:
"""Stop displaying progress"""
self.progress.stop()
def __enter__(self) -> Live:

View file

@ -1,3 +1,7 @@
"""
Pydantic representation of the NWB Schema Language specification
"""
import warnings
from typing import List, Union
@ -21,7 +25,8 @@ try:
except (NameError, RecursionError):
warnings.warn(
"Error importing pydantic classes, passing because we might be in the process of patching"
" them, but it is likely they are broken and you will be unable to use them!"
" them, but it is likely they are broken and you will be unable to use them!",
stacklevel=1,
)
__all__ = [

View file

@ -1,3 +1 @@
# from .nwb_schema_language import *
# create additional derived
"""Autogenerated models from linkML schema"""

View file

@ -1,3 +1,7 @@
"""
Shorthand instantiated schemaview from the linkml schema
"""
from pathlib import Path
from linkml_runtime.utils.schemaview import SchemaView

View file

@ -12,6 +12,8 @@ from typing import ClassVar, List
class Phases(StrEnum):
"""Phases of the loading and generation process"""
post_generation_pydantic = "post_generation_pydantic"
post_load_yaml = "post_load_yaml"
"""After the yaml of the nwb schema classes is loaded"""
@ -19,6 +21,10 @@ class Phases(StrEnum):
@dataclass
class Patch:
"""
Structured change to make to generated models
"""
phase: Phases
path: Path
"""Path relative to repository root"""
@ -47,20 +53,6 @@ patch_schema_slot = Patch(
replacement=r'\n schema_:\2alias="schema", \3',
)
# patch_neurodata_type_def_alias = Patch(
# phase=Phases.post_generation_pydantic,
# path=Path('src/nwb_schema_language/datamodel/nwb_schema_pydantic.py'),
# match=r"(\n\s*neurodata_type_def.*Field\(None, )(.*)",
# replacement=r'\1alias="data_type_def", \2',
# )
#
# patch_neurodata_type_inc_alias = Patch(
# phase=Phases.post_generation_pydantic,
# path=Path('src/nwb_schema_language/datamodel/nwb_schema_pydantic.py'),
# match=r"(\n\s*neurodata_type_inc.*Field\(None, )(.*)",
# replacement=r'\1alias="data_type_inc", \2',
# )
patch_dtype_single_multiple = Patch(
phase=Phases.post_generation_pydantic,
path=Path("src/nwb_schema_language/datamodel/nwb_schema_pydantic.py"),
@ -84,6 +76,9 @@ patch_contact_single_multiple = Patch(
def run_patches(phase: Phases, verbose: bool = False) -> None:
"""
Apply all declared :class:`.Patch` instances for the given phase
"""
patches = [p for p in Patch.instances if p.phase == phase]
for patch in patches:
if verbose:
@ -97,6 +92,9 @@ def run_patches(phase: Phases, verbose: bool = False) -> None:
def main() -> None:
"""
Run patches from the command line
"""
parser = argparse.ArgumentParser(description="Run patches for a given phase of code generation")
parser.add_argument("--phase", choices=list(Phases.__members__.keys()), type=Phases)
args = parser.parse_args()

View file

@ -58,6 +58,9 @@ select = [
"D419",
]
ignore = [
# annotations for *args and **kwargs
"ANN002", "ANN003",
# annoying annotation rules
"ANN101", "ANN102", "ANN401", "ANN204",
# explicit strict arg for zip
"B905",