datadb-cli/datadb/datadb.py

#!/usr/bin/env python3
import argparse
from configparser import ConfigParser
from urllib.parse import urlparse
from os.path import normpath, join, exists
from os import chmod, chown, stat, environ
from enum import Enum
import subprocess
from requests import get, put, head
from threading import Thread

SSH_KEY_PATH = environ.get("DATADB_KEYPATH", '/root/.ssh/datadb.key')
RSYNC_DEFAULT_ARGS = ['rsync', '-avzr', '--exclude=.datadb.lock', '--whole-file', '--one-file-system', '--delete']
SSH_CMD = 'ssh -i {} -p {} -o StrictHostKeyChecking=no'
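# For illustration, with the defaults above SSH_CMD.format(SSH_KEY_PATH, 22) expands to
# 'ssh -i /root/.ssh/datadb.key -p 22 -o StrictHostKeyChecking=no', which restore() and
# backup() pass to rsync via its -e option.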
2015-12-26 22:12:36 -08:00
2017-04-22 00:06:12 -07:00
2015-12-26 22:12:36 -08:00
class SyncStatus(Enum):
"Data is on local disk"
DATA_AVAILABLE = 1
"Data is not on local disk"
DATA_MISSING = 2
# Requests will call tell() on the file-like stdout stream if the tell attribute exists. However subprocess'
# stdout stream (_io.BufferedReader) does not support this (raises OSError: [Errno 29] Illegal seek).
# If the tell attribute is missing, requests will fall back to simply iterating on the file-like object,
# so, we support only the iterable interface
class WrappedStdout(object):
BUFFSIZE = 256 * 1024
def __init__(self, stdout):
self.stdout = stdout
def __iter__(self):
return self
def __next__(self):
data = self.stdout.read(self.BUFFSIZE)
if not data:
raise StopIteration()
return data
def close(self):
self.stdout.close()
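

# Minimal usage sketch (illustrative only; the URL and command here are hypothetical):
#
#     proc = subprocess.Popen(['tar', '-cz', '.'], stdout=subprocess.PIPE)
#     put('http://backupserver/new_backup', data=WrappedStdout(proc.stdout))
#
# Because the wrapper exposes neither tell() nor read(), requests falls back to
# iterating it, streaming one BUFFSIZE chunk at a time.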


def restore(api_url, profile, conf, force=False):
    """
    Restore data from datadb
    """
    # Sanity check: if the lockfile exists we assume the data is already there, so we wouldn't want to call rsync
    # again as it would wipe out local changes. This can be overridden with --force.
    if not ((status(profile, conf) == SyncStatus.DATA_MISSING) or force):
        raise Exception("Data already exists (Use --force?)")

    original_perms = stat(conf["dir"])
    dest = urlparse(conf["uri"])

    status_code = head(api_url + 'get_backup', params={'proto': dest.scheme, 'name': profile}).status_code
    if status_code == 404:
        print("Connected to datadb, but datasource '{}' doesn't exist. Exiting".format(profile))
        # TODO: special exit code >1 to indicate this?
        return

    if dest.scheme == 'rsync':
        args = RSYNC_DEFAULT_ARGS[:]
        args += ['-e', SSH_CMD.format(SSH_KEY_PATH, dest.port or 22)]

        # Request the backup server to prepare the backup; the returned dir is what we sync from
        rsync_path = get(api_url + 'get_backup', params={'proto': 'rsync', 'name': profile}).text.rstrip()

        # Add rsync source path
        args.append('nexus@{}:{}'.format(dest.hostname, normpath(rsync_path) + '/'))

        # Add local dir
        args.append(normpath(conf["dir"]) + '/')

        print("Rsync restore call: {}".format(' '.join(args)))
        subprocess.check_call(args)

    elif dest.scheme == 'archive':
        # http request to the backup server, download the tarball
        args_curl = ['curl', '-s', '-v', '-XGET', '{}get_backup?proto=archive&name={}'.format(api_url, profile)]
        # unpack
        args_tar = [get_tarcmd(), 'zxv', '-C', normpath(conf["dir"]) + '/']

        print("Tar restore call: {} | {}".format(' '.join(args_curl), ' '.join(args_tar)))

        dl = subprocess.Popen(args_curl, stdout=subprocess.PIPE)
        extract = subprocess.Popen(args_tar, stdin=dl.stdout)

        dl.wait()
        extract.wait()

        # TODO: convert to pure python?
        if dl.returncode != 0:
            raise Exception("Could not download archive")
        if extract.returncode != 0:
            raise Exception("Could not extract archive")

    # Restore original permissions on the data dir
    # TODO: store these in the conf file
    chmod(conf["dir"], original_perms.st_mode)
    chown(conf["dir"], original_perms.st_uid, original_perms.st_gid)
    # TODO: apply other permissions


def backup(api_url, profile, conf, force=False):
    """
    Backup data to datadb
    """
    # Sanity check: if the lockfile doesn't exist we assume the data is missing, so we wouldn't want to call rsync
    # again as it would wipe out the backup.
    if not ((status(profile, conf) == SyncStatus.DATA_AVAILABLE) or force):
        raise Exception("Data is missing (Use --force?)")

    dest = urlparse(conf["uri"])

    if dest.scheme == 'rsync':
        args = RSYNC_DEFAULT_ARGS[:]
        args += ['-e', SSH_CMD.format(SSH_KEY_PATH, dest.port or 22)]
        # args += ["--port", str(dest.port or 22)]

        # Excluded paths
        if conf["exclude"]:
            for exclude_path in conf["exclude"].split(","):
                if not exclude_path == "":
                    args.append("--exclude")
                    args.append(exclude_path)

        # Add local dir
        args.append(normpath(conf["dir"]) + '/')

        new_backup_params = {'proto': 'rsync',
                             'name': profile,
                             'keep': conf["keep"]}
        if conf["inplace"]:
            new_backup_params["inplace"] = 1

        # Hit backupdb via http to retrieve the absolute path of the rsync destination on the remote server
        rsync_path, token = get(api_url + 'new_backup', params=new_backup_params).json()

        # Add rsync destination path
        args.append('nexus@{}:{}'.format(dest.hostname, normpath(rsync_path) + '/'))

        # print("Rsync backup call: {}".format(' '.join(args)))
        try:
            subprocess.check_call(args)
        except subprocess.CalledProcessError as cpe:
            if cpe.returncode not in [0, 24]:  # ignore partial transfers due to vanishing files on our end
                raise

        # confirm completion if the backup wasn't already in place
        if not conf["inplace"]:
            put(api_url + 'new_backup', params={'proto': 'rsync', 'name': profile, 'token': token,
                                                'keep': conf["keep"]})

    elif dest.scheme == 'archive':
        # CD to local source dir
        # tar+gz data and stream to backup server
        args_tar = []
        if has_binary("ionice"):
            args_tar += ['ionice', '-c', '3']
        args_tar += ['nice', '-n', '19']
        args_tar += [get_tarcmd(),
                     '--exclude=.datadb.lock',
                     '--warning=no-file-changed',
                     '--warning=no-file-removed',
                     '--warning=no-file-ignored',
                     '--warning=no-file-shrank']

        # Use pigz if available (parallel gzip - http://zlib.net/pigz/)
        if has_binary("pigz"):
            args_tar += ["--use-compress-program", "pigz"]
        else:
            args_tar += ["-z"]

        # Excluded paths
        if conf["exclude"]:
            for exclude_path in conf["exclude"].split(","):
                if not exclude_path == "":
                    args_tar.append("--exclude")
                    args_tar.append(exclude_path)

        args_tar += ['-cv', './']
        tar_dir = normpath(conf["dir"]) + '/'
        print("Tar call in {}: {}".format(tar_dir, ' '.join(args_tar)))

        tar = subprocess.Popen(args_tar, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=tar_dir)

        put_url = '{}new_backup?proto=archive&name={}&keep={}'.format(api_url, profile, conf["keep"])
        print("Putting to: {}".format(put_url))

        tar_errors = []
        error_scanner = Thread(target=scan_errors, args=(tar.stderr, tar_errors), daemon=True)
        error_scanner.start()

        upload = put(put_url, data=WrappedStdout(tar.stdout))
        if upload.status_code != 200:
            print(upload.text)
            raise Exception("Upload failed with code: {}".format(upload.status_code))

        tar.wait()
        error_scanner.join()

        if tar.returncode != 0 and len(tar_errors) > 0:
            raise Exception("Tar process exited with nonzero code {}. Tar errors: \n {}".
                            format(tar.returncode, "\n ".join(tar_errors)))


def scan_errors(stream, error_list):
    """
    Read and print lines from a stream, appending messages that look like errors to error_list
    """
    # Tar does not have an option to ignore file-removed errors. The warnings can be hidden, but even with
    # --ignore-failed-read, file-removed errors cause a non-zero exit. So, hide the warnings we don't care about
    # using --warning=no-xxx and scan the output for unknown messages, assuming anything found is bad.
    for line in stream:
        line = line.decode("UTF-8").strip()
        if not line.startswith("./"):
            if line not in error_list:
                error_list.append(line)
            print(line)


def status(profile, conf):
    """
    Check the status of the local dir - if the lock file is in place, we assume the data is there
    """
    lockfile = join(conf["dir"], '.datadb.lock')

    if exists(lockfile):
        return SyncStatus.DATA_AVAILABLE
    return SyncStatus.DATA_MISSING


def shell_exec(cmd, workdir='/tmp/'):
    """
    Execute a command in a shell and wait for it to exit.
    """
    print("Calling: {}".format(cmd))
    subprocess.Popen(cmd, shell=True, cwd=workdir).wait()


def get_tarcmd():
    return "gtar" if has_binary("gtar") else "tar"


def has_binary(name):
    """
    Check if the passed command is available
    :return: boolean
    """
    try:
        subprocess.check_call(['which', name], stdout=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return False
    return True
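
# For example, has_binary("pigz") returns True only when pigz is somewhere on $PATH;
# backup() uses this above to choose between pigz and tar's built-in gzip compression.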


def main():
    """
    Expects a config file at /etc/datadb.ini. Example:

    ----------------------------
    [gyfd]
    uri=
    dir=
    keep=
    auth=
    restore_preexec=
    restore_postexec=
    export_preexec=
    export_postexec=
    exclude=
    ----------------------------

    Each [section] defines one backup task.

    Fields:

    *uri*: Destination/source for this instance's data. Always fits the following format:

        <protocol>://<server>/<backup name>

    Valid protocols:

        rsync - rsync executed over SSH. The local dir will be synced with the remote backup dir using rsync.

        archive - tar archives transported over HTTP. The local dir will be tarred and PUT to the backup server's
                  remote dir via HTTP.

    *dir*: Local dir for this backup

    *keep*: Currently unused. Number of historical copies to keep on the remote server

    *auth*: Currently unused. Username:password string to use while contacting the datadb via HTTP.

    *restore_preexec*: Shell command to exec before pulling/restoring data

    *restore_postexec*: Shell command to exec after pulling/restoring data

    *export_preexec*: Shell command to exec before pushing data

    *export_postexec*: Shell command to exec after pushing data

    *exclude*: If the underlying transport method supports excluding paths, a comma-separated list of paths to
               exclude. Applies to backup operations only.

    *inplace*: rsync only. If enabled, the server will keep only a single copy that you will rsync over. Intended
               for single copies of LARGE datasets. Overrides "keep".
"""
    required_conf_params = ['dir', 'uri']

    conf_params = {'export_preexec': None,
                   'exclude': None,
                   'keep': 5,
                   'restore_preexec': None,
                   'restore_postexec': None,
                   'auth': '',
                   'export_postexec': None,
                   'inplace': False}

    conf_path = environ.get("DATADB_CONF", "/etc/datadb.ini")

    # Load profiles
    config = ConfigParser()
    config.read(conf_path)
    config = {section: {k: config[section][k] for k in config[section]} for section in config.sections()}

    global_config = {}
    for conf_k, conf_dict in config.items():
        if conf_k == "_backupdb":
            global_config = conf_dict
            continue
        for expect_param, expect_default in conf_params.items():
            if expect_param not in conf_dict.keys():
                conf_dict[expect_param] = expect_default
        for expect_param in required_conf_params:
            if expect_param not in conf_dict.keys():
                raise Exception("Required parameter {} missing for profile {}".format(expect_param, conf_k))

    parser = argparse.ArgumentParser(description="Backupdb agent; depends on config: /etc/datadb.ini")
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        help='force restore operation if destination data already exists')
    parser.add_argument('--http-api', help="http endpoint", default=environ.get('DATADB_HTTP_API'))
    parser.add_argument('-n', '--no-exec', default=False, action='store_true', help="don't run pre/post-exec commands")
    parser.add_argument('-b', '--no-pre-exec', default=False, action='store_true', help="don't run pre-exec commands")
    parser.add_argument('-m', '--no-post-exec', default=False, action='store_true',
                        help="don't run post-exec commands")
    # exclude the special _backupdb section from the profile choices
    parser.add_argument('profile', type=str, choices=[k for k in config.keys() if k != "_backupdb"],
                        help='profile to back up or restore')

    # parser.add_argument('-i', '--identity',
    #                     help='Ssh keyfile to use', type=str, default='/root/.ssh/datadb.key')
    # parser.add_argument('-r', '--remote',
    #                     help='Remote server (rsync://...)', type=str, required=True)
    # parser.add_argument('-l', '--local_dir',
    #                     help='Local path', type=str, required=True)

    subparser_modes = parser.add_subparsers(dest='mode', help='operation to perform')

    subparser_modes.add_parser('backup', help='backup to datastore')
    subparser_modes.add_parser('restore', help='restore from datastore')
    subparser_modes.add_parser('status', help='get info for profile')

    args = parser.parse_args()

    if args.http_api:
        api = args.http_api
    else:
        api = global_config.get("http_api", None)

    if not api:
        parser.error("--http-api is required")

    if args.no_exec:
        args.no_pre_exec = True
        args.no_post_exec = True

    if args.mode == 'restore':
        if not args.no_pre_exec and config[args.profile]['restore_preexec']:
            shell_exec(config[args.profile]['restore_preexec'])

        restore(api, args.profile, config[args.profile], force=args.force)

        if not args.no_post_exec and config[args.profile]['restore_postexec']:
            shell_exec(config[args.profile]['restore_postexec'])

    elif args.mode == 'backup':
        if not args.no_pre_exec and config[args.profile]['export_preexec']:
            shell_exec(config[args.profile]['export_preexec'])

        try:
            backup(api, args.profile, config[args.profile], force=args.force)
        finally:
            if not args.no_post_exec and config[args.profile]['export_postexec']:
                shell_exec(config[args.profile]['export_postexec'])

    elif args.mode == 'status':
        info = status(args.profile, config[args.profile])
        print(SyncStatus(info))

    else:
        parser.print_usage()


if __name__ == '__main__':
    main()