Source code for isimple.core.backend

import abc
import copy
import sys
import threading
import time
from contextlib import ExitStack, contextmanager
from typing import Any, Callable, List, Optional

import diskcache

from isimple.core.util import describe_function
from isimple.core.log import get_logger
from isimple.core.config import BackendInstanceConfig, CachingBackendInstanceConfig, BackendManagerConfig, Factory
from isimple.core.common import RootException, SetupError, Manager  # todo: RootException should probably be in a separate file


# Module-level logger, obtained from isimple.core.log under this module's name.
log = get_logger(__name__)


class BackendSetupError(SetupError):
    """Setup error specific to the backend; only overrides the message."""

    msg = 'Error while setting up backend'
class BackendError(RootException):
    """Generic backend error; only overrides the message."""

    msg = 'Error in backend'
class CacheAccessError(RootException):
    """Signals an attempt to use the cache outside of a caching context."""

    msg = 'Trying to access cache out of context'
class BackendInstance(object):
    """Base class for backend objects that carry their own configuration.

    Every instance holds a private deep copy of its configuration in
    ``_config``; when no configuration is supplied, the class-level
    ``_default`` is copied instead.
    """
    _config: BackendInstanceConfig
    _default = BackendInstanceConfig()

    __attributes__: List[str]  # todo: interface with isimple.core.meta
    # define legal values for strings in isimple.core.meta

    def __init__(self, config: Optional[BackendInstanceConfig] = None):
        """Configure the instance and log the resulting configuration.

        :param config: configuration to copy; ``None`` selects ``_default``
        """
        self._configure(config)
        super(BackendInstance, self).__init__()

        log.debug(f'Initialized {self.__class__.__qualname__} with {self._config}')

    def _configure(self, config: Optional[BackendInstanceConfig] = None):  # todo: adapt to dataclass implementation
        """Store a deep copy of the configuration in ``self._config``.

        Each instance should have a *copy* of the config, not references
        to the actual values, so neither the caller's object nor the shared
        class-level ``_default`` is mutated through ``self._config``.

        :param config: configuration to copy; ``None`` selects ``_default``
        :return: None
        """
        # todo: the earlier dict-based implementation merged the defaults of
        #       all base classes, passed values through EnforcedStr and
        #       resolved Factory values to their mapped class. None of that is
        #       done here; port it to the dataclass configs if still needed.
        source = self._default if config is None else config
        self._config = copy.deepcopy(source)
