import abc
import threading
from typing import Callable, Dict, List, Tuple, Type, _GenericAlias # type: ignore
import collections
from contextlib import contextmanager
import uuid
from isimple import get_logger
from isimple.util.meta import all_attributes, get_overridden_methods
log = get_logger(__name__)
[docs]class RootException(Exception):
msg = ''
def __init__(self, *args):
#https://stackoverflow.com/questions/49224770/
# if no arguments are passed set the first positional argument
# to be the default message. To do that, we have to replace the
# 'args' tuple with another one, that will only contain the message.
# (we cannot do an assignment since tuples are immutable)
if not (args):
args = (self.msg,)
super(Exception, self).__init__(*args)
[docs]class SetupError(RootException):
pass
[docs]class EnforcedStr(str): # todo: should derive from enum.StrEnum instead
_options: List[str] = ['']
_descriptions: Dict[str, str] = {}
_str: str
def __init__(self, string: str = None):
super().__init__()
if string is not None:
if string not in self.options:
if string:
log.warning(f"Illegal {self.__class__.__name__} '{string}', "
f"should be one of {self.options}. "
f"Defaulting to '{self.default}'.")
self._str = str(self.default)
else:
self._str = str(string)
else:
self._str = str(self.default)
def __repr__(self):
return f"<{self.__class__.__name__} '{self._str}'>"
def __str__(self):
return str(self._str) # Make SURE it's a string :(
def __eq__(self, other):
if hasattr(other, '_str'):
return self._str == other._str
elif isinstance(other, str):
return self._str == other
else:
return False
@property
def options(self):
return self._options
@property
def descriptions(self):
return self._descriptions
@property
def describe(self):
return self.descriptions[self._str]
@property
def default(self):
return self._options[0]
def __hash__(self): # todo: why?
return hash(str(self))
class _Streaming(EnforcedStr):
_options = ['off', 'image', 'json']
stream_off = _Streaming('off')
stream_image = _Streaming('image')
stream_json = _Streaming('json')
[docs]class Endpoint(object):
_name: str
_registered: bool
_signature: Type[Callable]
_streaming: _Streaming
def __init__(self, signature: _GenericAlias, streaming: _Streaming = stream_off): # todo: type Callable[] correctly
assert signature.__origin__ == collections.abc.Callable
self._registered = False
if not hasattr(signature, '__args__'):
raise SetupError('Cannot define an Endpoint without a signature!')
self._signature = signature
self._streaming = streaming
[docs] def compatible(self, method: Callable) -> bool:
if hasattr(method, '__annotations__'):
args: List = []
for arg in self.signature:
if arg == type(None):
arg = None
args.append(arg)
# Don't be too pedantic unannotated None-type return
return tuple(method.__annotations__.values()) == tuple(args)
else:
return False
@property
def signature(self):
return self._signature.__args__
@property
def streaming(self):
return self._streaming
@property
def registered(self):
return self._registered
@property
def name(self):
return self._name
[docs] def add(self, method):
if not self.compatible(method):
log.warning(f"Method '{method.__qualname__}' "
f"is incompatible with endpoint '{self.name}'. \n"
f"{method.__annotations__} vs. {self.signature}")
[docs] def register(self, name: str):
self._registered = True
self._name = name
[docs]class EndpointRegistry(object):
_entries: List
def __init__(self):
if not hasattr(self, '_entries'):
_entries = []
for attr, val in self.__class__.__dict__.items():
if isinstance(val, Endpoint):
val.register(attr)
_entries.append(val)
self._entries = _entries
def _add_entry(self, entry: Endpoint):
self._entries.append(entry)
[docs]class InstanceRegistry(EndpointRegistry):
"""This one is global, collects callables that expose endpoints
"""
_entries: List[Endpoint]
_callable_mapping: Dict[Endpoint, Callable]
def __init__(self):
super(InstanceRegistry, self).__init__()
self._callable_mapping = {}
[docs] def expose(self, endpoint: Endpoint):
def wrapper(method):
if endpoint in self._callable_mapping:
log.debug( # todo: add traceback
f"Exposing '{method.__qualname__}' at endpoint '{endpoint.name}' will override "
f"previously exposed method '{self._callable_mapping[endpoint].__qualname__}'."
) # todo: keep in mind we're also marking the methods themselves
try:
self._entries.append(endpoint)
endpoint.add(method)
try:
method._endpoint = endpoint
except AttributeError:
method.__func__._endpoint = endpoint
self._callable_mapping.update({endpoint: method})
except TypeError:
raise TypeError(
f"Cannot expose '{method.__qualname__}' at endpoint '{endpoint.name}'."
f"incompatible signature: {method.__annotations__} vs. {endpoint.signature}"
)
return method
return wrapper
[docs] def exposes(self, endpoint: Endpoint):
return endpoint in self._callable_mapping
@property
def endpoints(self) -> List[Endpoint]:
return list(self._callable_mapping.keys())
[docs]class ImmutableRegistry(EndpointRegistry):
_entries: Tuple[Endpoint, ...] #type: ignore
_endpoints: InstanceRegistry
def __init__(self, endpoints: InstanceRegistry = None):
_entries = []
for attr, val in self.__class__.__dict__.items():
if isinstance(val, Endpoint):
val.register(attr)
_entries.append(val)
if endpoints is not None:
self._endpoints = endpoints
else:
self._endpoints = InstanceRegistry()
self._entries = tuple(_entries)
super(ImmutableRegistry, self).__init__()
def _add_entry(self, entry: Endpoint):
raise NotImplementedError
[docs] def expose(self, endpoint: Endpoint):
return self._endpoints.expose(endpoint)
[docs] def exposes(self, endpoint: Endpoint):
return self._endpoints.exposes(endpoint)
@property
def endpoints(self) -> List[Endpoint]:
return self._endpoints.endpoints
[docs]class Described(object):
@classmethod
def _description(cls):
if cls.__doc__ is not None:
return cls.__doc__.split('\n')[0]
else:
return cls.__name__
[docs]class Lockable(abc.ABC):
_lock: threading.Lock
_cancel: threading.Event
_error: threading.Event
def __init__(self):
self._lock = threading.Lock()
self._cancel = threading.Event()
self._error = threading.Event()
[docs] @contextmanager
def lock(self):
log.vdebug(f"Acquiring lock {self}...")
lock = self._lock.acquire()
log.vdebug(f"Acquired lock {self}")
try:
log.vdebug(f"Locking {self}")
yield lock
finally:
log.vdebug(f"Unlocking {self}")
self._lock.release()
[docs] def cancel(self):
self._cancel.set()
[docs] def error(self):
self._error.set()
@property
def canceled(self) -> bool:
return self._cancel.is_set()
@property
def errored(self) -> bool:
return self._error.is_set()
[docs] def clear_cancel(self):
self._cancel.clear()
[docs] def clear_error(self):
self._error.clear()
[docs]class RootInstance(Lockable):
_id: str
_endpoints: ImmutableRegistry
_instances: List
_instance_class: type
_instance_mapping: Dict[Endpoint, List[Callable]]
def __init__(self):
super().__init__()
self.get_id()
[docs] def get_id(self):
self._id = str(uuid.uuid1())
def _set_id(self, id: str):
self._id = id
@property
def id(self):
return self._id
@property
def instance_mapping(self):
return self._instance_mapping
def _gather_instances(self): # todo: needs major clean-up
log.debug(f'{self.__class__.__name__}: gather nested instances')
self._instance_mapping = {}
instances = []
attributes = [attr for attr in self.__dir__()] # todo: all_attributes fails here because that's ~ class!
for attr in sorted(attributes):
value = getattr(self, attr)
if isinstance(value, self._instance_class) and not isinstance(value, list):
instances.append(value)
elif isinstance(value, list) and all(isinstance(v, self._instance_class) for v in value):
instances += list(value)
elif isinstance(value, dict) and all(isinstance(v, self._instance_class) for v in value.values()):
instances += [v for v in value.values()]
for instance in [self] + instances:
self._add_instance(instance)
self._instances = instances
def _add_instance(self, instance: object):
if isinstance(instance, self._instance_class):
for attr in [attr for attr in all_attributes(instance)]:
value = getattr(instance, attr) # bound method
if hasattr(value, '__func__') and not isinstance(getattr(instance.__class__, attr), property):
endpoint = None
implementations = get_overridden_methods(instance.__class__, getattr(instance.__class__, attr))
# Returns an empty list for wrapped methods -> workaround
if len(implementations) == 0:
endpoint = value._endpoint
# todo: there will probably be some bugs with inheritance & wrapping
for implementation in implementations: # unbound methods
try:
endpoint = implementation._endpoint # todo: won't catch endpoints defined at multiple places in the methods inheritance tree
except AttributeError:
pass
if endpoint is not None:
if endpoint not in self._instance_mapping:
self._instance_mapping[endpoint] = [value]
else:
if value not in self._instance_mapping[endpoint]:
self._instance_mapping[endpoint].append(value)
else:
pass
[docs] def get(self, endpoint: Endpoint, index: int = None) -> Callable:
if endpoint not in self._endpoints._entries:
raise SetupError(f"'{endpoint}' is not defined in '{self._endpoints}'.")
elif endpoint not in self._instance_mapping:
raise SetupError(f"'{self.__class__.__name__}' does not map "
f"'{endpoint.name}' to a bound method.")
else:
log.vdebug(f"{self.__class__.__name__}: get callback for "
f"endpoint '{endpoint.name}' with index {index}")
methods = self._instance_mapping[endpoint]
if index is None:
index = 0
if index+1 < len(methods):
log.vdebug(f"No index specified for endpoint '{endpoint.name}' "
f"-- defaulting to entry 0 ({len(methods)} in total)") # todo: traceback
elif len(methods) == 1:
index = 0 # Ignore the index if only one method is mapped
return self._instance_mapping[endpoint][index]