resticbackup/resticbackup/cli.py

165 lines
6.0 KiB
Python

import os
import argparse
import subprocess
from socket import getfqdn
from resticbackup.types import ClientConfig, load_config, load_base_config, list_configs
from resticbackup.lib import ExecWrapper, init_ok, die, pdie, checkp, get_retention_args, get_newest_snapshot
def cmd_backup(args, parser):
config = load_config(args.name)
# the tags in both backup_tags and retention_tags will be added to the backup
backup_tags = {
"host": getfqdn()
}
# the tags in retention_tags will be used when pruning backups
retention_tags = {
"name": args.name,
}
all_tags = dict(retention_tags)
all_tags.update(backup_tags)
# perform pre/post-exec
# post-exec is executed if the backup fails OR if any pre-exec commands fail
# pre-exec commands failing will stop the backup from being executed
os.chdir(config.backup.path)
with ExecWrapper(post=config.backup.backup_postexec):
with ExecWrapper(pre=config.backup.backup_preexec):
proc = config.run(["init"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, _ = proc.communicate()
stdout = stdout.decode()
if not init_ok(stdout):
raise Exception(stdout)
# run backup
backup_cmd = ["backup", "./"]
for k, v in all_tags.items():
backup_cmd.extend(["--tag", "{}={}".format(k, v)])
for entry in config.backup.exclude:
if entry.startswith("/"):
entry = os.path.join(config.backup.path, entry[1:])
backup_cmd.extend(["--exclude", entry])
checkp(config.run(backup_cmd))
# perform retention
retention_args = get_retention_args(config.backup.schedule)
if not retention_args:
return
forget_cmd = ["forget", "--group-by", "tags"] + retention_args
for k, v in retention_tags.items():
forget_cmd.extend(["--tag", "{}={}".format(k, v)])
checkp(config.run(forget_cmd))
# perform junk deletion
return pdie(config.run(["prune"]))
def cmd_restore(args, parser):
config = load_config(args.name)
try:
if os.listdir(config.backup.path):
return die("output dir has files present - refusing to restore")
except FileNotFoundError:
return die("output dir '{}' does not exist".format(config.backup.path))
snapshot = get_newest_snapshot(args.name, config)
if not snapshot:
return die("no snapshots found")
os.chdir(config.backup.path)
with ExecWrapper(post=config.backup.restore_postexec):
with ExecWrapper(pre=config.backup.restore_preexec):
return pdie(config.run(["restore", snapshot["id"], "-t", "./"]))
def cmd_retrieve(args, parser):
# this is the same as cmd_restore except for how the config is formed and no execs
config = ClientConfig.load(load_base_config(), {"repo": args.repo, "path": args.outdir})
try:
if os.listdir(config.backup.path):
return die("output dir has files present - refusing to restore")
except FileNotFoundError:
return die("output dir does not exist")
snapshot = get_newest_snapshot(args.name, config)
if not snapshot:
return die("no snapshots found")
return pdie(config.run(["restore", snapshot["id"], "-t", args.outdir]))
def cmd_list(args, parser):
if args.name:
config = load_config(args.name)
cmd = ["snapshots", "--group-by", "tags"]
retention_tags = {
"name": args.name,
}
for k, v in retention_tags.items():
cmd.extend(["--tag", "{}={}".format(k, v)])
return pdie(config.run(cmd))
for entry in list_configs():
print(entry)
def cmd_exec(args, parser):
checkp(load_config(args.name).run(args.args))
def main():
parser = argparse.ArgumentParser(description="restic wrapper")
# parser.add_argument('-n', '--no-exec', action='store_true', help='do not run pre/post-exec commands') # TODO
# parser.add_argument('-b', '--no-pre-exec', action='store_true', help='do not run pre-exec commands') # TODO
# parser.add_argument('-m', '--no-post-exec', action='store_true', help='do not run post-exec commands') # TODO
sp_action = parser.add_subparsers(dest="action", help="action to take")
p_backup = sp_action.add_parser("backup", help="take a backup")
p_backup.set_defaults(func=cmd_backup)
p_backup.add_argument("name", help="name of backup to execute")
# p_backup.add_argument("-c", "--print", action="store_true", help="just print the restic commands instead") # TODO
# p_backup.add_argument("-e", "--env", action="store_true", help="just print the environment variables instead") # TODO
p_restore = sp_action.add_parser("restore", help="restore a backup")
p_restore.set_defaults(func=cmd_restore)
p_restore.add_argument("name", help="name of backup to restore")
# p_restore.add_argument("snapshot", nargs="?", help="id of snapshot to restore") # TODO
# p_restore.add_argument('--no-snapshots-ok', action='store_true',
# help='don\'t return an error if no snapshots are available to be restored')
p_exec = sp_action.add_parser("exec", help="execute a restic command")
p_exec.set_defaults(func=cmd_exec)
p_exec.add_argument("name", help="name of backup to configure restic for")
p_exec.add_argument("args", nargs="+", help="command arguments")
p_list = sp_action.add_parser("list", help="list configs or snapshots")
p_list.set_defaults(func=cmd_list)
p_list.add_argument("name", nargs="?", help="name of config to list snapshots for")
p_retrieve = sp_action.add_parser("retrieve", help="download arbitrary snapshots")
p_retrieve.set_defaults(func=cmd_retrieve)
p_retrieve.add_argument("repo", help="repo path")
p_retrieve.add_argument("name", help="backup name")
p_retrieve.add_argument("outdir", help="file path to write to")
args = parser.parse_args()
args.func(args, parser)
if __name__ == '__main__':
main()