diff --git a/b2mirror/b2plugin.py b/b2mirror/b2plugin.py index 2e428f2..82acd1f 100644 --- a/b2mirror/b2plugin.py +++ b/b2mirror/b2plugin.py @@ -1,9 +1,16 @@ import os +import sys +import logging from b2.api import B2Api from b2mirror.base import Provider, Reciever -from b2mirror.common import Result, results_ok +from b2mirror.common import Result, FileInfo +from b2 import exception as b2exception +from b2.download_dest import DownloadDestLocalFile +import sqlite3 +from contextlib import closing + class B2Provider(Provider): """ @@ -17,12 +24,13 @@ class B2Provider(Provider): class B2Reciever(Reciever): - max_chunk_size = 256*1024 + max_chunk_size = 256 * 1024 - def __init__(self, bucket, path, account_id, app_key, workers=10): + def __init__(self, bucket, path, account_id, app_key, workers=10, compare_method='mtime'): super(B2Reciever, self).__init__() + self.log = logging.getLogger("B2Reciever") self.bucket_name = bucket - self.path = path + self.path = path.lstrip('/') self.account_id = account_id self.app_key = app_key @@ -30,18 +38,153 @@ class B2Reciever(Reciever): self.api.authorize_account('production', self.account_id, self.app_key) self.bucket = self.api.get_bucket_by_name(self.bucket_name) + self.db = None + self._db_setup() + + # The receiver is responsible to determining if a file needs to be uploaded or not + self.should_transfer = { + "mtime": self._should_transfer_mtime, + "size": self._should_transfer_size + }[compare_method] + + def _db_setup(self, db_path=None): + """ + This plugin uses a sqlite database to track the contents of what is on the remote B2 bucket. Why? It's simply + faster than using B2's quite limited API to perform the same action. The sqlite DB is stored on the bucket. 
+ This method: + - Downloads the DB + - if none present, creates a new db file + - Initializes/updates tables in the db + """ + if not db_path: + db_path = '/tmp/b2mirror.{}.db'.format(os.getpid()) + self.db_path = db_path + + fetch_success = self._fetch_remote_db(db_path) + self._open_db() + + if not fetch_success: + # no db was downloaded and the handle above is empty. initialize it. + self._init_db_contents() + logging.info("Initialized database") + + # Mark all files as unseen + # Files will be marked as seen as they are processed + # Later, unseen files will be purged + with closing(self.db.cursor()) as c: + c.execute("UPDATE 'files' SET seen=0;") + + def _open_db(self): + self.db = sqlite3.connect(self.db_path, check_same_thread=False, isolation_level=None) + self.db.row_factory = sqlite3.Row + + def _init_db_contents(self): + """ + Init the sqlite database. Creates missing tables. + """ + def table_exists(table_name): + c.execute("SELECT * FROM SQLITE_MASTER WHERE `type`='table' AND `name`=?", (table_name,)) + tables = c.fetchall() + if len(tables) == 0: + return False + return True + + tables = { + "files": """ + CREATE TABLE `files` ( + `path` varchar(4096) PRIMARY KEY, + `mtime` INTEGER, + `size` INTEGER, + `seen` BOOLEAN + );""" + } + + with closing(self.db.cursor()) as c: + for table_name, table_create_query in tables.items(): + if not table_exists(table_name): + c.execute(table_create_query) + + def _fetch_remote_db(self, db_path): + db_bucket_path = os.path.join(self.path, ".b2mirror.db") + self.log.info("Fetching tracking db from bucket ({}) to {}".format(db_bucket_path, db_path)) + try: + self.bucket.download_file_by_name(db_bucket_path, DownloadDestLocalFile(db_path)) + except b2exception.UnknownError as e: + if '404 not_found' in e.message: + return False + else: + raise + return True + + def teardown(self): + """ + Place the DB file back onto the remote + """ + self.db.close() + sqlite_finfo = FileInfo(self.db_path, + ".b2mirror.db", + 
os.path.getsize(self.db_path), + int(os.path.getmtime(self.db_path))) + self.put_file(sqlite_finfo, purge_historics=True) + os.unlink(self.db_path) + + def _should_transfer_mtime(self, row, f): + return not row or row['mtime'] < f.mtime + + def _should_transfer_size(self, row, f): + return not row or row['size'] != f.size + + def xfer_file(self, f): + """ + Future-called function that handles a single file. The file's modification time is checked against the database + to see if the file has new content that should be uploaded or is untouched since the last sync + """ + result = Result.failed + + with closing(self.db.cursor()) as c: + + row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone() + if self.should_transfer(row, f): + + print("Uploading:", f.rel_path) + try: + # upload the file. if a row existed it means there may be historic copies of the file already there + result = self.put_file(f, purge_historics=row is not None) + except: + print("Failed:", f.rel_path) + print("Unexpected error:", sys.exc_info()[0]) + raise + + # The file was uploaded, commit it to the db + c.execute("REPLACE INTO 'files' VALUES(?, ?, ?, ?);", (f.rel_path, f.mtime, f.size, 1)) + + else: + c.execute("UPDATE 'files' SET seen=1 WHERE `path` = ?;", (f.rel_path,)).fetchone() + result = Result.skipped + + return result + def put_file(self, file_info, purge_historics=False): dest_path = os.path.join(self.path, file_info.rel_path).lstrip('/') - upload_result = self.bucket.upload_local_file( - file_info.abs_path, - dest_path - ) - + upload_result = self.bucket.upload_local_file(file_info.abs_path, dest_path) # NOQA if purge_historics: self.delete_by_path(dest_path, skip=1) return Result.ok + def purge(self): + """ + Delete files on the remote that were not found when scanning the local tree. This assumes an upload phase has + already been done using ***THIS B2Reciever INSTANCE***. 
+ """ + with closing(self.db.cursor()) as c: + with closing(self.db.cursor()) as c_del: + + for purge_file in c.execute("SELECT * FROM 'files' WHERE seen=0;"): + print("Delete on remote: ", purge_file["path"]) + self.purge_file(purge_file["path"]) + c_del.execute("DELETE FROM 'files' WHERE path=?;", (purge_file["path"],)) + def purge_file(self, file_path): """ Remove a file and all historical copies from the bucket @@ -63,6 +206,6 @@ class B2Reciever(Reciever): if skip == 0: self.api.delete_file_version(f["fileId"], f["fileName"]) else: - skip-=1 + skip -= 1 else: return diff --git a/b2mirror/base.py b/b2mirror/base.py index c586f87..847077d 100644 --- a/b2mirror/base.py +++ b/b2mirror/base.py @@ -11,6 +11,9 @@ class Provider(object): def __next__(self): raise NotImplemented() + def teardown(self): + pass + class Reciever(object): """ @@ -21,3 +24,6 @@ class Reciever(object): def purge_file(self, file_path): raise NotImplemented() + + def teardown(self): + pass diff --git a/b2mirror/localplugin.py b/b2mirror/localplugin.py index 3bb75e2..4db86f7 100644 --- a/b2mirror/localplugin.py +++ b/b2mirror/localplugin.py @@ -4,13 +4,11 @@ from b2mirror.base import Provider, Reciever from b2mirror.common import FileInfo - - class LocalProvider(Provider): """ Iterates files on local disk """ - max_chunk_size = 8*1024*1024 + max_chunk_size = 8 * 1024 * 1024 def __init__(self, local_path): super(LocalProvider, self).__init__() diff --git a/b2mirror/mirror.py b/b2mirror/mirror.py index 43887ca..898ebbc 100644 --- a/b2mirror/mirror.py +++ b/b2mirror/mirror.py @@ -1,16 +1,13 @@ import re -import sys -import sqlite3 from urllib.parse import urlparse from itertools import islice, filterfalse from concurrent.futures import ThreadPoolExecutor from b2mirror.localplugin import LocalProvider from b2mirror.b2plugin import B2Reciever -from b2mirror.common import Result, results_ok +from b2mirror.common import results_ok -# import logging -# logging.basicConfig(level=logging.INFO) +import 
logging """ How it works: @@ -28,7 +25,7 @@ Dest will upload the file, and inform the manager it was completed class B2SyncManager(object): - def __init__(self, source_module, dest_module, exclude_res=None, workers=10, compare_method="mtime"): + def __init__(self, source_module, dest_module, exclude_res=None, workers=10): """ :param source_module: subclass instance of b2mirror.base.Provider acting as a file source :param dest_module: subclass of b2mirror.base.Receiver acting as a file destination @@ -36,60 +33,18 @@ class B2SyncManager(object): means skip the file (and delete on the remote). :param workers: Number of parallel transfers """ + self.log = logging.getLogger("B2SyncManager") self.src = source_module self.dest = dest_module - self.db = sqlite3.connect('./sync.db', check_same_thread=False) - self.db.row_factory = B2SyncManager.dict_factory - self.db.isolation_level = None # TBD - does it hurt perf? + self.exclude_res = [ re.compile(r'.*\.(DS_Store|pyc|dropbox)$'), re.compile(r'.*__pycache__.*'), re.compile(r'.*\.dropbox\.cache.*'), re.compile(r'.*\.AppleDouble.*') ] + (exclude_res if exclude_res else []) + self.workers = workers - self._init_db() - - self.should_transfer = { - "mtime": self._should_transfer_mtime, - "size": self._should_transfer_size - }[compare_method] - - @staticmethod - def dict_factory(cursor, row): - d = {} - for idx, col in enumerate(cursor.description): - d[col[0]] = row[idx] - return d - - def _init_db(self): - """ - Init the sqlite databsae. Creates missing tables. 
- """ - c = self.db.cursor() - - def table_exists(table_name): - c.execute("SELECT * FROM SQLITE_MASTER WHERE `type`='table' AND `name`=?", (table_name,)) - tables = c.fetchall() - if len(tables) == 0: - return False - return True - - tables = { - "files": """ - CREATE TABLE `files` ( - `path` varchar(4096) PRIMARY KEY, - `mtime` INTEGER, - `size` INTEGER, - `seen` BOOLEAN - );""" - } - - for table_name, table_create_query in tables.items(): - if not table_exists(table_name): - c.execute(table_create_query) - - c.close() def sync(self): """ @@ -99,21 +54,15 @@ class B2SyncManager(object): self.sync_up() # Phase 2 - Delete files on the remote missing locally self.purge_remote() + # Phase 3 - Tear down the src/dest modules + self.src.teardown() + self.dest.teardown() def sync_up(self): """ Sync local files to the remote. All files in the DB will be marked as unseen. When a file is found locally it is again marked as seen. This state later used to clear deleted files from the destination """ - #print("Syncing from {} to {}".format(self.src, self.dest)) - - # Mark all files as unseen - # Files will be marked as seen as they are processed - # Later, unseen files will be purged - c = self.db.cursor() - c.execute("UPDATE 'files' SET seen=0;") - c.close() - chunk_size = 1000 # if rel_path matches any of the REs, the filter is True and the file is skipped @@ -124,71 +73,23 @@ class B2SyncManager(object): for item in chunk: # long path names can't be put in sqlite - assert len(item.rel_path) < 512 + assert len(item.rel_path) < 1024 if len(chunk) == 0: break with ThreadPoolExecutor(max_workers=self.workers) as executor: - upload_futures = [executor.submit(self.xfer_file, item) for item in chunk] + upload_futures = [executor.submit(self.dest.xfer_file, item) for item in chunk] for i in upload_futures: assert i.result() in results_ok - def xfer_file(self, f): - """ - Future-called function that handles a single file. 
The file's modification time is checked against the database - to see if the file has new content that should be uploaded or is untouched since the last sync - """ - - result = Result.failed - - c = self.db.cursor() - - row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone() - - if self.should_transfer(row, f): - - print("Uploading:", f.rel_path) - try: - result = self.dest.put_file(f, purge_historics=row is not None) - except: - print("Failed:", f.rel_path) - print("Unexpected error:", sys.exc_info()[0]) - raise - - # The file was uploaded, commit it to the db - c.execute("REPLACE INTO 'files' VALUES(?, ?, ?, ?);", (f.rel_path, f.mtime, f.size, 1)) - - else: - c.execute("UPDATE 'files' SET seen=1 WHERE `path` = ?;", (f.rel_path,)).fetchone() - #print("Skipping:", f.rel_path) - result = Result.skipped - - c.close() - - return result - - def _should_transfer_mtime(self, row, f): - return not row or row['mtime'] < f.mtime - - def _should_transfer_size(self, row, f): - return not row or row['size'] != f.size - def purge_remote(self): """ - Delete files on the remote that were not found when scanning the local tree. + During upload phase it is expected that destination modules track state of what files have been seen on the + local end. When local scan + upload is complete, the module uses this state to purge dead files on the remote. 
""" - c = self.db.cursor() - c_del = self.db.cursor() - - for purge_file in c.execute("SELECT * FROM 'files' WHERE seen=0;"): - print("Delete on remote: ", purge_file["path"]) - self.dest.purge_file(purge_file["path"]) - c_del.execute("DELETE FROM 'files' WHERE path=?;", (purge_file["path"],)) - - c_del.close() - c.close() + self.dest.purge() def sync(source_uri, dest_uri, account_id, app_key, workers=10, exclude=[], compare_method="mtime"): @@ -205,12 +106,13 @@ def sync(source_uri, dest_uri, account_id, app_key, workers=10, exclude=[], comp if dest.scheme == 'b2': # Plain file URI dest_receiver = B2Reciever(bucket=dest.netloc, path=dest.path, account_id=account_id, app_key=app_key, - workers=workers) + workers=workers, compare_method=compare_method) else: raise Exception("Dests other than B2 URIs not yet supported") assert source_provider is not None assert dest_receiver is not None - syncer = B2SyncManager(source_provider, dest_receiver, workers=workers, exclude_res=exclude, compare_method=compare_method) + syncer = B2SyncManager(source_provider, dest_receiver, + workers=workers, exclude_res=exclude) syncer.sync() diff --git a/bin/b2mirror b/bin/b2mirror index 1d66c62..c8d210d 100755 --- a/bin/b2mirror +++ b/bin/b2mirror @@ -4,9 +4,14 @@ import argparse import re from b2mirror import mirror +import logging def main(): + logging.basicConfig(level=logging.INFO) + logging.getLogger("requests").setLevel(logging.ERROR) + logging.getLogger("urllib3").setLevel(logging.ERROR) + parser = argparse.ArgumentParser(description="Sync data to/from B2") parser.add_argument("-i", "--size", help="Compare by size instead of mtime", action="store_true", default=False) diff --git a/setup.py b/setup.py index 086a23e..8eb5ceb 100755 --- a/setup.py +++ b/setup.py @@ -3,14 +3,13 @@ from setuptools import setup from b2mirror import __version__ setup(name='b2mirror', - version=__version__, - description='Tool for syc', - url='http://gitlab.xmopx.net/dave/b2mirror', - author='dpedu', - 
author_email='dave@davepedu.com', - packages=['b2mirror'], - scripts=['bin/b2mirror'], - zip_safe=False, - install_requires=[ - 'b2==0.6.2', - ]) + version=__version__, + description='Tool for syc', + url='http://gitlab.xmopx.net/dave/b2mirror', + author='dpedu', + author_email='dave@davepedu.com', + packages=['b2mirror'], + scripts=['bin/b2mirror'], + zip_safe=False, + install_requires=['b2==0.6.2'] + )