minio (s3) storage backend and backend uri support

dave 2019-07-04 23:55:49 -07:00
parent a2e854568f
commit 83a0702341
7 changed files with 225 additions and 75 deletions
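With this change the library location is given as a URI and resolved through the new uri_to_storage() helper instead of being treated as a bare filesystem path. A minimal usage sketch, with illustrative URIs that are not taken from the commit itself:

from photoapp.storage import uri_to_storage

# Local filesystem backend; a host of "." or "localhost" makes the path relative.
storage = uri_to_storage("file://./library")

# Minio/S3 backend: credentials go in the userinfo part, the first path segment is
# the bucket, and any remainder becomes a key prefix; "minios://" selects HTTPS.
storage = uri_to_storage("minio://ACCESSKEY:SECRETKEY@localhost:9000/photos/library")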

@@ -5,80 +5,12 @@ from datetime import datetime
from photoapp.types import Photo, PhotoSet, Tag, PhotoStatus, User, known_extensions, known_mimes, genuuid
from photoapp.utils import copysha, get_extension
from photoapp.image import special_magic_fobj
from photoapp.storage import StorageAdapter
from photoapp.dbutils import db
from contextlib import closing
import traceback
class StorageAdapter(object):
"""
Abstract interface for working with photo file storage. All paths are relative to the storage adapter's root param.
"""
def exists(self, path):
# TODO return true/false if the file path exists
raise NotImplementedError()
def open(self, path, mode):
# TODO return a handle to the path
# TODO this should work as a context manager
raise NotImplementedError()
def delete(self, path):
# TODO erase the path
raise NotImplementedError()
def getsize(self, path):
raise NotImplementedError()
class FilesystemAdapter(StorageAdapter):
def __init__(self, root):
super().__init__()
self.root = root # root path
def exists(self, path):
# TODO return true/false if the file path exists
return os.path.exists(self._abspath(path))
def open(self, path, mode):
# TODO return a handle to the path. this should work as a context manager
os.makedirs(os.path.dirname(self._abspath(path)), exist_ok=True)
return open(self._abspath(path), mode)
def delete(self, path):
# TODO delete the file
# TODO prune empty directories that were components of $path
os.unlink(self._abspath(path))
def getsize(self, path):
return os.path.getsize(self._abspath(path))
def _abspath(self, path):
return os.path.join(self.root, path)
class S3Adapter(StorageAdapter):
def exists(self, path):
# TODO return true/false if the file path exists
raise NotImplementedError()
def open(self, path, mode):
# TODO return a handle to the path. this should work as a context manager
raise NotImplementedError()
def delete(self, path):
# TODO erase the path
raise NotImplementedError()
def getsize(self, path):
raise NotImplementedError()
class GfapiAdapter(StorageAdapter):
pass # TODO gluster storage backend
class LibraryManager(object):
def __init__(self, storage):
assert isinstance(storage, StorageAdapter)

@@ -7,9 +7,11 @@ from datetime import datetime, timedelta
from photoapp.thumb import ThumbGenerator
from photoapp.types import Photo, PhotoSet, Tag, TagItem, PhotoStatus, User
from photoapp.common import pwhash
from photoapp.api import PhotosApi, LibraryManager, FilesystemAdapter
from photoapp.api import PhotosApi, LibraryManager
from photoapp.storage import FilesystemAdapter
from photoapp.dbutils import SAEnginePlugin, SATool, db, get_db_engine
from photoapp.utils import mime2ext, auth, require_auth, photoset_auth_filter, slugify
from photoapp.storage import uri_to_storage
from jinja2 import Environment, FileSystemLoader, select_autoescape
from sqlalchemy import desc, func, and_, or_
@@ -438,7 +440,7 @@ def main():
SAEnginePlugin(cherrypy.engine, engine).subscribe()
# Create various internal tools
library_storage = FilesystemAdapter(args.library)
library_storage = uri_to_storage(args.library)
library_manager = LibraryManager(library_storage)
thumbnail_tool = ThumbGenerator(library_manager, args.cache)

@@ -121,7 +121,6 @@ def special_magic_fobj(fobj, fname):
if fname.split(".")[-1].lower() == "xmp":
return "application/octet-stream-xmp"
else:
fobj.seek(0)
return magic.from_buffer(fobj.read(1024), mime=True)

photoapp/storage.py (new file, 210 lines added)

@@ -0,0 +1,210 @@
from urllib.parse import urlparse
import boto3
from botocore.client import Config as BotoConfig
import os
from botocore.exceptions import ClientError
from queue import Queue
from threading import Thread
def uri_to_storage(uri):
parsed = urlparse(uri)
for schemes, backend in {("file", ): FilesystemAdapter,
("minio", "minios", ): MinioAdapter}.items():
if parsed.scheme in schemes:
return backend.from_uri(parsed)
raise Exception(f"Unknown storage backend '{parsed.scheme}'")
class StorageAdapter(object):
"""
Abstract interface for working with photo file storage. All paths are relative to the storage adapter's root param.
"""
def exists(self, path):
# return true/false if the file path exists
raise NotImplementedError()
def open(self, path, mode):
# return a handle to the path
raise NotImplementedError()
def delete(self, path):
# erase the path
raise NotImplementedError()
def getsize(self, path):
# return the number of bytes contained at the path
raise NotImplementedError()
@staticmethod
def from_uri(uri):
# return an instance of the storage class backed by the passed URI
raise NotImplementedError()
class FilesystemAdapter(StorageAdapter):
def __init__(self, root):
super().__init__()
self.root = root # root path
def exists(self, path):
# TODO return true/false if the file path exists
return os.path.exists(self._abspath(path))
def open(self, path, mode):
# TODO return a handle to the path. this should work as a context manager
os.makedirs(os.path.dirname(self._abspath(path)), exist_ok=True)
return open(self._abspath(path), mode)
def delete(self, path):
# TODO delete the file
# TODO prune empty directories that were components of $path
os.unlink(self._abspath(path))
def getsize(self, path):
return os.path.getsize(self._abspath(path))
def _abspath(self, path):
return os.path.join(self.root, path)
@staticmethod
def from_uri(uri):
relative = uri.netloc in (".", "localhost")
path = uri.path[1:] if relative else (uri.netloc + uri.path)
return FilesystemAdapter(os.path.abspath(path))
class S3WriteProxy(object):
def __init__(self, storage, key):
"""
A file-like class that accepts writes (until the writer closes me) and accepts reads (until closed)
"""
self.storage = storage
self.key = key
self.q = Queue(maxsize=1024)  # buffer at most 1024 pending write-sized chunks between writer and uploader
self.leftovers = None
self.closed = False
self.eof = False
self.s3_upload = Thread(target=self._upload)
self.s3_upload.start()
def _upload(self):
self.storage.s3.upload_fileobj(Fileobj=self,
Bucket=self.storage.bucket,
Key=self.key)
def write(self, data):
if self.closed:
raise Exception(f"cant write to a closed {self.__class__}")
self.q.put(data)
def read(self, size=0):
if self.eof:
# flush any leftover buffered bytes before reporting end-of-stream
buf = self.leftovers or b''
self.leftovers = buf[size:] if size and len(buf) > size else None
return buf[0:size] if size else buf
buf = self.leftovers if self.leftovers else b''
while True:
p = self.q.get()
if not p:
self.eof = True
break
buf += p
if len(buf) >= size:
break
if len(buf) > size:
self.leftovers = buf[size:]
buf = buf[0:size]
else:
self.leftovers = None
return buf
def close(self):
self.closed = True
self.q.put(None)
self.s3_upload.join()
def __enter__(self, *args):
raise Exception(f"{self.__class__.__name__} does not support use as a context manager")
class S3ReadProxy(object):
def __init__(self, s3obj):
self.obj = s3obj
def close(self):
self.obj.close()
def read(self, size=None):
return self.obj.read(size)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.obj.close()
class MinioAdapter(StorageAdapter):
def __init__(self, s3, bucket, prefix):
self.s3 = s3
self.bucket = bucket
self.prefix = prefix
try:
self.s3.get_bucket_location(Bucket=self.bucket)
except Exception:
self.s3.create_bucket(Bucket=self.bucket)
def exists(self, path):
try:
self.s3.get_object(Bucket=self.bucket, Key=os.path.join(self.prefix, path))
return True
except ClientError as ce:
if ce.response['Error']['Code'] == 'NoSuchKey':
return False
raise
def open(self, path, mode):
# TODO return a handle to the path. this should work as a context manager
if "r" in mode:
return S3ReadProxy(self.s3.get_object(Bucket=self.bucket, Key=os.path.join(self.prefix, path))["Body"])
elif "w" in mode:
return S3WriteProxy(self, os.path.join(self.prefix, path))
else:
raise Exception("unknown file open mode")
def delete(self, path):
# TODO handle verification of return status
self.s3.delete_object(Bucket=self.bucket, Key=os.path.join(self.prefix, path))
def getsize(self, path):
return self.s3.head_object(Bucket=self.bucket, Key=os.path.join(self.prefix, path))["ContentLength"]
@staticmethod
def from_uri(uri):
secure = uri.scheme.endswith("s")
s3args = {"config": BotoConfig(signature_version='s3v4')}
endpoint_url = f"http{'s' if secure else ''}://{uri.hostname}"
if uri.port:
endpoint_url += f":{uri.port}"
s3args["endpoint_url"] = endpoint_url
if uri.username and uri.password:
s3args["aws_access_key_id"] = uri.username
s3args["aws_secret_access_key"] = uri.password
s3 = boto3.client('s3', **s3args)
bucketpath = uri.path[1:]
splitted = bucketpath.split("/", 1)
bucket = splitted.pop(0)
prefix = splitted[0].lstrip("/") if splitted else ''
return MinioAdapter(s3, bucket, prefix)
class GfapiAdapter(StorageAdapter):
pass # TODO gluster storage backend

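The adapters above all expose the same small interface (exists / open / delete / getsize), so callers such as LibraryManager and the thumbnailer don't need to know which backend is configured. A minimal sketch of consuming that interface; export_photo() and its arguments are illustrative helpers, not part of the commit:

from shutil import copyfileobj

def export_photo(storage, src_path, dest_path):
    # copy one object out of whatever backend `storage` wraps into a local file
    if not storage.exists(src_path):
        raise FileNotFoundError(src_path)
    with storage.open(src_path, "rb") as fsrc:
        with open(dest_path, "wb") as fdest:
            copyfileobj(fsrc, fdest)
    return storage.getsize(src_path)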
@@ -53,8 +53,9 @@ class ThumbGenerator(object):
# TODO have the subprocess download the file
with tempfile.TemporaryDirectory() as tmpdir:
fpath = os.path.join(tmpdir, "image")
with self.library.storage.open(photo.path, 'rb') as fsrc, open(fpath, 'wb') as fdest:
copyfileobj(fsrc, fdest)
with self.library.storage.open(photo.path, 'rb') as fsrc:
with open(fpath, 'wb') as fdest:
copyfileobj(fsrc, fdest)
p = Process(target=self.gen_thumb, args=(fpath, dest, thumb_width, thumb_height, photo.orientation))
p.start()

@@ -7,7 +7,7 @@ import hashlib
def copysha(fpin, fpout):
sha = hashlib.sha256()
while True:
b = fpin.read(4096)
b = fpin.read(1024 * 256)
if not b:
break
fpout.write(b)

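The copysha hunk above only raises the copy buffer from 4 KiB to 256 KiB, presumably to cut per-chunk overhead now that reads and writes can go through the S3 proxy objects. The rest of the helper is not shown in the hunk; a self-contained equivalent of what the visible loop suggests (the sha.update() call and the hex-digest return value are assumptions) looks like:

import hashlib

def copy_and_sha256(fpin, fpout, bufsize=1024 * 256):
    # stream fpin into fpout in bufsize chunks while hashing everything copied
    sha = hashlib.sha256()
    while True:
        b = fpin.read(bufsize)
        if not b:
            break
        sha.update(b)
        fpout.write(b)
    return sha.hexdigest()  # assumed return contract; copysha's actual return is not shown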
@@ -1,19 +1,25 @@
backports.functools-lru-cache==1.5
boto3==1.9.183
botocore==1.12.183
certifi==2019.6.16
chardet==3.0.4
cheroot==6.5.2
CherryPy==18.1.2
contextlib2==0.5.5
docutils==0.14
idna==2.8
jaraco.functools==1.20
Jinja2==2.10.1
jmespath==0.9.4
MarkupSafe==1.0
more-itertools==4.3.0
Pillow==5.2.0
portend==2.3
python-dateutil==2.8.0
python-magic==0.4.15
pytz==2018.5
requests==2.22.0
s3transfer==0.2.1
six==1.11.0
SQLAlchemy==1.3.5
tabulate==0.8.3