Source code for isimple.db

import json
from typing import Optional, Tuple, List, Dict, Type
from pathlib import Path
import datetime

from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine, Column, Integer, Float, String, DateTime, ForeignKey

from isimple.core.db import Base, DbModel, SessionWrapper, FileModel, BaseAnalysisModel
from isimple import Settings, settings, get_logger
from isimple.config import normalize_config, VideoAnalyzerConfig

from isimple.core.backend import BaseVideoAnalyzer, BaseAnalyzerConfig


log = get_logger(__name__)


class VideoFileModel(FileModel):
    """Database model of a video file, stored in the 'video_file' table."""
    __tablename__ = 'video_file'

    def resolve(self) -> 'VideoFileModel':
        """Resolve this file through ``FileModel.resolve()``.

        Returns
        -------
        VideoFileModel
            The resolved entry, narrowed to ``VideoFileModel``.
        """
        resolved = super().resolve()
        # Narrow the base-class return type for static type checkers
        assert isinstance(resolved, VideoFileModel)
        return resolved
class DesignFileModel(FileModel):
    """Database model of a design file, stored in the 'design_file' table."""
    __tablename__ = 'design_file'

    def resolve(self) -> 'DesignFileModel':
        """Resolve this file through ``FileModel.resolve()``.

        Returns
        -------
        DesignFileModel
            The resolved entry, narrowed to ``DesignFileModel``.
        """
        resolved = super().resolve()
        # Narrow the base-class return type for static type checkers
        assert isinstance(resolved, DesignFileModel)
        return resolved
class ConfigModel(DbModel):
    """Database model of a stored configuration ('config' table).

    Each row holds one JSON-serialized configuration together with
    references to the video file, design file and analysis it belongs to.
    """
    __tablename__ = 'config'

    # Primary key
    id = Column(Integer, primary_key=True)

    # References to the related 'video_file', 'design_file', 'analysis' rows
    video = Column(Integer, ForeignKey('video_file.id'))
    design = Column(Integer, ForeignKey('design_file.id'))
    analysis = Column(Integer, ForeignKey('analysis.id'))

    # The configuration itself, serialized to a JSON string
    json = Column(String)

    # Timestamp set when the row is created
    added = Column(DateTime)
class ResultsModel(DbModel):
    """Database model of the results of an analysis ('results' table).

    Each row holds the results for a single feature, along with timing
    information about the analysis run that produced them.
    """
    __tablename__ = 'results'

    # Primary key
    id = Column(Integer, primary_key=True)

    # Reference to the 'analysis' row these results belong to
    analysis = Column(Integer, ForeignKey('analysis.id'))

    feature = Column(String)
    """The feature that was analyzed"""

    data = Column(String)
    """Results of the analysis.
    In JSON, formatted ~ ``pandas.DataFrame.to_json(orient='split')``"""

    # Timing of the analysis run
    started = Column(DateTime)
    finished = Column(DateTime)
    elapsed = Column(Float)
