Source code for isimple.util

import os
import sys
import time
import json
from distutils.util import strtobool
from functools import wraps, lru_cache
from typing import Any, Generator, Optional
from collections import namedtuple
import multiprocessing
import threading
import queue
import hashlib
from contextlib import contextmanager
import diskcache

import numpy as np

from isimple import get_logger, settings, Logger

log = get_logger(__name__)


[docs]def str2bool(value: str) -> bool: return strtobool(value)
[docs]def as_string(value: Any) -> str: """Redirect `dbcore` calls to [`beets.util.as_string`](https://github.com/beetbox/beets/blob/545c65d903e38d37fd2c1734ec69eac609bea035/beets/util/__init__.py#L717-L733) Remove Python 2.7 compatibility """ if value is None: return u'' elif isinstance(value, memoryview): return bytes(value).decode('utf-8', 'ignore') elif isinstance(value, bytes): return value.decode('utf-8', 'ignore') else: return str(value)
[docs]def ndarray2str(array: np.ndarray) -> str: return str(json.dumps(array.tolist()))
[docs]def str2ndarray(string: str) -> np.ndarray: return np.array(json.loads(str(string)))
[docs]def restrict(val, minval, maxval): """https://stackoverflow.com/questions/4092528 """ if val < minval: return minval if val > maxval: return maxval return val
[docs]def rotations(sequence) -> list: # todo: clean up """Returns all rotations of a list. """ def rotate(seq, n: int) -> list: return seq[n:] + seq[:n] rotation_list = [] for N in range(len(sequence)): rotation_list.append(rotate(sequence, N)) return rotation_list
Timing = namedtuple('Timing', ('t0', 't1', 'elapsed'))
[docs]def timed(f): """Function decorator to measure elapsed time. """ @wraps(f) def wrap(*args, **kwargs): ts = time.time() result = f(*args, **kwargs) te = time.time() log.info(f"{f.__qualname__}() --> {te-ts} s elapsed.") return result return wrap
[docs]def logged(f): """Function decorator to log before & after call """ @wraps(f) def wrap(*args, **kwargs): log.debug(f"{f.__qualname__}() --> call...") result = f(*args, **kwargs) log.debug(f"{f.__qualname__}() --> done") return result return wrap
[docs]class Timer(object): _t0: float _t1: float _parent: object _logger: Logger _message: str _elapsed: Optional[float] def __init__(self, parent: object): self._parent = parent self._elapsed = None self.set_logger() def __enter__(self, message: str = ''): self._message = message self._t0 = time.time() self._logger.info(f"{self._message}...") def __exit__(self): if hasattr(self, '_t0'): self._t1 = time.time() self._elapsed = self._t1 - self._t0 self._logger.info(f"{self._message}: {self._elapsed} s. elapsed ")
[docs] def set_logger(self, logger: Logger = log): self._logger = logger
@property def timing(self) -> Optional[tuple]: if all([hasattr(self, attr) for attr in ('_t0', '_t1', '_elapsed')]): return self._t0, self._t1, self._elapsed else: return None
[docs]def frame_number_iterator(total: int, Nf: int = None, dt: float = None, fps: float = None) \ -> Generator[int, None, None]: if Nf is not None and (dt is None and fps is None): # todo: a bit awkward, make two methods instead? Nf = min(Nf, total) for f in np.linspace(0, total, Nf): yield int(f) elif (dt is not None and fps is not None) and Nf is None: df = restrict(dt * fps, 1, total) for f in np.arange(0, total, df): yield int(f) else: ValueError()
[docs]def before_version(version_a, version_b): """Check whether `version_a` precedes `version_b`. Only handles numerics, i.e. no '1.25b.3v7' """ return tuple(int(s) for s in version_a.split('.')) \ < tuple(int(s) for s in version_b.split('.'))
[docs]def after_version(version_a, version_b): return not before_version(version_a, version_b)
[docs]def hash_file(path: str, blocksize: int = 1024) -> queue.Queue: if os.path.isfile: q: queue.Queue = queue.Queue() def _hash_file(): nonlocal q m = hashlib.sha1() with open(path, 'rb') as f: while True: buf = f.read(blocksize) if not buf: break m.update(buf) hash = m.hexdigest() q.put(hash) threading.Thread(target=_hash_file, daemon=True).start() return q
[docs]@contextmanager def suppress_stdout(): """https://stackoverflow.com/questions/2125702/ """ with open(os.devnull, "w") as devnull: old_stdout = sys.stdout sys.stdout = devnull try: yield finally: sys.stdout = old_stdout
[docs]def sizeof_fmt(num, suffix='B'): for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f%s%s" % (num, 'Yi', suffix)
[docs]class Singleton(type): _instances: dict = {} def __call__(cls, *args, **kwargs): if cls not in cls._instances: cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) return cls._instances[cls]