118 lines
3.1 KiB
Python
118 lines
3.1 KiB
Python
import hashlib
|
|
import logging
|
|
import traceback
|
|
from queue import Queue
|
|
from threading import Thread, Semaphore
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from dataclasses import dataclass
|
|
|
|
from backupdb2.common import MAX_QUEUED_CHUNKS, STREAM_CHUNK_SIZE, MAX_PARALLEL_UPLOADS
|
|
|
|
|
|
@dataclass
class S3Chunk:
    """
    One piece of a larger data stream, paired with the S3 object key it
    will be stored under.
    """
    key: str     # destination object key within the target bucket
    data: bytes  # raw chunk payload to upload
|
|
|
|
|
|
class S3UploadQueueThread(Thread):
    """
    Background thread that drains S3Chunk objects from a queue and uploads
    them to S3 with bounded parallelism.

    The producer pushes S3Chunk instances onto ``queue`` and then a single
    ``None`` sentinel to signal end of stream. Upload failures do not stop
    the thread; they are collected as traceback strings in ``self.errors``
    and can be inspected after ``join()`` via ``success()``.
    """

    def __init__(self, queue, bucket, s3conn):
        """
        :param queue: Queue of S3Chunk objects; ``None`` is the end sentinel
        :param bucket: destination S3 bucket name
        :param s3conn: S3 client exposing ``put_object`` (boto3-style)
        """
        super().__init__()
        self.queue = queue
        self.bucket = bucket
        self.s3 = s3conn
        # Throttle: bounds how many chunks are submitted-but-unfinished so
        # the executor's internal queue (and chunk memory) stays limited.
        self.lock = Semaphore(MAX_PARALLEL_UPLOADS)
        self.errors = []

    def run(self):
        """Consume chunks until the ``None`` sentinel, then await all uploads."""
        with ThreadPoolExecutor(max_workers=MAX_PARALLEL_UPLOADS) as executor:
            futures = []
            while True:
                chunk = self.queue.get()
                if chunk is None:
                    break
                self.lock.acquire()
                futures.append(executor.submit(self.upload, chunk))

            for future in as_completed(futures):
                try:
                    future.result()
                except Exception:
                    # Record the failure but keep draining remaining futures;
                    # narrow to Exception so KeyboardInterrupt/SystemExit
                    # still propagate (the original bare `except:` hid them).
                    self.errors.append(traceback.format_exc())

    def upload(self, chunk):
        """Upload one chunk, always releasing the throttle semaphore."""
        try:
            self.do_upload(chunk)
        finally:
            self.lock.release()

    def do_upload(self, chunk):
        """
        PUT a single chunk to S3.

        :raises RuntimeError: if S3 responds with a non-200 status code
        """
        logging.debug(f"uploading chunk of len {len(chunk.data)} to {chunk.key}")
        meta_response = self.s3.put_object(
            Bucket=self.bucket,
            Key=chunk.key,
            Body=chunk.data
        )
        status = meta_response["ResponseMetadata"]["HTTPStatusCode"]
        if status != 200:
            # Explicit raise instead of `assert`: asserts are stripped under
            # `python -O`, which would silently disable this check. The
            # exception is caught in run() and recorded in self.errors.
            raise RuntimeError(f"Upload failed: {meta_response}")

    def success(self):
        """Return True if every upload completed without error."""
        return not self.errors
|
|
|
|
|
|
class S3UploadError(Exception):
    """Raised after streaming finishes if any chunk failed to upload."""

    def __init__(self, message, errors):
        """
        :param message: human-readable failure summary
        :param errors: list of formatted tracebacks, one per failed upload
        """
        self.errors = errors
        super().__init__(message)
|
|
|
|
|
|
def _read_full_chunk(fstream, chunk_size):
    """
    Read up to chunk_size bytes from fstream, looping until the chunk is
    full or the stream is exhausted (a single read() may return less than
    requested). Returns b'' only at end of stream.
    """
    data = b''
    while len(data) < chunk_size:
        bit = fstream.read(chunk_size - len(data))
        if not bit:
            break
        data += bit
    return data


def stream_to_s3(
    fstream,
    s3,
    bucket_name,
    keypattern,  # some format string with a "sequence" field
    chunk_size=STREAM_CHUNK_SIZE,
):
    """
    Stream a file-like object to S3 as a sequence of fixed-size chunk
    objects, uploading in the background while reading.

    :param fstream: binary file-like object supporting read()
    :param s3: S3 client exposing put_object (boto3-style)
    :param bucket_name: destination bucket
    :param keypattern: format string with a "sequence" field used to name
        each chunk's object key
    :param chunk_size: max bytes per chunk object
    :returns: dict with "chunks" (count), "size" (total bytes) and
        "sha256" (hex digest of the whole stream)
    :raises S3UploadError: if any chunk failed to upload
    """
    sha = hashlib.sha256()
    queue = Queue(MAX_QUEUED_CHUNKS)
    background = S3UploadQueueThread(queue, bucket_name, s3)
    background.start()

    total_bytes = 0
    sequence = 0

    try:
        while True:
            data = _read_full_chunk(fstream, chunk_size)
            if not data:
                break
            total_bytes += len(data)
            sha.update(data)
            queue.put(S3Chunk(keypattern.format(sequence=sequence), data))
            sequence += 1
            logging.debug(f"total uploaded: {total_bytes}")
    finally:
        # Always send the end-of-stream sentinel and join, even if
        # fstream.read() raised — otherwise the uploader thread would block
        # forever waiting on the queue and never be joined.
        queue.put(None)  # signals that there are no more items to be processed
        logging.debug("read all chunks, joining uploader")
        background.join()

    if not background.success():
        raise S3UploadError("upload failed", background.errors)

    shasum = sha.hexdigest()
    metadata = {
        "chunks": sequence,
        "size": total_bytes,
        "sha256": shasum
    }

    return metadata
|