# NOTE(review): pasted-file metadata (timestamp, line count, size) — not part of the script.
# 2023-10-01 11:48:05 -07:00 · 172 lines · 4.8 KiB
import os
import sys
import json
import time
import subprocess
def fprint(*args, **kwargs):
    """print + flush output streams. Helps keep logs correctly interleaved when piping to a file."""
    print(*args, **kwargs)
    # Flush both streams so interleaved stdout/stderr lines land in order.
    # NOTE(review): flush calls reconstructed from the docstring; the original
    # body was truncated in this copy of the file — confirm against the original.
    sys.stdout.flush()
    sys.stderr.flush()
def die(msg, rc=1):
def pdie(popen):
def checkp(popen):
    """Validate a finished subprocess: exit the whole program on failure.

    Hands the same popen object back so the call can be chained inline.
    """
    rc = popen.returncode
    if rc != 0:
        die("command failed", rc=rc)
    return popen
def runp(popen):
    """Validate a finished subprocess: raise on failure instead of exiting.

    Hands the same popen object back so the call can be chained inline.
    """
    rc = popen.returncode
    if rc == 0:
        return popen
    raise Exception("command failed: rc={}".format(rc))  # TODO use subprocess.check_call
class ExecWrapper(object):
    """Context manager that runs shell commands on entry (pre) and exit (post).

    Each command is echoed with a "+" prefix (shell-trace style) and run via
    the shell; a non-zero exit raises CalledProcessError.
    """

    def __init__(self, pre=None, post=None):
        # NOTE(review): the original line merged two assignments
        # ("self.pre = pre or [] = post or []") — split back into two.
        self.pre = pre or []
        self.post = post or []

    def __enter__(self):
        for command in self.pre:
            fprint("+", command)
            subprocess.check_call(command, shell=True)

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Post commands run even when the body raised; returning None lets
        # any in-flight exception propagate.
        for command in self.post:
            fprint("+", command)
            subprocess.check_call(command, shell=True)
def init_ok(message):
    r"""
    Restic doesn't have a way to check if a repo is initialized or otherwise cleanly initialize it. So, we try to
    initialize it and read the error message. Either a success message or an error suggesting it is already initialized
    will cause this function to return true.

    Success message:
        created restic repository xxxxx at xxxxx\n\n
    Error message:
        Fatal: create key in repository at xxxx failed: repository master key and config already initialized\n\n
    """
    message = message.strip()
    return message.startswith("created restic repository ") or (
        message.startswith("Fatal: create key in repository at ") and
        message.endswith("repository master key and config already initialized"))
def get_newest_snapshot(name, config):
    """Return the newest snapshot dict for *name*, or None when there are none."""
    # TODO just return "latest" lol restic supports that term to restore the latest snapshot
    groups = get_snapshot_groups(name, config)
    if len(groups) > 1:
        return die("found {} groups, but only support 1".format(len(groups)))
    if not groups:
        return None
    snapshots = groups[0]['snapshots']
    # Newest first; stable sort preserves restic's order among equal timestamps.
    snapshots.sort(key=lambda x: x["time"], reverse=True)
    return snapshots[0] if snapshots else None
def get_snapshot_groups(name, config):
    """Return restic snapshot groups (grouped by tags), filtered to tag name=<name>.

    The result is the parsed JSON output of
    `restic snapshots --group-by tags --json --tag name=<name>`.

    NOTE(review): the subprocess invocation was truncated in this copy of the
    file; the reconstruction below assumes a bare ["restic"] prefix.
    Presumably *config* supplied repository/auth arguments — confirm against
    the original.
    """
    cmd = ["snapshots", "--group-by", "tags", "--json"]
    retention_tags = {
        "name": name,
    }
    for k, v in retention_tags.items():
        cmd.extend(["--tag", "{}={}".format(k, v)])
    p = subprocess.Popen(["restic"] + cmd, stdout=subprocess.PIPE)
    stdout, _ = p.communicate()
    # Check the exit status only after communicate(): before the process has
    # finished, returncode is None, which checkp's `!= 0` test would treat as
    # a failure.
    checkp(p)
    return json.loads(stdout.decode())
def get_retention_args(schedule):
    """Given a retention schedule, return restic command arguments needed to make it so.

    Returns None when no deletions should be performed (no function given,
    "forever", or a "cycle" with no periods set). Raises on an unknown
    retention function name. Missing "count" on a "keep" schedule raises
    KeyError.
    """
    mode = schedule.get("function")
    if mode is None:  # default is to just keep stuff lol
        return None
    if mode == "forever":
        # do not perform deletions
        return None
    elif mode == "keep":
        # just keep the last X snapshots
        # BUG FIX: the original indexed mode["count"], but mode is the string
        # "keep" — the count lives on the schedule itself.
        return ["--keep-last", str(schedule["count"])]
    elif mode == "cycle":
        # keep the last $last backups.
        # keep a daily backup after that for the past $daily days.
        # keep a weekly backup after that for the past $weekly weeks.
        # keep a monthly backup after that for the past $monthly months.
        cmd = []
        for key, flag in (("last", "--keep-last"),
                          ("daily", "--keep-daily"),
                          ("weekly", "--keep-weekly"),
                          ("monthly", "--keep-monthly")):
            value = schedule.get(key)
            if value is not None:
                cmd.extend([flag, str(value)])
        return cmd or None
    raise Exception("unknown retention function {}".format(mode))
def update_statefile(path, success):
    """Record one backup run in the statefile.

    The statefile contains a json list of timestamps of the last 10 times the
    backup ran, noting success or failure.
    """
    d = os.path.dirname(path)
    # Create the parent directory if needed; the "d and" guard protects a bare
    # filename, where dirname() is "" and os.makedirs("") would raise.
    if d and not os.path.exists(d):
        os.makedirs(d)
    try:
        with open(path) as f:
            state = json.load(f)
    except (json.decoder.JSONDecodeError, FileNotFoundError):
        # Missing or corrupt statefile: start fresh rather than crashing.
        state = []
    state.append({
        "time": int(time.time()),
        "success": bool(success),
    })
    # Keep only the 10 most recent entries (sorted oldest-first on disk).
    state.sort(key=lambda x: x["time"])
    state = state[-10:]
    with open(path, "w") as f:
        json.dump(state, f, indent=4, sort_keys=True)