remove nwb_linkml remnants, add to_json method for json serialization

This commit is contained in:
sneakers-the-rat 2024-04-22 19:31:56 -07:00
parent 46060c1154
commit 5b722bb6da
Signed by untrusted user who does not match committer: jonny
GPG key ID: 6DCB96EF1E4D232D
11 changed files with 212 additions and 204 deletions


@@ -8,7 +8,6 @@ authors = [
dependencies = [
"pydantic>=2.3.0",
"nptyping>=2.5.0",
"blosc2<3.0.0,>=2.5.1",
"numpy>=1.24.0",
]
requires-python = "<4.0,>=3.9"


@@ -1,4 +1,5 @@
from typing import Any
import numpy as np
from numpydantic.interface.interface import Interface
try:
@@ -28,3 +29,19 @@ class DaskInterface(Interface):
def enabled(cls) -> bool:
"""check if we successfully imported dask"""
return DaskArray is not None
@classmethod
def to_json(cls, array: DaskArray) -> list:
"""
Convert an array to a JSON-serializable list by first converting it to a
numpy array and then to a nested list.
.. note::
This is likely a very memory-intensive operation if you are using dask for
large arrays. It can't be avoided, since Pydantic creates the JSON string
in memory, so for large arrays you are likely better served by a different
serialization method that works with the Python object itself rather than
its JSON representation.
"""
return np.array(array).tolist()
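As a usage sketch (the model here is hypothetical; assumes dask arrays dispatch to this interface through NDArray):

import dask.array as da
from pydantic import BaseModel
from nptyping import Number
from numpydantic import NDArray, Shape

class MyModel(BaseModel):
    array: NDArray[Shape["* x, * y"], Number]

# on JSON dump, to_json materializes the dask array into a nested list
instance = MyModel(array=da.random.random((3, 3)))
json_str = instance.model_dump_json()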


@@ -1,3 +1,4 @@
from pathlib import Path
from typing import Any, NamedTuple, Tuple, Union, TypeAlias
@@ -14,7 +15,7 @@ except ImportError:
H5Arraylike: TypeAlias = Tuple[Union[Path, str], str]
class H5Array(NamedTuple):
class H5ArrayPath(NamedTuple):
"""Location specifier for arrays within an HDF5 file"""
file: Union[Path, str]
@@ -43,6 +44,7 @@ class H5Proxy:
"""
def __init__(self, file: Union[Path, str], path: str):
self._h5f = None
self.file = Path(file)
self.path = path
@@ -53,8 +55,8 @@ class H5Proxy:
return obj is not None
@classmethod
def from_h5array(cls, h5array: H5Array) -> "H5Proxy":
"""Instantiate using :class:`.H5Array`"""
def from_h5array(cls, h5array: H5ArrayPath) -> "H5Proxy":
"""Instantiate using :class:`.H5ArrayPath`"""
return H5Proxy(file=h5array.file, path=h5array.path)
def __getattr__(self, item: str):
@@ -72,18 +74,37 @@ class H5Proxy:
obj = h5f.get(self.path)
obj[key] = value
def open(self, mode: str = "r"):
"""
Return the opened :class:`h5py.Dataset` object
You must remember to close the associated file with :meth:`.close`
"""
if self._h5f is None:
self._h5f = h5py.File(self.file, mode)
return self._h5f.get(self.path)
def close(self):
"""
Close the :class:`h5py.File` object left open when returning the dataset with
:meth:`.open`
"""
if self._h5f is not None:
self._h5f.close()
self._h5f = None
class H5Interface(Interface):
"""
Interface for Arrays stored as datasets within an HDF5 file.
Takes a :class:`.H5Array` specifier to select a :class:`h5py.Dataset` from a
Takes a :class:`.H5ArrayPath` specifier to select a :class:`h5py.Dataset` from a
:class:`h5py.File` and returns a :class:`.H5Proxy` class that acts like a
passthrough numpy-like interface to the dataset.
"""
input_types = (
H5Array,
H5ArrayPath,
H5Arraylike,
)
return_type = H5Proxy
@@ -94,9 +115,9 @@ class H5Interface(Interface):
return h5py is not None
@classmethod
def check(cls, array: Union[H5Array, Tuple[Union[Path, str], str]]) -> bool:
"""Check that the given array is a :class:`.H5Array` or something that resembles one."""
if isinstance(array, H5Array):
def check(cls, array: Union[H5ArrayPath, Tuple[Union[Path, str], str]]) -> bool:
"""Check that the given array is a :class:`.H5ArrayPath` or something that resembles one."""
if isinstance(array, H5ArrayPath):
return True
if isinstance(array, (tuple, list)) and len(array) == 2:
@@ -125,7 +146,7 @@ class H5Interface(Interface):
def before_validation(self, array: Any) -> NDArrayType:
"""Create an :class:`.H5Proxy` to use throughout validation"""
if isinstance(array, H5Array):
if isinstance(array, H5ArrayPath):
array = H5Proxy.from_h5array(h5array=array)
elif isinstance(array, (tuple, list)) and len(array) == 2:
array = H5Proxy(file=array[0], path=array[1])
@@ -141,3 +162,17 @@ class H5Interface(Interface):
)
return array
@classmethod
def to_json(cls, array: H5Proxy) -> dict:
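"""
Serialize the dataset as a dict of its source ``file`` and ``path``,
its ``attrs``, and the full ``array`` as a nested list
"""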
try:
dset = array.open()
meta = {
"file": array.file,
"path": array.path,
"attrs": dict(dset.attrs),
"array": dset[:].tolist(),
}
return meta
finally:
array.close()
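A sketch of how these pieces fit together, using a hypothetical file and model (the serialized form shown is what the test at the bottom of this commit asserts):

from pathlib import Path
import json

import h5py
import numpy as np
from pydantic import BaseModel
from nptyping import Number
from numpydantic import NDArray, Shape
from numpydantic.interface.hdf5 import H5ArrayPath, H5Proxy

with h5py.File("example.h5", "w") as f:
    f.create_dataset("/data", data=np.zeros((2, 2)))

# the open()/close() pair that to_json relies on
proxy = H5Proxy(file="example.h5", path="/data")
try:
    dset = proxy.open()
    assert dset.shape == (2, 2)
finally:
    proxy.close()

class MyModel(BaseModel):
    array: NDArray[Shape["* x, * y"], Number]

instance = MyModel(array=H5ArrayPath(Path("example.h5"), "/data"))
serialized = json.loads(instance.model_dump_json())["array"]
# expected: {"file": "example.h5", "path": "/data", "attrs": {}, "array": [[0.0, 0.0], [0.0, 0.0]]}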


@@ -1,7 +1,8 @@
from abc import ABC, abstractmethod
from operator import attrgetter
from typing import Any, Generic, Tuple, Type, TypeVar
from typing import Any, Generic, Tuple, Type, TypeVar, Union
import numpy as np
from nptyping.shape_expression import check_shape
from numpydantic.exceptions import DtypeError, ShapeError
@@ -92,6 +93,15 @@ class Interface(ABC, Generic[T]):
Check whether this array interface can be used (e.g. whether its dependent packages are installed)
"""
@classmethod
def to_json(cls, array: Type[T]) -> Union[list, dict]:
"""
Convert an array of :attr:`.return_type` to a JSON-compatible format using base Python types
"""
if not isinstance(array, np.ndarray):
array = np.array(array)
return array.tolist()
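# sketch: Interface.to_json(np.array([[1, 2], [3, 4]])) == [[1, 2], [3, 4]]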
@classmethod
def interfaces(cls) -> Tuple[Type["Interface"], ...]:
"""
@@ -106,7 +116,7 @@ class Interface(ABC, Generic[T]):
)
@classmethod
def array_types(cls) -> Tuple[NDArrayType, ...]:
def return_types(cls) -> Tuple[NDArrayType, ...]:
"""Return types for all enabled interfaces"""
return tuple([i.return_type for i in cls.interfaces()])
@@ -125,7 +135,7 @@ class Interface(ABC, Generic[T]):
@classmethod
def match(cls, array: Any) -> Type["Interface"]:
"""
Find the interface that should be used for this array
Find the interface that should be used for this array based on its input type
"""
matches = [i for i in cls.interfaces() if i.check(array)]
if len(matches) > 1:
@@ -136,3 +146,21 @@ class Interface(ABC, Generic[T]):
raise ValueError(f"No matching interfaces found for input {array}")
else:
return matches[0]
@classmethod
def match_output(cls, array: Any) -> Type["Interface"]:
"""
Find the interface that should be used based on the output type:
when the output type differs from the input type, e.g. with
the HDF5 interface, match an instantiated array for purposes of
serialization to JSON.
"""
matches = [i for i in cls.interfaces() if isinstance(array, i.return_type)]
if len(matches) > 1:
msg = f"More than one interface matches output {array}:\n"
msg += "\n".join([f" - {i}" for i in matches])
raise ValueError(msg)
elif len(matches) == 0:
raise ValueError(f"No matching interfaces found for output {array}")
else:
return matches[0]
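A sketch of the input/output matching distinction (assumes the dask interface matches dask arrays on both sides):

import dask.array as da
from numpydantic.interface.interface import Interface

arr = da.zeros((3, 3))
input_iface = Interface.match(arr)          # dispatch on input type via check()
output_iface = Interface.match_output(arr)  # dispatch on return_type via isinstance()
assert input_iface is output_iface          # dask arrays are their own return type
# For HDF5 the two differ: match() accepts an H5ArrayPath, while
# match_output() matches the instantiated H5Proxy returned by validation.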


@@ -2,6 +2,7 @@ from datetime import datetime
from typing import Any
import numpy as np
from nptyping import Float, Int, String, Bool
np_to_python = {
Any: Any,
@@ -74,3 +75,5 @@ flat_to_nptyping = {
"AnyType": "Any",
"object": "Object",
}
python_to_nptyping = {float: Float, str: String, int: Int, bool: Bool}


@@ -4,19 +4,14 @@ Extension of nptyping NDArray for pydantic that allows for JSON-Schema serialization
* Order to store data in (row first)
"""
import base64
import sys
from collections.abc import Callable
from copy import copy
from typing import Any, Tuple, TypeVar, cast, Union
from typing import Any, Tuple, Union
import blosc2
import nptyping.structure
import numpy as np
from nptyping import Shape
from nptyping.ndarray import NDArrayMeta as _NDArrayMeta
from nptyping.nptyping_type import NPTypingType
from nptyping.shape_expression import check_shape
from pydantic_core import core_schema
from pydantic_core.core_schema import ListSchema
@@ -65,57 +60,7 @@ def list_of_lists_schema(shape: Shape, array_type_handler: dict) -> ListSchema:
return list_schema
def jsonize_array(array: NDArrayType) -> list | dict:
"""
Render an array to base python types that can be serialized to JSON
For small arrays, returns a list of lists.
If the array is over :class:`.COMPRESSION_THRESHOLD` bytes, use :func:`.compress_array`
to return a compressed b64 encoded string.
Args:
array (:class:`np.ndarray`, :class:`dask.DaskArray`): Array to render as a list!
"""
# if isinstance(array, DaskArray):
# arr = array.__array__()
# elif isinstance(array, NDArrayProxy):
# arr = array[:]
# else:
# arr = array
arr = array
# If we're larger than 16kB then compress array!
if sys.getsizeof(arr) > COMPRESSION_THRESHOLD:
packed = blosc2.pack_array2(arr)
packed = base64.b64encode(packed)
ret = {
"array": packed,
"shape": copy(arr.shape),
"dtype": copy(arr.dtype.name),
"unpack_fns": ["base64.b64decode", "blosc2.unpack_array2"],
}
return ret
else:
return arr.tolist()
def get_validate_shape(shape: Shape) -> Callable:
"""
Get a closure around a shape validation function that includes the shape definition
"""
def validate_shape(value: Any) -> np.ndarray:
assert shape is Any or check_shape(
value.shape, shape
), f"Invalid shape! expected shape {shape.prepared_args}, got shape {value.shape}"
return value
return validate_shape
def get_validate_interface(shape: ShapeType, dtype: DtypeType) -> Callable:
def _get_validate_interface(shape: ShapeType, dtype: DtypeType) -> Callable:
"""
Validate with a matching :class:`.Interface` class, using its :meth:`.Interface.validate` method
"""
@@ -129,6 +74,11 @@ def get_validate_interface(shape: ShapeType, dtype: DtypeType) -> Callable:
return validate_interface
def _jsonize_array(value: Any) -> Union[list, dict]:
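"""Serialize an array with whichever interface matches its output type"""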
interface_cls = Interface.match_output(value)
return interface_cls.to_json(value)
def coerce_list(value: Any) -> np.ndarray:
"""
If a value is passed as a list or list of lists, try to coerce it into an array
@@ -147,9 +97,6 @@ class NDArrayMeta(_NDArrayMeta, implementation="NDArray"):
"""
T = TypeVar("T")
class NDArray(NPTypingType, metaclass=NDArrayMeta):
"""
Constrained array type allowing nptyping syntax for dtype and shape validation and serialization.
@@ -196,15 +143,11 @@ class NDArray(NPTypingType, metaclass=NDArrayMeta):
[
core_schema.no_info_plain_validator_function(coerce_list),
core_schema.with_info_plain_validator_function(
get_validate_interface(shape, dtype)
_get_validate_interface(shape, dtype)
),
]
),
serialization=core_schema.plain_serializer_function_ser_schema(
jsonize_array, when_used="json"
_jsonize_array, when_used="json"
),
)
NDArray = cast(Union[np.ndarray, list[int]], NDArray)
# NDArray = cast(Union[Interface.array_types()], NDArray)


@@ -1,48 +0,0 @@
from collections.abc import Callable
from pathlib import Path
from typing import Any
import h5py
import numpy as np
from nptyping import NDArray as _NDArray
from pydantic_core import core_schema
class NDArrayProxy:
"""
Thin proxy to numpy arrays stored within hdf5 files,
only read into memory when accessed, but otherwise
passthrough all attempts to access attributes.
"""
def __init__(self, h5f_file: Path | str, path: str):
"""
Args:
h5f_file (:class:`pathlib.Path`): Path to source HDF5 file
path (str): Location within HDF5 file where this array is located
"""
self.h5f_file = Path(h5f_file)
self.path = path
def __getattr__(self, item) -> Any:
with h5py.File(self.h5f_file, "r") as h5f:
obj = h5f.get(self.path)
return getattr(obj, item)
def __getitem__(self, slice: slice) -> np.ndarray:
with h5py.File(self.h5f_file, "r") as h5f:
obj = h5f.get(self.path)
return obj[slice]
def __setitem__(self, slice, value) -> None:
raise NotImplementedError("Cant write into an arrayproxy yet!")
@classmethod
def __get_pydantic_core_schema__(
cls,
_source_type: _NDArray,
_handler: Callable[[Any], core_schema.CoreSchema],
) -> core_schema.CoreSchema:
from numpydantic import NDArray
return NDArray.__get_pydantic_core_schema__(cls, _source_type, _handler)


@@ -1,40 +1,11 @@
import pytest
from pathlib import Path
from typing import Optional, Union, Type
import h5py
import numpy as np
from pydantic import BaseModel, Field
from numpydantic.interface.hdf5 import H5Array
from numpydantic import NDArray, Shape
from nptyping import Number
from tests.fixtures import *
@pytest.fixture(scope="session")
def model_rgb() -> Type[BaseModel]:
class RGB(BaseModel):
array: Optional[
Union[
NDArray[Shape["* x, * y"], Number],
NDArray[Shape["* x, * y, 3 r_g_b"], Number],
NDArray[Shape["* x, * y, 3 r_g_b, 4 r_g_b_a"], Number],
]
] = Field(None)
return RGB
@pytest.fixture(scope="function")
def h5file(tmp_path) -> h5py.File:
h5f = h5py.File(tmp_path / "file.h5", "w")
yield h5f
h5f.close()
@pytest.fixture(scope="function")
def h5_array(h5file) -> H5Array:
"""trivial hdf5 array used for testing array existence"""
path = "/data"
h5file.create_dataset(path, data=np.zeros((3, 4)))
return H5Array(file=Path(h5file.filename), path=path)
def pytest_addoption(parser):
parser.addoption(
"--with-output",
action="store_true",
help="Keep test outputs in the __tmp__ directory",
)


@@ -1,27 +1,38 @@
import shutil
from pathlib import Path
from typing import Callable, Optional, Tuple, Type, Union
import h5py
import numpy as np
import pytest
from linkml_runtime.linkml_model import ClassDefinition, SlotDefinition
from nptyping import Number
from pydantic import BaseModel, Field
from numpydantic.interface.hdf5 import H5ArrayPath
from numpydantic import NDArray, Shape
from numpydantic.maps import python_to_nptyping
@pytest.fixture(scope="session")
def tmp_output_dir() -> Path:
def tmp_output_dir(request: pytest.FixtureRequest) -> Path:
path = Path(__file__).parent.resolve() / "__tmp__"
if path.exists():
shutil.rmtree(str(path))
path.mkdir()
return path
yield path
if not request.config.getvalue("--with-output"):
shutil.rmtree(str(path))
@pytest.fixture(scope="function")
def tmp_output_dir_func(tmp_output_dir) -> Path:
def tmp_output_dir_func(tmp_output_dir, request: pytest.FixtureRequest) -> Path:
"""
tmp output dir that gets cleared between every function
cleans at the start rather than at cleanup in case the output is to be inspected
"""
subpath = tmp_output_dir / "__tmpfunc__"
subpath = tmp_output_dir / f"__tmpfunc_{request.node.name}__"
if subpath.exists():
shutil.rmtree(str(subpath))
subpath.mkdir()
@@ -29,46 +40,68 @@ def tmp_output_dir_func(tmp_output_dir) -> Path:
@pytest.fixture(scope="module")
def tmp_output_dir_mod(tmp_output_dir) -> Path:
def tmp_output_dir_mod(tmp_output_dir, request: pytest.FixtureRequest) -> Path:
"""
tmp output dir that gets cleared between every module
cleans at the start rather than at cleanup in case the output is to be inspected
"""
subpath = tmp_output_dir / "__tmpmod__"
subpath = tmp_output_dir / f"__tmpmod_{request.module}__"
if subpath.exists():
shutil.rmtree(str(subpath))
subpath.mkdir()
return subpath
@pytest.fixture()
def nwb_linkml_array() -> tuple[ClassDefinition, str]:
classdef = ClassDefinition(
name="NWB_Linkml Array",
description="Main class's array",
is_a="Arraylike",
attributes=[
SlotDefinition(name="x", range="numeric", required=True),
SlotDefinition(name="y", range="numeric", required=True),
SlotDefinition(
name="z",
range="numeric",
required=False,
maximum_cardinality=3,
minimum_cardinality=3,
),
SlotDefinition(
name="a",
range="numeric",
required=False,
minimum_cardinality=4,
maximum_cardinality=4,
),
],
)
generated = """Union[
NDArray[Shape["* x, * y"], Number],
NDArray[Shape["* x, * y, 3 z"], Number],
NDArray[Shape["* x, * y, 3 z, 4 a"], Number]
]"""
return classdef, generated
@pytest.fixture(scope="function")
def array_model() -> (
Callable[[Tuple[int, ...], Union[Type, np.dtype]], Type[BaseModel]]
):
def _model(
shape: Tuple[int, ...] = (10, 10), dtype: Union[Type, np.dtype] = float
) -> Type[BaseModel]:
shape_str = ", ".join([str(s) for s in shape])
class MyModel(BaseModel):
array: NDArray[Shape[shape_str], python_to_nptyping[dtype]]
return MyModel
return _model
@pytest.fixture(scope="session")
def model_rgb() -> Type[BaseModel]:
class RGB(BaseModel):
array: Optional[
Union[
NDArray[Shape["* x, * y"], Number],
NDArray[Shape["* x, * y, 3 r_g_b"], Number],
NDArray[Shape["* x, * y, 3 r_g_b, 4 r_g_b_a"], Number],
]
] = Field(None)
return RGB
@pytest.fixture(scope="function")
def hdf5_file(tmp_output_dir_func) -> h5py.File:
h5f_file = tmp_output_dir_func / "h5f.h5"
h5f = h5py.File(h5f_file, "w")
yield h5f
h5f.close()
@pytest.fixture(scope="function")
def hdf5_array(
hdf5_file, request
) -> Callable[[Tuple[int, ...], Union[np.dtype, type]], H5ArrayPath]:
def _hdf5_array(
shape: Tuple[int, ...] = (10, 10), dtype: Union[np.dtype, type] = float
) -> H5ArrayPath:
array_path = "/" + "_".join([str(s) for s in shape]) + "__" + dtype.__name__
data = np.random.random(shape).astype(dtype)
_ = hdf5_file.create_dataset(array_path, data=data)
return H5ArrayPath(Path(hdf5_file.filename), array_path)
return _hdf5_array


@@ -4,7 +4,7 @@ import numpy as np
import dask.array as da
from numpydantic import interface
from tests.conftest import h5_array, h5file
from tests.fixtures import hdf5_array
@pytest.fixture(
@@ -12,10 +12,10 @@ from tests.conftest import h5_array, h5file
params=[
([[1, 2], [3, 4]], interface.NumpyInterface),
(np.zeros((3, 4)), interface.NumpyInterface),
(h5_array, interface.H5Interface),
(hdf5_array, interface.H5Interface),
(da.random.random((10, 10)), interface.DaskInterface),
],
ids=["numpy_list", "numpy", "H5Array", "dask"],
ids=["numpy_list", "numpy", "H5ArrayPath", "dask"],
)
def interface_type(request):
return request.param


@@ -0,0 +1,27 @@
import json
from pydantic import BaseModel
def test_to_json(hdf5_array, array_model):
"""
Test serialization of HDF5 arrays to JSON
Args:
hdf5_array:
Returns:
"""
array = hdf5_array((10, 10), int)
model = array_model((10, 10), int)
instance = model(array=array) # type: BaseModel
json_str = instance.model_dump_json()
json_dict = json.loads(json_str)["array"]
assert json_dict["file"] == str(array.file)
assert json_dict["path"] == str(array.path)
assert json_dict["attrs"] == {}
assert json_dict["array"] == instance.array[:].tolist()