add dataclass for config

dave 2022-12-14 00:04:07 -08:00
parent 03b96b2790
commit 9249301092


@@ -4,8 +4,8 @@ import json
import argparse
import subprocess
from socket import getfqdn
from dataclasses import dataclass
from urllib.parse import urlparse
from dataclasses import dataclass, field
CFG_DIR = os.environ.get("RESTICBACKUP_CONFIG_DIR", "/etc/resticbackup.d")
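For context, CFG_DIR holds one JSON file per backup plus a client-wide main.json. A sketch of what such a directory might contain, using only keys this commit actually reads (all names and values below are invented):

    # hypothetical contents of /etc/resticbackup.d/ (example values only)
    main_json = {                      # /etc/resticbackup.d/main.json
        "server": "s3:https://ACCESS_KEY:SECRET_KEY@s3.example.com",
        "secret": "repository-password",
    }
    www_json = {                       # /etc/resticbackup.d/www.json -> backup name "www"
        "path": "/var/www",
        "repo": "www",
        "exclude": ["/cache"],
        "schedule": {"function": "forever"},
    }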
@@ -17,11 +17,11 @@ def die(msg, rc=1):
sys.exit(rc)
def load_configs():
def load_config(name):
# load configs from /etc/resticbackup.d/
# we have special handling for /etc/resticbackup.d/main.json
# return (main_config, dict(config_name=>config))
main = None
main_config = None
configs = {}
for fname in os.listdir(CFG_DIR):
@@ -36,41 +36,73 @@ def load_configs():
fname = fname.split(".")[0]
if fname == "main":
main = data
main_config = data
else:
configs[fname] = data
if not main:
if not main_config:
die("could not find main config")
return ClientConfig.from_json(main), configs
try:
backup_config = configs[name]
except KeyError:
raise Exception("backup config not found for: '{}'".format(name))
return ClientConfig.load(main_config, backup_config)
@dataclass
class BackupConfig:
path: str
repo: str
schedule: dict = field(default_factory=dict)
exclude: list[str] = field(default_factory=list)
backup_preexec: list[str] = field(default_factory=list)
backup_postexec: list[str] = field(default_factory=list)
restore_preexec: list[str] = field(default_factory=list)
restore_postexec: list[str] = field(default_factory=list)
@staticmethod
def load(data: dict) -> 'BackupConfig':
return BackupConfig(
path=data['path'],
repo=data['repo'],
schedule=data.get('schedule', {}),
exclude=data.get('exclude', []),
backup_preexec=data.get('backup_preexec', []),
backup_postexec=data.get('backup_postexec', []),
restore_preexec=data.get('restore_preexec', []),
restore_postexec=data.get('restore_postexec', []),
)
@dataclass
class ClientConfig:
server_type: str
server: str
uri: str
secret: str
backup: BackupConfig
@property
def repo(self): # port ignored
return "{}:{}://{}{}".format(self.server_type, self.server.scheme, self.server.hostname, self.server.path)
return "{}:{}://{}/{}".format(self.server_type, self.uri.scheme, self.uri.hostname, self.backup.repo)
@property
def env(self):
return {
"AWS_ACCESS_KEY_ID": self.server.username,
"AWS_SECRET_ACCESS_KEY": self.server.password,
"AWS_ACCESS_KEY_ID": self.uri.username,
"AWS_SECRET_ACCESS_KEY": self.uri.password,
"RESTIC_PASSWORD": self.secret,
"RESTIC_REPOSITORY": self.repo,
}
@staticmethod
def from_json(data) -> "ClientConfig":
server_type, server = data["server"].split(":", 1)
def load(main, backup) -> "ClientConfig":
backup = BackupConfig.load(backup)
server_type, server = main["server"].split(":", 1)
if server_type != "s3":
raise Exception("unsupported server type: {}".format(server_type))
return ClientConfig(server_type=server_type, server=urlparse(server), secret=data["secret"])
return ClientConfig(server_type=server_type, uri=urlparse(server), secret=main["secret"], backup=backup)
class ExecWrapper(object):
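Putting the two dataclasses together, a rough sketch of what load_config() now produces for the example files above (values are invented; assumes ClientConfig and BackupConfig from this file are in scope):

    main = {"server": "s3:https://ACCESS_KEY:SECRET_KEY@s3.example.com", "secret": "repository-password"}
    backup = {"path": "/var/www", "repo": "www"}

    cfg = ClientConfig.load(main, backup)

    # The repo string combines the server type, the parsed URI and the backup's
    # repo name (any port is dropped):
    #   cfg.repo == "s3:https://s3.example.com/www"
    # and cfg.env carries everything restic needs:
    #   AWS_ACCESS_KEY_ID=ACCESS_KEY, AWS_SECRET_ACCESS_KEY=SECRET_KEY,
    #   RESTIC_PASSWORD=repository-password, RESTIC_REPOSITORY=cfg.repo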
@@ -114,12 +146,7 @@ def init_ok(message):
def cmd_backup(args, parser):
config, backup_configs = load_configs()
try:
backup_config = backup_configs[args.name]
except KeyError:
die("invalid backup name: '{}'".format(args.name))
config = load_config(args.name)
env = dict(os.environ)
env.update(**config.env)
@@ -132,9 +159,9 @@ def cmd_backup(args, parser):
# perform pre/post-exec
# post-exec is executed if the backup fails OR if any pre-exec commands fail
# pre-exec commands failing will stop the backup from being executed
os.chdir(backup_config["path"])
with ExecWrapper(post=backup_config.get("backup_postexec", [])):
with ExecWrapper(pre=backup_config.get("backup_preexec", [])):
os.chdir(config.backup.path)
with ExecWrapper(post=config.backup.backup_postexec):
with ExecWrapper(pre=config.backup.backup_preexec):
# init the repo
init_cmd = [RESTIC_BIN, "init"]
proc = subprocess.Popen(init_cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
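The ExecWrapper class itself is outside this diff, so the following is only a sketch of a context manager matching the behaviour the comments describe (a failing pre-exec aborts the backup; post-exec hooks fire even when the body or a pre-exec fails); the real class may differ:

    import subprocess

    class ExecWrapperSketch:
        # Stand-in inferred from the comments above, not the project's actual ExecWrapper.
        def __init__(self, pre=(), post=()):
            self.pre, self.post = pre, post

        def __enter__(self):
            for cmd in self.pre:
                # a failing pre-exec raises here, so the with-body (the backup) never runs
                subprocess.run(cmd, shell=True, check=True)
            return self

        def __exit__(self, exc_type, exc, tb):
            for cmd in self.post:
                # post-exec is always attempted, even when the body raised
                subprocess.run(cmd, shell=True)
            return False  # do not swallow the original exception

Nesting the post= wrapper outside the pre= wrapper is what makes the post-exec hooks run even when a pre-exec command fails.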
@@ -147,20 +174,16 @@ def cmd_backup(args, parser):
backup_cmd = [RESTIC_BIN, "backup", "./"]
for k, v in backup_tags.items():
backup_cmd.extend(["--tag", "{}={}".format(k, v)])
for entry in backup_config.get("exclude"):
for entry in config.backup.exclude:
if entry.startswith("/"):
entry = os.path.join(backup_config["path"], entry[1:])
entry = os.path.join(config.backup.path, entry[1:])
backup_cmd.extend(["--exclude", entry])
proc = subprocess.Popen(backup_cmd, env=env)
proc.communicate(input=None, timeout=None)
# perform retention
schedule = backup_config.get('schedule')
if not schedule:
return
retention_args = get_retention_args(schedule)
retention_args = get_retention_args(config.backup.schedule)
if not retention_args:
return
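For illustration, the exclude handling rewrites leading-slash entries so they are rooted at the backup's own path rather than the filesystem root (example values below are invented):

    import os

    backup_path = "/var/www"            # config.backup.path
    excludes = ["/cache", "*.tmp"]      # config.backup.exclude

    cmd = ["restic", "backup", "./"]
    for entry in excludes:
        if entry.startswith("/"):
            entry = os.path.join(backup_path, entry[1:])   # "/cache" -> "/var/www/cache"
        cmd.extend(["--exclude", entry])

    # cmd == ["restic", "backup", "./", "--exclude", "/var/www/cache", "--exclude", "*.tmp"]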
@@ -176,7 +199,9 @@ def get_retention_args(schedule):
"""
given a retention schedule, return restic command(s) needed to make it so
"""
mode = schedule["function"]
mode = schedule.get("function")
if mode is None: # default is to just keep stuff lol
return None
if mode == "forever":
# do not perform deletions
return None
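Because cmd_backup now passes config.backup.schedule unconditionally (it defaults to an empty dict), get_retention_args has to tolerate a schedule with no "function" key. Both an absent key and an explicit "forever" now mean no pruning, for example (assuming get_retention_args from this file is in scope):

    assert get_retention_args({}) is None                        # no "function": keep everything
    assert get_retention_args({"function": "forever"}) is None   # explicit keep-everything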
@@ -215,12 +240,7 @@ def cmd_restore(args, parser):
def cmd_exec(args, parser):
config, backup_configs = load_configs()
try:
backup_config = backup_configs[args.name]
except KeyError:
die("invalid backup name: '{}'".format(args.name))
config = load_config(args.name)
env = dict(os.environ)
env.update(**config.env)