# Sentinel value stored under a cache key while its value is still being
# computed: written by CachingBackendInstance._block and checked by
# CachingBackendInstance._is_blocked / _cached_call.
__BLOCKED__ = 'BLOCKED'
class CachingBackendInstance(BackendInstance):  # todo: consider a waterfall cache: e.g. 2 GB in-memory, 4GB on-disk, finally the actual video
    """Interface to diskcache.Cache

    The cache is only open while the instance is used as a context manager
    (``with instance:`` or ``with instance.caching():``). Outside of that
    context ``_cached_call`` simply executes the wrapped method, and the
    low-level cache helpers fail their ``_cache is not None`` assertion.
    """
    _cache: Optional[diskcache.Cache]
    _background: Optional[threading.Thread]
    _background_task: Callable

    _config: CachingBackendInstanceConfig
    _default = CachingBackendInstanceConfig()

    def __init__(self, config: Optional[CachingBackendInstanceConfig] = None):
        """Configure the instance; the cache itself is opened in ``__enter__``.

        :param config: configuration to copy; ``None`` selects ``_default``
        """
        super(CachingBackendInstance, self).__init__(config)
        self._cache = None
        self._background = None

    def _get_key(self, method, *args) -> str:
        """Build the cache key for a call of ``method`` with ``args``.

        Keyword arguments are not part of the key.
        """
        # Key should be instance-independent to handle multithreading
        #  and caching between application runs.
        # Hashing the string is a significant performance hit.
        return f"{describe_function(method)}:{args}"

    def _to_cache(self, key: str, value: Any):
        """Write ``value`` to the open cache under ``key``."""
        assert self._cache is not None, CacheAccessError
        self._cache.set(key, value)

    def _from_cache(self, key: str) -> Optional[Any]:
        """Read ``key`` from the open cache; ``None`` if it is not present."""
        # todo: implement memory/disk cache waterfall, maybe?
        assert self._cache is not None, CacheAccessError
        return self._cache.get(key)

    def _block(self, key: str):
        """Mark ``key`` as 'being computed' by storing the sentinel."""
        assert self._cache is not None, CacheAccessError
        self._cache.set(key, __BLOCKED__)

    def _is_blocked(self, key: str) -> bool:
        """Return True if ``key`` currently holds the blocking sentinel."""
        assert self._cache is not None, CacheAccessError
        # Read the entry exactly once: separate `in` / `[]` / `get` lookups
        # can raise KeyError if the key is dropped in between them.
        value = self._cache.get(key)
        # Type check first: cached values may not compare cleanly to a str.
        return isinstance(value, str) and value == __BLOCKED__

    def _drop(self, key: str):
        """Remove ``key`` from the open cache; a missing key is ignored."""
        assert self._cache is not None, CacheAccessError
        try:
            del self._cache[key]
        except KeyError:
            # Already gone; the goal (key absent) is reached either way.
            pass

    def _cached_call(self, method, *args, **kwargs):
        """Wrapper for a method, handles caching 'at both ends'

        * No open cache: execute ``method`` and return its result.
        * Key present: wait (up to ``block_timeout``) while it is blocked,
          then return the cached value, or ``None`` on timeout.
        * Key absent and this instance is a ``cache_consumer``: return
          ``None`` without executing ``method``.
        * Key absent otherwise: block the key, execute ``method`` and cache
          the result. A ``None`` result is not cached; the key is released.

        :param method: callable to execute on a cache miss
        :param args: positional arguments for ``method`` (part of the key)
        :param kwargs: keyword arguments for ``method`` (not part of the key)
        :return: cached or freshly computed value, or ``None``
        :raises: whatever ``method`` raises; the key is released first
        """
        key = self._get_key(method, *args)

        if self._cache is None:
            log.vdebug(f"Execute {key}.")
            return method(*args, **kwargs)

        # Check if the file's already cached
        if key in self._cache:
            t0 = time.time()
            while self._is_blocked(key) and time.time() < t0 + self._config.block_timeout:
                # Some other thread is currently reading the same frame
                # Wait a bit and try to get from cache again
                log.vdebug(f'Cache: wait for {key} to be released...', 5)
                time.sleep(0.01)  # todo: DiskCache-level events?

            value = self._from_cache(key)
            if isinstance(value, str) and value == __BLOCKED__:
                log.warning(f'Timed out waiting for {key}.')
                return None
            else:
                log.vdebug(f"Cache: read {key}.")
                return value

        if self._config.cache_consumer:
            # Consumers only read from the cache, they never compute.
            return None

        # Cache a temporary string to 'block' the key
        log.vdebug(f"Cache: block {key}.")
        self._block(key)
        log.vdebug(f"Execute {key}.")
        try:
            value = method(*args, **kwargs)
        except BaseException:
            # Never leave the sentinel behind: the cache persists on disk,
            # so a stale block would make every later call for this key
            # wait for the timeout and return None.
            log.vdebug(f"Cache: drop {key}.")
            self._drop(key)
            raise

        if value is not None:
            log.vdebug(f"Cache: write {key}.")
            self._to_cache(key, value)
            return value
        else:
            log.vdebug(f"Cache: drop {key}.")
            self._drop(key)
            return None

    def __enter__(self):
        """Open the cache if ``do_cache`` is set; return ``self``."""
        if self._config.do_cache:
            log.debug(f'{self.__class__.__qualname__}: opening cache.')
            self._cache = diskcache.Cache(
                directory=self._config.cache_dir,
                size_limit=self._config.cache_size_limit,
            )
            if self._config.do_background:
                log.debug(f'{self.__class__.__qualname__}: starting background thread.')
                pass  # todo: can start caching frames in background thread here

        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Close the cache if it is open; never suppress an exception.

        :return: ``False`` if an exception is propagating, ``True`` otherwise
        """
        if self._config.do_cache:
            if self._cache is not None:
                log.debug(f'{self.__class__.__qualname__}: closing cache.')
                self._cache.close()
                self._cache = None

            if self._background is not None and self._background.is_alive():
                log.debug(f'{self.__class__.__qualname__}: stopping background thread.')
                pass  # todo: can stop background thread here (gracefully)
                #        ...also: self._background.is_alive() doesn't recognize self...

        return exc_type is None

    @contextmanager
    def caching(self):
        """Context manager equivalent of ``with self:``; yields ``self``.

        ``__exit__`` is called even when ``__enter__`` itself fails.
        """
        try:
            self.__enter__()
            yield self
        finally:
            self.__exit__(*sys.exc_info())
class Handler(object):  # todo: implementations of CachingBackendInstance in `_implementation` will not be found by `_gather_instances`
    """Holds a swappable implementation object that is selected by name.

    The name is resolved to a class through ``_implementation_factory``;
    the resolved class must be a subclass of ``_implementation_class``.
    """
    _implementation: object
    _implementation_factory = Factory
    _implementation_class = object  # actually, it's type, but that doesn't fly with MyPy for some reason

    def set_implementation(self, implementation: str) -> str:
        """Resolve ``implementation`` to a class and instantiate it.

        :param implementation: name understood by ``_implementation_factory``
        :return: the factory's string for the class that was instantiated
        """
        factory = self._implementation_factory
        resolved: type = factory(implementation).get()
        assert issubclass(resolved, self._implementation_class)

        self._implementation = resolved()
        instance_class = self._implementation.__class__
        return factory.get_str(instance_class)

    def get_implementation(self) -> str:
        """Return the qualified class name of the current implementation."""
        instance_class = self._implementation.__class__
        return instance_class.__qualname__
class BackendManager(BackendInstance, Manager):  # todo: naming :(
    """Backend instance that launches and manages other backend instances.

    Subclasses implement ``_can_launch`` and ``_launch``; ``launch`` calls
    them and then gathers the contained instances.
    """
    _instances: List[BackendInstance]
    _instance_class = BackendInstance

    def __init__(self, config: Optional[BackendManagerConfig] = None):
        """Configure the manager.

        :param config: configuration to copy; ``None`` selects ``_default``
        """
        super(BackendManager, self).__init__(config)

    @abc.abstractmethod
    def _can_launch(self):
        """Tell whether ``_launch`` may be called; subclasses implement this."""
        raise NotImplementedError

    @abc.abstractmethod
    def _launch(self):
        """Perform the actual launch; subclasses implement this."""
        raise NotImplementedError

    def launch(self):
        """Launch the backend and gather its instances.

        :raises BackendSetupError: if ``_can_launch`` returns a falsy value
        """
        if not self._can_launch():
            raise BackendSetupError(f"{self.__class__.__qualname__} can not be launched.")  # todo: try to be more verbose

        self._launch()
        self._gather_instances()

    @contextmanager
    def caching(self):
        """Caching context: propagate the context to every contained
            instance that is a CachingBackendInstance.

        Only instances that were actually entered are exited, and every one
        of them is exited (in reverse order) even if another one's
        ``__exit__`` raises.
        """
        caching_instances = [
            e for e in self._instances if isinstance(e, CachingBackendInstance)
        ]
        log.debug(f'{self.__class__.__qualname__}: propagate caching context '
                  f'to {[i.__class__.__qualname__ for i in caching_instances]}')

        with ExitStack() as stack:
            for element in caching_instances:
                stack.enter_context(element)
            yield self

    def save(self):
        """Not implemented here."""
        raise NotImplementedError