successfully building many versions of nwb schema.

working on hdf5 importing, come back to it when fresh, just sorta poking at it because it's so close.
This commit is contained in:
sneakers-the-rat 2023-09-14 02:45:01 -07:00
parent 32f81fd409
commit e6a41415f5
9 changed files with 550 additions and 127 deletions

View file

@ -48,7 +48,7 @@ class NamespacesAdapter(Adapter):
"""
from nwb_linkml.io import schema as schema_io
ns_adapter = schema_io.load_namespaces(path)
ns_adapter = schema_io.load_namespace_schema(ns_adapter, path)
ns_adapter = schema_io.load_namespace_adapter(ns_adapter, path)
# try and find imported schema
@ -73,10 +73,19 @@ class NamespacesAdapter(Adapter):
sch_result = BuildResult()
for sch in self.schemas:
if progress is not None:
progress.update(sch.namespace, action=sch.name)
try:
progress.update(sch.namespace, action=sch.name)
except KeyError:
# happens when we skip builds due to cachine
pass
sch_result += sch.build()
if progress is not None:
progress.update(sch.namespace, advance=1)
try:
progress.update(sch.namespace, advance=1)
except KeyError:
# happens when we skip builds due to caching
pass
# recursive step
if not skip_imports:
@ -145,10 +154,9 @@ class NamespacesAdapter(Adapter):
sources = [sch.source for sch in ns.schema_]
if sch_name in sources or sch.path.stem in sources:
sch.namespace = ns.name
sch.version = ns.version
break
def find_type_source(self, name:str) -> SchemaAdapter:
"""
Given some neurodata_type_inc, find the schema that it's defined in.

View file

