diff --git a/b2mirror/mirror.py b/b2mirror/mirror.py index 701a0c3..e6e926c 100644 --- a/b2mirror/mirror.py +++ b/b2mirror/mirror.py @@ -1,18 +1,17 @@ import os import sqlite3 -from threading import Thread from urllib.parse import urlparse from collections import namedtuple from b2.api import B2Api import sys from itertools import islice, filterfalse -from concurrent.futures import ThreadPoolExecutor, Future +from concurrent.futures import ThreadPoolExecutor import re -#import logging -#logging.basicConfig(level=logging.INFO) +# import logging +# logging.basicConfig(level=logging.INFO) -FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', ]) # 'fp' +FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', ]) # 'fp' """ @@ -27,16 +26,18 @@ The manager will iterate the set of FileInfos, and pass each to the dest Dest will upload the file, and inform the manager it was completed """ + + class B2SyncManager(object): - workers = 10 + workers = 15 def __init__(self, source_module, dest_module, exclude_res=None): self.src = source_module self.dest = dest_module self.db = sqlite3.connect('./sync.db', check_same_thread=False) self.db.row_factory = B2SyncManager.dict_factory - self.db.isolation_level = None # TBD - does it hurt perf? + self.db.isolation_level = None # TBD - does it hurt perf? self.exclude_res = [ re.compile(r'.*\.(DS_Store|pyc|dropbox)$') ] + (exclude_res if exclude_res else []) @@ -55,7 +56,7 @@ class B2SyncManager(object): def table_exists(table_name): c.execute("SELECT * FROM SQLITE_MASTER WHERE `type`='table' AND `name`=?", (table_name,)) tables = c.fetchall() - if len(tables)==0: + if len(tables) == 0: return False return True @@ -88,15 +89,14 @@ class B2SyncManager(object): # Files will be marked as seen as they are processed # Later, unseen files will be purged c = self.db.cursor() - row = c.execute("UPDATE 'files' SET seen=0;") + c.execute("UPDATE 'files' SET seen=0;") c.close() chunk_size = 1000 files_source = filterfalse( # if rel_path matches any of the REs, the filter is True and the file is skipped - lambda x: any([pattern.match(x.rel_path) for pattern in self.exclude_res]), - self.src - ) + lambda x: any([pattern.match(x.rel_path) for pattern in self.exclude_res]), + self.src) while True: chunk = list(islice(files_source, chunk_size)) @@ -119,7 +119,6 @@ class B2SyncManager(object): row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone() - if not row or row['mtime'] < f.mtime: print("Uploading:", f.rel_path) @@ -129,15 +128,15 @@ class B2SyncManager(object): print("Failed:", f.rel_path) print("Unexpected error:", sys.exc_info()[0]) raise - #print("Ok: ", f.rel_path) + # print("Ok: ", f.rel_path) # The file was uploaded, commit it to the db c.execute("REPLACE INTO 'files' VALUES(?, ?, ?, ?);", (f.rel_path, f.mtime, f.size, 1)) - #print("Done: ", f.rel_path) + # print("Done: ", f.rel_path) else: c.execute("UPDATE 'files' SET seen=1 WHERE `path` = ?;", (f.rel_path,)).fetchone() - #print("Skipping:", f.rel_path) + # print("Skipping:", f.rel_path) c.close() @@ -175,6 +174,7 @@ class LocalProvider(Provider): Iterates files on local disk """ max_chunk_size = 8*1024*1024 + def __init__(self, local_path): super(LocalProvider, self).__init__() self.local_path = local_path @@ -190,7 +190,7 @@ class LocalProvider(Provider): relative_path, os.path.getsize(file_abs_path), int(os.path.getmtime(file_abs_path)), - #open(file_abs_path, 'rb') + # open(file_abs_path, 'rb') ) else: self.current_set = self.walker.__next__() @@ -205,8 +205,8 @@ class B2Provider(Provider): """ def __init__(self, accountId, appKey, bucketId, bucketBasePath): - super(B2Provider, self).__init__() - raise NotImplemented() + super(B2Provider, self).__init__() + raise NotImplemented() class Reciever(object): @@ -219,6 +219,7 @@ class Reciever(object): def purge_file(self, file_path): raise NotImplemented() + class B2Reciever(Reciever): max_chunk_size = 256*1024 @@ -235,7 +236,6 @@ class B2Reciever(Reciever): self.bucket = self.api.get_bucket_by_name(self.bucket_name) def put_file(self, file_info): - #print(">>> {}".format(file_info.abs_path)) dest_path = os.path.join(self.path, file_info.rel_path).lstrip('/') self.bucket.upload_local_file( file_info.abs_path, @@ -246,7 +246,6 @@ class B2Reciever(Reciever): dest_path = os.path.join(self.path, file_path).lstrip('/') self.delete_by_path(dest_path) - def delete_by_path(self, file_path): for f in self.bucket.list_file_versions(start_filename=file_path, max_entries=100)["files"]: if f["fileName"] == file_path: @@ -262,12 +261,12 @@ def sync(source_uri, dest_uri, account_id, app_key): source_provider = None dest_receiver = None - if source.scheme == '': # Plain file URI + if source.scheme == '': # Plain file URI source_provider = LocalProvider(source.path) else: raise Exception("Sources other than local file paths not supported") - if dest.scheme == 'b2': # Plain file URI + if dest.scheme == 'b2': # Plain file URI dest_receiver = B2Reciever(bucket=dest.netloc, path=dest.path, account_id=account_id, app_key=app_key) else: raise Exception("Dests other than B2 URIs not yet supported")