"""
Video thumbnail service for photoapp.

Exposes a small CherryPy application that accepts thumbnail requests over HTTP and queues them. A background
worker thread extracts a still frame from each video with ffmpeg, applies the requested style, and writes the
result to cache storage.
"""

import os
import re
import logging
import shutil
import subprocess
import tempfile
import traceback
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from queue import LifoQueue, Empty
from shutil import copyfileobj
from threading import Thread, Semaphore
from urllib.parse import urlparse

import cherrypy
import requests

from photoapp.dbutils import SAEnginePlugin, SATool, get_db_engine, create_db_sessionmaker
from photoapp.storage import uri_to_storage
from photoapp.types import User, Photo
from photoapp.common import pwhash
from photoapp.dbsession import DatabaseSession
from photoapp.thumb import thumb_path, image_file_style
from photoapp.webutils import validate_password
from photoapp.utils import genpw, get_extension


THUMBSERVICE_USER_INTERNAL = "_thumbservice"

# Group names must match the keys read from groupdict() in get_video_duration().
RE_DURATION = re.compile(
    r' Duration: (?P<hours>\d\d):(?P<minutes>\d\d):(?P<seconds>\d\d)\.(?P<decseconds>\d\d),'
)


def get_video_duration(srcpath, timeout=30):
    """
    Get the duration of a video, in seconds, by parsing ffmpeg stderr output line:
      Duration: 00:00:00.94, start: 0.000000, bitrate: 15046 kb/s

    Returns the duration as a number of seconds, or 0.0 if ffmpeg timed out, exited with an unexpected status,
    or printed no parseable duration.
    """
    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-i", srcpath,
    ]

    try:
        p = subprocess.run(cmd, timeout=timeout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.TimeoutExpired:
        logging.error("ffmpeg length: timed out")
        return 0.0

    # No output file is passed, so exit status 1 is treated as the normal outcome here.
    if p.returncode != 1:
        logging.error("ffmpeg length: unexpected return code %s", p.returncode)
        logging.error("ffmpeg stdout: %s", p.stdout)
        logging.error("ffmpeg stderr: %s", p.stderr)
        return 0.0

    if not p.stderr:
        logging.error("ffmpeg length: no stderr")
        return 0.0

    # The output may contain bytes that are not valid UTF-8 (e.g. file metadata); never fail on decoding.
    stderr = p.stderr.decode(errors="replace")

    match = RE_DURATION.search(stderr)
    if not match:
        logging.error("ffmpeg length: could not find duration")
        logging.error("ffmpeg stdout: %s", p.stdout)
        logging.error("ffmpeg stderr: %s", p.stderr)
        return 0.0

    times = match.groupdict()

    return int(times["hours"]) * 60 * 60 + \
        int(times["minutes"]) * 60 + \
        int(times["seconds"]) + \
        int(times["decseconds"]) / 100


def get_video_thumb(srcpath, outpath, timeout=30):
    """
    Extract a single still frame from the video at srcpath and write it to outpath using ffmpeg.

    The frame is taken at the 1 second mark for videos longer than 5 seconds, otherwise at the start. The same
    timeout (seconds) is applied separately to the duration probe and to the extraction.

    Returns True if ffmpeg succeeded and outpath exists afterwards, False otherwise.
    """
    duration = get_video_duration(srcpath, timeout)
    if duration == 0.0:
        return False

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel", "error",
        "-i", srcpath,
        "-vframes", "1",  # Output one frame
        "-an",  # Disable audio
        # "-s", "400x222"  # Output size
        "-ss", "1" if duration > 5 else "0",
        outpath
    ]

    try:
        p = subprocess.run(cmd, timeout=timeout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.TimeoutExpired:
        logging.error("ffmpeg: timed out")
        return False

    if p.returncode != 0 or not os.path.exists(outpath):
        logging.error("ffmpeg: no image produced.")
        logging.error("ffmpeg return code: %s", p.returncode)
        logging.error("ffmpeg stdout: %s", p.stdout)
        logging.error("ffmpeg stderr: %s", p.stderr)
        return False

    return True


def setup_thumb_user(engine):
    """
    Create the internal User used to talk to this service, if it does not exist yet.

    The password hash is taken from the THUMBSERVICE_INITIAL_PASSWORD_HASH environment variable when set.
    Otherwise a password is generated and logged once at WARNING level.
    """
    with closing(create_db_sessionmaker(engine)()) as s:
        u = s.query(User).filter(User.name == THUMBSERVICE_USER_INTERNAL).first()
        if u:
            return

        password_hash = os.environ.get("THUMBSERVICE_INITIAL_PASSWORD_HASH")
        if not password_hash:
            password = genpw()
            logging.warning("created thumbserver user: %s:%s", THUMBSERVICE_USER_INTERNAL, password)
            password_hash = pwhash(password)

        s.add(User(name=THUMBSERVICE_USER_INTERNAL, password=password_hash))
        s.commit()


def validate_thumbservice_password(realm, username, password):
    """
    Basic-auth password check that only ever accepts the internal thumbservice user.
    """
    if username != THUMBSERVICE_USER_INTERNAL:
        return False
    return validate_password(realm, username, password)


class ThumbWorker(Thread):
    """
    Daemon thread that consumes (image_uuid, style_name) items from a LIFO queue and generates thumbnails
    for them on a thread pool of max_workers threads.
    """

    def __init__(self, engine, library, cache, max_workers=4):
        super().__init__()
        self.daemon = True
        self.queue = LifoQueue()
        self.engine = engine
        self.library = library
        self.cache = cache
        self.max_workers = max_workers
        self.throttle = Semaphore(max_workers)

    def run(self):
        """
        Pull items off the queue forever and hand them to the pool, at most max_workers at a time.
        """
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            while True:
                try:
                    item = self.queue.get(block=True, timeout=5.0)
                except Empty:
                    continue
                # semaphore is used so that the queue is not immediately consumed into waiting Futures in the
                # pool. this is to preserve the LIFO behavior of the queue
                self.throttle.acquire()
                pool.submit(self.handle_task, item)
                qlen = self.queue.qsize()
                if qlen:
                    logging.info("images to process: %s", qlen)

    def handle_task(self, item):
        """
        Process one queued item inside a fresh database session and temporary directory.

        Failures are logged and swallowed so one bad item cannot stop the worker. The queue bookkeeping and
        the throttle slot are always released, whatever happens.
        """
        try:
            # Unpack inside the try so a malformed item cannot leak a throttle slot.
            image_uuid, style_name = item
            with (
                closing(create_db_sessionmaker(self.engine)()) as s,
                tempfile.TemporaryDirectory() as d,
            ):
                self.do_thumb(image_uuid, style_name, s, d)
        except Exception:
            logging.exception("thumbnail task failed: %r", item)
            # TODO something like _failed_thumbs_cache
            # TODO handle errors differently, like
            #   db error -> kill program
            #   filesystem error -> kill program
            #   PIL error -> ignore
        finally:
            self.queue.task_done()
            self.throttle.release()

    def do_thumb(self, image_uuid, style_name, session, tmpdir):
        """
        Generate a thumbnail for the given image identified by uuid

        Does nothing if the uuid is unknown or the thumbnail already exists in cache storage. tmpdir is used
        as scratch space for the downloaded source and the intermediate images.
        """
        # find the image
        image = session.query(Photo).filter(Photo.uuid == image_uuid).first()
        if not image:
            logging.info("attempted invalid uuid: %s", image_uuid)
            return

        # Bail if it exists in storage already
        cache_path = thumb_path(style_name, image_uuid)
        if self.cache.exists(cache_path):
            return

        # download the image
        local_src_path = os.path.join(tmpdir, "input.{}".format(get_extension(image.fname)))
        thumb_tmp_path = os.path.join(tmpdir, "thumb.jpg")

        # TODO simplify low level operations like this
        with (
            self.library.open(image.path, "rb") as src,
            open(local_src_path, "wb") as dest,
        ):
            shutil.copyfileobj(src, dest)

        # generate a still from the image
        if not get_video_thumb(local_src_path, thumb_tmp_path):
            logging.error("video extraction failed: %s", image_uuid)
            return  # TODO something like _failed_thumbs_cache

        # Do normal cropping of the thumb
        thumb_cropped_path = os.path.join(tmpdir, "thumb_cropped.jpg")
        image_file_style(thumb_tmp_path, thumb_cropped_path, style_name, image.orientation)

        # copy thumbnail to cache storage
        with (
            open(thumb_cropped_path, 'rb') as fsrc,
            closing(self.cache.open(cache_path, 'wb')) as fdest
        ):
            copyfileobj(fsrc, fdest)

        # Report the size of the file that was actually stored, not the uncropped intermediate frame.
        logging.info("processed %s: %sb", image_uuid, os.path.getsize(thumb_cropped_path))


class ThumbServiceWeb(object):
    """
    CherryPy handlers for the thumbnail service. queue_thumbnail is called with a (uuid, style) tuple for
    every accepted request.
    """

    def __init__(self, queue_thumbnail):
        self.queue_thumbnail = queue_thumbnail

    @cherrypy.expose
    def index(self):
        yield "photoapp thumbnail service OK"

    @cherrypy.expose
    def thumb(self, uuid, style):
        """
        Generate a thumbnail for the file identified. Calling this endpoint adds the image to the queue.
        Duplicate requests are OK and are ignored later
        """
        self.queue_thumbnail((uuid, style, ))
        yield "ok"


class ThumbClient(object):
    """
    Client for interacting with the thumbserver api
    """

    def __init__(self, server_uri):
        """
        Build a session for the service at server_uri, a URL of the form
        scheme://[user:password@]host[:port][/path]. Credentials in the URL are used for HTTP basic auth.
        """
        self.session = requests.Session()
        uri = urlparse(server_uri)
        port = uri.port or dict(http=80, https=443)[uri.scheme]
        host = f"{uri.scheme}://{uri.hostname}:{port}"
        # urlparse paths already start with "/"; normalize so the base url never contains "//" and never
        # ends with a slash (request_thumb appends "/thumb").
        path = uri.path.strip("/")
        if path:
            host = host + "/" + path
        if uri.username:
            self.session.auth = (uri.username, uri.password, )
        self.server_url = host

        a = requests.adapters.HTTPAdapter(max_retries=0)
        self.session.mount('http://', a)
        self.session.mount('https://', a)

    def request_thumb(self, photo_uuid, style_name):
        """
        Ask the service to queue a thumbnail for the given photo uuid and style. The response is not inspected.
        """
        self.session.get(self.server_url + "/thumb", params=dict(uuid=photo_uuid, style=style_name))


def main():
    """
    Command line entry point: parse arguments, start the thumbnail worker, and serve the web api until the
    process is signalled.
    """
    # this is a slimmed down version of daemon.py TODO dedupe me
    import argparse
    import signal

    parser = argparse.ArgumentParser(description="Photod photo server")
    parser.add_argument('-p', '--port', help="tcp port to listen on",
                        default=int(os.environ.get("THUMBSERVICE_PORT", 8081)), type=int)
    parser.add_argument('-l', '--library', default=os.environ.get("STORAGE_URL"), help="library path")
    parser.add_argument('-c', '--cache', default=os.environ.get("CACHE_URL"), help="cache url")
    # https://docs.sqlalchemy.org/en/13/core/engines.html
    parser.add_argument('-s', '--database', help="sqlalchemy database connection uri",
                        default=os.environ.get("DATABASE_URL"))
    parser.add_argument('--debug', action="store_true", help="enable development options")
    parser.add_argument('--max-workers', type=int, default=4, help="number of image download/process threads")

    args = parser.parse_args()

    if not args.database:
        parser.error("--database or DATABASE_URL is required")

    if not args.library:
        parser.error("--library or STORAGE_URL is required")

    if not args.cache:
        parser.error("--cache or CACHE_URL is required")

    logging.basicConfig(level=logging.INFO if args.debug else logging.WARNING,
                        format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s")

    # Get database connection
    engine = get_db_engine(args.database)

    setup_thumb_user(engine)

    # Setup database in web framework
    cherrypy.tools.db = SATool()
    SAEnginePlugin(cherrypy.engine, engine).subscribe()

    # Create various internal tools
    library_storage = uri_to_storage(args.library)
    cache_storage = uri_to_storage(args.cache)
    thumbnail_worker = ThumbWorker(engine, library_storage, cache_storage, args.max_workers)
    thumbnail_worker.start()

    # Setup and mount web ui
    web = ThumbServiceWeb(thumbnail_worker.queue.put)

    cherrypy.tree.mount(web, '/', {'/': {'tools.trailing_slash.on': False,
                                         'tools.db.on': True, },
                                   '/thumb': {'tools.auth_basic.on': True,
                                              'tools.auth_basic.realm': 'thumbservice',
                                              'tools.auth_basic.checkpassword': validate_thumbservice_password}})

    # General config options
    cherrypy.config.update({
        'tools.sessions.storage_class': DatabaseSession,
        'tools.sessions.on': True,
        'tools.sessions.locking': 'explicit',
        'tools.sessions.timeout': 525600,
        'request.show_tracebacks': True,
        'server.socket_port': args.port,
        'server.thread_pool': 5,
        'server.socket_host': '0.0.0.0',
        'server.show_tracebacks': True,
        'log.screen': False,
        'engine.autoreload.on': args.debug,
    })

    # Setup signal handling and run it.
    def signal_handler(signum, stack):
        logging.critical('Got sig %s, exiting...', signum)
        cherrypy.engine.exit()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        cherrypy.engine.start()
        cherrypy.engine.block()
    finally:
        logging.info("API has shut down")
        cherrypy.engine.exit()


if __name__ == '__main__':
    main()