diff --git a/b2mirror/mirror.py b/b2mirror/mirror.py index 42ce82a..701a0c3 100644 --- a/b2mirror/mirror.py +++ b/b2mirror/mirror.py @@ -7,19 +7,20 @@ from b2.api import B2Api import sys from itertools import islice, filterfalse from concurrent.futures import ThreadPoolExecutor, Future -import logging import re - +#import logging #logging.basicConfig(level=logging.INFO) +FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', ]) # 'fp' + """ How it works: B2SyncManager manages the transfer -It holes a src and dest object, src objects provide an iterable of FileInfos. +It holds a src and dest object, src objects provide an iterable of FileInfos. The manager will iterate the set of FileInfos, and pass each to the dest @@ -30,7 +31,7 @@ class B2SyncManager(object): workers = 10 - def __init__(self, source_module, dest_module): + def __init__(self, source_module, dest_module, exclude_res=None): self.src = source_module self.dest = dest_module self.db = sqlite3.connect('./sync.db', check_same_thread=False) @@ -38,7 +39,7 @@ class B2SyncManager(object): self.db.isolation_level = None # TBD - does it hurt perf? 
self.exclude_res = [ re.compile(r'.*\.(DS_Store|pyc|dropbox)$') - ] + ] + (exclude_res if exclude_res else []) self._init_db() @staticmethod @@ -83,6 +84,13 @@ class B2SyncManager(object): def sync_up(self): print("Syncing from {} to {}".format(self.src, self.dest)) + # Mark all files as unseen + # Files will be marked as seen as they are processed + # Later, unseen files will be purged + c = self.db.cursor() + c.execute("UPDATE 'files' SET seen=0;") + c.close() + chunk_size = 1000 files_source = filterfalse( # if rel_path matches any of the REs, the filter is True and the file is skipped @@ -102,25 +110,19 @@ class B2SyncManager(object): with ThreadPoolExecutor(max_workers=B2SyncManager.workers) as executor: upload_futures = [executor.submit(self.xfer_file, item) for item in chunk] - #print("Queued {} tasks".format(len(chunk))) for i in upload_futures: assert i.result() - def canskip(self, f): - if f.rel_path.endswith('.DS_Store'): - return True - else: - return False - def xfer_file(self, f): c = self.db.cursor() row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone() - if self.canskip(f) or not row or row['mtime'] < f.mtime: - print("Starting:", f.rel_path) + if not row or row['mtime'] < f.mtime: + + print("Uploading:", f.rel_path) try: self.dest.put_file(f) except: @@ -128,18 +130,31 @@ class B2SyncManager(object): print("Unexpected error:", sys.exc_info()[0]) raise #print("Ok: ", f.rel_path) - #f.fp.close() + # The file was uploaded, commit it to the db c.execute("REPLACE INTO 'files' VALUES(?, ?, ?, ?);", (f.rel_path, f.mtime, f.size, 1)) #print("Done: ", f.rel_path) else: - print("Skipping:", f.rel_path) + c.execute("UPDATE 'files' SET seen=1 WHERE `path` = ?;", (f.rel_path,)) + #print("Skipping:", f.rel_path) c.close() return True + def purge_remote(self): + c = self.db.cursor() + c_del = self.db.cursor() + + for purge_file in c.execute("SELECT * FROM 'files' WHERE seen=0;"): + print("Delete on remote: ", 
purge_file["path"]) + self.dest.purge_file(purge_file["path"]) + c_del.execute("DELETE FROM 'files' WHERE path=?;", (purge_file["path"],)) + + c_del.close() + c.close() + class Provider(object): """ @@ -148,7 +163,12 @@ class Provider(object): def __init__(self): pass -FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', ]) # 'fp' + def __iter__(self): + return self + + def __next__(self): + raise NotImplementedError() + class LocalProvider(Provider): """ @@ -161,9 +181,6 @@ class LocalProvider(Provider): self.current_set = (None, [], []) self.walker = os.walk(self.local_path) - def __iter__(self): - return self - def __next__(self): if len(self.current_set[2]) > 0: file_abs_path = os.path.join(self.current_set[0], self.current_set[2].pop()) @@ -196,6 +213,11 @@ class Reciever(object): """ Base class for destinations """ + def put_file(self, file_info): + raise NotImplementedError() + + def purge_file(self, file_path): + raise NotImplementedError() class B2Reciever(Reciever): @@ -203,14 +225,14 @@ class B2Reciever(Reciever): def __init__(self, bucket, path, account_id, app_key): super(B2Reciever, self).__init__() - self.bucket = bucket + self.bucket_name = bucket self.path = path self.account_id = account_id self.app_key = app_key self.api = B2Api(max_upload_workers=B2SyncManager.workers) self.api.authorize_account('production', self.account_id, self.app_key) - self.bucket = self.api.get_bucket_by_name(self.bucket) + self.bucket = self.api.get_bucket_by_name(self.bucket_name) def put_file(self, file_info): #print(">>> {}".format(file_info.abs_path)) @@ -220,12 +242,23 @@ class B2Reciever(Reciever): dest_path ) + def purge_file(self, file_path): + dest_path = os.path.join(self.path, file_path).lstrip('/') + self.delete_by_path(dest_path) + + + def delete_by_path(self, file_path): + for f in self.bucket.list_file_versions(start_filename=file_path, max_entries=100)["files"]: + if f["fileName"] == file_path: + self.api.delete_file_version(f["fileId"], 
f["fileName"]) + else: + return + + def sync(source_uri, dest_uri, account_id, app_key): source = urlparse(source_uri) dest = urlparse(dest_uri) - syncer = B2SyncManager(source_uri, dest_uri) - source_provider = None dest_receiver = None @@ -239,8 +272,6 @@ def sync(source_uri, dest_uri, account_id, app_key): else: raise Exception("Dests other than B2 URIs not yet supported") - - assert source_provider is not None assert dest_receiver is not None