photolib/photoapp/thumbserver.py

351 lines
12 KiB
Python
Raw Normal View History

2021-08-18 22:23:43 -07:00
import os
import re
2021-08-18 22:23:43 -07:00
import logging
import cherrypy
import shutil
import tempfile
import traceback
2021-08-20 18:00:13 -07:00
import requests
import subprocess
from concurrent.futures import ThreadPoolExecutor
from threading import Thread, Semaphore
2021-08-18 22:23:43 -07:00
from contextlib import closing
from queue import LifoQueue, Empty
2021-08-20 18:00:13 -07:00
from shutil import copyfileobj
from urllib.parse import urlparse
from photoapp.dbutils import SAEnginePlugin, SATool, get_db_engine, create_db_sessionmaker
2021-08-18 22:23:43 -07:00
from photoapp.storage import uri_to_storage
from photoapp.types import User, Photo
from photoapp.common import pwhash
from photoapp.dbsession import DatabaseSession
2021-08-21 21:29:58 -07:00
from photoapp.thumb import thumb_path, image_file_style
from photoapp.webutils import validate_password
2021-08-25 23:21:11 -07:00
from photoapp.utils import genpw, get_extension
2021-08-18 22:23:43 -07:00
THUMBSERVICE_USER_INTERNAL = "_thumbservice"
RE_DURATION = re.compile(r' Duration: (?P<hours>\d\d):(?P<minutes>\d\d):(?P<seconds>\d\d).(?P<decseconds>\d\d),')
def get_video_duration(srcpath, timeout=30):
"""
Get the duration of a video, in seconds, by parsing ffmpeg stderr output line:
Duration: 00:00:00.94, start: 0.000000, bitrate: 15046 kb/s
"""
cmd = [
"ffmpeg",
"-hide_banner",
"-i", srcpath,
]
try:
p = subprocess.run(cmd, timeout=timeout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except subprocess.TimeoutExpired:
logging.error("ffmpeg length: timed out")
return 0.0
if p.returncode != 1:
logging.error("ffmpeg length: unexpected return code %s", p.returncode)
logging.error("ffmpeg stdout: %s", p.stdout)
logging.error("ffmpeg stderr: %s", p.stderr)
return 0.0
if not p.stderr:
logging.error("ffmpeg length: no stderr")
return 0.0
stderr = p.stderr.decode()
match = RE_DURATION.search(stderr)
if not match:
logging.error("ffmpeg length: could not find duration")
logging.error("ffmpeg stdout: %s", p.stdout)
logging.error("ffmpeg stderr: %s", p.stderr)
return 0.0
times = match.groupdict()
return int(times["hours"]) * 60 * 60 + \
int(times["minutes"]) * 60 + \
int(times["seconds"]) + \
int(times["decseconds"]) / 100
def get_video_thumb(srcpath, outpath, timeout=30):
    """
    Extract a single still frame from a video file using ffmpeg.

    The frame is taken 1 second into the video when the video is longer than 5 seconds, otherwise from 0 seconds.

    :param srcpath: local path of the source video
    :param outpath: path the extracted still image is written to
    :param timeout: seconds allowed for each ffmpeg invocation (the duration probe and the frame extraction)
    :return: True if ffmpeg exited with status 0 and ``outpath`` exists; False otherwise (failures are logged)
    """
    duration = get_video_duration(srcpath, timeout)
    if duration == 0.0:
        # get_video_duration() reports "could not determine duration" as 0.0
        return False

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel", "error",
        "-i", srcpath,
        "-vframes", "1",  # Output one frame
        "-an",  # Disable audio
        "-ss", "1" if duration > 5 else "0",  # seek point in seconds; skip the first second of longer clips
        outpath,
    ]
    try:
        # stdin is detached so ffmpeg never reads from (or waits on) the service's own standard input
        p = subprocess.run(cmd, timeout=timeout, stdin=subprocess.DEVNULL,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.TimeoutExpired:
        logging.error("ffmpeg: timed out")
        return False
    if p.returncode != 0 or not os.path.exists(outpath):
        logging.error("ffmpeg: no image produced.")
        logging.error("ffmpeg return code: %s", p.returncode)
        logging.error("ffmpeg stdout: %s", p.stdout)
        logging.error("ffmpeg stderr: %s", p.stderr)
        return False
    return True
def setup_thumb_user(engine):
    """
    Ensure the internal user that clients use to authenticate to this service exists.

    When no User named THUMBSERVICE_USER_INTERNAL is found, one is created. Its password hash is taken from the
    THUMBSERVICE_INITIAL_PASSWORD_HASH environment variable if that is set and non-empty; otherwise a random
    password is generated with genpw(), logged at WARNING level (in plaintext, so the operator can retrieve it),
    and hashed with pwhash(). Does nothing if the user already exists.

    :param engine: database engine handed to create_db_sessionmaker()
    """
    make_session = create_db_sessionmaker(engine)
    with closing(make_session()) as session:
        existing = session.query(User).filter(User.name == THUMBSERVICE_USER_INTERNAL).first()
        if not existing:
            hashed = os.environ.get("THUMBSERVICE_INITIAL_PASSWORD_HASH")
            if not hashed:
                plaintext = genpw()
                logging.warning("created thumbserver user: %s:%s", THUMBSERVICE_USER_INTERNAL, plaintext)
                hashed = pwhash(plaintext)
            session.add(User(name=THUMBSERVICE_USER_INTERNAL, password=hashed))
            session.commit()
def validate_thumbservice_password(realm, username, password):
    """
    Basic-auth password checker that only admits the internal thumbnail-service user.

    Any username other than THUMBSERVICE_USER_INTERNAL is rejected immediately (returns False); for that user the
    check is delegated to photoapp.webutils.validate_password.
    """
    is_service_user = username == THUMBSERVICE_USER_INTERNAL
    return is_service_user and validate_password(realm, username, password)
class ThumbWorker(Thread):
    """
    Daemon thread that generates thumbnails for queued ``(image_uuid, style_name)`` items.

    Items are put on :attr:`queue`, a LIFO queue, so the most recently queued item is picked first. They are
    processed by a thread pool of ``max_workers`` threads. For each item the source file is copied from
    ``library`` into a temporary directory, a still frame is extracted with ffmpeg (via get_video_thumb), styled
    with image_file_style, and written to ``cache``. Failures are logged and do not stop the worker.
    """

    def __init__(self, engine, library, cache, max_workers=4):
        """
        :param engine: database engine handed to create_db_sessionmaker() for each task
        :param library: storage the source files are read from (``library.open(path, "rb")``)
        :param cache: storage thumbnails are written to (``cache.exists(path)``, ``cache.open(path, "wb")``)
        :param max_workers: number of pool threads, and the number of tasks allowed in flight at once
        """
        super().__init__()
        self.daemon = True
        self.queue = LifoQueue()
        self.engine = engine
        self.library = library
        self.cache = cache
        self.max_workers = max_workers
        # one permit per pool thread; limits how many items have been taken off the queue but not yet finished
        self.throttle = Semaphore(max_workers)

    def run(self):
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            while True:
                # The semaphore is used so that the queue is not immediately consumed into waiting Futures in
                # the pool, which would defeat the LIFO ordering. Wait for a free slot *before* taking an item,
                # so the item chosen is the newest one available when work can actually start.
                self.throttle.acquire()
                try:
                    item = self.queue.get(block=True, timeout=5.0)
                except Empty:
                    self.throttle.release()
                    continue
                pool.submit(self.handle_task, item)
                qlen = self.queue.qsize()
                if qlen:
                    logging.info("images to process: %s", qlen)

    def handle_task(self, item):
        """
        Process one queue item on a pool thread.

        Any exception is logged and swallowed so one bad item does not stop the worker. The queue's task
        accounting and the throttle permit are always released, even when the item is malformed or fails.
        """
        try:
            image_uuid, style_name = item
            with (
                closing(create_db_sessionmaker(self.engine)()) as s,
                tempfile.TemporaryDirectory() as d,
            ):
                self.do_thumb(image_uuid, style_name, s, d)
        except Exception:
            # TODO something like _failed_thumbs_cache
            # TODO handle errors differently, like
            #   db error -> kill program
            #   filesystem error -> kill program
            #   PIL error -> ignore
            logging.exception("thumbnail task failed: %r", item)
        finally:
            self.queue.task_done()
            self.throttle.release()

    def do_thumb(self, image_uuid, style_name, session, tmpdir):
        """
        Generate a thumbnail for the given image identified by uuid

        Returns without doing anything if the uuid is unknown or the thumbnail already exists in the cache, and
        returns early (after logging) if the still-frame extraction fails.

        :param image_uuid: Photo.uuid of the image to process
        :param style_name: thumbnail style, passed to thumb_path() and image_file_style()
        :param session: database session used to look up the Photo
        :param tmpdir: scratch directory for the downloaded source and intermediate images
        """
        # find the image
        image = session.query(Photo).filter(Photo.uuid == image_uuid).first()
        if not image:
            logging.info("attempted invalid uuid: %s", image_uuid)
            return

        # Bail if it exists in storage already
        cache_path = thumb_path(style_name, image_uuid)
        if self.cache.exists(cache_path):
            return

        # download the image
        local_src_path = os.path.join(tmpdir, "input.{}".format(get_extension(image.fname)))
        thumb_tmp_path = os.path.join(tmpdir, "thumb.jpg")
        # TODO simplify low level operations like this
        with (
            self.library.open(image.path, "rb") as src,
            open(local_src_path, "wb") as dest,
        ):
            shutil.copyfileobj(src, dest)

        # generate a still from the image
        if not get_video_thumb(local_src_path, thumb_tmp_path):
            logging.error("video extraction failed: %s", image_uuid)
            return  # TODO something like _failed_thumbs_cache

        # Do normal cropping of the thumb
        thumb_cropped_path = os.path.join(tmpdir, "thumb_cropped.jpg")
        image_file_style(thumb_tmp_path, thumb_cropped_path, style_name, image.orientation)

        # copy thumbnail to cache storage
        with (
            open(thumb_cropped_path, 'rb') as fsrc,
            closing(self.cache.open(cache_path, 'wb')) as fdest,
        ):
            shutil.copyfileobj(fsrc, fdest)

        # report the size of the file actually stored, not of the intermediate uncropped still
        logging.info("processed %s: %sb", image_uuid, str(os.path.getsize(thumb_cropped_path)))
class ThumbServiceWeb:
    """
    CherryPy application exposing the thumbnail service's HTTP endpoints.

    :param queue_thumbnail: callable that receives a ``(uuid, style)`` tuple for every ``/thumb`` request
    """

    def __init__(self, queue_thumbnail):
        self.queue_thumbnail = queue_thumbnail

    @cherrypy.expose
    def index(self):
        """Health check: always responds with a fixed status message."""
        yield "photoapp thumbnail service OK"

    @cherrypy.expose
    def thumb(self, uuid, style):
        """
        Generate a thumbnail for the file identified. Calling this endpoint only adds the image to the queue and
        responds immediately; duplicate requests are OK and are ignored later by the worker.
        """
        job = (uuid, style)
        self.queue_thumbnail(job)
        yield "ok"
class ThumbClient(object):
    """
    Client for interacting with the thumbserver api

    :param server_uri: base URI of the thumbnail service, e.g. ``http://user:pass@host:8081/prefix``.
        Credentials embedded in the URI (percent-encoded where needed) are sent as HTTP basic auth.
    """

    def __init__(self, server_uri):
        self.session = requests.Session()
        self.server_url, auth = self._parse_server_uri(server_uri)
        if auth:
            self.session.auth = auth
        # no automatic retries; apply the same adapter to both schemes the client accepts
        adapter = requests.adapters.HTTPAdapter(max_retries=0)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    @staticmethod
    def _parse_server_uri(server_uri):
        """
        Split a server URI into a normalized base URL and optional basic-auth credentials.

        The base URL always carries an explicit port (80/443 when the URI omits it) and never ends with a slash,
        so endpoint paths such as ``/thumb`` can be appended directly.

        :return: ``(base_url, auth)``; ``auth`` is ``(username, password)`` or None when the URI has no username
        :raises KeyError: if the URI has no explicit port and its scheme is neither http nor https
        """
        from urllib.parse import unquote

        uri = urlparse(server_uri)
        port = uri.port or dict(http=80, https=443)[uri.scheme]
        hostname = uri.hostname
        # urlparse strips the brackets from IPv6 literals, but a URL needs them back
        if hostname and ":" in hostname:
            hostname = f"[{hostname}]"
        base_url = f"{uri.scheme}://{hostname}:{port}"
        # uri.path already begins with "/" when present; trailing slashes are dropped to avoid "//thumb"
        path = uri.path.rstrip("/")
        if path:
            base_url += path
        auth = None
        if uri.username:
            # urlparse leaves the userinfo part percent-encoded
            password = unquote(uri.password) if uri.password is not None else None
            auth = (unquote(uri.username), password)
        return base_url, auth

    def request_thumb(self, photo_uuid, style_name):
        """
        Ask the service to queue generation of one thumbnail.

        Non-2xx responses do not raise; inspect the returned ``requests.Response`` if the outcome matters.
        """
        return self.session.get(self.server_url + "/thumb", params=dict(uuid=photo_uuid, style=style_name))
def main():
    """
    Command-line entry point: parse options, start the thumbnail worker and serve the HTTP API until signalled.

    Options fall back to the environment variables THUMBSERVICE_PORT, STORAGE_URL, CACHE_URL and DATABASE_URL.
    """
    # this is a slimmed down version of daemon.py TODO dedupe me
    import argparse
    import signal

    parser = argparse.ArgumentParser(description="Photod thumbnail service")
    parser.add_argument('-p', '--port', help="tcp port to listen on",
                        default=int(os.environ.get("THUMBSERVICE_PORT", 8081)), type=int)
    parser.add_argument('-l', '--library', default=os.environ.get("STORAGE_URL"), help="library path")
    parser.add_argument('-c', '--cache', default=os.environ.get("CACHE_URL"), help="cache url")
    # https://docs.sqlalchemy.org/en/13/core/engines.html
    parser.add_argument('-s', '--database', help="sqlalchemy database connection uri",
                        default=os.environ.get("DATABASE_URL"))
    parser.add_argument('--debug', action="store_true", help="enable development options")
    parser.add_argument('--max-workers', type=int, default=4, help="number of image download/process threads")

    args = parser.parse_args()
    if not args.database:
        parser.error("--database or DATABASE_URL is required")
    if not args.library:
        parser.error("--library or STORAGE_URL is required")
    if not args.cache:
        parser.error("--cache or CACHE_URL is required")
    # ThreadPoolExecutor rejects max_workers <= 0 (which would silently kill the worker thread) and
    # Semaphore rejects negative values; fail fast with a usage error instead
    if args.max_workers < 1:
        parser.error("--max-workers must be at least 1")

    logging.basicConfig(level=logging.INFO if args.debug else logging.WARNING,
                        format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s")

    # Get database connection
    engine = get_db_engine(args.database)
    setup_thumb_user(engine)

    # Setup database in web framework
    cherrypy.tools.db = SATool()
    SAEnginePlugin(cherrypy.engine, engine).subscribe()

    # Create various internal tools
    library_storage = uri_to_storage(args.library)
    cache_storage = uri_to_storage(args.cache)
    thumbnail_worker = ThumbWorker(engine, library_storage, cache_storage, args.max_workers)
    thumbnail_worker.start()

    # Setup and mount web ui
    web = ThumbServiceWeb(thumbnail_worker.queue.put)
    cherrypy.tree.mount(web, '/', {'/': {'tools.trailing_slash.on': False,
                                         'tools.db.on': True, },
                                   '/thumb': {'tools.auth_basic.on': True,
                                              'tools.auth_basic.realm': 'thumbservice',
                                              'tools.auth_basic.checkpassword': validate_thumbservice_password}})

    # General config options
    cherrypy.config.update({
        'tools.sessions.storage_class': DatabaseSession,
        'tools.sessions.on': True,
        'tools.sessions.locking': 'explicit',
        'tools.sessions.timeout': 525600,
        'request.show_tracebacks': True,
        'server.socket_port': args.port,
        'server.thread_pool': 5,
        'server.socket_host': '0.0.0.0',
        'server.show_tracebacks': True,
        'log.screen': False,
        'engine.autoreload.on': args.debug,
    })

    # Setup signal handling and run it.
    def signal_handler(signum, stack):
        logging.critical('Got sig {}, exiting...'.format(signum))
        cherrypy.engine.exit()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        cherrypy.engine.start()
        cherrypy.engine.block()
    finally:
        logging.info("API has shut down")
        cherrypy.engine.exit()


if __name__ == '__main__':
    main()