basic uploading
parent 1ff5ddf63f
commit 5eda113bb4
@@ -1,8 +1,14 @@
+import os
 import sys
 import logging
 import argparse
+import subprocess
 import requests
-from backupdb2.misc import load_cli_config, tabulate
+from threading import Thread
+from backupdb2.misc import load_cli_config, tabulate, tabulate_dict, has_binary, get_tarcmd, \
+    tar_scan_errors, WrappedStdout
+from backupdb2.common import LOCKFILE


 class BackupdbClient(object):
@@ -20,6 +26,9 @@ class BackupdbClient(object):
     def post(self, url, **params):
         return self.do("post", url, **params)

+    def put(self, url, **params):
+        return self.do("put", url, **params)
+
     def delete(self, url, **params):
         return self.do("delete", url, **params)

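post, put, and delete are thin wrappers over a shared do() helper defined earlier in the class, so there is a single funnel for every request. A standalone sketch of the pattern, with the base-URL joining assumed rather than taken from this file:

    import requests

    class Client:
        def __init__(self, base_url):
            self.base_url = base_url
            self.session = requests.Session()

        def do(self, method, url, **params):
            # One funnel for every verb: a single place to add auth, retries, logging
            return self.session.request(method, self.base_url + "/" + url, **params)

        def get(self, url, **params):
            return self.do("get", url, **params)

        def post(self, url, **params):
            return self.do("post", url, **params)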
@@ -37,6 +46,9 @@ class BackupdbClient(object):
     def list_dates(self, backup, namespace="default"):
         return self.get("dates", params=dict(namespace=namespace, backup=backup)).json()

+    def upload(self, stream, backup, namespace="default"):
+        return self.post("upload", params=dict(namespace=namespace, name=backup), data=WrappedStdout(stream))
+
     # def create_user(self, username, password):
     #     return self.post("user", data={"username": username,
     #                                    "password_hash": pwhash(password)})
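upload hands the raw stream to requests wrapped in WrappedStdout (defined in backupdb2.misc, further down), so the archive is sent as it is produced instead of being buffered whole. requests streams any iterable body chunk by chunk; a minimal sketch of the same technique with a plain generator, using an illustrative URL and chunk size:

    import requests

    def stream_file(path, chunk_size=1024 * 1024):
        # Yield fixed-size chunks; requests sends an iterable body with
        # Transfer-Encoding: chunked, never holding the whole file in memory
        with open(path, "rb") as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    return
                yield chunk

    # Illustrative endpoint only
    requests.post("http://localhost:8080/upload", data=stream_file("backup.tar.gz"))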
@@ -104,7 +116,69 @@ def cmd_backup(args, parser, config, client):
     """
     Create a new backup - requires it be defined in the config file
     """
-    pass
+    backup_config = config["backups"][args.backup]
+
+    # Refuse to back up if a lockfile created by this cli is not present
+    if not os.path.exists(os.path.join(backup_config["dir"], LOCKFILE)) and not args.force:
+        print("Error: data is missing (Use --force?)")
+        return 1
+
+    # Stream tar/gz to the backup server
+    args_tar = []
+
+    if has_binary("ionice"):
+        args_tar += ['ionice', '-c', '3']
+
+    args_tar += ['nice', '-n', '19']
+    args_tar += [get_tarcmd(),
+                 f'--exclude={LOCKFILE}',
+                 '--warning=no-file-changed',
+                 '--warning=no-file-removed',
+                 '--warning=no-file-ignored',
+                 '--warning=no-file-shrank']
+
+    # Use pigz if available (Parallel gzip - http://zlib.net/pigz/)
+    if has_binary("pigz"):
+        args_tar += ["--use-compress-program", "pigz"]
+    else:
+        args_tar += ["-z"]
+
+    # Excluded paths
+    if backup_config["exclude"]:
+        for exclude_path in backup_config["exclude"].split(","):
+            if exclude_path:
+                args_tar.append("--exclude")
+                args_tar.append(exclude_path)
+
+    args_tar += ['-cv', './']
+    tar_dir = os.path.normpath(backup_config["dir"]) + '/'
+    tar = subprocess.Popen(args_tar, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=tar_dir)
+
+    tar_errors = []
+    error_scanner = Thread(target=tar_scan_errors, args=(tar.stderr, tar_errors), daemon=True)
+    error_scanner.start()
+
+    response = client.upload(tar.stdout, args.backup, args.namespace)
+
+    tar.wait()
+    error_scanner.join()
+
+    if response.status_code != 200:
+        print("Error: upload failed with code: {}".format(response.status_code))
+        print(response.text)
+        return 1
+
+    if tar.returncode != 0 and len(tar_errors) > 0:
+        print("Error: tar process exited with nonzero code: {}.".format(tar.returncode))
+        print("Tar errors: \n {}".format("\n ".join(tar_errors)))
+        return 1
+    # TODO call delete api if we detected tar errors
+    # TODO calculate sha256 on the fly and verify in response
+    # TODO also call delete api on ctrl-c too early
+
+    print("\nbackup complete!\n")
+    tabulate_dict(response.json())


 def cmd_restore(args, parser, config, client):
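The interesting wiring here is that tar's stdout is handed straight to the HTTP client while a daemon thread drains stderr. The separate drain thread matters: if stderr were left unread, tar would block as soon as the OS pipe buffer filled. A stripped-down sketch of the same shape, with the command and URL as placeholders:

    import subprocess
    import threading
    import requests

    # -f - writes the archive to stdout explicitly
    proc = subprocess.Popen(["tar", "-czf", "-", "."],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    errors = []

    def drain(stream):
        # Keep reading stderr so the producer never stalls on a full pipe
        for line in stream:
            errors.append(line.decode("UTF-8").strip())

    t = threading.Thread(target=drain, args=(proc.stderr,), daemon=True)
    t.start()

    # iter(read, b"") yields chunks until EOF; an iterable body avoids the
    # tell() probe that fails on pipes (the problem WrappedStdout also solves)
    body = iter(lambda: proc.stdout.read(1024 * 1024), b"")
    requests.post("http://localhost:8080/upload", data=body)  # placeholder URL

    proc.wait()
    t.join()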
@@ -141,6 +215,7 @@ def get_args():
     p_backup = sp_action.add_parser("backup", help="backup action")
     p_backup.set_defaults(func=cmd_backup)
+    p_backup.add_argument("--force", action="store_true", help="force backup operation if lockfile not found")
     p_backup.add_argument("-n", "--namespace", default="default", help="parent namespace to back up to")
     p_backup.add_argument("backup", help="backup to make")

@@ -152,6 +227,8 @@ def get_args():
     p_restore = sp_action.add_parser("restore", help="restore action")
     p_restore.set_defaults(func=cmd_restore)
+    p_restore.add_argument("--force", action="store_true",
+                           help="force restore operation if destination data already exists")
     p_restore.add_argument("-n", "--namespace", default="default", help="parent namespace to download from")
     p_restore.add_argument("backup", help="backup to download")
     p_restore.add_argument("date", help="date of backup to download")
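Both subcommands use argparse's set_defaults(func=...) idiom, so main() can dispatch without a chain of if/elif on the subcommand name. A self-contained version of the pattern:

    import argparse

    def cmd_hello(args):
        print("hello,", args.name)

    parser = argparse.ArgumentParser()
    sub = parser.add_subparsers(dest="command", required=True)
    p_hello = sub.add_parser("hello")
    p_hello.add_argument("name")
    p_hello.set_defaults(func=cmd_hello)

    args = parser.parse_args(["hello", "world"])
    args.func(args)  # dispatches to cmd_hello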
@@ -163,7 +240,18 @@ def main():
-    logging.basicConfig(level=logging.INFO)
     args, parser = get_args()
+
+    logging.basicConfig(
+        # level=logging.DEBUG if args.debug else logging.INFO,
+        format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s"
+    )
+    # logging.getLogger("botocore").setLevel(logging.ERROR)
+    # logging.getLogger("urllib3").setLevel(logging.ERROR)
+
     config = load_cli_config(args.config)
     client = BackupdbClient(args.server or config["options"].get("server"))

-    args.func(args, parser, config, client)
+    sys.exit(args.func(args, parser, config, client) or 0)


 if __name__ == '__main__':
     main()
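The `or 0` in the exit line maps the command functions' return convention (1 on failure, None on success, as in cmd_backup above) onto a process exit status:

    def cmd_ok(*_):
        return None  # success: None or 0 -> exit status 0

    def cmd_fail(*_):
        return 1     # failure: 1 or 0 -> exit status 1

    assert (cmd_ok() or 0) == 0
    assert (cmd_fail() or 0) == 1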
@@ -2,3 +2,5 @@ MAX_QUEUED_CHUNKS = 5  # max size of pre-upload file chunk queue
 MAX_PARALLEL_UPLOADS = 10  # max number of uploads happening in parallel
 # memory usage will be the sum of the above numbers times the chunk size
 CHUNK_SIZE = 1024 * 1024 * 10  # 10 MB
+
+LOCKFILE = ".datadb.lock"
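Working through the comment's bound with the values above: at worst (5 + 10) chunks of 10 MB are held at once:

    MAX_QUEUED_CHUNKS = 5
    MAX_PARALLEL_UPLOADS = 10
    CHUNK_SIZE = 1024 * 1024 * 10  # 10 MB

    # (5 + 10) * 10 MB = 150 MB of chunk buffers at worst
    worst_case = (MAX_QUEUED_CHUNKS + MAX_PARALLEL_UPLOADS) * CHUNK_SIZE
    print(worst_case // (1024 * 1024), "MB")  # -> 150 MB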
@@ -1,4 +1,5 @@
 import os
+import subprocess
 import logging
 from configparser import ConfigParser

@@ -132,6 +133,7 @@ def validate_section(section_name, d, required_keys):
+

 def tabulate(rows, headers):
     rows = [list(row) for row in list(rows)]
     lengths = [0] * len(headers)
     for row in rows + [headers]:
         for i, c in enumerate(row):
@@ -149,3 +151,64 @@ def tabulate(rows, headers):
             print(c + " " * (lengths[i] - len(c)) + " ", end="")
         print()
     print()
+
+
+def tabulate_dict(d):
+    tabulate(d.items(), headers=["property", "value"])
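tabulate_dict is what cmd_backup calls on the upload response: it renders any flat dict as a two-column table. With hypothetical data, for illustration:

    # Prints one row per key/value pair under "property" / "value" headers
    tabulate_dict({"name": "homedir", "namespace": "default", "size": "1.2 GB"})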
+
+
+def get_tarcmd():
+    return "gtar" if has_binary("gtar") else "tar"
+
+
+def has_binary(name):
+    """
+    Check if the passed command is available
+    :return: boolean
+    """
+    try:
+        subprocess.check_call(['which', name], stdout=subprocess.DEVNULL)
+    except subprocess.CalledProcessError:
+        return False
+    return True
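Shelling out to which works on the Unix-like platforms tar runs on anyway, but the stdlib offers the same check without spawning a process. A possible alternative, not what this commit does:

    import shutil

    def has_binary(name):
        # shutil.which scans PATH in-process, no subprocess needed
        return shutil.which(name) is not None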
+
+
+def tar_scan_errors(stream, error_list):
+    """
+    Read and print lines from a stream, appending messages that look like errors to error_list.
+
+    Tar does not have an option to ignore file-removed errors. The warnings can be hidden, but even with
+    --ignore-failed-read, file-removed errors cause a non-zero exit. So, hide the warnings we don't care
+    about using --warning=no-xxx and scan the output for unknown messages, assuming anything found is bad.
+    """
+    for line in stream:
+        line = line.decode("UTF-8").strip()
+        if not line.startswith("./"):
+            if line not in error_list:
+                error_list.append(line)
+        logging.info(line)
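The filter is a heuristic: with -cv every archived path is echoed to stderr prefixed with ./, so any line that does not start with ./ is treated as a diagnostic. For example, with a hypothetical stderr capture:

    lines = [b"./docs/\n", b"./docs/a.txt\n",
             b"tar: ./tmp/cache.db: File removed before we read it\n"]

    errors = []
    tar_scan_errors(iter(lines), errors)
    print(errors)  # only the "File removed" diagnostic is kept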
+
+
+class WrappedStdout(object):
+    """
+    Requests will call tell() on the file-like stdout stream if the tell attribute exists. However subprocess'
+    stdout stream (_io.BufferedReader) does not support this (raises OSError: [Errno 29] Illegal seek).
+    If the tell attribute is missing, requests will fall back to simply iterating on the file-like object,
+    so we support only the iterable interface.
+    """
+    BUFFSIZE = 1024 * 1024
+
+    def __init__(self, stdout):
+        self.stdout = stdout
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        data = self.stdout.read(self.BUFFSIZE)
+        if not data:
+            logging.info("end of stream")
+            raise StopIteration()
+        return data
+
+    def close(self):
+        self.stdout.close()
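The failure mode the docstring describes is easy to reproduce: tell() on a pipe-backed BufferedReader fails at the first lseek:

    import subprocess

    proc = subprocess.Popen(["echo", "hi"], stdout=subprocess.PIPE)
    try:
        proc.stdout.tell()
    except OSError as e:
        print("tell() on a pipe fails:", e)  # [Errno 29] Illegal seek
    proc.wait()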