backupdb2/backupdb2/upload.py

import hashlib
import logging
import traceback
from queue import Queue
from threading import Thread, Semaphore
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from backupdb2.common import MAX_QUEUED_CHUNKS, STREAM_CHUNK_SIZE, MAX_PARALLEL_UPLOADS


@dataclass
class S3Chunk:
    """
    Object containing one chunk of data to be uploaded
    """
    key: str
    data: bytes


class S3UploadQueueThread(Thread):
    """
    Background thread that consumes S3Chunk objects from a queue and uploads
    them to S3 using a bounded pool of worker threads.
    """
    def __init__(self, queue, bucket, s3conn):
        super().__init__()
        self.queue = queue
        self.bucket = bucket
        self.s3 = s3conn
        self.lock = Semaphore(MAX_PARALLEL_UPLOADS)  # limits how many uploads run at once
        self.errors = []

    def run(self):
        with ThreadPoolExecutor(max_workers=MAX_PARALLEL_UPLOADS) as executor:
            futures = []
            while True:
                chunk = self.queue.get()
                if chunk is None:  # sentinel: the producer is done, no more chunks
                    break
                self.lock.acquire()  # blocks if too many uploads are already in flight
                futures.append(executor.submit(self.upload, chunk))
            # all chunks submitted; wait for outstanding uploads and record any failures
            for future in as_completed(futures):
                # logging.debug("upload future completed")
                try:
                    future.result()
                except Exception:
                    self.errors.append(traceback.format_exc())

    def upload(self, chunk):
        try:
            self.do_upload(chunk)
        finally:
            self.lock.release()  # free a slot so run() can submit the next chunk

    def do_upload(self, chunk):
        logging.debug(f"uploading chunk of len {len(chunk.data)} to {chunk.key}")
        meta_response = self.s3.put_object(
            Bucket=self.bucket,
            Key=chunk.key,
            Body=chunk.data
        )
        assert meta_response["ResponseMetadata"]["HTTPStatusCode"] == 200, f"Upload failed: {meta_response}"  # TODO

    def success(self):
        return not self.errors


class S3UploadError(Exception):
    def __init__(self, message, errors):
        super().__init__(message)
        self.errors = errors


def stream_to_s3(
    fstream,
    s3,
    bucket_name,
    keypattern,  # some format string with a "sequence" field
    chunk_size=STREAM_CHUNK_SIZE,
):
    """
    Read fstream in chunk_size pieces, upload each piece to S3 under keypattern,
    and return metadata describing the upload (chunk count, total size, sha256).
    """
    sha = hashlib.sha256()
    queue = Queue(MAX_QUEUED_CHUNKS)
    background = S3UploadQueueThread(queue, bucket_name, s3)
    background.start()

    total_bytes = 0
    sequence = 0
    while True:
        # accumulate up to chunk_size bytes before handing the chunk to the uploader
        data = b''
        while len(data) < chunk_size:
            readmax = chunk_size - len(data)
            bit = fstream.read(readmax)
            if not bit:
                break
            data += bit
        if not data:
            break
        total_bytes += len(data)
        sha.update(data)
        queue.put(S3Chunk(keypattern.format(sequence=sequence), data))
        sequence += 1
        logging.debug(f"total uploaded: {total_bytes}")

    queue.put(None)  # signals that there are no more items to be processed
    logging.debug("read all chunks, joining uploader")
    background.join()

    if not background.success():
        raise S3UploadError("upload failed", background.errors)

    shasum = sha.hexdigest()
    metadata = {
        "chunks": sequence,
        "size": total_bytes,
        "sha256": shasum,
    }
    return metadata
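

# Illustrative usage sketch: driving stream_to_s3 with a boto3 S3 client.
# The bucket name, key pattern, and file path below are hypothetical
# placeholders, and boto3 is assumed as the client library since the module
# only relies on a client exposing put_object().
if __name__ == "__main__":
    import boto3

    s3_client = boto3.client("s3")  # assumes default boto3 credentials/region configuration
    with open("example.dump", "rb") as f:
        meta = stream_to_s3(
            f,
            s3_client,
            "example-bucket",
            "backups/example/chunk_{sequence:08d}",  # must contain a "sequence" field
        )
    print(meta)  # e.g. {"chunks": ..., "size": ..., "sha256": "..."}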