import inspect
import json
from inspect import _empty # type: ignore
from typing import Union, Collection, Type, Callable, _GenericAlias # type: ignore
import numpy as np
from schema import Optional, Schema
from isimple.core.config import Config, EnforcedStr, HsvColor
from isimple.core.util import nbases, log, all_annotations, all_attributes
def resolve_type_to_most_specific(t: _GenericAlias) -> _GenericAlias:
    """Resolve Union in a type annotation to its most specific element

    "Most specific" means the ``Union`` argument for which ``nbases``
    returns the highest value.

    * If exactly one argument reaches that maximum, it is returned.
    * If several arguments tie for the maximum, the *first argument of
      the Union* is returned (which is not necessarily one of the tied
      arguments).
    * Nested Unions are not handled explicitly; they seem to be resolved
      to a flat Union at runtime anyway.
    * For parametrized collection types the arguments are resolved
      recursively. The annotation that was passed in is never modified;
      a new alias is built when (and only when) an argument changes.
    * Anything else is returned unchanged.

    todo: extend to Optional

    :param t: a type or a ``typing`` annotation
    :return: the resolved type or annotation
    """
    origin = getattr(t, '__origin__', None)
    args = getattr(t, '__args__', None)
    if origin is None or not args:
        # Plain classes and unparametrized aliases: nothing to resolve
        return t

    if origin == Union:
        # Compute the maximum once instead of once per argument
        most_bases = max(nbases(a) for a in args)
        candidates = [a for a in args if nbases(a) == most_bases]
        if len(candidates) == 1:
            return candidates[0]
        return args[0]

    # Some origins are not classes (e.g. ``ClassVar``); ``issubclass``
    # would raise TypeError for those, so check for a class first.
    if isinstance(origin, type) and issubclass(origin, Collection):
        resolved = tuple(resolve_type_to_most_specific(a) for a in args)
        if resolved != args:
            # Annotation objects are shared: ``typing`` caches subscripted
            # aliases and a class' ``__annotations__`` holds the very same
            # object. Assigning to ``t.__args__`` would therefore change
            # the annotation for everyone, so build a new alias instead.
            copy_with = getattr(t, 'copy_with', None)
            if copy_with is not None:
                return copy_with(resolved)
            return origin[resolved]

    return t
def _type_to_schema(t, container=None, k=None) -> dict:  # todo: how to type t here?
    """Build a single-entry schema fragment ``{Optional(k): <schema>}`` for ``t``.

    Supported inputs, checked in this order:

    * ``typing`` generic aliases: ``Tuple[X, ...]`` becomes a list of
      referenced sub-schemas of ``X``; a ``Union`` is replaced by the result
      of ``resolve_type_to_most_specific``; every other alias raises
      ``NotImplementedError``.
    * ``Config`` subclasses become a referenced sub-schema.
    * ``HsvColor`` becomes a schema with float ``h``, ``s`` and ``v``.
    * ``str``, ``int``, ``float`` and ``bool`` map to themselves.
    * ``np.ndarray`` is described as a base64-encoded string.
    * ``EnforcedStr`` subclasses map to ``str`` (a warning is logged because
      the available options are not part of the schema).
    * bare ``dict``, ``list`` and ``tuple`` map to themselves (a warning is
      logged).

    :param t: the type or annotation to convert
    :param container: the object owning the property; only its
        ``__qualname__`` is used, in log messages. Defaults to ``t``.
    :param k: the key of the resulting entry. Defaults to ``t.__qualname__``,
        or ``'none'`` if ``t`` has no such attribute.
    :return: a dict with a single ``Optional(k)`` key
    :raises NotImplementedError: if ``t`` is not one of the supported types
    """
    # todo: We're assuming everything is Optional here! This is NOT always the case for return annotations!
    if container is None:
        container = t
    if k is None:
        k = getattr(t, '__qualname__', 'none')

    sk = Optional(k)

    if isinstance(t, _GenericAlias):  # todo: is there a way to check this without _GenericAlias?
        # Extract typing info
        if not hasattr(t, '__origin__'):
            raise NotImplementedError(t)

        if t.__origin__ == tuple:
            args = t.__args__
            # Only the homogeneous, variable-length form ``Tuple[X, ...]`` is
            # supported. Check the arity first so that shorter tuples end up
            # in the NotImplementedError below instead of an IndexError.
            if len(args) == 2 and args[1] is Ellipsis:
                return {
                    sk: [Schema(_schemify(args[0]), name=k, as_reference=True)]
                }
            raise NotImplementedError(f"Tuple ~ {t.__args__}")
        if t.__origin__ == list:
            raise NotImplementedError(f"List ~ {t.__args__}")
        if t.__origin__ == dict:
            raise NotImplementedError(f"Dict ~ {t.__args__}")
        if t.__origin__ == Union:
            return {
                sk: resolve_type_to_most_specific(t),  # todo: doesn't handle unresolvable Unions
            }
        raise NotImplementedError(f"Can't handle {t}")

    if issubclass(t, Config):
        return {
            sk: Schema(_schemify(t), name=k, as_reference=True),
        }
    if t == HsvColor:
        return {
            sk: Schema({'h': float, 's': float, 'v': float})
        }
    if t == str or t == int or t == float or t == bool:
        return {
            sk: t,
        }
    if t == np.ndarray:  # todo: consider using BSON instead!
        return {
            sk: {
                'type': 'string',
                'contentEncoding': 'base64',
                'contentMediaType': 'application/x-numpy',
            },
        }
    if issubclass(t, EnforcedStr):
        log.warning(f"{container.__qualname__} property '{k}' is an EnforcedStr, "
                    f"but its available options are not included in the schema.")
        return {
            sk: str,  # todo: integrate options into schema
        }
    if t == dict:
        log.warning(
            f"{container.__qualname__} property '{k}' is a {t.__qualname__}, "
            f"consider making it a subclass of isimple.core.config.config instead.")
        return {
            sk: t
        }
    if t == list or t == tuple:
        # Bare sequences carry no element type information
        log.warning(
            f"{container.__qualname__} property '{k}' is a {t.__qualname__}, "
            f"consider making it more specific.")
        return {
            sk: t,
        }
    raise NotImplementedError(t)
