we don't need blosc tho

This commit is contained in:
sneakers-the-rat 2024-07-10 00:09:17 -07:00
parent 878c51e069
commit d6750f8df1
Signed by untrusted user who does not match committer: jonny
GPG key ID: 6DCB96EF1E4D232D
4 changed files with 4 additions and 204 deletions


@@ -12,6 +12,9 @@ Cleanup
- [ ] Update pydantic generator
- [ ] Make a minimal pydanticgen-only package to slim linkml deps?
- [ ] Disambiguate "maps" terminology - split out simple maps from e.g. the dataset mapping classes
- [ ] Remove unnecessary imports
  - dask
  - nptyping
Important things that are not implemented yet!
@@ -25,6 +28,7 @@ Important things that are not implemented yet!
- Or do we want to just say "no dynamictables, just subclass and add more slots since it's super easy to do that."
- method to return a dataframe (see the sketch after this list)
- append rows; this should basically just be a df.
- existing handler is broken, for example in `maps/hdmf`
- [ ] Handle indirect indexing eg. https://pynwb.readthedocs.io/en/stable/tutorials/general/plot_timeintervals.html#accessing-referenced-timeseries
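A rough illustration of the "just subclass and add more slots" idea above: a minimal sketch of a dataframe-returning table model. `DynamicTableMixin`, `to_df`, and the `Units` fields are hypothetical names for illustration, not existing nwb_linkml API.

```python
# Hypothetical sketch: list-typed fields become columns of a dataframe.
from typing import List

import pandas as pd
from pydantic import BaseModel


class DynamicTableMixin(BaseModel):
    """Hypothetical mixin: collect list-valued fields into a DataFrame."""

    def to_df(self) -> pd.DataFrame:
        # pydantic models iterate as (field_name, value) pairs
        columns = {name: value for name, value in self if isinstance(value, list)}
        return pd.DataFrame(columns)


class Units(DynamicTableMixin):
    unit_id: List[int] = []
    spike_times: List[float] = []


units = Units(unit_id=[0, 1], spike_times=[0.1, 0.2])
df = units.to_df()  # two rows, columns "unit_id" and "spike_times"
```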
## Docs TODOs


@@ -20,7 +20,6 @@ dependencies = [
"h5py>=3.9.0",
"pydantic-settings>=2.0.3",
"dask>=2023.9.2",
"blosc2>=2.2.7",
"tqdm>=4.66.1",
'typing-extensions>=4.12.2;python_version<"3.11"',
"numpydantic>=1.2.1",


@@ -1,199 +0,0 @@
"""
Extension of nptyping NDArray for pydantic that allows for JSON-Schema serialization
* Order to store data in (row first)
"""
# ruff: noqa: ANN001
# ruff: noqa: ANN202
# FIXME: this has been moved to numpydantic, remove.
import base64
import sys
from copy import copy
from pathlib import Path
from typing import Any, Callable
import blosc2
import h5py
import nptyping.structure
import numpy as np
from dask.array.core import Array as DaskArray
from nptyping import NDArray as _NDArray
from nptyping.ndarray import NDArrayMeta as _NDArrayMeta
from nptyping.nptyping_type import NPTypingType
from nptyping.shape_expression import check_shape
from pydantic_core import core_schema
from nwb_linkml.maps.dtype import allowed_precisions, np_to_python
def _list_of_lists_schema(shape, array_type_handler):
    """
    Make a pydantic JSON schema for an array as a list of lists
    """
    shape_parts = shape.__args__[0].split(",")
    split_parts = [p.split(" ")[1] if len(p.split(" ")) == 2 else None for p in shape_parts]

    # Construct a list of list schema
    # go in reverse order - construct list schemas such that
    # the final schema is the one that checks the first dimension
    shape_labels = reversed(split_parts)
    shape_args = reversed(shape.prepared_args)
    list_schema = None
    for arg, label in zip(shape_args, shape_labels):
        # which handler to use? for the first we use the actual type
        # handler, everywhere else we use the prior list handler
        inner_schema = array_type_handler if list_schema is None else list_schema

        # make a label annotation, if we have one
        metadata = {"name": label} if label is not None else None

        # make the current level list schema, accounting for shape
        if arg == "*":
            list_schema = core_schema.list_schema(inner_schema, metadata=metadata)
        else:
            arg = int(arg)
            list_schema = core_schema.list_schema(
                inner_schema, min_length=arg, max_length=arg, metadata=metadata
            )
    return list_schema
class NDArrayMeta(_NDArrayMeta, implementation="NDArray"):
    """
    Kept here to allow for hooking into metaclass, which has
    been necessary on and off as we work this class into a stable
    state
    """


class NDArray(NPTypingType, metaclass=NDArrayMeta):
    """
    Following the example here: https://docs.pydantic.dev/latest/usage/types/custom/#handling-third-party-types
    """

    __args__ = (Any, Any)

    @classmethod
    def __get_pydantic_core_schema__(
        cls,
        _source_type: "NDArray",
        _handler: Callable[[Any], core_schema.CoreSchema],
    ) -> core_schema.CoreSchema:
        shape, dtype = _source_type.__args__

        # get pydantic core schema for the given specified type
        if isinstance(dtype, nptyping.structure.StructureMeta):
            raise NotImplementedError("Jonny finish this")
            # functools.reduce(operator.or_, [int, float, str])
        else:
            array_type_handler = _handler.generate_schema(np_to_python[dtype])

        def validate_dtype(value: np.ndarray) -> np.ndarray:
            if dtype is Any:
                return value
            assert (
                value.dtype == dtype or value.dtype.name in allowed_precisions[dtype.__name__]
            ), f"Invalid dtype! expected {dtype}, got {value.dtype}"
            return value

        def validate_shape(value: Any) -> np.ndarray:
            assert shape is Any or check_shape(
                value.shape, shape
            ), f"Invalid shape! expected shape {shape.prepared_args}, got shape {value.shape}"
            return value

        def coerce_list(value: Any) -> np.ndarray:
            if isinstance(value, list):
                value = np.array(value)
            return value

        # get the names of the shape constraints, if any
        if shape is Any:
            list_schema = core_schema.list_schema(core_schema.any_schema())
        else:
            list_schema = _list_of_lists_schema(shape, array_type_handler)

        def array_to_list(instance: np.ndarray | DaskArray) -> list | dict:
            if isinstance(instance, DaskArray):
                arr = instance.__array__()
            elif isinstance(instance, NDArrayProxy):
                arr = instance[:]
            else:
                arr = instance

            # If we're larger than 16kB then compress array!
            if sys.getsizeof(arr) > 16 * 1024:
                packed = blosc2.pack_array2(arr)
                packed = base64.b64encode(packed)
                ret = {
                    "array": packed,
                    "shape": copy(arr.shape),
                    "dtype": copy(arr.dtype.name),
                    "unpack_fns": ["base64.b64decode", "blosc2.unpack_array2"],
                }
                return ret
            else:
                return arr.tolist()

        return core_schema.json_or_python_schema(
            json_schema=list_schema,
            python_schema=core_schema.chain_schema(
                [
                    core_schema.no_info_plain_validator_function(coerce_list),
                    core_schema.union_schema(
                        [
                            core_schema.is_instance_schema(cls=np.ndarray),
                            core_schema.is_instance_schema(cls=DaskArray),
                            core_schema.is_instance_schema(cls=NDArrayProxy),
                        ]
                    ),
                    core_schema.no_info_plain_validator_function(validate_dtype),
                    core_schema.no_info_plain_validator_function(validate_shape),
                ]
            ),
            serialization=core_schema.plain_serializer_function_ser_schema(
                array_to_list, when_used="json"
            ),
        )
class NDArrayProxy:
    """
    Thin proxy to numpy arrays stored within hdf5 files,
    only read into memory when accessed, but otherwise
    passthrough all attempts to access attributes.
    """

    def __init__(self, h5f_file: Path | str, path: str):
        """
        Args:
            h5f_file (:class:`pathlib.Path`): Path to source HDF5 file
            path (str): Location within HDF5 file where this array is located
        """
        self.h5f_file = Path(h5f_file)
        self.path = path

    def __getattr__(self, item):
        with h5py.File(self.h5f_file, "r") as h5f:
            obj = h5f.get(self.path)
            return getattr(obj, item)

    def __getitem__(self, slice) -> np.ndarray:
        with h5py.File(self.h5f_file, "r") as h5f:
            obj = h5f.get(self.path)
            return obj[slice]

    def __setitem__(self, slice, value):
        raise NotImplementedError("Can't write into an arrayproxy yet!")

    @classmethod
    def __get_pydantic_core_schema__(
        cls,
        _source_type: _NDArray,
        _handler: Callable[[Any], core_schema.CoreSchema],
    ) -> core_schema.CoreSchema:
        return NDArray.__get_pydantic_core_schema__(cls, _source_type, _handler)
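For reference, since this serializer is what the blosc dependency was for: a hedged sketch of the JSON round-trip that `array_to_list` above produced for arrays larger than 16kB. The `unpack_array` helper and the example `field` dict are illustrative only; the pack/unpack calls are the same ones listed in `unpack_fns`.

```python
# Sketch: reverse the packing done in array_to_list by applying the listed
# "unpack_fns" in order: base64-decode, then blosc2-unpack.
import base64

import blosc2
import numpy as np


def unpack_array(field: dict) -> np.ndarray:
    packed = base64.b64decode(field["array"])
    arr = blosc2.unpack_array2(packed)
    assert arr.shape == tuple(field["shape"])
    assert arr.dtype.name == field["dtype"]
    return arr


# round-trip demonstration using the same calls as the removed serializer
original = np.arange(100_000, dtype=np.int32).reshape(100, 1000)  # well over 16kB
field = {
    "array": base64.b64encode(blosc2.pack_array2(original)),
    "shape": original.shape,
    "dtype": original.dtype.name,
    "unpack_fns": ["base64.b64decode", "blosc2.unpack_array2"],
}
assert (unpack_array(field) == original).all()
```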


@@ -1,4 +0,0 @@
import numpy as np

NDArray = np.ndarray
NDArrayProxy = np.ndarray