commit b5b4579d7413d5177db487e6743af84fb57935e3 Author: dave Date: Tue Jun 7 20:24:35 2016 -0700 initial commit - bare uploading works diff --git a/b2mirror/__init__.py b/b2mirror/__init__.py new file mode 100644 index 0000000..6c8e6b9 --- /dev/null +++ b/b2mirror/__init__.py @@ -0,0 +1 @@ +__version__ = "0.0.0" diff --git a/b2mirror/mirror.py b/b2mirror/mirror.py new file mode 100644 index 0000000..c47d89e --- /dev/null +++ b/b2mirror/mirror.py @@ -0,0 +1,197 @@ +import os +import sqlite3 +from threading import Thread +from urllib.parse import urlparse +from collections import namedtuple +from b2.api import B2Api +import sys + +from concurrent.futures import ThreadPoolExecutor, Future +""" +How it works: + +B2SyncManager manages the transfer + +It holes a src and dest object, src objects provide an iterable of FileInfos. + +The manager will iterate the set of FileInfos, and pass each to the dest + +Dest will upload the file, and inform the manager it was completed + +""" +class B2SyncManager(object): + + threads = 5 + + def __init__(self, source_module, dest_module): + self.src = source_module + self.dest = dest_module + self.db = sqlite3.connect('./sync.db') + self.db.row_factory = B2SyncManager.dict_factory + self.db.isolation_level = None # TBD - does it hurt perf? + self._init_db() + + @staticmethod + def dict_factory(cursor, row): + d = {} + for idx, col in enumerate(cursor.description): + d[col[0]] = row[idx] + return d + + def _init_db(self): + c = self.db.cursor() + + def table_exists(table_name): + c.execute("SELECT * FROM SQLITE_MASTER WHERE `type`='table' AND `name`=?", (table_name,)) + tables = c.fetchall() + if len(tables)==0: + return False + return True + + tables = { + "files": """ + CREATE TABLE `files` ( + `path` varchar(1024), + `mtime` INTEGER, + `size` INTEGER + );""" + } + + for table_name, table_create_query in tables.items(): + if not table_exists(table_name): + c.execute(table_create_query) + + c.close() + + def sync(self): + print("Syncing from {} to {}".format(self.src, self.dest)) + for f in self.src: + c = self.db.cursor() + + row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone() + + if not row or row['mtime'] < f.mtime: + + print("Uploading: ", f.rel_path, end='') + sys.stdout.flush() + + self.dest.put_file(f) + f.fp.close() + + print(" ok") + + # The file was uploaded, commit it to the db + + c.execute("INSERT INTO 'files' VALUES(?, ?, ?);", (f.rel_path, f.mtime, f.size,)) + + else: + print("Skipping: ", f.rel_path) + + c.close() + + +class Provider(object): + """ + Base class file queue iterable + """ + def __init__(self): + pass + +FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', 'fp']) + +class LocalProvider(Provider): + """ + Iterates files on local disk + """ + max_chunk_size = 8*1024*1024 + def __init__(self, local_path): + super(LocalProvider, self).__init__() + self.local_path = local_path + self.current_set = (None, [], []) + + def __iter__(self): + self.walker = os.walk(self.local_path) + return self + + def __next__(self): + if len(self.current_set[2]) > 0: + file_abs_path = os.path.join(self.current_set[0], self.current_set[2].pop()) + relative_path = file_abs_path[len(self.local_path):] + return FileInfo( + file_abs_path, + relative_path, + os.path.getsize(file_abs_path), + int(os.path.getmtime(file_abs_path)), + open(file_abs_path, 'rb') + ) + else: + self.current_set = self.walker.__next__() + self.current_set[1].sort(reverse=True) + self.current_set[2].sort(reverse=True) + return self.__next__() + + +class B2Provider(Provider): + """ + Iterates files in bucket + """ + + def __init__(self, accountId, appKey, bucketId, bucketBasePath): + super(B2Provider, self).__init__() + raise NotImplemented() + + +class Reciever(object): + """ + Base class for destinations + """ + +class B2Reciever(Reciever): + + max_chunk_size = 256*1024 + + def __init__(self, bucket, path, account_id, app_key): + super(B2Reciever, self).__init__() + self.bucket = bucket + self.path = path + self.account_id = account_id + self.app_key = app_key + + self.api = B2Api() + self.api.authorize_account('production', self.account_id, self.app_key) + self.bucket = self.api.get_bucket_by_name(self.bucket) + + def put_file(self, file_info): + #print(">>> {}".format(file_info.abs_path)) + dest_path = os.path.join(self.path, file_info.rel_path).lstrip('/') + self.bucket.upload_local_file( + file_info.abs_path, + dest_path + ) + +def sync(source_uri, dest_uri, account_id, app_key): + source = urlparse(source_uri) + dest = urlparse(dest_uri) + + syncer = B2SyncManager(source_uri, dest_uri) + + source_provider = None + dest_receiver = None + + if source.scheme == '': # Plain file URI + source_provider = LocalProvider(source.path) + else: + raise Exception("Sources other than local file paths not supported") + + if dest.scheme == 'b2': # Plain file URI + dest_receiver = B2Reciever(bucket=dest.netloc, path=dest.path, account_id=account_id, app_key=app_key) + else: + raise Exception("Dests other than B2 URIs not yet supported") + + + + assert source_provider is not None + assert dest_receiver is not None + + syncer = B2SyncManager(source_provider, dest_receiver) + syncer.sync() \ No newline at end of file diff --git a/bin/b2mirror b/bin/b2mirror new file mode 100755 index 0000000..66b5143 --- /dev/null +++ b/bin/b2mirror @@ -0,0 +1,20 @@ +#!/usr/bin/env python3 + +import argparse +from b2mirror import mirror + +def main(): + parser = argparse.ArgumentParser(description="Sync data to/from B2") + + parser.add_argument("-s", "--source", required=True, help="Source URI") + parser.add_argument("-d", "--dest", required=True, help="Dest URI") + + parser.add_argument("-a", "--account-id", required=True, help="Backblaze account ID") + parser.add_argument("-k", "--app-key", required=True, help="Backblaze application key") + + args = parser.parse_args() + + mirror.sync(args.source, args.dest, args.account_id, args.app_key) + +if __name__ == '__main__': + main() diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..a06a666 --- /dev/null +++ b/setup.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 +from setuptools import setup +from b2mirror import __version__ + +setup(name='b2mirror', + version=__version__, + description='Tool for syc', + url='http://gitlab.xmopx.net/dave/b2mirror', + author='dpedu', + author_email='dave@davepedu.com', + packages=['b2mirror'], + scripts=['bin/b2mirror'], + zip_safe=False)