import os
import re
import threading
import copy
from typing import Callable, Any, Dict, Generator, Optional, List, Tuple, Type
import datetime
import json
import cv2
import numpy as np
import pandas as pd
from OnionSVG import OnionSVG, check_svg
from isimple import get_logger, settings, ResultSaveMode
from isimple.config import VideoFileHandlerConfig, TransformHandlerConfig, \
FilterHandlerConfig, MaskConfig, \
DesignFileHandlerConfig, VideoAnalyzerConfig, dump, \
FrameIntervalSetting, BaseAnalyzerConfig, FlipConfig
from isimple.core import Lockable
from isimple.core.backend import Instance, CachingInstance, \
Handler, BaseVideoAnalyzer, BackendSetupError, AnalyzerType, Feature, \
FeatureSet, \
FeatureType, backend, AnalyzerState, AnalyzerEvent, FeatureConfig
from isimple.core.config import extend, __meta_ext__, __meta_sheet__
from isimple.core.interface import TransformInterface, FilterConfig, \
FilterInterface, FilterType, TransformType
from isimple.core.streaming import stream, streams
from isimple.maths.colors import Color, HsvColor, BgrColor, convert, css_hex
from isimple.maths.images import to_mask, crop_mask, ckernel, \
overlay, rect_contains
from isimple.maths.coordinates import ShapeCoo, Roi
from isimple.util import frame_number_iterator
# Module-level logger, obtained from the project's `get_logger` with this module's name.
log = get_logger(__name__)
class VideoFileTypeError(BackendSetupError):
    """Backend setup error for a file that could not be read as a video."""

    msg = 'Unrecognized video file type'  # todo: formatting