@ -33,6 +33,10 @@ class SchemaAdapter(Adapter):
namespace: Optional[str] = Field(
None,
description="""String of containing namespace. Populated by NamespacesAdapter""")
version: Optional[str] = Field(
None,
description="Version of schema, populated by NamespacesAdapter since individual schema files dont know their version in NWB Schema Lang"
)
split: bool = Field(
False,
description="Split anonymous subclasses into a separate schema file"
@ -67,7 +71,6 @@ class SchemaAdapter(Adapter):
- `id` (but need to have a placeholder to instantiate)
- `version`
"""
res = BuildResult()
for dset in self.datasets:
@ -90,7 +93,8 @@ class SchemaAdapter(Adapter):
imports = [i.name if isinstance(i, SchemaAdapter) else i for i in self.imports ],
classes=res.classes,
slots=res.slots,
types=res.types
types=res.types,
version=self.version
)
# every schema needs the language elements
sch.imports.append('.'.join([self.namespace, 'nwb.language']))

View file

@ -219,10 +219,14 @@ class NWBPydanticGenerator(PydanticGenerator):
# Don't get classes that are defined in this schema!
if module_name == self.schema.name:
continue
if self.versions and module_name in self.versions:
version = version_module_case(self.versions[module_name])
local_mod_name = '....' + module_case(module_name) + '.' + version + '.' + 'namespace'
# pdb.set_trace()
schema_name = module_name.split('.')[0]
if self.versions and schema_name != self.schema.name.split('.')[0] and schema_name in self.versions:
version = version_module_case(self.versions[schema_name])
if self.split:
local_mod_name = '...' + module_case(schema_name) + '.' + version + '.' + module_case(module_name)
else:
local_mod_name = '...' + module_case(schema_name) + '.' + version + '.' + 'namespace'
else:
local_mod_name = '.' + module_case(module_name)
@ -372,7 +376,7 @@ class NWBPydanticGenerator(PydanticGenerator):
try:
dtype = flat_to_npytyping[list(attrs.values())[0].range]
except KeyError as e:
warnings.warn(e)
warnings.warn(str(e))
range = list(attrs.values())[0].range
return f'List[{range}] | {range}'
suffix = "]"

View file

@ -1,11 +1,14 @@
"""
This is a sandbox file that should be split out to its own pydantic-hdf5 package, but just experimenting here to get our bearings
"""
import pdb
import typing
from typing import Optional, List, Dict, overload, Literal, Type, Any
from pathlib import Path
from types import ModuleType
from typing import TypeVar, TYPE_CHECKING
from abc import abstractmethod
import json
import h5py
from pydantic import BaseModel
@ -15,13 +18,14 @@ from nwb_linkml.translate import generate_from_nwbfile
#from nwb_linkml.models.core_nwb_file import NWBFile
if TYPE_CHECKING:
from nwb_linkml.models.core_nwb_file import NWBFile
from nwb_linkml.providers.schema import SchemaProvider
@dataclass
class HDF5Element():
cls: h5py.Dataset | h5py.Group
models: Dict[str, ModuleType]
parent: Type[BaseModel]
model: Optional[Any] = None
@abstractmethod
def read(self) -> BaseModel | List[BaseModel]:
@ -40,34 +44,69 @@ class HDF5Element():
"""Just the terminal group name"""
return self.cls.name.split('/')[-1]
def get_model(self) -> Type[BaseModel | dict]:
def get_model(self) -> Type[BaseModel | dict | list]:
"""
Find our model
- If we have a neurodata_type in our attrs, use that
- Otherwise, use our parent to resolve the type
"""
if self.model is not None:
return self.model
if 'neurodata_type' in self.cls.attrs.keys():
return get_model(self.cls.attrs, self.models)
return get_model(self.cls)
else:
parent_model = get_model(self.cls.parent.attrs, self.models)
parent_model = get_model(self.cls.parent)
field = parent_model.model_fields.get(self.name)
if issubclass(type(field.annotation), BaseModel):
return field.annotation
else:
try:
if issubclass(field.annotation, BaseModel):
return field.annotation
except TypeError:
pass
# remove any optionals
annotation = field.annotation
annotation = unwrap_optional(annotation)
if typing.get_origin(annotation) is list:
return list
else:
return dict
#raise NotImplementedError('Need to unpack at least listlike annotations')
def unwrap_optional(annotation):
if typing.get_origin(annotation) == typing.Union:
args = typing.get_args(annotation)
if len(args) == 2 and args[1].__name__ == 'NoneType':
annotation = args[0]
return annotation
def take_outer_type(annotation):
if typing.get_origin(annotation) is list:
return list
return annotation
@dataclass
class H5Dataset(HDF5Element):
cls: h5py.Dataset
def read(self) -> Any:
model = self.get_model()
# TODO: Handle references
if self.cls.dtype == h5py.ref_dtype:
return None
if self.cls.shape == ():
return self.cls[()]
elif len(self.cls.shape) == 1:
elif model is list:
return self.cls[:].tolist()
else:
raise NotImplementedError('oop')
return {'array':self.cls[:], 'name': self.cls.name.split('/')[-1]}
#raise NotImplementedError('oop')
@dataclass
class H5Group(HDF5Element):
@ -82,14 +121,25 @@ class H5Group(HDF5Element):
}
data.update(model_attrs)
for k, v in self.cls.items():
child_model = None
if isinstance(model, type) and issubclass(model, BaseModel):
child_field = model.model_fields.get(k, None)
if child_field is not None:
child_model = unwrap_optional(child_field.annotation)
child_model = take_outer_type(child_model)
if isinstance(v, h5py.Group):
data[k] = H5Group(cls=v, models=self.models, parent=model).read()
data[k] = H5Group(cls=v, parent=model, model=child_model).read()
elif isinstance(v, h5py.Dataset):
data[k] = H5Dataset(cls=v, models=self.models, parent=model).read()
data[k] = H5Dataset(cls=v, parent=model, model=child_model).read()
return model(**data)
if issubclass(model, BaseModel):
data['name'] = self.cls.name.split('/')[-1]
return model(**data)
elif model is list:
return list(data.values())
class HDF5IO():
@ -112,20 +162,25 @@ class HDF5IO():
def read(self, path:Optional[str] = None):
h5f = h5py.File(str(self.path))
schema = read_specs(h5f.get('specifications'))
# build schema so we have them cached
provider = SchemaProvider()
res = provider.build_from_dicts(schema)
if path:
src = h5f.get(path)
parent = get_model(src.attrs, self.modules)
parent = get_model(src)
else:
src = h5f
parent = getattr(self.modules['core'], 'NWBFile')
parent = provider.get_class('core', 'NWBFile')
data = {}
for k, v in src.items():
if isinstance(v, h5py.Group):
data[k] = H5Group(cls=v, models=self.modules, parent=parent).read()
data[k] = H5Group(cls=v, parent=parent).read()
elif isinstance(v, h5py.Dataset):
data[k] = H5Dataset(cls=v, models=self.modules, parent=parent).read()
data[k] = H5Dataset(cls=v, parent=parent).read()
if path is None:
return parent(**data)
@ -168,13 +223,36 @@ class HDF5IO():
if len(data.shape) == 1:
return list(data[:])
def get_model(attrs: h5py.AttributeManager, models: Dict[str, ModuleType]) -> Type[BaseModel]:
def read_specs(group: h5py.Group) -> dict:
spec_dict = {}
def _read_spec(name, node):
if isinstance(node, h5py.Dataset):
# make containing dict if they dont exist
pieces = node.name.split('/')
if pieces[-3] not in spec_dict.keys():
spec_dict[pieces[-3]] = {}
spec = json.loads(node[()])
spec_dict[pieces[-3]][pieces[-1]] = spec
group.visititems(_read_spec)
return spec_dict
def get_model(cls: h5py.Group | h5py.Dataset) -> Type[BaseModel]:
attrs = cls.attrs
ns = attrs.get('namespace')
model_name = attrs.get('neurodata_type')
return getattr(models[ns], model_name)
# if __name__ == "__main__":
# NWBFILE = Path('/Users/jonny/Dropbox/lab/p2p_ld/data/nwb/sub-738651046_ses-760693773.nwb')
# h5f = HDF5IO(NWBFILE)
try:
return SchemaProvider().get_class(ns, model_name)
except:
# try to get parent class
mod = get_model(cls.parent)
return mod.model_fields[cls.name.split('/')[-1]].annotation

View file

@ -25,7 +25,8 @@ def load_yaml(path:Path) -> dict:
ns_dict = amap.apply(ns_dict)
return ns_dict
def load_namespaces(path:Path|NamespaceRepo) -> Namespaces:
def _load_namespaces(path:Path|NamespaceRepo) -> Namespaces:
"""Loads the NWB SCHEMA LANGUAGE namespaces (not the namespacesadapter)"""
if isinstance(path, NamespaceRepo):
path = path.provide_from_git()
@ -37,6 +38,10 @@ def load_namespaces(path:Path|NamespaceRepo) -> Namespaces:
def load_schema_file(path:Path, yaml:Optional[dict] = None) -> SchemaAdapter:
if yaml is not None:
source = yaml
# apply maps
maps = [m for m in Map.instances if m.phase == PHASES.postload]
for amap in maps:
source = amap.apply(source)
else:
source = load_yaml(path)
@ -64,25 +69,39 @@ def load_schema_file(path:Path, yaml:Optional[dict] = None) -> SchemaAdapter:
)
return schema
def load_namespace_schema(namespace: Namespaces, path:Path=Path('..')) -> NamespacesAdapter:
def load_namespace_adapter(namespace: Path | NamespaceRepo | Namespaces, path:Optional[Path]=None) -> NamespacesAdapter:
"""
Load all schema referenced by a namespace file
Args:
namespace (:class:`.Namespace`):
namespace (:class:`:class:`.Namespace`):
path (:class:`pathlib.Path`): Location of the namespace file - all relative paths are interpreted relative to this
Returns:
:class:`.NamespacesAdapter`
"""
if path is None:
path = Path('..')
if isinstance(namespace, Path):
path = namespace
namespaces = _load_namespaces(path)
elif isinstance(namespace, NamespaceRepo):
path = namespace.provide_from_git()
namespaces = _load_namespaces(namespace)
elif isinstance(namespace, Namespaces):
namespaces = namespace
else:
raise ValueError(f"Namespace must be a path, namespace repo, or already loaded namespaces")
path = Path(path).resolve()
if path.is_file():
# given the namespace file itself, so find paths relative to its directory
path = path.parent
sch = []
for ns in namespace.namespaces:
for ns in namespaces.namespaces:
for schema in ns.schema_:
if schema.source is None:
# this is normal, we'll resolve later
@ -91,7 +110,7 @@ def load_namespace_schema(namespace: Namespaces, path:Path=Path('..')) -> Namesp
sch.append(load_schema_file(yml_file))
adapter = NamespacesAdapter(
namespaces=namespace,
namespaces=namespaces,
schemas=sch
)
@ -99,13 +118,8 @@ def load_namespace_schema(namespace: Namespaces, path:Path=Path('..')) -> Namesp
def load_nwb_core() -> NamespacesAdapter:
# First get hdmf-common:
hdmf_ns_file = HDMF_COMMON_REPO.provide_from_git()
hdmf_ns = load_namespaces(hdmf_ns_file)
hdmf_schema = load_namespace_schema(hdmf_ns, hdmf_ns_file)
namespace_file = NWB_CORE_REPO.provide_from_git()
ns = load_namespaces(namespace_file)
schema = load_namespace_schema(ns, namespace_file)
hdmf_schema = load_namespace_adapter(HDMF_COMMON_REPO)
schema = load_namespace_adapter(NWB_CORE_REPO)
schema.imported.append(hdmf_schema)

View file

@ -131,6 +131,7 @@ class GitRepo:
self._git_call('checkout', "HEAD")
else:
self._git_call('checkout', commit)
self._git_call('submodule', 'update', '--init', '--recursive')
self._commit = commit
@property
@ -166,6 +167,7 @@ class GitRepo:
self._git_call('fetch', '--all', '--tags')
self._git_call('checkout', f'tags/{tag}')
# error will be raised by _git_call if tag not found
self._git_call('submodule', 'update', '--init', '--recursive')
@property
def default_branch(self) -> str:

View file

@ -10,9 +10,37 @@ Relationship to other modules:
* :mod:`.adapters` manage the conversion from NWB schema language to linkML.
* :mod:`.generators` create models like pydantic models from the linkML schema
* :mod:`.providers` then use ``adapters`` and ``generators`` to provide models from generated schema!
Providers create a set of directories with namespaces and versions,
so eg. for the linkML and pydantic providers:
cache_dir
- linkml
- nwb_core
- v0_2_0
- namespace.yaml
- nwb.core.file.yaml
- ...
- v0_2_1
- namespace.yaml
- ...
- my_schema
- v0_1_0
- ...
- pydantic
- nwb_core
- v0_2_0
- namespace.py
- ...
- v0_2_1
- namespace.py
- ...
"""
import pdb
from typing import Dict, TypedDict, List, Optional, Literal, TypeVar, Any, Dict
import shutil
from typing import Dict, TypedDict, List, Optional, Literal, TypeVar, Any, Dict, Type
from types import ModuleType
from pathlib import Path
import os
@ -83,6 +111,13 @@ class Provider(ABC):
Whatever needs to be done to build this thing, if applicable
"""
@abstractmethod
def get(self, *args: Any) -> Any:
"""
Get a cached item.
Optionally, try any build it if it's possible to do so
"""
def namespace_path(
self,
@ -136,12 +171,48 @@ class Provider(ABC):
return version_path
@property
def versions(self) -> Dict[str,List[str]]:
"""
Dictionary mapping a namespace to a list of built versions
"""
versions = {} # type: Dict[str, List[Path]]
# first get any builtins provided by the package itself
# these get overwritten by
module_path = Path(importlib.util.find_spec('nwb_linkml').origin).parent
builtin_namespaces = []
if self.PROVIDES == 'linkml':
namespace_path = module_path / 'schema'
builtin_namespaces = list(namespace_path.iterdir())
elif self.PROVIDES == 'pydantic':
namespace_path = module_path / 'models'
builtin_namespaces = list(namespace_path.iterdir())
for ns_dir in builtin_namespaces + list(self.path.iterdir()):
if not ns_dir.is_dir():
continue
if ns_dir.name not in versions.keys():
versions[ns_dir.name] = []
versions[ns_dir.name].extend([v for v in ns_dir.iterdir() if v.is_dir()])
# flatten out in case we got duplicates between the builtins and cache
res = {
k: [v.name for v in sorted(set(v_paths), key=os.path.getmtime)]
for k, v_paths in versions.items()
}
return res
class LinkMLSchemaBuild(TypedDict):
"""Build result from :meth:`.LinkMLProvider.build`"""
result: BuildResult
version: str
namespace: Path
name: str
result: Optional[BuildResult]
class LinkMLProvider(Provider):
@ -219,19 +290,35 @@ class LinkMLProvider(Provider):
to infer version and schema name. Post-load maps should have already
been applied
"""
ns = Namespaces(**schemas['namespace'])
typed_schemas = [
io.schema.load_schema_file(
path=Path(key + ".yaml"),
yaml=val)
for key, val in schemas.items()
if key != 'namespace'
]
ns_adapter = adapters.NamespacesAdapter(
namespaces=ns,
schemas=typed_schemas
)
return self.build(ns_adapter, **kwargs)
ns_adapters = {}
for ns_name, ns_schemas in schemas.items():
ns = Namespaces(**ns_schemas['namespace'])
typed_schemas = [
io.schema.load_schema_file(
path=Path(key + ".yaml"),
yaml=val)
for key, val in ns_schemas.items()
if key != 'namespace'
]
ns_adapter = adapters.NamespacesAdapter(
namespaces=ns,
schemas=typed_schemas
)
ns_adapters[ns_name] = ns_adapter
# get the correct imports
for ns_name, adapter in ns_adapters.items():
for schema_needs in adapter.needed_imports.values():
for needed in schema_needs:
adapter.imported.append(ns_adapters[needed])
# then do the build
res = {}
for ns_name, adapter in ns_adapters.items():
res.update(self.build(adapter, **kwargs))
return res
def build(
@ -239,6 +326,7 @@ class LinkMLProvider(Provider):
ns_adapter: adapters.NamespacesAdapter,
versions: Optional[dict] = None,
dump: bool = True,
force: bool = False
) -> Dict[str | SchemaDefinitionName, LinkMLSchemaBuild]:
"""
Arguments:
@ -249,8 +337,24 @@ class LinkMLProvider(Provider):
If none is provided, use the most recent version
available.
dump (bool): If ``True`` (default), dump generated schema to YAML. otherwise just return
force (bool): If ``False`` (default), don't build schema that already exist. If ``True`` , clear directory and rebuild
Returns:
Dict[str, LinkMLSchemaBuild]. For normal builds, :attr:`.LinkMLSchemaBuild.result` will be populated with results
of the build. If ``force == False`` and the schema already exist, it will be ``None``
"""
if not force:
if all([(self.namespace_path(ns, version) / 'namespace.yaml').exists() for ns, version in ns_adapter.versions.items()]):
return {
k: LinkMLSchemaBuild(
name=k,
result=None,
namespace=self.namespace_path(k, v) / 'namespace.yaml',
version=v
) for k,v in ns_adapter.versions.items()
}
#self._find_imports(ns_adapter, versions, populate=True)
if self.verbose:
progress = AdapterProgress(ns_adapter)
@ -265,27 +369,32 @@ class LinkMLProvider(Provider):
build_result = {}
namespace_sch = [sch for sch in built.schemas if 'namespace' in sch.annotations.keys()]
namespace_names = [sch.name for sch in namespace_sch]
for ns_linkml in namespace_sch:
version = ns_adapter.versions[ns_linkml.name]
version_path = self.namespace_path(ns_linkml.name, version, allow_repo=False)
if version_path.exists() and force:
shutil.rmtree(str(version_path))
version_path.mkdir(exist_ok=True, parents=True)
ns_file = version_path / 'namespace.yaml'
ns_linkml = self._fix_schema_imports(ns_linkml, ns_adapter, ns_file)
yaml_dumper.dump(ns_linkml, ns_file)
# schema built as part of this namespace that aren't the namespace file
other_schema = [sch for sch in built.schemas if
sch.name.split('.')[0] == ns_linkml.name and sch not in namespace_sch]
# write the schemas for this namespace
other_schema = [sch for sch in built.schemas if sch.name.split('.')[0] == ns_linkml.name and sch not in namespace_sch]
for sch in other_schema:
output_file = version_path / (sch.name + '.yaml')
# fix the paths for intra-schema imports
sch = self._fix_schema_imports(sch, ns_adapter, output_file)
if force or (not force and not ns_file.exists()):
ns_linkml = self._fix_schema_imports(ns_linkml, ns_adapter, ns_file)
yaml_dumper.dump(ns_linkml, ns_file)
yaml_dumper.dump(sch, output_file)
# write the schemas for this namespace
for sch in other_schema:
output_file = version_path / (sch.name + '.yaml')
# fix the paths for intra-schema imports
sch = self._fix_schema_imports(sch, ns_adapter, output_file)
yaml_dumper.dump(sch, output_file)
# make return result for just this namespace
build_result[ns_linkml.name] = LinkMLSchemaBuild(
namespace=ns_file,
name=ns_linkml.name,
result= BuildResult(schemas=[ns_linkml, *other_schema]),
version=version
)
@ -350,27 +459,40 @@ class PydanticProvider(Provider):
def build(
self,
namespace: str | Path,
out_file: Optional[Path] = None,
version: Optional[str] = None,
versions: Optional[dict] = None,
split: bool = False,
dump: bool = True,
force: bool = False,
**kwargs
) -> str:
"""
Notes:
We currently infer namespace and version from the path when ``namespace`` is a Path,
which is a patently Bad Thing To Do. This is a temporary measure until we decide on
a permanent means by which we want to cache built artifacts <3. Hierarchies of folders
is not the target design.
Args:
namespace (Union[str, :class:`pathlib.Path`]): If a string, use a
:class:`.LinkMLProvider` to get the converted schema. If a path,
assume we have been given an explicit ``namespace.yaml`` from a converted
NWB -> LinkML schema to load from.
out_file (Optional[Path]): Optionally override the output file. If ``None``, generate from namespace and version
version (Optional[str]): The version of the schema to build, if present.
Works similarly to ``version`` in :class:`.LinkMLProvider`
Works similarly to ``version`` in :class:`.LinkMLProvider`. Ignored if ``namespace`` is a Path.
versions (Optional[dict]): An explicit mapping of namespaces and versions to use when
building the combined pydantic `namespace.py` file. Since NWB doesn't have an explicit
version dependency system between schema, there is intrinsic ambiguity between which version
of which schema should be used when imported from another. This mapping allows those ambiguities to be resolved.
See :class:`.NWBPydanticGenerator` 's ``versions`` argument for more information.
split (bool): If ``False`` (default), generate a single ``namespace.py`` file, otherwise generate a python file for each schema in the namespace
in addition to a ``namespace.py`` that imports from them
dump (bool): If ``True`` (default), dump the model to the cache, otherwise just return the serialized string of built pydantic model
force (bool): If ``False`` (default), don't build the model if it already exists, if ``True`` , delete and rebuild any model
**kwargs: Passed to :class:`.NWBPydanticGenerator`
Returns:
@ -379,13 +501,36 @@ class PydanticProvider(Provider):
if isinstance(namespace, str) and not (namespace.endswith('.yaml') or namespace.endswith('.yml')):
# we're given a name of a namespace to build
name = namespace
path = LinkMLProvider(path=self.config.cache_dir).namespace_path(namespace, version) / 'namespace.yaml'
if version is None:
# Get the most recently built version
version = LinkMLProvider(path=self.config.cache_dir).versions[name][-1]
fn = path.parts[-1]
else:
# given a path to a namespace linkml yaml file
path = Path(namespace)
# FIXME: this is extremely fragile, but get the details from the path. this is faster than reading yaml for now
name = path.parts[-3]
version = path.parts[-2]
fn = path.parts[-1]
version = version_module_case(version)
# this is extremely fragile, we should not be inferring version number from paths...
if out_file is None:
fn = fn.strip('.yaml')
fn = module_case(fn) + '.py'
out_file = self.path / name / version / fn
if out_file.exists() and not force:
with open(out_file, 'r') as ofile:
serialized = ofile.read()
return serialized
default_kwargs = {
'split': False,
'split': split,
'emit_metadata': True,
'gen_slots': True,
'pydantic_version': '2'
@ -399,10 +544,16 @@ class PydanticProvider(Provider):
)
serialized = generator.serialize()
if dump:
out_file = self.path / path.parts[-3] / path.parts[-2] / 'namespace.py'
out_file.parent.mkdir(parents=True,exist_ok=True)
with open(out_file, 'w') as ofile:
ofile.write(serialized)
with open(out_file.parent / '__init__.py', 'w') as initfile:
initfile.write(' ')
# make parent file, being a bit more careful because it could be for another module
parent_init = out_file.parent.parent / '__init__.py'
if not parent_init.exists():
with open(parent_init, 'w') as initfile:
initfile.write(' ')
return serialized
@ -487,7 +638,7 @@ class PydanticProvider(Provider):
module = self.import_module(namespace, version)
return module
def get_class(self, namespace: str, class_: str, version: Optional[str] = None) -> BaseModel:
def get_class(self, namespace: str, class_: str, version: Optional[str] = None) -> Type[BaseModel]:
"""
Get a class from a given namespace and version!
@ -507,10 +658,10 @@ class PydanticProvider(Provider):
class SchemaProvider:
class SchemaProvider(Provider):
"""
Class to manage building and caching linkml and pydantic models generated
from nwb schema language
from nwb schema language. Combines :class:`.LinkMLProvider` and :class:`.PydanticProvider`
Behaves like a singleton without needing to be one - since we're working off
caches on disk that are indexed by hash in most "normal" conditions you should
@ -519,52 +670,81 @@ class SchemaProvider:
Store each generated schema in a directory structure indexed by
schema namespace name and version
eg:
cache_dir
- linkml
- nwb_core
- v0_2_0
- namespace.yaml
- nwb.core.file.yaml
- ...
- v0_2_1
- namespace.yaml
- ...
- my_schema
- v0_1_0
- ...
- pydantic
- nwb_core
- v0_2_0
- namespace.py
- ...
- v0_2_1
- namespace.py
- ...
"""
build_from_yaml = LinkMLProvider.build_from_yaml
"""
Alias for :meth:`.LinkMLProvider.build_from_yaml` that also builds a pydantic model
"""
build_from_dicts = LinkMLProvider.build_from_dicts
"""
Alias for :meth:`.LinkMLProvider.build_from_dicts` that also builds a pydantic model
"""
def __init__(self,
path: Optional[Path] = None,
verbose: bool = True):
@property
def path(self) -> Path:
return self.config.cache_dir
def build(
self,
ns_adapter: adapters.NamespacesAdapter,
verbose: bool = True,
linkml_kwargs: Optional[dict] = None,
pydantic_kwargs: Optional[dict] = None,
**kwargs
) -> Dict[str, str]:
"""
Arguments:
path (bool): If provided, output to an explicit base directory.
Otherwise use that provided in ``NWB_LINKML_CACHE_DIR``
verbose (bool): If ``True`` (default), show progress bars and other messages
useful for interactive use
Build a namespace, storing its linkML and pydantic models.
Args:
ns_adapter:
verbose (bool): If ``True`` (default), show progress bars
linkml_kwargs (Optional[dict]): Dictionary of kwargs optionally passed to :meth:`.LinkMLProvider.build`
pydantic_kwargs (Optional[dict]): Dictionary of kwargs optionally passed to :meth:`.PydanticProvider.build`
**kwargs: Common options added to both ``linkml_kwargs`` and ``pydantic_kwargs``
Returns:
Dict[str,str] mapping namespaces to built pydantic sources
"""
if path is not None:
config = Config(cache_dir=path)
else:
config = Config()
self.cache_dir = config.cache_dir
self.pydantic_dir = config.pydantic_dir
self.linkml_dir = config.linkml_dir
if linkml_kwargs is None:
linkml_kwargs = {}
if pydantic_kwargs is None:
pydantic_kwargs = {}
linkml_kwargs.update(kwargs)
pydantic_kwargs.update(kwargs)
linkml_provider = LinkMLProvider(path=self.path, verbose=verbose)
pydantic_provider = PydanticProvider(path=self.path, verbose=verbose)
linkml_res = linkml_provider.build(ns_adapter=ns_adapter, **linkml_kwargs)
results = {}
for ns, ns_result in linkml_res.items():
results[ns] = pydantic_provider.build(ns_result['namespace'], **pydantic_kwargs)
return results
def get(self, namespace: str, version: Optional[str] = None) -> ModuleType:
"""
Get a built pydantic model for a given namespace and version.
Wrapper around :meth:`.PydanticProvider.get`
"""
return PydanticProvider(path=self.path).get(namespace, version)
def get_class(self, namespace: str, class_: str, version: Optional[str] = None) -> Type[BaseModel]:
"""
Get a pydantic model class from a given namespace and version!
Wrapper around :meth:`.PydanticProvider.get_class`
"""
return PydanticProvider(path=self.path).get_class(namespace, class_, version)
self.verbose = verbose

View file

@ -1,6 +1,10 @@
import pdb
import pytest
from pathlib import Path
from ..fixtures import tmp_output_dir, set_config_vars
from nwb_linkml.io.hdf5 import HDF5IO
@pytest.mark.skip()
def test_hdf_read():
@ -8,4 +12,6 @@ def test_hdf_read():
if not NWBFILE.exists():
return
io = HDF5IO(path=NWBFILE)
model = io.read('/general')
model = io.read('acquisition')
pdb.set_trace()

View file

@ -1,19 +1,34 @@
import pdb
import shutil
import os
import traceback
from argparse import ArgumentParser
from pathlib import Path
from linkml_runtime.dumpers import yaml_dumper
from rich.live import Live
from rich.panel import Panel
from rich.console import Group
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, Column
from rich import print
from nwb_linkml.generators.pydantic import NWBPydanticGenerator
from nwb_linkml.src.nwb_linkml import io
from nwb_linkml.providers.schema import LinkMLProvider, PydanticProvider
from nwb_linkml.providers.git import NWB_CORE_REPO, GitRepo
from nwb_linkml.io import schema as io
def generate_core_yaml(output_path:Path, dry_run:bool=False):
"""Just build the latest version of the core schema"""
def generate_core_yaml(output_path:Path):
core = io.load_nwb_core()
built_schemas = core.build().schemas
for schema in built_schemas:
output_file = output_path / (schema.name + '.yaml')
yaml_dumper.dump(schema, output_file)
if not dry_run:
yaml_dumper.dump(schema, output_file)
def generate_core_pydantic(yaml_path:Path, output_path:Path):
def generate_core_pydantic(yaml_path:Path, output_path:Path, dry_run:bool=False):
"""Just generate the latest version of the core schema"""
for schema in yaml_path.glob('*.yaml'):
python_name = schema.stem.replace('.', '_').replace('-', '_')
pydantic_file = (output_path / python_name).with_suffix('.py')
@ -26,11 +41,109 @@ def generate_core_pydantic(yaml_path:Path, output_path:Path):
gen_slots=True
)
gen_pydantic = generator.serialize()
with open(pydantic_file, 'w') as pfile:
pfile.write(gen_pydantic)
if not dry_run:
with open(pydantic_file, 'w') as pfile:
pfile.write(gen_pydantic)
def generate_versions(yaml_path:Path, pydantic_path:Path, dry_run:bool=False):
"""
Generate linkml models for all versions
"""
repo = GitRepo(NWB_CORE_REPO)
#repo.clone(force=True)
repo.clone()
# use a directory underneath this one as the temporary directory rather than
# the default hidden one
tmp_dir = Path(__file__).parent / '__tmp__'
if tmp_dir.exists():
shutil.rmtree(tmp_dir)
tmp_dir.mkdir()
linkml_provider = LinkMLProvider(path=tmp_dir, verbose=False)
pydantic_provider = PydanticProvider(path=tmp_dir, verbose=False)
failed_versions = {}
overall_progress = Progress()
overall_task = overall_progress.add_task('All Versions', total=len(NWB_CORE_REPO.versions))
build_progress = Progress(
TextColumn("[bold blue]{task.fields[name]} - [bold green]{task.fields[action]}",
table_column=Column(ratio=1)),
BarColumn(table_column=Column(ratio=1), bar_width=None)
)
panel = Panel(Group(build_progress, overall_progress))
with Live(panel) as live:
# make pbar tasks
linkml_task = None
pydantic_task = None
for version in NWB_CORE_REPO.versions:
# build linkml
try:
# check out the version (this should also refresh the hdmf-common schema)
linkml_task = build_progress.add_task('', name=version, action='Checkout Version', total=3)
repo.tag = version
build_progress.update(linkml_task, advance=1, action="Load Namespaces")
# first load the core namespace
core_ns = io.load_namespace_adapter(repo.namespace_file)
# then the hdmf-common namespace
hdmf_common_ns = io.load_namespace_adapter(repo.temp_directory / 'hdmf-common-schema' / 'common' / 'namespace.yaml')
core_ns.imported.append(hdmf_common_ns)
build_progress.update(linkml_task, advance=1, action="Build LinkML")
linkml_res = linkml_provider.build(core_ns)
build_progress.update(linkml_task, advance=1, action="Built LinkML")
# build pydantic
ns_files = [res['namespace'] for res in linkml_res.values()]
all_schema = []
for ns_file in ns_files:
all_schema.extend(list(ns_file.parent.glob('*.yaml')))
pydantic_task = build_progress.add_task('', name=version, action='', total=len(all_schema))
for schema in all_schema:
pbar_string = ' - '.join([schema.parts[-3], schema.parts[-2], schema.parts[-1]])
build_progress.update(pydantic_task, action=pbar_string)
pydantic_provider.build(schema, versions=core_ns.versions, split=True)
build_progress.update(pydantic_task, advance=1)
build_progress.update(pydantic_task, action='Built Pydantic')
except Exception as e:
build_progress.stop_task(linkml_task)
if linkml_task is not None:
build_progress.update(linkml_task, action='[bold red]LinkML Build Failed')
build_progress.stop_task(linkml_task)
if pydantic_task is not None:
build_progress.update(pydantic_task, action='[bold red]LinkML Build Failed')
build_progress.stop_task(pydantic_task)
failed_versions[version] = traceback.format_exception(e)
finally:
overall_progress.update(overall_task, advance=1)
linkml_task = None
pydantic_task = None
if not dry_run:
shutil.move(tmp_dir / 'linkml', yaml_path)
shutil.move(tmp_dir / 'pydantic', pydantic_path)
if len(failed_versions) > 0:
print('Failed Building Versions:')
print(failed_versions)
def parser() -> ArgumentParser:
parser = ArgumentParser('Generate NWB core schema')
parser = ArgumentParser('Generate all available versions of NWB core schema')
parser.add_argument(
'--yaml',
help="directory to export linkML schema to",
@ -43,15 +156,29 @@ def parser() -> ArgumentParser:
type=Path,
default=Path(__file__).parent.parent / 'nwb_linkml' / 'src' / 'nwb_linkml' / 'models'
)
parser.add_argument(
'--latest',
help="Only generate the latest version of the core schemas.",
action="store_true"
)
parser.add_argument(
'--dry-run',
help="Generate schema and pydantic models without moving them into the target directories, for testing purposes",
action='store_true'
)
return parser
def main():
args = parser().parse_args()
args.yaml.mkdir(exist_ok=True)
args.pydantic.mkdir(exist_ok=True)
generate_core_yaml(args.yaml)
generate_core_pydantic(args.yaml, args.pydantic)
if not args.dry_run:
args.yaml.mkdir(exist_ok=True)
args.pydantic.mkdir(exist_ok=True)
if args.latest:
generate_core_yaml(args.yaml, args.dry_run)
generate_core_pydantic(args.yaml, args.pydantic, args.dry_run)
else:
generate_versions(args.yaml, args.pydantic, args.dry_run)
if __name__ == "__main__":
main()