make s3 uploader generic
parent 9bebaab13a
commit d399767f72
@@ -56,17 +56,10 @@ class BackupdbClient(object):

    def download(self, backup, namespace="default", date=None):
        """
        Download a backup by date, or the latest if date is not supplied.
        Download a backup by date, or the latest if date is not supplied. Returns the request's Response object
        """
        return self.get("download", stream=True, params=dict(namespace=namespace, name=backup, date=date))

    # def create_user(self, username, password):
    #     return self.post("user", data={"username": username,
    #                                    "password_hash": pwhash(password)})

    # def upload(self, files, metadata):
    #     return self.post("upload", files=files, data={"meta": json.dumps(metadata)})


def cmd_list_configured(args, parser, config, client):
    """
@@ -171,7 +164,7 @@ def cmd_backup(args, parser, config, client):
    error_scanner.start()

    response, local_sha = client.upload(tar.stdout, args.backup, args.namespace)
    logging.info("local sha256:", local_sha)
    logging.info("local sha256: %s", local_sha)

    tar.wait()
    error_scanner.join()
@@ -302,8 +295,8 @@ def main():
        # level=logging.DEBUG if args.debug else logging.INFO,
        format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s"
    )
    # logging.getLogger("botocore").setLevel(logging.ERROR)
    # logging.getLogger("urllib3").setLevel(logging.ERROR)
    logging.getLogger("botocore").setLevel(logging.ERROR)
    logging.getLogger("urllib3").setLevel(logging.ERROR)

    config = load_cli_config(args.config)
    client = BackupdbClient(args.server or config["options"].get("server"))
@@ -1,6 +1,83 @@
MAX_QUEUED_CHUNKS = 5 # max size of pre-upload file chunk queue
MAX_PARALLEL_UPLOADS = 10 # max number of uploads happening in parallel
import os
import json
import logging


MAX_QUEUED_CHUNKS = int(os.environ.get("BACKUPDB_S3_QUEUE_SIZE", 5)) # max size of pre-upload file chunk queue
MAX_PARALLEL_UPLOADS = int(os.environ.get("BACKUPDB_PARALLEL_UPLOADS", 10)) # max number of uploads happening in parallel
# memory usage will be the sum of the above numbers times the chunk size
CHUNK_SIZE = 1024 * 1024 * 10 # 10 MB
STREAM_CHUNK_SIZE = int(os.environ.get("BACKUPDB_STREAM_CHUNK_SIZE", 1024 * 1024 * 10)) # 10 MB

LOCKFILE = ".datadb.lock"


import botocore.errorfactory


class BackupManager(object):
    """
    Client for listing/reading backups
    """
    def __init__(self, bucket, s3conn):
        self.bucket = bucket
        self.s3 = s3conn

    def list_namespaces(self):
        return self.list_prefix()

    def list_backups(self, namespace="default"):
        return self.list_prefix(f"{namespace}/")

    def list_dates(self, backup, namespace="default"):
        return self.list_prefix(f"{namespace}/{backup}/backups/") #TODO technically we should only list those with a meta.json as that signals completeness

    def date_valid(self, backup, date, namespace="default"):
        """
        Return True if a backup is valid (has meta.json) or False otherwise
        """
        try:
            self.get_metadata(backup, date, namespace)
            return True
        except self.s3.exceptions.NoSuchKey:
            return False

    def list_prefix(self, prefix=""):
        r = self.s3.list_objects(
            Bucket=self.bucket,
            Delimiter="/",
            Prefix=prefix,
        )

        if r["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise Exception(str(r["ResponseMetadata"]))

        plen = len(prefix)

        return [
            o["Prefix"][plen:].rstrip("/") for o in r.get("CommonPrefixes", [])
        ] + [
            o["Key"][plen:].rstrip("/") for o in r.get("Contents", [])
        ]

    def get_metadata(self, backup, date, namespace="default"):
        return json.loads(
            self.s3.get_object(
                Bucket=self.bucket,
                Key=f"{namespace}/{backup}/backups/{date}/meta.json"
            )["Body"].read().decode("utf-8")
        )

    def get_stream(self, backup, date, namespace="default"):
        prefix = f"{namespace}/{backup}/backups/{date}/"
        chunks = self.list_prefix(prefix)
        chunks.sort()

        for chunk in chunks:
            if not chunk.startswith("backup.tar.gz."):
                continue # ignore metadata etc
            chunk_key = f"{prefix}{chunk}"
            logging.info("fetching chunk %s", chunk_key)
            yield from self.s3.get_object(
                Bucket=self.bucket,
                Key=chunk_key,
            )["Body"].iter_chunks(chunk_size=1024 * 1024)
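Editor's note, not part of the diff: a minimal sketch of driving the relocated BackupManager from a script. The bucket name and the plain boto3.client("s3") construction are assumptions made for illustration; the project itself obtains its client through backupdb2.boto.get_s3.

import boto3  # assumption: plain boto3 client instead of backupdb2.boto.get_s3
from backupdb2.common import BackupManager

s3 = boto3.client("s3")                     # credentials/endpoint taken from the environment
mgr = BackupManager("example-backups", s3)  # hypothetical bucket name

for ns in mgr.list_namespaces():            # e.g. ["default"]
    for backup in mgr.list_backups(ns):
        dates = mgr.list_dates(backup, ns)
        # only dates that have a meta.json are complete backups
        valid = [d for d in dates if mgr.date_valid(backup, d, ns)]
        print(ns, backup, valid)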
@@ -3,74 +3,12 @@ import argparse
import cherrypy
import logging
import signal
import uuid
import hashlib
from datetime import datetime
from urllib.parse import urlparse
from queue import Queue

from backupdb2.common import MAX_QUEUED_CHUNKS, CHUNK_SIZE
from backupdb2.upload import S3UploadQueueThread, Chunk
from backupdb2.upload import stream_to_s3, S3UploadError
from backupdb2.boto import get_s3


class BackupManager(object):
    """
    Client for listing/reading backups
    """
    def __init__(self, bucket, s3conn):
        self.bucket = bucket
        self.s3 = s3conn

    def list_namespaces(self):
        return self.list_prefix()

    def list_backups(self, namespace="default"):
        return self.list_prefix(f"{namespace}/")

    def list_dates(self, backup, namespace="default"):
        return self.list_prefix(f"{namespace}/{backup}/backups/") #TODO technically we should only list those with a meta.json as that signals completeness

    def list_prefix(self, prefix=""):
        r = self.s3.list_objects(
            Bucket=self.bucket,
            Delimiter="/",
            Prefix=prefix,
        )

        if r["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise Exception(str(r["ResponseMetadata"]))

        plen = len(prefix)

        return [
            o["Prefix"][plen:].rstrip("/") for o in r.get("CommonPrefixes", [])
        ] + [
            o["Key"][plen:].rstrip("/") for o in r.get("Contents", [])
        ]

    def get_metadata(self, backup, date, namespace="default"):
        return json.loads(
            self.s3.get_object(
                Bucket=self.bucket,
                Key=f"{namespace}/{backup}/backups/{date}/meta.json"
            )["Body"].read().decode("utf-8")
        )

    def get_stream(self, backup, date, namespace="default"):
        prefix = f"{namespace}/{backup}/backups/{date}/"
        chunks = self.list_prefix(prefix)
        chunks.sort()

        for chunk in chunks:
            if not chunk.startswith("backup.tar.gz."):
                continue # ignore metadata etc
            chunk_key = f"{prefix}{chunk}"
            logging.info("fetching chunk %s", chunk_key)
            yield from self.s3.get_object(
                Bucket=self.bucket,
                Key=chunk_key,
            )["Body"].iter_chunks(chunk_size=1024 * 1024)
from backupdb2.common import BackupManager


class WebBase(object):
@@ -177,44 +115,22 @@ class BackupdbApiV1(WebBase):
    def upload(self, name, namespace="default"):
        #TODO validate name & namespace
        # cherrypy.response.timeout = 3600

        now = datetime.now()
        uid = str(uuid.uuid4())
        sha = hashlib.sha256()
        queue = Queue(MAX_QUEUED_CHUNKS)
        background = S3UploadQueueThread(queue, self.bucket, self.s3)
        background.start()

        total_bytes = 0
        sequence = 0
        try:
            metadata = stream_to_s3(
                cherrypy.request.body,
                self.s3,
                self.bucket,
                f"{namespace}/{name}/backups/{now.isoformat()}/backup.tar.gz.{{sequence:08d}}"
            )
        except S3UploadError as ue:
            cherrypy.response.status = 500
            logging.error(f"uploader failed: {ue.errors}")
            return {"errors": ue.errors}

        while True:
            data = b''
            while len(data) < CHUNK_SIZE:
                readmax = CHUNK_SIZE - len(data)
                bit = cherrypy.request.body.read(readmax)
                if not bit:
                    break
                data += bit
            if not data:
                break
            total_bytes += len(data)
            sha.update(data)
            queue.put(Chunk(sequence, data, uid, now, name, namespace))
            sequence += 1
            logging.debug(f"total uploaded: {total_bytes}")

        queue.put(None) # signals that there are no more items to be processed
        shasum = sha.hexdigest()
        metadata = {
            "date": now.isoformat(),
            "uid": uid,
            "chunks": sequence,
            "size": total_bytes,
            "sha256": shasum
        }

        logging.debug("read all chunks, joining uploader")
        background.join()
        metadata["date"] = now.isoformat()

        logging.debug("upload complete, writing metadata")
        meta_response = self.s3.put_object(
@@ -226,13 +142,8 @@ class BackupdbApiV1(WebBase):
            cherrypy.response.status = 500
            return {"errors": "backend upload failed: " + str(meta_response["ResponseMetadata"])}

        if background.success():
            logging.debug("upload success")
            return metadata
        else:
            cherrypy.response.status = 500
            logging.error(f"uploader failed: {background.errors}")
            return {"errors": background.errors}
        logging.debug("upload success")
        return metadata

    @cherrypy.expose
    @cherrypy.tools.json_out()
@@ -1,28 +1,21 @@
import hashlib
import logging
import traceback
from datetime import datetime
from queue import Queue
from threading import Thread, Semaphore
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass

from backupdb2.common import MAX_PARALLEL_UPLOADS
from backupdb2.common import MAX_QUEUED_CHUNKS, STREAM_CHUNK_SIZE, MAX_PARALLEL_UPLOADS


@dataclass
class Chunk:
class S3Chunk:
    """
    Object containing one chunk of data to be uploaded
    """
    sequence: int # order of the chunk in the file
    key: str
    data: bytes
    uid: str # uid label
    date: datetime # date label
    name: str # name of the backup e.g. "plexmain"
    namespace: str # namespace of the backup

    @property
    def path(self) -> str:
        return f"{self.namespace}/{self.name}/backups/{self.date.isoformat()}/backup.tar.gz.{self.sequence:08d}"


class S3UploadQueueThread(Thread):
@@ -58,13 +51,67 @@ class S3UploadQueueThread(Thread):
            self.lock.release()

    def do_upload(self, chunk):
        logging.debug(f"uploading chunk {chunk.sequence} of len {len(chunk.data)} to {chunk.path}")
        logging.debug(f"uploading chunk of len {len(chunk.data)} to {chunk.key}")
        meta_response = self.s3.put_object(
            Bucket=self.bucket,
            Key=chunk.path,
            Key=chunk.key,
            Body=chunk.data
        )
        assert(meta_response["ResponseMetadata"]["HTTPStatusCode"] == 200), f"Upload failed: {meta_response}" #TODO

    def success(self):
        return not self.errors


class S3UploadError(Exception):
    def __init__(self, message, errors):
        super().__init__(message)
        self.errors = errors


def stream_to_s3(
    fstream,
    s3,
    bucket_name,
    keypattern, # some format string with a "sequence" field
    chunk_size=STREAM_CHUNK_SIZE,
):
    sha = hashlib.sha256()
    queue = Queue(MAX_QUEUED_CHUNKS)
    background = S3UploadQueueThread(queue, bucket_name, s3)
    background.start()

    total_bytes = 0
    sequence = 0

    while True:
        data = b''
        while len(data) < chunk_size:
            readmax = chunk_size - len(data)
            bit = fstream.read(readmax)
            if not bit:
                break
            data += bit
        if not data:
            break
        total_bytes += len(data)
        sha.update(data)
        queue.put(S3Chunk(keypattern.format(sequence=sequence), data))
        sequence += 1
        logging.debug(f"total uploaded: {total_bytes}")

    queue.put(None) # signals that there are no more items to be processed
    logging.debug("read all chunks, joining uploader")
    background.join()

    if not background.success():
        raise S3UploadError("upload failed", background.errors)

    shasum = sha.hexdigest()
    metadata = {
        "chunks": sequence,
        "size": total_bytes,
        "sha256": shasum
    }

    return metadata
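Editor's note, not part of the commit: a minimal sketch of how the now-generic stream_to_s3 helper might be called outside of CherryPy, streaming a local file. The bucket name, key pattern and plain boto3 client are illustrative assumptions; the server wires this up from cherrypy.request.body and its own S3 connection.

import boto3  # assumption: plain boto3 client
from backupdb2.upload import stream_to_s3, S3UploadError

s3 = boto3.client("s3")
try:
    # any object with a .read(n) method works as the source stream
    with open("backup.tar.gz", "rb") as f:
        meta = stream_to_s3(
            f,
            s3,
            "example-backups",  # hypothetical bucket name
            "default/plexmain/backups/2024-01-01T00:00:00/backup.tar.gz.{sequence:08d}",
        )
    print(meta)  # {"chunks": ..., "size": ..., "sha256": ...}; caller adds "date"/"uid"
except S3UploadError as e:
    print("upload failed:", e.errors)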