photolib/photoapp/thumbserver.py
2021-08-25 23:21:11 -07:00

351 lines
12 KiB
Python

import os
import re
import logging
import cherrypy
import shutil
import tempfile
import traceback
import requests
import subprocess
from concurrent.futures import ThreadPoolExecutor
from threading import Thread, Semaphore
from contextlib import closing
from queue import LifoQueue, Empty
from shutil import copyfileobj
from urllib.parse import urlparse
from photoapp.dbutils import SAEnginePlugin, SATool, get_db_engine, create_db_sessionmaker
from photoapp.storage import uri_to_storage
from photoapp.types import User, Photo
from photoapp.common import pwhash
from photoapp.dbsession import DatabaseSession
from photoapp.thumb import thumb_path, image_file_style
from photoapp.webutils import validate_password
from photoapp.utils import genpw, get_extension
THUMBSERVICE_USER_INTERNAL = "_thumbservice"
RE_DURATION = re.compile(r' Duration: (?P<hours>\d\d):(?P<minutes>\d\d):(?P<seconds>\d\d).(?P<decseconds>\d\d),')
def get_video_duration(srcpath, timeout=30):
"""
Get the duration of a video, in seconds, by parsing ffmpeg stderr output line:
Duration: 00:00:00.94, start: 0.000000, bitrate: 15046 kb/s
"""
cmd = [
"ffmpeg",
"-hide_banner",
"-i", srcpath,
]
try:
p = subprocess.run(cmd, timeout=timeout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except subprocess.TimeoutExpired:
logging.error("ffmpeg length: timed out")
return 0.0
if p.returncode != 1:
logging.error("ffmpeg length: unexpected return code %s", p.returncode)
logging.error("ffmpeg stdout: %s", p.stdout)
logging.error("ffmpeg stderr: %s", p.stderr)
return 0.0
if not p.stderr:
logging.error("ffmpeg length: no stderr")
return 0.0
stderr = p.stderr.decode()
match = RE_DURATION.search(stderr)
if not match:
logging.error("ffmpeg length: could not find duration")
logging.error("ffmpeg stdout: %s", p.stdout)
logging.error("ffmpeg stderr: %s", p.stderr)
return 0.0
times = match.groupdict()
return int(times["hours"]) * 60 * 60 + \
int(times["minutes"]) * 60 + \
int(times["seconds"]) + \
int(times["decseconds"]) / 100
def get_video_thumb(srcpath, outpath, timeout=30):
duration = get_video_duration(srcpath, timeout)
if duration == 0.0:
return False
cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
"-i", srcpath,
"-vframes", "1", # Output one frame
"-an", # Disable audio
# "-s", "400x222" # Output size
"-ss", "1" if duration > 5 else "0",
outpath
]
try:
p = subprocess.run(cmd, timeout=timeout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except subprocess.TimeoutExpired:
logging.error("ffmpeg: timed out")
return False
if p.returncode != 0 or not os.path.exists(outpath):
logging.error("ffmpeg: no image produced.")
logging.error("ffmpeg return code: %s", p.returncode)
logging.error("ffmpeg stdout: %s", p.stdout)
logging.error("ffmpeg stderr: %s", p.stderr)
return False
return True
def setup_thumb_user(engine):
# create the internal User used to talk to this service
with closing(create_db_sessionmaker(engine)()) as s:
u = s.query(User).filter(User.name == THUMBSERVICE_USER_INTERNAL).first()
if u:
return
password_hash = os.environ.get("THUMBSERVICE_INITIAL_PASSWORD_HASH")
if not password_hash:
password = genpw()
logging.warning("created thumbserver user: %s:%s", THUMBSERVICE_USER_INTERNAL, password)
password_hash = pwhash(password)
s.add(User(name=THUMBSERVICE_USER_INTERNAL, password=password_hash))
s.commit()
def validate_thumbservice_password(realm, username, password):
if username != THUMBSERVICE_USER_INTERNAL:
return False
return validate_password(realm, username, password)
class ThumbWorker(Thread):
def __init__(self, engine, library, cache, max_workers=4):
super().__init__()
self.daemon = True
self.queue = LifoQueue()
self.engine = engine
self.library = library
self.cache = cache
self.max_workers = max_workers
self.throttle = Semaphore(max_workers)
def run(self):
with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
while True:
try:
item = self.queue.get(block=True, timeout=5.0)
except Empty:
continue
# semaphore is used so that the queue is not immediately consumed into waiting Futures in the pool.
# this is to preserve the LIFO behavior of the queue
self.throttle.acquire()
pool.submit(self.handle_task, item)
qlen = self.queue.qsize()
if qlen:
logging.info("images to process: %s", qlen)
def handle_task(self, item):
image_uuid, style_name = item
try:
with (
closing(create_db_sessionmaker(self.engine)()) as s,
tempfile.TemporaryDirectory() as d,
):
self.do_thumb(image_uuid, style_name, s, d)
except:
traceback.print_exc() #TODO something like _failed_thumbs_cache
#TODO handle errors differently, like
# db error -> kill program
# filesystem error -> kill program
# PIL error -> ignore
self.queue.task_done()
self.throttle.release()
def do_thumb(self, image_uuid, style_name, session, tmpdir):
"""
Generate a thumbnail for the given image identified by uuid
"""
# find the image
image = session.query(Photo).filter(Photo.uuid == image_uuid).first()
if not image:
logging.info("attempted invalid uuid: %s", image_uuid)
return
# Bail if it exists in storage already
cache_path = thumb_path(style_name, image_uuid)
if self.cache.exists(cache_path):
return
# download the image
local_src_path = os.path.join(tmpdir, "input.{}".format(get_extension(image.fname)))
thumb_tmp_path = os.path.join(tmpdir, "thumb.jpg")
# TODO simplify low level operations like this
with (
self.library.open(image.path, "rb") as src,
open(local_src_path, "wb") as dest,
):
shutil.copyfileobj(src, dest)
# generate a still from the image
if not get_video_thumb(local_src_path, thumb_tmp_path):
logging.error("video extraction failed: %s", image_uuid)
return # TODO something like _failed_thumbs_cache
# Do normal cropping of the thumb
thumb_cropped_path = os.path.join(tmpdir, "thumb_cropped.jpg")
image_file_style(thumb_tmp_path, thumb_cropped_path, style_name, image.orientation)
# copy thumbnail to cache storage
with (
open(thumb_cropped_path, 'rb') as fsrc,
closing(self.cache.open(cache_path, 'wb')) as fdest
):
copyfileobj(fsrc, fdest)
logging.info("processed %s: %sb", image_uuid, str(os.path.getsize(thumb_tmp_path)))
class ThumbServiceWeb(object):
def __init__(self, queue_thumbnail):
self.queue_thumbnail = queue_thumbnail
@cherrypy.expose
def index(self):
yield "photoapp thumbnail service OK"
@cherrypy.expose
def thumb(self, uuid, style):
"""
Generate a thumbnail for the file identified. Calling this endpoint adds the image to the queue. Duplicate
requests are OK and are ignored later
"""
self.queue_thumbnail((uuid, style, ))
yield "ok"
class ThumbClient(object):
"""
Client for interacting with the thumbserver api
"""
def __init__(self, server_uri):
self.session = requests.Session()
uri = urlparse(server_uri)
port = uri.port or dict(http=80, https=443)[uri.scheme]
host = f"{uri.scheme}://{uri.hostname}:{port}"
if uri.path:
host = host + "/" + uri.path
if uri.username:
self.session.auth = (uri.username, uri.password, )
self.server_url = host
a = requests.adapters.HTTPAdapter(max_retries=0)
self.session.mount('http://', a)
def request_thumb(self, photo_uuid, style_name):
self.session.get(self.server_url + "/thumb", params=dict(uuid=photo_uuid, style=style_name))
def main():
# this is a slimmed down version of daemon.py TODO dedupe me
import argparse
import signal
parser = argparse.ArgumentParser(description="Photod photo server")
parser.add_argument('-p', '--port', help="tcp port to listen on",
default=int(os.environ.get("THUMBSERVICE_PORT", 8081)), type=int)
parser.add_argument('-l', '--library', default=os.environ.get("STORAGE_URL"), help="library path")
parser.add_argument('-c', '--cache', default=os.environ.get("CACHE_URL"), help="cache url")
# https://docs.sqlalchemy.org/en/13/core/engines.html
parser.add_argument('-s', '--database', help="sqlalchemy database connection uri",
default=os.environ.get("DATABASE_URL")),
parser.add_argument('--debug', action="store_true", help="enable development options")
parser.add_argument('--max-workers', type=int, default=4, help="number of image download/process threads")
args = parser.parse_args()
if not args.database:
parser.error("--database or DATABASE_URL is required")
if not args.library:
parser.error("--library or STORAGE_URL is required")
if not args.cache:
parser.error("--cache or CACHE_URL is required")
logging.basicConfig(level=logging.INFO if args.debug else logging.WARNING,
format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s")
# Get database connection
engine = get_db_engine(args.database)
setup_thumb_user(engine)
# Setup database in web framework
cherrypy.tools.db = SATool()
SAEnginePlugin(cherrypy.engine, engine).subscribe()
# Create various internal tools
library_storage = uri_to_storage(args.library)
cache_storage = uri_to_storage(args.cache)
thumbnail_worker = ThumbWorker(engine, library_storage, cache_storage, args.max_workers)
thumbnail_worker.start()
# Setup and mount web ui
web = ThumbServiceWeb(thumbnail_worker.queue.put)
cherrypy.tree.mount(web, '/', {'/': {'tools.trailing_slash.on': False,
'tools.db.on': True, },
'/thumb': {'tools.auth_basic.on': True,
'tools.auth_basic.realm': 'thumbservice',
'tools.auth_basic.checkpassword': validate_thumbservice_password}})
# General config options
cherrypy.config.update({
'tools.sessions.storage_class': DatabaseSession,
'tools.sessions.on': True,
'tools.sessions.locking': 'explicit',
'tools.sessions.timeout': 525600,
'request.show_tracebacks': True,
'server.socket_port': args.port,
'server.thread_pool': 5,
'server.socket_host': '0.0.0.0',
'server.show_tracebacks': True,
'log.screen': False,
'engine.autoreload.on': args.debug,
})
# Setup signal handling and run it.
def signal_handler(signum, stack):
logging.critical('Got sig {}, exiting...'.format(signum))
cherrypy.engine.exit()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
cherrypy.engine.start()
cherrypy.engine.block()
finally:
logging.info("API has shut down")
cherrypy.engine.exit()
if __name__ == '__main__':
main()