parallel uploads and excluding by regex

This commit is contained in:
Dave Pedu 2016-06-09 13:28:56 -07:00
parent b5b4579d74
commit ccaac0e8c1
1 changed file with 75 additions and 30 deletions

View File

@ -5,8 +5,15 @@ from urllib.parse import urlparse
from collections import namedtuple from collections import namedtuple
from b2.api import B2Api from b2.api import B2Api
import sys import sys
from itertools import islice, filterfalse
from concurrent.futures import ThreadPoolExecutor, Future from concurrent.futures import ThreadPoolExecutor, Future
import logging
import re
#logging.basicConfig(level=logging.INFO)
""" """
How it works: How it works:
@ -21,16 +28,19 @@ Dest will upload the file, and inform the manager it was completed
""" """
class B2SyncManager(object): class B2SyncManager(object):
threads = 5 workers = 10
def __init__(self, source_module, dest_module): def __init__(self, source_module, dest_module):
self.src = source_module self.src = source_module
self.dest = dest_module self.dest = dest_module
self.db = sqlite3.connect('./sync.db') self.db = sqlite3.connect('./sync.db', check_same_thread=False)
self.db.row_factory = B2SyncManager.dict_factory self.db.row_factory = B2SyncManager.dict_factory
self.db.isolation_level = None # TBD - does it hurt perf? self.db.isolation_level = None # TBD - does it hurt perf?
self.exclude_res = [
re.compile(r'.*\.(DS_Store|pyc)$')
]
self._init_db() self._init_db()
@staticmethod @staticmethod
def dict_factory(cursor, row): def dict_factory(cursor, row):
d = {} d = {}
@ -51,9 +61,10 @@ class B2SyncManager(object):
tables = { tables = {
"files": """ "files": """
CREATE TABLE `files` ( CREATE TABLE `files` (
`path` varchar(1024), `path` varchar(1024) PRIMARY KEY,
`mtime` INTEGER, `mtime` INTEGER,
`size` INTEGER `size` INTEGER,
`seen` BOOLEAN
);""" );"""
} }
@ -65,29 +76,63 @@ class B2SyncManager(object):
def sync(self): def sync(self):
print("Syncing from {} to {}".format(self.src, self.dest)) print("Syncing from {} to {}".format(self.src, self.dest))
for f in self.src:
c = self.db.cursor()
row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone()
if not row or row['mtime'] < f.mtime:
print("Uploading: ", f.rel_path, end='') print(i for i in self.src)
sys.stdout.flush() chunk_size = 1000
while True:
chunk = list(
filterfalse( # if rel_path matches any of the REs, the filter is True and the file is skipped
lambda x: any([pattern.match(x.rel_path) for pattern in self.exclude_res]),
islice(self.src, chunk_size)
)
)
for item in chunk:
assert len(item.rel_path) < 512
if len(chunk) == 0:
break
with ThreadPoolExecutor(max_workers=B2SyncManager.workers) as executor:
upload_futures = [executor.submit(self.xfer_file, item) for item in chunk]
#print("Queued {} tasks".format(len(chunk)))
for i in upload_futures:
assert i.result()
def canskip(self, f):
if f.rel_path.endswith('.DS_Store'):
return True
else:
return False
def xfer_file(self, f):
c = self.db.cursor()
row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone()
if self.canskip(f) or not row or row['mtime'] < f.mtime:
print("Starting:", f.rel_path)
try:
self.dest.put_file(f) self.dest.put_file(f)
f.fp.close() except:
print("Failed:", f.rel_path)
print("Unexpected error:", sys.exc_info()[0])
raise
#print("Ok: ", f.rel_path)
#f.fp.close()
# The file was uploaded, commit it to the db
c.execute("REPLACE INTO 'files' VALUES(?, ?, ?, ?);", (f.rel_path, f.mtime, f.size, 1))
#print("Done: ", f.rel_path)
print(" ok") else:
print("Skipping:", f.rel_path)
# The file was uploaded, commit it to the db c.close()
c.execute("INSERT INTO 'files' VALUES(?, ?, ?);", (f.rel_path, f.mtime, f.size,)) return True
else:
print("Skipping: ", f.rel_path)
c.close()
class Provider(object): class Provider(object):
@ -97,7 +142,7 @@ class Provider(object):
def __init__(self): def __init__(self):
pass pass
FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', 'fp']) FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', ]) # 'fp'
class LocalProvider(Provider): class LocalProvider(Provider):
""" """
@ -108,9 +153,9 @@ class LocalProvider(Provider):
super(LocalProvider, self).__init__() super(LocalProvider, self).__init__()
self.local_path = local_path self.local_path = local_path
self.current_set = (None, [], []) self.current_set = (None, [], [])
self.walker = os.walk(self.local_path)
def __iter__(self): def __iter__(self):
self.walker = os.walk(self.local_path)
return self return self
def __next__(self): def __next__(self):
@ -122,7 +167,7 @@ class LocalProvider(Provider):
relative_path, relative_path,
os.path.getsize(file_abs_path), os.path.getsize(file_abs_path),
int(os.path.getmtime(file_abs_path)), int(os.path.getmtime(file_abs_path)),
open(file_abs_path, 'rb') #open(file_abs_path, 'rb')
) )
else: else:
self.current_set = self.walker.__next__() self.current_set = self.walker.__next__()
@ -157,7 +202,7 @@ class B2Reciever(Reciever):
self.account_id = account_id self.account_id = account_id
self.app_key = app_key self.app_key = app_key
self.api = B2Api() self.api = B2Api(max_upload_workers=B2SyncManager.workers)
self.api.authorize_account('production', self.account_id, self.app_key) self.api.authorize_account('production', self.account_id, self.app_key)
self.bucket = self.api.get_bucket_by_name(self.bucket) self.bucket = self.api.get_bucket_by_name(self.bucket)
@ -172,7 +217,7 @@ class B2Reciever(Reciever):
def sync(source_uri, dest_uri, account_id, app_key): def sync(source_uri, dest_uri, account_id, app_key):
source = urlparse(source_uri) source = urlparse(source_uri)
dest = urlparse(dest_uri) dest = urlparse(dest_uri)
syncer = B2SyncManager(source_uri, dest_uri) syncer = B2SyncManager(source_uri, dest_uri)
source_provider = None source_provider = None
@ -192,6 +237,6 @@ def sync(source_uri, dest_uri, account_id, app_key):
assert source_provider is not None assert source_provider is not None
assert dest_receiver is not None assert dest_receiver is not None
syncer = B2SyncManager(source_provider, dest_receiver) syncer = B2SyncManager(source_provider, dest_receiver)
syncer.sync() syncer.sync()