import json
import logging
import os


MAX_QUEUED_CHUNKS = int(os.environ.get("BACKUPDB_S3_QUEUE_SIZE", 5))  # max size of pre-upload file chunk queue
MAX_PARALLEL_UPLOADS = int(os.environ.get("BACKUPDB_PARALLEL_UPLOADS", 10))  # max number of uploads happening in parallel
# memory usage will be the sum of the above numbers times the chunk size
STREAM_CHUNK_SIZE = int(os.environ.get("BACKUPDB_STREAM_CHUNK_SIZE", 1024 * 1024 * 10))  # 10 MB
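# e.g. with the defaults above: (5 + 10) * 10 MB = 150 MB of chunks buffered at peak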

LOCKFILE = ".datadb.lock"


class BackupManager(object):
    """
    Client for listing/reading backups
    """

    def __init__(self, bucket, s3conn):
        self.bucket = bucket
        self.s3 = s3conn  # expected to be a boto3-style S3 client (list_objects, get_object, exceptions)

    def list_namespaces(self):
        return self.list_prefix()

    def list_backups(self, namespace="default"):
        return self.list_prefix(f"{namespace}/")

    def list_dates(self, backup, namespace="default"):
        # TODO technically we should only list those with a meta.json, as that signals completeness
        return self.list_prefix(f"{namespace}/{backup}/backups/")

    def date_valid(self, backup, date, namespace="default"):
        """
        Return True if a backup is valid (has a meta.json), False otherwise
        """
        try:
            self.get_metadata(backup, date, namespace)
            return True
        except self.s3.exceptions.NoSuchKey:
            return False

    def list_prefix(self, prefix=""):
        r = self.s3.list_objects(
            Bucket=self.bucket,
            Delimiter="/",
            Prefix=prefix,
        )

        if r["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise Exception(str(r["ResponseMetadata"]))

        plen = len(prefix)

        # CommonPrefixes are the "subdirectories" under the prefix, Contents the
        # objects directly beneath it; strip the prefix and any trailing slash
        return [
            o["Prefix"][plen:].rstrip("/") for o in r.get("CommonPrefixes", [])
        ] + [
            o["Key"][plen:].rstrip("/") for o in r.get("Contents", [])
        ]
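
    # Against the layout sketched above, list_prefix("default/db1/backups/") would
    # return something like ["2024-01-01", "2024-01-02"] (hypothetical names): each
    # date shows up as a CommonPrefix because of the "/" delimiter.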

    def get_metadata(self, backup, date, namespace="default"):
        return json.loads(
            self.s3.get_object(
                Bucket=self.bucket,
                Key=f"{namespace}/{backup}/backups/{date}/meta.json"
            )["Body"].read().decode("utf-8")
        )

    def get_stream(self, backup, date, namespace="default"):
        prefix = f"{namespace}/{backup}/backups/{date}/"
        chunks = self.list_prefix(prefix)
        chunks.sort()  # lexicographic; assumes chunk suffixes are zero-padded so this matches write order

        for chunk in chunks:
            if not chunk.startswith("backup.tar.gz."):
                continue  # ignore metadata etc
            chunk_key = f"{prefix}{chunk}"
            logging.info("fetching chunk %s", chunk_key)
            yield from self.s3.get_object(
                Bucket=self.bucket,
                Key=chunk_key,
            )["Body"].iter_chunks(chunk_size=1024 * 1024)
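

if __name__ == "__main__":
    # Minimal usage sketch: wire BackupManager to a boto3 S3 client and stream one
    # backup to disk. boto3 is an assumption (the client API used above matches it),
    # and the bucket, backup, and date names are hypothetical placeholders.
    import boto3

    logging.basicConfig(level=logging.INFO)
    manager = BackupManager("my-backup-bucket", boto3.client("s3"))

    backup_name, backup_date = "db1", "2024-01-01"  # placeholders
    if not manager.date_valid(backup_name, backup_date):
        raise SystemExit(f"{backup_name}/{backup_date} has no meta.json, refusing to restore")

    with open("backup.tar.gz", "wb") as out:
        for chunk in manager.get_stream(backup_name, backup_date):
            out.write(chunk)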