From 37b9372f0e58656163a051f2e3c6cc21d0ab6b6e Mon Sep 17 00:00:00 2001
From: Dave Pedu
Date: Fri, 10 Jun 2016 13:06:38 -0700
Subject: [PATCH] Add comments and split stuff out

---
 b2mirror/b2plugin.py    |  68 ++++++++++++++++
 b2mirror/base.py        |  23 ++++++
 b2mirror/common.py      |  14 ++++
 b2mirror/localplugin.py |  41 ++++++++++
 b2mirror/mirror.py      | 172 ++++++++++++----------------------------
 bin/b2mirror            |  11 ++-
 6 files changed, 206 insertions(+), 123 deletions(-)
 create mode 100644 b2mirror/b2plugin.py
 create mode 100644 b2mirror/base.py
 create mode 100644 b2mirror/common.py
 create mode 100644 b2mirror/localplugin.py

diff --git a/b2mirror/b2plugin.py b/b2mirror/b2plugin.py
new file mode 100644
index 0000000..2e428f2
--- /dev/null
+++ b/b2mirror/b2plugin.py
@@ -0,0 +1,68 @@
+import os
+
+from b2.api import B2Api
+
+from b2mirror.base import Provider, Reciever
+from b2mirror.common import Result, results_ok
+
+
+class B2Provider(Provider):
+    """
+    Iterates files in a bucket
+    """
+
+    def __init__(self, accountId, appKey, bucketId, bucketBasePath):
+        super(B2Provider, self).__init__()
+        raise NotImplementedError()
+
+
+class B2Reciever(Reciever):
+
+    max_chunk_size = 256*1024
+
+    def __init__(self, bucket, path, account_id, app_key, workers=10):
+        super(B2Reciever, self).__init__()
+        self.bucket_name = bucket
+        self.path = path
+        self.account_id = account_id
+        self.app_key = app_key
+
+        self.api = B2Api(max_upload_workers=workers)
+        self.api.authorize_account('production', self.account_id, self.app_key)
+        self.bucket = self.api.get_bucket_by_name(self.bucket_name)
+
+    def put_file(self, file_info, purge_historics=False):
+        dest_path = os.path.join(self.path, file_info.rel_path).lstrip('/')
+        upload_result = self.bucket.upload_local_file(
+            file_info.abs_path,
+            dest_path
+        )
+
+        if purge_historics:
+            self.delete_by_path(dest_path, skip=1)
+
+        return Result.ok
+
+    def purge_file(self, file_path):
+        """
+        Remove a file and all historical copies from the bucket
+
+        :param file_path: File path relative to the source tree to delete. This should NOT include self.path
+        """
+        dest_path = os.path.join(self.path, file_path).lstrip('/')
+        self.delete_by_path(dest_path)
+
+    def delete_by_path(self, file_path, skip=0, max_entries=100):
+        """
+        List all versions of a file and delete some or all of them
+
+        :param file_path: Bucket path to delete
+        :param skip: How many versions to skip before starting deletion. A value of 5 keeps the 5 newest
+            historical copies; a value of 0 deletes the file and all of its revisions.
+        :param max_entries: Maximum number of file versions to request per listing call
+        """
+        for f in self.bucket.list_file_versions(start_filename=file_path, max_entries=max_entries)["files"]:
+            if f["fileName"] == file_path:
+                if skip == 0:
+                    self.api.delete_file_version(f["fileId"], f["fileName"])
+                else:
+                    skip -= 1
+            else:
+                return
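The skip parameter above drives version retention. A standalone sketch of the selection logic, using a stand-in list in place of a real bucket listing (the file names and IDs are hypothetical):

    # Hypothetical version listing for one file, newest first, as a B2 listing returns it.
    versions = [
        {"fileId": "id3", "fileName": "photos/cat.jpg"},
        {"fileId": "id2", "fileName": "photos/cat.jpg"},
        {"fileId": "id1", "fileName": "photos/cat.jpg"},
        {"fileId": "id9", "fileName": "photos/dog.jpg"},  # different file: iteration stops here
    ]

    def ids_to_delete(versions, file_path, skip=0):
        """Replicate delete_by_path's selection: keep the newest `skip` versions
        of file_path, collect the rest, stop at the first non-matching name."""
        doomed = []
        for f in versions:
            if f["fileName"] != file_path:
                break
            if skip > 0:
                skip -= 1
            else:
                doomed.append(f["fileId"])
        return doomed

    print(ids_to_delete(versions, "photos/cat.jpg", skip=1))  # ['id2', 'id1']
    print(ids_to_delete(versions, "photos/cat.jpg", skip=0))  # ['id3', 'id2', 'id1']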
diff --git a/b2mirror/base.py b/b2mirror/base.py
new file mode 100644
index 0000000..c586f87
--- /dev/null
+++ b/b2mirror/base.py
@@ -0,0 +1,23 @@
+class Provider(object):
+    """
+    Base class for file queue iterables
+    """
+    def __init__(self):
+        pass
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        raise NotImplementedError()
+
+
+class Reciever(object):
+    """
+    Base class for destinations
+    """
+    def put_file(self, file_info, purge_historics=False):
+        raise NotImplementedError()
+
+    def purge_file(self, file_path):
+        raise NotImplementedError()

diff --git a/b2mirror/common.py b/b2mirror/common.py
new file mode 100644
index 0000000..097dbf2
--- /dev/null
+++ b/b2mirror/common.py
@@ -0,0 +1,14 @@
+from enum import Enum
+from collections import namedtuple
+
+
+FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', ])  # 'fp'
+
+
+class Result(Enum):
+    failed = 0
+    ok = 1
+    skipped = 2
+
+
+results_ok = [Result.ok, Result.skipped]
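A new source only has to satisfy the iterator contract above and yield FileInfo tuples. A minimal sketch, where the class name and its in-memory file list are made up for illustration:

    from b2mirror.base import Provider
    from b2mirror.common import FileInfo

    class StaticProvider(Provider):
        """Example source that serves a fixed, in-memory list of files."""
        def __init__(self, files):
            super(StaticProvider, self).__init__()
            self._files = iter(files)

        def __next__(self):
            # next() raises StopIteration when exhausted, which ends the sync loop
            return next(self._files)

    src = StaticProvider([FileInfo('/data/a.txt', '/a.txt', 11, 1465588000)])
    for info in src:
        print(info.rel_path, info.size)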
diff --git a/b2mirror/localplugin.py b/b2mirror/localplugin.py
new file mode 100644
index 0000000..3bb75e2
--- /dev/null
+++ b/b2mirror/localplugin.py
@@ -0,0 +1,41 @@
+import os
+
+from b2mirror.base import Provider, Reciever
+from b2mirror.common import FileInfo
+
+
+class LocalProvider(Provider):
+    """
+    Iterates files on local disk
+    """
+    max_chunk_size = 8*1024*1024
+
+    def __init__(self, local_path):
+        super(LocalProvider, self).__init__()
+        self.local_path = local_path
+        self.current_set = (None, [], [])
+        self.walker = os.walk(self.local_path)
+
+    def __next__(self):
+        if len(self.current_set[2]) > 0:
+            # Files remain in the current directory's buffer: emit the next one
+            file_abs_path = os.path.join(self.current_set[0], self.current_set[2].pop())
+            relative_path = file_abs_path[len(self.local_path):]
+            return FileInfo(
+                file_abs_path,
+                relative_path,
+                os.path.getsize(file_abs_path),
+                int(os.path.getmtime(file_abs_path)),
+                # open(file_abs_path, 'rb')
+            )
+        else:
+            # Buffer exhausted: advance the walker to the next directory and retry.
+            # StopIteration from the walker ends this iterator as well.
+            self.current_set = self.walker.__next__()
+            self.current_set[1].sort(reverse=True)
+            self.current_set[2].sort(reverse=True)
+            return self.__next__()
+
+
+class LocalReciever(Reciever):
+    def __init__(self):
+        raise NotImplementedError()
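LocalProvider flattens os.walk into single-file steps; the recursive call in __next__ just refills the buffer from the next directory. For comparison, the same traversal written as a plain generator (not part of the patch):

    import os

    from b2mirror.common import FileInfo

    def iter_files(local_path):
        """Yield a FileInfo for every file under local_path. LocalProvider
        pops from a reverse-sorted list, so files within a directory come
        out in ascending name order; sorted() reproduces that here."""
        for dirpath, dirnames, filenames in os.walk(local_path):
            dirnames.sort(reverse=True)  # same in-place descent-order tweak as LocalProvider
            for name in sorted(filenames):
                abs_path = os.path.join(dirpath, name)
                yield FileInfo(abs_path,
                               abs_path[len(local_path):],
                               os.path.getsize(abs_path),
                               int(os.path.getmtime(abs_path)))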
diff --git a/b2mirror/mirror.py b/b2mirror/mirror.py
index e6e926c..d022049 100644
--- a/b2mirror/mirror.py
+++ b/b2mirror/mirror.py
@@ -1,18 +1,17 @@
-import os
+import re
+import sys
 import sqlite3
 from urllib.parse import urlparse
-from collections import namedtuple
-from b2.api import B2Api
-import sys
 from itertools import islice, filterfalse
 from concurrent.futures import ThreadPoolExecutor
-import re
+
+from b2mirror.localplugin import LocalProvider
+from b2mirror.b2plugin import B2Reciever
+from b2mirror.common import Result, results_ok
 
 # import logging
 # logging.basicConfig(level=logging.INFO)
 
-FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', ])  # 'fp'
-
 """
 How it works:
 
@@ -30,17 +29,25 @@ Dest will upload the file, and inform the manager it was completed
 
 class B2SyncManager(object):
 
-    workers = 15
-
-    def __init__(self, source_module, dest_module, exclude_res=None):
+    def __init__(self, source_module, dest_module, exclude_res=None, workers=10):
+        """
+        :param source_module: subclass instance of b2mirror.base.Provider acting as a file source
+        :param dest_module: subclass instance of b2mirror.base.Reciever acting as a file destination
+        :param exclude_res: compiled regular expression objects that file paths will be matched against. A match
+            means the file is skipped (and deleted from the remote).
+        :param workers: Number of parallel transfers
+        """
         self.src = source_module
         self.dest = dest_module
         self.db = sqlite3.connect('./sync.db', check_same_thread=False)
         self.db.row_factory = B2SyncManager.dict_factory
         self.db.isolation_level = None  # TBD - does it hurt perf?
         self.exclude_res = [
-            re.compile(r'.*\.(DS_Store|pyc|dropbox)$')
+            re.compile(r'.*\.(DS_Store|pyc|dropbox)$'),
+            re.compile(r'.+__pycache__.+'),
+            re.compile(r'.+\.dropbox\.cache.+')
         ] + (exclude_res if exclude_res else [])
+        self.workers = workers
         self._init_db()
 
     @staticmethod
@@ -51,6 +58,9 @@ class B2SyncManager(object):
         return d
 
     def _init_db(self):
+        """
+        Init the sqlite database. Creates missing tables.
+        """
         c = self.db.cursor()
 
         def table_exists(table_name):
@@ -77,13 +87,20 @@ class B2SyncManager(object):
         c.close()
 
     def sync(self):
+        """
+        Sync the source to the dest. First uploads new local files, then cleans dead files from the remote.
+        """
         # Phase 1 - Upload all local files missing on the remote
         self.sync_up()
 
         # Phase 2 - Delete files on the remote missing locally
         self.purge_remote()
 
     def sync_up(self):
-        print("Syncing from {} to {}".format(self.src, self.dest))
+        """
+        Sync local files to the remote. All files in the DB are first marked as unseen; when a file is found locally
+        it is marked as seen again. This state is later used to purge deleted files from the destination.
+        """
+        #print("Syncing from {} to {}".format(self.src, self.dest))
 
         # Mark all files as unseen
         # Files will be marked as seen as they are processed
@@ -94,9 +111,8 @@ class B2SyncManager(object):
 
         chunk_size = 1000
 
-        files_source = filterfalse(  # if rel_path matches any of the REs, the filter is True and the file is skipped
-            lambda x: any([pattern.match(x.rel_path) for pattern in self.exclude_res]),
-            self.src)
+        # if rel_path matches any of the REs, the filter is True and the file is skipped
+        files_source = filterfalse(lambda x: any([pattern.match(x.rel_path) for pattern in self.exclude_res]), self.src)
 
         while True:
             chunk = list(islice(files_source, chunk_size))
@@ -108,13 +124,19 @@ class B2SyncManager(object):
             if len(chunk) == 0:
                 break
 
-            with ThreadPoolExecutor(max_workers=B2SyncManager.workers) as executor:
+            with ThreadPoolExecutor(max_workers=self.workers) as executor:
                 upload_futures = [executor.submit(self.xfer_file, item) for item in chunk]
 
             for i in upload_futures:
-                assert i.result()
+                assert i.result() in results_ok
 
     def xfer_file(self, f):
+        """
+        Future-called function that handles a single file. The file's modification time is checked against the
+        database to see whether the file has new content that should be uploaded or is untouched since the last sync.
+        """
+        result = Result.failed
+
         c = self.db.cursor()
 
         row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone()
@@ -123,7 +145,7 @@ class B2SyncManager(object):
             print("Uploading:", f.rel_path)
             try:
-                self.dest.put_file(f)
+                result = self.dest.put_file(f, purge_historics=row is not None)
             except:
                 print("Failed:", f.rel_path)
                 print("Unexpected error:", sys.exc_info()[0])
@@ -136,13 +158,17 @@ class B2SyncManager(object):
         else:
             c.execute("UPDATE 'files' SET seen=1 WHERE `path` = ?;", (f.rel_path,)).fetchone()
-            # print("Skipping:", f.rel_path)
+            #print("Skipping:", f.rel_path)
+            result = Result.skipped
 
         c.close()
 
-        return True
+        return result
 
     def purge_remote(self):
+        """
+        Delete files on the remote that were not found when scanning the local tree.
+        """
         c = self.db.cursor()
         c_del = self.db.cursor()
@@ -155,106 +181,7 @@ class B2SyncManager(object):
 
         c.close()
 
-
-class Provider(object):
-    """
-    Base class file queue iterable
-    """
-    def __init__(self):
-        pass
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        raise NotImplemented()
-
-
-class LocalProvider(Provider):
-    """
-    Iterates files on local disk
-    """
-    max_chunk_size = 8*1024*1024
-
-    def __init__(self, local_path):
-        super(LocalProvider, self).__init__()
-        self.local_path = local_path
-        self.current_set = (None, [], [])
-        self.walker = os.walk(self.local_path)
-
-    def __next__(self):
-        if len(self.current_set[2]) > 0:
-            file_abs_path = os.path.join(self.current_set[0], self.current_set[2].pop())
-            relative_path = file_abs_path[len(self.local_path):]
-            return FileInfo(
-                file_abs_path,
-                relative_path,
-                os.path.getsize(file_abs_path),
-                int(os.path.getmtime(file_abs_path)),
-                # open(file_abs_path, 'rb')
-            )
-        else:
-            self.current_set = self.walker.__next__()
-            self.current_set[1].sort(reverse=True)
-            self.current_set[2].sort(reverse=True)
-            return self.__next__()
-
-
-class B2Provider(Provider):
-    """
-    Iterates files in bucket
-    """
-
-    def __init__(self, accountId, appKey, bucketId, bucketBasePath):
-        super(B2Provider, self).__init__()
-        raise NotImplemented()
-
-
-class Reciever(object):
-    """
-    Base class for destinations
-    """
-    def put_file(self, file_info):
-        raise NotImplemented()
-
-    def purge_file(self, file_path):
-        raise NotImplemented()
-
-
-class B2Reciever(Reciever):
-
-    max_chunk_size = 256*1024
-
-    def __init__(self, bucket, path, account_id, app_key):
-        super(B2Reciever, self).__init__()
-        self.bucket_name = bucket
-        self.path = path
-        self.account_id = account_id
-        self.app_key = app_key
-
-        self.api = B2Api(max_upload_workers=B2SyncManager.workers)
-        self.api.authorize_account('production', self.account_id, self.app_key)
-        self.bucket = self.api.get_bucket_by_name(self.bucket_name)
-
-    def put_file(self, file_info):
-        dest_path = os.path.join(self.path, file_info.rel_path).lstrip('/')
-        self.bucket.upload_local_file(
-            file_info.abs_path,
-            dest_path
-        )
-
-    def purge_file(self, file_path):
-        dest_path = os.path.join(self.path, file_path).lstrip('/')
-        self.delete_by_path(dest_path)
-
-    def delete_by_path(self, file_path):
-        for f in self.bucket.list_file_versions(start_filename=file_path, max_entries=100)["files"]:
-            if f["fileName"] == file_path:
-                self.api.delete_file_version(f["fileId"], f["fileName"])
-            else:
-                return
-
-
-def sync(source_uri, dest_uri, account_id, app_key):
+def sync(source_uri, dest_uri, account_id, app_key, workers=10, exclude=None):
 
     source = urlparse(source_uri)
     dest = urlparse(dest_uri)
 
@@ -267,12 +194,13 @@ def sync(source_uri, dest_uri, account_id, app_key):
         raise Exception("Sources other than local file paths not supported")
 
     if dest.scheme == 'b2':  # Plain file URI
-        dest_receiver = B2Reciever(bucket=dest.netloc, path=dest.path, account_id=account_id, app_key=app_key)
+        dest_receiver = B2Reciever(bucket=dest.netloc, path=dest.path, account_id=account_id, app_key=app_key,
+                                   workers=workers)
     else:
         raise Exception("Dests other than B2 URIs not yet supported")
 
     assert source_provider is not None
     assert dest_receiver is not None
 
-    syncer = B2SyncManager(source_provider, dest_receiver)
+    syncer = B2SyncManager(source_provider, dest_receiver, workers=workers, exclude_res=exclude)
 
     syncer.sync()
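The upload loop in sync_up is a generic fan-out pattern: drain an iterator in fixed-size chunks and hand each chunk to a thread pool. Reduced to a self-contained skeleton (the helper name and the toy workload are illustrative):

    from itertools import islice
    from concurrent.futures import ThreadPoolExecutor

    def process_in_chunks(items, work, chunk_size=1000, workers=10):
        """Consume `items` chunk by chunk, running `work` on each entry in
        parallel, the way B2SyncManager.sync_up feeds xfer_file."""
        items = iter(items)
        while True:
            chunk = list(islice(items, chunk_size))
            if not chunk:
                break
            with ThreadPoolExecutor(max_workers=workers) as executor:
                futures = [executor.submit(work, item) for item in chunk]
            # the with block waits for the pool to drain before this point
            for f in futures:
                f.result()  # re-raises any exception from the worker

    process_in_chunks(range(25), print, chunk_size=10, workers=4)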
diff --git a/bin/b2mirror b/bin/b2mirror
index 66b5143..9017827 100755
--- a/bin/b2mirror
+++ b/bin/b2mirror
@@ -1,20 +1,29 @@
 #!/usr/bin/env python3
 
 import argparse
+import re
+
 from b2mirror import mirror
 
+
 def main():
     parser = argparse.ArgumentParser(description="Sync data to/from B2")
 
+    parser.add_argument("-w", "--workers", help="Maximum parallel uploads", type=int, default=10)
+
     parser.add_argument("-s", "--source", required=True, help="Source URI")
     parser.add_argument("-d", "--dest", required=True, help="Dest URI")
     parser.add_argument("-a", "--account-id", required=True, help="Backblaze account ID")
     parser.add_argument("-k", "--app-key", required=True, help="Backblaze application key")
 
+    parser.add_argument("--exclude", nargs="+", default=[], help="Regexes for paths to exclude from transfer")
+
     args = parser.parse_args()
 
-    mirror.sync(args.source, args.dest, args.account_id, args.app_key)
+    ignore_res = [re.compile(i) for i in args.exclude]
+
+    mirror.sync(args.source, args.dest, args.account_id, args.app_key, workers=args.workers, exclude=ignore_res)
 
 
 if __name__ == '__main__':
     main()
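The equivalent call through the Python API, mirroring what bin/b2mirror does after argument parsing; the URIs, bucket name, and credentials below are placeholders:

    import re

    from b2mirror import mirror

    mirror.sync("file:///home/user/photos",
                "b2://my-bucket/photos",
                "ACCOUNT_ID", "APP_KEY",
                workers=20,
                exclude=[re.compile(r'.*\.tmp$')])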