photolib/photoapp/storage.py

import os
from queue import Queue
from threading import Thread
from urllib.parse import urlparse

import boto3
from botocore.client import Config as BotoConfig
from botocore.exceptions import ClientError


def uri_to_storage(uri):
    """Return a StorageAdapter instance for the given storage URI string."""
    parsed = urlparse(uri)
    for schemes, backend in {("file", ): FilesystemAdapter,
                             ("minio", "minios", ): MinioAdapter}.items():
        if parsed.scheme in schemes:
            return backend.from_uri(parsed)
    raise Exception(f"Unknown storage backend '{parsed.scheme}'")


class StorageAdapter(object):
    """
    Abstract interface for working with photo file storage. All paths are
    relative to the storage adapter's root parameter.
    TODO: add __exit__, and maybe __enter__ if that matches normal files
    """
    def exists(self, path):
        # return True/False depending on whether the file path exists
        raise NotImplementedError()

    def open(self, path, mode):
        # return a file-like handle to the path
        raise NotImplementedError()

    def delete(self, path):
        # erase the path
        raise NotImplementedError()

    def getsize(self, path):
        # return the number of bytes contained at the path
        raise NotImplementedError()

    @staticmethod
    def from_uri(uri):
        # return an instance of the storage class backed by the parsed URI
        raise NotImplementedError()


class FilesystemAdapter(StorageAdapter):
    def __init__(self, root):
        super().__init__()
        self.root = root  # root path all relative paths are resolved against

    def exists(self, path):
        return os.path.exists(self._abspath(path))

    def open(self, path, mode):
        os.makedirs(os.path.dirname(self._abspath(path)), exist_ok=True)
        return open(self._abspath(path), mode)

    def delete(self, path):
        os.unlink(self._abspath(path))

    def getsize(self, path):
        return os.path.getsize(self._abspath(path))

    def _abspath(self, path):
        return os.path.join(self.root, path)

    @staticmethod
    def from_uri(uri):
        # file://./foo and file://localhost/foo resolve relative to the cwd;
        # any other netloc is treated as the first component of the path
        relative = uri.netloc in (".", "localhost")
        path = uri.path[1:] if relative else (uri.netloc + uri.path)
        return FilesystemAdapter(os.path.abspath(path))
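
# URI-to-path mapping sketch (hypothetical paths, derived from from_uri above):
#
#     file:///var/photos      -> FilesystemAdapter("/var/photos")
#     file://./photos         -> FilesystemAdapter(os.path.abspath("photos"))
#     file://localhost/photos -> FilesystemAdapter(os.path.abspath("photos"))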


class S3WriteProxy(object):
    """
    A file-like object that accepts writes (until the writer closes it) and
    accepts reads (until closed): a background thread runs s3.upload_fileobj,
    which reads the written bytes back out of this same object and streams
    them to the bucket.
    """
    def __init__(self, storage, key):
        self.storage = storage
        self.key = key
        self.q = Queue(maxsize=1024)  # max number of write-sized chunks to buffer
        self.leftovers = None  # bytes dequeued but not yet returned by read()
        self.closed = False
        self.eof = False
        self.s3_upload = Thread(target=self._upload)
        self.s3_upload.start()

    def _upload(self):
        self.storage.s3.upload_fileobj(Fileobj=self,
                                       Bucket=self.storage.bucket,
                                       Key=self.key)

    def write(self, data):
        if self.closed:
            raise Exception(f"can't write to a closed {self.__class__}")
        self.q.put(data)

    def read(self, size=0):
        if self.eof and not self.leftovers:
            return b''
        buf = self.leftovers if self.leftovers else b''
        self.leftovers = None
        while not self.eof:
            p = self.q.get()
            if p is None:  # close() pushes None as an end-of-stream sentinel
                self.eof = True
                break
            buf += p
            if len(buf) >= size:
                break
        if len(buf) > size:
            self.leftovers = buf[size:]
            buf = buf[:size]
        return buf

    def close(self):
        self.closed = True
        self.q.put(None)  # signal end-of-stream to the reader
        self.s3_upload.join()  # block until the upload finishes

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()


class S3ReadProxy(object):
    """Wraps an S3 StreamingBody in a file-like, context-manager interface."""
    def __init__(self, s3obj):
        self.obj = s3obj

    def close(self):
        self.obj.close()

    def read(self, size=None):
        return self.obj.read(size)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.obj.close()
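
# Read-path usage sketch (hypothetical key; assumes `storage` is a MinioAdapter):
#
#     with storage.open("2024/img_0001.jpg", "r") as f:  # f is an S3ReadProxy
#         data = f.read()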


class MinioAdapter(StorageAdapter):
    def __init__(self, s3, bucket, prefix):
        self.s3 = s3
        self.bucket = bucket
        self.prefix = prefix
        try:
            self.s3.get_bucket_location(Bucket=self.bucket)
        except ClientError:
            # bucket is missing or not visible to us yet; create it
            self.s3.create_bucket(Bucket=self.bucket)

    def exists(self, path):
        try:
            self.s3.get_object(Bucket=self.bucket, Key=os.path.join(self.prefix, path))
            return True
        except ClientError as ce:
            if ce.response['Error']['Code'] == 'NoSuchKey':
                return False
            raise

    def open(self, path, mode):
        if "r" in mode:
            return S3ReadProxy(self.s3.get_object(Bucket=self.bucket,
                                                  Key=os.path.join(self.prefix, path))["Body"])
        elif "w" in mode:
            return S3WriteProxy(self, os.path.join(self.prefix, path))
        else:
            raise Exception(f"unknown file open mode '{mode}'")

    def delete(self, path):
        self.s3.delete_object(Bucket=self.bucket, Key=os.path.join(self.prefix, path))

    def getsize(self, path):
        return self.s3.head_object(Bucket=self.bucket,
                                   Key=os.path.join(self.prefix, path))["ContentLength"]

    @staticmethod
    def from_uri(uri):
        # a trailing "s" on the scheme (minios://) selects TLS
        secure = uri.scheme.endswith("s")
        s3args = {"config": BotoConfig(signature_version='s3v4')}
        endpoint_url = f"http{'s' if secure else ''}://{uri.hostname}"
        if uri.port:
            endpoint_url += f":{uri.port}"
        s3args["endpoint_url"] = endpoint_url
        if uri.username and uri.password:
            s3args["aws_access_key_id"] = uri.username
            s3args["aws_secret_access_key"] = uri.password
        s3 = boto3.client('s3', **s3args)
        # the first path segment is the bucket; the remainder is a key prefix
        bucket, _, prefix = uri.path[1:].partition("/")
        return MinioAdapter(s3, bucket, prefix.lstrip("/"))
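
# URI parsing sketch (hypothetical endpoints/credentials, derived from from_uri):
#
#     minio://KEY:SECRET@localhost:9000/photos/raw
#         -> endpoint http://localhost:9000, bucket "photos", prefix "raw"
#     minios://minio.example.com/photos
#         -> endpoint https://minio.example.com, bucket "photos", no prefix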


class GfapiAdapter(StorageAdapter):
    pass  # TODO: gluster storage backend
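

if __name__ == "__main__":
    # Minimal smoke-test sketch for the filesystem backend; the directory name
    # below is a hypothetical example, not part of the module's contract.
    storage = uri_to_storage("file://./photo_storage_demo")
    with storage.open("demo/hello.txt", "w") as f:
        f.write("hello")
    assert storage.exists("demo/hello.txt")
    print("size:", storage.getsize("demo/hello.txt"))  # -> size: 5
    storage.delete("demo/hello.txt")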