# Review notes for this patch. These lines precede the first "diff --git" header and are
# commentary only; every hunk below is left exactly as received.
#
# What the patch does:
#   * photoapp/api.py: removes StorageAdapter, FilesystemAdapter, S3Adapter and GfapiAdapter and
#     imports StorageAdapter from photoapp.storage instead. The S3Adapter stub is not re-created
#     in the new module; MinioAdapter is added there instead.
#   * photoapp/storage.py (new file): adds uri_to_storage(), StorageAdapter, FilesystemAdapter,
#     S3WriteProxy, S3ReadProxy, MinioAdapter and the empty GfapiAdapter placeholder.
#     uri_to_storage() maps the "file" scheme to FilesystemAdapter and "minio"/"minios" to
#     MinioAdapter via each class's from_uri(), and raises Exception for any other scheme.
#   * photoapp/daemon.py: builds library_storage with uri_to_storage(args.library) instead of
#     FilesystemAdapter(args.library); photoapp.storage is imported in two separate statements.
#   * photoapp/image.py: drops fobj.seek(0) before magic.from_buffer(fobj.read(1024), mime=True).
#   * photoapp/thumb.py: splits the combined two-item "with" into two nested "with" blocks.
#   * photoapp/utils.py: copysha() now reads 1024 * 256 bytes per iteration instead of 4096.
#   * requirements.txt: pins boto3, botocore, docutils, jmespath, python-dateutil, s3transfer.
#
# NOTE(review) - points to confirm before merging:
#   * uri_to_storage(): a value with no URI scheme matches neither scheme tuple and reaches the
#     "Unknown storage backend" raise; confirm every caller of --library now passes a URI.
#   * image.py: without seek(0) the MIME sniff reads from the handle's current position;
#     presumably needed because S3 read handles cannot seek - confirm callers pass a fresh handle.
#   * S3WriteProxy.read(): the default size=0 makes the first chunk satisfy len(buf) >= size, and
#     the following slice returns b'' while moving all data into self.leftovers.
#   * S3WriteProxy.read(): once the None sentinel is seen self.eof is set, and the next call
#     returns b'' before self.leftovers is consulted, so bytes left over from that final read
#     are never returned.
#   * S3WriteProxy.read(): "if not p" treats any falsy chunk as EOF, so write(b'') ends the stream.
#   * S3WriteProxy.__enter__ raises Exception("who called me?") and no __exit__ is defined, so a
#     write handle cannot be used in a "with" block, although the comment in MinioAdapter.open()
#     says it should work as a context manager (S3ReadProxy does implement both).
#   * S3WriteProxy.close(): only joins the upload thread; an exception raised inside _upload()
#     is neither captured nor re-raised to the writer.
#   * MinioAdapter.__init__(): any Exception from get_bucket_location() leads to create_bucket().
#   * MinioAdapter.exists(): calls get_object() and discards the response; only the ClientError
#     code 'NoSuchKey' is mapped to False, every other error is re-raised.
#   * MinioAdapter builds object keys with os.path.join(prefix, path), i.e. with the host OS
#     path separator - presumably only run on POSIX hosts; verify.
diff --git a/photoapp/api.py b/photoapp/api.py index f3554c7..08071e2 100644 --- a/photoapp/api.py +++ b/photoapp/api.py @@ -5,80 +5,12 @@ from datetime import datetime from photoapp.types import Photo, PhotoSet, Tag, PhotoStatus, User, known_extensions, known_mimes, genuuid from photoapp.utils import copysha, get_extension from photoapp.image import special_magic_fobj +from photoapp.storage import StorageAdapter from photoapp.dbutils import db from contextlib import closing import traceback -class StorageAdapter(object): - """ - Abstract interface for working with photo file storage. All paths are relative to the storage adapter's root param. - """ - - def exists(self, path): - # TODO return true/false if the file path exists - raise NotImplementedError() - - def open(self, path, mode): - # TODO return a handle to the path - # TODO this should work as a context manager - raise NotImplementedError() - - def delete(self, path): - # TODO erase the path - raise NotImplementedError() - - def getsize(self, path): - raise NotImplementedError() - - -class FilesystemAdapter(StorageAdapter): - def __init__(self, root): - super().__init__() - self.root = root # root path - - def exists(self, path): - # TODO return true/false if the file path exists - return os.path.exists(self._abspath(path)) - - def open(self, path, mode): - # TODO return a handle to the path. 
this should work as a context manager - os.makedirs(os.path.dirname(self._abspath(path)), exist_ok=True) - return open(self._abspath(path), mode) - - def delete(self, path): - # TODO delete the file - # TODO prune empty directories that were components of $path - os.unlink(self._abspath(path)) - - def getsize(self, path): - return os.path.getsize(self._abspath(path)) - - def _abspath(self, path): - return os.path.join(self.root, path) - - -class S3Adapter(StorageAdapter): - def exists(self, path): - # TODO return true/false if the file path exists - raise NotImplementedError() - - def open(self, path, mode): - # TODO return a handle to the path. this should work as a context manager - raise NotImplementedError() - - def delete(self, path): - # TODO erase the path - raise NotImplementedError() - - def getsize(self, path): - raise NotImplementedError() - - -class GfapiAdapter(StorageAdapter): - pass # TODO gluster storage backend - - class LibraryManager(object): def __init__(self, storage): assert isinstance(storage, StorageAdapter) diff --git a/photoapp/daemon.py b/photoapp/daemon.py index f8ce0fb..6e33b9c 100644 --- a/photoapp/daemon.py +++ b/photoapp/daemon.py @@ -7,9 +7,11 @@ from datetime import datetime, timedelta from photoapp.thumb import ThumbGenerator from photoapp.types import Photo, PhotoSet, Tag, TagItem, PhotoStatus, User from photoapp.common import pwhash -from photoapp.api import PhotosApi, LibraryManager, FilesystemAdapter +from photoapp.api import PhotosApi, LibraryManager +from photoapp.storage import FilesystemAdapter from photoapp.dbutils import SAEnginePlugin, SATool, db, get_db_engine from photoapp.utils import mime2ext, auth, require_auth, photoset_auth_filter, slugify +from photoapp.storage import uri_to_storage from jinja2 import Environment, FileSystemLoader, select_autoescape from sqlalchemy import desc, func, and_, or_ @@ -438,7 +440,7 @@ def main(): SAEnginePlugin(cherrypy.engine, engine).subscribe() # Create various internal tools - 
library_storage = FilesystemAdapter(args.library) + library_storage = uri_to_storage(args.library) library_manager = LibraryManager(library_storage) thumbnail_tool = ThumbGenerator(library_manager, args.cache) diff --git a/photoapp/image.py b/photoapp/image.py index 04f5735..8f07587 100644 --- a/photoapp/image.py +++ b/photoapp/image.py @@ -121,7 +121,6 @@ def special_magic_fobj(fobj, fname): if fname.split(".")[-1].lower() == "xmp": return "application/octet-stream-xmp" else: - fobj.seek(0) return magic.from_buffer(fobj.read(1024), mime=True) diff --git a/photoapp/storage.py b/photoapp/storage.py new file mode 100644 index 0000000..8c4175a --- /dev/null +++ b/photoapp/storage.py @@ -0,0 +1,210 @@ +from urllib.parse import urlparse +import boto3 +from botocore.client import Config as BotoConfig +import os +from botocore.exceptions import ClientError +from queue import Queue +from threading import Thread + + +def uri_to_storage(uri): + parsed = urlparse(uri) + for schemes, backend in {("file", ): FilesystemAdapter, + ("minio", "minios", ): MinioAdapter}.items(): + if parsed.scheme in schemes: + return backend.from_uri(parsed) + raise Exception(f"Unknown storage backend '{parsed.scheme}'") + + +class StorageAdapter(object): + """ + Abstract interface for working with photo file storage. All paths are relative to the storage adapter's root param. 
+ """ + + def exists(self, path): + # return true/false if the file path exists + raise NotImplementedError() + + def open(self, path, mode): + # return a handle to the path + raise NotImplementedError() + + def delete(self, path): + # erase the path + raise NotImplementedError() + + def getsize(self, path): + # return the number of bytes contained at the path + raise NotImplementedError() + + @staticmethod + def from_uri(uri): + # return an instance of the storage class backed by the passed URI + raise NotImplementedError() + + +class FilesystemAdapter(StorageAdapter): + def __init__(self, root): + super().__init__() + self.root = root # root path + + def exists(self, path): + # TODO return true/false if the file path exists + return os.path.exists(self._abspath(path)) + + def open(self, path, mode): + # TODO return a handle to the path. this should work as a context manager + os.makedirs(os.path.dirname(self._abspath(path)), exist_ok=True) + return open(self._abspath(path), mode) + + def delete(self, path): + # TODO delete the file + # TODO prune empty directories that were components of $path + os.unlink(self._abspath(path)) + + def getsize(self, path): + return os.path.getsize(self._abspath(path)) + + def _abspath(self, path): + return os.path.join(self.root, path) + + @staticmethod + def from_uri(uri): + relative = True if uri.netloc and uri.netloc in (".", "localhost") else False + path = uri.path[1:] if relative else (uri.netloc + uri.path) + return FilesystemAdapter(os.path.abspath(path)) + + +class S3WriteProxy(object): + def __init__(self, storage, key): + """ + A file-like class that accepts writes (until the writer closes me) and accepts reads (until closed) + """ + self.storage = storage + self.key = key + self.q = Queue(maxsize=1024) # number of $os write sizes to buffer + self.leftovers = None + self.closed = False + self.eof = False + self.s3_upload = Thread(target=self._upload) + self.s3_upload.start() + + def _upload(self): + 
self.storage.s3.upload_fileobj(Fileobj=self, + Bucket=self.storage.bucket, + Key=self.key) + + def write(self, data): + if self.closed: + raise Exception(f"cant write to a closed {self.__class__}") + self.q.put(data) + + def read(self, size=0): + if self.eof: + return b'' + buf = self.leftovers if self.leftovers else b'' + while True: + p = self.q.get() + if not p: + self.eof = True + break + buf += p + if len(buf) >= size: + break + if len(buf) > size: + self.leftovers = buf[size:] + buf = buf[0:size] + else: + self.leftovers = None + + return buf + + def close(self): + self.closed = True + self.q.put(None) + self.s3_upload.join() + + def __enter__(self, *args): + raise Exception("who called me?") + + +class S3ReadProxy(object): + def __init__(self, s3obj): + self.obj = s3obj + + def close(self): + self.obj.close() + + def read(self, size=None): + return self.obj.read(size) + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.obj.close() + + +class MinioAdapter(StorageAdapter): + def __init__(self, s3, bucket, prefix): + self.s3 = s3 + self.bucket = bucket + self.prefix = prefix + + try: + self.s3.get_bucket_location(Bucket=self.bucket) + except Exception: + self.s3.create_bucket(Bucket=self.bucket) + + def exists(self, path): + try: + self.s3.get_object(Bucket=self.bucket, Key=os.path.join(self.prefix, path)) + return True + except ClientError as ce: + if ce.response['Error']['Code'] == 'NoSuchKey': + return False + raise + + def open(self, path, mode): + # TODO return a handle to the path. 
this should work as a context manager + if "r" in mode: + return S3ReadProxy(self.s3.get_object(Bucket=self.bucket, Key=os.path.join(self.prefix, path))["Body"]) + elif "w" in mode: + return S3WriteProxy(self, os.path.join(self.prefix, path)) + else: + raise Exception("unknown file open mode") + + def delete(self, path): + # TODO handle verification of return status + self.s3.delete_object(Bucket=self.bucket, Key=os.path.join(self.prefix, path)) + + def getsize(self, path): + return self.s3.head_object(Bucket=self.bucket, Key=os.path.join(self.prefix, path))["ContentLength"] + + @staticmethod + def from_uri(uri): + secure = uri.scheme.endswith("s") + + s3args = {"config": BotoConfig(signature_version='s3v4')} + + endpoint_url = f"http{'s' if secure else ''}://{uri.hostname}" + if uri.port: + endpoint_url += f":{uri.port}" + s3args["endpoint_url"] = endpoint_url + + if uri.username and uri.password: + s3args["aws_access_key_id"] = uri.username + s3args["aws_secret_access_key"] = uri.password + + s3 = boto3.client('s3', **s3args) + bucketpath = uri.path[1:] + + splitted = bucketpath.split("/", 1) + bucket = splitted.pop(0) + prefix = splitted[0].lstrip("/") if splitted else '' + + return MinioAdapter(s3, bucket, prefix) + + +class GfapiAdapter(StorageAdapter): + pass # TODO gluster storage backend diff --git a/photoapp/thumb.py b/photoapp/thumb.py index 1adf0e4..7bffe1c 100644 --- a/photoapp/thumb.py +++ b/photoapp/thumb.py @@ -53,8 +53,9 @@ class ThumbGenerator(object): # TODO have the subprocess download the file with tempfile.TemporaryDirectory() as tmpdir: fpath = os.path.join(tmpdir, "image") - with self.library.storage.open(photo.path, 'rb') as fsrc, open(fpath, 'wb') as fdest: - copyfileobj(fsrc, fdest) + with self.library.storage.open(photo.path, 'rb') as fsrc: + with open(fpath, 'wb') as fdest: + copyfileobj(fsrc, fdest) p = Process(target=self.gen_thumb, args=(fpath, dest, thumb_width, thumb_height, photo.orientation)) p.start() diff --git 
a/photoapp/utils.py b/photoapp/utils.py index f6028ce..fa4dc5a 100644 --- a/photoapp/utils.py +++ b/photoapp/utils.py @@ -7,7 +7,7 @@ import hashlib def copysha(fpin, fpout): sha = hashlib.sha256() while True: - b = fpin.read(4096) + b = fpin.read(1024 * 256) if not b: break fpout.write(b) diff --git a/requirements.txt b/requirements.txt index 4deb51f..23bef51 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,19 +1,25 @@ backports.functools-lru-cache==1.5 +boto3==1.9.183 +botocore==1.12.183 certifi==2019.6.16 chardet==3.0.4 cheroot==6.5.2 CherryPy==18.1.2 contextlib2==0.5.5 +docutils==0.14 idna==2.8 jaraco.functools==1.20 Jinja2==2.10.1 +jmespath==0.9.4 MarkupSafe==1.0 more-itertools==4.3.0 Pillow==5.2.0 portend==2.3 +python-dateutil==2.8.0 python-magic==0.4.15 pytz==2018.5 requests==2.22.0 +s3transfer==0.2.1 six==1.11.0 SQLAlchemy==1.3.5 tabulate==0.8.3