class AnalysisModel(BaseAnalysisModel):
    """Database model of an analysis.

    Contains a reference to a ``BaseVideoAnalyzer`` instance and keeps the
    'analysis' row in sync with that analyzer's files, configuration and
    results.
    """
    __tablename__ = 'analysis'

    # Non-persistent state; initialized lazily by ``_resolve_attributes()``
    _analyzer: Optional[BaseVideoAnalyzer]
    _video: Optional[VideoFileModel]
    _design: Optional[DesignFileModel]
    _config: Optional[ConfigModel]
    # Per-context timestamp of the configuration reached by undo/redo
    _added_by_context: Dict[str, datetime.datetime]

    id = Column(Integer, primary_key=True)

    video = Column(Integer, ForeignKey('video_file.id'))
    design = Column(Integer, ForeignKey('design_file.id'))
    config = Column(Integer, ForeignKey('config.id'))
    results = Column(Integer, ForeignKey('results.id'))

    name = Column(String)
    description = Column(String)

    added = Column(DateTime)
    modified = Column(DateTime)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._resolve_attributes()

    def _resolve_attributes(self):
        """Make sure all non-persistent attributes exist on this instance.

        Safe to call repeatedly: attributes that already exist are left
        untouched.
        """
        for attr in ['_analyzer', '_video', '_design', '_config']:
            if not hasattr(self, attr):
                setattr(self, attr, None)
        if not hasattr(self, '_added_by_context'):
            self._added_by_context = {}

    def get_name(self) -> str:
        """Name of the analysis from the database.
        Unset names are reset to '#{id}'
        """
        with self.session():
            if self.name is None:
                self.name = f"#{self.id}"
            return self.name

    def set_analyzer(self, analyzer: BaseVideoAnalyzer):
        """Set the ``BaseVideoAnalyzer`` instance wrapped by this model."""
        self._analyzer = analyzer

    def _add_video(self, path: str) -> VideoFileModel:
        """Create a ``VideoFileModel`` for ``path`` and connect it."""
        model = VideoFileModel(path=path)
        model.connect(self)
        return model

    def _add_design(self, path: str) -> DesignFileModel:
        """Create a ``DesignFileModel`` for ``path`` and connect it."""
        model = DesignFileModel(path=path)
        model.connect(self)
        return model

    def _resolve_files(self):
        """Sync the video & design file references with the analyzer config.

        File models are (re)created when the analyzer's path differs from
        the one currently held, then resolved, and finally their ids are
        written to the ``video`` and ``design`` columns.
        """
        video_path = self._analyzer.config.video_path
        if video_path and (
                self._video is None or self._video.get('path') != video_path):
            try:
                self._video = self._add_video(path=video_path)
            except ValueError as e:
                # Best effort: keep the previous file reference, but don't
                # hide the failure entirely
                log.debug(f"could not add video file '{video_path}': {e}")

        design_path = self._analyzer.config.design_path
        if design_path and (
                self._design is None
                or self._design.get('path') != design_path):
            try:
                self._design = self._add_design(path=design_path)
            except ValueError as e:
                log.debug(f"could not add design file '{design_path}': {e}")

        if self._video is not None:
            self._video = self._video.resolve()
        if self._design is not None:
            self._design = self._design.resolve()

        with self.session():
            if self._video is not None:
                self.video = self._video.id
            if self._design is not None:
                self.design = self._design.id

    def _add_config(self, json: str) -> Optional[ConfigModel]:
        """Create a ``ConfigModel`` holding ``json`` for this analysis.

        Parameters
        ----------
        json: str
            The JSON-serialized configuration

        Returns
        -------
        Optional[ConfigModel]
            The new, connected model. ``None`` if this analysis references
            neither a video nor a design file.
        """
        with self.session():
            video = self.video
            design = self.design
            analysis = self.id

        if video is None and design is None:
            return None

        model = ConfigModel(
            video=video,
            design=design,
            analysis=analysis,
            json=json,
            added=datetime.datetime.now(),
        )
        model.connect(self)
        return model

    def store(self):
        # todo: consider passing analyzer to store() instead of keeping a reference
        """Store analysis information from wrapped ``BaseVideoAnalyzer``
        to the database.

        Does nothing if no analyzer has been set.
        """
        self._resolve_attributes()

        if self._analyzer is None:
            return

        config_json = json.dumps(self._analyzer.get_config(do_tag=True))

        self._resolve_files()

        # Only add a new 'config' entry if the configuration has changed
        if self._config is None or config_json != self._config.get('json'):
            self._config = self._add_config(json=config_json)

        with self.session() as s:
            name = self._analyzer.config.name
            if name is not None:
                if not name.strip():
                    self._analyzer.config.name = f"#{self.id}"
                else:
                    self.name = name.strip()
                    self._analyzer.config.name = self.name
            if self._analyzer.config.description is not None:
                self.description = self._analyzer.config.description
            s.commit()

            if self._config is not None:
                self.config = self._config.id

            # Store results
            timing = self._analyzer.timing
            for feature, df in self._analyzer.results.items():
                if df.isnull().all().all():
                    # Nothing was computed for this feature
                    continue

                model = ResultsModel(
                    analysis=self.id,
                    feature=feature,
                    data=df.to_json(orient='split'),
                )  # todo: should have a _results: Dict[ <?>, ResultsModel] so these don't spawn new results each time
                s.add(model)

                # Store timing info
                if timing is not None:
                    model.started = datetime.datetime.fromtimestamp(timing.t0)
                    model.finished = datetime.datetime.fromtimestamp(timing.t1)
                    model.elapsed = timing.elapsed

                # Commit so that the new row gets its id assigned
                s.commit()
                self.results = model.id

    @staticmethod
    def _is_usable(field: str, value) -> bool:
        """Whether a stored ``field`` value holds enough information."""
        if field == 'transform':
            # 'transform' field should contain a non-empty ROI
            return bool(value) and 'roi' in value and bool(value['roi'])
        if field == 'masks':
            # 'masks' field should not be empty
            return bool(value)
        return True

    def load_config(self, video_path: Optional[str] = None,
                    design_path: Optional[str] = None,
                    include: Optional[List[str]] = None) -> Optional[dict]:
        """Load configuration from the database.

        Parameters
        ----------
        video_path: str
            Path to video file
        design_path: str
            Path to design file
        include: List[str]
            List of fields which must be included in the configuration.
            If a matching ConfigModel doesn't provide all of these,
            the other matches will be parsed to complete it.
            Defaults to ``['transform', 'masks']``. Not modified.

        Returns
        -------
        Optional[dict]
            The ``include`` fields gathered from earlier configurations of
            the same video (and design, if any), newest first; empty if
            none was found. ``None`` if no video file is available.
        """
        self._resolve_attributes()

        if include is None:
            include = ['transform', 'masks']

        # Check whether all fields in include are valid
        for field in include:
            assert field in VideoAnalyzerConfig.__fields__, \
                f"'{field}' in `include` is not a `VideoAnalyzerConfig` field."

        if video_path is not None:
            self._video = self._add_video(path=video_path)
        if design_path is not None:
            self._design = self._add_design(path=design_path)

        if self._video is None:
            # Without a video file there is nothing to match against
            return None

        self._video = self._video.resolve()
        if self._design is not None:
            self._design = self._design.resolve()

        # Fields are struck off as they are satisfied; work on a copy so
        # that the caller's list is left alone
        pending = list(include)
        config: dict = {}

        # Query for latest usages of video.id & design.id
        with self.session() as s:
            q = s.query(ConfigModel)
            q = q.filter(ConfigModel.video == self._video.id)
            if self._design is not None:
                q = q.filter(ConfigModel.design == self._design.id)
            q = q.filter(ConfigModel.analysis != self.id)

            for match in q.order_by(ConfigModel.id.desc()):
                if not pending:
                    # Every requested field has a usable value
                    break

                match_config = normalize_config(json.loads(match.json))

                # Assimilate `include` fields from match
                for field in list(pending):
                    if field not in match_config:
                        continue
                    value = match_config[field]
                    if self._is_usable(field, value):
                        config[field] = value
                        pending.remove(field)
                    elif field not in config:
                        # Keep the newest incomplete value as a fallback
                        # until a usable one turns up
                        config[field] = value

        return config

    def get_config_json(self) -> Optional[str]:
        """JSON string of the configuration referenced by this analysis.

        Returns
        -------
        Optional[str]
            The stored JSON; ``None`` if there is no such 'config' entry.
        """
        with self.session() as s:
            # Querying a single column yields rows of length 1
            row = s.query(ConfigModel.json).\
                filter(ConfigModel.id == self.config).first()
            return row[0] if row is not None else None

    def _fetch_latest_config(self) -> Optional[ConfigModel]:
        """Fetch the most recently added ``ConfigModel``, if any.

        NOTE(review): the query is not restricted to this analysis --
        confirm that this is intended.
        """
        with self.session() as s:
            return s.query(ConfigModel). \
                order_by(ConfigModel.added.desc()). \
                first()  # todo: check if ordering by datetime works properly

    def _added(self, context: Optional[str] = None) -> datetime.datetime:
        """Reference timestamp from which to step through configurations.

        Parameters
        ----------
        context: str
            Name of a ``VideoAnalyzerConfig`` field

        Returns
        -------
        datetime.datetime
            The timestamp recorded for ``context`` by an earlier undo/redo,
            if any. Otherwise the ``added`` timestamp of the current
            configuration, or of the analysis itself if no configuration
            is available.
        """
        self._resolve_attributes()

        if self._config is None:
            self._config = self._fetch_latest_config()

        with self.session():
            if context in self._added_by_context:
                return self._added_by_context[context]

            if self._config is not None:
                assert isinstance(self._config.added, datetime.datetime)
                return self._config.added

            assert isinstance(self.added, datetime.datetime)
            return self.added

    def _step_config(self, filter, order,
                     context: Optional[str] = None) -> Optional[dict]:
        """Find the next configuration in the given direction.

        Parameters
        ----------
        filter
            Query criterion selecting the candidate 'config' entries
        order
            Ordering of the candidates; the first suitable one is used
        context: str
            Name of a ``VideoAnalyzerConfig`` field. If supplied, only a
            candidate whose ``context`` field differs from the analyzer's
            current one is suitable.

        Returns
        -------
        Optional[dict]
            The full configuration if ``context`` is ``None``, otherwise
            only its ``context`` field. ``None`` if nothing suitable.
        """
        self._resolve_attributes()

        with self.session() as s:
            candidates = list(
                s.query(ConfigModel)
                .filter(ConfigModel.video == self.video)
                .filter(ConfigModel.design == self.design)
                .filter(filter)
                .order_by(order)
            )

            for match in candidates:
                assert isinstance(match, ConfigModel)
                assert isinstance(match.json, str)  # todo: fail more gracefully if json is empty; skip & remove from database
                config = normalize_config(json.loads(match.json))

                if context is None:
                    # Step the whole configuration
                    self._config = match
                    self._config.connect(self)
                    s.add(self._config)
                    return config

                assert self._analyzer is not None
                current = self._analyzer.config.to_dict()[context]
                if context in config and config[context] != current:
                    self._config = None
                    assert isinstance(match.added, datetime.datetime)
                    # Remember where this context is at for the next step
                    self._added_by_context[context] = match.added
                    return {context: config[context]}

            return None

    def undo_config(self, context: Optional[str] = None):
        """Undo configuration. If a ``context`` is supplied, ensure that
        the ``context`` field changes, but the other fields remain the same

        Parameters
        ----------
        context: str
            Name of a ``VideoAnalyzerConfig`` field

        Raises
        ------
        ValueError
            If ``context`` is not a ``VideoAnalyzerConfig`` field
        """
        if context is not None \
                and context not in VideoAnalyzerConfig.__fields__:
            raise ValueError(f"Invalid undo context '{context}'")

        # Newest configuration older than the current one
        config = self._step_config(
            ConfigModel.added < self._added(context),
            ConfigModel.added.desc(),
            context
        )

        if self._analyzer is not None and config is not None:
            self._analyzer.set_config(config=config, silent=(context is None))

    def redo_config(self, context: Optional[str] = None):
        """Redo configuration. If a ``context`` is supplied, ensure that
        the ``context`` field changes, but the other fields remain the same

        Parameters
        ----------
        context: str
            Name of a ``VideoAnalyzerConfig`` field

        Raises
        ------
        ValueError
            If ``context`` is not a ``VideoAnalyzerConfig`` field
        """
        if context is not None \
                and context not in VideoAnalyzerConfig.__fields__:
            raise ValueError(f"Invalid redo context '{context}'")

        # Oldest configuration newer than the current one
        config = self._step_config(
            ConfigModel.added > self._added(context),
            ConfigModel.added,
            context
        )

        if self._analyzer is not None and config is not None:
            self._analyzer.set_config(config=config, silent=(context is None))
