nwb-linkml/nwb_linkml/adapters/adapter.py

118 lines
4.1 KiB
Python

"""
Base class for adapters
"""
from abc import abstractmethod
import warnings
from dataclasses import dataclass, field
from typing import List, Dict, Type, Generator, Any, Tuple, Optional
from pydantic import BaseModel, Field, validator
from linkml_runtime.linkml_model import Element, SchemaDefinition, ClassDefinition, SlotDefinition, TypeDefinition
# SchemaDefClass = dataclass(SchemaDefinition).__pydantic_model__
@dataclass
class BuildResult:
# pass
schemas: List[SchemaDefinition] = field(default_factory=list)
classes: List[ClassDefinition] = field(default_factory=list)
slots: List[SlotDefinition] = field(default_factory=list)
types: List[TypeDefinition] = field(default_factory=list)
def __post_init__(self):
for field in ('schemas', 'classes', 'slots', 'types'):
attr = getattr(self, field)
if not isinstance(attr, list):
setattr(self, field, [attr])
def _dedupe(self, ours, others):
existing_names = [c.name for c in ours]
others_dedupe = [o for o in others if o.name not in existing_names]
return others_dedupe
def __add__(self, other:'BuildResult') -> 'BuildResult':
# if not isinstance(other, 'BuildResult'):
# raise TypeError('Can only add two build results together')
self.schemas.extend(self._dedupe(self.schemas, other.schemas))
self.classes.extend(self._dedupe(self.classes, other.classes))
# existing_names = [c.name for c in self.classes]
# for newc in other.classes:
# if newc.name in existing_names:
# warnings.warn(f'Not creating duplicate class for {newc.name}')
# continue
# self.classes.append(newc)
# self.classes.extend(other.classes)
self.slots.extend(other.slots)
self.types.extend(other.types)
return self
class Adapter(BaseModel):
@abstractmethod
def build(self) -> 'BuildResult':
"""
Generate the corresponding linkML element for this adapter
"""
def walk(self, input: BaseModel | list | dict):
yield input
if isinstance(input, BaseModel):
for key in input.__fields__.keys():
val = getattr(input, key)
yield (key, val)
if isinstance(val, (BaseModel, dict, list)):
yield from self.walk(val)
elif isinstance(input, dict):
for key, val in input.items():
yield (key, val)
if isinstance(val, (BaseModel, dict, list)):
yield from self.walk(val)
elif isinstance(input, (list, tuple)):
yield input
for val in input:
yield from self.walk(val)
else:
# do nothing, is a string or whatever
pass
def walk_fields(self, input: BaseModel | list | dict, field: str):
for item in self.walk(input):
if isinstance(item, tuple) and item[0] == field and item[1] is not None:
yield item[1]
def walk_types(self, input: BaseModel | list | dict, get_type: Type | List[Type] | Tuple[Type]):
if not isinstance(get_type, (list, tuple)):
get_type = [get_type]
for item in self.walk(input):
if any([type(item) == atype for atype in get_type]):
yield item
#
#
# if isinstance(input, BaseModel):
# for key in input.__fields__.keys():
# val = getattr(input, key)
# if key == field:
# yield val
# if isinstance(val, (BaseModel, dict, list)):
# yield from self.walk(val, field)
#
# elif isinstance(input, dict):
# for key, val in input.items():
# if key == field:
# yield val
# if isinstance(val, (BaseModel, dict, list)):
# yield from self.walk(val, field)
#
# elif isinstance(input, (list, tuple)):
# for val in input:
# yield from self.walk(val, field)
#
# else:
# # do nothing, is a string or whatever
# pass