Source code for isimple.core.streaming

import abc
import json
from typing import Optional, Tuple, Generator, Callable, Dict, Type, Any, Union, List
from functools import wraps

from isimple import get_logger
from isimple.core import _Streaming, Lockable

from isimple.util import Singleton
from isimple.util.meta import unbind
import queue

import threading
import time

import numpy as np
import cv2


# cheated off of https://www.pyimagesearch.com/2019/09/02/opencv-stream-video-to-web-browser-html-page/

log = get_logger(__name__)


[docs]class BaseStreamer(abc.ABC): _queue: queue.Queue _stop: threading.Event _paused: bool _empty_queue_timeout: float = 0.02 _stop_timeout: float = 60 _boundary: Optional[bytes] = None _content_type: Optional[bytes] = None _mime_type: Optional[str] = None _double_yield: bool = False def __init__(self): self._queue = queue.Queue() self._stop = threading.Event() self._paused = False
[docs] def push(self, value: Any): if self._validate(value): self._queue.put(value) else: log.warning(f"{self.__class__.__name__}: skipping invalid value")
[docs] def stream(self) -> Generator[Any, None, None]: self._stop.clear() while not self._stop.is_set(): if not self._queue.empty(): value = self._queue.get() output = self._decorate(self._encode(value)) if output is not None: log.vdebug(f"{self}: yielding...") yield output if self._double_yield: yield output # todo: image streaming doesn't work properly if not yielded twice for some reason else: log.warning(f"{self.__class__.__name__}: encoding failed for {value}") continue else: time.sleep(self._empty_queue_timeout)
[docs] def stop(self): self._stop.set() with self._queue.mutex: self._queue.queue.clear()
[docs] @classmethod def mime_type(cls) -> str: if cls._mime_type is None: assert cls._boundary is not None return f"multipart/x-mixed-replace; boundary={cls._boundary.decode('utf-8')}" else: return cls._mime_type
[docs] @classmethod def content_type(cls): return cls._content_type
@abc.abstractmethod def _validate(self, value: Any) -> bool: raise NotImplementedError @abc.abstractmethod def _encode(self, value: Any) -> Optional[bytes]: raise NotImplementedError @abc.abstractmethod def _decorate(self, value: Optional[bytes]) -> Optional[bytes]: raise NotImplementedError
[docs]class JsonStreamer(BaseStreamer): _boundary = b"data" _mime_type = "text/event-stream" def _validate(self, value: Any) -> bool: return isinstance(value, dict) def _encode(self, value: dict) -> Optional[bytes]: try: return json.dumps(value).encode('utf-8') except Exception: # todo: make more specific return None def _decorate(self, value: Optional[bytes]) -> Optional[bytes]: if value is not None: return b"data: " + value + b"\n\n" else: return None
[docs]class EventStreamer(JsonStreamer):
[docs] def event(self, category: str, id: str, data: Any): """Push a JSON event :param category: event category :param id: UUID of event source :param data: event data :return: """ log.debug(f"pushing event - id:{id} category:{category} data:{data}") self.push({'category': category, 'id': id, 'data': data})
[docs]class FrameStreamer(BaseStreamer): _boundary = b"frame" _empty_queue_timeout: float = 0.02 _stop_timeout: float = 60 _double_yield = True def _validate(self, value: Any) -> bool: return isinstance(value, np.ndarray) def _decorate(self, data: Optional[bytes]) -> Optional[bytes]: if data is not None: try: return ( b"--frame\r\nContent-Type: " + self.content_type() + b"\r\n\r\n" + bytearray(data) + b"\r\n" ) except Exception as e: log.error(e) return None else: return None
[docs]class JpegStreamer(FrameStreamer): # todo: configure quality in settings _content_type = b"image/jpeg" def _encode(self, frame: np.ndarray) -> Optional[bytes]: # Assuming HSV input frame, cv2.imencode works with BGR (success, encoded_frame) = cv2.imencode( ".jpg", cv2.cvtColor(frame, cv2.COLOR_HSV2BGR), params = [cv2.IMWRITE_JPEG_QUALITY, 85] ) if success: return encoded_frame else: return None
_stream_mapping: dict = { _Streaming('json'): JsonStreamer, _Streaming('image'): JpegStreamer, }
[docs]class StreamHandler(Lockable): """A singleton object to handle streaming frames from methods """ __metaclass__ = Singleton _streams: Dict[object, Dict[Callable, BaseStreamer]] def __init__(self): super().__init__() self._streams = {}
[docs] def register(self, instance: object, method, stream_type: Type[BaseStreamer] = None) -> BaseStreamer: """Register `method`, start a streamer. If `method` has been registered already, return its streamer. """ with self.lock(): method = unbind(method) if self.is_registered(instance, method): stream = self._streams[instance][method] else: if hasattr(method, '_endpoint'): stream_type = _stream_mapping[method._endpoint.streaming] if stream_type is not None: stream = stream_type() if instance not in self._streams: self._streams[instance] = {} self._streams[instance][method] = stream log.debug(f'registering {instance}, {method} as {stream}') self.push(instance, method, method(instance)) else: raise ValueError('cannot resolve stream type') return stream
[docs] def is_registered(self, instance: object, method = None) -> bool: if instance in self._streams: if method is not None: return method in self._streams[instance] else: return True else: return False
[docs] def push(self, instance: object, method, data): """If `method` is registered, push `data` to its streamer. """ method = unbind(method) if isinstance(method, list): for m in method: if self.is_registered(instance, m): log.debug(f"pushing {m.__qualname__} to {self._streams[instance][m]}") self._streams[instance][m].push(data) else: if self.is_registered(instance, method): log.debug(f"pushing {method.__qualname__} to {self._streams[instance][method]}") self._streams[instance][method].push(data)
[docs] def unregister(self, instance: object, method = None): """Unregister `method`: stop its streamer & delete """ # todo: should unregister explicitly e.g. when closing a page def _unregister(method): method = unbind(method) if self.is_registered(instance, method): log.debug(f'unregistering {instance}, {method}') self._streams[instance][method].stop() del self._streams[instance][method] with self.lock(): if method is not None: _unregister(method) else: for method in self._streams[instance].values(): _unregister(method)
[docs] def update(self): try: for instance in self._streams.keys(): for method in self._streams[instance].keys(): try: log.debug(f'updating {instance}, {method}') self.push(instance, method, method(instance)) except Exception as e: log.error(f"{e} occured @ {instance}, {method}") except RuntimeError: log.debug(f"new stream opened while updating") # Repeat the update. This doesn't happen too often, # so don't worry about the performance hit. self.update() # Recursion could be problematic if too many streams are opened # within a short time span, but this shouldn't be an issue. except Exception as e: log.error(f"{e} occurred")
[docs] def stop(self): for instance in list(self._streams): self.unregister(instance)
# Global StreamHandler instance streams = StreamHandler()
[docs]def stream(method): """Decorator for streaming methods. To stream frames, the wrapped method should be registered in the global StreamHandler `streams`. """ @wraps(method) def wrapped_method(*args, **kwargs): data = method(*args, **kwargs) # Push data to streamer streams.push( instance = args[0], method = unbind(method), data = data ) return data # Pass on attributes from `method` to `wrapped_method` todo: this is *very* wonky! for (attr, value) in method.__dict__.items(): setattr(wrapped_method, attr, value) return wrapped_method