class History(SessionWrapper):
    """Interface to the history database"""

    def __init__(self, path: Optional[Path] = None):
        """Open the history database, creating its tables if necessary.

        Parameters
        ----------
        path: Path
            Location of the SQLite database file.
            Defaults to ``settings.db.path``.
        """
        if path is None:
            path = settings.db.path

        self._engine = create_engine(f'sqlite:///{path}')
        Base.metadata.create_all(self._engine)
        self._session_factory = scoped_session(
            sessionmaker(bind=self._engine)
        )

    def add_video_file(self, path: str) -> VideoFileModel:
        """Add a video file to the database.
        Duplicate files are resolved to the original entry.

        Parameters
        ----------
        path: str
            Path to the video file

        Returns
        -------
        VideoFileModel
            The resolved database entry
        """
        model = VideoFileModel(path=path)
        model.connect(self)
        # Return what resolve() hands back rather than the fresh instance,
        # so that callers get the resolved entry
        return model.resolve()

    def add_design_file(self, path: str) -> DesignFileModel:
        """Add a design file to the database.
        Duplicate files are resolved to the original entry.

        Parameters
        ----------
        path: str
            Path to the design file

        Returns
        -------
        DesignFileModel
            The resolved database entry
        """
        model = DesignFileModel(path=path)
        model.connect(self)
        # Return what resolve() hands back rather than the fresh instance,
        # so that callers get the resolved entry
        return model.resolve()

    def add_analysis(self, analyzer: BaseVideoAnalyzer,
                     model: Optional[AnalysisModel] = None) -> AnalysisModel:
        """Link an analyzer to an analysis entry in the database.

        Parameters
        ----------
        analyzer: BaseVideoAnalyzer
            The analyzer to link
        model: AnalysisModel
            Existing entry to link ``analyzer`` to.
            If not supplied, a new entry is added to the database.

        Returns
        -------
        AnalysisModel
            The entry linked to ``analyzer``
        """
        if model is None:
            with self.session() as s:
                model = AnalysisModel()
                s.add(model)

        model.connect(self)
        model.set_analyzer(analyzer)
        analyzer.set_model(model)
        return model

    def fetch_analysis(self, id: int) -> Optional[AnalysisModel]:
        """Fetch the analysis entry with the given ``id``.

        Returns
        -------
        Optional[AnalysisModel]
            The matching entry; ``None`` if there is none.
        """
        with self.session() as s:
            return s.query(AnalysisModel).\
                filter(AnalysisModel.id == id).\
                first()

    def fetch_paths(self) -> Dict[str, list]:
        """Fetch the latest video and design file paths from the database.
        Number of paths is limited by ``settings.app.recent_files``

        Returns
        -------
        Dict[str, list]
            Paths ordered by ``used``, descending, under the keys
            ``'video_path'`` and ``'design_path'``
        """
        def recent_paths(session, file_model) -> list:
            # Single-column queries yield rows of length 1
            rows = session.query(file_model.path).\
                order_by(file_model.used.desc()).\
                limit(settings.app.recent_files).all()
            return [row[0] for row in rows]

        with self.session() as s:
            return {
                'video_path': recent_paths(s, VideoFileModel),
                'design_path': recent_paths(s, DesignFileModel),
            }

    def clean(self) -> None:
        """Clean the database

        * remove 'analysis' entries with ``<null>`` config
        * remove 'config' entries with ``<null>`` json
        * for 'analysis' entries older than ``settings.db.cleanup_interval``

            * remove all non-primary 'config' entries
            * remove all non-primary 'results' entries
        """
        log.debug("cleaning history")

        threshold = datetime.datetime.now() - datetime.timedelta(
            days=settings.db.cleanup_interval
        )

        with self.session() as s:
            s.query(ConfigModel).filter_by(json=None).delete()
            s.query(AnalysisModel).filter_by(config=None).delete()

            # Fetch the old entries up front, since deletes are issued on
            # the same session inside the loop
            outdated = s.query(AnalysisModel).\
                filter(AnalysisModel.modified < threshold).all()

            for old in outdated:
                s.query(ConfigModel). \
                    filter(ConfigModel.analysis == old.id). \
                    filter(ConfigModel.id != old.config).delete()
                s.query(ResultsModel). \
                    filter(ResultsModel.analysis == old.id). \
                    filter(ResultsModel.id != old.results).delete()

    def forget(self) -> None:
        """Remove everything."""
        models = [
            AnalysisModel,
            VideoFileModel,
            DesignFileModel,
            ConfigModel,
            ResultsModel,
        ]
        with self.session() as s:
            for model in models:
                s.query(model).delete()