diff --git a/package/__init__.py b/backupdb2/__init__.py
similarity index 100%
rename from package/__init__.py
rename to backupdb2/__init__.py
diff --git a/backupdb2/boto.py b/backupdb2/boto.py
new file mode 100644
index 0000000..137fcfb
--- /dev/null
+++ b/backupdb2/boto.py
@@ -0,0 +1,18 @@
+import boto3
+from botocore.client import Config as BotoConfig
+
+
+def get_s3(url):
+    # set up s3 client
+    s3args = {"config": BotoConfig(signature_version='s3v4')}
+
+    endpoint_url = f"{url.scheme}://{url.hostname}"
+    if url.port:
+        endpoint_url += f":{url.port}"
+    s3args["endpoint_url"] = endpoint_url
+
+    if url.username and url.password:
+        s3args["aws_access_key_id"] = url.username
+        s3args["aws_secret_access_key"] = url.password
+
+    return boto3.client('s3', **s3args)
diff --git a/package/cli.py b/backupdb2/cli.py
similarity index 100%
rename from package/cli.py
rename to backupdb2/cli.py
diff --git a/backupdb2/common.py b/backupdb2/common.py
new file mode 100644
index 0000000..0921583
--- /dev/null
+++ b/backupdb2/common.py
@@ -0,0 +1,4 @@
+MAX_QUEUED_CHUNKS = 5  # max size of pre-upload file chunk queue
+MAX_PARALLEL_UPLOADS = 10  # max number of uploads happening in parallel
+# memory usage will be the sum of the above numbers times the chunk size
+CHUNK_SIZE = 1024 * 1024 * 10  # 10 MB
diff --git a/backupdb2/server.py b/backupdb2/server.py
new file mode 100644
index 0000000..71fea3d
--- /dev/null
+++ b/backupdb2/server.py
@@ -0,0 +1,153 @@
+import json
+import argparse
+import cherrypy
+import logging
+import signal
+import uuid
+import hashlib
+from datetime import datetime
+from urllib.parse import urlparse
+from queue import Queue
+
+from backupdb2.common import MAX_QUEUED_CHUNKS, CHUNK_SIZE
+from backupdb2.upload import S3UploadQueueThread, Chunk
+from backupdb2.boto import get_s3
+
+
+class BackupdbHttp(object):
+    def __init__(self, bucket, s3conn):
+        self.bucket = bucket
+        self.s3 = s3conn
+
+        self.api = self
+        self.v1 = self
+
+    @cherrypy.expose
+    def index(self):
+        yield "TODO list of backups/namespaces etc"
+
+    @cherrypy.expose
+    @cherrypy.tools.json_out()
+    def upload(self, name, namespace="default"):
+        # TODO validate name & namespace
+        # cherrypy.response.timeout = 3600
+        now = datetime.now()
+        uid = str(uuid.uuid4())
+        sha = hashlib.sha256()
+        queue = Queue(MAX_QUEUED_CHUNKS)
+        background = S3UploadQueueThread(queue, self.bucket, self.s3)
+        background.start()
+
+        total_bytes = 0
+        sequence = 0
+
+        while True:
+            data = b''
+            while len(data) < CHUNK_SIZE:
+                readmax = CHUNK_SIZE - len(data)
+                bit = cherrypy.request.body.read(readmax)
+                if not bit:
+                    break
+                data += bit
+            if not data:
+                break
+            total_bytes += len(data)
+            sha.update(data)
+            queue.put(Chunk(sequence, data, uid, name, namespace))
+            sequence += 1
+            logging.debug(f"total uploaded: {total_bytes}")
+
+        queue.put(None)  # signals that there are no more items to be processed
+        shasum = sha.hexdigest()
+        metadata = {
+            "date": now.isoformat(),
+            "uid": uid,
+            "chunks": sequence,
+            "size": total_bytes,
+            "sha256": shasum
+        }
+
+        logging.debug("read all chunks, joining uploader")
+        background.join()
+
+        logging.debug("upload complete, writing metadata")
+        meta_response = self.s3.put_object(
+            Bucket=self.bucket,
+            Key=f"{namespace}/{name}/tmp/{uid}/meta.json",
+            Body=json.dumps(metadata, indent=4, sort_keys=True)
+        )
+        if meta_response["ResponseMetadata"]["HTTPStatusCode"] != 200:
+            cherrypy.response.status = 500
+            return {"errors": "backend upload failed: " + str(meta_response["ResponseMetadata"])}
+
+        if background.success():
logging.debug("upload success") + return metadata + else: + cherrypy.response.status = 500 + logging.error(f"uploader failed: {background.errors}") + return {"errors": background.errors} + + @cherrypy.expose + @cherrypy.tools.json_out() + def get_latest(self, name, namespace="default"): + pass + + +def run_http(args): + s3url = urlparse(args.s3_url) + bucket = s3url.path[1:] + s3 = get_s3(s3url) + + # ensure bucket exists + if bucket not in [b['Name'] for b in s3.list_buckets()['Buckets']]: + print("Creating bucket") + s3.create_bucket(Bucket=bucket) + + web = BackupdbHttp(bucket, s3) + + cherrypy.tree.mount(web, '/', {'/': {}}) + + # General config options + cherrypy.config.update({ + 'request.show_tracebacks': True, + 'server.thread_pool': 1, + 'server.socket_host': "0.0.0.0", + 'server.socket_port': args.port, + 'server.show_tracebacks': True, + 'log.screen': False, + 'engine.autoreload.on': args.debug, + 'server.max_request_body_size': 0, + }) + + def signal_handler(signum, stack): + logging.critical('Got sig {}, exiting...'.format(signum)) + cherrypy.engine.exit() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + cherrypy.engine.start() + cherrypy.engine.block() + finally: + cherrypy.engine.exit() + + +def get_args(): + p = argparse.ArgumentParser() + p.add_argument("-p", "--port", default=8080, type=int, help="listen port for http server") + p.add_argument("-s", "--s3-url", required=True, help="minio server address") + p.add_argument("--debug", action="store_true", help="debug mode") + return p.parse_args() + + +def main(): + args = get_args() + logging.basicConfig( + level=logging.DEBUG if args.debug else logging.INFO, + format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s" + ) + logging.getLogger("botocore").setLevel(logging.ERROR) + logging.getLogger("urllib3").setLevel(logging.ERROR) + run_http(args) diff --git a/backupdb2/upload.py b/backupdb2/upload.py new file mode 100644 index 0000000..832666c --- /dev/null +++ b/backupdb2/upload.py @@ -0,0 +1,68 @@ +import logging +import traceback +from threading import Thread, Semaphore +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass + +from backupdb2.common import MAX_PARALLEL_UPLOADS + + +@dataclass +class Chunk: + """ + Object containing one chunk of data to be uploaded + """ + sequence: int # order of the chunk in the file + data: bytes + uid: str # uid of the backup + name: str # name of the backup e.g. 
"plexmain" + namespace: str # namespace of the backup + + @property + def path(self) -> str: + return f"{self.namespace}/{self.name}/tmp/{self.uid}/backup.tar.gz.{self.sequence:08d}" + + +class S3UploadQueueThread(Thread): + def __init__(self, queue, bucket, s3conn): + super().__init__() + self.queue = queue + self.bucket = bucket + self.s3 = s3conn + self.lock = Semaphore(MAX_PARALLEL_UPLOADS) + self.errors = [] + + def run(self): + with ThreadPoolExecutor(max_workers=MAX_PARALLEL_UPLOADS) as executor: + futures = [] + while True: + chunk = self.queue.get() + if chunk is None: + break + self.lock.acquire() + futures.append(executor.submit(self.upload, chunk)) + + for future in as_completed(futures): + logging.debug("upload future completed") + try: + future.result() + except: + self.errors.append(traceback.format_exc()) + + def upload(self, chunk): + try: + self.do_upload(chunk) + finally: + self.lock.release() + + def do_upload(self, chunk): + logging.debug(f"uploading chunk {chunk.sequence} of len {len(chunk.data)} to {chunk.path}") + meta_response = self.s3.put_object( + Bucket=self.bucket, + Key=chunk.path, + Body=chunk.data + ) + assert(meta_response["ResponseMetadata"]["HTTPStatusCode"] == 200), f"Upload failed: {meta_response}" #TODO + + def success(self): + return not self.errors diff --git a/requirements.txt b/requirements.txt index e69de29..4bca80c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1,22 @@ +boto3==1.17.82 +botocore==1.20.82 +certifi==2020.12.5 +chardet==4.0.0 +cheroot==8.5.2 +CherryPy==18.6.0 +idna==2.10 +jaraco.classes==3.2.1 +jaraco.collections==3.3.0 +jaraco.functools==3.3.0 +jaraco.text==3.5.0 +jmespath==0.10.0 +more-itertools==8.8.0 +portend==2.7.1 +python-dateutil==2.8.1 +pytz==2021.1 +requests==2.25.1 +s3transfer==0.4.2 +six==1.16.0 +tempora==4.0.2 +urllib3==1.26.5 +zc.lockfile==2.0 diff --git a/setup.py b/setup.py index ca45cdc..a868b76 100644 --- a/setup.py +++ b/setup.py @@ -11,17 +11,18 @@ with open("requirements.txt") as f: deps = f.read().split() -setup(name='package', +setup(name='backupdb2', version=__version__, - description='description', + description='backup server and cli', url='', author='dpedu', author_email='dave@davepedu.com', - packages=['package'], + packages=['backupdb2'], install_requires=deps, entry_points={ "console_scripts": [ - "packagecli = package.cli:main", + "backupdb2 = backupdb2.cli:main", + "backupdbserver = backupdb2.server:main", ] }, zip_safe=False)