Source code for isimple.core.backend

from enum import IntEnum, Enum

import diskcache
import sys
import abc
import time
import threading
from contextlib import contextmanager
from typing import Any, Callable, List, Optional, Union, Tuple, Dict, Type

import numpy as np
import pandas as pd

from pydantic import Field, FilePath, DirectoryPath

from isimple import settings, get_logger, get_cache
from isimple.endpoints import BackendRegistry

from isimple.core import RootException, SetupError, RootInstance, Described
from isimple.maths.colors import Color, HsvColor, convert, as_hsv
from isimple.util.meta import describe_function
from isimple.util import Timer, Timing, hash_file, timed
from isimple.core.db import BaseAnalysisModel
from isimple.core.config import Factory, untag, BaseConfig, Instance, Configurable
from isimple.core.streaming import stream, streams, EventStreamer

from isimple.core.interface import InterfaceFactory


log = get_logger(__name__)
backend = BackendRegistry()


[docs]class BackendSetupError(SetupError): msg = 'Error while setting up backend'
[docs]class BackendError(RootException): msg = 'Error in backend'
[docs]class CacheAccessError(RootException): msg = 'Trying to access cache out of context'
_BLOCKED = 'BLOCKED'
[docs]class CachingInstance(Instance): # todo: consider a waterfall cache: e.g. 2 GB in-memory, 4GB on-disk, finally the actual video """Interface to diskcache.Cache """ _cache: Optional[diskcache.Cache] _is_caching: bool _background: threading.Thread _cancel_caching: threading.Event def __init__(self, config: BaseConfig = None): super(CachingInstance, self).__init__(config) self._cache = None self._cancel_caching = threading.Event()
[docs] @backend.expose(backend.is_caching) def is_caching(self) -> bool: return self._is_caching
[docs] @backend.expose(backend.cancel_caching) def cancel_caching(self) -> None: if self._cancel_caching is not None: self._cancel_caching.set()
def _get_key(self, method, *args) -> str: # Key should be instance-independent to handle multithreading # and caching between application runs. # Hashing the string is a significant performance hit. return f"{describe_function(method)}{args}" def _to_cache(self, key: str, value: Any): assert self._cache is not None, CacheAccessError self._cache.set(key, value) def _from_cache(self, key: str) -> Optional[Any]: assert self._cache is not None, CacheAccessError return self._cache.get(key) def _block(self, key: str): assert self._cache is not None, CacheAccessError self._cache.set(key, _BLOCKED) def _is_blocked(self, key: str) -> bool: assert self._cache is not None, CacheAccessError return key in self._cache \ and isinstance(self._cache[key], str) \ and self._from_cache(key) == _BLOCKED def _touch_keys(self, keys: List[str]): if self._cache is not None: for key in keys: if key in self._cache: self._cache.touch(key) else: with self.caching(): self._touch_keys(keys) def _drop(self, key: str): assert self._cache is not None, CacheAccessError del self._cache[key] def _is_cached(self, method, *args): return self._get_key(method, *args) in self._cache def _cached_call(self, method, *args, **kwargs): # todo: kwargs necessary? """Wrapper for a method, handles caching 'at both ends' """ key = self._get_key(method, *args) if self._cache is not None: # Check if the file's already cached if key in self._cache: t0 = time.time() while self._is_blocked(key) and time.time() < t0 + settings.cache.block_timeout: # Some other thread is currently reading the same frame # Wait a bit and try to get from cache again log.debug(f'{self.__class__}: waiting for {key} to be released...', 5) time.sleep(0.01) value = self._from_cache(key) if isinstance(value, str) and value == _BLOCKED: log.warning(f'{self.__class__}: timed out waiting for {key}.') else: log.debug(f"{self.__class__}: read cached {key}.") return value # Cache a temporary string to 'block' the key log.debug(f"{self.__class__}: caching {key}") log.vdebug(f"{self.__class__}: block {key}.") self._block(key) log.vdebug(f"{self.__class__}: execute {key}.") value = method(*args, **kwargs) log.vdebug(f"{self.__class__}: write {key}.") self._to_cache(key, value) return value log.vdebug(f"Execute {key}.") return method(*args, **kwargs) def __enter__(self, override: bool = False): if settings.cache.do_cache or override: if self._cache is None: log.debug(f'{self.__class__.__qualname__}: opening cache @ {settings.cache.dir}') self._cache = get_cache(settings) return self def __exit__(self, exc_type, exc_value, tb): if self._cache is not None: log.debug(f'{self.__class__.__qualname__}: closing cache @ {settings.cache.dir}') self._cache.close() self._cache = None if exc_type != None: # `is not` doesn't work here raise(exc_type, exc_value, tb) else: return True
[docs] @contextmanager def caching(self, override: bool = False): try: self.__enter__(override) yield self finally: self.__exit__(*sys.exc_info())
[docs]class Handler(abc.ABC): _implementation: Configurable _implementation_factory: Type[InterfaceFactory] _implementation_class: Type[Configurable]
[docs] def set_implementation(self, implementation: str) -> str: impl_type: type = self._implementation_factory(implementation).get() assert issubclass(impl_type, self._implementation_class) self._implementation = impl_type() return self._implementation_factory.get_str( # todo: this is not necessary when using @extend(<Factory>) self._implementation.__class__ )
[docs] def get_implementation(self) -> str: return self._implementation.__class__.__qualname__
[docs] def implementation_config(self) -> BaseConfig: pass
[docs]class FeatureConfig(BaseConfig, abc.ABC): pass
[docs]class Feature(abc.ABC, Configurable): # todo: should probably use Config for parameters after all :) """A feature implements interactions between BackendElements to produce a certain value """ _color: Optional[Color] _state: Optional[np.ndarray] _label: str = '' # todo: keep these in the config instead? _unit: str = '' _description: str = '' _elements: Tuple[Instance, ...] = () _config: Optional[FeatureConfig] _global_config: FeatureConfig _config_class: Type[FeatureConfig] = FeatureConfig def __init__(self, elements: Tuple[Instance, ...], global_config: FeatureConfig, config: Optional[dict] = None): self._skip = False self._ready = False self._elements = elements self._global_config = global_config if config is not None: self._config = global_config.__class__(**config) else: self._config = None self._color = HsvColor(h=0,s=200,v=255) # start out as red
[docs] def calculate(self, frame: np.ndarray, state: np.ndarray = None) \ -> Tuple[Any, Optional[np.ndarray]]: """Calculate Feature for given frame and update state image (optional) """ if state is not None: state = self.state(frame, state) return self.value(frame), state
[docs] @classmethod def label(cls) -> str: return cls._label
[docs] @classmethod def unit(cls) -> str: return cls._unit
@property def skip(self) -> bool: raise NotImplementedError @property def ready(self) -> bool: raise NotImplementedError
[docs] def set_color(self, color: Color): self._color = color
@property def color(self) -> Color: """Color of the Feature in figures. A Feature's color must be set as not to overlap with other Features in the same FeatureSet. Therefore, <Feature>._color must be determined by FeatureSet! """ if self._color is not None: return self._color else: raise ValueError @abc.abstractmethod def _guideline_color(self) -> Color: """Returns the 'guideline color' of a Feature instance Used by FeatureSet to determine the actual _color """ raise NotImplementedError
[docs] @abc.abstractmethod # todo: we're dealing with frames explicitly, so maybe this should be an isimple.video thing... def state(self, frame: np.ndarray, state: np.ndarray) -> np.ndarray: """Return the Feature instance's state image for a given frame """ raise NotImplementedError
[docs] @abc.abstractmethod def value(self, frame: np.ndarray) -> Any: """Compute the value of the Feature instance for a given frame """ raise NotImplementedError
[docs] @classmethod def description(cls) -> str: return cls._description
@property def config(self): if self._config is not None: return self._config else: return self._global_config
[docs]class FeatureSet(object): _feature: Tuple[Feature, ...] _colors: Tuple[Color, ...] def __init__(self, features: Tuple[Feature, ...]): self._features = features
[docs] def resolve_colors(self) -> Tuple[Color, ...]: guideline_colors = [as_hsv(f._guideline_color()) for f in self._features] min_v = 20.0 max_v = 255.0 tolerance = 15 bins: list = [] # todo: clean up binning for index, color in enumerate(guideline_colors): if not bins: bins.append([index]) else: in_bin = False for bin in bins: if abs(float(color.h) - np.mean([guideline_colors[i].h for i in bin])) < tolerance: bin.append(index) in_bin = True break if not in_bin: bins.append([index]) for bin in bins: if len(bin) < 4: increment = 60.0 else: increment = (max_v - min_v) / len(bin) for repetition, index in enumerate(bin): self._features[index].set_color( HsvColor( h=guideline_colors[index].h, s=220, v=int(max_v - repetition * increment) ) ) self._colors = tuple([feature.color for feature in self._features]) return self.colors
@property def colors(self) -> Tuple[Color, ...]: return self._colors @property def features(self) -> Tuple[Feature, ...]: return self._features
[docs]class FeatureType(Factory): # todo: nest in Feature? _type = Feature _mapping: Dict[str, Type[Described]] = {}
[docs] def get(self) -> Type[Feature]: feature = super().get() if issubclass(feature, Feature): return feature else: raise TypeError( f"'{self.__class__.__name__}' tried to return an unexpected type '{feature}'. " f"This is very weird and shouldn't happen, really." )
[docs] def config_schema(self) -> dict: return self.get().config_schema()
[docs]class BaseAnalyzerConfig(BaseConfig): video_path: Optional[str] = Field(default=None) design_path: Optional[str] = Field(default=None) name: Optional[str] = Field(default=None) description: Optional[str] = Field(default=None)
[docs]class AnalyzerEvent(Enum): # todo: make a 'toast' event -> frontend shows a message (e.g. warnings) STATUS = 'status' CONFIG = 'config' RESULT = 'result' RMETAD = 'result_metadata'
[docs]class AnalyzerState(IntEnum): UNKNOWN = 0 INCOMPLETE = 1 CAN_LAUNCH = 2 LAUNCHED = 3 CAN_FILTER = 4 CAN_ANALYZE = 5 ANALYZING = 6 DONE = 7 CANCELED = 8 ERROR = 9
[docs] @classmethod def can_launch(cls, state: int) -> bool: return state in [ cls.CAN_LAUNCH, cls.LAUNCHED, cls.CAN_ANALYZE, cls.DONE, cls.CANCELED, ]
[docs] @classmethod def is_launched(cls, state: int) -> bool: return state in [ cls.LAUNCHED, cls.CAN_FILTER, cls.CAN_ANALYZE, cls.DONE, cls.ANALYZING, cls.CANCELED, ]
[docs]class BaseVideoAnalyzer(Instance, RootInstance): _endpoints: BackendRegistry = backend _instances: List[Instance] _instance_class = Instance _config: BaseAnalyzerConfig _state: int _busy: bool _progress: float _cancel: threading.Event _error: threading.Event results: Dict[str, pd.DataFrame] _timer: Timer _video_hash: Optional[str] _design_hash: Optional[str] _model: Optional[BaseAnalysisModel] _eventstreamer: Optional[EventStreamer] def __init__(self, config: BaseAnalyzerConfig = None, eventstreamer: EventStreamer = None): self.set_eventstreamer(eventstreamer) super().__init__(config) self._timer = Timer(self) self._launched = False self._hash_video = None self._hash_design = None self._state: AnalyzerState = AnalyzerState.INCOMPLETE self._busy = False self._progress = 0.0 self._model = None
[docs] def set_model(self, model: BaseAnalysisModel): self._model = model if self.config.name is None: # todo: move to AnalysisModel instead self.config(name=self.model.get_name())
@property def model(self): return self._model @property def eventstreamer(self): return self._eventstreamer
[docs] def set_eventstreamer(self, eventstreamer: EventStreamer = None): self._eventstreamer = eventstreamer
[docs] def event(self, category: AnalyzerEvent, data: dict): """Push an event :param category: event category :param data: event data :return: """ if self.eventstreamer is not None: self.eventstreamer.event(category.value, self.id, data)
[docs] @backend.expose(backend.commit) def commit(self) -> bool: """Save video analysis configuration to history database """ if self._model is not None: log.debug(f"committing {self.id}") self._model.store() return True else: return False
[docs] @abc.abstractmethod @backend.expose(backend.can_launch) def can_launch(self) -> bool: raise NotImplementedError
[docs] @abc.abstractmethod def can_filter(self) -> bool: raise NotImplementedError
[docs] @abc.abstractmethod @backend.expose(backend.can_analyze) def can_analyze(self) -> bool: raise NotImplementedError
@property def launched(self): return AnalyzerState.is_launched(self.state)
[docs] def set_state(self, state: int, push: bool = True): self._state = state if push: self.push_status()
@property def state(self) -> AnalyzerState: assert isinstance(self._state, AnalyzerState) # todo: fix int / AnalyzerState typing return self._state @property def done(self) -> bool: return self.state == AnalyzerState.DONE
[docs] @backend.expose(backend.state_transition) def state_transition(self, push: bool = True) -> int: """Handle state transitions """ if self.state == AnalyzerState.INCOMPLETE and self.can_launch(): self.set_state(AnalyzerState.CAN_LAUNCH, push) elif self.state == AnalyzerState.LAUNCHED or self.state == AnalyzerState.CAN_FILTER: if self.can_analyze(): self.set_state(AnalyzerState.CAN_ANALYZE, push) elif self.can_filter(): self.set_state(AnalyzerState.CAN_FILTER, push) else: self.set_state(AnalyzerState.LAUNCHED, push) elif self.state == AnalyzerState.DONE or self.state == AnalyzerState.CANCELED: self.set_progress(0.0, push=False) if self.can_analyze(): self.set_state(AnalyzerState.CAN_ANALYZE, push) elif self.can_filter(): self.set_state(AnalyzerState.CAN_FILTER, push) elif self.launched: self.set_state(AnalyzerState.LAUNCHED, push) elif self.can_launch(): self.set_state(AnalyzerState.CAN_LAUNCH, push) return int(self.state)
[docs] def set_busy(self, busy: bool, push: bool = True): self._busy = busy if push: self.push_status()
@property def busy(self) -> bool: return self._busy
[docs] @contextmanager def busy_context(self, busy_state: AnalyzerState = None, done_state: AnalyzerState = None): if done_state is None: done_state = self.state try: if busy_state is not None: self.set_state(busy_state) self.set_busy(True) yield finally: self.set_busy(False) self.set_state(done_state)
[docs] @backend.expose(backend.cancel) def cancel(self) -> None: super().cancel() self.set_state(AnalyzerState.CANCELED)
[docs] def error(self) -> None: super().error() self.set_state(AnalyzerState.ERROR)
[docs] def set_progress(self, progress: float, push: bool = True): self._progress = progress if push: self.push_status()
@property def progress(self) -> float: return self._progress @abc.abstractmethod def _launch(self): raise NotImplementedError @property def config(self) -> BaseAnalyzerConfig: return self._config @abc.abstractmethod def _new_results(self): raise NotImplementedError
[docs] @abc.abstractmethod @backend.expose(backend.analyze) def analyze(self) -> bool: raise NotImplementedError
@property @abc.abstractmethod def position(self) -> float: raise NotImplementedError @property @abc.abstractmethod def cached(self) -> bool: raise NotImplementedError @property @abc.abstractmethod def has_results(self) -> bool: raise NotImplementedError
[docs] @backend.expose(backend.status) def status(self) -> dict: status = { 'state': self.state, 'busy': self.busy, 'cached': self.cached, 'results': self.has_results, 'position': self.position, 'progress': self.progress, } return status
[docs] def push_status(self): self.event(AnalyzerEvent.STATUS, self.status())
[docs] @backend.expose(backend.get_config) def get_config(self, do_tag=False) -> dict: self._gather_config() config = self.config.to_dict(do_tag) return config
[docs] @abc.abstractmethod @backend.expose(backend.set_config) def set_config(self, config: dict, silent: bool = False) -> dict: raise NotImplementedError
@abc.abstractmethod def _gather_config(self): raise NotImplementedError
[docs] @backend.expose(backend.launch) def launch(self) -> bool: with self.lock(): if self.can_launch(): self._launch() self._gather_instances() # Commit to history self.commit() # Push events self.set_state(AnalyzerState.LAUNCHED) self.event(AnalyzerEvent.CONFIG, self.get_config()) # State transition (may change from LAUNCHED ~ config) self.state_transition() return self.launched else: log.warning(f"{self.__class__.__qualname__} can not be launched.") # todo: try to be more verbose return False
[docs] @contextmanager def caching(self): """Caching context: propagated context to every object in _instances that implements caching """ caching_instances = [ e for e in self._instances if isinstance(e, CachingInstance) ] log.debug(f'{self.__class__.__name__}: propagate caching context ' f'to {[i.__class__.__name__ for i in caching_instances]}') try: for element in caching_instances: element.__enter__() yield self finally: for element in caching_instances: element.__exit__(*sys.exc_info())
[docs] def cache_open(self): caching_instances = [ e for e in self._instances if isinstance(e, CachingInstance) ] log.debug(f'{self.__class__.__name__}: propagate caching context ' f'to {[i.__class__.__name__ for i in caching_instances]}') for element in caching_instances: element.__enter__()
[docs] def cache_close(self): caching_instances = [ e for e in self._instances if isinstance(e, CachingInstance) ] log.debug(f'close cache') for element in caching_instances: element.__exit__(*sys.exc_info())
[docs] @contextmanager def time(self, message: str = '', logger = log): try: self._timer.set_logger(logger) self._timer.__enter__(message) yield self finally: self._timer.__exit__()
@property def timing(self) -> Optional[Timing]: if self._timer.timing is not None: return Timing(*self._timer.timing) else: return None
[docs] def export(self): raise NotImplementedError
@property def description(self): return self._description
[docs]class AnalyzerType(Factory): _type = BaseVideoAnalyzer _mapping: Dict[str, Type[Described]] = {}
[docs] def get(self) -> Type[BaseVideoAnalyzer]: t = super().get() assert issubclass(t, self._type) return t
[docs] def config_schema(self) -> dict: return self.get().config_class()().schema()