# cheated off of https://testdriven.io/blog/developing-a-single-page-app-with-flask-and-vuejs/
# cheated off of https://stackoverflow.com/questions/39801718
import json
import pickle
import os
import time
import subprocess
from threading import Thread, Event, Lock
from typing import Dict, Any, List, Optional
from enum import IntEnum
import cv2
from flask import Flask, send_from_directory, jsonify, request, Response, make_response
import waitress
from OnionSVG import check_svg
import isimple
import isimple.config
import isimple.util as util
import isimple.util.filedialog
import isimple.core.backend as backend
import isimple.core.streaming as streaming
import isimple.db as db
import isimple.video as video
import isimple.plugins as plugins
log = isimple.get_logger(__name__)
UI = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
, 'ui', 'dist'
)
[docs]def respond(*args) -> str:
return jsonify(*args)
[docs]def restart_server(host: str, port: int):
log.info('restarting server...')
subprocess.Popen(
f'sleep 1; python .venv.py .server.py --host {host} --port {port}',
shell=True,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
[docs]class ServerThread(Thread, metaclass=util.Singleton):
_app: Flask
_host: str
_port: int
def __init__(self, app, host, port):
self._app = app
self._host = host
self._port = port
super().__init__(daemon=True)
[docs] def run(self):
try:
waitress.serve(
self._app,
host=self._host,
port=self._port,
threads=32,
)
except OSError:
log.warning('address already in use')
self.stop()
[docs] def stop(self):
os._exit(0)
[docs]class QueueState(IntEnum):
STOPPED = 0
RUNNING = 1
PAUSED = 2
[docs]class Main(isimple.core.Lockable):
__metaclass__ = util.Singleton
_app: Flask
_roots: Dict[str, backend.BaseVideoAnalyzer] = {}
_models: Dict[str, db.AnalysisModel] = {}
_history: db.History
_server: ServerThread
_lock = Lock()
_ping = Event()
_unload = Event()
_quit = Event()
_done = Event()
_timeout_suppress = 0.5 # todo: load from settings.yaml
_timeout_unload = 5 # todo: load from settings.yaml
_timeout_loop = 0.1 # todo: load from settings.yaml
_stop_log: Event
_eventstreamer = streaming.EventStreamer()
_q_state: int
_pause_q: Event
_stop_q: Event
_host: str
_port: int
def __init__(self):
self._history = db.History()
super().__init__()
app = Flask(__name__, static_url_path='')
app.config.from_object(__name__)
self._stop_log = Event() # todo: these could be class attributes instead
self._pause_q = Event()
self._stop_q = Event()
self._q_state = QueueState.STOPPED
# Serve webapp (bypassed when frontend runs in development mode)
@app.route('/', methods=['GET'])
def index_html():
log.debug(f"Serving 'index.html'")
return send_from_directory(UI, 'index.html')
@app.route('/<file>')
@app.route('/<directory1>/<file>')
@app.route('/<directory1>/<directory2>/<file>')
def get_file(file, directory1 = '', directory2 = ''):
directory = os.path.join(UI, directory1, directory2)
log.debug(f"Serving '{os.path.join(directory,file)}'")
return send_from_directory(directory, file)
# API: general
def active():
if self._unload.is_set():
log.debug('Incoming traffic - cancelling quit.')
self._unload.clear()
self._ping.set()
@app.route('/api/ping', methods=['GET'])
def ping():
log.vdebug('received ping')
active()
return respond(True)
@app.route('/api/pid_hash', methods=['GET'])
def get_pid_hash():
import hashlib
return hashlib.sha1(bytes(os.getpid())).hexdigest() + '\n'
@app.route('/api/unload', methods=['POST'])
def unload():
self.save_state()
self._unload.set()
return respond(True)
@app.route('/api/quit', methods=['POST'])
def quit():
self._quit.set()
return respond(True)
@app.route('/api/restart', methods=['POST'])
def restart():
quit()
while not self._done.is_set():
pass
restart_server(self._host, self._port)
return respond(True)
@app.route('/api/settings_schema')
def settings_schema():
return respond(isimple.settings.schema())
@app.route('/api/get_settings', methods=['GET'])
def get_settings():
return respond(isimple.settings.to_dict())
@app.route('/api/set_settings', methods=['POST'])
def set_settings():
# todo: check if anything's changed, don't reload if not!
new_settings = json.loads(request.data)['settings']
log.info(f'setting settings: {new_settings}')
isimple.update_settings(new_settings)
restart()
return respond(isimple.settings.to_dict())
@app.route('/api/options/<for_type>', methods=['GET'])
def get_options(for_type): # todo: should be a single-call endpoint, /api/schemas
active()
log.debug(f"get options for '{for_type}'")
if for_type == "state":
return respond(dict(video.AnalyzerState.__members__))
elif for_type == "analyzer":
return respond(video.AnalyzerType().options)
elif for_type == "feature":
ft = video.FeatureType()
features = [video.FeatureType(k).get() for k in ft.options]
return respond({
'options': ft.options,
'labels': {
k: feature.label() for k, feature
in zip(ft.options, features)
},
'units': {
k: feature.unit() for k, feature
in zip(ft.options, features)
},
'descriptions': {
k: feature.description() for k, feature
in zip(ft.options, features)
},
'parameters': { # todo: replace with JSON schema ~ pydantic
k: list(feature._config_class.__fields__.keys()) for k, feature
in zip(ft.options, features)
},
'parameter_defaults': { # todo: replace with JSON schema ~ pydantic
k: {par: field.default for par, field in feature._config_class.__fields__.items()} for k, feature
in zip(ft.options, features)
},
'parameter_descriptions': { # todo: replace with JSON schema ~ pydantic
k: {par: field.field_info.description for par, field in feature._config_class.__fields__.items()} for k, feature
in zip(ft.options, features)
},
})
elif for_type == "frame_interval_setting":
fis = video.FrameIntervalSetting()
return respond(
{'options': fis.options, 'descriptions': fis.descriptions}
)
elif for_type == "filter":
return respond({
'options': video.FilterType().options,
'descriptions': video.FilterType().descriptions
})
elif for_type == "transform":
return respond({
'options': video.TransformType().options,
'descriptions': video.TransformType().descriptions
})
elif for_type == "paths":
return respond(self._history.fetch_paths())
elif for_type == "config":
return respond(
backend.AnalyzerType().config_schema() # todo: { AnalyzerType:<schema> }
)
else:
raise ValueError(f"No options for '{for_type}'")
@app.route('/api/select_video_path', methods=['GET'])
def select_video():
return respond(isimple.util.filedialog.select_video()) # todo: should not be able to spawn multiple windows
@app.route('/api/select_design_path', methods=['GET']) # todo: should not be able to spawn multiple windows
def select_design():
return respond(isimple.util.filedialog.select_design())
@app.route('/api/check_video_path', methods=['PUT'])
def check_video():
return respond(self.check_video_path(json.loads(request.data)['video_path']))
@app.route('/api/check_design_path', methods=['PUT'])
def check_design():
return respond(self.check_design_path(json.loads(request.data)['design_path']))
@app.route('/api/start', methods=['POST'])
def start():
data = json.loads(request.data)
if 'queue' in data:
queue = data['queue']
else:
queue = list(self._roots.keys())
return respond(self.q_start(queue))
@app.route('/api/stop', methods=['POST'])
def stop():
return respond(self.q_stop())
# API: working with Analyzer instances
@app.route('/api/init', methods=['POST'])
def init(): # todo: also add a model instance to self._models
active()
if 'type' in request.args.to_dict():
bt = request.args.to_dict()['type']
else:
bt = None
return respond(self.add_instance(video.AnalyzerType(bt)))
@app.route('/api/<id>/launch', methods=['POST'])
def launch(id: str):
active()
return respond(self.call(id, 'launch', {}))
@app.route('/api/<id>/can_launch', methods=['GET'])
def can_launch(id: str):
return respond(self.call(id, 'can_launch', {}))
@app.route('/api/<id>/get_state', methods=['GET'])
def get_state(id: str):
return respond(self._roots[id].state)
@app.route('/api/<id>/remove', methods=['POST'])
def remove(id: str):
return respond(self.remove_instance(id))
@app.route('/api/<id>/call/<endpoint>', methods=['GET','PUT','POST'])
def call(id: str, endpoint: str):
active()
if request.data:
data = json.loads(request.data)
else:
data = {k:json.loads(v) for k,v in request.args.to_dict().items() if v != ''}
result = self.call(id, endpoint, data)
if result is None:
result = True
if isinstance(result, bytes):
return make_response(result)
else:
return respond(result)
# API: streaming
@app.route('/api/<id>/stream/<endpoint>', methods=['GET'])
def stream(id: str, endpoint: str):
"""Start streaming data ~ id & endpoint
:param id: analyzer UUID
:param endpoint: string corresponding to an attribute of isimple.endpoints.BackendRegistry
"""
# todo: sanity check if `endpoint' is streamable
stream = self.stream(id, endpoint)
if stream is not None:
response = Response(
stream.stream(),
mimetype = stream.mime_type(),
)
# response.cache_control.no_cache = True
return response
else:
return respond(None)
@app.route('/api/<id>/stream/<endpoint>/stop', methods=['GET'])
def stop_stream(id: str, endpoint: str):
"""Stop streaming data ~ id & endpoint
:param id: analyzer UUID
:param endpoint: string corresponding to an attribute of isimple.endpoints.BackendRegistry
"""
# todo: sanity check if `endpoint' is streamable
self.stop_stream(id, endpoint)
return respond(True)
@app.route('/api/stream/events', methods=['GET'])
def stream_events():
"""Stream application events (
"""
return Response(
self.events.stream(),
mimetype = self.events.mime_type()
)
# API: utility
@app.route('/api/app_state', methods=['GET'])
def get_app_state():
"""List instances in self._roots
"""
active()
return respond({
'q_state': self._q_state,
'ids': [k for k in self._roots.keys()],
'status': [a.status() for a in self._roots.values()],
})
@app.route('/api/get_log')
def get_log():
# todo: move to core.streaming
# todo: try to refactor ~ isimple.core.streaming
"""Start streaming log file
"""
# cheated off of https://stackoverflow.com/questions/35540885/
log.debug("streaming log file")
# Stop previous log reader if active
if hasattr(self, '_stop_log'):
self._stop_log.set()
self._stop_log.clear()
def generate():
with open(isimple.settings.log.path) as f:
while not self._stop_log.is_set():
yield f.read()
time.sleep(1)
response = Response(generate(), mimetype='text/plain')
response.headers['Content-Disposition'] = \
'attachment; filename=current.log'
return response
@app.route('/api/stop_log', methods=['PUT'])
def stop_log():
"""Stop streaming log file
"""
log.debug("stopping log file stream")
self._stop_log.set()
return respond(True)
# API: application
@app.route('/api/app-state/save')
def save_state():
with self.lock():
self.save_state()
return respond(True)
@app.route('/api/app-state/load')
def load_state():
with self.lock():
self.load_state()
return respond(True)
@app.route('/api/cache/clear', methods=['POST'])
def clear_cache():
log.info('clearing cache')
cache = isimple.get_cache(isimple.settings)
cache.clear()
cache.close()
return respond(True)
@app.route('/api/db/clear', methods=['POST'])
def clear_db():
log.info('clearing database')
with self.lock():
self._history.forget()
return respond(True)
@app.before_first_request
def initialize():
self._history.clean()
self.load_state()
self._app = app
[docs] def serve(self, host, port):
"""Serve the application
Parameters
----------
host: str
Host address
port: int
Host port
"""
# Don't show waitress console output (server URL)
self._host = host
self._port = port
log.info(f"serving on http://{host}:{port}")
with util.suppress_stdout():
self._server = ServerThread(self._app, host, port)
self._server.start()
time.sleep(self._timeout_suppress) # Wait for Waitress to catch up
try:
while not self._quit.is_set():
if self._ping.is_set():
self._ping.clear()
if self._unload.is_set():
log.debug(f'Unloaded from browser, waiting for traffic.')
time.sleep(self._timeout_unload)
if not self._ping.is_set():
log.debug(f'No traffic for {self._timeout_unload} seconds - quitting...')
self._quit.set()
time.sleep(self._timeout_loop)
except KeyboardInterrupt:
log.info('interrupted by user')
self._done.set()
self.save_state()
streaming.streams.stop()
log.info('Main.serve() stopped.')
[docs] def check_video_path(self, path: str) -> bool:
"""Check whether the path is a valid video and add it to
the history database"""
try:
log.debug(f"Checking video file {path}")
if os.path.isfile(path):
try:
capture = cv2.VideoCapture(path)
if int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) > 0:
self._history.add_video_file(path)
return True
finally:
pass
except KeyError:
pass
return False
[docs] def check_design_path(self, path: str) -> bool:
"""Check whether the path is a valid design and add it to
the history database"""
try:
log.debug(f"Checking design file {path}")
if os.path.isfile(path):
try:
check_svg(path)
self._history.add_design_file(path)
return True
finally:
pass
except KeyError:
pass
return False
[docs] def add_instance(self, type: video.AnalyzerType = None) -> str:
"""Add a new analyzer instance
Parameters
----------
type: AnalyzerType
Type of ``BaseVideoAnalyzer`` to instantiate
Returns
-------
str
The ``id`` of the new analyzer
"""
with self.lock():
if type is None:
type = video.AnalyzerType()
analyzer = type.get()()
analyzer.set_eventstreamer(self._eventstreamer)
log.info(f"Adding {{'{analyzer.id}': {analyzer}}}")
self._roots[analyzer.id] = analyzer
assert isinstance(analyzer, video.VideoAnalyzer)
self._models[analyzer.id] = self._history.add_analysis(analyzer)
self.save_state()
return analyzer.id
[docs] def remove_instance(self, id: str) -> bool:
"""Remove a analyzer instance
Parameters
----------
id: str
The ``id`` of the analyzer to remove
"""
with self.lock():
if self.valid(id):
log.info(f"Removing '{id}'")
analyzer = self._roots.pop(id)
with analyzer.lock():
analyzer.commit()
del analyzer
self.save_state()
return True
else:
raise ValueError
[docs] def q_start(self, q: List[str]) -> bool:
"""Queue analysis
Parameters
----------
q: List[str]
List of analyzer ``id`` to queue.
"""
if self._q_state == QueueState.STOPPED:
done = False
if all(self._roots[id].can_analyze for id in q): # todo: handle non-id entries in q
log.info(f"analyzing queue: {q}")
for id in q:
while self._pause_q.is_set():
self._q_state = QueueState.PAUSED
time.sleep(0.5)
self._q_state = QueueState.RUNNING
if self._stop_q.is_set():
done = False
break
if not self._roots[id].done:
self._roots[id].analyze()
else:
log.info(f"skipping {id}")
done = True
self._pause_q.clear()
self._stop_q.clear()
self._q_state = QueueState.STOPPED
return done
else:
log.info(f"CAN'T ANALYZE FOR ALL ANALYZERS")
return False
else:
log.info(f"already started analyzing queue!")
return False
[docs] def q_stop(self):
"""Stop analysis queue"""
log.info('stopping analysis queue')
if self._pause_q.is_set():
self._pause_q.clear()
self._stop_q.set()
def _commit(self):
for root in self._roots.values():
root.commit()
[docs] def save_state(self):
"""Save application state to ``isimple.settings.app.state_path``
"""
if isimple.settings.app.save_state:
log.info("saving application state")
self._commit()
s = {
id: root.model.get('id')
for id,root in self._roots.items()
if not root.done
}
with open(isimple.settings.app.state_path, 'wb') as f:
pickle.dump(s, f)
[docs] def load_state(self):
"""Load application state from ``isimple.settings.app.state_path``"""
if isimple.settings.app.load_state:
with self.lock():
log.info("loading application state")
# todo: check if instances retain reference to self._eventstreamer!
try:
with open(isimple.settings.app.state_path, 'rb') as f:
S = pickle.load(f)
for id,model_id in S.items():
assert isinstance(id, str)
assert isinstance(model_id, int)
model = self._history.fetch_analysis(model_id)
if model is not None:
model.connect(self._history)
analyzer = video.init(isimple.config.loads(model.get_config_json()))
analyzer._set_id(id)
analyzer.set_eventstreamer(self._eventstreamer)
analyzer.launch()
self._roots[id] = analyzer
self._models[id] = self._history.add_analysis(analyzer, model)
except FileNotFoundError:
pass
except EOFError:
pass
[docs] def call(self, id: str, endpoint: str, data: dict = None) -> Any:
"""Call an analyzer endpoint
Parameters
----------
id: str
Analyzer id
endpoint: str
Endpoint name, should correspond to a :class:`isimple.endpoints.BackendRegistry`
attribute
data
"""
if data is None:
data = {}
if self.valid(id):
t0 = time.time()
log.debug(f"{self._roots[id]}: call '{endpoint}'")
# todo: sanity check this
method = self._roots[id].get(getattr(backend.backend, endpoint))
result = method(**data)
log.debug(f"{self._roots[id]}: return '{endpoint}' "
f"({time.time() - t0} s elapsed)")
return result
else:
return None
[docs] def stream(self, id: str, endpoint: str) -> Optional[streaming.BaseStreamer]: # todo: extend to handle json streaming also
with self.lock():
if self.valid(id):
# todo: sanity check this also
method = self._roots[id].get(getattr(backend.backend, endpoint)) # todo: check whether endpoint.streaming is not _Streaming('off')
self._roots[id].cache_open()
new_stream = streaming.streams.register(method.__self__, method) # type: ignore
log.debug(f"{self._roots[id]}: stream '{endpoint}'")
return new_stream
else:
return None
[docs] def stop_stream(self, id: str, endpoint: str):
with self.lock():
if self.valid(id):
method = self._roots[id].get(getattr(backend.backend, endpoint))
# todo: type / assert properly
streaming.streams.unregister(method.__self__, method) # type: ignore
log.debug(f"{self._roots[id]}: stopped streaming '{endpoint}'")
[docs] def valid(self, id):
if id in self._roots:
return True
else:
log.warning(f"{id} is not a valid analyzer id or refers to an analyzer from a previous session")
return False
@property
def events(self) -> streaming.EventStreamer:
return self._eventstreamer
wsgi = Main()._app