tests for paths

This commit is contained in:
sneakers-the-rat 2024-09-23 17:30:06 -07:00
parent 66fffc49f8
commit f9a992843e
Signed by untrusted user who does not match committer: jonny
GPG key ID: 6DCB96EF1E4D232D
6 changed files with 146 additions and 34 deletions

View file

@ -567,7 +567,9 @@ class Interface(ABC, Generic[T]):
if interface_module is None if interface_module is None
else version(interface_module.split(".")[0]) else version(interface_module.split(".")[0])
) )
except PackageNotFoundError: except (
PackageNotFoundError
): # pragma: no cover - no tests for missing interface deps
v = None v = None
return InterfaceMark( return InterfaceMark(

View file

@ -25,8 +25,6 @@ if TYPE_CHECKING: # pragma: no cover
from numpydantic import Shape from numpydantic import Shape
_UNSUPPORTED_TYPES = (complex,)
def _numeric_dtype( def _numeric_dtype(
dtype: DtypeType, _handler: "CallbackGetCoreSchemaHandler" dtype: DtypeType, _handler: "CallbackGetCoreSchemaHandler"
@ -41,10 +39,6 @@ def _numeric_dtype(
elif issubclass(dtype, np.integer): elif issubclass(dtype, np.integer):
info = np.iinfo(dtype) info = np.iinfo(dtype)
schema = core_schema.int_schema(le=int(info.max), ge=int(info.min)) schema = core_schema.int_schema(le=int(info.max), ge=int(info.min))
elif dtype is float:
schema = core_schema.float_schema()
elif dtype is int:
schema = core_schema.int_schema()
else: else:
schema = _handler.generate_schema(dtype) schema = _handler.generate_schema(dtype)
@ -89,10 +83,7 @@ def _lol_dtype(
# does this need a warning? # does this need a warning?
python_type = Any python_type = Any
if python_type in _UNSUPPORTED_TYPES: if python_type in (float, int):
array_type = core_schema.any_schema()
# TODO: warn and log here
elif python_type in (float, int):
array_type = _numeric_dtype(dtype, _handler) array_type = _numeric_dtype(dtype, _handler)
elif python_type is bool: elif python_type is bool:
array_type = core_schema.bool_schema() array_type = core_schema.bool_schema()

View file

@ -30,6 +30,9 @@ def jsonize_array(value: Any, info: SerializationInfo) -> Union[list, dict]:
else: else:
relative_to = info.context.get("relative_to", ".") relative_to = info.context.get("relative_to", ".")
array = _relativize_paths(array, relative_to) array = _relativize_paths(array, relative_to)
else:
# relativize paths by default
array = _relativize_paths(array, ".")
return array return array
@ -40,6 +43,7 @@ def _relativize_paths(value: dict, relative_to: str = ".") -> dict:
``relative_to`` directory, if provided in the context ``relative_to`` directory, if provided in the context
""" """
relative_to = Path(relative_to).resolve() relative_to = Path(relative_to).resolve()
# pdb.set_trace()
def _r_path(v: Any) -> Any: def _r_path(v: Any) -> Any:
try: try:
@ -85,25 +89,6 @@ def _walk_and_apply(value: T, f: Callable[[U], U]) -> T:
return value return value
# def relative_path(target: Path, origin: Path) -> Path:
# """
# return path of target relative to origin, even if they're
# not in the same subpath
#
# References:
# - https://stackoverflow.com/a/71874881
# """
# try:
# return Path(target).resolve().relative_to(Path(origin).resolve())
# except ValueError: # target does not start with origin
# # recursion with origin (eventually origin is root so try will succeed)
# try:
# return Path("..").joinpath(relative_path(target, Path(origin).parent))
# except ValueError:
# # break recursion in windows when
# pass
def relative_path(self: Path, other: Path, walk_up: bool = True) -> Path: def relative_path(self: Path, other: Path, walk_up: bool = True) -> Path:
""" """
"Backport" of :meth:`pathlib.Path.relative_to` with ``walk_up=True`` "Backport" of :meth:`pathlib.Path.relative_to` with ``walk_up=True``
@ -119,7 +104,8 @@ def relative_path(self: Path, other: Path, walk_up: bool = True) -> Path:
References: References:
https://github.com/python/cpython/blob/8a2baedc4bcb606da937e4e066b4b3a18961cace/Lib/pathlib/_abc.py#L244-L270 https://github.com/python/cpython/blob/8a2baedc4bcb606da937e4e066b4b3a18961cace/Lib/pathlib/_abc.py#L244-L270
""" """
if not isinstance(other, Path): # pdb.set_trace()
if not isinstance(other, Path): # pragma: no cover - ripped from cpython
other = Path(other) other = Path(other)
self_parts = self.parts self_parts = self.parts
other_parts = other.parts other_parts = other.parts
@ -130,7 +116,7 @@ def relative_path(self: Path, other: Path, walk_up: bool = True) -> Path:
while parts0 and parts1 and parts0[-1] == parts1[-1]: while parts0 and parts1 and parts0[-1] == parts1[-1]:
parts0.pop() parts0.pop()
parts1.pop() parts1.pop()
for part in parts1: for part in parts1: # pragma: no cover - not testing, ripped off from cpython
if not part or part == ".": if not part or part == ".":
pass pass
elif not walk_up: elif not walk_up:

View file

@ -122,7 +122,9 @@ def test_to_json(hdf5_array, array_model, round_trip):
instance = model(array=array) # type: BaseModel instance = model(array=array) # type: BaseModel
json_str = instance.model_dump_json(round_trip=round_trip) json_str = instance.model_dump_json(
round_trip=round_trip, context={"absolute_paths": True}
)
json_dumped = json.loads(json_str)["array"] json_dumped = json.loads(json_str)["array"]
if round_trip: if round_trip:
assert json_dumped["file"] == str(array.file) assert json_dumped["file"] == str(array.file)

View file

@ -9,7 +9,13 @@ from typing import Literal
import pytest import pytest
import numpy as np import numpy as np
from numpydantic.interface import Interface, JsonDict from numpydantic.interface import (
Interface,
JsonDict,
InterfaceMark,
NumpyInterface,
MarkedJson,
)
from pydantic import ValidationError from pydantic import ValidationError
from numpydantic.interface.interface import V from numpydantic.interface.interface import V
@ -210,3 +216,33 @@ def test_jsondict_handle_input():
for item in (valid, instantiated): for item in (valid, instantiated):
result = MyJsonDict.handle_input(item) result = MyJsonDict.handle_input(item)
assert result == expected assert result == expected
@pytest.mark.serialization
@pytest.mark.parametrize("interface", Interface.interfaces())
def test_interface_mark_match_by_name(interface):
"""
Interface mark should match an interface by its name
"""
# other parts don't matter
mark = InterfaceMark(module="fake", cls="fake", version="fake", name=interface.name)
fake_mark = InterfaceMark(
module="fake", cls="fake", version="fake", name="also_fake"
)
assert mark.match_by_name() is interface
assert fake_mark.match_by_name() is None
@pytest.mark.serialization
def test_marked_json_try_cast():
"""
MarkedJson.try_cast should try and cast to a markedjson!
returning the value unchanged if it's not a match
"""
valid = {"interface": NumpyInterface.mark_interface(), "value": [[1, 2], [3, 4]]}
invalid = [1, 2, 3, 4, 5]
mimic = {"interface": "not really", "value": "still not really"}
assert isinstance(MarkedJson.try_cast(valid), MarkedJson)
assert MarkedJson.try_cast(invalid) is invalid
assert MarkedJson.try_cast(mimic) is mimic

View file

@ -0,0 +1,95 @@
"""
Test serialization-specific functionality that doesn't need to be
applied across every interface (use test_interface/test_interfaces for that
"""
import pdb
import h5py
import pytest
from pathlib import Path
from typing import Callable
import numpy as np
import json
@pytest.fixture(scope="module")
def hdf5_at_path() -> Callable[[Path], None]:
_path = ""
def _hdf5_at_path(path: Path) -> None:
nonlocal _path
_path = path
h5f = h5py.File(path, "w")
_ = h5f.create_dataset("/data", data=np.array([[1, 2], [3, 4]]))
_ = h5f.create_dataset("subpath/to/dataset", data=np.array([[1, 2], [4, 5]]))
h5f.close()
yield _hdf5_at_path
Path(_path).unlink(missing_ok=True)
def test_relative_path(hdf5_at_path, tmp_output_dir, model_blank):
"""
By default, we should make all paths relative to the cwd
"""
out_path = tmp_output_dir / "relative.h5"
hdf5_at_path(out_path)
model = model_blank(array=(out_path, "/data"))
rt = model.model_dump_json(round_trip=True)
file = json.loads(rt)["array"]["file"]
# should not be absolute
assert not Path(file).is_absolute()
# should be relative to cwd
out_file = (Path.cwd() / file).resolve()
assert out_file == out_path.resolve()
def test_relative_to_path(hdf5_at_path, tmp_output_dir, model_blank):
"""
When explicitly passed a path to be ``relative_to`` ,
relative to that instead of cwd
"""
out_path = tmp_output_dir / "relative.h5"
relative_to_path = Path(__file__) / "fake_dir" / "sub_fake_dir"
expected_path = "../../../__tmp__/relative.h5"
hdf5_at_path(out_path)
model = model_blank(array=(out_path, "/data"))
rt = model.model_dump_json(
round_trip=True, context={"relative_to": str(relative_to_path)}
)
data = json.loads(rt)["array"]
file = data["file"]
# should not be absolute
assert not Path(file).is_absolute()
# should be expected path and reach the file
assert file == expected_path
assert (relative_to_path / file).resolve() == out_path.resolve()
# we shouldn't have touched `/data` even though it is pathlike
assert data["path"] == "/data"
def test_relative_to_path(hdf5_at_path, tmp_output_dir, model_blank):
"""
When told, we make paths absolute
"""
out_path = tmp_output_dir / "relative.h5"
expected_dataset = "subpath/to/dataset"
hdf5_at_path(out_path)
model = model_blank(array=(out_path, expected_dataset))
rt = model.model_dump_json(round_trip=True, context={"absolute_paths": True})
data = json.loads(rt)["array"]
file = data["file"]
# should be absolute and equal to out_path
assert Path(file).is_absolute()
assert Path(file) == out_path.resolve()
# shouldn't have absolutized subpath even if it's pathlike
assert data["path"] == expected_dataset