84 lines
3.5 KiB
Python
84 lines
3.5 KiB
Python
import os
|
|
import sys
|
|
import traceback
|
|
from time import time
|
|
from collections import defaultdict
|
|
from multiprocessing import Process
|
|
from PIL import Image, ImageOps
|
|
import tempfile
|
|
from shutil import copyfileobj
|
|
|
|
|
|
class ThumbGenerator(object):
|
|
def __init__(self, library, cache_path):
|
|
self.library = library
|
|
self.cache_path = cache_path
|
|
self._failed_thumbs_cache = defaultdict(dict)
|
|
|
|
def get_datedir_path(self, date):
|
|
"""
|
|
Return a path like 2018/3/31 given a datetime object representing the same date
|
|
"""
|
|
return os.path.join(str(date.year), str(date.month), str(date.day))
|
|
|
|
def make_thumb(self, photo, style):
|
|
"""
|
|
Create a thumbnail of the given photo, scaled/cropped to the given named style
|
|
:return: local path to thumbnail file or None if creation failed or was blocked
|
|
"""
|
|
# style tuples: max x, max y, rotate ok)
|
|
# rotate ok means x and y maxes can be swapped if it fits the image's aspect ratio better
|
|
styles = {"tiny": (80, 80, False),
|
|
"small": (100, 100, False),
|
|
"feed": (250, 250, False),
|
|
"preview": (1024, 768, True),
|
|
"big": (2048, 1536, True)}
|
|
dest = os.path.join(self.cache_path, "thumbs", style, "{}.jpg".format(photo.uuid))
|
|
if os.path.exists(dest):
|
|
return os.path.abspath(dest)
|
|
if photo.width is None: # todo better detection of images that PIL can't open
|
|
return None
|
|
if photo.uuid not in self._failed_thumbs_cache[style]:
|
|
thumb_width, thumb_height, flip_ok = styles[style]
|
|
i_width = photo.width
|
|
i_height = photo.height
|
|
im_is_rotated = photo.orientation % 2 != 0 or i_height > i_width
|
|
|
|
if im_is_rotated and flip_ok:
|
|
thumb_width, thumb_height = thumb_height, thumb_width
|
|
|
|
thumb_width = min(thumb_width, i_width if i_width > 0 else 999999999) # TODO do we even have photo.width if PIL can't read the image?
|
|
thumb_height = min(thumb_height, i_height if i_height > 0 else 999999999) # TODO this seems bad
|
|
|
|
# TODO have the subprocess download the file
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
fpath = os.path.join(tmpdir, "image")
|
|
with self.library.storage.open(photo.path, 'rb') as fsrc, open(fpath, 'wb') as fdest:
|
|
copyfileobj(fsrc, fdest)
|
|
|
|
p = Process(target=self.gen_thumb, args=(fpath, dest, thumb_width, thumb_height, photo.orientation))
|
|
p.start()
|
|
p.join()
|
|
if p.exitcode != 0:
|
|
self._failed_thumbs_cache[style][photo.uuid] = True # dont retry failed generations
|
|
return None
|
|
return os.path.abspath(dest)
|
|
return None
|
|
|
|
@staticmethod
|
|
def gen_thumb(src_img, dest_img, width, height, rotation):
|
|
try:
|
|
start = time()
|
|
# TODO lock around the dir creation
|
|
os.makedirs(os.path.split(dest_img)[0], exist_ok=True)
|
|
image = Image.open(src_img)
|
|
image = image.rotate(90 * rotation, expand=True)
|
|
thumb = ImageOps.fit(image, (width, height), Image.ANTIALIAS)
|
|
thumb.save(dest_img, 'JPEG')
|
|
print("Generated {} in {}s".format(dest_img, round(time() - start, 4)))
|
|
except Exception:
|
|
traceback.print_exc()
|
|
if os.path.exists(dest_img):
|
|
os.unlink(dest_img)
|
|
sys.exit(1)
|