from urllib.parse import urlparse
import os
from queue import Queue
from threading import Thread

import boto3
from botocore.client import Config as BotoConfig
from botocore.exceptions import ClientError


def uri_to_storage(uri):
    parsed = urlparse(uri)
    for schemes, backend in {("file",): FilesystemAdapter,
                             ("minio", "minios"): MinioAdapter}.items():
        if parsed.scheme in schemes:
            return backend.from_uri(parsed)
    raise Exception(f"Unknown storage backend '{parsed.scheme}'")


class StorageAdapter(object):
    """
    Abstract interface for working with photo file storage.

    All paths are relative to the storage adapter's root param.

    TODO add __exit__, and maybe __enter__ if that matches normal files
    """

    def exists(self, path):
        # return True/False whether the file path exists
        raise NotImplementedError()

    def open(self, path, mode):
        # return a file-like handle to the path
        raise NotImplementedError()

    def delete(self, path):
        # erase the path
        raise NotImplementedError()

    def getsize(self, path):
        # return the number of bytes contained at the path
        raise NotImplementedError()

    @staticmethod
    def from_uri(uri):
        # return an instance of the storage class backed by the passed URI
        raise NotImplementedError()


class FilesystemAdapter(StorageAdapter):
    def __init__(self, root):
        super().__init__()
        self.root = root  # absolute path to the storage root

    def exists(self, path):
        return os.path.exists(self._abspath(path))

    def open(self, path, mode):
        # create parent directories on demand so writes into new
        # subdirectories succeed
        os.makedirs(os.path.dirname(self._abspath(path)), exist_ok=True)
        return open(self._abspath(path), mode)

    def delete(self, path):
        os.unlink(self._abspath(path))

    def getsize(self, path):
        return os.path.getsize(self._abspath(path))

    def _abspath(self, path):
        return os.path.join(self.root, path)

    @staticmethod
    def from_uri(uri):
        # file://./photos and file://localhost/photos resolve relative to
        # the working directory; file:///var/photos is absolute
        relative = uri.netloc in (".", "localhost")
        path = uri.path[1:] if relative else (uri.netloc + uri.path)
        return FilesystemAdapter(os.path.abspath(path))


class S3WriteProxy(object):
    def __init__(self, storage, key):
        """
        A file-like object that accepts writes (until the writer closes
        me) and serves reads to the background upload thread (until EOF)
        """
        self.storage = storage
        self.key = key
        self.q = Queue(maxsize=1024)  # buffer at most 1024 pending write() chunks
        self.leftovers = None  # bytes dequeued but not yet consumed by read()
        self.closed = False
        self.eof = False
        self.error = None
        self.s3_upload = Thread(target=self._upload)
        self.s3_upload.start()

    def _upload(self):
        # runs on the upload thread; upload_fileobj pulls data by
        # calling self.read()
        try:
            self.storage.s3.upload_fileobj(Fileobj=self,
                                           Bucket=self.storage.bucket,
                                           Key=self.key)
        except Exception as e:
            # stash the failure so close() can re-raise it on the
            # writer's thread instead of dying silently
            self.error = e

    def write(self, data):
        if self.closed:
            raise Exception(f"can't write to a closed {self.__class__}")
        if data:  # skip empty chunks; None is reserved as the EOF sentinel
            self.q.put(data)

    def read(self, size=-1):
        # called by upload_fileobj; a negative size means read to EOF
        buf = self.leftovers or b''
        self.leftovers = None
        while not self.eof and (size < 0 or len(buf) < size):
            chunk = self.q.get()
            if chunk is None:  # EOF sentinel enqueued by close()
                self.eof = True
                break
            buf += chunk
        if 0 <= size < len(buf):
            # return exactly `size` bytes and keep the rest for the
            # next read, even after EOF has been reached
            self.leftovers = buf[size:]
            buf = buf[:size]
        return buf

    def close(self):
        self.closed = True
        self.q.put(None)  # signal EOF to the upload thread
        self.s3_upload.join()
        if self.error:
            raise self.error

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()
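
# Usage sketch for the write path (hypothetical `storage` and `src`
# objects; MinioAdapter.open(path, "w") returns an S3WriteProxy):
#
#     with storage.open("photos/cat.jpg", "w") as out:
#         for chunk in iter(lambda: src.read(64 * 1024), b''):
#             out.write(chunk)
#
# write() blocks once 1024 chunks are queued, so the producer can't run
# arbitrarily far ahead of the upload thread.
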
class S3ReadProxy(object):
    def __init__(self, s3obj):
        self.obj = s3obj

    def close(self):
        self.obj.close()

    def read(self, size=None):
        return self.obj.read(size)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.obj.close()


class MinioAdapter(StorageAdapter):
    def __init__(self, s3, bucket, prefix):
        self.s3 = s3
        self.bucket = bucket
        self.prefix = prefix
        # create the bucket on first use
        try:
            self.s3.get_bucket_location(Bucket=self.bucket)
        except ClientError:
            self.s3.create_bucket(Bucket=self.bucket)

    def exists(self, path):
        try:
            self.s3.get_object(Bucket=self.bucket,
                               Key=os.path.join(self.prefix, path))
            return True
        except ClientError as ce:
            if ce.response['Error']['Code'] == 'NoSuchKey':
                return False
            raise

    def open(self, path, mode):
        if "r" in mode:
            return S3ReadProxy(self.s3.get_object(
                Bucket=self.bucket,
                Key=os.path.join(self.prefix, path))["Body"])
        elif "w" in mode:
            return S3WriteProxy(self, os.path.join(self.prefix, path))
        else:
            raise Exception(f"unknown file open mode '{mode}'")

    def delete(self, path):
        self.s3.delete_object(Bucket=self.bucket,
                              Key=os.path.join(self.prefix, path))

    def getsize(self, path):
        return self.s3.head_object(
            Bucket=self.bucket,
            Key=os.path.join(self.prefix, path))["ContentLength"]

    @staticmethod
    def from_uri(uri):
        secure = uri.scheme.endswith("s")  # minios:// means TLS
        s3args = {"config": BotoConfig(signature_version='s3v4')}
        endpoint_url = f"http{'s' if secure else ''}://{uri.hostname}"
        if uri.port:
            endpoint_url += f":{uri.port}"
        s3args["endpoint_url"] = endpoint_url
        if uri.username and uri.password:
            s3args["aws_access_key_id"] = uri.username
            s3args["aws_secret_access_key"] = uri.password
        s3 = boto3.client('s3', **s3args)
        # first path component is the bucket; the rest is a key prefix
        bucketpath = uri.path[1:]
        bucket, _, prefix = bucketpath.partition("/")
        return MinioAdapter(s3, bucket, prefix.lstrip("/"))


class GfapiAdapter(StorageAdapter):
    pass  # TODO gluster storage backend
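

if __name__ == "__main__":
    # Minimal smoke test exercising the filesystem backend end to end;
    # it needs nothing beyond the standard library. A MinIO URI would
    # look like minio://ACCESS_KEY:SECRET_KEY@localhost:9000/bucket/prefix
    # (those credentials and endpoint are placeholders, not real defaults).
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        storage = uri_to_storage(f"file://{tmp}")
        with storage.open("album/photo.jpg", "wb") as f:
            f.write(b"not really a jpeg")
        assert storage.exists("album/photo.jpg")
        assert storage.getsize("album/photo.jpg") == len(b"not really a jpeg")
        storage.delete("album/photo.jpg")
        assert not storage.exists("album/photo.jpg")
        print("filesystem adapter OK")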