Source code for isimple.core.config

import os
import re
import yaml
from yaml.representer import SafeRepresenter
import json
from ast import literal_eval as make_tuple
import numpy as np
from typing import List, Optional, Tuple, Union, NamedTuple, Type
from dataclasses import dataclass
from collections.abc import Iterable
from collections import namedtuple
import abc
import datetime

from isimple.core.util import before_version, after_version
from isimple.core.log import get_logger


log = get_logger(__name__)

__version__: str = '0.2.2'

# Extension
__meta_ext__ = '.meta'

# Excel sheet name
__meta_sheet__ = 'metadata'


HsvColor = namedtuple('HsvColor', ('h', 's', 'v'))


[docs]class EnforcedStr(object): _options: List[str] = [''] _str: str def __init__(self, string: str = None): if string is not None: if string not in self.options: log.debug(f"Illegal {self.__class__.__name__} '{string}', " f"should be one of {self.options}. " f"Defaulting to '{self.default}'.") self._str = self.default else: self._str = str(string) else: self._str = self.default def __repr__(self): return f"<{self.__class__.__name__} '{self._str}'>" def __str__(self): return self._str def __eq__(self, other): if hasattr(other, '_str'): return self._str == other._str elif isinstance(other, str): return self._str == other else: return False @property def options(self): return self._options @property def default(self): return self._options[0] def __hash__(self): return hash(str(self))
[docs]class Factory(EnforcedStr): _mapping: dict = {'': None} _default: Optional[str] = None
[docs] def get(self) -> type: if self._str in self._mapping: return self._mapping[self._str] else: raise ValueError(f"Factory {self.__class__.__name__} doesn't map" f"{self._str} to a class.")
[docs] @classmethod def get_str(cls, mapped_value): str = cls.default for k,v in cls._mapping.items(): if mapped_value == v: str = k return str
@property def options(self): return list(self._mapping.keys()) @property def default(self): if self._default is not None: return self._default else: if len(self._mapping): return list(self._mapping.keys())[0] else: return None
[docs] @classmethod # todo: what about some kind of @extend(<FactoryClass>) decorator instead? def extend(cls, mapping: dict): # todo: sanity check this log.debug(f"Extending Factory '{cls.__name__}' with {mapping}") cls._mapping.update(mapping)
[docs]class ColorSpace(EnforcedStr): _options = ['hsv', 'bgr', 'rgb']
[docs]class FrameIntervalSetting(EnforcedStr): _options = ['dt', 'Nf']
[docs]class TransformType(Factory): _mapping: dict = {}
[docs]class FilterType(Factory): _mapping: dict = {}
[docs]class VideoFeatureType(Factory): _mapping: dict = {}
[docs]class BackendType(Factory): _mapping: dict = {}
[docs]@dataclass class Config(abc.ABC): """Abstract class for configuration data. * Default values for Config or Factory subclasses should be provided as None and '' respectively; in this way they should be caught by `self.resolve` and resolved at runtime. This is important to resolve to the latest version of the Factory, as it may have been extended. """ def __init__(self, **kwargs): """Initialize instance and call post-initialization method """ for kw, arg in kwargs.items(): if hasattr(self, kw): setattr(self, kw, arg) self.__post_init__() def __post_init__(self): """Resolve attribute values here """ pass
[docs] @staticmethod def resolve(val, type, iter=False): """Resolve the value of an attribute to match a specific type :param val: current value :param type: type to resolve to :param iter: if True, interpret `val` as an iterable and resolve all elements of `val` to `type` :return: the resolved value for `val`; this should be written to the original attribute, i.e. `self.attr = resolve(self.attr, type)` """ def _resolve(val, type): if isinstance(val, str): if issubclass(type, EnforcedStr): val = type(val) elif issubclass(type, tuple): val = Config.__str2namedtuple__(val, type) elif type == np.ndarray: val = Config.__str2ndarray__(val) if isinstance(val, list): if type == np.ndarray: val = np.array(val) else: val = type(val) elif isinstance(val, dict) and issubclass(type, Config): val = type(**val) return val if not isinstance(val, type): if iter and isinstance(val, Iterable): # Resolve every elemen,t in `val` val = [_resolve(v, type) for v in val] else: # Resolve `val` val = _resolve(val, type) return val
[docs] def to_dict(self) -> dict: """Return this instances value as a serializable dict. """ output: dict = {} def _represent(obj) -> Union[dict, str]: """Represent an object in a YAML-serializable way :param obj: object :return: """ if isinstance(obj, Config): # Recurse return obj.to_dict() if isinstance(obj, EnforcedStr): # Return str value try: return str(obj) except TypeError: return '' if isinstance(obj, tuple): # Convert to str & bypass YAML tuple representation return str(obj) if isinstance(obj, np.ndarray): # Convert to str & bypass YAML list representation return Config.__ndarray2json__(obj) else: # Assume that `obj` is serializable return obj for attr, val in self.__dict__.items(): if val is not None: if (isinstance(val, list) or isinstance(val, tuple)) \ and not (attr in ['c0', 'c1', 'radius']): # Filter out color attributes output[attr] = [] for v in val: output[attr].append(_represent(v)) else: output[attr] = _represent(val) return output
@staticmethod def __ndarray2json__(array: np.ndarray) -> str: return str(json.dumps(array.tolist())) @staticmethod def __str2ndarray__(string: str) -> np.ndarray: return np.array(json.loads(str(string))) @staticmethod def __str2namedtuple__(t: str, type: Type[tuple]) -> tuple: return type( **{k:float(v.strip("'")) for k,v,_ in re.findall('([A-Za-z0-9]*)=(.*?)(,|\))', t)} #type: ignore ) # todo: we're assuming tuples of floats here, will break for cases that are not colors!
[docs]class BackendInstanceConfig(Config): pass
[docs]@dataclass class CachingBackendInstanceConfig(BackendInstanceConfig): do_cache: bool = True do_background: bool = False cache_dir: str = '.cache' cache_size_limit: int = 2**32 block_timeout: float = 1 cache_consumer: bool = False
[docs]@dataclass class VideoFileHandlerConfig(CachingBackendInstanceConfig): do_resolve_frame_number: bool = True
[docs]@dataclass class TransformHandlerConfig(BackendInstanceConfig): type: Union[TransformType,str] = '' matrix: Union[np.ndarray,str] = np.eye(3) coordinates: Union[list, str] = '' def __post_init__(self): self.type = self.resolve(self.type, TransformType) self.matrix = self.resolve(self.matrix, np.ndarray) if len(self.coordinates) > 0: self.coordinates = self.resolve(self.coordinates, np.ndarray).tolist()
[docs]class FilterConfig(Config): pass
[docs]@dataclass class HsvRangeFilterConfig(FilterConfig): radius: Union[HsvColor, str] = HsvColor(10, 75, 75) c0: Union[HsvColor, str] = HsvColor(0, 0, 0) c1: Union[HsvColor, str] = HsvColor(0, 0, 0) def __post_init__(self): self.radius = self.resolve(self.radius, HsvColor) self.c0 = self.resolve(self.c0, HsvColor) self.c1 = self.resolve(self.c1, HsvColor)
[docs]@dataclass class FilterHandlerConfig(BackendInstanceConfig): type: Union[FilterType,str] = '' data: Union[FilterConfig, dict, None] = None def __post_init__(self): self.type = self.resolve(self.type, FilterType) self.data = self.resolve(self.data, self.type.get()._config_class) # todo: something something typing in Factory
[docs]@dataclass class MaskConfig(BackendInstanceConfig): name: Optional[str] = None height: Optional[float] = None filter: Union[FilterHandlerConfig,dict,None] = None def __post_init__(self): self.filter = self.resolve(self.filter, FilterHandlerConfig)
[docs]@dataclass class DesignFileHandlerConfig(CachingBackendInstanceConfig): render_dir: str = '.render' keep_renders: bool = False dpi: int = 400 overlay_alpha: float = 0.1 smoothing: int = 7
[docs]@dataclass class BackendManagerConfig(BackendInstanceConfig): pass
[docs]@dataclass class VideoAnalyzerConfig(BackendManagerConfig): video_path: Optional[str] = None design_path: Optional[str] = None frame_interval_setting: Union[FrameIntervalSetting,str] = '' dt: Optional[float] = 5.0 Nf: Optional[int] = 100 height: float = 0.153e-3 video: Union[VideoFileHandlerConfig,dict,None] = None design: Union[DesignFileHandlerConfig,dict,None] = None transform: Union[TransformHandlerConfig,dict,None] = None masks: Tuple[Union[MaskConfig,dict,None], ...] = (None,) features: Tuple[Union[VideoFeatureType,str], ...] = ('',) def __post_init__(self): self.frame_interval_setting = self.resolve(self.frame_interval_setting, FrameIntervalSetting) self.video = self.resolve(self.video, VideoFileHandlerConfig) self.design = self.resolve(self.design, DesignFileHandlerConfig) self.transform = self.resolve(self.transform, TransformHandlerConfig) self.masks = tuple(self.resolve(self.masks, MaskConfig, iter=True)) self.features = tuple(self.resolve(self.features, VideoFeatureType, iter=True))
[docs]def load(path: str) -> VideoAnalyzerConfig: # todo: internals should be replaced with more sensible methods for setting; reuse those in UI etc. log.debug(f'Loading VideoAnalyzerConfig from {path}') with open(path, 'r') as f: # todo: assuming it is yaml, sanity check? d = yaml.safe_load(f) # Normalize legacy configuration dictionaries if 'version' not in d: log.info(f"Normalizing legacy configuration file {path}") # Pre-v0.2 .meta file d = { 'video_path': d['video'], 'design_path': d['design'], 'transform': {'matrix': d['transform']}, 'masks': [ { 'name': mk, 'filter': { 'data': {'c0': HsvColor(*mv['from']), 'c1': HsvColor(*mv['to'])} } } for mk, mv in zip( d['colors'].keys(), [json.loads(mv) for mv in d['colors'].values()] ) ] } else: if before_version(d['version'], '0.2.1'): # Rename mask[i].filter.filter to mask[].filter.data for m in d['masks']: m['filter']['data'] = m['filter'].pop('filter') if before_version(d['version'], '0.2.2'): # Convert tuple string color '(0,0,0)' to HsvColor string 'HsvColor(h=0, s=0, v=0)' from ast import literal_eval as make_tuple # todo: this is unsafe! for m in d['masks']: if 'c0' in m['filter']['data']: m['filter']['data']['c0'] = str(HsvColor(*make_tuple(m['filter']['data']['c0']))) if 'c1' in m['filter']['data']: m['filter']['data']['c1'] = str(HsvColor(*make_tuple(m['filter']['data']['c1']))) if 'radius' in m['filter']['data']: m['filter']['data']['radius'] = str(HsvColor(*make_tuple(m['filter']['data']['radius']))) # Remove timestamp & version info d.pop('timestamp', None) d.pop('version', None) return VideoAnalyzerConfig(**d)
def _get_dict(config: VideoAnalyzerConfig) -> dict: # Add timestamp & version info d = { 'timestamp': datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f'), 'version': __version__, } d.update(config.to_dict()) return d
[docs]def dump(config: VideoAnalyzerConfig, path:str): with open(path, 'w+') as f: yaml.safe_dump(_get_dict(config),f, width=999)
[docs]def dumps(config: VideoAnalyzerConfig) -> str: return yaml.safe_dump(_get_dict(config), width=999)