diff --git a/b2mirror/mirror.py b/b2mirror/mirror.py index c47d89e..88015a4 100644 --- a/b2mirror/mirror.py +++ b/b2mirror/mirror.py @@ -5,8 +5,15 @@ from urllib.parse import urlparse from collections import namedtuple from b2.api import B2Api import sys - +from itertools import islice, filterfalse from concurrent.futures import ThreadPoolExecutor, Future +import logging +import re + + +#logging.basicConfig(level=logging.INFO) + + """ How it works: @@ -21,16 +28,19 @@ Dest will upload the file, and inform the manager it was completed """ class B2SyncManager(object): - threads = 5 + workers = 10 def __init__(self, source_module, dest_module): self.src = source_module self.dest = dest_module - self.db = sqlite3.connect('./sync.db') + self.db = sqlite3.connect('./sync.db', check_same_thread=False) self.db.row_factory = B2SyncManager.dict_factory self.db.isolation_level = None # TBD - does it hurt perf? + self.exclude_res = [ + re.compile(r'.*\.(DS_Store|pyc)$') + ] self._init_db() - + @staticmethod def dict_factory(cursor, row): d = {} @@ -51,9 +61,10 @@ class B2SyncManager(object): tables = { "files": """ CREATE TABLE `files` ( - `path` varchar(1024), + `path` varchar(1024) PRIMARY KEY, `mtime` INTEGER, - `size` INTEGER + `size` INTEGER, + `seen` BOOLEAN );""" } @@ -65,29 +76,63 @@ class B2SyncManager(object): def sync(self): print("Syncing from {} to {}".format(self.src, self.dest)) - for f in self.src: - c = self.db.cursor() - - row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone() - - if not row or row['mtime'] < f.mtime: - print("Uploading: ", f.rel_path, end='') - sys.stdout.flush() + print(i for i in self.src) + chunk_size = 1000 + while True: + chunk = list( + filterfalse( # if rel_path matches any of the REs, the filter is True and the file is skipped + lambda x: any([pattern.match(x.rel_path) for pattern in self.exclude_res]), + islice(self.src, chunk_size) + ) + ) + + for item in chunk: + assert len(item.rel_path) < 512 + + if len(chunk) == 0: + break + + with ThreadPoolExecutor(max_workers=B2SyncManager.workers) as executor: + upload_futures = [executor.submit(self.xfer_file, item) for item in chunk] + #print("Queued {} tasks".format(len(chunk))) + + for i in upload_futures: + assert i.result() + + def canskip(self, f): + if f.rel_path.endswith('.DS_Store'): + return True + else: + return False + + def xfer_file(self, f): + c = self.db.cursor() + + row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone() + + if self.canskip(f) or not row or row['mtime'] < f.mtime: + + print("Starting:", f.rel_path) + try: self.dest.put_file(f) - f.fp.close() + except: + print("Failed:", f.rel_path) + print("Unexpected error:", sys.exc_info()[0]) + raise + #print("Ok: ", f.rel_path) + #f.fp.close() + # The file was uploaded, commit it to the db + c.execute("REPLACE INTO 'files' VALUES(?, ?, ?, ?);", (f.rel_path, f.mtime, f.size, 1)) + #print("Done: ", f.rel_path) - print(" ok") + else: + print("Skipping:", f.rel_path) - # The file was uploaded, commit it to the db + c.close() - c.execute("INSERT INTO 'files' VALUES(?, ?, ?);", (f.rel_path, f.mtime, f.size,)) - - else: - print("Skipping: ", f.rel_path) - - c.close() + return True class Provider(object): @@ -97,7 +142,7 @@ class Provider(object): def __init__(self): pass -FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', 'fp']) +FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', ]) # 'fp' class LocalProvider(Provider): """ @@ -108,9 +153,9 @@ class LocalProvider(Provider): super(LocalProvider, self).__init__() self.local_path = local_path self.current_set = (None, [], []) + self.walker = os.walk(self.local_path) def __iter__(self): - self.walker = os.walk(self.local_path) return self def __next__(self): @@ -122,7 +167,7 @@ class LocalProvider(Provider): relative_path, os.path.getsize(file_abs_path), int(os.path.getmtime(file_abs_path)), - open(file_abs_path, 'rb') + #open(file_abs_path, 'rb') ) else: self.current_set = self.walker.__next__() @@ -157,7 +202,7 @@ class B2Reciever(Reciever): self.account_id = account_id self.app_key = app_key - self.api = B2Api() + self.api = B2Api(max_upload_workers=B2SyncManager.workers) self.api.authorize_account('production', self.account_id, self.app_key) self.bucket = self.api.get_bucket_by_name(self.bucket) @@ -172,7 +217,7 @@ class B2Reciever(Reciever): def sync(source_uri, dest_uri, account_id, app_key): source = urlparse(source_uri) dest = urlparse(dest_uri) - + syncer = B2SyncManager(source_uri, dest_uri) source_provider = None @@ -192,6 +237,6 @@ def sync(source_uri, dest_uri, account_id, app_key): assert source_provider is not None assert dest_receiver is not None - + syncer = B2SyncManager(source_provider, dest_receiver) - syncer.sync() \ No newline at end of file + syncer.sync()