class VideoFileHandler(CachingInstance, Lockable):
    """Interface to video files ~ OpenCV

    Opens a video file through ``cv2.VideoCapture`` and serves its frames
    converted to HSV. Frame reads go through ``_cached_call`` so that they
    can be cached when inside a caching context.
    """
    path: str
    frame_count: int
    fps: float
    frame_number: int

    _requested_frames: List[int]
    _capture: cv2.VideoCapture
    _shape: tuple

    colorspace: str

    _config: VideoFileHandlerConfig
    _config_class = VideoFileHandlerConfig
    _class = VideoFileHandlerConfig()

    _progress_callback: Callable[[float], None]

    def __init__(self, video_path, config: VideoFileHandlerConfig = None):
        """Open ``video_path`` and read frame count, fps, position and size.

        :param video_path: path to the video file
        :param config: handler configuration, passed on to the base class
        :raises FileNotFoundError: if ``video_path`` is not an existing file
        :raises VideoFileTypeError: if OpenCV reports a frame count of zero
        """
        super(VideoFileHandler, self).__init__(config)
        self._is_caching = False

        if not os.path.isfile(video_path):
            # Include the offending path so the error is actionable
            raise FileNotFoundError(video_path)

        self.path = video_path
        self._cache = None
        self._cached = False

        self._capture = cv2.VideoCapture(
            os.path.join(os.getcwd(), self.path)
        )  # todo: handle failure to open capture

        self.frame_count = int(self._capture.get(cv2.CAP_PROP_FRAME_COUNT))  # todo: should be 'private' attributes
        self.fps = self._capture.get(cv2.CAP_PROP_FPS)
        self.frame_number = int(self._capture.get(cv2.CAP_PROP_POS_FRAMES))
        # Shape is stored as (width, height)
        self._shape = (
            int(self._capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(self._capture.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        )

        if self.frame_count == 0:
            raise VideoFileTypeError

    @property
    def config(self) -> VideoFileHandlerConfig:
        return self._config

    @property
    def shape(self):
        """Frame size as (width, height)."""
        return self._shape

    @property
    def cached(self):
        """Last known result of the 'all requested frames cached' check."""
        return self._cached

    def check_cached(self) -> Optional[bool]:
        """Check whether every requested frame is in the cache.

        If no cache is open yet, the check is repeated inside a caching
        context.
        """
        if self._cache is not None:
            self._cached = all([
                self._is_cached(self._read_frame, self.path, fn)
                for fn in self._requested_frames
            ])
            return self._cached
        else:
            with self.caching(True):
                return self.check_cached()

    def set_requested_frames(self, requested_frames: List[int]) -> None:
        """Add a list of requested frames.

        Used to determine which frames to cache in the background and
        in `_resolve_frame`
        """
        log.debug(f"Requested frames: {requested_frames}")
        self._requested_frames = requested_frames
        self.check_cached()
        # self._touch_keys(
        #     [self._get_key(self._read_frame, self.path, fn)
        #      for fn in requested_frames]
        # )

    def _cache_frames(self, progress_callback: Callable[[float], None], state_callback: Callable[[int], None]):
        """Read every requested frame through the cache.

        Stops early when ``_cancel_caching`` is set. Any exception is logged
        and reported through ``state_callback(AnalyzerState.ERROR)``.

        :param progress_callback: called with a relative position per frame,
                                  and with 0.0 once the loop has finished
        :param state_callback: called with ``AnalyzerState.ERROR`` on failure
        """
        try:
            with self.caching():
                self._is_caching = True
                for frame_number in self._requested_frames:
                    if self._cancel_caching.is_set():
                        break
                    args = (self._read_frame, self.path, frame_number)
                    if not self._is_cached(*args):
                        self._cached_call(*args)
                    progress_callback(frame_number / float(self.frame_count))

            progress_callback(0.0)
            self.seek(0.5)

            if not self._cancel_caching.is_set():
                self._cached = True
            else:
                self._cancel_caching.clear()
        except Exception as e:
            # Don't fail silently: record what went wrong before flagging it
            log.error(f"caching {self.path} failed: {e}")
            state_callback(AnalyzerState.ERROR)
        finally:
            # Never leave the 'busy caching' flag set, also not on failure
            self._is_caching = False

    def cache_frames(self, progress_callback: Callable[[float], None], state_callback: Callable[[int], None]):
        """Start caching in a daemon thread, if enabled and not yet cached."""
        if settings.cache.do_cache and not self.cached:
            self._background = threading.Thread(
                target=self._cache_frames, args=(progress_callback, state_callback,), daemon=True
            )
            self._background.start()

    def _resolve_frame(self, frame_number) -> int:
        """Resolve a frame_number to the nearest requested frame number.

        This is done in order to limit the polled frames to the
        frames that are to be cached or are cached already.
        Returns ``frame_number`` unchanged when no frames were requested.
        """
        requested = getattr(self, '_requested_frames', None)
        if not requested:
            # Unset or empty: min() over nothing would raise ValueError
            return frame_number
        return min(requested, key=lambda x: abs(x - frame_number))

    def _set_position(self, frame_number: int):
        """Set the position of the OpenCV VideoCapture.
        """
        self._capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        self._get_position()

    def _get_position(self) -> int:
        """Get the position of the OpenCV VideoCapture.

        Due to some internal workings of OpenCV, the actual position the
        capture object ends up at may differ from the requested position.
        """
        # VideoCapture.get() returns a float; keep frame_number an int
        self.frame_number = int(self._capture.get(cv2.CAP_PROP_POS_FRAMES))
        return self.frame_number

    def _read_frame(self, _: str, frame_number: int = None) -> Optional[np.ndarray]:
        """Read frame from video file, HSV color space

        the `_` parameter is a placeholder for the (unused) path of the
        video file, which is used to make the cache key in order to make
        this function cachable across multiple files.

        :return: the HSV frame, or None if the frame could not be read
        """
        with self.lock():
            if frame_number is None:
                frame_number = self.frame_number

            # Log the frame that is about to be read, not the stale position
            log.debug(f"reading {self.path} frame {frame_number}")
            self._set_position(frame_number)
            ret, frame = self._capture.read()

            if ret:
                # Convert in place (frame is passed as dst)
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV, frame)
                return frame

            log.warning(f"could not read {self.path} frame {self.frame_number}")
            return None

    @backend.expose(backend.get_time)
    def get_time(self, frame_number: int = None) -> float:
        """Time in seconds of the (resolved) frame number."""
        if frame_number is None:
            frame_number = self.frame_number
        return self._resolve_frame(frame_number) / self.fps

    @backend.expose(backend.get_fps)
    def get_fps(self) -> float:
        return self.fps

    @backend.expose(backend.get_total_time)
    def get_total_time(self) -> float:
        """Total duration in seconds: frame count divided by fps."""
        return self.frame_count / self.fps

    @stream
    @backend.expose(backend.get_raw_frame)
    def read_frame(self, frame_number: Optional[int] = None) -> np.ndarray:
        """Wrapper for `_read_frame`.

        Enables caching (if in a caching context!) and provides the video
        file's path to determine the cache key.
        """
        if frame_number is None:
            frame_number = self.frame_number

        if settings.cache.resolve_frame_number:
            return self._cached_call(self._read_frame, self.path, self._resolve_frame(frame_number))
        else:
            return self._cached_call(self._read_frame, self.path, frame_number)

    def seek(self, position: float = None) -> float:
        """Seek to the relative position ~ [0,1]

        :return: the relative position after seeking
        """
        # todo: had to remove lock to enable reading frames :/
        #  (otherwise streams.update() can get deadocked @ VideoFileHandler if not reading frames from cache)
        if position is not None:
            frame_number = int(position * self.frame_count)
            if settings.cache.resolve_frame_number:
                self.frame_number = self._resolve_frame(frame_number)
            else:
                self.frame_number = frame_number
            log.debug(f"seeking {self.path} {self.frame_number}/{self.frame_count}")

            streams.update()

        return self.frame_number / self.frame_count

    @backend.expose(backend.get_seek_position)
    def get_seek_position(self) -> float:
        """Get current relative position ~ [0,1]
        """
        return self.frame_number / self.frame_count
class FilterHandler(Instance, Handler):
    """Pairs a ``FilterHandlerConfig`` with a filter implementation."""
    _implementation: FilterInterface
    _implementation_factory = FilterType
    _implementation_class = FilterInterface

    _config_class = FilterHandlerConfig
    _config: FilterHandlerConfig

    def __init__(self, config: FilterHandlerConfig = None):
        """Store ``config`` (or a default one) and select the implementation."""
        super().__init__(config)
        self._config = FilterHandlerConfig() if config is None else config
        self.set_implementation()

    @property
    def config(self) -> FilterHandlerConfig:
        return self._config

    def set_config(self, config: dict) -> None:
        """Update the configuration from a dict and re-select the implementation."""
        self._config(**config)
        self.set_implementation()

    @property
    def implementation(self):
        return self._implementation

    def mean_color(self) -> Color:
        """Ask the implementation for the mean color of the current filter data."""
        filter_data = self.config.data
        return self.implementation.mean_color(filter_data)

    def set(self, color: HsvColor = None) -> FilterConfig:
        """Set the filter from ``color``; ``HsvColor(0,0,0)`` is used if omitted.

        :return: the updated filter data
        """
        # if isinstance(self.config.data, dict):  todo: should be ok
        #     self.config.data = self.implementation.config_class()(**self.config.data)
        picked = HsvColor(0,0,0) if color is None else color
        updated = self.implementation.set_filter(self.config.data, picked)
        self.config(data=updated)

        log.debug(f"Filter config: {self.config}")
        return self.config.data

    def set_implementation(self, implementation: str = None) -> str:
        """Select the implementation and rebuild the filter data for it.

        Defaults to the type named in the current configuration. Fields that
        exist in both the previous data and the new implementation's config
        class are carried over.
        """
        if implementation is None:
            implementation = str(self.config.type)
        implementation = super().set_implementation(implementation)

        # Keep matching config fields accross implementations
        previous = self.config.data.to_dict()
        data_class = self.implementation.config_class()
        carried_over = {
            field: previous[field]
            for field in data_class.__fields__.keys()
            if field in previous
        }
        self.config(type=implementation, data=data_class(**carried_over))
        return implementation

    def __call__(self, frame: np.ndarray, mask: np.ndarray = None) -> np.ndarray:
        """Apply the implementation's filter to ``frame`` with the current data."""
        return self.implementation.filter(self.config.data, frame, mask)
class Mask(Instance):
    """Handles masks in the context of a video file
    """
    filter: FilterHandler

    _design: 'DesignFileHandler'
    _config_class = MaskConfig
    _config: MaskConfig

    _dpi: float
    _full: np.ndarray
    _part: np.ndarray
    _rect: np.ndarray
    _center: Tuple[int,int]

    def __init__(
            self,
            design: 'DesignFileHandler',
            mask: np.ndarray,
            name: str,
            config: MaskConfig = None,
            filter: FilterHandler = None,
    ):
        """Crop the full-size ``mask`` and attach a filter handler.

        :param design: the design file handler this mask belongs to
        :param mask: full-size mask image
        :param name: mask name, written into the config
        :param config: mask configuration; a default one is made if omitted
        :param filter: filter handler to use; if omitted, one is built from
                       ``config.filter``
        """
        super(Mask, self).__init__(MaskConfig() if config is None else config)

        self._design = design
        self._full = mask
        self.config(name=name)

        cropped = crop_mask(self._full)
        self._part, self._rect, self._center = cropped

        # Each Mask should have its own FilterHandler instance, unless otherwise specified
        if filter is None and self.config.filter is not None:
            assert isinstance(self.config.filter, FilterHandlerConfig)
            filter = FilterHandler(self.config.filter)
        assert isinstance(filter, FilterHandler), BackendSetupError

        self.filter = filter
        self.config(filter=self.filter.config)

    @property
    def config(self) -> MaskConfig:
        return self._config

    def set_config(self, config: dict) -> None:
        """Update the mask config and pass its filter part on to the filter."""
        self._config(**config)
        filter_config = self.config.filter.to_dict()
        self.filter.set_config(filter_config)

    def set_filter(self, color: HsvColor):
        """Set the filter from ``color`` and copy the result into the mask config."""
        self.filter.set(color=color)
        # todo: otherwise, mask config is not updated!
        self.config.filter(**self.filter.config.to_dict())

    def __call__(self, img: np.ndarray) -> np.ndarray:
        """Crop ``img`` to this mask's rectangle and blank everything outside the mask.

        The crop is a slice of the provided array; callers that need the
        original untouched should copy it explicitly.
        """
        cropped = self._crop(img)
        return cv2.bitwise_and(cropped, cropped, mask=self.part)

    def _crop(self, img: np.ndarray) -> np.ndarray:
        """Slice ``img`` down to the rows and columns of ``self._part``.

        No copy is made here; callers that need the original untouched
        should copy it explicitly.
        """
        return img[self.rows, self.cols]

    def contains(self, coordinate: ShapeCoo) -> bool:
        """Whether ``coordinate`` falls on a set pixel of this mask."""
        if not rect_contains(self.rect, coordinate):
            return False
        # Shift the coordinate into the cropped mask's frame of reference
        row = coordinate.idx[0] - self.rect[0]
        col = coordinate.idx[1] - self.rect[2]
        return bool(self.part[row, col])

    @property
    def design(self):
        return self._design

    @property
    def rows(self):
        """Row slice of the mask rectangle (``rect[0]`` to ``rect[1]``)."""
        return slice(self._rect[0], self._rect[1])

    @property
    def cols(self):
        """Column slice of the mask rectangle (``rect[2]`` to ``rect[3]``)."""
        return slice(self._rect[2], self._rect[3])

    @property
    def name(self):
        return self.config.name

    @property
    def part(self):
        return self._part

    @property
    def rect(self) -> np.ndarray:
        return self._rect

    @property
    def ready(self):
        return self.config.ready

    @property
    def skip(self):
        return self.config.skip
class DesignFileHandler(CachingInstance):
    """Renders a design file into an overlay image and a list of masks.

    Rendering goes through OnionSVG into ``settings.render.dir``; the
    rendered overlay and masks are obtained through ``_cached_call``.
    """
    _overlay: np.ndarray
    _masks: List[Mask]

    _config: DesignFileHandlerConfig
    _config_class = DesignFileHandlerConfig

    def __init__(self, path: str, config: DesignFileHandlerConfig = None, mask_config: Tuple[MaskConfig,...] = None):
        """Render the design at ``path``.

        :param path: path to the design file
        :param config: handler configuration, passed on to the base class
        :param mask_config: per-mask configurations, matched by position
        :raises FileNotFoundError: if ``path`` is not an existing file
        """
        super(DesignFileHandler, self).__init__(config)

        if not os.path.isfile(path):
            # Include the offending path so the error is actionable
            raise FileNotFoundError(path)

        self._path = path
        self._render(path, mask_config)

    def _render(self, path: str = None, mask_config: Tuple[MaskConfig, ...] = None):
        """(Re-)render the overlay and build the ``Mask`` instances.

        :param path: design file; defaults to the path given at construction
        :param mask_config: per-mask configurations, matched by position;
                            masks without one get a default configuration
        """
        if path is None:
            path = self._path

        with self.caching():
            self._overlay = self.peel_design(path, self.config.dpi)
            # Shape is stored as (width, height)
            self._shape = (self._overlay.shape[1], self._overlay.shape[0])

            self._masks = []
            for i, (mask, name) in enumerate(zip(*self.read_masks(path, self.config.dpi))):
                # handle case len(mask_config) < len(self.read_masks(path))
                if mask_config is not None and i < len(mask_config):
                    self._masks.append(
                        Mask(self, mask, name, mask_config[i])
                    )
                else:
                    self._masks.append(Mask(self, mask, name))

    @property
    def config(self) -> DesignFileHandlerConfig:
        return self._config

    def _clear_renders(self):
        """Remove every entry in the render directory."""
        log.debug(f'Clearing render directory {settings.render.dir}')
        for f in os.listdir(settings.render.dir):
            os.remove(os.path.join(settings.render.dir, f))

    def _peel_design(self, design_path, dpi) -> np.ndarray:
        """Render the design's layers into the render directory.

        :return: the rendered 'overlay.png' as read by ``cv2.imread``
        """
        if not os.path.isdir(settings.render.dir):
            # makedirs also covers a render directory with missing parents
            os.makedirs(settings.render.dir)
        else:
            self._clear_renders()

        check_svg(design_path)
        OnionSVG(design_path, dpi=dpi).peel(
            'all', to=settings.render.dir  # todo: should maybe prepend file name to avoid overwriting previous renders?
        )
        print("\n")

        # Local name chosen so the imported `overlay` function is not shadowed
        rendered = cv2.imread(
            os.path.join(settings.render.dir, 'overlay.png')
        )
        return rendered

    def _read_masks(self, _, __) -> Tuple[List[np.ndarray], List[str]]:
        """Read the rendered layer images as masks.

        The `_` and `__` parameters are placeholders for the design path and
        dpi that `read_masks` passes along to `_cached_call`; they are not
        used here.

        Layers whose file name starts with a number are sorted by that
        number and named after the remainder of the file name; all other
        layers are appended afterwards and named after their full path.

        :return: (masks, names), in matching order
        """
        # Skip the overlay without failing if it is absent
        files = [f for f in os.listdir(settings.render.dir) if f != 'overlay.png']

        # Catch file names of numbered layers
        pattern = re.compile(r'(\d+)[?\-=_#/\\\ ]+([?\w\-=_#/\\\ ]+)')

        numbered = {}
        unnumbered = []
        for file_name in files:
            path = os.path.join(settings.render.dir, file_name)
            # Match on the file name only, so that digits in the render
            #  directory's own path can't end up in the index or the name
            match = pattern.search(os.path.splitext(file_name)[0])
            if match:
                # numbered layer: keep track of index
                numbered[int(match.group(1))] = (path, match.group(2).strip())
            else:
                # not a numbered layer
                unnumbered.append((path, path))

        # Sort numbered layers, append unnumbered layers to the end
        ordered = [numbered[index] for index in sorted(numbered)] + unnumbered

        masks = []
        names = []
        for path, name in ordered:
            masks.append(
                to_mask(
                    cv2.imread(path), ckernel(self.config.smoothing)
                )
            )
            names.append(name)

        if not settings.render.keep:
            self._clear_renders()

        return masks, names

    def peel_design(self, design_path: str, dpi: int) -> np.ndarray:
        """`_peel_design` through `_cached_call`."""
        return self._cached_call(self._peel_design, design_path, dpi)

    def read_masks(self, design_path: str, dpi: int) -> Tuple[List[np.ndarray], List[str]]:
        """`_read_masks` through `_cached_call`."""
        return self._cached_call(self._read_masks, design_path, dpi)

    @property
    def shape(self):
        """Overlay size as (width, height)."""
        return self._shape

    def overlay(self) -> np.ndarray:
        """The overlay image converted from BGR to HSV."""
        return cv2.cvtColor(self._overlay, cv2.COLOR_BGR2HSV)

    def overlay_frame(self, frame: np.ndarray) -> np.ndarray:
        """Blend the overlay onto an HSV ``frame``; returns an HSV frame."""
        frame = cv2.cvtColor(frame, cv2.COLOR_HSV2BGR)
        frame = overlay(frame, self._overlay, self.config.overlay_alpha)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        return frame

    @property
    def masks(self):
        return self._masks
class MaskFunction(Feature):
    """Feature computed on the masked & filtered pixels of a frame.

    Subclasses supply the actual computation in `_function`.
    """
    mask: Mask
    filter: FilterHandler
    dpi: int

    _feature_type: FeatureType

    def __init__(self, mask: Mask, global_config: FeatureConfig, config: Optional[dict] = None):
        """Bind the feature to ``mask``, its filter and its design's dpi."""
        self.mask = mask
        self.filter = mask.filter
        self.dpi = mask.design.config.dpi

        elements = (self.mask, self.filter)
        super(MaskFunction, self).__init__(elements, global_config, config)

        # The feature type is looked up by the concrete class' name
        self._feature_type = FeatureType(self.__class__.__name__)

    @property
    def name(self):
        return self.mask.name

    @property
    def feature_type(self) -> FeatureType:
        return self._feature_type

    @property
    def ready(self) -> bool:
        return self.mask.config.ready

    @property
    def skip(self):
        return self.mask.config.skip

    def px2mm(self, value):
        """Convert design-space pixels to mm

        :param value: # of pixels
        :return: ``value`` divided by the number of pixels per mm
        """
        return value / (self.dpi / 25.4)

    def pxsq2mmsq(self, value):
        """Convert design-space pixels to mm²

        :param value: # of pixels
        :return: ``value`` divided by the squared number of pixels per mm
        """
        return value / (self.dpi / 25.4) ** 2

    def _guideline_color(self) -> Color:
        return self.filter.mean_color()

    def value(self, frame) -> Any:
        """Mask and filter ``frame``, then apply `_function` to the result."""
        masked = self.mask(frame)
        filtered = self.filter(masked, self.mask.part)
        return self._function(filtered)

    def state(self, frame: np.ndarray, state: np.ndarray) -> np.ndarray:
        """Generate a state image (BGR)

        Adds this feature's contribution to ``state`` at the mask's
        rectangle and returns it. Skipped features leave ``state`` as is.
        """
        if self.skip:
            return state

        rows, cols = self.mask.rows, self.mask.cols

        if self.ready:
            # Masked & filtered pixels ~ frame
            binary = self.filter(self.mask(frame), self.mask.part)

            # Solid patch in the feature's color, cut out by the binary image
            patch = np.multiply(
                np.ones((binary.shape[0], binary.shape[1], 3),
                        dtype=np.uint8),
                convert(self.color, BgrColor).np3d
            )
            state[rows, cols, :] \
                += cv2.bitwise_and(patch, patch, mask=binary)
        else:
            # Not ready -> highlight feature with a rectangle
            patch = np.zeros((*self.mask.part.shape, 3), dtype=np.uint8)
            for channel, level in enumerate(convert(self.color, BgrColor).list):  # todo: numpy this
                patch[0:2, :, channel] = level
                patch[-2:, :, channel] = level
                patch[:, 0:2, channel] = level
                patch[:, -2:, channel] = level
            state[rows, cols, :] += patch
            # todo: overwrites rect with white

        return state

    def _function(self, frame: np.ndarray) -> Any:
        """To be implemented by subclasses."""
        raise NotImplementedError
@extend(AnalyzerType)
class VideoAnalyzer(BaseVideoAnalyzer):
    """Main video handling class

        * Load frames from video files
        * Load mask files
        * Load/save measurement metadata
    """
    _config: VideoAnalyzerConfig
    _config_class = VideoAnalyzerConfig
    # _endpoints: BackendRegistry = backend

    video: VideoFileHandler
    design: DesignFileHandler
    transform: TransformHandler
    features: Tuple[Feature,...]
    # Keyed by str(feature type), see `_new_results`
    results: Dict[str, pd.DataFrame]

    _featuresets: Dict[FeatureType, FeatureSet]

    def __init__(self, config: VideoAnalyzerConfig = None):
        super().__init__(config)
        self.results = {}
        self._gather_instances()

    @property
    def config(self) -> VideoAnalyzerConfig:
        return self._config

    def _gather_config(self):
        """Copy the configs of the nested instances into the own config."""
        # todo: would be nice if this wasn't necessary :(
        if hasattr(self, 'video') and self.config.video != self.video.config:
            self.config(video=self.video.config)
        if hasattr(self, 'design') and self.config.design != self.design.config:
            self.config(design=self.design.config)
        if hasattr(self, 'transform') and self.config.transform != self.transform.config:
            self.config(transform=self.transform.config)
        if hasattr(self, 'masks'):
            self.config(masks=tuple([mask.config for mask in self.masks]))

    @property
    def cached(self) -> bool:
        """Whether the video handler reports its frames as cached."""
        if hasattr(self, 'video'):
            return self.video.cached
        else:
            return False

    @property
    def has_results(self) -> bool:
        return hasattr(self, 'results')

    def can_launch(self) -> bool:
        """Whether both the configured video file and design file exist."""
        video_ok = False
        design_ok = False

        if self.config.video_path is not None:
            video_ok = os.path.isfile(self.config.video_path)
            if not video_ok:
                log.warning(f"invalid video file: {self.config.video_path}")

        if self.config.design_path is not None:
            design_ok = os.path.isfile(self.config.design_path)
            # Warn about the design file based on the design file check
            if not design_ok:
                log.warning(f"invalid design file: {self.config.design_path}")

        return video_ok and design_ok

    def can_filter(self) -> bool:
        return self.transform.is_set

    def can_analyze(self) -> bool:
        """Launched, and every mask is either ready or skipped."""
        return self.launched and all([mask.ready or mask.skip for mask in self.masks])

    def _launch(self):
        """Load the stored config and create the nested handler instances."""
        self.load_config()

        log.debug(f'{self.__class__.__name__}: launch nested instances.')
        self.video = VideoFileHandler(
            self.config.video_path,
            self.config.video
        )
        self.video.set_requested_frames(list(self.frame_numbers()))
        self.design = DesignFileHandler(
            self.config.design_path,
            self.config.design,
            self.config.masks
        )
        self.transform = TransformHandler(
            self.video.shape,
            self.design.shape,
            self.config.transform
        )
        self.masks = self.design.masks
        self.filters = [mask.filter for mask in self.masks]

        # Link self._config to _config of nested instances
        self._gather_config()

        # Initialize FeatureSets
        self._get_featuresets()

    def _get_featuresets(self):
        """Build one FeatureSet per configured feature type.

        Each set holds one feature instance per design mask. A mask's own
        parameters for the feature are taken from ``mask.config.parameters``
        by the feature's position, or None if the mask has none there.
        """
        featuresets = {}
        for index, (feature, global_config) in enumerate(
                zip(self.config.features, self.config.feature_parameters)
        ):
            featuresets[feature] = FeatureSet(
                tuple(
                    feature.get()(
                        mask,
                        global_config,
                        mask.config.parameters[index] if index < len(mask.config.parameters) else None
                    ) for mask in self.design.masks
                ),
            )
        self._featuresets = featuresets
        self.get_colors()

    def _new_results(self):
        """Reset `results` to one empty DataFrame per feature type.

        Each frame is indexed by the requested frame numbers and has a
        'time' column followed by one column per feature.
        """
        self.results = {}
        for fs, feature in zip(self._featuresets.values(), self.config.features):
            self.results[str(feature)] = pd.DataFrame(
                [],
                columns=['time'] + [f.name for f in fs.features],
                index=list(self.frame_numbers())
            )

    @property
    def position(self):
        """Relative seek position of the video, or -1.0 without a video."""
        if hasattr(self, 'video'):
            return self.video.get_seek_position()
        else:
            return -1.0

    def _set_config(self, config: dict):
        """Apply ``config`` to the nested instances and the own config.

        Entries for nested instances that exist are popped from ``config``
        and applied to those instances; the remainder is applied to the own
        config.
        """
        # todo: would be better if nested instance config was *referencing* global config
        if 'video' in config and hasattr(self, 'video'):
            self.video._config(**config.pop('video'))
        if 'design' in config and hasattr(self, 'design'):
            self.design._config(**config.pop('design'))
        if 'transform' in config and hasattr(self, 'transform'):
            self.transform._config(**config.pop('transform'))
        if 'masks' in config and hasattr(self, 'masks'):
            for mask, mask_config in zip(self.masks, config.pop('masks')):
                mask._config(**mask_config)
                # A partial mask config doesn't have to include a filter
                if 'filter' in mask_config:
                    mask.filter._config(**mask_config['filter'])

        self._config(**config)
        self._gather_config()

    def set_config(self, config: dict, silent: bool = False) -> dict:
        """Apply ``config`` and act on whatever changed.

        Depending on the changes this re-renders the design, updates the
        requested frames, rebuilds the feature sets, re-estimates the
        transform, updates the masks, commits and/or relaunches.

        :param config: (partial) configuration as a dict
        :param silent: if True, don't commit
        :return: the resulting configuration, from `get_config`
        """
        with self.lock():
            # todo: sanity check
            do_commit = False
            do_relaunch = False

            log.debug(f"Setting VideoAnalyzerConfig to {config}")

            # Snapshot everything that is compared against afterwards
            previous_name = copy.copy(self.config.name)
            previous_desc = copy.copy(self.config.description)
            previous_Nf = copy.copy(self.config.Nf)
            previous_dt = copy.copy(self.config.dt)
            previous_fis = copy.copy(self.config.frame_interval_setting)
            previous_features = copy.deepcopy(self.config.features)  # todo: clean up
            previous_video_path = copy.deepcopy(self.config.video_path)
            previous_design_path = copy.deepcopy(self.config.design_path)
            previous_design = copy.deepcopy(self.config.design)
            previous_flip = copy.deepcopy(self.config.transform.flip)
            previous_turn = copy.deepcopy(self.config.transform.turn)
            previous_roi = copy.deepcopy(self.config.transform.roi)
            previous_masks = copy.deepcopy(self.config.masks)

            # Set implementations
            if hasattr(self, 'transform') and 'transform' in config and 'type' in config['transform']:
                self.transform.set_implementation(config['transform']['type'])  # todo: shouldn't do this every time
            if hasattr(self, 'design') and 'masks' in config:
                # zip: tolerate fewer mask configs than masks
                for mask, mask_config in zip(self.design.masks, config['masks']):
                    filter_config = mask_config.get('filter') or {}
                    if 'type' in filter_config:
                        mask.filter.set_implementation(filter_config['type'])  # todo: shouldn't do this every time

            self._set_config(config)

            if hasattr(self, 'transform'):
                self.transform._config(**self.config.transform.to_dict())
                self.transform.set_implementation()  # todo: shouldn't do this every time
            if hasattr(self, 'design'):
                self.design._config(**self.config.design.to_dict())

            # Check for file changes
            if self.launched and previous_video_path != self.config.video_path:
                do_commit = True
                do_relaunch = True
            if self.launched and previous_design_path != self.config.design_path:
                do_commit = True
                do_relaunch = True

            # Check for design render changes
            if previous_design != self.config.design:
                # Nothing to re-render as long as the handlers don't exist
                if hasattr(self, 'design') and hasattr(self, 'transform'):
                    self.design._render(mask_config=self.config.masks)
                    self.transform._design_shape = self.design.shape
                    self.estimate_transform()
                do_commit = True

            # Check for name/description changes
            if self.config.name != previous_name:
                do_commit = True
            if self.config.description != previous_desc:
                do_commit = True

            # Check for changes in frames
            if hasattr(self, 'video'):
                if any([
                    previous_Nf != self.config.Nf,
                    previous_dt != self.config.dt,
                    previous_fis != self.config.frame_interval_setting
                ]):
                    self.video.set_requested_frames(list(self.frame_numbers()))
                    do_commit = True

            # Check for changes in features
            if previous_features != self.config.features:
                if self.launched:
                    self._get_featuresets()
                do_commit = True

            # Get featureset instances  todo: overlap with previous block?
            if self.launched and not self._featuresets:
                self._get_featuresets()

            # Check for ROI adjustments
            if previous_flip != self.config.transform.flip \
                    or previous_turn != self.config.transform.turn \
                    or previous_roi != self.config.transform.roi:
                self.estimate_transform()  # todo: self.config.bla.thing should be exactly self.bla.config.thing always
                do_commit = True

            # Check for mask adjustments
            if previous_masks != self.config.masks:
                if hasattr(self, 'design'):
                    for mask, mask_config in zip(self.design.masks, self.config.masks):
                        mask.set_config(mask_config.to_dict())
                do_commit = True

            if do_commit and not silent:  # todo: better config handling in AnalysisMdoel.store() instead!
                self.commit()

            if do_relaunch:
                self._launch()
                self._gather_instances()

            # Check for state transitions
            self.state_transition(push=True)

            config = self.get_config()

            # Push config event
            self.event(AnalyzerEvent.CONFIG, config)

            # Push streams
            streams.update()

            return config

    @backend.expose(backend.get_overlaid_frame)
    def get_frame_overlay(self, frame_number: int) -> np.ndarray:
        """The transformed frame with the design overlay blended on top."""
        return self.design.overlay_frame(
            self.get_transformed_frame(frame_number))

    @backend.expose(backend.get_colors)  # todo: per feature in each feature set; maybe better as a dict instead of a list of tuples?
    def get_colors(self) -> Dict[str, Tuple[str, ...]]:
        """Resolved feature colors as CSS hex strings, per feature type."""
        return {
            str(k): tuple([css_hex(c) for c in featureset.resolve_colors()])
            for k, featureset in self._featuresets.items()
        }

    def frame_numbers(self) -> Generator[int, None, None]:
        """Frame numbers to analyze, per the configured interval setting.

        :raises NotImplementedError: for an unhandled interval setting
        """
        if self.config.frame_interval_setting == FrameIntervalSetting('Nf'):
            return frame_number_iterator(self.video.frame_count-1, Nf = self.config.Nf)
        elif self.config.frame_interval_setting == FrameIntervalSetting('dt'):
            return frame_number_iterator(self.video.frame_count-1, dt = self.config.dt, fps = self.video.fps)
        else:
            raise NotImplementedError(self.config.frame_interval_setting)

    @stream
    @backend.expose(backend.get_inverse_overlaid_frame)
    def get_inverse_overlaid_frame(self, frame_number: Optional[int] = None) -> np.ndarray:
        """The raw frame with the inverse-transformed overlay on top.

        Falls back to the raw frame if the transform is not set.
        """
        if self.transform.is_set:
            return cv2.cvtColor(  # todo: loads of unnecessary color conversion here
                overlay(
                    cv2.cvtColor(self.video.read_frame(frame_number), cv2.COLOR_HSV2BGR),
                    self.transform.inverse(self.design._overlay),
                    alpha=self.design.config.overlay_alpha
                ), cv2.COLOR_BGR2HSV)
        else:
            log.debug('transform not set, showing raw frame')
            return self.video.read_frame(frame_number)

    @backend.expose(backend.seek)
    def seek(self, position: float = None) -> float:
        """Seek the video, push the status and return the new position."""
        self.video.seek(position)
        self.push_status()
        return self.position

    @backend.expose(backend.clear_roi)
    def clear_roi(self) -> None:
        self.transform.clear()
        self.state_transition()

    @backend.expose(backend.turn_cw)
    def turn_cw(self):
        self.set_config(
            {'transform': {'turn': self.config.transform.turn + 1}}
        )

    @backend.expose(backend.turn_ccw)
    def turn_ccw(self):
        self.set_config(
            {'transform': {'turn': self.config.transform.turn - 1}}
        )

    @backend.expose(backend.flip_h)
    def flip_h(self):
        self.set_config(
            {'transform': {'flip': {'horizontal': not self.config.transform.flip.horizontal}}}
        )

    @backend.expose(backend.flip_v)
    def flip_v(self):
        self.set_config(
            {'transform': {'flip': {'vertical': not self.config.transform.flip.vertical}}}
        )

    @backend.expose(backend.undo_config)
    def undo_config(self, context: str = None) -> dict:  # todo: implement undo/redo context (e.g. transform, masks)
        self.model.undo_config(context)
        return self.get_config()

    @backend.expose(backend.redo_config)
    def redo_config(self, context: str = None) -> dict:
        self.model.redo_config(context)
        return self.get_config()

    @backend.expose(backend.set_filter_click)
    def set_filter_click(self, relative_x: float, relative_y: float) -> dict:
        """Set the filter of the mask under a click from the clicked color.

        :param relative_x: relative x coordinate of the click
        :param relative_y: relative y coordinate of the click
        :return: dict with 'color' if exactly one mask was hit, or with
                 'message' if multiple masks were hit; empty otherwise
        """
        log.debug(f'set_filter_click @ ({relative_x}, {relative_y})')

        response: Dict[str, Any] = {}

        click = ShapeCoo(
            x = relative_x,
            y = relative_y,
            shape = self.design.shape[::-1]
        )

        hits = [mask for mask in self.masks if mask.contains(click)]

        if len(hits) == 1:
            hit = hits[0]
            frame = self.video.read_frame()
            click = self.transform.coordinate(click)

            color = HsvColor(*click.value(frame))
            log.debug(f"color @ {click.idx}: {color}")

            hit.set_filter(color)

            for fs in self._featuresets.values():
                for feature in fs.features:
                    assert isinstance(feature, MaskFunction)
                    try:
                        if feature.mask == hit:  # type: ignore
                            feature._ready = True
                    except AttributeError:
                        pass

            self.get_colors()
            self.state_transition()
            self.event(AnalyzerEvent.CONFIG, self.get_config())
            # Committing once is enough
            self.commit()

            streams.update()

            response['color'] = color.to_dict()
        elif len(hits) == 0:
            log.debug(f"no hit for {click.idx}")
        elif len(hits) > 1:
            response['message'] = f"Multiple valid options: " \
                                  f"{[hit.name for hit in hits]}. " \
                                  f"Select a point where masks don't overlap."
            log.warning(response['message'])

        return response

    @stream
    @backend.expose(backend.get_state_frame)
    def get_state_frame(self, frame_number: Optional[int] = None, featureset: Optional[int] = None) -> np.ndarray:
        """State image (HSV) of one feature set for a frame.

        :param frame_number: frame to read
        :param featureset: position of the feature set; defaults to 0
        """
        # todo: eliminate duplicate code ~ calculate (calculate should just call get_state_frame, ideally)
        if featureset is None:
            featureset = 0

        # Empty state image in BGR
        state = np.zeros(self.design._overlay.shape, dtype=np.uint8)

        if hasattr(self, '_featuresets') and len(self._featuresets):
            frame = self.transform(self.video.read_frame(frame_number))

            k,fs = list(self._featuresets.items())[featureset]

            for feature in fs._features:
                if not feature.skip:
                    state = feature.state(
                        frame.copy(),  # don't overwrite self.frame ~ cv2 dst parameter
                        state          # additive; each feature adds to state
                    )

            # Add overlay on top of state
            # state = overlay(self.design._overlay.copy(), state, self.design.config.overlay_alpha)
        else:
            log.debug('skipping state frame')

        # Whatever is still 0 becomes 255
        state[np.equal(state, 0)] = 255
        return cv2.cvtColor(state, cv2.COLOR_BGR2HSV)

    @backend.expose(backend.get_overlay_png)
    def get_overlay_png(self) -> bytes:
        """The design overlay, PNG-encoded."""
        _, buffer = cv2.imencode('.png', self.design._overlay.copy())
        return buffer.tobytes()

    @backend.expose(backend.get_mask_rects)
    def get_mask_rects(self) -> Dict[str, np.ndarray]:
        # todo: placeholder -- mask rect info to frontend (in relative coordinates)
        return {mask.name: mask.rect for mask in self.masks}

    def calculate(self, frame_number: int):
        """Calculate all features for one frame and store the values.

        Values go into `results` and are pushed as a RESULT event.
        Unreadable frames are skipped with a warning; an OpenCV error is
        logged and sets the error flag.
        """
        log.debug(f"calculating features for frame {frame_number}")

        try:
            t = self.video.get_time(frame_number)
            raw_frame = self.video.read_frame(frame_number)

            if raw_frame is not None:
                frame = self.transform(raw_frame)

                result = {'t': t}
                for k,fs in self._featuresets.items():
                    values = []

                    for feature in fs._features:  # todo: make featureset iterable maybe?
                        value, state = feature.calculate(
                            frame.copy(),  # don't overwrite self.frame ~ cv2 dst parameter  # todo: better to let OpenCV handle copying, or not?
                        )
                        values.append(value)

                    result.update({k: values})
                    # `results` is keyed by str(feature type), see `_new_results`
                    self.results[str(k)].loc[frame_number] = [t] + values

                self.set_progress(frame_number / self.video.frame_count)
                self.event(AnalyzerEvent.RESULT, result)
            else:
                log.warning(f"skipping unreadable frame {frame_number}")
        except cv2.error as e:
            log.error(str(e))
            self._error.set()

    def analyze(self):
        """Calculate all features for all requested frames, then export.

        Stops early when canceled or errored; in those cases the flag is
        cleared and the state is set to CANCELED or ERROR respectively.
        """
        with self.busy_context(AnalyzerState.ANALYZING, AnalyzerState.DONE):
            assert isinstance(self._cancel, threading.Event)

            if self.model is None:
                log.warning(f"{self} has no database model; result data may be lost")

            with self.lock(), self.time(f"Analyzing {self.id}", log):
                self._get_featuresets()
                self.commit()

                self._new_results()
                self.event(AnalyzerEvent.RMETAD, {'colors': self.get_colors()})

                for fn in self.frame_numbers():
                    if not self.canceled and not self.errored:
                        self.calculate(fn)
                    else:
                        break

            self.commit()
            self.export()

        if self.canceled:
            self.clear_cancel()
            self.set_state(AnalyzerState.CANCELED)
        if self.errored:
            self.clear_error()
            self.set_state(AnalyzerState.ERROR)

    def export(self):
        """Export video analysis results & metadata to .xlsx

        The location depends on ``settings.app.save_result``; nothing is
        exported (with a warning) if that is none of the handled modes.
        """
        base_f = None

        if settings.app.save_result == ResultSaveMode.next_to_video:
            base_f = str(os.path.splitext(self._config.video_path)[0])
        elif settings.app.save_result == ResultSaveMode.next_to_design:
            base_f = str(os.path.splitext(self._config.design_path)[0])
        elif settings.app.save_result == ResultSaveMode.directory:
            base_f = os.path.join(str(settings.app.result_dir), self.config.name)

        if base_f is not None:
            f = base_f + ' ' + datetime.datetime.now().strftime(
                settings.format.datetime_format_fs
            ) + '.xlsx'

            # The context manager saves & closes the workbook, also on error
            with pd.ExcelWriter(f) as w:
                for k, v in self.results.items():
                    v.to_excel(w, sheet_name=k)

                pd.DataFrame([json.dumps(self.get_config(), indent=2)]).to_excel(w, sheet_name=__meta_sheet__)

            log.info(f"{self.id} results exported to {f}")
        else:
            log.warning(f"{self.id} results were not exported!")

    @backend.expose(backend.get_results)
    def get_result(self) -> dict:
        """Results as JSON-serializable dicts ('split' orientation)."""
        return {
            # Convert NaN to None -> JSON serializable
            str(feature):result.where(pd.notnull(result),None).to_dict(orient='split')
            for feature,result in self.results.items()
        }

    def load_config(self):
        """Load video analysis configuration from history database
        """
        if self._model is not None:
            log.debug('loading config from database...')
            include = ['video', 'design', 'transform', 'masks']
            config = self.model.load_config(
                include=include
            )

            if config is not None:
                self._set_config(config)
                self.commit()

            log.info(f'config ~ database: {config}')
            log.info(f'loaded as {self.config}')
        else:
            log.warning('could not load config')

    @property  # todo: this was deprecated, right?
    def _video_to_hash(self):
        return self.config.video_path

    @property
    def _design_to_hash(self):
        return self.config.design_path
def init(config: BaseAnalyzerConfig) -> BaseVideoAnalyzer:
    """Create the analyzer that goes with the exact type of ``config``.

    :raises KeyError: if no analyzer is mapped to ``type(config)``
    """
    analyzer_for: Dict[Type[BaseAnalyzerConfig], Type[BaseVideoAnalyzer]] = {
        VideoAnalyzerConfig: VideoAnalyzer
    }
    analyzer_class = analyzer_for[type(config)]
    return analyzer_class(config)