Code cleanup

Dave Pedu 2016-06-09 15:13:12 -07:00
parent a2922b90a9
commit 4b958968d1
1 changed file with 22 additions and 23 deletions

@@ -1,18 +1,17 @@
 import os
 import sqlite3
-from threading import Thread
 from urllib.parse import urlparse
 from collections import namedtuple
 from b2.api import B2Api
 import sys
 from itertools import islice, filterfalse
-from concurrent.futures import ThreadPoolExecutor, Future
+from concurrent.futures import ThreadPoolExecutor
 import re
-#import logging
-#logging.basicConfig(level=logging.INFO)
+# import logging
+# logging.basicConfig(level=logging.INFO)
 FileInfo = namedtuple('FileInfo', ['abs_path', 'rel_path', 'size', 'mtime', ]) # 'fp'
 """
@@ -27,16 +26,18 @@ The manager will iterate the set of FileInfos, and pass each to the dest
 Dest will upload the file, and inform the manager it was completed
 """
 class B2SyncManager(object):
-    workers = 10
+    workers = 15
     def __init__(self, source_module, dest_module, exclude_res=None):
         self.src = source_module
         self.dest = dest_module
         self.db = sqlite3.connect('./sync.db', check_same_thread=False)
         self.db.row_factory = B2SyncManager.dict_factory
         self.db.isolation_level = None # TBD - does it hurt perf?
         self.exclude_res = [
             re.compile(r'.*\.(DS_Store|pyc|dropbox)$')
         ] + (exclude_res if exclude_res else [])
@@ -55,7 +56,7 @@ class B2SyncManager(object):
         def table_exists(table_name):
             c.execute("SELECT * FROM SQLITE_MASTER WHERE `type`='table' AND `name`=?", (table_name,))
             tables = c.fetchall()
-            if len(tables)==0:
+            if len(tables) == 0:
                 return False
             return True
@@ -88,15 +89,14 @@ class B2SyncManager(object):
         # Files will be marked as seen as they are processed
         # Later, unseen files will be purged
         c = self.db.cursor()
-        row = c.execute("UPDATE 'files' SET seen=0;")
+        c.execute("UPDATE 'files' SET seen=0;")
         c.close()
         chunk_size = 1000
         files_source = filterfalse( # if rel_path matches any of the REs, the filter is True and the file is skipped
             lambda x: any([pattern.match(x.rel_path) for pattern in self.exclude_res]),
-            self.src
-        )
+            self.src)
         while True:
             chunk = list(islice(files_source, chunk_size))
@@ -119,7 +119,6 @@ class B2SyncManager(object):
         row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone()
         if not row or row['mtime'] < f.mtime:
             print("Uploading:", f.rel_path)
@@ -129,15 +128,15 @@ class B2SyncManager(object):
                 print("Failed:", f.rel_path)
                 print("Unexpected error:", sys.exc_info()[0])
                 raise
-            #print("Ok: ", f.rel_path)
+            # print("Ok: ", f.rel_path)
             # The file was uploaded, commit it to the db
             c.execute("REPLACE INTO 'files' VALUES(?, ?, ?, ?);", (f.rel_path, f.mtime, f.size, 1))
-            #print("Done: ", f.rel_path)
+            # print("Done: ", f.rel_path)
         else:
             c.execute("UPDATE 'files' SET seen=1 WHERE `path` = ?;", (f.rel_path,)).fetchone()
-            #print("Skipping:", f.rel_path)
+            # print("Skipping:", f.rel_path)
         c.close()
@@ -175,6 +174,7 @@ class LocalProvider(Provider):
     Iterates files on local disk
     """
     max_chunk_size = 8*1024*1024
     def __init__(self, local_path):
         super(LocalProvider, self).__init__()
         self.local_path = local_path
@@ -190,7 +190,7 @@ class LocalProvider(Provider):
                 relative_path,
                 os.path.getsize(file_abs_path),
                 int(os.path.getmtime(file_abs_path)),
-                #open(file_abs_path, 'rb')
+                # open(file_abs_path, 'rb')
             )
         else:
             self.current_set = self.walker.__next__()
@@ -205,8 +205,8 @@ class B2Provider(Provider):
     """
     def __init__(self, accountId, appKey, bucketId, bucketBasePath):
         super(B2Provider, self).__init__()
         raise NotImplemented()
 class Reciever(object):
@@ -219,6 +219,7 @@ class Reciever(object):
     def purge_file(self, file_path):
         raise NotImplemented()
 class B2Reciever(Reciever):
     max_chunk_size = 256*1024
@@ -235,7 +236,6 @@ class B2Reciever(Reciever):
         self.bucket = self.api.get_bucket_by_name(self.bucket_name)
     def put_file(self, file_info):
-        #print(">>> {}".format(file_info.abs_path))
         dest_path = os.path.join(self.path, file_info.rel_path).lstrip('/')
         self.bucket.upload_local_file(
             file_info.abs_path,
@@ -246,7 +246,6 @@ class B2Reciever(Reciever):
         dest_path = os.path.join(self.path, file_path).lstrip('/')
         self.delete_by_path(dest_path)
     def delete_by_path(self, file_path):
         for f in self.bucket.list_file_versions(start_filename=file_path, max_entries=100)["files"]:
             if f["fileName"] == file_path:
@@ -262,12 +261,12 @@ def sync(source_uri, dest_uri, account_id, app_key):
     source_provider = None
     dest_receiver = None
     if source.scheme == '': # Plain file URI
         source_provider = LocalProvider(source.path)
     else:
         raise Exception("Sources other than local file paths not supported")
     if dest.scheme == 'b2': # Plain file URI
         dest_receiver = B2Reciever(bucket=dest.netloc, path=dest.path, account_id=account_id, app_key=app_key)
     else:
         raise Exception("Dests other than B2 URIs not yet supported")