69 lines
2.1 KiB
Python
69 lines
2.1 KiB
Python
import logging
|
|
import traceback
|
|
from threading import Thread, Semaphore
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from dataclasses import dataclass
|
|
|
|
from backupdb2.common import MAX_PARALLEL_UPLOADS
|
|
|
|
|
|
@dataclass
|
|
class Chunk:
|
|
"""
|
|
Object containing one chunk of data to be uploaded
|
|
"""
|
|
sequence: int # order of the chunk in the file
|
|
data: bytes
|
|
uid: str # uid of the backup
|
|
name: str # name of the backup e.g. "plexmain"
|
|
namespace: str # namespace of the backup
|
|
|
|
@property
|
|
def path(self) -> str:
|
|
return f"{self.namespace}/{self.name}/tmp/{self.uid}/backup.tar.gz.{self.sequence:08d}"
|
|
|
|
|
|
class S3UploadQueueThread(Thread):
|
|
def __init__(self, queue, bucket, s3conn):
|
|
super().__init__()
|
|
self.queue = queue
|
|
self.bucket = bucket
|
|
self.s3 = s3conn
|
|
self.lock = Semaphore(MAX_PARALLEL_UPLOADS)
|
|
self.errors = []
|
|
|
|
def run(self):
|
|
with ThreadPoolExecutor(max_workers=MAX_PARALLEL_UPLOADS) as executor:
|
|
futures = []
|
|
while True:
|
|
chunk = self.queue.get()
|
|
if chunk is None:
|
|
break
|
|
self.lock.acquire()
|
|
futures.append(executor.submit(self.upload, chunk))
|
|
|
|
for future in as_completed(futures):
|
|
logging.debug("upload future completed")
|
|
try:
|
|
future.result()
|
|
except:
|
|
self.errors.append(traceback.format_exc())
|
|
|
|
def upload(self, chunk):
|
|
try:
|
|
self.do_upload(chunk)
|
|
finally:
|
|
self.lock.release()
|
|
|
|
def do_upload(self, chunk):
|
|
logging.debug(f"uploading chunk {chunk.sequence} of len {len(chunk.data)} to {chunk.path}")
|
|
meta_response = self.s3.put_object(
|
|
Bucket=self.bucket,
|
|
Key=chunk.path,
|
|
Body=chunk.data
|
|
)
|
|
assert(meta_response["ResponseMetadata"]["HTTPStatusCode"] == 200), f"Upload failed: {meta_response}" #TODO
|
|
|
|
def success(self):
|
|
return not self.errors
|