move sqlite tracking logic to B2 plugin

dave 2016-07-24 12:46:46 -07:00
parent 07ce5b4059
commit 416b53d83a
6 changed files with 192 additions and 139 deletions

b2mirror/b2plugin.py View File

@@ -1,9 +1,16 @@
import os
import sys
import logging
from b2.api import B2Api
from b2mirror.base import Provider, Reciever
from b2mirror.common import Result, results_ok
from b2mirror.common import Result, FileInfo
from b2 import exception as b2exception
from b2.download_dest import DownloadDestLocalFile
import sqlite3
from contextlib import closing
class B2Provider(Provider):
"""
@@ -17,12 +24,13 @@ class B2Provider(Provider):
class B2Reciever(Reciever):
max_chunk_size = 256*1024
max_chunk_size = 256 * 1024
def __init__(self, bucket, path, account_id, app_key, workers=10):
def __init__(self, bucket, path, account_id, app_key, workers=10, compare_method='mtime'):
super(B2Reciever, self).__init__()
self.log = logging.getLogger("B2Reciever")
self.bucket_name = bucket
self.path = path
self.path = path.lstrip('/')
self.account_id = account_id
self.app_key = app_key
@@ -30,18 +38,153 @@ class B2Reciever(Reciever):
self.api.authorize_account('production', self.account_id, self.app_key)
self.bucket = self.api.get_bucket_by_name(self.bucket_name)
self.db = None
self._db_setup()
# The receiver is responsible for determining if a file needs to be uploaded or not
self.should_transfer = {
"mtime": self._should_transfer_mtime,
"size": self._should_transfer_size
}[compare_method]
def _db_setup(self, db_path=None):
"""
This plugin uses a sqlite database to track what is on the remote B2 bucket. Why? It's simply
faster than using B2's quite limited API to perform the same action. The sqlite DB is stored on the bucket.
This method:
- Downloads the DB
- If none is present, creates a new db file
- Initializes/updates tables in the db
"""
if not db_path:
db_path = '/tmp/b2mirror.{}.db'.format(os.getpid())
self.db_path = db_path
fetch_success = self._fetch_remote_db(db_path)
self._open_db()
if not fetch_success:
# no db was downloaded and the handle above is empty. initialize it.
self._init_db_contents()
logging.info("Initialized database")
# Mark all files as unseen
# Files will be marked as seen as they are processed
# Later, unseen files will be purged
with closing(self.db.cursor()) as c:
c.execute("UPDATE 'files' SET seen=0;")
def _open_db(self):
self.db = sqlite3.connect(self.db_path, check_same_thread=False, isolation_level=None)
self.db.row_factory = sqlite3.Row
def _init_db_contents(self):
"""
Init the sqlite database. Creates missing tables.
"""
def table_exists(table_name):
c.execute("SELECT * FROM SQLITE_MASTER WHERE `type`='table' AND `name`=?", (table_name,))
tables = c.fetchall()
if len(tables) == 0:
return False
return True
tables = {
"files": """
CREATE TABLE `files` (
`path` varchar(4096) PRIMARY KEY,
`mtime` INTEGER,
`size` INTEGER,
`seen` BOOLEAN
);"""
}
with closing(self.db.cursor()) as c:
for table_name, table_create_query in tables.items():
if not table_exists(table_name):
c.execute(table_create_query)
def _fetch_remote_db(self, db_path):
db_bucket_path = os.path.join(self.path, ".b2mirror.db")
self.log.info("Fetching tracking db from bucket ({}) to {}".format(db_bucket_path, db_path))
try:
self.bucket.download_file_by_name(db_bucket_path, DownloadDestLocalFile(db_path))
except b2exception.UnknownError as e:
if '404 not_found' in e.message:
return False
else:
raise
return True
def teardown(self):
"""
Place the DB file back onto the remote
"""
self.db.close()
sqlite_finfo = FileInfo(self.db_path,
".b2mirror.db",
os.path.getsize(self.db_path),
int(os.path.getmtime(self.db_path)))
self.put_file(sqlite_finfo, purge_historics=True)
os.unlink(self.db_path)
def _should_transfer_mtime(self, row, f):
return not row or row['mtime'] < f.mtime
def _should_transfer_size(self, row, f):
return not row or row['size'] != f.size
def xfer_file(self, f):
"""
Future-called function that handles a single file. The file's modification time is checked against the database
to see if the file has new content that should be uploaded or is untouched since the last sync
"""
result = Result.failed
with closing(self.db.cursor()) as c:
row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone()
if self.should_transfer(row, f):
print("Uploading:", f.rel_path)
try:
# upload the file. if a row existed it means there may be historic copies of the file already there
result = self.put_file(f, purge_historics=row is not None)
except:
print("Failed:", f.rel_path)
print("Unexpected error:", sys.exc_info()[0])
raise
# The file was uploaded, commit it to the db
c.execute("REPLACE INTO 'files' VALUES(?, ?, ?, ?);", (f.rel_path, f.mtime, f.size, 1))
else:
c.execute("UPDATE 'files' SET seen=1 WHERE `path` = ?;", (f.rel_path,)).fetchone()
result = Result.skipped
return result
def put_file(self, file_info, purge_historics=False):
dest_path = os.path.join(self.path, file_info.rel_path).lstrip('/')
upload_result = self.bucket.upload_local_file(
file_info.abs_path,
dest_path
)
upload_result = self.bucket.upload_local_file(file_info.abs_path, dest_path) # NOQA
if purge_historics:
self.delete_by_path(dest_path, skip=1)
return Result.ok
def purge(self):
"""
Delete files on the remote that were not found when scanning the local tree. This assumes an upload phase has
already been done using ***THIS B2Reciever INSTANCE***.
"""
with closing(self.db.cursor()) as c:
with closing(self.db.cursor()) as c_del:
for purge_file in c.execute("SELECT * FROM 'files' WHERE seen=0;"):
print("Delete on remote: ", purge_file["path"])
self.purge_file(purge_file["path"])
c_del.execute("DELETE FROM 'files' WHERE path=?;", (purge_file["path"],))
def purge_file(self, file_path):
"""
Remove a file and all historical copies from the bucket
@@ -63,6 +206,6 @@ class B2Reciever(Reciever):
if skip == 0:
self.api.delete_file_version(f["fileId"], f["fileName"])
else:
skip-=1
skip -= 1
else:
return
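For orientation, here is a minimal sketch of how the relocated tracking logic in B2Reciever is exercised end to end. The bucket name, credentials, local paths and the FileInfo field order (abs_path, rel_path, size, mtime, inferred from teardown() above) are illustrative assumptions, not part of this commit.

# Sketch only: driving the receiver's self-contained sqlite tracking flow.
from b2mirror.b2plugin import B2Reciever
from b2mirror.common import FileInfo, results_ok

dest = B2Reciever(bucket="my-bucket", path="/backups/photos",
                  account_id="<account id>", app_key="<app key>",
                  compare_method="size")  # default is "mtime"

# __init__ has already run _db_setup(): the remote .b2mirror.db was fetched
# (or freshly created) and every tracked row was reset to seen=0.
f = FileInfo("/local/photos/cat.jpg", "cat.jpg", 102400, 1469385600)
assert dest.xfer_file(f) in results_ok  # uploads or skips, marks the row seen=1

dest.purge()     # deletes remote files whose rows are still seen=0
dest.teardown()  # uploads the sqlite db back under the bucket path as .b2mirror.db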

b2mirror/base.py View File

@@ -11,6 +11,9 @@ class Provider(object):
def __next__(self):
raise NotImplementedError()
def teardown(self):
pass
class Reciever(object):
"""
@@ -21,3 +24,6 @@ class Reciever(object):
def purge_file(self, file_path):
raise NotImplementedError()
def teardown(self):
pass
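The base classes now gain a no-op teardown() hook, which B2SyncManager.sync() calls after the purge phase (see mirror.py below). Here is a hypothetical provider showing why a plugin might override it; the temp-directory cleanup is invented for the example.

# Hypothetical Provider subclass using the new teardown() hook.
import shutil
import tempfile
from b2mirror.base import Provider

class ScratchProvider(Provider):
    """Yields FileInfo objects staged in a temp dir, removed on teardown."""
    def __init__(self):
        super(ScratchProvider, self).__init__()
        self.workdir = tempfile.mkdtemp()
        self._items = iter([])  # would be populated with FileInfo objects

    def __next__(self):
        return next(self._items)

    def teardown(self):
        # Invoked once syncing and purging are finished
        shutil.rmtree(self.workdir)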

b2mirror/localplugin.py View File

@@ -4,13 +4,11 @@ from b2mirror.base import Provider, Reciever
from b2mirror.common import FileInfo
class LocalProvider(Provider):
"""
Iterates files on local disk
"""
max_chunk_size = 8*1024*1024
max_chunk_size = 8 * 1024 * 1024
def __init__(self, local_path):
super(LocalProvider, self).__init__()

b2mirror/mirror.py View File

@@ -1,16 +1,13 @@
import re
import sys
import sqlite3
from urllib.parse import urlparse
from itertools import islice, filterfalse
from concurrent.futures import ThreadPoolExecutor
from b2mirror.localplugin import LocalProvider
from b2mirror.b2plugin import B2Reciever
from b2mirror.common import Result, results_ok
from b2mirror.common import results_ok
# import logging
# logging.basicConfig(level=logging.INFO)
import logging
"""
How it works:
@@ -28,7 +25,7 @@ Dest will upload the file, and inform the manager it was completed
class B2SyncManager(object):
def __init__(self, source_module, dest_module, exclude_res=None, workers=10, compare_method="mtime"):
def __init__(self, source_module, dest_module, exclude_res=None, workers=10):
"""
:param source_module: subclass instance of b2mirror.base.Provider acting as a file source
:param dest_module: subclass instance of b2mirror.base.Reciever acting as a file destination
@@ -36,60 +33,18 @@ class B2SyncManager(object):
means skip the file (and delete on the remote).
:param workers: Number of parallel transfers
"""
self.log = logging.getLogger("B2SyncManager")
self.src = source_module
self.dest = dest_module
self.db = sqlite3.connect('./sync.db', check_same_thread=False)
self.db.row_factory = B2SyncManager.dict_factory
self.db.isolation_level = None # TBD - does it hurt perf?
self.exclude_res = [
re.compile(r'.*\.(DS_Store|pyc|dropbox)$'),
re.compile(r'.*__pycache__.*'),
re.compile(r'.*\.dropbox\.cache.*'),
re.compile(r'.*\.AppleDouble.*')
] + (exclude_res if exclude_res else [])
self.workers = workers
self._init_db()
self.should_transfer = {
"mtime": self._should_transfer_mtime,
"size": self._should_transfer_size
}[compare_method]
@staticmethod
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
def _init_db(self):
"""
Init the sqlite database. Creates missing tables.
"""
c = self.db.cursor()
def table_exists(table_name):
c.execute("SELECT * FROM SQLITE_MASTER WHERE `type`='table' AND `name`=?", (table_name,))
tables = c.fetchall()
if len(tables) == 0:
return False
return True
tables = {
"files": """
CREATE TABLE `files` (
`path` varchar(4096) PRIMARY KEY,
`mtime` INTEGER,
`size` INTEGER,
`seen` BOOLEAN
);"""
}
for table_name, table_create_query in tables.items():
if not table_exists(table_name):
c.execute(table_create_query)
c.close()
def sync(self):
"""
@@ -99,21 +54,15 @@ class B2SyncManager(object):
self.sync_up()
# Phase 2 - Delete files on the remote missing locally
self.purge_remote()
# Phase 3 - Tear down the src/dest modules
self.src.teardown()
self.dest.teardown()
def sync_up(self):
"""
Sync local files to the remote. All files in the DB will be marked as unseen. When a file is found locally it is
again marked as seen. This state is later used to clear deleted files from the destination.
"""
#print("Syncing from {} to {}".format(self.src, self.dest))
# Mark all files as unseen
# Files will be marked as seen as they are processed
# Later, unseen files will be purged
c = self.db.cursor()
c.execute("UPDATE 'files' SET seen=0;")
c.close()
chunk_size = 1000
# if rel_path matches any of the REs, the filter is True and the file is skipped
@@ -124,71 +73,23 @@ class B2SyncManager(object):
for item in chunk:
# long path names can't be put in sqlite
assert len(item.rel_path) < 512
assert len(item.rel_path) < 1024
if len(chunk) == 0:
break
with ThreadPoolExecutor(max_workers=self.workers) as executor:
upload_futures = [executor.submit(self.xfer_file, item) for item in chunk]
upload_futures = [executor.submit(self.dest.xfer_file, item) for item in chunk]
for i in upload_futures:
assert i.result() in results_ok
def xfer_file(self, f):
"""
Future-called function that handles a single file. The file's modification time is checked against the database
to see if the file has new content that should be uploaded or is untouched since the last sync
"""
result = Result.failed
c = self.db.cursor()
row = c.execute("SELECT * FROM 'files' WHERE `path` = ?;", (f.rel_path,)).fetchone()
if self.should_transfer(row, f):
print("Uploading:", f.rel_path)
try:
result = self.dest.put_file(f, purge_historics=row is not None)
except:
print("Failed:", f.rel_path)
print("Unexpected error:", sys.exc_info()[0])
raise
# The file was uploaded, commit it to the db
c.execute("REPLACE INTO 'files' VALUES(?, ?, ?, ?);", (f.rel_path, f.mtime, f.size, 1))
else:
c.execute("UPDATE 'files' SET seen=1 WHERE `path` = ?;", (f.rel_path,)).fetchone()
#print("Skipping:", f.rel_path)
result = Result.skipped
c.close()
return result
def _should_transfer_mtime(self, row, f):
return not row or row['mtime'] < f.mtime
def _should_transfer_size(self, row, f):
return not row or row['size'] != f.size
def purge_remote(self):
"""
Delete files on the remote that were not found when scanning the local tree.
During the upload phase, destination modules are expected to track which files have been seen on the
local end. When local scan + upload is complete, the module uses this state to purge dead files on the remote.
"""
c = self.db.cursor()
c_del = self.db.cursor()
for purge_file in c.execute("SELECT * FROM 'files' WHERE seen=0;"):
print("Delete on remote: ", purge_file["path"])
self.dest.purge_file(purge_file["path"])
c_del.execute("DELETE FROM 'files' WHERE path=?;", (purge_file["path"],))
c_del.close()
c.close()
self.dest.purge()
def sync(source_uri, dest_uri, account_id, app_key, workers=10, exclude=[], compare_method="mtime"):
@@ -205,12 +106,13 @@ def sync(source_uri, dest_uri, account_id, app_key, workers=10, exclude=[], compare_method="mtime"):
if dest.scheme == 'b2': # B2 URI
dest_receiver = B2Reciever(bucket=dest.netloc, path=dest.path, account_id=account_id, app_key=app_key,
workers=workers)
workers=workers, compare_method=compare_method)
else:
raise Exception("Dests other than B2 URIs not yet supported")
assert source_provider is not None
assert dest_receiver is not None
syncer = B2SyncManager(source_provider, dest_receiver, workers=workers, exclude_res=exclude, compare_method=compare_method)
syncer = B2SyncManager(source_provider, dest_receiver,
workers=workers, exclude_res=exclude)
syncer.sync()
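A sketch of the module-level entry point after this change; the URIs and credentials are placeholders, and the file:// source scheme is assumed from LocalProvider's role rather than shown in this hunk.

# Sketch: compare_method now flows into B2Reciever instead of B2SyncManager.
from b2mirror import mirror

mirror.sync("file:///home/dave/photos",   # local source (scheme assumed)
            "b2://my-bucket/photos",      # B2 destination
            account_id="<account id>",
            app_key="<app key>",
            workers=10,
            compare_method="size")        # default is "mtime"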

bin/b2mirror View File

@@ -4,9 +4,14 @@ import argparse
import re
from b2mirror import mirror
import logging
def main():
logging.basicConfig(level=logging.INFO)
logging.getLogger("requests").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
parser = argparse.ArgumentParser(description="Sync data to/from B2")
parser.add_argument("-i", "--size", help="Compare by size instead of mtime", action="store_true", default=False)

setup.py View File

@@ -3,14 +3,13 @@ from setuptools import setup
from b2mirror import __version__
setup(name='b2mirror',
version=__version__,
description='Tool for sync',
url='http://gitlab.xmopx.net/dave/b2mirror',
author='dpedu',
author_email='dave@davepedu.com',
packages=['b2mirror'],
scripts=['bin/b2mirror'],
zip_safe=False,
install_requires=[
'b2==0.6.2',
])
version=__version__,
description='Tool for sync',
url='http://gitlab.xmopx.net/dave/b2mirror',
author='dpedu',
author_email='dave@davepedu.com',
packages=['b2mirror'],
scripts=['bin/b2mirror'],
zip_safe=False,
install_requires=['b2==0.6.2']
)