b2mirror/b2mirror/mirror.py

import logging
import re
from urllib.parse import urlparse
from itertools import islice, filterfalse
from concurrent.futures import ThreadPoolExecutor

from b2mirror.plugin.localplugin import LocalProvider
from b2mirror.plugin.b2plugin import B2Reciever
from b2mirror.common import results_ok

"""
How it works:

B2SyncManager manages the transfer. It holds a src and a dest object; the src
provides an iterable of FileInfos describing the files it contains.

The manager iterates over these FileInfos and passes each one to the dest,
which uploads the file and informs the manager when it has completed.
"""


class B2SyncManager(object):

    def __init__(self, source_module, dest_module, exclude_res=None, workers=10):
"""
:param source_module: subclass instance of b2mirror.base.Provider acting as a file source
:param dest_module: subclass of b2mirror.base.Receiver acting as a file destination
:param exclude_res: compiled regular expression objects that file paths will be matched against. Finding a match
means skip the file (and delete on the remote).
:param workers: Number of parallel transfers
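
        Example (hypothetical pattern, skipping editor swap files):

            B2SyncManager(src, dst, exclude_res=[re.compile(r'.*\.swp$')])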
"""
        self.log = logging.getLogger("B2SyncManager")

        self.src = source_module
        self.dest = dest_module
        self.exclude_res = [
            re.compile(r'.*\.(DS_Store|pyc|dropbox)$'),
            re.compile(r'.*__pycache__.*'),
            re.compile(r'.*\.dropbox\.cache.*'),
            re.compile(r'.*\.AppleDouble.*')
        ] + (exclude_res if exclude_res else [])

        self.workers = workers

        self.log.info("Initialized with %s workers, %s ignores", self.workers, len(self.exclude_res))

    def sync(self):
        """
        Sync the source to the dest: first upload new local files, then clean dead files from the remote.
        """
        # Phase 1 - Upload all local files missing on the remote
        self.sync_up()

        # Phase 2 - Delete files on the remote missing locally
        self.purge_remote()

        # Phase 3 - Tear down the src/dest modules
        self.cleanup()

    def sync_up(self):
        """
        Sync local files to the remote. All files in the DB are first marked as unseen; when a file is found
        locally it is marked as seen again. This state is later used to clear deleted files from the destination.
        """
        self.log.info("beginning upload phase")
        chunk_size = 1000

        # if rel_path matches any of the exclusion REs, the predicate is True and the file is skipped
        files_source = filterfalse(lambda x: any(pattern.match(x.rel_path) for pattern in self.exclude_res),
                                   self.src)

        while True:
            # pull the next batch of files from the source iterator
            chunk = list(islice(files_source, chunk_size))
            if len(chunk) == 0:
                break

            for item in chunk:
                # long path names can't be put in sqlite
                assert len(item.rel_path) < 1024

            with ThreadPoolExecutor(max_workers=self.workers) as executor:
                upload_futures = [executor.submit(self.dest.xfer_file, item) for item in chunk]
                for future in upload_futures:
                    assert future.result() in results_ok

    def purge_remote(self):
        """
        During the upload phase the destination module is expected to track which files have been seen on the
        local end. Once the local scan + upload is complete, it uses this state to purge dead files from the remote.
        """
self.log.info("beginning remote purge phase")
self.dest.purge()

    def cleanup(self):
self.log.info("beginning cleanp phase")
self.src.teardown()
self.dest.teardown()


def sync(source_uri, dest_uri, account_id, app_key, workers=10, exclude=None, compare_method="mtime"):
    log = logging.getLogger("mirror")

    source = urlparse(source_uri)
    dest = urlparse(dest_uri)
source_provider = None
dest_receiver = None

    if source.scheme == '':  # Plain file path
        source_provider = LocalProvider(source.path)
    else:
        raise Exception("Sources other than local file paths not supported")

    if dest.scheme == 'b2':  # B2 bucket URI
        dest_receiver = B2Reciever(bucket=dest.netloc, path=dest.path, account_id=account_id, app_key=app_key,
                                   workers=workers, compare_method=compare_method)
    else:
        raise Exception("Dests other than B2 URIs not yet supported")
assert source_provider is not None
assert dest_receiver is not None
log.info("Source: %s", source_provider)
log.info("Dest: %s", dest_receiver)
syncer = B2SyncManager(source_provider, dest_receiver,
workers=workers, exclude_res=exclude)
syncer.sync()
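
# Example invocation (hypothetical path and bucket name; account_id and
# app_key stand in for real B2 credentials):
#
#     sync("/home/user/photos", "b2://my-bucket/photos",
#          account_id=account_id, app_key=app_key, workers=10)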