def _schemify(t: type) -> dict:
    """Generate a schema dict for ``t``.

    For a ``Config`` subclass, every attribute reported by
    ``all_attributes`` (underscored names and methods excluded) is converted
    with ``_type_to_schema`` and merged into a single dict. The attribute's
    type is looked up in ``all_annotations(t)`` if the class has
    ``__annotations__``; otherwise the type of the attribute's current value
    is used.

    Anything that is not a ``Config`` subclass (including objects that are
    not classes at all) is passed straight to ``_type_to_schema``.

    :param t: the class or annotation to generate a schema for
    :return: a schema dict
    """
    # Only the ``issubclass`` probe is guarded. Guarding the whole body would
    # catch a TypeError raised while processing an attribute and retry via
    # ``_type_to_schema(t, t, t)``, which calls ``_schemify(t)`` again for a
    # Config subclass and therefore recurses without end.
    try:
        is_config = issubclass(t, Config)
    except TypeError:
        # ``t`` is not a class
        is_config = False

    if not is_config:
        return _type_to_schema(t, t, t)

    properties: dict = {}
    annotations = all_annotations(t)
    annotated = hasattr(t, '__annotations__')
    for a in all_attributes(t, include_under=False, include_methods=False):
        if annotated:
            at = annotations[a]
        else:
            at = type(getattr(t, a))
        properties.update(
            _type_to_schema(
                resolve_type_to_most_specific(at),
                t, a
            )
        )
    return properties
def get_config_schema(config: Type[Config]) -> Schema:
    """Build a ``Schema`` for a ``Config`` class.

    :param config: the class to generate a schema for
    :return: a ``Schema`` wrapping the dict produced by ``_schemify``
    """
    properties = _schemify(config)
    return Schema(properties)
def get_method_schema(method: Callable) -> Schema:
    """Build a ``Schema`` describing the arguments of ``method``.

    Each parameter in the signature of ``method`` is converted with
    ``_type_to_schema``, using the parameter name as the key.

    :param method: the callable to inspect
    :return: a ``Schema`` with one entry per parameter
    :raises TypeError: if any parameter has no type annotation
    """
    arguments: dict = {}
    parameters = inspect.signature(method).parameters
    for a, parameter in parameters.items():
        annotation = parameter.annotation
        if annotation is _empty:
            # Without an annotation there is nothing to derive a schema from
            raise TypeError(f"Can not generate schema for method {method.__qualname__}; "
                            f"the type of argument '{a}' is not annotated.")
        entry = _type_to_schema(annotation, method, a)
        arguments.update(entry)
    return Schema(arguments)
def get_return_schema(method: Callable) -> Schema:
    """Build a ``Schema`` describing the return value of ``method``.

    The return annotation is converted with ``_type_to_schema``; the values
    of the resulting fragment are collected in a list under the ``'return'``
    key.

    :param method: the callable to inspect
    :return: a ``Schema`` with a ``'return'`` entry, or an empty ``Schema``
        if ``method`` is annotated to return ``None`` or has no return
        annotation at all
    """
    annotation = inspect.signature(method).return_annotation

    # A missing return annotation is reported as ``Signature.empty``, not as
    # ``None``; neither describes a value a schema can be generated for.
    if annotation is None or annotation is inspect.Signature.empty:
        return Schema({})

    fragment = _type_to_schema(annotation, method, 'return')
    return Schema({'return': list(fragment.values())})
def schema(obj) -> dict:
    """Generate the JSON schema of a ``Config`` class or of a callable.

    The schema id is ``obj.__qualname__ + '.json'``.

    :param obj: a ``Config`` subclass, or a callable with annotated arguments
    :return: for a ``Config`` subclass, its JSON schema; for anything else,
        a dict holding the JSON schema of the arguments under ``'call'`` and
        that of the return value under ``'return'``
    """
    schema_id = obj.__qualname__ + '.json'

    # Only the ``issubclass`` probe is guarded: a TypeError raised while the
    # config schema is being generated is a genuine error and must not be
    # masked by silently retrying with ``obj`` treated as a callable.
    try:
        is_config = issubclass(obj, Config)
    except TypeError:
        # ``obj`` is not a class
        is_config = False

    if is_config:
        return get_config_schema(obj).json_schema(schema_id)

    return {
        'call': get_method_schema(obj).json_schema(schema_id),
        'return': get_return_schema(obj).json_schema(schema_id)
    }