From c3261071f469db125f2bb61340bac59daed3528e Mon Sep 17 00:00:00 2001
From: dave
Date: Sat, 22 Apr 2017 00:55:51 -0700
Subject: [PATCH] Stream tarball uploads

Previously, this piped tar's output to curl, but curl would buffer the
entire output in memory in order to calculate the Content-Length
header. This needlessly made large tar backups impossible, as the
backend does not require a Content-Length header to be sent with the
data.
---
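Notes: requests sends an iterable request body with
Transfer-Encoding: chunked, so no Content-Length needs to be computed
up front and nothing is buffered. A minimal sketch of the same idea,
separate from the patch itself (the endpoint URL here is made up):

    import subprocess

    import requests

    def blocks(stream, bufsize=256 * 1024):
        # A generator has no len() or tell(), so requests falls back to
        # iterating it and streams the body chunked.
        while True:
            data = stream.read(bufsize)
            if not data:
                return
            yield data

    tar = subprocess.Popen(['tar', '-cz', '.'], stdout=subprocess.PIPE)
    resp = requests.put('http://127.0.0.1:1234/cgi-bin/new_backup',
                        data=blocks(tar.stdout))
    tar.wait()

The patch instead wraps the stream in a small iterable class
(WrappedStdout below), but the effect on the wire is the same.
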
 README.md          |  9 +++--
 datadb/__init__.py |  2 +-
 datadb/datadb.py   | 84 +++++++++++++++++++++++++++++++++++-----------
 3 files changed, 73 insertions(+), 22 deletions(-)

diff --git a/README.md b/README.md
index 1b3b97c..937bdab 100644
--- a/README.md
+++ b/README.md
@@ -86,12 +86,17 @@ Restore operations have a degree of sanity checking. Upon a successful restore,
 
 Command line usage is agnostic to the underlying transport protocol used.
 
-### Testing
-The following environment variables can be used to ease testing:
+### Tips
+
+If available, `pigz` will be selected over gzip. Pigz is a multicore-capable implementation of gzip and is recommended
+on multicore machines. See http://zlib.net/pigz/.
+
+The following environment variables can be used to ease testing or deployment:
 
 * DATADB_CONF=./example.ini
 * DATADB_KEYPATH=./datadb.key
+* DATADB_HTTP_API='http://127.0.0.1:1234/cgi-bin/'
 
 ## TODO
 
diff --git a/datadb/__init__.py b/datadb/__init__.py
index b1a19e3..034f46c 100755
--- a/datadb/__init__.py
+++ b/datadb/__init__.py
@@ -1 +1 @@
-__version__ = "0.0.5"
+__version__ = "0.0.6"
diff --git a/datadb/datadb.py b/datadb/datadb.py
index f62f35e..903a2ee 100755
--- a/datadb/datadb.py
+++ b/datadb/datadb.py
@@ -13,7 +13,7 @@ from requests import get, put, head
 
 SSH_KEY_PATH = environ["DATADB_KEYPATH"] if "DATADB_KEYPATH" in environ else '/root/.ssh/datadb.key'
 RSYNC_DEFAULT_ARGS = ['rsync', '-avzr', '--exclude=.datadb.lock', '--whole-file', '--one-file-system', '--delete', '-e', 'ssh -i {} -p 4874 -o StrictHostKeyChecking=no'.format(SSH_KEY_PATH)]
-DATADB_HTTP_API = 'http://datadb.services.davepedu.com:4875/cgi-bin/'
+DATADB_HTTP_API = environ.get('DATADB_HTTP_API', 'http://datadb.services.davepedu.com:4875/cgi-bin/')
 
 
 class SyncStatus(Enum):
@@ -23,6 +23,29 @@ class SyncStatus(Enum):
     DATA_MISSING = 2
 
 
+# Requests will call tell() on the file-like stdout stream if the tell attribute exists. However, subprocess'
+# stdout stream (_io.BufferedReader) does not support this (it raises OSError: [Errno 29] Illegal seek).
+# If the tell attribute is missing, requests falls back to simply iterating on the file-like object,
+# so we support only the iterable interface.
+class WrappedStdout(object):
+    BUFFSIZE = 256 * 1024
+
+    def __init__(self, stdout):
+        self.stdout = stdout
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        data = self.stdout.read(self.BUFFSIZE)
+        if not data:
+            raise StopIteration()
+        return data
+
+    def close(self):
+        self.stdout.close()
+
+
 def restore(profile, conf, force=False):  # remote_uri, local_dir, identity='/root/.ssh/datadb.key'
     """
     Restore data from datadb
@@ -30,7 +53,8 @@ def restore(profile, conf, force=False):  # remote_uri, local_dir, identity='/ro
     """
     # Sanity check: If the lockfile exists we assume the data is already there, so we wouldn't want to call rsync again
     # as it would wipe out local changes. This can be overridden with --force
-    assert (status(profile, conf) == SyncStatus.DATA_MISSING) or force, "Data already exists (Use --force?)"
+    if not ((status(profile, conf) == SyncStatus.DATA_MISSING) or force):
+        raise Exception("Data already exists (Use --force?)")
 
     original_perms = stat(conf["dir"])
     dest = urlparse(conf["uri"])
@@ -72,8 +96,10 @@ def restore(profile, conf, force=False):  # remote_uri, local_dir, identity='/ro
         extract.wait()
 
         # TODO: convert to pure python?
-        assert dl.returncode == 0, "Could not download archive"
-        assert extract.returncode == 0, "Could not extract archive"
+        if dl.returncode != 0:
+            raise Exception("Could not download archive")
+        if extract.returncode != 0:
+            raise Exception("Could not extract archive")
 
     # Restore original permissions on data dir
     # TODO store these in conf file
@@ -89,7 +115,8 @@ def backup(profile, conf, force=False):
 
     # Sanity check: If the lockfile doesn't exist we assume the data is missing, so we wouldn't want to call rsync
     # again as it would wipe out the backup.
-    assert (status(profile, conf) == SyncStatus.DATA_AVAILABLE) or force, "Data is missing (Use --force?)"
+    if not ((status(profile, conf) == SyncStatus.DATA_AVAILABLE) or force):
+        raise Exception("Data is missing (Use --force?)")
 
     dest = urlparse(conf["uri"])
 
@@ -132,10 +159,16 @@ def backup(profile, conf, force=False):
 
     elif dest.scheme == 'archive':
         # CD to local source dir
-        # create tarball
-        # http PUT file to backup server
+        # tar+gz data and stream to backup server
+        args_tar = ['tar', '--exclude=.datadb.lock']
 
+        # Use pigz if available (Parallel gzip - http://zlib.net/pigz/)
+        if has_binary("pigz"):
+            args_tar += ["--use-compress-program", "pigz"]
+        else:
+            args_tar += ["-z"]
+
+        # Excluded paths
         if conf["exclude"]:
             for exclude_path in conf["exclude"].split(","):
@@ -143,21 +176,22 @@ def backup(profile, conf, force=False):
                 args_tar.append("--exclude")
                 args_tar.append(exclude_path)
 
-        args_tar += ['-zcv', './']
-        args_curl = ['curl', '-v', '-XPUT', '--data-binary', '@-', '{}new_backup?proto=archive&name={}&keep={}'.
-                     format(DATADB_HTTP_API, profile, conf["keep"])]
+        args_tar += ['-cv', './']
+        tar_dir = normpath(conf["dir"]) + '/'
+        print("Tar call in {}: {}".format(tar_dir, args_tar))
 
-        print("Tar backup call: {} | {}".format(' '.join(args_tar), ' '.join(args_curl)))
+        tar = subprocess.Popen(args_tar, stdout=subprocess.PIPE, cwd=tar_dir)
 
-        compress = subprocess.Popen(args_tar, stdout=subprocess.PIPE, cwd=normpath(conf["dir"]) + '/')
-        upload = subprocess.Popen(args_curl, stdin=compress.stdout)
+        put_url = '{}new_backup?proto=archive&name={}&keep={}'.format(DATADB_HTTP_API, profile, conf["keep"])
+        print("Putting to: {}".format(put_url))
 
-        compress.wait()
-        upload.wait()
-        # TODO: convert to pure python?
+        upload = put(put_url, data=WrappedStdout(tar.stdout))
+        if upload.status_code != 200:
+            raise Exception("Upload failed with code: {}".format(upload.status_code))
 
-        assert compress.returncode == 0, "Could not create archive"
-        assert upload.returncode == 0, "Could not upload archive"
+        tar.wait()
+        if tar.returncode != 0:
+            raise Exception("Tar process exited with nonzero code {}".format(tar.returncode))
 
 
 def status(profile, conf):
@@ -180,6 +214,18 @@ def shell_exec(cmd, workdir='/tmp/'):
     subprocess.Popen(cmd, shell=True, cwd=workdir).wait()
 
 
+def has_binary(name):
+    """
+    Check if the passed command is available
+    :return: boolean
+    """
+    try:
+        subprocess.check_call(['which', name], stdout=subprocess.DEVNULL)
+    except subprocess.CalledProcessError:
+        return False
+    return True
+
+
 def main():
     """
     Expects a config file at /etc/datadb.ini. Example:
@@ -302,7 +348,7 @@ def main():
         if not args.no_pre_exec and config[args.profile]['export_preexec']:
             shell_exec(config[args.profile]['export_preexec'])
 
-        backup(args.profile, config[args.profile])
+        backup(args.profile, config[args.profile], force=args.force)
 
         if not args.no_post_exec and config[args.profile]['export_postexec']:
             shell_exec(config[args.profile]['export_postexec'])
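
Note: has_binary() above shells out to `which` to test for pigz. In the
spirit of the "TODO: convert to pure python?" comments, the same check
could be done with the standard library instead (Python 3.3+). A sketch,
not part of the patch:

    from shutil import which

    def has_binary(name):
        # shutil.which() searches PATH in-process; no subprocess needed
        return which(name) is not None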